Compare commits

..

5 Commits

Author SHA1 Message Date
ChenYi
e7f94ceae4 自定分库分表实现 2025-03-20 16:40:27 +08:00
陈益
e836522e3a 新增抄读记录表自定义仓储 2025-03-19 22:52:53 +08:00
ChenYi
83d8785ff4 完善报文解析,搭建数据迁移架构 2025-03-19 14:31:04 +08:00
陈益
76fe43ae54 优化消息存储 2025-03-18 22:43:24 +08:00
ChenYi
7f56b6e91f 完善代码 2025-03-18 21:21:35 +08:00
28 changed files with 943 additions and 106 deletions

View File

@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.DataMigration
{
/// <summary>
/// 数据迁移服务
/// </summary>
public interface IDataMigrationService
{
/// <summary>
/// 开始迁移
/// </summary>
/// <returns></returns>
Task StartMigrationAsync();
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.DataMigration.Options
{
/// <summary>
/// 数据迁移配置
/// </summary>
public class DataMigrationOptions
{
/// <summary>
/// MongoDb每批处理量
/// </summary>
public int MongoDbDataBatchSize { get; set; } = 1000;
/// <summary>
/// 批量处理通道容量
/// </summary>
public int ChannelCapacity { get; set; } = 100;
/// <summary>
/// 数据库 每批处理量
/// </summary>
public int SqlBulkBatchSize { get; set; } = 1000;
/// <summary>
/// 数据库 每批处理超时时间
/// </summary>
public int SqlBulkTimeout { get; set; } = 60;
/// <summary>
/// 处理器数量
/// </summary>
public int ProcessorsCount { get; set; } = 4;
}
}

View File

@ -26,6 +26,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,5 +1,7 @@
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -12,6 +14,9 @@ namespace JiShe.CollectBus.Subscribers
{ {
#region #region
Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery();
/// <summary> /// <summary>
/// 1分钟采集电表数据下行消息消费订阅 /// 1分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>

View File

@ -13,6 +13,7 @@ using Volo.Abp.AspNetCore.Mvc.AntiForgery;
using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.Workers; using JiShe.CollectBus.Workers;
using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.BackgroundWorkers.Hangfire;
using JiShe.CollectBus.MongoDB;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -22,6 +23,7 @@ namespace JiShe.CollectBus;
typeof(AbpDddApplicationModule), typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule), typeof(AbpAutoMapperModule),
typeof(AbpBackgroundWorkersHangfireModule), typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusMongoDbModule),
typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule) typeof(CollectBusFreeSqlModule)
)] )]

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit; using MassTransit;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -48,7 +49,7 @@ namespace JiShe.CollectBus.Consumers
var list = new List<MessageReceived>(); var list = new List<MessageReceived>();
foreach (var contextItem in context.Message) foreach (var contextItem in context.Message)
{ {
await protocolPlugin.AnalyzeAsync(contextItem.Message); await protocolPlugin.AnalyzeAsync<TB3761FN>(contextItem.Message);
list.Add(contextItem.Message); list.Add(contextItem.Message);
} }
await _messageReceivedEventRepository.InsertManyAsync(list); await _messageReceivedEventRepository.InsertManyAsync(list);

View File

