完成mqtt

This commit is contained in:
dev02 2025-02-12 09:03:23 +08:00
parent 9adf6880a3
commit 42125fd97e
3 changed files with 133 additions and 94 deletions

View File

@ -1,5 +1,6 @@
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Mqtt.Hubs; using Mqtt.Hubs;
using Mqtt.Models; using Mqtt.Models;
@ -18,13 +19,16 @@ namespace Mqtt.Controllers
{ {
Dictionary<string, object> dic = new Dictionary<string, object>(); Dictionary<string, object> dic = new Dictionary<string, object>();
private readonly ILogger<AlarmController> _logger; private readonly ILogger<MqttClientService> _logger;
private readonly IBackendRepository _backendRepository; private readonly IBackendRepository _backendRepository;
private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository; private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository;
private readonly IConfiguration _configuration;
public AlarmController(ILogger<AlarmController> logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository) public AlarmController(ILogger<MqttClientService> logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository,
IConfiguration configuration)
{ {
this._logger = logger; this._logger = logger;
this._configuration = configuration;
this._backendRepository = backendRepository; this._backendRepository = backendRepository;
this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository; this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository;
} }
@ -32,7 +36,7 @@ namespace Mqtt.Controllers
[HttpGet] [HttpGet]
public async Task<IActionResult> AlarmList() public async Task<IActionResult> AlarmList()
{ {
dic.Add("data", await new MqttClientService(_logger, _backgroundServicePostgresqlRepository).GetAlarmSettings()); dic.Add("data", await new MqttClientService(_logger, _configuration, _backgroundServicePostgresqlRepository, _backendRepository).GetAlarmSettings());
return new OkObjectResult(dic); return new OkObjectResult(dic);
} }
@ -41,7 +45,7 @@ namespace Mqtt.Controllers
{ {
try try
{ {
await new MqttClientService(_logger, _backgroundServicePostgresqlRepository).RefreshAlarmSettings(); await new MqttClientService(_logger, _configuration, _backgroundServicePostgresqlRepository, _backendRepository).RefreshAlarmSettings();
dic.Add("ok", true); dic.Add("ok", true);
dic.Add("msg", "重新設定成功"); dic.Add("msg", "重新設定成功");
} }

View File

@ -22,7 +22,7 @@ namespace Mqtt.Models
public string created_by { get; set; } public string created_by { get; set; }
public DateTime created_at { get; set; } public DateTime created_at { get; set; }
public int delay { get; set; } public int delay { get; set; }
public DateTime state_update_at { get; set; } public DateTime state_updated_at { get; set; }
public DateTime ChkStateTime { get; set; } public DateTime ChkStateTime { get; set; }
} }
} }

View File

