using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Mqtt.Controllers; using Mqtt.Hubs; using Mqtt.Models; using MQTTnet; using MQTTnet.Client; using MQTTnet.Server; using MySqlX.XDevAPI.Common; using Newtonsoft.Json.Linq; using Repository.BackendRepository.Interface; using Repository.Models; using Serilog.Core; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Timers; namespace Mqtt.Services { public class MqttClientService { private IMqttClient _mqttClient; private static readonly Dictionary _mqttClients = new Dictionary(); private readonly ILogger _logger; private readonly IBackendRepository _backendRepository; private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository; private readonly IConfiguration _configuration; private static List allAlarmSettings = new List(); public MqttClientService(ILogger logger, IConfiguration configuration, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository) { this._logger = logger; this._configuration = configuration; this._backendRepository = backendRepository; this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository; } public async Task StartAsync() { try { #region get alarmSettings await RefreshAlarmSettings(); #endregion #region devices subscribing await DeviceSubcribe(); #endregion } catch (Exception e) { _logger.LogError($"Mqtt devices subcribe error, message: {e.Message}"); } } private async Task ConnetMqtt (device_list device) { try { if (_mqttClients.ContainsKey(device.topic)) { _logger.LogWarning($"Device {device.topic} is already connected."); return; } string sql = string.Empty; var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); var options = new MqttClientOptionsBuilder() .WithTcpServer(new EDFunction().AESDecrypt(_configuration.GetSection("MQTT:server").Get() ?? "192.168.0.217"), int.Parse(new EDFunction().AESDecrypt(_configuration.GetSection("MQTT:port").Get()) ?? "1883")) .WithCleanSession() .Build(); var alarmSettings = allAlarmSettings.Where(x => x.main_id == device.main_id).ToList(); Timer messageTimer = null; int timeoutSeconds = 0; if (alarmSettings.Any() && alarmSettings.Any(x => x.factor == 1)) { timeoutSeconds = alarmSettings.FirstOrDefault(x => x.factor == 1).delay; messageTimer = new Timer(timeoutSeconds * 1000); // Convert seconds to milliseconds alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now; messageTimer.Elapsed += async (sender, args) => { // Perform your timeout logic here messageTimer.Stop(); // Stop the timer to prevent multiple triggers if ((alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime >= alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at || alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at == null) && alarmSettings.FirstOrDefault(x => x.factor == 1).state == "normal") { // Example: Log timeout sql = @$"insert into alarm_log (device_number, points, state, is_sentLine, is_sentEmail, created_at) select as1.device_number, as1.points, 'offnormal', 0, 0, now() from device_main dm join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 where dm.main_id = {device.main_id} and as1.id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id} and dm.deleted = 0"; await _backendRepository.ExecuteSql(sql); sql = $"update alarm_setting set state = 'offnormal', state_updated_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; await _backendRepository.ExecuteSql(sql); alarmSettings.FirstOrDefault(x => x.factor == 1).state = "offnormal"; alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at = DateTime.Now; _logger.LogInformation($"{device.topic} topic timer up, added alarm_log"); } alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now; messageTimer.Start(); // Restart the timer for the next cycle }; } _mqttClient.ApplicationMessageReceivedAsync += async e => { if (timeoutSeconds > 0) messageTimer.Stop(); // Reset timer on message received var message = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload); var payLoad = message; string value = string.Empty; JObject jsonObj = JObject.Parse(payLoad); #region update st point if (alarmSettings.Any(x => x.factor == 1)) { sql = $"update alarm_setting set state = 'normal', state_updated_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}"; await _backendRepository.ExecuteSql(sql); alarmSettings.FirstOrDefault(x => x.factor == 1).state = "normal"; alarmSettings.FirstOrDefault(x => x.factor == 1).state_updated_at = DateTime.Now; } #endregion #region check point foreach (var dlp in device.deviceListPoints) { int nLevel = 0; string[] astrKey = null; astrKey = dlp.pointOrg.Split('_'); if (astrKey != null) nLevel = astrKey.Count(); if (nLevel > 1) { switch (nLevel) { case 2: { string strKeyL0 = astrKey[0]; string strKeyL1 = astrKey[1]; value = (string)jsonObj[strKeyL0][strKeyL1]; } break; case 3: { string strKeyL0 = astrKey[0]; string strKeyL1 = astrKey[1]; string strKeyL2 = astrKey[2]; value = (string)jsonObj[strKeyL0][strKeyL1][strKeyL2]; } break; case 4: { string strKeyL0 = astrKey[0]; string strKeyL1 = astrKey[1]; string strKeyL2 = astrKey[2]; string strKeyL3 = astrKey[3]; value = (string)jsonObj[strKeyL0][strKeyL1][strKeyL2][strKeyL3]; } break; } } else { value = (string)jsonObj[dlp.pointOrg]; } if (value != null && alarmSettings.Any(x => x.main_id == device.main_id && x.points == dlp.PointSys)) { var alarmSetting = alarmSettings.FirstOrDefault(x => x.main_id == device.main_id && x.points == dlp.PointSys); if (alarmSetting != null) { var chkResult = false; if (alarmSetting.state == "normal") { //factor 2 if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result) && (result > alarmSetting.highLimit || result > alarmSetting.highDelay || result < alarmSetting.lowLimit || result < alarmSetting.lowDelay)) { chkResult = true; } //factor 3 if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value.ToLower() == alarmSetting.alarm_value.ToLower()) { chkResult = true; } //factor 4 if (alarmSetting.factor == 4 && !string.IsNullOrWhiteSpace(value)) { chkResult = true; } } else if (alarmSetting.state == "offnormal") { //factor 2 if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result) && (result <= alarmSetting.highLimit && result <= alarmSetting.highDelay && result >= alarmSetting.lowLimit && result >= alarmSetting.lowDelay)) { chkResult = true; } //factor 3 if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value.ToLower() != alarmSetting.alarm_value.ToLower()) { chkResult = true; } //factor 4 if (alarmSetting.factor == 4 && string.IsNullOrWhiteSpace(value)) { chkResult = true; } } if (chkResult) { alarmSetting.state = alarmSetting.state == "normal" ? "offnormal" : "normal"; sql = $@"insert into alarm_log (device_number, points, state, is_sentLine, is_sentEmail, created_at) select as1.device_number, as1.points, '{alarmSetting.state}', 0, 0, now() from device_main dm join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0 where dm.main_id = {device.main_id} and as1.id = {alarmSetting.id} and dm.deleted = 0"; await _backendRepository.ExecuteSql(sql); sql = $"update alarm_setting set state = '{alarmSetting.state}', state_updated_at = now() where id = {alarmSetting.id}"; await _backendRepository.ExecuteSql(sql); _logger.LogInformation($"{device.topic} topic added alarm_log"); } } } } #endregion //Console.WriteLine($"{value}"); //Console.WriteLine($"Message received: {message}"); // Broadcast to SignalR clients //await _hubContext.Clients.All.SendAsync("ReceiveMessage", e.ApplicationMessage.Topic, message); // Restart the timer for the next message if (timeoutSeconds > 0) messageTimer.Start(); }; // Connect to the broker await _mqttClient.ConnectAsync(options); await _mqttClient.SubscribeAsync(device.topic); _mqttClients[device.topic] = _mqttClient; _logger.LogInformation($"{device.topic} subscribed"); // Start the timer when subscription is successful if (timeoutSeconds > 0) messageTimer.Start(); } catch (Exception e) { _logger.LogError($"{device.topic} subscribe failed, message: {e.Message}"); } } private async Task CloseMqtt() { try { if (_mqttClients.Count > 0) { foreach (var (deviceId, mqttClient) in _mqttClients.ToList()) { await mqttClient.DisconnectAsync(); //_logger.LogInformation($"Device {deviceId} disconnected."); } _mqttClients.Clear(); _logger.LogInformation("All MQTT connections closed."); await DeviceSubcribe(); } } catch (Exception e) { _logger.LogInformation($"Close Mqtt error, message: {e.Message}"); } } public async Task> GetAlarmSettings() { return allAlarmSettings; } public async Task DeviceSubcribe() { try { _logger.LogInformation("MQTT devices subscribing..."); string sql = "select distinct topic, \"tagIoT\", main_id from device_list where trim(topic) != 'null' and topic is not null and TRIM(topic) != ''"; var devices = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); if (devices.Any()) { foreach (var d in devices) { sql = $"select \"PointOrg\", \"PointSys\" from \"IoTPointMap\" where \"TagIoT\" = '{d.tagIot}'"; d.deviceListPoints = new List(); d.deviceListPoints = await _backgroundServicePostgresqlRepository.GetAllAsync(sql); if (d.deviceListPoints.Any()) { await ConnetMqtt(d); } } } _logger.LogInformation("MQTT devices subscribed end"); } catch (Exception e) { throw e; } } public async Task RefreshAlarmSettings() { try { _logger.LogInformation("Get Alarm Settings..."); string sql = @"select dm.main_id, as1.* from alarm_setting as1 join device_main dm on as1.device_number = dm.device_number and dm.deleted = 0 where as1.deleted = 0"; allAlarmSettings = await _backendRepository.GetAllAsync(sql); await CloseMqtt(); _logger.LogInformation("Done Alarm Settings..."); } catch (Exception e) { _logger.LogError($"get alarm settings error, message: {e.Message}"); throw e; } } public async Task PublishAsync(string topic, string payload) { if (_mqttClients.Count > 0 && _mqttClients.ContainsKey(topic)) { var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) //.WithExactlyOnceQoS() .WithRetainFlag(false) .Build(); await _mqttClients[topic].PublishAsync(message); _logger.LogInformation($"{topic} published message: {payload}"); } } } }