@ -0,0 +1,153 @@
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using LiteDB;
using Microsoft.Extensions.Options;
using System;
using System.Data;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.DataMigration
{
/// <summary>
/// 数据迁移服务
/// </summary>
public class DataMigrationService: CollectBusAppService, IDataMigrationService
{
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
private readonly DataMigrationOptions _options;
public DataMigrationService(IOptions<DataMigrationOptions> options,
IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
{
_options = options.Value;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
}
/// <summary>
/// 开始迁移
/// </summary>
/// <returns></returns>
public async Task StartMigrationAsync()
{
var rawDataChannel = Channel.CreateBounded<MeterReadingRecords[]>(new BoundedChannelOptions(_options.ChannelCapacity)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
var cleanDataChannel = Channel.CreateBounded<DataTable>(new BoundedChannelOptions(_options.ChannelCapacity)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
// 启动生产者和消费者
var producer = Task.Run(() => ProduceDataAsync(rawDataChannel.Writer));
var processors = Enumerable.Range(0, _options.ProcessorsCount)
.Select(_ => Task.Run(() => ProcessDataAsync(rawDataChannel.Reader, cleanDataChannel.Writer)))
.ToArray();
var consumer = Task.Run(() => ConsumeDataAsync(cleanDataChannel.Reader));
await Task.WhenAll(new[] { producer }.Union(processors).Union(new[] { consumer }));
}
/// <summary>
/// 生产者,生产数据,主要是从MongoDB中读取数据
/// </summary>
/// <param name="writer"></param>
/// <returns></returns>
private async Task ProduceDataAsync(ChannelWriter<MeterReadingRecords[]> writer)
{
while (true)
{
var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
.Take(_options.MongoDbDataBatchSize)
.ToArray();
if (batchRecords == null || batchRecords.Length == 0)
{
writer.Complete();
break;
}
await writer.WriteAsync(batchRecords);
}
}
/// <summary>
/// 清洗数据
/// </summary>
/// <param name="reader"></param>
/// <param name="writer"></param>
/// <returns></returns>
private async Task ProcessDataAsync(ChannelReader<MeterReadingRecords[]> reader, ChannelWriter<DataTable> writer)
{
await foreach (var batch in reader.ReadAllAsync())
{
//var dataTable = new DataTable();
//dataTable.Columns.Add("Id", typeof(string));
//dataTable.Columns.Add("CleanName", typeof(string));
//dataTable.Columns.Add("ProcessedTime", typeof(DateTime));
//foreach (var doc in batch)
//{
// // 业务清洗逻辑
// var cleanName = doc["name"].AsString.Trim().ToUpper();
// dataTable.Rows.Add(
// doc["_id"].ToString(),
// cleanName,
// DateTime.UtcNow);
//}
//await writer.WriteAsync(dataTable);
// 批量更新标记
var ids = batch.Select(d => d.Id).ToArray();
foreach (var item in batch)
{
item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
item.MigrationTime = DateTime.Now;
}
await _meterReadingRecordsRepository.UpdateManyAsync(batch);
}
writer.Complete();
}
/// <summary>
/// 消费清洗后的数据入库
/// </summary>
/// <param name="reader"></param>
/// <returns></returns>
private async Task ConsumeDataAsync(ChannelReader<DataTable> reader)
{
//await using var connection = new SqlConnection(_sqlConnectionString);
//await connection.OpenAsync();
//await foreach (var dataTable in reader.ReadAllAsync())
//{
// using var bulkCopy = new SqlBulkCopy(connection)
// {
// DestinationTableName = "CleanData",
// BatchSize = 5000,
// BulkCopyTimeout = 300
// };
// bulkCopy.ColumnMappings.Add("Id", "Id");
// bulkCopy.ColumnMappings.Add("CleanName", "CleanName");
// bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime");
// await bulkCopy.WriteToServerAsync(dataTable);
//}
}
}
}

View File

@ -25,10 +25,7 @@
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" /> <PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -18,8 +18,11 @@ using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Workers; using JiShe.CollectBus.Workers;
using MassTransit; using MassTransit;
using MassTransit.Internals.GraphValidation; using MassTransit.Internals.GraphValidation;
@ -36,17 +39,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _capBus; private readonly ICapPublisher _capBus;
private readonly IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> _meterReadingIssuedRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher capBus, ICapPublisher capBus,
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) IMeterReadingRecordRepository meterReadingRecordsRepository)
{ {
_capBus = capBus; _capBus = capBus;
_logger = logger; _logger = logger;
_meterReadingIssuedRepository = meterReadingIssuedRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository;
} }
/// <summary> /// <summary>
@ -99,7 +102,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率 //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率
var tempArryay = item.Split(":"); var tempArryay = item.Split(":");
string meteryType = tempArryay[3];//表计类别 string meteryType = tempArryay[3];//表计类别
string timeDensity = tempArryay[4];//采集频率 int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率
//获取缓存中的电表信息 //获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*";
@ -113,7 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
// 解析结果(结果为嵌套数组) // 解析结果(结果为嵌套数组)
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, timeDensity, meteryType); var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104"); _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
@ -186,8 +189,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG #if DEBUG
//每次缓存时,删除缓存,避免缓存数据错误 //每次缓存时,删除缓存,避免缓存数据错误
await FreeRedisProvider.Instance.DelAsync(redisCacheKey); await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else #else
//每次缓存时,删除缓存,避免缓存数据错误 //每次缓存时,删除缓存,避免缓存数据错误
await FreeRedisProvider.Instance.DelAsync(redisCacheKey); await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
@ -274,29 +277,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//获取下发任务缓存数据 //获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return; return;
} }
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>(); List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中 //将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos) foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) foreach (var ammerterItem in focusItem.Value)
{ {
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value); var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
} }
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//缓存下一个时间的任务 //缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
@ -320,37 +334,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 5; int timeDensity = 5;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return; return;
} }
//获取下发任务缓存数据 //获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return; return;
} }
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>(); List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中 //将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos) foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) foreach (var ammerterItem in focusItem.Value)
{ {
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value); var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
} }
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
//缓存下一个时间的任务 //缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
@ -375,41 +400,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
var currentDateTime = DateTime.Now;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return; return;
} }
//获取下发任务缓存数据 //获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return; return;
} }
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>(); List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中 //将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos) foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) foreach (var ammerterItem in focusItem.Value)
{ {
//todo 可能需要优化如果使用等待会很慢但使用不等待mongodb 连接池又没法抗住,先发送微妙级的延时队列消息,暂时先这样处理 var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value); {
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
} }
//删除任务数据 //删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
//缓存下一个时间的任务 //缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
@ -420,7 +454,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity); var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
stopwatch.Stop(); stopwatch.Stop();
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
@ -432,9 +466,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param> /// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="focusGroup">集中器数据分组</param> /// <param name="focusGroup">集中器数据分组</param>
/// <returns></returns> /// <returns></returns>
private async Task AmmerterScheduledMeterReadingIssued(string timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup) private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
{ {
if (string.IsNullOrWhiteSpace(timeDensity) || focusGroup == null || focusGroup.Count <= 0) if (timeDensity <= 0)
{
timeDensity = 1;
}
if (timeDensity > 15)
{
timeDensity = 15;
}
if ( focusGroup == null || focusGroup.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101"); _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
return; return;
@ -492,12 +536,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param> /// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
/// <returns></returns> /// <returns></returns>
private async Task AmmerterCreatePublishTask(string timeDensity private async Task AmmerterCreatePublishTask(int timeDensity
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup) , Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
{ {
var HandlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
foreach (var focusInfo in focusGroup) foreach (var focusInfo in focusGroup)
{ {
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型 //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
@ -586,7 +631,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
Dictionary<string, ScheduledMeterReadingIssuedEventMessage> keyValuePairs = new Dictionary<string, ScheduledMeterReadingIssuedEventMessage>(); Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
foreach (var tempItem in tempCodes) foreach (var tempItem in tempCodes)
{ {
@ -615,7 +660,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
string methonCode = $"AFN{aFNStr}_Fn_Send"; string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理 //特殊表暂不处理
if (HandlerPacketBuilder != null && HandlerPacketBuilder.TryGetValue(methonCode if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler)) , out var handler))
{ {
dataInfos = handler(new TelemetryPacketRequest() dataInfos = handler(new TelemetryPacketRequest()
@ -639,16 +684,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
var evenMessageInfo = new ScheduledMeterReadingIssuedEventMessage
var meterReadingRecords = new MeterReadingRecords()
{ {
MessageHexString = Convert.ToHexString(dataInfos), PendingCopyReadTime = pendingCopyReadTime,
DeviceNo = ammeter.FocusAddress,
MessageId = NewId.NextGuid().ToString(),
TimeDensity = timeDensity,
WasSuccessful = false,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeter.AmmerterAddress,
MeterId = ammeter.ID,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress,
FocusID = ammeter.FocusID,
AFN = aFN,
Fn = fn,
Pn = ammeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
}; };
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", evenMessageInfo); meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
@ -742,29 +797,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//获取下发任务缓存数据 //获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return; return;
} }
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>(); List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中 //将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos) foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) foreach (var ammerterItem in focusItem.Value)
{ {
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value); var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
} }
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//缓存下一个时间的任务 //缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
@ -789,37 +855,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 5; int timeDensity = 5;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return; return;
} }
//获取下发任务缓存数据 //获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return; return;
} }
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>(); List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中 //将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos) foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) foreach (var ammerterItem in focusItem.Value)
{ {
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value); var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
} }
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
//缓存下一个时间的任务 //缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
@ -829,7 +906,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity); var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
} }
@ -843,37 +920,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return; return;
} }
//获取下发任务缓存数据 //获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return; return;
} }
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>(); List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中 //将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos) foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) foreach (var ammerterItem in focusItem.Value)
{ {
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value); var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
} }
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
//缓存下一个时间的任务 //缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
@ -953,7 +1041,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return meterInfos; return meterInfos;
} }
/// <summary> /// <summary>
/// 指定时间对比当前时间 /// 指定时间对比当前时间
/// </summary> /// </summary>