@ -6,6 +6,7 @@ using Mqtt.Hubs;
using Mqtt.Models; using Mqtt.Models;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Server;
using MySqlX.XDevAPI.Common; using MySqlX.XDevAPI.Common;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Repository.BackendRepository.Interface; using Repository.BackendRepository.Interface;
@ -23,30 +24,22 @@ namespace Mqtt.Services
public class MqttClientService public class MqttClientService
{ {
private IMqttClient _mqttClient; private IMqttClient _mqttClient;
private readonly IHubContext<MqttHub> _hubContext; private static readonly Dictionary<string, IMqttClient> _mqttClients = new Dictionary<string, IMqttClient>();
private readonly ILogger<MqttClientService> _logger; private readonly ILogger<MqttClientService> _logger;
private readonly ILogger<AlarmController> _loggerControll;
private readonly IBackendRepository _backendRepository; private readonly IBackendRepository _backendRepository;
private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository; private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository;
private readonly IConfiguration _configuration; private readonly IConfiguration _configuration;
private static List<alarm_setting> allAlarmSettings = new List<alarm_setting>(); private static List<alarm_setting> allAlarmSettings = new List<alarm_setting>();
public MqttClientService(IHubContext<MqttHub> hubContext, ILogger<MqttClientService> logger, IConfiguration configuration, public MqttClientService(ILogger<MqttClientService> logger, IConfiguration configuration,
IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository) IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository)
{ {
this._hubContext = hubContext;
this._logger = logger; this._logger = logger;
this._configuration = configuration; this._configuration = configuration;
this._backendRepository = backendRepository; this._backendRepository = backendRepository;
this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository; this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository;
} }
public MqttClientService(ILogger<AlarmController> logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository)
{
this._loggerControll = logger;
this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository;
}
public async Task StartAsync() public async Task StartAsync()
{ {
try try
@ -56,26 +49,7 @@ namespace Mqtt.Services
#endregion #endregion
#region devices subscribing #region devices subscribing
_logger.LogInformation("MQTT devices subscribing..."); await DeviceSubcribe();
string sql = "select distinct topic, \"tagIoT\", main_id from device_list where trim(topic) != 'null' and topic is not null and TRIM(topic) != ''";
var devices = await _backgroundServicePostgresqlRepository.GetAllAsync<device_list>(sql);
if (devices.Any())
{
foreach (var d in devices)
{
sql = $"select \"PointOrg\" from \"IoTPointMap\" where \"TagIoT\" = '{d.tagIot}'";
d.deviceListPoints = new List<device_list_point>();
d.deviceListPoints = await _backgroundServicePostgresqlRepository.GetAllAsync<device_list_point>(sql);
if (d.deviceListPoints.Any())
{
await ConnetMqtt(d);
}
}
}
_logger.LogInformation("MQTT devices subscribed end");
#endregion #endregion
} }
catch (Exception e) catch (Exception e)
@ -88,6 +62,12 @@ namespace Mqtt.Services
{ {
try try
{ {
if (_mqttClients.ContainsKey(device.topic))
{
_logger.LogWarning($"Device {device.topic} is already connected.");
return;
}
string sql = string.Empty; string sql = string.Empty;
var factory = new MqttFactory(); var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient(); _mqttClient = factory.CreateMqttClient();
@ -112,28 +92,28 @@ namespace Mqtt.Services
// Perform your timeout logic here // Perform your timeout logic here
messageTimer.Stop(); // Stop the timer to prevent multiple triggers messageTimer.Stop(); // Stop the timer to prevent multiple triggers
if ((alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime >= alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at if ((alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime >= alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at
|| alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at == null) || alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at == null)
&& alarmSettings.FirstOrDefault(x => x.factor == 1).state == "normal") && alarmSettings.FirstOrDefault(x => x.factor == 1).state == "normal")
{ {
// Example: Log timeout // Example: Log timeout
sql = @$"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at) sql = @$"insert into alarm_log (device_number, points, state, is_sentLine, is_sentEmail, created_at)
select uuid(), as1.device_number, as1.points, 'offnormal', 0, 0, now(), now() select as1.device_number, as1.points, 'offnormal', 0, 0, now()
from device_main dm from device_main dm
join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0
where dm.main_id = {device.main_id} and dm.deleted = 0"; where dm.main_id = {device.main_id} and as1.id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id} and dm.deleted = 0";
await _backendRepository.ExecuteSql(sql); await _backendRepository.ExecuteSql(sql);
sql = $"update alarm_setting set state = 'offnormal', state_update_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; sql = $"update alarm_setting set state = 'offnormal', state_updated_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}";
await _backendRepository.ExecuteSql(sql); await _backendRepository.ExecuteSql(sql);
alarmSettings.FirstOrDefault(x => x.factor == 1).state = "offnormal"; alarmSettings.FirstOrDefault(x => x.factor == 1).state = "offnormal";
alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at = DateTime.Now; alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at = DateTime.Now;
}
_logger.LogInformation($"{device.topic} topic timer up, added alarm_log"); _logger.LogInformation($"{device.topic} topic timer up, added alarm_log");
}
alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now; alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now;
@ -154,12 +134,12 @@ namespace Mqtt.Services
#region update st point #region update st point
if (alarmSettings.Any(x => x.factor == 1)) if (alarmSettings.Any(x => x.factor == 1))
{ {
sql = $"update alarm_setting set state = 'normal', state_update_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; sql = $"update alarm_setting set state = 'normal', state_updated_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}";
await _backendRepository.ExecuteSql(sql); await _backendRepository.ExecuteSql(sql);
alarmSettings.FirstOrDefault(x => x.factor == 1).state = "normal"; alarmSettings.FirstOrDefault(x => x.factor == 1).state = "normal";
alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at = DateTime.Now; alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at = DateTime.Now;
} }
#endregion #endregion
@ -214,66 +194,72 @@ namespace Mqtt.Services
if (alarmSetting != null) if (alarmSetting != null)
{ {
var offnormalResult = false; var chkResult = false;
if (alarmSetting.state == "normal") if (alarmSetting.state == "normal")
{ {
//factor 2
if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result) if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result)
&& (result > alarmSetting.highLimit || result > alarmSetting.highDelay || result < alarmSetting.lowLimit || result < alarmSetting.lowDelay)) && (result > alarmSetting.highLimit || result > alarmSetting.highDelay || result < alarmSetting.lowLimit || result < alarmSetting.lowDelay))
{ {
offnormalResult = true; chkResult = true;
}
if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value == alarmSetting.alarm_value)
{
offnormalResult = true;
} }
if (offnormalResult) //factor 3
if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value.ToLower() == alarmSetting.alarm_value.ToLower())
{ {
sql = $@"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at) chkResult = true;
select uuid(), as1.device_number, as1.points, 'offnormal', 0, 0, now(), now() }
from device_main dm
join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0
where dm.main_id = {device.main_id}
and dm.deleted = 0";
await _backendRepository.ExecuteSql(sql); //factor 4
if (alarmSetting.factor == 4 && !string.IsNullOrWhiteSpace(value))
sql = $"update alarm_setting set state = 'offnormal', state_update_at = now() where id = {alarmSetting.id}"; {
chkResult = true;
await _backendRepository.ExecuteSql(sql);
} }
} }
else if (alarmSetting.state == "offnormal") else if (alarmSetting.state == "offnormal")
{ {
var normalResult = false; //factor 2
if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result) if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result)
&& (result <= alarmSetting.highLimit && result <= alarmSetting.highDelay && result >= alarmSetting.lowLimit && result >= alarmSetting.lowDelay)) && (result <= alarmSetting.highLimit && result <= alarmSetting.highDelay && result >= alarmSetting.lowLimit && result >= alarmSetting.lowDelay))
{ {
normalResult = true; chkResult = true;
} }
//continue factor 3
if (normalResult) //factor 3
if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value.ToLower() != alarmSetting.alarm_value.ToLower())
{ {
sql = $@"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at) chkResult = true;
select uuid(), as1.device_number, as1.points, 'normal', 0, 0, now(), now() }
//factor 4
if (alarmSetting.factor == 4 && string.IsNullOrWhiteSpace(value))
{
chkResult = true;
}
}
if (chkResult)
{
alarmSetting.state = alarmSetting.state == "normal" ? "offnormal" : "normal";
sql = $@"insert into alarm_log (device_number, points, state, is_sentLine, is_sentEmail, created_at)
select as1.device_number, as1.points, '{alarmSetting.state}', 0, 0, now()
from device_main dm from device_main dm
join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0
where dm.main_id = {device.main_id} where dm.main_id = {device.main_id} and as1.id = {alarmSetting.id} and dm.deleted = 0";
and dm.deleted = 0";
await _backendRepository.ExecuteSql(sql); await _backendRepository.ExecuteSql(sql);
sql = $"update alarm_setting set state = 'normal', state_update_at = now() where id = {alarmSetting.id}"; sql = $"update alarm_setting set state = '{alarmSetting.state}', state_updated_at = now() where id = {alarmSetting.id}";
await _backendRepository.ExecuteSql(sql); await _backendRepository.ExecuteSql(sql);
}
}
}
_logger.LogInformation($"{device.topic} topic added alarm_log"); _logger.LogInformation($"{device.topic} topic added alarm_log");
} }
} }
}
}
#endregion #endregion
//Console.WriteLine($"{value}"); //Console.WriteLine($"{value}");
@ -290,6 +276,7 @@ namespace Mqtt.Services
// Connect to the broker // Connect to the broker
await _mqttClient.ConnectAsync(options); await _mqttClient.ConnectAsync(options);
await _mqttClient.SubscribeAsync(device.topic); await _mqttClient.SubscribeAsync(device.topic);
_mqttClients[device.topic] = _mqttClient;
_logger.LogInformation($"{device.topic} subscribed"); _logger.LogInformation($"{device.topic} subscribed");
@ -303,18 +290,70 @@ namespace Mqtt.Services
} }
} }
private async Task CloseMqtt()
{
try
{
if (_mqttClients.Count > 0)
{
foreach (var (deviceId, mqttClient) in _mqttClients.ToList())
{
await mqttClient.DisconnectAsync();
//_logger.LogInformation($"Device {deviceId} disconnected.");
}
_mqttClients.Clear();
_logger.LogInformation("All MQTT connections closed.");
await DeviceSubcribe();
}
}
catch (Exception e)
{
_logger.LogInformation($"Close Mqtt error, message: {e.Message}");
}
}
public async Task<List<alarm_setting>> GetAlarmSettings() public async Task<List<alarm_setting>> GetAlarmSettings()
{ {
return allAlarmSettings; return allAlarmSettings;
} }
public async Task DeviceSubcribe()
{
try
{
_logger.LogInformation("MQTT devices subscribing...");
string sql = "select distinct topic, \"tagIoT\", main_id from device_list where trim(topic) != 'null' and topic is not null and TRIM(topic) != ''";
var devices = await _backgroundServicePostgresqlRepository.GetAllAsync<device_list>(sql);
if (devices.Any())
{
foreach (var d in devices)
{
sql = $"select \"PointOrg\", \"PointSys\" from \"IoTPointMap\" where \"TagIoT\" = '{d.tagIot}'";
d.deviceListPoints = new List<device_list_point>();
d.deviceListPoints = await _backgroundServicePostgresqlRepository.GetAllAsync<device_list_point>(sql);
if (d.deviceListPoints.Any())
{
await ConnetMqtt(d);
}
}
}
_logger.LogInformation("MQTT devices subscribed end");
}
catch (Exception e)
{
throw e;
}
}
public async Task RefreshAlarmSettings() public async Task RefreshAlarmSettings()
{ {
try try
{ {
if (_loggerControll != null)
_loggerControll.LogInformation("Get Alarm Settings...");
else if (_logger != null)
_logger.LogInformation("Get Alarm Settings..."); _logger.LogInformation("Get Alarm Settings...");
string sql = @"select dm.main_id, as1.* string sql = @"select dm.main_id, as1.*
@ -322,18 +361,14 @@ namespace Mqtt.Services
join device_main dm on as1.device_number = dm.device_number and dm.deleted = 0 join device_main dm on as1.device_number = dm.device_number and dm.deleted = 0
where as1.deleted = 0"; where as1.deleted = 0";
allAlarmSettings = await _backgroundServicePostgresqlRepository.GetAllAsync<alarm_setting>(sql); allAlarmSettings = await _backendRepository.GetAllAsync<alarm_setting>(sql);
await CloseMqtt();
if (_loggerControll != null)
_loggerControll.LogInformation("Done Alarm Settings...");
else if (_logger != null)
_logger.LogInformation("Done Alarm Settings..."); _logger.LogInformation("Done Alarm Settings...");
} }
catch (Exception e) catch (Exception e)
{ {
if (_loggerControll != null)
_loggerControll.LogError($"get alarm settings error, message: {e.Message}");
else if (_logger != null)
_logger.LogError($"get alarm settings error, message: {e.Message}"); _logger.LogError($"get alarm settings error, message: {e.Message}");
throw e; throw e;