数据表分片策略统一出口封装

This commit is contained in:
陈益 2025-03-24 21:55:22 +08:00
parent 9da1745573
commit af9b0d8a77
5 changed files with 59 additions and 17 deletions

View File

@ -1,7 +1,10 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
@ -9,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@ -16,16 +20,16 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService,ICapSubscribe
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
{
private readonly ILogger<SubscriberAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
/// <summary>
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
@ -38,12 +42,12 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="messageReceivedEventRepository">The message received event repository.</param>
/// <param name="deviceRepository">The device repository.</param>
/// <param name="meterReadingRecordsRepository">The device repository.</param>
public SubscriberAppService(ILogger<SubscriberAppService> logger,
ITcpService tcpService, IServiceProvider serviceProvider,
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
IRepository<Device, Guid> deviceRepository, IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
public SubscriberAppService(ILogger<SubscriberAppService> logger,
ITcpService tcpService, IServiceProvider serviceProvider,
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
IRepository<Device, Guid> deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository)
{
_logger = logger;
_tcpService = tcpService;
@ -79,7 +83,7 @@ namespace JiShe.CollectBus.Subscribers
throw new ArgumentOutOfRangeException();
}
var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
if (device!=null)
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
}
@ -96,7 +100,34 @@ namespace JiShe.CollectBus.Subscribers
else
{
//todo 会根据不同的协议进行解析,然后做业务处理
TB3761FN fN = await protocolPlugin.AnalyzeAsync<TB3761FN>(receivedMessage);
TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage);
if(fN == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return;
}
var tb3761FN = fN.FnList.FirstOrDefault();
if (tb3761FN == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return;
}
//todo 查找是否有下发任务
await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords()
{
ReceivedMessageHexString = receivedMessage.MessageHexString,
AFN = fN.Afn,
Fn = tb3761FN.Fn,
Pn = 0,
FocusAddress = "",
MeterAddress = "",
DataResult = tb3761FN.Text,
});
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
}

View File

@ -167,5 +167,15 @@ namespace JiShe.CollectBus.Common.Extensions
)
);
}
/// <summary>
/// 获取数据表分片策略
/// </summary>
/// <param name="dateTime"></param>
/// <returns></returns>
public static string GetDataTableShardingStrategy(this DateTime dateTime)
{
return $"{dateTime:yyyyMMddHHmm}";
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using JiShe.CollectBus.Common.Extensions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -21,7 +22,7 @@ namespace JiShe.CollectBus.ShardingStrategy
public string GetCollectionName(DateTime dateTime)
{
var baseName = typeof(TEntity).Name;
return $"{baseName}_{dateTime:yyyyMMddHHmm}";
return $"{baseName}_{dateTime.GetDataTableShardingStrategy()}";
}
/// <summary>
@ -31,7 +32,7 @@ namespace JiShe.CollectBus.ShardingStrategy
public string GetCurrentCollectionName()
{
var baseName = typeof(TEntity).Name;
return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}";
return $"{baseName}_{DateTime.Now.GetDataTableShardingStrategy()}";
}
/// <summary>
@ -49,7 +50,7 @@ namespace JiShe.CollectBus.ShardingStrategy
while (current <= end)
{
months.Add($"{baseName}_{current:yyyyMMddHHmm}");
months.Add($"{baseName}_{current.GetDataTableShardingStrategy()}");
current = current.AddMonths(1);
}

View File

@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
//await _protocolInfoCache.Get()
}
public abstract Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761FN;
public abstract Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761;
/// <summary>
/// 登录帧解析

View File

@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
Task AddAsync();
Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761FN;
Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761;
Task LoginAsync(MessageReceivedLogin messageReceived);