View File

@ -7,7 +7,10 @@ using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
@ -24,8 +27,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher capBus, ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository)
{ {
} }

View File

@ -5,8 +5,10 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using TouchSocket.Sockets; using TouchSocket.Sockets;
@ -23,6 +25,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository; private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository; private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class. /// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
@ -34,12 +37,13 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param> /// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
/// <param name="messageReceivedEventRepository">The message received event repository.</param> /// <param name="messageReceivedEventRepository">The message received event repository.</param>
/// <param name="deviceRepository">The device repository.</param> /// <param name="deviceRepository">The device repository.</param>
/// <param name="meterReadingRecordsRepository">The device repository.</param>
public SubscriberAppService(ILogger<SubscriberAppService> logger, public SubscriberAppService(ILogger<SubscriberAppService> logger,
ITcpService tcpService, IServiceProvider serviceProvider, ITcpService tcpService, IServiceProvider serviceProvider,
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository, IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository, IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IRepository<MessageReceived, Guid> messageReceivedEventRepository, IRepository<MessageReceived, Guid> messageReceivedEventRepository,
IRepository<Device, Guid> deviceRepository) IRepository<Device, Guid> deviceRepository, IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
{ {
_logger = logger; _logger = logger;
_tcpService = tcpService; _tcpService = tcpService;
@ -48,6 +52,7 @@ namespace JiShe.CollectBus.Subscribers
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
_messageReceivedEventRepository = messageReceivedEventRepository; _messageReceivedEventRepository = messageReceivedEventRepository;
_deviceRepository = deviceRepository; _deviceRepository = deviceRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
} }
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)] [CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
@ -90,8 +95,9 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
await protocolPlugin.AnalyzeAsync(receivedMessage); //todo 会根据不同的协议进行解析,然后做业务处理
await _messageReceivedEventRepository.InsertAsync(receivedMessage); TB3761FN fN = await protocolPlugin.AnalyzeAsync<TB3761FN>(receivedMessage);
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
} }
} }

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device; using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP; using DotNetCore.CAP;
@ -6,8 +7,10 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -27,6 +30,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
/// <summary> /// <summary>
@ -39,16 +43,34 @@ namespace JiShe.CollectBus.Subscribers
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger, public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
ITcpService tcpService, ITcpService tcpService,
IRepository<Device, Guid> deviceRepository, IRepository<Device, Guid> deviceRepository,
IMeterReadingRecordRepository meterReadingRecordsRepository,
IServiceProvider serviceProvider) IServiceProvider serviceProvider)
{ {
_logger = logger; _logger = logger;
_tcpService = tcpService; _tcpService = tcpService;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_deviceRepository = deviceRepository; _deviceRepository = deviceRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
} }
#region #region
/// <summary>
/// 一分钟定时抄读任务消息消费订阅
/// </summary>
/// <returns></returns>
[HttpGet]
[Route("ammeter/oneminute/issued-eventQuery")]
public async Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery()
{
var currentDateTime = DateTime.Now;
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
return list;
}
/// <summary> /// <summary>
/// 一分钟定时抄读任务消息消费订阅 /// 一分钟定时抄读任务消息消费订阅
/// </summary> /// </summary>
@ -67,7 +89,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
@ -94,7 +116,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
@ -121,7 +143,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
@ -150,7 +172,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
@ -177,7 +199,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
@ -204,7 +226,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));

