783 lines
37 KiB
C#
783 lines
37 KiB
C#
using Dapper;
|
||
using Microsoft.Extensions.Options;
|
||
using Repository.FrontendRepository.Interface;
|
||
using Repository.Helper;
|
||
using Repository.Services.Implement;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.Data;
|
||
using System.Linq;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace Repository.FrontendRepository.Implement
|
||
{
|
||
public class FrontendRepository : BaseRepository.Implement.BaseRepository, IFrontendRepository
|
||
{
|
||
// Template for a literal predicate: {0} is the column name, {1} the value.
// The value is placed inside single quotes in the SQL text itself and is NOT escaped,
// so this template must never be fed untrusted or quote-containing values.
private readonly string where_format = "{0} = '{1}'";

// Template for a parameterized predicate: {0} is the column name, {1} the parameter name (without '@').
private readonly string where_list_format = "{0} = @{1}";

// oBIX API settings; handed to OntimeDeviceSubscripService when device subscriptions are maintained.
private readonly IOptions<ObixApiConfig> _obixApiConfig;

/// <summary>
/// Creates the repository on top of the shared base repository.
/// </summary>
/// <param name="databaseHelper">Database helper forwarded to the base repository.</param>
/// <param name="_obixApiConfig">oBIX API settings kept for the device subscription service.</param>
public FrontendRepository(IDatabaseHelper databaseHelper, IOptions<ObixApiConfig> _obixApiConfig) : base(databaseHelper)
{
    // Inherited switch; presumably read by GetDbConnection() to pick the provider (confirm in BaseRepository).
    UseDB = "MySQL";
    this._obixApiConfig = _obixApiConfig;
}
|
||
|
||
/// <summary>
/// Gets the currently signed-in user's information by account.
/// Only rows with <c>deleted = 0</c> and <c>status = 0</c> are considered; the role's <c>layer</c>
/// column is joined in from the <c>role</c> table.
/// </summary>
/// <typeparam name="A">Type the first result row is mapped onto by Dapper.</typeparam>
/// <param name="account">Account value bound to the <c>@Account</c> parameter.</param>
/// <returns>The first matching row, or <c>default(A)</c> when no row matches.</returns>
public A GetMyUserInfoByAccount<A>(string account)
{
    const string sql = @"SELECT a.*,b.layer FROM userinfo a
                        left join role b on a.role_guid = b.role_guid
                        WHERE a.deleted = 0 AND a.status = @Status AND account = @Account ";

    // Disposing the connection closes it; exceptions propagate untouched so the
    // original stack trace is preserved (no catch-and-"throw ex").
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        return conn.QueryFirstOrDefault<A>(sql, new { Status = 0, Account = account });
    }
}
|
||
|
||
/// <summary>
/// Runs a single UPDATE against <paramref name="Table_name"/> inside its own transaction.
/// </summary>
/// <param name="dict">Column/parameter names and their values; the keys drive the generated statement and the dictionary is bound as the parameter set.</param>
/// <param name="Table_name">Table to update.</param>
/// <param name="sWhere">WHERE clause text forwarded verbatim to <c>UpdateGenerateString</c>; callers are responsible for not placing untrusted input in it.</param>
public void UpdateProcessPID(Dictionary<string, object> dict, string Table_name, string sWhere)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                List<string> properties = dict.Keys.ToList();
                string sql = UpdateGenerateString(properties, Table_name, sWhere);

                conn.Execute(sql, dict, trans);

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Inserts one delivered row into <paramref name="targetTable"/> and records the delivery in
/// <c>data_delivery_log</c>, all within one transaction.
/// </summary>
/// <param name="task_id">Delivery task number written to the log.</param>
/// <param name="pks">Primary-key column names (without '@'); their order decides which value goes to pk1, pk2, ...</param>
/// <param name="dict">Row to insert; keys may carry a leading '@'. Bound as the parameter set of the insert.</param>
/// <param name="targetTable">Table that receives the row.</param>
public async Task AddOneFromDataDelivery(int task_id, List<string> pks, Dictionary<string, object> dict, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                // Primary-key values keyed by their position in pks. A sorted map makes the
                // pk1..pkN order explicit instead of relying on Dictionary enumeration order.
                var pkValueByPosition = new SortedDictionary<int, string>();

                foreach (var pair in dict)
                {
                    // Strip '@' so the key can be compared with the plain column names in pks.
                    var column = pair.Key.Replace("@", string.Empty);
                    var position = pks.IndexOf(column);

                    if (position >= 0)
                    {
                        pkValueByPosition.Add(position, pair.Value.ToString());
                    }
                }

                var data_delivery_log = new Dictionary<string, object>();
                data_delivery_log.Add("@target_table", targetTable);

                var j = 0;
                foreach (var pkValue in pkValueByPosition.Values)
                {
                    data_delivery_log.Add($"@pk{++j}", pkValue);
                }
                data_delivery_log.Add("@task_id", task_id);

                // Write the delivery log first so the task number of this delivery is recorded.
                string data_delivery_log_sql = InsertGenerateString(data_delivery_log.Keys.ToList(), "data_delivery_log");
                await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_log, trans);

                // Insert the delivered row into its table.
                string sql = InsertGenerateString(dict.Keys.ToList(), targetTable);
                await conn.ExecuteAsync(sql, dict, trans);

                // Keep the device point subscription data in step with the insert.
                var ontimeDeviceSubscripService = new OntimeDeviceSubscripService(conn, trans, _obixApiConfig);
                ontimeDeviceSubscripService.MaintainOntimeDeviceSubscription(targetTable, "insert", dict);

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Inserts several delivered rows into <paramref name="targetTable"/> and records one
/// <c>data_delivery_log</c> entry per row, all within one transaction.
/// </summary>
/// <param name="task_id">Delivery task number written to every log entry.</param>
/// <param name="pks">Primary-key column names (without '@'); their order decides which value goes to pk1, pk2, ...</param>
/// <param name="dicts">Rows to insert. The first row's keys define the insert columns and the primary-key keys for all rows; must not be empty.</param>
/// <param name="targetTable">Table that receives the rows.</param>
public async Task AddMutiFromDataDelivery(int task_id, List<string> pks, List<Dictionary<string, object>> dicts, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                // Dictionary keys (possibly '@'-prefixed) of the primary-key columns, keyed by
                // their position in pks so pk1..pkN come out in a guaranteed order.
                var pkKeyByPosition = new SortedDictionary<int, string>();

                foreach (var key in dicts.First().Keys)
                {
                    // Strip '@' so the key can be compared with the plain column names in pks.
                    var position = pks.IndexOf(key.Replace("@", string.Empty));

                    if (position >= 0)
                    {
                        pkKeyByPosition.Add(position, key);
                    }
                }

                var data_delivery_logs = new List<Dictionary<string, object>>();

                foreach (var dict in dicts)
                {
                    var data_delivery_log = new Dictionary<string, object>();
                    data_delivery_log.Add("@target_table", targetTable);

                    var j = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        data_delivery_log.Add($"@pk{++j}", dict[pkKey]);
                    }
                    data_delivery_log.Add("@task_id", task_id);

                    data_delivery_logs.Add(data_delivery_log);
                }

                // Write the delivery log first so the task number of this delivery is recorded.
                var data_delivery_log_properties = data_delivery_logs[0].Keys.ToList();
                string data_delivery_log_sql = InsertGenerateString(data_delivery_log_properties, "data_delivery_log");
                await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_logs, trans);

                // Insert the delivered rows into their table.
                List<string> properties = dicts[0].Keys.ToList();
                string sql = InsertGenerateString(properties, targetTable);
                await conn.ExecuteAsync(sql, dicts, trans);

                // Only the device table requires the device point subscription data to be maintained here.
                if (targetTable == "device")
                {
                    var ontimeDeviceSubscripService = new OntimeDeviceSubscripService(conn, trans, _obixApiConfig);
                    ontimeDeviceSubscripService.MaintainMutiOntimeDeviceSubscription(targetTable, dicts);
                }

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Applies one delivered row update to <paramref name="targetTable"/>, but only when the newest
/// <c>data_delivery_log</c> entry for the same primary key has a task id lower than
/// <paramref name="task_id"/>. The log entry, the update and the subscription maintenance share one transaction.
/// </summary>
/// <param name="task_id">Delivery task number of this update.</param>
/// <param name="pks">Primary-key column names; dictionary keys are matched against them exactly (no '@' stripping).</param>
/// <param name="dict">Values to update plus the primary-key entries. Bound as the parameter set of the update.</param>
/// <param name="targetTable">Table to update. For "device" and "device_item" the device point subscription data is maintained as well.</param>
public async Task UpdateOneFromDataDelivery(int task_id, List<string> pks, Dictionary<string, object> dict, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                var update_one_wheres = new List<string>();

                // Primary-key values keyed by their position in pks so pk1..pkN come out in a guaranteed order.
                var pkValueByPosition = new SortedDictionary<int, string>();

                foreach (var pair in dict)
                {
                    var position = pks.IndexOf(pair.Key);
                    if (position < 0)
                    {
                        continue;
                    }

                    pkValueByPosition.Add(position, pair.Value.ToString());

                    // The primary key is the WHERE condition. It is written as "col = @col" and bound
                    // from dict, so the value never becomes part of the SQL text.
                    update_one_wheres.Add(string.Format(where_list_format, pair.Key, pair.Key));
                }

                // Look up the newest task id logged for this primary key (parameterized, inside the transaction).
                var last_one_id_where = new List<string>();
                var last_one_id_args = new Dictionary<string, object>();
                var i = 0;
                foreach (var pkValue in pkValueByPosition.Values)
                {
                    var pkName = $"pk{++i}";
                    last_one_id_where.Add(string.Format(where_list_format, pkName, pkName));
                    last_one_id_args.Add(pkName, pkValue);
                }
                var last_one_id_where_str = string.Join(" AND ", last_one_id_where);
                var last_one_id_sql = $"SELECT task_id FROM data_delivery_log WHERE {last_one_id_where_str} ORDER BY task_id DESC";

                // Yields 0 when no log entry exists, so a first delivery always passes the check below.
                var last_one_id = await conn.QueryFirstOrDefaultAsync<int>(last_one_id_sql, last_one_id_args, trans);

                if (last_one_id < task_id)
                {
                    var data_delivery_log = new Dictionary<string, object>();
                    data_delivery_log.Add("@target_table", targetTable);

                    var j = 0;
                    foreach (var pkValue in pkValueByPosition.Values)
                    {
                        data_delivery_log.Add($"@pk{++j}", pkValue);
                    }
                    data_delivery_log.Add("@task_id", task_id);

                    // Write the delivery log first so the task number of this delivery is recorded.
                    string data_delivery_log_sql = InsertGenerateString(data_delivery_log.Keys.ToList(), "data_delivery_log");
                    await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_log, trans);

                    bool affectsSubscription = targetTable == "device" || targetTable == "device_item";

                    // Read the pre-update value; it is passed to the subscription maintenance below.
                    string origData = string.Empty;
                    if (targetTable == "device")
                    {
                        object device_guid;
                        if (dict.TryGetValue("device_guid", out device_guid))
                        {
                            var sql_orig_data = "SELECT device_number FROM device WHERE device_guid = @device_guid";
                            origData = await conn.QueryFirstOrDefaultAsync<string>(sql_orig_data, new { device_guid = device_guid.ToString() }, trans);
                        }
                    }
                    else if (targetTable == "device_item")
                    {
                        object device_item_guid;
                        if (dict.TryGetValue("device_item_guid", out device_item_guid))
                        {
                            var sql_orig_data = "SELECT points FROM device_item WHERE device_item_guid = @device_item_guid";
                            origData = await conn.QueryFirstOrDefaultAsync<string>(sql_orig_data, new { device_item_guid = device_item_guid.ToString() }, trans);
                        }
                    }

                    // Apply the delivered update to its table.
                    List<string> properties = dict.Keys.ToList();
                    string sql = UpdateGenerateString(properties, targetTable, string.Join(" AND ", update_one_wheres));
                    await conn.ExecuteAsync(sql, dict, trans);

                    // For the device and device_item tables, maintain the device point subscription data.
                    if (affectsSubscription)
                    {
                        var ontimeDeviceSubscripService = new OntimeDeviceSubscripService(conn, trans, _obixApiConfig);

                        // An update that carries the "@deleted" column is handled as a delete.
                        if (properties.Contains("@deleted"))
                        {
                            ontimeDeviceSubscripService.MaintainOntimeDeviceSubscription(targetTable, "delete", dict);
                        }
                        else
                        {
                            ontimeDeviceSubscripService.MaintainOntimeDeviceSubscription(targetTable, "update", dict, origData);
                        }
                    }
                }

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Applies a batch of delivered row updates to <paramref name="targetTable"/>. A row is applied only when
/// the newest <c>data_delivery_log</c> entry for its primary key has a task id lower than
/// <paramref name="task_id"/>; when no row qualifies the method commits without writing anything.
/// </summary>
/// <param name="task_id">Delivery task number of this batch.</param>
/// <param name="pks">Primary-key column names (without '@'); their order decides which value goes to pk1, pk2, ...</param>
/// <param name="dicts">Rows to update; must not be empty. The first row's keys identify the primary-key keys for all rows.
/// Rows that are applied are modified in place: an '@' prefix on their primary-key keys is removed.</param>
/// <param name="targetTable">Table to update.</param>
public async Task UpdateListFromDataDelivery(int task_id, List<string> pks, List<Dictionary<string, object>> dicts, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                var update_list_wheres = new List<string>();

                // Dictionary keys (possibly '@'-prefixed) of the primary-key columns, keyed by
                // their position in pks so pk1..pkN come out in a guaranteed order.
                var pkKeyByPosition = new SortedDictionary<int, string>();

                foreach (var key in dicts.First().Keys)
                {
                    // Strip '@' so the key can be compared with the plain column names in pks.
                    var column = key.Replace("@", string.Empty);
                    var position = pks.IndexOf(column);

                    if (position >= 0)
                    {
                        pkKeyByPosition.Add(position, key);

                        // The primary key is the WHERE condition of the update.
                        update_list_wheres.Add(string.Format(where_list_format, column, column));
                    }
                }

                // The lookup statement is identical for every row; only the bound values differ.
                var last_one_id_where = new List<string>();
                for (var n = 1; n <= pkKeyByPosition.Count; n++)
                {
                    var pkName = $"pk{n}";
                    last_one_id_where.Add(string.Format(where_list_format, pkName, pkName));
                }
                var last_one_id_where_str = string.Join(" AND ", last_one_id_where);
                var last_one_id_sql = $"SELECT task_id FROM data_delivery_log WHERE {last_one_id_where_str} ORDER BY task_id DESC";

                // Collect the rows that still need updating, with one log entry each.
                var data_delivery_logs = new List<Dictionary<string, object>>();
                var new_dicts = new List<Dictionary<string, object>>();

                foreach (var dict in dicts)
                {
                    // Bind the key values as text so the lookup stays a string comparison.
                    var last_one_id_args = new Dictionary<string, object>();
                    var i = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        last_one_id_args.Add($"pk{++i}", string.Format("{0}", dict[pkKey]));
                    }

                    // Yields 0 when no log entry exists for this key.
                    var last_one_id = await conn.QueryFirstOrDefaultAsync<int>(last_one_id_sql, last_one_id_args, trans);
                    if (last_one_id >= task_id)
                    {
                        continue;
                    }

                    var data_delivery_log = new Dictionary<string, object>();
                    data_delivery_log.Add("@target_table", targetTable);

                    var j = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        var pkValue = dict[pkKey];
                        data_delivery_log.Add($"@pk{++j}", pkValue);

                        // Re-key the primary key without '@'. Skipped when the key has no '@',
                        // because re-adding the same key would throw.
                        var column = pkKey.Replace("@", string.Empty);
                        if (column != pkKey)
                        {
                            dict.Add(column, pkValue);
                            dict.Remove(pkKey);
                        }
                    }
                    data_delivery_log.Add("@task_id", task_id);

                    data_delivery_logs.Add(data_delivery_log);
                    new_dicts.Add(dict);
                }

                if (new_dicts.Count > 0)
                {
                    // Write the delivery log first so the task number of this delivery is recorded.
                    var data_delivery_log_properties = data_delivery_logs.First().Keys.ToList();
                    string data_delivery_log_sql = InsertGenerateString(data_delivery_log_properties, "data_delivery_log");
                    await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_logs, trans);

                    // Apply the delivered updates. Column names come from a row that is actually being
                    // updated, since only those rows have had their keys re-keyed.
                    List<string> properties = new_dicts.First().Keys.ToList();
                    string sql = UpdateGenerateString(properties, targetTable, string.Join(" AND ", update_list_wheres));
                    await conn.ExecuteAsync(sql, new_dicts, trans);
                }

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Deletes one row from <paramref name="targetTable"/> by primary key, but only when the newest
/// <c>data_delivery_log</c> entry for that key has a task id lower than <paramref name="task_id"/>.
/// The log entry and the delete share one transaction.
/// </summary>
/// <param name="task_id">Delivery task number of this purge.</param>
/// <param name="pks">Primary-key column names (without '@'); their order decides which value goes to pk1, pk2, ...</param>
/// <param name="dict">Holds the primary-key values; keys may carry a leading '@'. Bound as the parameter set of the delete.</param>
/// <param name="targetTable">Table to delete from.</param>
public async Task PurgeOneFromDataDelivery(int task_id, List<string> pks, Dictionary<string, object> dict, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                var purge_one_wheres = new List<string>();

                // Dictionary keys (possibly '@'-prefixed) of the primary-key columns, keyed by
                // their position in pks so pk1..pkN come out in a guaranteed order.
                var pkKeyByPosition = new SortedDictionary<int, string>();

                foreach (var key in dict.Keys)
                {
                    // Strip '@' so the key can be compared with the plain column names in pks.
                    var column = key.Replace("@", string.Empty);
                    var position = pks.IndexOf(column);

                    if (position >= 0)
                    {
                        pkKeyByPosition.Add(position, key);

                        // The primary key is the WHERE condition. The left side must be the bare column
                        // name: with an '@'-prefixed key on the left the predicate would compare the bound
                        // parameter with its own value and match every row.
                        purge_one_wheres.Add(string.Format(where_list_format, column, column));
                    }
                }

                // Look up the newest task id logged for this primary key (parameterized, inside the transaction).
                var last_one_id_where = new List<string>();
                var last_one_id_args = new Dictionary<string, object>();
                var i = 0;
                foreach (var pkKey in pkKeyByPosition.Values)
                {
                    var pkName = $"pk{++i}";
                    last_one_id_where.Add(string.Format(where_list_format, pkName, pkName));
                    // Bound as text so the lookup stays a string comparison.
                    last_one_id_args.Add(pkName, string.Format("{0}", dict[pkKey]));
                }
                var last_one_id_where_str = string.Join(" AND ", last_one_id_where);
                var last_one_id_sql = $"SELECT task_id FROM data_delivery_log WHERE {last_one_id_where_str} ORDER BY task_id DESC";

                // Yields 0 when no log entry exists for this key.
                var last_one_id = await conn.QueryFirstOrDefaultAsync<int>(last_one_id_sql, last_one_id_args, trans);

                if (last_one_id < task_id)
                {
                    var data_delivery_log = new Dictionary<string, object>();
                    data_delivery_log.Add("@target_table", targetTable);

                    var j = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        data_delivery_log.Add($"@pk{++j}", dict[pkKey]);
                    }
                    data_delivery_log.Add("@task_id", task_id);

                    // Write the delivery log first so the task number of this delivery is recorded.
                    string data_delivery_log_sql = InsertGenerateString(data_delivery_log.Keys.ToList(), "data_delivery_log");
                    await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_log, trans);

                    var purge_one_where_str = string.Join(" AND ", purge_one_wheres);
                    var sql = $"DELETE FROM {targetTable} WHERE {purge_one_where_str}";
                    await conn.ExecuteAsync(sql, dict, trans);
                }

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Replaces selected rows of <paramref name="targetTable"/>: for every delivered row whose newest
/// <c>data_delivery_log</c> entry has a task id lower than <paramref name="task_id"/>, the rows matching its
/// delete criteria are removed and the delivered rows are inserted. Everything runs in one transaction;
/// when no row qualifies the method commits without writing anything.
/// </summary>
/// <param name="task_id">Delivery task number of this batch.</param>
/// <param name="pks">Names of the delete-criteria columns. Dictionary keys are matched against them exactly,
/// so only keys without a leading '@' act as criteria.</param>
/// <param name="dicts">Rows to insert; must not be empty. The first row's keys identify the criteria keys for all rows.
/// Rows that are applied are modified in place: their criteria keys are re-keyed with an '@' prefix.</param>
/// <param name="targetTable">Table to purge and refill.</param>
/// <exception cref="InvalidOperationException">A row that must be applied yields no usable delete criteria.</exception>
public async Task PurgeSpecifyInsertFromDataDelivery(int task_id, List<string> pks, List<Dictionary<string, object>> dicts, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                // Criteria keys keyed by their position in pks so pk1..pkN come out in a guaranteed order.
                var pkKeyByPosition = new SortedDictionary<int, string>();

                foreach (var key in dicts.First().Keys)
                {
                    var position = pks.IndexOf(key);

                    if (position >= 0)
                    {
                        pkKeyByPosition.Add(position, key);
                    }
                }

                // The lookup statement is identical for every row; only the bound values differ.
                var last_one_id_where = new List<string>();
                for (var n = 1; n <= pkKeyByPosition.Count; n++)
                {
                    var pkName = $"pk{n}";
                    last_one_id_where.Add(string.Format(where_list_format, pkName, pkName));
                }
                var last_one_id_where_str = string.Join(" AND ", last_one_id_where);
                var last_one_id_sql = $"SELECT task_id FROM data_delivery_log WHERE {last_one_id_where_str} ORDER BY task_id DESC";

                var temp_deleted_targets = new List<Dictionary<string, object>>(); // distinct delete criteria
                var data_delivery_logs = new List<Dictionary<string, object>>();
                var new_dicts = new List<Dictionary<string, object>>();

                foreach (var dict in dicts)
                {
                    // Bind the key values as text so the lookup stays a string comparison.
                    var last_one_id_args = new Dictionary<string, object>();
                    var i = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        last_one_id_args.Add($"pk{++i}", string.Format("{0}", dict[pkKey]));
                    }

                    // Yields 0 when no log entry exists for this key.
                    var last_one_id = await conn.QueryFirstOrDefaultAsync<int>(last_one_id_sql, last_one_id_args, trans);
                    if (last_one_id >= task_id)
                    {
                        continue;
                    }

                    var temp_deleted_target = new Dictionary<string, object>();
                    var data_delivery_log = new Dictionary<string, object>();
                    data_delivery_log.Add("@target_table", targetTable);

                    var j = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        var pkValue = dict[pkKey];
                        data_delivery_log.Add($"@pk{++j}", pkValue);

                        // A non-null value under a key without '@' is a delete criterion; the key is
                        // then re-keyed with '@' for the insert that follows.
                        if (pkValue != null && !pkKey.StartsWith("@", StringComparison.Ordinal))
                        {
                            temp_deleted_target.Add(pkKey, pkValue);

                            dict.Add("@" + pkKey, pkValue);
                            dict.Remove(pkKey);
                        }
                    }
                    data_delivery_log.Add("@task_id", task_id);

                    // Rows sharing the same criteria need only one log entry and one delete.
                    if (!data_delivery_logs.Any(x => x.Values.SequenceEqual(data_delivery_log.Values)))
                    {
                        data_delivery_logs.Add(data_delivery_log);
                    }

                    if (!temp_deleted_targets.Any(x => x.Values.SequenceEqual(temp_deleted_target.Values)))
                    {
                        temp_deleted_targets.Add(temp_deleted_target);
                    }

                    new_dicts.Add(dict);
                }

                if (data_delivery_logs.Count > 0 && new_dicts.Count > 0)
                {
                    // Write the delivery log first so the task number of this delivery is recorded.
                    var data_delivery_log_properties = data_delivery_logs.First().Keys.ToList();
                    string data_delivery_log_sql = InsertGenerateString(data_delivery_log_properties, "data_delivery_log");
                    await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_logs, trans);

                    foreach (var deleted_target in temp_deleted_targets)
                    {
                        if (deleted_target.Count == 0)
                        {
                            // Refuse to build a DELETE without a usable predicate.
                            throw new InvalidOperationException($"No delete criteria available for table '{targetTable}'.");
                        }

                        // Build the predicate per target; criteria of other targets must not leak in.
                        var deleted_where = string.Join(" AND ", deleted_target.Keys.Select(column => string.Format(where_list_format, column, column)));
                        var deleted_sql = $"DELETE FROM {targetTable} WHERE {deleted_where}";

                        // Values are bound from the target dictionary and the statement is enlisted in the transaction.
                        await conn.ExecuteAsync(deleted_sql, deleted_target, trans);
                    }

                    // Insert the delivered rows. Column names come from a row that is actually inserted,
                    // since only those rows have had their keys re-keyed.
                    List<string> properties = new_dicts.First().Keys.ToList();
                    string sql = InsertGenerateString(properties, targetTable);
                    await conn.ExecuteAsync(sql, new_dicts, trans);
                }

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
|
||
/// <summary>
/// Replaces the whole content of <paramref name="targetTable"/>: when at least one delivered row has a newest
/// <c>data_delivery_log</c> task id lower than <paramref name="task_id"/>, every row of the table is deleted
/// and the qualifying delivered rows are inserted. Everything runs in one transaction; when no row
/// qualifies the method commits without writing anything.
/// </summary>
/// <param name="task_id">Delivery task number of this batch.</param>
/// <param name="pks">Primary-key column names (without '@'); their order decides which value goes to pk1, pk2, ...</param>
/// <param name="dicts">Rows to insert; must not be empty. The first row's keys identify the primary-key keys for all rows.</param>
/// <param name="targetTable">Table to empty and refill.</param>
public async Task PurgeAllInsertFromDataDelivery(int task_id, List<string> pks, List<Dictionary<string, object>> dicts, string targetTable)
{
    using (IDbConnection conn = GetDbConnection())
    {
        conn.Open();
        using (var trans = conn.BeginTransaction())
        {
            try
            {
                // Dictionary keys (possibly '@'-prefixed) of the primary-key columns, keyed by
                // their position in pks so pk1..pkN come out in a guaranteed order.
                var pkKeyByPosition = new SortedDictionary<int, string>();

                foreach (var key in dicts.First().Keys)
                {
                    // Strip '@' so the key can be compared with the plain column names in pks.
                    var position = pks.IndexOf(key.Replace("@", string.Empty));

                    if (position >= 0)
                    {
                        pkKeyByPosition.Add(position, key);
                    }
                }

                // The lookup statement is identical for every row; only the bound values differ.
                var last_one_id_where = new List<string>();
                for (var n = 1; n <= pkKeyByPosition.Count; n++)
                {
                    var pkName = $"pk{n}";
                    last_one_id_where.Add(string.Format(where_list_format, pkName, pkName));
                }
                var last_one_id_where_str = string.Join(" AND ", last_one_id_where);
                var last_one_id_sql = $"SELECT task_id FROM data_delivery_log WHERE {last_one_id_where_str} ORDER BY task_id DESC";

                // Collect the rows that still need to be applied, with one log entry each.
                var data_delivery_logs = new List<Dictionary<string, object>>();
                var new_dicts = new List<Dictionary<string, object>>();

                foreach (var dict in dicts)
                {
                    // Bind the key values as text so the lookup stays a string comparison.
                    var last_one_id_args = new Dictionary<string, object>();
                    var i = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        last_one_id_args.Add($"pk{++i}", string.Format("{0}", dict[pkKey]));
                    }

                    // Yields 0 when no log entry exists for this key.
                    var last_one_id = await conn.QueryFirstOrDefaultAsync<int>(last_one_id_sql, last_one_id_args, trans);
                    if (last_one_id >= task_id)
                    {
                        continue;
                    }

                    var data_delivery_log = new Dictionary<string, object>();
                    data_delivery_log.Add("@target_table", targetTable);

                    var j = 0;
                    foreach (var pkKey in pkKeyByPosition.Values)
                    {
                        data_delivery_log.Add($"@pk{++j}", dict[pkKey]);
                    }
                    data_delivery_log.Add("@task_id", task_id);

                    data_delivery_logs.Add(data_delivery_log);
                    new_dicts.Add(dict);
                }

                if (data_delivery_logs.Count > 0 && new_dicts.Count > 0)
                {
                    // Write the delivery log first so the task number of this delivery is recorded.
                    var data_delivery_log_properties = data_delivery_logs.First().Keys.ToList();
                    string data_delivery_log_sql = InsertGenerateString(data_delivery_log_properties, "data_delivery_log");
                    await conn.ExecuteAsync(data_delivery_log_sql, data_delivery_logs, trans);

                    // Empty the table. The transaction must be passed by name: as the third positional
                    // argument it would be taken as the parameter object and the command would not be enlisted.
                    var deleted_sql = $"DELETE FROM {targetTable}";
                    await conn.ExecuteAsync(deleted_sql, transaction: trans);

                    // Insert the delivered rows; column names come from a row that is actually inserted.
                    List<string> properties = new_dicts.First().Keys.ToList();
                    string sql = InsertGenerateString(properties, targetTable);
                    await conn.ExecuteAsync(sql, new_dicts, trans);
                }

                trans.Commit();
            }
            catch
            {
                trans.Rollback();
                // Bare rethrow keeps the original stack trace.
                throw;
            }
        }
    }
}
|
||
}
|
||
}
|