Compare commits
No commits in common. "e7f94ceae4e9ffca4e081f0b22deb96da9278f89" and "69cad7bdee366cf365e5a7427675d09848eab68a" have entirely different histories.
e7f94ceae4
...
69cad7bdee
@ -1,20 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.DataMigration
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 数据迁移服务
|
|
||||||
/// </summary>
|
|
||||||
public interface IDataMigrationService
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 开始迁移
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task StartMigrationAsync();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,39 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.DataMigration.Options
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 数据迁移配置
|
|
||||||
/// </summary>
|
|
||||||
public class DataMigrationOptions
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// MongoDb每批处理量
|
|
||||||
/// </summary>
|
|
||||||
public int MongoDbDataBatchSize { get; set; } = 1000;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量处理通道容量
|
|
||||||
/// </summary>
|
|
||||||
public int ChannelCapacity { get; set; } = 100;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据库 每批处理量
|
|
||||||
/// </summary>
|
|
||||||
public int SqlBulkBatchSize { get; set; } = 1000;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据库 每批处理超时时间
|
|
||||||
/// </summary>
|
|
||||||
public int SqlBulkTimeout { get; set; } = 60;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 处理器数量
|
|
||||||
/// </summary>
|
|
||||||
public int ProcessorsCount { get; set; } = 4;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -26,7 +26,6 @@
|
|||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -1,7 +1,5 @@
|
|||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
|
|
||||||
@ -14,9 +12,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
{
|
{
|
||||||
|
|
||||||
#region 电表消息采集
|
#region 电表消息采集
|
||||||
|
|
||||||
Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery();
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集电表数据下行消息消费订阅
|
/// 1分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -13,7 +13,6 @@ using Volo.Abp.AspNetCore.Mvc.AntiForgery;
|
|||||||
using JiShe.CollectBus.FreeRedisProvider;
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using JiShe.CollectBus.Workers;
|
using JiShe.CollectBus.Workers;
|
||||||
using Volo.Abp.BackgroundWorkers.Hangfire;
|
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
using JiShe.CollectBus.MongoDB;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
namespace JiShe.CollectBus;
|
||||||
|
|
||||||
@ -23,7 +22,6 @@ namespace JiShe.CollectBus;
|
|||||||
typeof(AbpDddApplicationModule),
|
typeof(AbpDddApplicationModule),
|
||||||
typeof(AbpAutoMapperModule),
|
typeof(AbpAutoMapperModule),
|
||||||
typeof(AbpBackgroundWorkersHangfireModule),
|
typeof(AbpBackgroundWorkersHangfireModule),
|
||||||
typeof(CollectBusMongoDbModule),
|
|
||||||
typeof(CollectBusFreeRedisModule),
|
typeof(CollectBusFreeRedisModule),
|
||||||
typeof(CollectBusFreeSqlModule)
|
typeof(CollectBusFreeSqlModule)
|
||||||
)]
|
)]
|
||||||
|
|||||||
@ -3,7 +3,6 @@ using System.Collections.Generic;
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -49,7 +48,7 @@ namespace JiShe.CollectBus.Consumers
|
|||||||
var list = new List<MessageReceived>();
|
var list = new List<MessageReceived>();
|
||||||
foreach (var contextItem in context.Message)
|
foreach (var contextItem in context.Message)
|
||||||
{
|
{
|
||||||
await protocolPlugin.AnalyzeAsync<TB3761FN>(contextItem.Message);
|
await protocolPlugin.AnalyzeAsync(contextItem.Message);
|
||||||
list.Add(contextItem.Message);
|
list.Add(contextItem.Message);
|
||||||
}
|
}
|
||||||
await _messageReceivedEventRepository.InsertManyAsync(list);
|
await _messageReceivedEventRepository.InsertManyAsync(list);
|
||||||
|
|||||||
@ -1,153 +0,0 @@
|
|||||||
using JiShe.CollectBus.DataMigration.Options;
|
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using LiteDB;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using System;
|
|
||||||
using System.Data;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Threading.Channels;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Domain.Repositories;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.DataMigration
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 数据迁移服务
|
|
||||||
/// </summary>
|
|
||||||
public class DataMigrationService: CollectBusAppService, IDataMigrationService
|
|
||||||
{
|
|
||||||
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
|
|
||||||
private readonly DataMigrationOptions _options;
|
|
||||||
|
|
||||||
|
|
||||||
public DataMigrationService(IOptions<DataMigrationOptions> options,
|
|
||||||
IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
|
|
||||||
{
|
|
||||||
_options = options.Value;
|
|
||||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 开始迁移
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task StartMigrationAsync()
|
|
||||||
{
|
|
||||||
var rawDataChannel = Channel.CreateBounded<MeterReadingRecords[]>(new BoundedChannelOptions(_options.ChannelCapacity)
|
|
||||||
{
|
|
||||||
SingleWriter = false,
|
|
||||||
SingleReader = false,
|
|
||||||
FullMode = BoundedChannelFullMode.Wait
|
|
||||||
});
|
|
||||||
|
|
||||||
var cleanDataChannel = Channel.CreateBounded<DataTable>(new BoundedChannelOptions(_options.ChannelCapacity)
|
|
||||||
{
|
|
||||||
SingleWriter = false,
|
|
||||||
SingleReader = false,
|
|
||||||
FullMode = BoundedChannelFullMode.Wait
|
|
||||||
});
|
|
||||||
|
|
||||||
// 启动生产者和消费者
|
|
||||||
var producer = Task.Run(() => ProduceDataAsync(rawDataChannel.Writer));
|
|
||||||
|
|
||||||
var processors = Enumerable.Range(0, _options.ProcessorsCount)
|
|
||||||
.Select(_ => Task.Run(() => ProcessDataAsync(rawDataChannel.Reader, cleanDataChannel.Writer)))
|
|
||||||
.ToArray();
|
|
||||||
|
|
||||||
var consumer = Task.Run(() => ConsumeDataAsync(cleanDataChannel.Reader));
|
|
||||||
|
|
||||||
await Task.WhenAll(new[] { producer }.Union(processors).Union(new[] { consumer }));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 生产者,生产数据,主要是从MongoDB中读取数据
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="writer"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task ProduceDataAsync(ChannelWriter<MeterReadingRecords[]> writer)
|
|
||||||
{
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
|
|
||||||
var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
|
|
||||||
.Take(_options.MongoDbDataBatchSize)
|
|
||||||
.ToArray();
|
|
||||||
|
|
||||||
if (batchRecords == null || batchRecords.Length == 0)
|
|
||||||
{
|
|
||||||
writer.Complete();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
await writer.WriteAsync(batchRecords);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 清洗数据
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="reader"></param>
|
|
||||||
/// <param name="writer"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task ProcessDataAsync(ChannelReader<MeterReadingRecords[]> reader, ChannelWriter<DataTable> writer)
|
|
||||||
{
|
|
||||||
await foreach (var batch in reader.ReadAllAsync())
|
|
||||||
{
|
|
||||||
//var dataTable = new DataTable();
|
|
||||||
//dataTable.Columns.Add("Id", typeof(string));
|
|
||||||
//dataTable.Columns.Add("CleanName", typeof(string));
|
|
||||||
//dataTable.Columns.Add("ProcessedTime", typeof(DateTime));
|
|
||||||
|
|
||||||
//foreach (var doc in batch)
|
|
||||||
//{
|
|
||||||
// // 业务清洗逻辑
|
|
||||||
// var cleanName = doc["name"].AsString.Trim().ToUpper();
|
|
||||||
// dataTable.Rows.Add(
|
|
||||||
// doc["_id"].ToString(),
|
|
||||||
// cleanName,
|
|
||||||
// DateTime.UtcNow);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//await writer.WriteAsync(dataTable);
|
|
||||||
|
|
||||||
// 批量更新标记
|
|
||||||
var ids = batch.Select(d => d.Id).ToArray();
|
|
||||||
foreach (var item in batch)
|
|
||||||
{
|
|
||||||
item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
|
|
||||||
item.MigrationTime = DateTime.Now;
|
|
||||||
}
|
|
||||||
|
|
||||||
await _meterReadingRecordsRepository.UpdateManyAsync(batch);
|
|
||||||
}
|
|
||||||
writer.Complete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消费清洗后的数据入库
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="reader"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task ConsumeDataAsync(ChannelReader<DataTable> reader)
|
|
||||||
{
|
|
||||||
//await using var connection = new SqlConnection(_sqlConnectionString);
|
|
||||||
//await connection.OpenAsync();
|
|
||||||
|
|
||||||
//await foreach (var dataTable in reader.ReadAllAsync())
|
|
||||||
//{
|
|
||||||
// using var bulkCopy = new SqlBulkCopy(connection)
|
|
||||||
// {
|
|
||||||
// DestinationTableName = "CleanData",
|
|
||||||
// BatchSize = 5000,
|
|
||||||
// BulkCopyTimeout = 300
|
|
||||||
// };
|
|
||||||
|
|
||||||
// bulkCopy.ColumnMappings.Add("Id", "Id");
|
|
||||||
// bulkCopy.ColumnMappings.Add("CleanName", "CleanName");
|
|
||||||
// bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime");
|
|
||||||
|
|
||||||
// await bulkCopy.WriteToServerAsync(dataTable);
|
|
||||||
//}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -25,7 +25,10 @@
|
|||||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||||
|
<ProjectReference Include="..\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|||||||
@ -18,11 +18,8 @@ using JiShe.CollectBus.GatherItem;
|
|||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Repository;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using JiShe.CollectBus.Workers;
|
using JiShe.CollectBus.Workers;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using MassTransit.Internals.GraphValidation;
|
using MassTransit.Internals.GraphValidation;
|
||||||
@ -39,17 +36,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
||||||
private readonly ICapPublisher _capBus;
|
private readonly ICapPublisher _capBus;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
private readonly IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> _meterReadingIssuedRepository;
|
||||||
|
|
||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
ICapPublisher capBus,
|
ICapPublisher capBus,
|
||||||
IMeterReadingRecordRepository meterReadingRecordsRepository)
|
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository)
|
||||||
{
|
{
|
||||||
_capBus = capBus;
|
_capBus = capBus;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
_meterReadingIssuedRepository = meterReadingIssuedRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -102,7 +99,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
||||||
var tempArryay = item.Split(":");
|
var tempArryay = item.Split(":");
|
||||||
string meteryType = tempArryay[3];//表计类别
|
string meteryType = tempArryay[3];//表计类别
|
||||||
int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率
|
string timeDensity = tempArryay[4];//采集频率
|
||||||
|
|
||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*";
|
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*";
|
||||||
@ -116,7 +113,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
||||||
{
|
{
|
||||||
// 解析结果(结果为嵌套数组)
|
// 解析结果(结果为嵌套数组)
|
||||||
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
|
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, timeDensity, meteryType);
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
|
||||||
@ -277,40 +274,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
|
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
foreach (var focusItem in meterTaskInfos)
|
foreach (var focusItem in meterTaskInfos)
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value);
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
|
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
|
||||||
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
@ -334,48 +320,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 5;
|
int timeDensity = 5;
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
||||||
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
|
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
foreach (var focusItem in meterTaskInfos)
|
foreach (var focusItem in meterTaskInfos)
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value);
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
|
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
|
||||||
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
@ -400,50 +375,41 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 15;
|
int timeDensity = 15;
|
||||||
var currentDateTime = DateTime.Now;
|
|
||||||
|
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
||||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
|
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
foreach (var focusItem in meterTaskInfos)
|
foreach (var focusItem in meterTaskInfos)
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
//todo 可能需要优化,如果使用等待,会很慢,但使用不等待,mongodb 连接池又没法抗住,先发送微妙级的延时队列消息,暂时先这样处理
|
||||||
{
|
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value);
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
|
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
//删除任务数据
|
||||||
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
@ -466,19 +432,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
|
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
|
||||||
/// <param name="focusGroup">集中器数据分组</param>
|
/// <param name="focusGroup">集中器数据分组</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
private async Task AmmerterScheduledMeterReadingIssued(string timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||||
{
|
{
|
||||||
if (timeDensity <= 0)
|
if (string.IsNullOrWhiteSpace(timeDensity) || focusGroup == null || focusGroup.Count <= 0)
|
||||||
{
|
|
||||||
timeDensity = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (timeDensity > 15)
|
|
||||||
{
|
|
||||||
timeDensity = 15;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( focusGroup == null || focusGroup.Count <= 0)
|
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
||||||
return;
|
return;
|
||||||
@ -536,13 +492,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="timeDensity">采集频率</param>
|
/// <param name="timeDensity">采集频率</param>
|
||||||
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
|
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task AmmerterCreatePublishTask(int timeDensity
|
private async Task AmmerterCreatePublishTask(string timeDensity
|
||||||
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||||
{
|
{
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
var HandlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
|
||||||
foreach (var focusInfo in focusGroup)
|
foreach (var focusInfo in focusGroup)
|
||||||
{
|
{
|
||||||
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
||||||
@ -631,7 +586,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
|
Dictionary<string, ScheduledMeterReadingIssuedEventMessage> keyValuePairs = new Dictionary<string, ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
foreach (var tempItem in tempCodes)
|
foreach (var tempItem in tempCodes)
|
||||||
{
|
{
|
||||||
@ -660,7 +615,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
||||||
//特殊表暂不处理
|
//特殊表暂不处理
|
||||||
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
|
if (HandlerPacketBuilder != null && HandlerPacketBuilder.TryGetValue(methonCode
|
||||||
, out var handler))
|
, out var handler))
|
||||||
{
|
{
|
||||||
dataInfos = handler(new TelemetryPacketRequest()
|
dataInfos = handler(new TelemetryPacketRequest()
|
||||||
@ -684,26 +639,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var evenMessageInfo = new ScheduledMeterReadingIssuedEventMessage
|
||||||
|
|
||||||
var meterReadingRecords = new MeterReadingRecords()
|
|
||||||
{
|
{
|
||||||
PendingCopyReadTime = pendingCopyReadTime,
|
MessageHexString = Convert.ToHexString(dataInfos),
|
||||||
|
DeviceNo = ammeter.FocusAddress,
|
||||||
|
MessageId = NewId.NextGuid().ToString(),
|
||||||
|
TimeDensity = timeDensity,
|
||||||
|
WasSuccessful = false,
|
||||||
CreationTime = currentTime,
|
CreationTime = currentTime,
|
||||||
MeterAddress = ammeter.AmmerterAddress,
|
|
||||||
MeterId = ammeter.ID,
|
|
||||||
MeterType = MeterTypeEnum.Ammeter,
|
|
||||||
FocusAddress = ammeter.FocusAddress,
|
|
||||||
FocusID = ammeter.FocusID,
|
|
||||||
AFN = aFN,
|
|
||||||
Fn = fn,
|
|
||||||
Pn = ammeter.MeteringCode,
|
|
||||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
|
||||||
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
|
||||||
};
|
};
|
||||||
meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", evenMessageInfo);
|
||||||
|
|
||||||
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords);
|
|
||||||
}
|
}
|
||||||
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
||||||
}
|
}
|
||||||
@ -797,40 +742,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
foreach (var focusItem in meterTaskInfos)
|
foreach (var focusItem in meterTaskInfos)
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value);
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
|
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
|
||||||
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
@ -855,48 +789,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 5;
|
int timeDensity = 5;
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
||||||
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
foreach (var focusItem in meterTaskInfos)
|
foreach (var focusItem in meterTaskInfos)
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value);
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
|
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
|
||||||
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
@ -920,48 +843,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 15;
|
int timeDensity = 15;
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
||||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
|
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
foreach (var focusItem in meterTaskInfos)
|
foreach (var focusItem in meterTaskInfos)
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value);
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
|
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
|
||||||
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
|
|||||||
@ -7,10 +7,7 @@ using JiShe.CollectBus.Common.Consts;
|
|||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Repository;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Volo.Abp.Domain.Repositories;
|
using Volo.Abp.Domain.Repositories;
|
||||||
@ -27,7 +24,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
|
|
||||||
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||||
ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
|
ICapPublisher capBus,
|
||||||
|
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository)
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,10 +5,8 @@ using JiShe.CollectBus.Common.Enums;
|
|||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using TouchSocket.Sockets;
|
using TouchSocket.Sockets;
|
||||||
@ -25,7 +23,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
|
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
|
||||||
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
|
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
|
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
|
||||||
@ -37,13 +34,12 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
|
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
|
||||||
/// <param name="messageReceivedEventRepository">The message received event repository.</param>
|
/// <param name="messageReceivedEventRepository">The message received event repository.</param>
|
||||||
/// <param name="deviceRepository">The device repository.</param>
|
/// <param name="deviceRepository">The device repository.</param>
|
||||||
/// <param name="meterReadingRecordsRepository">The device repository.</param>
|
|
||||||
public SubscriberAppService(ILogger<SubscriberAppService> logger,
|
public SubscriberAppService(ILogger<SubscriberAppService> logger,
|
||||||
ITcpService tcpService, IServiceProvider serviceProvider,
|
ITcpService tcpService, IServiceProvider serviceProvider,
|
||||||
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
|
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
|
||||||
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
|
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
|
||||||
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
|
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
|
||||||
IRepository<Device, Guid> deviceRepository, IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
|
IRepository<Device, Guid> deviceRepository)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
@ -52,7 +48,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
|
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
|
||||||
_messageReceivedEventRepository = messageReceivedEventRepository;
|
_messageReceivedEventRepository = messageReceivedEventRepository;
|
||||||
_deviceRepository = deviceRepository;
|
_deviceRepository = deviceRepository;
|
||||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
|
||||||
@ -95,9 +90,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//todo 会根据不同的协议进行解析,然后做业务处理
|
await protocolPlugin.AnalyzeAsync(receivedMessage);
|
||||||
TB3761FN fN = await protocolPlugin.AnalyzeAsync<TB3761FN>(receivedMessage);
|
await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
||||||
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using DeviceDetectorNET.Parser.Device;
|
using DeviceDetectorNET.Parser.Device;
|
||||||
using DotNetCore.CAP;
|
using DotNetCore.CAP;
|
||||||
@ -7,10 +6,8 @@ using JiShe.CollectBus.Common.Enums;
|
|||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -30,7 +27,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -43,34 +39,16 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
||||||
ITcpService tcpService,
|
ITcpService tcpService,
|
||||||
IRepository<Device, Guid> deviceRepository,
|
IRepository<Device, Guid> deviceRepository,
|
||||||
IMeterReadingRecordRepository meterReadingRecordsRepository,
|
|
||||||
IServiceProvider serviceProvider)
|
IServiceProvider serviceProvider)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
_serviceProvider = serviceProvider;
|
_serviceProvider = serviceProvider;
|
||||||
_deviceRepository = deviceRepository;
|
_deviceRepository = deviceRepository;
|
||||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#region 电表消息采集
|
#region 电表消息采集
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 一分钟定时抄读任务消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
[HttpGet]
|
|
||||||
[Route("ammeter/oneminute/issued-eventQuery")]
|
|
||||||
public async Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery()
|
|
||||||
{
|
|
||||||
var currentDateTime = DateTime.Now;
|
|
||||||
|
|
||||||
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
|
|
||||||
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 一分钟定时抄读任务消息消费订阅
|
/// 一分钟定时抄读任务消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -89,7 +67,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||||
if (device != null)
|
if (device != null)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
@ -116,7 +94,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||||
if (device != null)
|
if (device != null)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
@ -143,7 +121,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||||
if (device != null)
|
if (device != null)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
@ -172,7 +150,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||||
if (device != null)
|
if (device != null)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
@ -199,7 +177,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||||
if (device != null)
|
if (device != null)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
@ -226,7 +204,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||||
if (device != null)
|
if (device != null)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
|
|||||||
@ -36,7 +36,6 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
|
|||||||
MSA.Add(i);
|
MSA.Add(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets the msa.
|
/// Gets the msa.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -1,39 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.Enums
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 数据迁移状态
|
|
||||||
/// </summary>
|
|
||||||
public enum RecordsDataMigrationStatusEnum
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 未开始
|
|
||||||
/// </summary>
|
|
||||||
NotStarted = 0,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 进行中
|
|
||||||
/// </summary>
|
|
||||||
InProgress = 1,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 已完成
|
|
||||||
/// </summary>
|
|
||||||
Completed = 2,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 已取消
|
|
||||||
/// </summary>
|
|
||||||
Cancelled = 3,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 失败
|
|
||||||
/// </summary>
|
|
||||||
Failed = 4,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -7,17 +7,17 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 定时抄读Kafka消息实体,1分钟、5分钟、15分钟
|
/// 定时抄读Kafka消息实体,1分钟、5分钟、15分钟
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class ScheduledMeterReadingIssuedEventMessage
|
public class ScheduledMeterReadingIssuedEventMessage : AggregateRoot<Guid>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 下发消息内容
|
/// 消息内容
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string MessageHexString { get; set; }
|
public string MessageHexString { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 集中器编号
|
/// 集中器编号
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string FocusAddress { get; set; }
|
public string DeviceNo { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 采集时间间隔,通过Kafka主题区分(分钟,如15)
|
/// 采集时间间隔,通过Kafka主题区分(分钟,如15)
|
||||||
@ -29,5 +29,14 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public string MessageId { get; set; }
|
public string MessageId { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 是否下发成功
|
||||||
|
/// </summary>
|
||||||
|
public bool WasSuccessful { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 创建时间
|
||||||
|
/// </summary>
|
||||||
|
public DateTime CreationTime { get; set; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,135 +0,0 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Domain.Entities;
|
|
||||||
using Volo.Abp.Domain.Entities.Auditing;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读数据记录
|
|
||||||
/// </summary>
|
|
||||||
public class MeterReadingRecords : AggregateRoot<Guid>
|
|
||||||
{
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 是否手动操作
|
|
||||||
/// </summary>
|
|
||||||
public bool ManualOrNot { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 待抄读时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime PendingCopyReadTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 下发消息内容
|
|
||||||
/// </summary>
|
|
||||||
public string IssuedMessageHexString { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 下发消息Id
|
|
||||||
/// </summary>
|
|
||||||
public string IssuedMessageId { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器ID
|
|
||||||
/// </summary>
|
|
||||||
public int FocusID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器地址
|
|
||||||
/// </summary>
|
|
||||||
public string FocusAddress { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 表Id
|
|
||||||
/// </summary>
|
|
||||||
public int MeterId { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 表地址
|
|
||||||
/// </summary>
|
|
||||||
public string MeterAddress { get; set; }
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 表类型
|
|
||||||
/// </summary>
|
|
||||||
public MeterTypeEnum MeterType { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 项目ID
|
|
||||||
/// </summary>
|
|
||||||
public int ProjectID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据库业务ID
|
|
||||||
/// </summary>
|
|
||||||
public int DatabaseBusiID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// AFN功能码
|
|
||||||
/// </summary>
|
|
||||||
public AFN AFN { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读功能码
|
|
||||||
/// </summary>
|
|
||||||
public int Fn { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读计量点
|
|
||||||
/// </summary>
|
|
||||||
public int Pn { get; set; }
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 是否下发成功
|
|
||||||
/// </summary>
|
|
||||||
public bool WasSuccessful { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 创建时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime CreationTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消息上报内容
|
|
||||||
/// </summary>
|
|
||||||
public string? ReceivedMessageHexString { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消息上报时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime? ReceivedTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 上报消息Id
|
|
||||||
/// </summary>
|
|
||||||
public string ReceivedMessageId { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据迁移状态
|
|
||||||
/// </summary>
|
|
||||||
public RecordsDataMigrationStatusEnum MigrationStatus { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据结果,最终的解析报文结果值
|
|
||||||
/// </summary>
|
|
||||||
public string DataResult { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据迁移时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime? MigrationTime { get; set; }
|
|
||||||
|
|
||||||
public void CreateDataId(Guid Id)
|
|
||||||
{
|
|
||||||
this.Id = Id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -25,6 +25,7 @@ namespace JiShe.CollectBus.Host
|
|||||||
typeof(AbpAspNetCoreSerilogModule),
|
typeof(AbpAspNetCoreSerilogModule),
|
||||||
typeof(AbpSwashbuckleModule),
|
typeof(AbpSwashbuckleModule),
|
||||||
typeof(CollectBusApplicationModule),
|
typeof(CollectBusApplicationModule),
|
||||||
|
typeof(CollectBusMongoDbModule),
|
||||||
typeof(AbpCachingStackExchangeRedisModule),
|
typeof(AbpCachingStackExchangeRedisModule),
|
||||||
typeof(AbpBackgroundWorkersHangfireModule)
|
typeof(AbpBackgroundWorkersHangfireModule)
|
||||||
)]
|
)]
|
||||||
|
|||||||
@ -55,6 +55,7 @@
|
|||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
|
||||||
|
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@ -34,13 +34,13 @@
|
|||||||
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
|
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
|
||||||
},
|
},
|
||||||
"ConnectionStrings": {
|
"ConnectionStrings": {
|
||||||
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
|
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin",
|
||||||
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
|
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
|
||||||
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
|
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
|
||||||
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
||||||
},
|
},
|
||||||
"Redis": {
|
"Redis": {
|
||||||
"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
"Configuration": "192.168.111.248:6379,password=123456abcD,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||||
"DefaultDB": "14",
|
"DefaultDB": "14",
|
||||||
"HangfireDB": "15"
|
"HangfireDB": "15"
|
||||||
},
|
},
|
||||||
|
|||||||
@ -1,14 +1,8 @@
|
|||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using JiShe.CollectBus.IotSystems.Protocols;
|
using JiShe.CollectBus.IotSystems.Protocols;
|
||||||
using JiShe.CollectBus.ShardingStrategy;
|
|
||||||
using MongoDB.Bson;
|
|
||||||
using MongoDB.Bson.Serialization;
|
|
||||||
using MongoDB.Driver;
|
using MongoDB.Driver;
|
||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using Volo.Abp.Data;
|
using Volo.Abp.Data;
|
||||||
using Volo.Abp.MongoDB;
|
using Volo.Abp.MongoDB;
|
||||||
using Volo.Abp.MultiTenancy;
|
using Volo.Abp.MultiTenancy;
|
||||||
@ -29,47 +23,12 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
|
|||||||
public IMongoCollection<Device> Devices => Collection<Device>();
|
public IMongoCollection<Device> Devices => Collection<Device>();
|
||||||
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
|
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
|
||||||
|
|
||||||
|
public IMongoCollection<ScheduledMeterReadingIssuedEventMessage> MeterReadingIssued => Collection<ScheduledMeterReadingIssuedEventMessage>();
|
||||||
|
|
||||||
protected override void CreateModel(IMongoModelBuilder modelBuilder)
|
protected override void CreateModel(IMongoModelBuilder modelBuilder)
|
||||||
{
|
{
|
||||||
base.CreateModel(modelBuilder);
|
base.CreateModel(modelBuilder);
|
||||||
|
|
||||||
modelBuilder.ConfigureCollectBus();
|
modelBuilder.ConfigureCollectBus();
|
||||||
|
|
||||||
modelBuilder.Entity<MeterReadingRecords>(builder =>
|
|
||||||
{
|
|
||||||
// 创建索引
|
|
||||||
builder.ConfigureIndexes(index =>
|
|
||||||
{
|
|
||||||
//List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
|
|
||||||
//createIndexModels.Add(new CreateIndexModel<BsonDocument>(
|
|
||||||
// Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
|
||||||
// new CreateIndexOptions
|
|
||||||
// {
|
|
||||||
// Unique = true
|
|
||||||
// }
|
|
||||||
// ));
|
|
||||||
|
|
||||||
|
|
||||||
//var indexKeys = Builders<BsonDocument>.IndexKeys
|
|
||||||
//.Ascending("CreationTime")
|
|
||||||
//.Ascending("OrderNumber");
|
|
||||||
|
|
||||||
//var indexOptions = new CreateIndexOptions
|
|
||||||
//{
|
|
||||||
// Background = true,
|
|
||||||
// Name = "IX_CreationTime_OrderNumber"
|
|
||||||
//};
|
|
||||||
//index.CreateOne(
|
|
||||||
//new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
|
|
||||||
|
|
||||||
index.CreateOne(new CreateIndexModel<BsonDocument>(
|
|
||||||
Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
|
||||||
new CreateIndexOptions
|
|
||||||
{
|
|
||||||
Unique = true
|
|
||||||
}
|
|
||||||
));
|
|
||||||
});
|
|
||||||
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,15 +1,6 @@
|
|||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using JiShe.CollectBus.Repository;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using JiShe.CollectBus.ShardingStrategy;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
|
||||||
using System;
|
|
||||||
using Volo.Abp;
|
|
||||||
using Volo.Abp.AuditLogging.MongoDB;
|
using Volo.Abp.AuditLogging.MongoDB;
|
||||||
using Volo.Abp.BackgroundJobs.MongoDB;
|
using Volo.Abp.BackgroundJobs.MongoDB;
|
||||||
using Volo.Abp.Domain.Repositories;
|
|
||||||
using Volo.Abp.Domain.Repositories.MongoDB;
|
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
using Volo.Abp.MongoDB;
|
using Volo.Abp.MongoDB;
|
||||||
using Volo.Abp.Uow;
|
using Volo.Abp.Uow;
|
||||||
@ -29,14 +20,6 @@ public class CollectBusMongoDbModule : AbpModule
|
|||||||
context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options =>
|
context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options =>
|
||||||
{
|
{
|
||||||
options.AddDefaultRepositories();
|
options.AddDefaultRepositories();
|
||||||
|
|
||||||
// 注册分表策略
|
|
||||||
context.Services.AddTransient(
|
|
||||||
typeof(IShardingStrategy<>),
|
|
||||||
typeof(DayShardingStrategy<>));
|
|
||||||
|
|
||||||
// 分表策略仓储 替换默认仓储
|
|
||||||
options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
context.Services.AddAlwaysDisableUnitOfWorkTransaction();
|
context.Services.AddAlwaysDisableUnitOfWorkTransaction();
|
||||||
|
|||||||
@ -1,42 +0,0 @@
|
|||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Domain.Repositories;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读仓储接口
|
|
||||||
/// </summary>
|
|
||||||
public interface IMeterReadingRecordRepository : IRepository<MeterReadingRecords, Guid>
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 批量插入
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="entities"></param>
|
|
||||||
/// <param name="dateTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task InsertManyAsync(List<MeterReadingRecords> entities,
|
|
||||||
DateTime? dateTime);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 单个插入
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="entity"></param>
|
|
||||||
/// <param name="dateTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 多集合数据查询
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="startTime"></param>
|
|
||||||
/// <param name="endTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task<List<MeterReadingRecords>> ParallelQueryAsync(DateTime startTime, DateTime endTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,139 +0,0 @@
|
|||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
||||||
using JiShe.CollectBus.MongoDB;
|
|
||||||
using JiShe.CollectBus.ShardingStrategy;
|
|
||||||
using MongoDB.Bson;
|
|
||||||
using MongoDB.Driver;
|
|
||||||
using System;
|
|
||||||
using System.Collections;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Domain.Repositories.MongoDB;
|
|
||||||
using Volo.Abp.MongoDB;
|
|
||||||
using static System.Net.Mime.MediaTypeNames;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读记录仓储
|
|
||||||
/// </summary>
|
|
||||||
public class MeterReadingRecordRepository : MongoDbRepository<CollectBusMongoDbContext, MeterReadingRecords, Guid>, IMeterReadingRecordRepository
|
|
||||||
{
|
|
||||||
|
|
||||||
private readonly IShardingStrategy<MeterReadingRecords> _shardingStrategy;
|
|
||||||
private readonly IMongoDbContextProvider<CollectBusMongoDbContext> _dbContextProvider;
|
|
||||||
|
|
||||||
public MeterReadingRecordRepository(
|
|
||||||
IMongoDbContextProvider<CollectBusMongoDbContext> dbContextProvider,
|
|
||||||
IShardingStrategy<MeterReadingRecords> shardingStrategy
|
|
||||||
)
|
|
||||||
: base(dbContextProvider)
|
|
||||||
{
|
|
||||||
_dbContextProvider = dbContextProvider;
|
|
||||||
_shardingStrategy = shardingStrategy;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量插入
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="entities"></param>
|
|
||||||
/// <param name="cancellationToken"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public override async Task<IEnumerable<MeterReadingRecords>> InsertManyAsync(IEnumerable<MeterReadingRecords> entities, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken))
|
|
||||||
{
|
|
||||||
var collection = await GetShardedCollection(DateTime.Now);
|
|
||||||
await collection.InsertManyAsync(entities);
|
|
||||||
|
|
||||||
return entities;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量插入
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="entities"></param>
|
|
||||||
/// <param name="dateTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task InsertManyAsync(List<MeterReadingRecords> entities, DateTime? dateTime)
|
|
||||||
{
|
|
||||||
var collection = await GetShardedCollection(dateTime);
|
|
||||||
await collection.InsertManyAsync(entities);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 单条插入
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="entity"></param>
|
|
||||||
/// <param name="cancellationToken"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public override async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken))
|
|
||||||
{
|
|
||||||
var collection = await GetShardedCollection(DateTime.Now);
|
|
||||||
await collection.InsertOneAsync(entity);
|
|
||||||
return entity;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 单条插入
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="entity"></param>
|
|
||||||
/// <param name="dateTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime)
|
|
||||||
{
|
|
||||||
var collection = await GetShardedCollection(dateTime);
|
|
||||||
await collection.InsertOneAsync(entity);
|
|
||||||
return entity;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 多集合数据查询
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="startTime"></param>
|
|
||||||
/// <param name="endTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task<List<MeterReadingRecords>> ParallelQueryAsync(DateTime startTime, DateTime endTime)
|
|
||||||
{
|
|
||||||
var collectionNames = _shardingStrategy.GetQueryCollectionNames(startTime, endTime);
|
|
||||||
var database = await GetDatabaseAsync();
|
|
||||||
|
|
||||||
|
|
||||||
var tasks = collectionNames.Select(async name =>
|
|
||||||
{
|
|
||||||
var collection = database.GetCollection<MeterReadingRecords>(name);
|
|
||||||
var filter = Builders<MeterReadingRecords>.Filter.And(
|
|
||||||
Builders<MeterReadingRecords>.Filter.Gte(x => x.CreationTime, startTime),
|
|
||||||
Builders<MeterReadingRecords>.Filter.Lte(x => x.CreationTime, endTime)
|
|
||||||
);
|
|
||||||
return await collection.Find(filter).ToListAsync();
|
|
||||||
});
|
|
||||||
|
|
||||||
var results = await Task.WhenAll(tasks);
|
|
||||||
return results.SelectMany(r => r).ToList();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获得分片集合
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task<IMongoCollection<MeterReadingRecords>> GetShardedCollection(DateTime? dateTime)
|
|
||||||
{
|
|
||||||
var database = await GetDatabaseAsync();
|
|
||||||
string collectionName = string.Empty;
|
|
||||||
|
|
||||||
if (dateTime != null)
|
|
||||||
{
|
|
||||||
collectionName = _shardingStrategy.GetCollectionName(dateTime.Value);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
collectionName = _shardingStrategy.GetCurrentCollectionName();
|
|
||||||
}
|
|
||||||
|
|
||||||
return database.GetCollection<MeterReadingRecords>(collectionName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,59 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ShardingStrategy
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 按天分表策略
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TEntity"></typeparam>
|
|
||||||
public class DayShardingStrategy<TEntity> : IShardingStrategy<TEntity>
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 获取指定时间对应的集合名
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="dateTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public string GetCollectionName(DateTime dateTime)
|
|
||||||
{
|
|
||||||
var baseName = typeof(TEntity).Name;
|
|
||||||
return $"{baseName}_{dateTime:yyyyMMddHHmm}";
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取当前时间对应的集合名
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public string GetCurrentCollectionName()
|
|
||||||
{
|
|
||||||
var baseName = typeof(TEntity).Name;
|
|
||||||
return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}";
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 用于查询时确定目标集合
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="startTime"></param>
|
|
||||||
/// <param name="endTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public IEnumerable<string> GetQueryCollectionNames(DateTime? startTime, DateTime? endTime)
|
|
||||||
{
|
|
||||||
var months = new List<string>();
|
|
||||||
var current = startTime ?? DateTime.MinValue;
|
|
||||||
var end = endTime ?? DateTime.MaxValue;
|
|
||||||
var baseName = typeof(TEntity).Name;
|
|
||||||
|
|
||||||
while (current <= end)
|
|
||||||
{
|
|
||||||
months.Add($"{baseName}_{current:yyyyMMddHHmm}");
|
|
||||||
current = current.AddMonths(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
return months.Distinct();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,36 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ShardingStrategy
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 数据存储分片策略
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TEntity"></typeparam>
|
|
||||||
public interface IShardingStrategy<TEntity>
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 获取指定时间对应的集合名
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
string GetCollectionName(DateTime dateTime);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取当前时间对应的集合名
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
string GetCurrentCollectionName();
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 用于查询时确定目标集合
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="startTime"></param>
|
|
||||||
/// <param name="endTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
IEnumerable<string> GetQueryCollectionNames(DateTime? startTime = null,
|
|
||||||
DateTime? endTime = null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
//await _protocolInfoCache.Get()
|
//await _protocolInfoCache.Get()
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761FN;
|
public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 登录帧解析
|
/// 登录帧解析
|
||||||
|
|||||||
@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
|
|||||||
|
|
||||||
Task AddAsync();
|
Task AddAsync();
|
||||||
|
|
||||||
Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761FN;
|
Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null);
|
||||||
|
|
||||||
Task LoginAsync(MessageReceivedLogin messageReceived);
|
Task LoginAsync(MessageReceivedLogin messageReceived);
|
||||||
|
|
||||||
|
|||||||
@ -21,15 +21,12 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
|
|
||||||
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
|
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
|
||||||
|
|
||||||
public override async Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null)
|
public override Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null)
|
||||||
{
|
{
|
||||||
var hexStringList = messageReceived.MessageHexString.StringToPairs();
|
var hexStringList = messageReceived.MessageHexString.StringToPairs();
|
||||||
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
|
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
|
||||||
var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
|
var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
|
||||||
var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
|
var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
|
||||||
|
|
||||||
T analyze = default;
|
|
||||||
|
|
||||||
switch ((AFN)afn)
|
switch ((AFN)afn)
|
||||||
{
|
{
|
||||||
case AFN.确认或否认:
|
case AFN.确认或否认:
|
||||||
@ -38,15 +35,15 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
case AFN.设置参数: break;
|
case AFN.设置参数: break;
|
||||||
case AFN.查询参数: break;
|
case AFN.查询参数: break;
|
||||||
case AFN.请求实时数据:
|
case AFN.请求实时数据:
|
||||||
if (Enum.IsDefined(typeof(ATypeOfDataItems), fn))
|
if (Enum.IsDefined(typeof(ATypeOfDataItems), fn)) //Enum.TryParse(afn.ToString(), out ATypeOfDataItems parseResult)
|
||||||
{
|
{
|
||||||
analyze = (T?)AnalyzeReadingDataAsync(messageReceived, sendAction);
|
AnalyzeReadingDataAsync(messageReceived, sendAction);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case AFN.请求历史数据:
|
case AFN.请求历史数据:
|
||||||
if (Enum.IsDefined(typeof(IIdataTypeItems), fn))
|
if (Enum.IsDefined(typeof(IIdataTypeItems), fn))
|
||||||
{
|
{
|
||||||
analyze = (T?)AnalyzeReadingTdcDataAsync(messageReceived, sendAction);
|
AnalyzeReadingTdcDataAsync(messageReceived, sendAction);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case AFN.数据转发:
|
case AFN.数据转发:
|
||||||
@ -54,7 +51,7 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return await Task.FromResult(analyze);
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
#region 上行命令
|
#region 上行命令
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user