View File

@ -36,6 +36,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
MSA.Add(i); MSA.Add(i);
} }
} }
/// <summary> /// <summary>
/// Gets the msa. /// Gets the msa.
/// </summary> /// </summary>

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
/// <summary>
/// 数据迁移状态
/// </summary>
public enum RecordsDataMigrationStatusEnum
{
/// <summary>
/// 未开始
/// </summary>
NotStarted = 0,
/// <summary>
/// 进行中
/// </summary>
InProgress = 1,
/// <summary>
/// 已完成
/// </summary>
Completed = 2,
/// <summary>
/// 已取消
/// </summary>
Cancelled = 3,
/// <summary>
/// 失败
/// </summary>
Failed = 4,
}
}

View File

@ -7,17 +7,17 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
/// <summary> /// <summary>
/// 定时抄读Kafka消息实体1分钟、5分钟、15分钟 /// 定时抄读Kafka消息实体1分钟、5分钟、15分钟
/// </summary> /// </summary>
public class ScheduledMeterReadingIssuedEventMessage : AggregateRoot<Guid> public class ScheduledMeterReadingIssuedEventMessage
{ {
/// <summary> /// <summary>
/// 消息内容 /// 下发消息内容
/// </summary> /// </summary>
public string MessageHexString { get; set; } public string MessageHexString { get; set; }
/// <summary> /// <summary>
/// 集中器编号 /// 集中器编号
/// </summary> /// </summary>
public string DeviceNo { get; set; } public string FocusAddress { get; set; }
/// <summary> /// <summary>
/// 采集时间间隔通过Kafka主题区分(分钟如15) /// 采集时间间隔通过Kafka主题区分(分钟如15)
@ -29,14 +29,5 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
/// </summary> /// </summary>
public string MessageId { get; set; } public string MessageId { get; set; }
/// <summary>
/// 是否下发成功
/// </summary>
public bool WasSuccessful { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreationTime { get; set; }
} }
} }

