// Code-viewer listing header: 359 lines, 17 KiB, C#
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.Logging;
|
|
using Mqtt.Controllers;
|
|
using Mqtt.Hubs;
|
|
using Mqtt.Models;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using MySqlX.XDevAPI.Common;
|
|
using Newtonsoft.Json.Linq;
|
|
using Repository.BackendRepository.Interface;
|
|
using Repository.Models;
|
|
using Serilog.Core;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using System.Timers;
|
|
|
|
namespace Mqtt.Services
|
|
{
|
|
public class MqttClientService
|
|
{
|
|
private IMqttClient _mqttClient;
|
|
private readonly IHubContext<MqttHub> _hubContext;
|
|
private readonly ILogger<MqttClientService> _logger;
|
|
private readonly ILogger<AlarmController> _loggerControll;
|
|
private readonly IBackendRepository _backendRepository;
|
|
private readonly IBackgroundServicePostgresqlRepository _backgroundServicePostgresqlRepository;
|
|
private readonly IConfiguration _configuration;
|
|
private static List<alarm_setting> allAlarmSettings = new List<alarm_setting>();
|
|
|
|
public MqttClientService(IHubContext<MqttHub> hubContext, ILogger<MqttClientService> logger, IConfiguration configuration,
|
|
IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository, IBackendRepository backendRepository)
|
|
{
|
|
this._hubContext = hubContext;
|
|
this._logger = logger;
|
|
this._configuration = configuration;
|
|
this._backendRepository = backendRepository;
|
|
this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository;
|
|
}
|
|
|
|
public MqttClientService(ILogger<AlarmController> logger, IBackgroundServicePostgresqlRepository backgroundServicePostgresqlRepository)
|
|
{
|
|
this._loggerControll = logger;
|
|
this._backgroundServicePostgresqlRepository = backgroundServicePostgresqlRepository;
|
|
}
|
|
|
|
public async Task StartAsync()
|
|
{
|
|
try
|
|
{
|
|
#region get alarmSettings
|
|
await RefreshAlarmSettings();
|
|
#endregion
|
|
|
|
#region devices subscribing
|
|
_logger.LogInformation("MQTT devices subscribing...");
|
|
string sql = "select distinct topic, \"tagIoT\", main_id from device_list where trim(topic) != 'null' and topic is not null and TRIM(topic) != ''";
|
|
var devices = await _backgroundServicePostgresqlRepository.GetAllAsync<device_list>(sql);
|
|
|
|
if (devices.Any())
|
|
{
|
|
foreach (var d in devices)
|
|
{
|
|
sql = $"select \"PointOrg\" from \"IoTPointMap\" where \"TagIoT\" = '{d.tagIot}'";
|
|
d.deviceListPoints = new List<device_list_point>();
|
|
d.deviceListPoints = await _backgroundServicePostgresqlRepository.GetAllAsync<device_list_point>(sql);
|
|
|
|
if (d.deviceListPoints.Any())
|
|
{
|
|
await ConnetMqtt(d);
|
|
}
|
|
}
|
|
}
|
|
|
|
_logger.LogInformation("MQTT devices subscribed end");
|
|
#endregion
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
_logger.LogError($"Mqtt devices subcribe error, message: {e.Message}");
|
|
}
|
|
}
|
|
|
|
private async Task ConnetMqtt (device_list device)
|
|
{
|
|
try
|
|
{
|
|
string sql = string.Empty;
|
|
var factory = new MqttFactory();
|
|
_mqttClient = factory.CreateMqttClient();
|
|
|
|
var options = new MqttClientOptionsBuilder()
|
|
.WithTcpServer(new EDFunction().AESDecrypt(_configuration.GetSection("MQTT:server").Get<string>() ?? "192.168.0.217"),
|
|
int.Parse(new EDFunction().AESDecrypt(_configuration.GetSection("MQTT:port").Get<string>()) ?? "1883"))
|
|
.WithCleanSession()
|
|
.Build();
|
|
|
|
var alarmSettings = allAlarmSettings.Where(x => x.main_id == device.main_id).ToList();
|
|
Timer messageTimer = null;
|
|
int timeoutSeconds = 0;
|
|
|
|
if (alarmSettings.Any() && alarmSettings.Any(x => x.factor == 1))
|
|
{
|
|
timeoutSeconds = alarmSettings.FirstOrDefault(x => x.factor == 1).delay;
|
|
messageTimer = new Timer(timeoutSeconds * 1000); // Convert seconds to milliseconds
|
|
alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now;
|
|
messageTimer.Elapsed += async (sender, args) =>
|
|
{
|
|
// Perform your timeout logic here
|
|
messageTimer.Stop(); // Stop the timer to prevent multiple triggers
|
|
|
|
if ((alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime >= alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at
|
|
|| alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at == null)
|
|
&& alarmSettings.FirstOrDefault(x => x.factor == 1).state == "normal")
|
|
{
|
|
// Example: Log timeout
|
|
sql = @$"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at)
|
|
select uuid(), as1.device_number, as1.points, 'offnormal', 0, 0, now(), now()
|
|
from device_main dm
|
|
join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0
|
|
where dm.main_id = {device.main_id} and dm.deleted = 0";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
|
|
sql = $"update alarm_setting set state = 'offnormal', state_update_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
|
|
alarmSettings.FirstOrDefault(x => x.factor == 1).state = "offnormal";
|
|
alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at = DateTime.Now;
|
|
}
|
|
|
|
_logger.LogInformation($"{device.topic} topic timer up, added alarm_log");
|
|
|
|
alarmSettings.FirstOrDefault(x => x.factor == 1).ChkStateTime = DateTime.Now;
|
|
|
|
messageTimer.Start(); // Restart the timer for the next cycle
|
|
};
|
|
}
|
|
|
|
_mqttClient.ApplicationMessageReceivedAsync += async e =>
|
|
{
|
|
if (timeoutSeconds > 0)
|
|
messageTimer.Stop(); // Reset timer on message received
|
|
|
|
var message = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
|
|
var payLoad = message;
|
|
string value = string.Empty;
|
|
JObject jsonObj = JObject.Parse(payLoad);
|
|
|
|
#region update st point
|
|
if (alarmSettings.Any(x => x.factor == 1))
|
|
{
|
|
sql = $"update alarm_setting set state = 'normal', state_update_at = now() where id = {alarmSettings.FirstOrDefault(x => x.factor == 1).id}";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
|
|
alarmSettings.FirstOrDefault(x => x.factor == 1).state = "normal";
|
|
alarmSettings.FirstOrDefault(x => x.factor == 1).state_update_at = DateTime.Now;
|
|
}
|
|
#endregion
|
|
|
|
#region check point
|
|
foreach (var dlp in device.deviceListPoints)
|
|
{
|
|
int nLevel = 0;
|
|
string[] astrKey = null;
|
|
astrKey = dlp.pointOrg.Split('_');
|
|
|
|
if (astrKey != null)
|
|
nLevel = astrKey.Count();
|
|
|
|
if (nLevel > 1)
|
|
{
|
|
switch (nLevel)
|
|
{
|
|
case 2:
|
|
{
|
|
string strKeyL0 = astrKey[0];
|
|
string strKeyL1 = astrKey[1];
|
|
value = (string)jsonObj[strKeyL0][strKeyL1];
|
|
}
|
|
break;
|
|
case 3:
|
|
{
|
|
string strKeyL0 = astrKey[0];
|
|
string strKeyL1 = astrKey[1];
|
|
string strKeyL2 = astrKey[2];
|
|
value = (string)jsonObj[strKeyL0][strKeyL1][strKeyL2];
|
|
}
|
|
break;
|
|
case 4:
|
|
{
|
|
string strKeyL0 = astrKey[0];
|
|
string strKeyL1 = astrKey[1];
|
|
string strKeyL2 = astrKey[2];
|
|
string strKeyL3 = astrKey[3];
|
|
value = (string)jsonObj[strKeyL0][strKeyL1][strKeyL2][strKeyL3];
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
value = (string)jsonObj[dlp.pointOrg];
|
|
}
|
|
|
|
if (value != null && alarmSettings.Any(x => x.main_id == device.main_id && x.points == dlp.PointSys))
|
|
{
|
|
var alarmSetting = alarmSettings.FirstOrDefault(x => x.main_id == device.main_id && x.points == dlp.PointSys);
|
|
|
|
if (alarmSetting != null)
|
|
{
|
|
var offnormalResult = false;
|
|
if (alarmSetting.state == "normal")
|
|
{
|
|
if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result)
|
|
&& (result > alarmSetting.highLimit || result > alarmSetting.highDelay || result < alarmSetting.lowLimit || result < alarmSetting.lowDelay))
|
|
{
|
|
offnormalResult = true;
|
|
}
|
|
if (alarmSetting.factor == 3 && !string.IsNullOrWhiteSpace(value) && value == alarmSetting.alarm_value)
|
|
{
|
|
offnormalResult = true;
|
|
}
|
|
|
|
if (offnormalResult)
|
|
{
|
|
sql = $@"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at)
|
|
select uuid(), as1.device_number, as1.points, 'offnormal', 0, 0, now(), now()
|
|
from device_main dm
|
|
join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0
|
|
where dm.main_id = {device.main_id}
|
|
and dm.deleted = 0";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
|
|
sql = $"update alarm_setting set state = 'offnormal', state_update_at = now() where id = {alarmSetting.id}";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
}
|
|
}
|
|
else if (alarmSetting.state == "offnormal")
|
|
{
|
|
var normalResult = false;
|
|
if (alarmSetting.factor == 2 && decimal.TryParse(value, out decimal result)
|
|
&& (result <= alarmSetting.highLimit && result <= alarmSetting.highDelay && result >= alarmSetting.lowLimit && result >= alarmSetting.lowDelay))
|
|
{
|
|
normalResult = true;
|
|
}
|
|
//continue factor 3
|
|
|
|
if (normalResult)
|
|
{
|
|
sql = $@"insert into alarm_log (alarm_guid, device_number, points, state, is_sentLine, is_sentEmail, alarm_time, created_at)
|
|
select uuid(), as1.device_number, as1.points, 'normal', 0, 0, now(), now()
|
|
from device_main dm
|
|
join alarm_setting as1 on dm.device_number = as1.device_number and as1.deleted = 0
|
|
where dm.main_id = {device.main_id}
|
|
and dm.deleted = 0";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
|
|
sql = $"update alarm_setting set state = 'normal', state_update_at = now() where id = {alarmSetting.id}";
|
|
|
|
await _backendRepository.ExecuteSql(sql);
|
|
}
|
|
}
|
|
}
|
|
|
|
_logger.LogInformation($"{device.topic} topic added alarm_log");
|
|
}
|
|
}
|
|
#endregion
|
|
|
|
//Console.WriteLine($"{value}");
|
|
//Console.WriteLine($"Message received: {message}");
|
|
|
|
// Broadcast to SignalR clients
|
|
//await _hubContext.Clients.All.SendAsync("ReceiveMessage", e.ApplicationMessage.Topic, message);
|
|
|
|
// Restart the timer for the next message
|
|
if (timeoutSeconds > 0)
|
|
messageTimer.Start();
|
|
};
|
|
|
|
// Connect to the broker
|
|
await _mqttClient.ConnectAsync(options);
|
|
await _mqttClient.SubscribeAsync(device.topic);
|
|
|
|
_logger.LogInformation($"{device.topic} subscribed");
|
|
|
|
// Start the timer when subscription is successful
|
|
if (timeoutSeconds > 0)
|
|
messageTimer.Start();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
_logger.LogError($"{device.topic} subscribe failed, message: {e.Message}");
|
|
}
|
|
}
|
|
|
|
public async Task<List<alarm_setting>> GetAlarmSettings()
|
|
{
|
|
return allAlarmSettings;
|
|
}
|
|
|
|
public async Task RefreshAlarmSettings()
|
|
{
|
|
try
|
|
{
|
|
if (_loggerControll != null)
|
|
_loggerControll.LogInformation("Get Alarm Settings...");
|
|
else if (_logger != null)
|
|
_logger.LogInformation("Get Alarm Settings...");
|
|
|
|
string sql = @"select dm.main_id, as1.*
|
|
from alarm_setting as1
|
|
join device_main dm on as1.device_number = dm.device_number and dm.deleted = 0
|
|
where as1.deleted = 0";
|
|
|
|
allAlarmSettings = await _backgroundServicePostgresqlRepository.GetAllAsync<alarm_setting>(sql);
|
|
|
|
if (_loggerControll != null)
|
|
_loggerControll.LogInformation("Done Alarm Settings...");
|
|
else if (_logger != null)
|
|
_logger.LogInformation("Done Alarm Settings...");
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
if (_loggerControll != null)
|
|
_loggerControll.LogError($"get alarm settings error, message: {e.Message}");
|
|
else if (_logger != null)
|
|
_logger.LogError($"get alarm settings error, message: {e.Message}");
|
|
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
public async Task PublishAsync(string topic, string payload)
|
|
{
|
|
if (_mqttClient.IsConnected)
|
|
{
|
|
var message = new MqttApplicationMessageBuilder()
|
|
.WithTopic(topic)
|
|
.WithPayload(payload)
|
|
//.WithExactlyOnceQoS()
|
|
.WithRetainFlag()
|
|
.Build();
|
|
|
|
await _mqttClient.PublishAsync(message);
|
|
Console.WriteLine($"Published message to topic {topic}: {payload}");
|
|
}
|
|
}
|
|
}
|
|
} |