From 42125fd97eb8ea84eabff23e4467cf1c8c711914 Mon Sep 17 00:00:00 2001 From: dev02 Date: Wed, 12 Feb 2025 09:03:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Mqtt/Controllers/AlarmController.cs | 12 +- Mqtt/Models/Alarm.cs | 2 +- Mqtt/Services/MqttClientService.cs | 213 ++++++++++++++++------------ 3 files changed, 133 insertions(+), 94 deletions(-) diff --git a/Mqtt/Controllers/AlarmController.cs b/Mqtt/Controllers/AlarmController.cs index cf0128e..0cb0ce4 100644 --- a/Mqtt/Controllers/AlarmController.cs +++ b/Mqtt/Controllers/AlarmController.cs @@ -1,5 +1,6 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Mqtt.Hubs; using Mqtt.Models; @@ -18,13 +19,16 @@ namespace Mqtt.Controllers { Dictionary dic = new Dictionary(); - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly IBackendRepository _backendRepository; private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository; + private readonly IConfiguration _configuration; - public AlarmController(ILogger logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository) + public AlarmController(ILogger logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository, + IConfiguration configuration) { this._logger = logger; + this._configuration = configuration; this._backendRepository = backendRepository; this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository; } @@ -32,7 +36,7 @@ namespace Mqtt.Controllers [HttpGet] public async Task AlarmList() { - dic.Add("data", await new MqttClientService(_logger, _backgroundServicePostgresqlRepository).GetAlarmSettings()); + dic.Add("data", await new MqttClientService(_logger, _configuration, _backgroundServicePostgresqlRepository, _backendRepository).GetAlarmSettings()); return new OkObjectResult(dic); } @@ -41,7 +45,7 @@ namespace Mqtt.Controllers { try { - await new MqttClientService(_logger, _backgroundServicePostgresqlRepository).RefreshAlarmSettings(); + await new MqttClientService(_logger, _configuration, _backgroundServicePostgresqlRepository, _backendRepository).RefreshAlarmSettings(); dic.Add("ok", true); dic.Add("msg", "重新設定成功"); } diff --git a/Mqtt/Models/Alarm.cs b/Mqtt/Models/Alarm.cs index 3f5f797..ad99d98 100644 --- a/Mqtt/Models/Alarm.cs +++ b/Mqtt/Models/Alarm.cs @@ -22,7 +22,7 @@ namespace Mqtt.Models public string created_by { get; set; } public DateTime created_at { get; set; } public int delay { get; set; } - public DateTime state_update_at { get; set; } + public DateTime state_updated_at { get; set; } public DateTime ChkStateTime { get; set; } } } diff --git a/Mqtt/Services/MqttClientService.cs b/Mqtt/Services/MqttClientService.cs index b410be6..8c93f31 100644 --- a/Mqtt/Services/MqttClientService.cs +++ b/Mqtt/Services/MqttClientService.cs @@ -6,6 +6,7 @@ using Mqtt.Hubs; using Mqtt.Models; using MQTTnet; using MQTTnet.Client; +using MQTTnet.Server; using MySqlX.XDevAPI.Common; using Newtonsoft.Json.Linq; using Repository.BackendRepository.Interface; @@ -23,30 +24,22 @@ namespace Mqtt.Services public class MqttClientService { private IMqttClient _mqttClient; - private readonly IHubContext _hubContext; + private static readonly Dictionary _mqttClients = new Dictionary(); private readonly ILogger _logger; - private readonly ILogger _loggerControll; private readonly IBackendRepository _backendRepository; private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository; private readonly IConfiguration _configuration; private static List allAlarmSettings = new List(); - public MqttClientService(IHubContext hubContext, ILogger logger, IConfiguration configuration, + public MqttClientService(ILogger logger, IConfiguration configuration, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository) { - this._hubContext = hubContext; this._logger = logger; this._configuration = configuration; this._backendRepository = backendRepository; this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository; } - public MqttClientService(ILogger logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository) - { - this._loggerControll = logger; - this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository; - } - public async Task StartAsync() { try @@ -56,26 +49,7 @@ namespace Mqtt.Services #endregion #region devices subscribing - _logger.LogInformation("MQTT devices subscribing..."); - string sql = "select distinct topic, \"tagIoT\", main_id from device_list where trim(topic) != 'null' and topic is not null and TRIM(topic) != ''"; - var devices = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); - - if (devices.Any()) - { - foreach (var d in devices) - { - sql = $"select \"PointOrg\" from \"IoTPointMap\" where \"TagIoT\" = '{d.tagIot}'"; - d.deviceListPoints = new List(); - d.deviceListPoints = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); - - if (d.deviceListPoints.Any()) - { - await ConnetMqtt(d); - } - } - } - - _logger.LogInformation("MQTT devices subscribed end"); + await DeviceSubcribe(); #endregion } catch (Exception e) @@ -88,6 +62,12 @@ namespace Mqtt.Services { try { + if (_mqttClients.ContainsKey(device.topic)) + { + _logger.LogWarning($"Device {device.topic} is already connected."); + return; + } + string sql = string.Empty; var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); @@ -112,28 +92,28 @@ namespace Mqtt.Services // Perform your timeout logic here messageTimer.Stop(); // Stop the timer to prevent multiple triggers - if ((alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime >= alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at - || alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at == null) + if ((alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime >= alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at + || alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at == null) && alarmSettings.FirstOrDefault(x => x.factor == 1).state == "normal") { // Example: Log timeout - sql = @$"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at) - select uuid(), as1.device_number, as1.points, 'offnormal', 0, 0, now(), now() + sql = @$"insert into alarm_log (device_number, points, state, is_sentLine, is_sentEmail, created_at) + select as1.device_number, as1.points, 'offnormal', 0, 0, now() from device_main dm join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 - where dm.main_id = {device.main_id} and dm.deleted = 0"; + where dm.main_id = {device.main_id} and as1.id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id} and dm.deleted = 0"; await _backendRepository.ExecuteSql(sql); - sql = $"update alarm_setting set state = 'offnormal', state_update_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; + sql = $"update alarm_setting set state = 'offnormal', state_updated_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; await _backendRepository.ExecuteSql(sql); alarmSettings.FirstOrDefault(x => x.factor == 1).state = "offnormal"; - alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at = DateTime.Now; - } + alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at = DateTime.Now; - _logger.LogInformation($"{device.topic} topic timer up, added alarm_log"); + _logger.LogInformation($"{device.topic} topic timer up, added alarm_log"); + } alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now; @@ -154,12 +134,12 @@ namespace Mqtt.Services #region update st point if (alarmSettings.Any(x => x.factor == 1)) { - sql = $"update alarm_setting set state = 'normal', state_update_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; + sql = $"update alarm_setting set state = 'normal', state_updated_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; await _backendRepository.ExecuteSql(sql); alarmSettings.FirstOrDefault(x => x.factor == 1).state = "normal"; - alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at = DateTime.Now; + alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at = DateTime.Now; } #endregion @@ -214,64 +194,70 @@ namespace Mqtt.Services if (alarmSetting != null) { - var offnormalResult = false; + var chkResult = false; + if (alarmSetting.state == "normal") { + //factor 2 if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result) && (result > alarmSetting.highLimit || result > alarmSetting.highDelay || result < alarmSetting.lowLimit || result < alarmSetting.lowDelay)) { - offnormalResult = true; - } - if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value == alarmSetting.alarm_value) - { - offnormalResult = true; + chkResult = true; } - if (offnormalResult) + //factor 3 + if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value.ToLower() == alarmSetting.alarm_value.ToLower()) { - sql = $@"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at) - select uuid(), as1.device_number, as1.points, 'offnormal', 0, 0, now(), now() - from device_main dm - join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 - where dm.main_id = {device.main_id} - and dm.deleted = 0"; + chkResult = true; + } - await _backendRepository.ExecuteSql(sql); - - sql = $"update alarm_setting set state = 'offnormal', state_update_at = now() where id = {alarmSetting.id}"; - - await _backendRepository.ExecuteSql(sql); + //factor 4 + if (alarmSetting.factor == 4 && !string.IsNullOrWhiteSpace(value)) + { + chkResult = true; } } else if (alarmSetting.state == "offnormal") { - var normalResult = false; + //factor 2 if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result) && (result <= alarmSetting.highLimit && result <= alarmSetting.highDelay && result >= alarmSetting.lowLimit && result >= alarmSetting.lowDelay)) { - normalResult = true; + chkResult = true; } - //continue factor 3 - if (normalResult) + //factor 3 + if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value.ToLower() != alarmSetting.alarm_value.ToLower()) { - sql = $@"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at) - select uuid(), as1.device_number, as1.points, 'normal', 0, 0, now(), now() - from device_main dm - join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 - where dm.main_id = {device.main_id} - and dm.deleted = 0"; + chkResult = true; + } - await _backendRepository.ExecuteSql(sql); - - sql = $"update alarm_setting set state = 'normal', state_update_at = now() where id = {alarmSetting.id}"; - - await _backendRepository.ExecuteSql(sql); + //factor 4 + if (alarmSetting.factor == 4 && string.IsNullOrWhiteSpace(value)) + { + chkResult = true; } } - } - _logger.LogInformation($"{device.topic} topic added alarm_log"); + if (chkResult) + { + alarmSetting.state = alarmSetting.state == "normal" ? "offnormal" : "normal"; + + sql = $@"insert into alarm_log (device_number, points, state, is_sentLine, is_sentEmail, created_at) + select as1.device_number, as1.points, '{alarmSetting.state}', 0, 0, now() + from device_main dm + join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 + where dm.main_id = {device.main_id} and as1.id = {alarmSetting.id} and dm.deleted = 0"; + + await _backendRepository.ExecuteSql(sql); + + sql = $"update alarm_setting set state = '{alarmSetting.state}', state_updated_at = now() where id = {alarmSetting.id}"; + + await _backendRepository.ExecuteSql(sql); + + _logger.LogInformation($"{device.topic} topic added alarm_log"); + } + } } } #endregion @@ -290,6 +276,7 @@ namespace Mqtt.Services // Connect to the broker await _mqttClient.ConnectAsync(options); await _mqttClient.SubscribeAsync(device.topic); + _mqttClients[device.topic] = _mqttClient; _logger.LogInformation($"{device.topic} subscribed"); @@ -303,38 +290,86 @@ namespace Mqtt.Services } } + private async Task CloseMqtt() + { + try + { + if (_mqttClients.Count > 0) + { + foreach (var (deviceId, mqttClient) in _mqttClients.ToList()) + { + await mqttClient.DisconnectAsync(); + //_logger.LogInformation($"Device {deviceId} disconnected."); + } + + _mqttClients.Clear(); + _logger.LogInformation("All MQTT connections closed."); + + await DeviceSubcribe(); + } + } + catch (Exception e) + { + _logger.LogInformation($"Close Mqtt error, message: {e.Message}"); + } + } + public async Task> GetAlarmSettings() { return allAlarmSettings; } + public async Task DeviceSubcribe() + { + try + { + _logger.LogInformation("MQTT devices subscribing..."); + string sql = "select distinct topic, \"tagIoT\", main_id from device_list where trim(topic) != 'null' and topic is not null and TRIM(topic) != ''"; + var devices = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); + + if (devices.Any()) + { + foreach (var d in devices) + { + sql = $"select \"PointOrg\", \"PointSys\" from \"IoTPointMap\" where \"TagIoT\" = '{d.tagIot}'"; + d.deviceListPoints = new List(); + d.deviceListPoints = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); + + if (d.deviceListPoints.Any()) + { + await ConnetMqtt(d); + } + } + } + + _logger.LogInformation("MQTT devices subscribed end"); + } + catch (Exception e) + { + throw e; + } + } + public async Task RefreshAlarmSettings() { try { - if (_loggerControll != null) - _loggerControll.LogInformation("Get Alarm Settings..."); - else if (_logger != null) - _logger.LogInformation("Get Alarm Settings..."); + _logger.LogInformation("Get Alarm Settings..."); string sql = @"select dm.main_id, as1.* from alarm_setting as1 join device_main dm on as1.device_number = dm.device_number and dm.deleted = 0 where as1.deleted = 0"; - allAlarmSettings = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); + allAlarmSettings = await _backendRepository.GetAllAsync(sql); - if (_loggerControll != null) - _loggerControll.LogInformation("Done Alarm Settings..."); - else if (_logger != null) - _logger.LogInformation("Done Alarm Settings..."); + await CloseMqtt(); + + _logger.LogInformation("Done Alarm Settings..."); } catch (Exception e) { - if (_loggerControll != null) - _loggerControll.LogError($"get alarm settings error, message: {e.Message}"); - else if (_logger != null) - _logger.LogError($"get alarm settings error, message: {e.Message}"); + _logger.LogError($"get alarm settings error, message: {e.Message}"); throw e; }