View File

@ -0,0 +1,135 @@
using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Entities.Auditing;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 抄读数据记录
/// </summary>
public class MeterReadingRecords : AggregateRoot<Guid>
{
/// <summary>
/// 是否手动操作
/// </summary>
public bool ManualOrNot { get; set; }
/// <summary>
/// 待抄读时间
/// </summary>
public DateTime PendingCopyReadTime { get; set; }
/// <summary>
/// 下发消息内容
/// </summary>
public string IssuedMessageHexString { get; set; }
/// <summary>
/// 下发消息Id
/// </summary>
public string IssuedMessageId { get; set; }
/// <summary>
/// 集中器ID
/// </summary>
public int FocusID { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 表Id
/// </summary>
public int MeterId { get; set; }
/// <summary>
/// 表地址
/// </summary>
public string MeterAddress { get; set; }
/// <summary>
/// 表类型
/// </summary>
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary>
/// AFN功能码
/// </summary>
public AFN AFN { get; set; }
/// <summary>
/// 抄读功能码
/// </summary>
public int Fn { get; set; }
/// <summary>
/// 抄读计量点
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 是否下发成功
/// </summary>
public bool WasSuccessful { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreationTime { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>
public string? ReceivedMessageHexString { get; set; }
/// <summary>
/// 消息上报时间
/// </summary>
public DateTime? ReceivedTime { get; set; }
/// <summary>
/// 上报消息Id
/// </summary>
public string ReceivedMessageId { get; set; }
/// <summary>
/// 数据迁移状态
/// </summary>
public RecordsDataMigrationStatusEnum MigrationStatus { get; set; }
/// <summary>
/// 数据结果,最终的解析报文结果值
/// </summary>
public string DataResult { get; set; }
/// <summary>
/// 数据迁移时间
/// </summary>
public DateTime? MigrationTime { get; set; }
public void CreateDataId(Guid Id)
{
this.Id = Id;
}
}
}

View File

@ -25,7 +25,6 @@ namespace JiShe.CollectBus.Host
typeof(AbpAspNetCoreSerilogModule), typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule), typeof(AbpSwashbuckleModule),
typeof(CollectBusApplicationModule), typeof(CollectBusApplicationModule),
typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule), typeof(AbpCachingStackExchangeRedisModule),
typeof(AbpBackgroundWorkersHangfireModule) typeof(AbpBackgroundWorkersHangfireModule)
)] )]

View File

@ -55,7 +55,6 @@
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -34,13 +34,13 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100" "CorsOrigins": "http://localhost:4200,http://localhost:3100"
}, },
"ConnectionStrings": { "ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin", "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
}, },
"Redis": { "Redis": {
"Configuration": "192.168.111.248:6379,password=123456abcD,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14", "DefaultDB": "14",
"HangfireDB": "15" "HangfireDB": "15"
}, },

View File

@ -1,8 +1,14 @@
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.ShardingStrategy;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver; using MongoDB.Driver;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Volo.Abp.Data; using Volo.Abp.Data;
using Volo.Abp.MongoDB; using Volo.Abp.MongoDB;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
@ -22,13 +28,48 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
public IMongoCollection<MessageReceivedHeartbeat> MessageReceivedHeartbeats => Collection<MessageReceivedHeartbeat>(); public IMongoCollection<MessageReceivedHeartbeat> MessageReceivedHeartbeats => Collection<MessageReceivedHeartbeat>();
public IMongoCollection<Device> Devices => Collection<Device>(); public IMongoCollection<Device> Devices => Collection<Device>();
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>(); public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
public IMongoCollection<ScheduledMeterReadingIssuedEventMessage> MeterReadingIssued => Collection<ScheduledMeterReadingIssuedEventMessage>();
protected override void CreateModel(IMongoModelBuilder modelBuilder) protected override void CreateModel(IMongoModelBuilder modelBuilder)
{ {
base.CreateModel(modelBuilder); base.CreateModel(modelBuilder);
modelBuilder.ConfigureCollectBus(); modelBuilder.ConfigureCollectBus();
}
modelBuilder.Entity<MeterReadingRecords>(builder =>
{
// 创建索引
builder.ConfigureIndexes(index =>
{
//List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
//createIndexModels.Add(new CreateIndexModel<BsonDocument>(
// Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
// new CreateIndexOptions
// {
// Unique = true
// }
// ));
//var indexKeys = Builders<BsonDocument>.IndexKeys
//.Ascending("CreationTime")
//.Ascending("OrderNumber");
//var indexOptions = new CreateIndexOptions
//{
// Background = true,
// Name = "IX_CreationTime_OrderNumber"
//};
//index.CreateOne(
//new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
index.CreateOne(new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
new CreateIndexOptions
{
Unique = true
}
));
});
});
}
} }

View File

@ -1,6 +1,15 @@
using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.ShardingStrategy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using Volo.Abp;
using Volo.Abp.AuditLogging.MongoDB; using Volo.Abp.AuditLogging.MongoDB;
using Volo.Abp.BackgroundJobs.MongoDB; using Volo.Abp.BackgroundJobs.MongoDB;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.Domain.Repositories.MongoDB;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Volo.Abp.MongoDB; using Volo.Abp.MongoDB;
using Volo.Abp.Uow; using Volo.Abp.Uow;
@ -20,6 +29,14 @@ public class CollectBusMongoDbModule : AbpModule
context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options => context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options =>
{ {
options.AddDefaultRepositories(); options.AddDefaultRepositories();
// 注册分表策略
context.Services.AddTransient(
typeof(IShardingStrategy<>),
typeof(DayShardingStrategy<>));
// 分表策略仓储 替换默认仓储
options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>();
}); });
context.Services.AddAlwaysDisableUnitOfWorkTransaction(); context.Services.AddAlwaysDisableUnitOfWorkTransaction();

View File

@ -0,0 +1,42 @@
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Repository.MeterReadingRecord
{
/// <summary>
/// 抄读仓储接口
/// </summary>
public interface IMeterReadingRecordRepository : IRepository<MeterReadingRecords, Guid>
{
/// <summary>
/// 批量插入
/// </summary>
/// <param name="entities"></param>
/// <param name="dateTime"></param>
/// <returns></returns>
Task InsertManyAsync(List<MeterReadingRecords> entities,
DateTime? dateTime);
/// <summary>
/// 单个插入
/// </summary>
/// <param name="entity"></param>
/// <param name="dateTime"></param>
/// <returns></returns>
Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime);
/// <summary>
/// 多集合数据查询
/// </summary>
/// <param name="startTime"></param>
/// <param name="endTime"></param>
/// <returns></returns>
Task<List<MeterReadingRecords>> ParallelQueryAsync(DateTime startTime, DateTime endTime);
}
}

View File

@ -0,0 +1,139 @@
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.ShardingStrategy;
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Domain.Repositories.MongoDB;
using Volo.Abp.MongoDB;
using static System.Net.Mime.MediaTypeNames;
namespace JiShe.CollectBus.Repository.MeterReadingRecord
{
/// <summary>
/// 抄读记录仓储
/// </summary>
public class MeterReadingRecordRepository : MongoDbRepository<CollectBusMongoDbContext, MeterReadingRecords, Guid>, IMeterReadingRecordRepository
{
private readonly IShardingStrategy<MeterReadingRecords> _shardingStrategy;
private readonly IMongoDbContextProvider<CollectBusMongoDbContext> _dbContextProvider;
public MeterReadingRecordRepository(
IMongoDbContextProvider<CollectBusMongoDbContext> dbContextProvider,
IShardingStrategy<MeterReadingRecords> shardingStrategy
)
: base(dbContextProvider)
{
_dbContextProvider = dbContextProvider;
_shardingStrategy = shardingStrategy;
}
/// <summary>
/// 批量插入
/// </summary>
/// <param name="entities"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public override async Task<IEnumerable<MeterReadingRecords>> InsertManyAsync(IEnumerable<MeterReadingRecords> entities, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken))
{
var collection = await GetShardedCollection(DateTime.Now);
await collection.InsertManyAsync(entities);
return entities;
}
/// <summary>
/// 批量插入
/// </summary>
/// <param name="entities"></param>
/// <param name="dateTime"></param>
/// <returns></returns>
public async Task InsertManyAsync(List<MeterReadingRecords> entities, DateTime? dateTime)
{
var collection = await GetShardedCollection(dateTime);
await collection.InsertManyAsync(entities);
}
/// <summary>
/// 单条插入
/// </summary>
/// <param name="entity"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public override async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken))
{
var collection = await GetShardedCollection(DateTime.Now);
await collection.InsertOneAsync(entity);
return entity;
}
/// <summary>
/// 单条插入
/// </summary>
/// <param name="entity"></param>
/// <param name="dateTime"></param>
/// <returns></returns>
public async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime)
{
var collection = await GetShardedCollection(dateTime);
await collection.InsertOneAsync(entity);
return entity;
}
/// <summary>
/// 多集合数据查询
/// </summary>
/// <param name="startTime"></param>
/// <param name="endTime"></param>
/// <returns></returns>
public async Task<List<MeterReadingRecords>> ParallelQueryAsync(DateTime startTime, DateTime endTime)
{
var collectionNames = _shardingStrategy.GetQueryCollectionNames(startTime, endTime);
var database = await GetDatabaseAsync();
var tasks = collectionNames.Select(async name =>
{
var collection = database.GetCollection<MeterReadingRecords>(name);
var filter = Builders<MeterReadingRecords>.Filter.And(
Builders<MeterReadingRecords>.Filter.Gte(x => x.CreationTime, startTime),
Builders<MeterReadingRecords>.Filter.Lte(x => x.CreationTime, endTime)
);
return await collection.Find(filter).ToListAsync();
});
var results = await Task.WhenAll(tasks);
return results.SelectMany(r => r).ToList();
}
/// <summary>
/// 获得分片集合
/// </summary>
/// <returns></returns>
private async Task<IMongoCollection<MeterReadingRecords>> GetShardedCollection(DateTime? dateTime)
{
var database = await GetDatabaseAsync();
string collectionName = string.Empty;
if (dateTime != null)
{
collectionName = _shardingStrategy.GetCollectionName(dateTime.Value);
}
else
{
collectionName = _shardingStrategy.GetCurrentCollectionName();
}
return database.GetCollection<MeterReadingRecords>(collectionName);
}
}
}

View File

@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.ShardingStrategy
{
/// <summary>
/// 按天分表策略
/// </summary>
/// <typeparam name="TEntity"></typeparam>
public class DayShardingStrategy<TEntity> : IShardingStrategy<TEntity>
{
/// <summary>
/// 获取指定时间对应的集合名
/// </summary>
/// <param name="dateTime"></param>
/// <returns></returns>
public string GetCollectionName(DateTime dateTime)
{
var baseName = typeof(TEntity).Name;
return $"{baseName}_{dateTime:yyyyMMddHHmm}";
}
/// <summary>
/// 获取当前时间对应的集合名
/// </summary>
/// <returns></returns>
public string GetCurrentCollectionName()
{
var baseName = typeof(TEntity).Name;
return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}";
}
/// <summary>
/// 用于查询时确定目标集合
/// </summary>
/// <param name="startTime"></param>
/// <param name="endTime"></param>
/// <returns></returns>
public IEnumerable<string> GetQueryCollectionNames(DateTime? startTime, DateTime? endTime)
{
var months = new List<string>();
var current = startTime ?? DateTime.MinValue;
var end = endTime ?? DateTime.MaxValue;
var baseName = typeof(TEntity).Name;
while (current <= end)
{
months.Add($"{baseName}_{current:yyyyMMddHHmm}");
current = current.AddMonths(1);
}
return months.Distinct();
}
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.ShardingStrategy
{
/// <summary>
/// 数据存储分片策略
/// </summary>
/// <typeparam name="TEntity"></typeparam>
public interface IShardingStrategy<TEntity>
{
/// <summary>
/// 获取指定时间对应的集合名
/// </summary>
/// <returns></returns>
string GetCollectionName(DateTime dateTime);
/// <summary>
/// 获取当前时间对应的集合名
/// </summary>
/// <returns></returns>
string GetCurrentCollectionName();
/// <summary>
/// 用于查询时确定目标集合
/// </summary>
/// <param name="startTime"></param>
/// <param name="endTime"></param>
/// <returns></returns>
IEnumerable<string> GetQueryCollectionNames(DateTime? startTime = null,
DateTime? endTime = null);
}
}

View File

@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
//await _protocolInfoCache.Get() //await _protocolInfoCache.Get()
} }
public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null); public abstract Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761FN;
/// <summary> /// <summary>
/// 登录帧解析 /// 登录帧解析

View File

@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
Task AddAsync(); Task AddAsync();
Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null); Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761FN;
Task LoginAsync(MessageReceivedLogin messageReceived); Task LoginAsync(MessageReceivedLogin messageReceived);

View File

@ -8,8 +8,8 @@ using JiShe.CollectBus.Protocol.Contracts.Models;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
namespace JiShe.CollectBus.Protocol namespace JiShe.CollectBus.Protocol
{ {
public class StandardProtocolPlugin: BaseProtocolPlugin public class StandardProtocolPlugin : BaseProtocolPlugin
{ {
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class. /// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class.
@ -21,12 +21,15 @@ namespace JiShe.CollectBus.Protocol
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
public override Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null) public override async Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null)
{ {
var hexStringList = messageReceived.MessageHexString.StringToPairs(); var hexStringList = messageReceived.MessageHexString.StringToPairs();
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
T analyze = default;
switch ((AFN)afn) switch ((AFN)afn)
{ {
case AFN.: case AFN.:
@ -35,15 +38,15 @@ namespace JiShe.CollectBus.Protocol
case AFN.: break; case AFN.: break;
case AFN.: break; case AFN.: break;
case AFN.: case AFN.:
if (Enum.IsDefined(typeof(ATypeOfDataItems), fn)) //Enum.TryParse(afn.ToString(), out ATypeOfDataItems parseResult) if (Enum.IsDefined(typeof(ATypeOfDataItems), fn))
{ {
AnalyzeReadingDataAsync(messageReceived, sendAction); analyze = (T?)AnalyzeReadingDataAsync(messageReceived, sendAction);
} }
break; break;
case AFN.: case AFN.:
if (Enum.IsDefined(typeof(IIdataTypeItems), fn)) if (Enum.IsDefined(typeof(IIdataTypeItems), fn))
{ {
AnalyzeReadingTdcDataAsync(messageReceived, sendAction); analyze = (T?)AnalyzeReadingTdcDataAsync(messageReceived, sendAction);
} }
break; break;
case AFN.: case AFN.:
@ -51,7 +54,7 @@ namespace JiShe.CollectBus.Protocol
break; break;
} }
throw new NotImplementedException(); return await Task.FromResult(analyze);
} }
#region #region