调整kafka结构

This commit is contained in:
zenghongyao 2025-04-19 00:35:21 +08:00
commit 2612cd5d0b
19 changed files with 527 additions and 475 deletions

View File

@ -59,5 +59,10 @@ namespace JiShe.CollectBus.Kafka.Internal
/// </summary>
public string? SaslPassword { get; set; }
/// <summary>
/// 首次采集时间
/// </summary>
public DateTime FirstCollectionTime { get; set; }
}
}

View File

@ -65,7 +65,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
//await dbContext.InitAmmeterCacheData();
await dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData();
//初始化主题信息

View File

@ -26,7 +26,11 @@ using System.Diagnostics;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
<<<<<<< HEAD
using JiShe.CollectBus.Kafka.Internal;
=======
using JiShe.CollectBus.Common.Extensions;
>>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172
namespace JiShe.CollectBus.Samples;
@ -243,6 +247,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
}
/// <summary>
/// 下一个采集时间点验证
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<DateTime> TestCalculateNextCollectionTime(string time, int timeDensity)
{
DateTime nextTaskTime = Convert.ToDateTime(time);
return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity));
}
public Task<SampleDto> GetAsync()
{
return Task.FromResult(

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Ammeters;
using DnsClient.Protocol;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
@ -8,6 +9,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
@ -24,9 +26,12 @@ using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Interface;
using static FreeSql.Internal.GlobalFilter;
<<<<<<< HEAD
using JiShe.CollectBus.Kafka.Internal;
=======
using static System.Runtime.InteropServices.JavaScript.JSType;
>>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -101,6 +106,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return;
}
var currentTime = DateTime.Now;
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
@ -130,70 +137,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
var timer = Stopwatch.StartNew();
//_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}");
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
_ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity,
taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
//处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, groupIndex) =>
// {
// _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
// }
//);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
}
);
timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
});
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
//todo 水表任务创建待处理
//await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos);
_ = CreateMeterPublishTask<WatermeterInfo>(
timeDensity: timeDensity,
taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{
//AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
});
}
else
{
@ -205,7 +173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
}
@ -230,23 +198,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
#if DEBUG
//var timeDensity = "15";
//string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
////获取缓存中的电表信息
//var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
//var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
////List<string> focusAddressDataLista = new List<string>();
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
//foreach (var item in tempMeterInfos)
//{
// var tempData = item.Adapt<AmmeterInfo>();
// tempData.FocusId = item.FocusID;
// tempData.MeterId = item.Id;
// meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress);
//}
return;
@ -258,23 +210,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
List<string> focusAddressDataLista = new List<string>();
var timer1 = Stopwatch.StartNew();
//decimal? cursor = null;
//string member = null;
//bool hasNext;
//do
//{
// var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
var allIds = new HashSet<string>();
decimal? score = null;
@ -306,8 +241,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
return;
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
//return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
@ -329,6 +264,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
TimeDensity = item.Key,
NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
@ -403,25 +354,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
ammeterInfos.Add(ammeter);
//keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
}
//await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
TimeDensity = itemTimeDensity.Key,
NextTaskTime = DateTime.Now.AddMinutes(1)
};
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
//初始化设备组负载控制
@ -446,63 +385,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
//获取缓存中的电表信息
int timeDensity = 1;
int timeDensity = 5;
var currentTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
var options = new ParallelOptions
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
MaxDegreeOfParallelism = recommendedThreads,
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_= _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
//_dbProvider.SwitchSessionPool(true);
//await _dbProvider.InsertAsync(meterTaskInfosList);
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
}
_ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
});
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
_logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
await Task.CompletedTask;
}
@ -516,57 +421,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 5;
var currentTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
var options = new ParallelOptions
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
MaxDegreeOfParallelism = recommendedThreads,
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
}
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
_logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
_ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
});
}
/// <summary>
@ -575,12 +445,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
//获取缓存中的电表信息
int timeDensity = 15;
var currentDateTime = DateTime.Now;
var currentTime = DateTime.Now;
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
@ -589,23 +456,43 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
MaxDegreeOfParallelism = recommendedThreads,
};
string taskBatch = "20250417155016";
var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>();
_ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
});
}
/// <summary>
/// 创建电表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
{
var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
@ -614,82 +501,39 @@ namespace JiShe.CollectBus.ScheduledMeterReading
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
_= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
}
);
} while (hasNext);
});
//var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
//var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
// return;
//}
////获取下发任务缓存数据
//Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
//if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
// return;
//}
//List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
////将取出的缓存任务数据发送到Kafka消息队列中
//foreach (var focusItem in meterTaskInfos)
//{
// foreach (var ammerterItem in focusItem.Value)
// {
// var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
// {
// MessageHexString = ammerterItem.Value.IssuedMessageHexString,
// MessageId = ammerterItem.Value.IssuedMessageId,
// FocusAddress = ammerterItem.Value.FocusAddress,
// TimeDensity = timeDensity.ToString(),
// };
// //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// //_ = _producerBus.Publish(tempMsg);
// meterTaskInfosList.Add(ammerterItem.Value);
// }
//}
//if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
//{
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
//}
//stopwatch.Stop();
//_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
timer.Stop();
_logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
}
/// <summary>
/// 电表创建发布任务
/// 创建电表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private void AmmerterCreatePublishTask(int timeDensity
private void AmmerterCreatePublishTaskAction(int timeDensity
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
@ -781,7 +625,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
//Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes)
@ -801,11 +644,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
TelemetryPacketResponse builderResponse = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
}
else
{
@ -814,7 +662,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
dataInfos = handler(new TelemetryPacketRequest()
builderResponse = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
@ -829,7 +677,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0)
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
@ -850,36 +698,28 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusId = ammeterInfo.FocusId,
AFN = aFN,
Fn = fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
};
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
taskList.Add(meterReadingRecords);
}
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
//return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
//using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// pipe.HSet(redisCacheKey, keyValuePairs);
// object[] ret = pipe.EndPipe();
//}
if (taskList == null
|| taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
return;
}
@ -906,76 +746,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// taskList);
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingTelemetryPacketInfo taskRecord,int partition)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
}
await _producerService.ProduceAsync(topicName, partition, taskRecord);
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
{
var currentDateTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType);
//FreeRedisProvider.Instance.key()
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
}
#endregion
@ -1044,6 +814,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
}
@ -1109,12 +881,58 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
/// <summary>
/// 创建水表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="meterInfo">水表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private void WatermeterCreatePublishTaskAction(int timeDensity
, WatermeterInfo meterInfo, int groupIndex, string taskBatch)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var taskInfo = new MeterReadingTelemetryPacketInfo()
{
Seq= null,
};
//
Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address);
using (var pipe = FreeRedisProvider.Instance.StartPipe())
{
// 主数据存储Hash
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
// Set索引缓存
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId);
// ZSET索引缓存Key
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
pipe.EndPipe();
}
}
#endregion
#region
/// <summary>
/// 判断是否需要生成采集指令
/// </summary>
@ -1131,39 +949,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return false;
}
///// <summary>
///// 指定时间对比当前时间
///// </summary>
///// <param name="lastTime"></param>
///// <param name="subtrahend"></param>
///// <returns></returns>
//private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
//{
// if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
// return false;
// return true;
//}
///// <summary>
///// 缓存下一个时间的任务
///// </summary>
///// <param name="timeDensity">采集频率</param>
///// <param name="meterType">表类型</param>
///// <returns></returns>
//private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType)
//{
// //缓存下一个时间的任务
// TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
// {
// TimeDensity = timeDensity,
// NextTask = DateTime.Now.AddMinutes(timeDensity)
// };
// var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity);
// await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
//}
/// <summary>
/// 获取缓存表计下发指令缓存key前缀
/// </summary>
@ -1175,6 +960,130 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
}
/// <summary>
/// 创建表的待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <param name="meterType">表类型</param>
/// <param name="taskCreateAction">具体的创建任务的委托</param>
/// <returns></returns>
private async Task CreateMeterPublishTask<T>(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action<int, T, int, string> taskCreateAction) where T : DeviceCacheBasicModel
{
var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
List<T> meterInfos = new List<T>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<T>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
taskCreateAction(timeDensity, data, groupIndex, taskBatch);
}
);
timer.Stop();
_logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
}
/// <summary>
/// 创建Kafka消息
/// </summary>
/// <param name="redisCacheTelemetryPacketInfoHashKey"></param>
/// <param name="redisCacheTelemetryPacketInfoZSetScoresIndexKey"></param>
/// <returns></returns>
private async Task CreateMeterKafkaTaskMessage(
string redisCacheTelemetryPacketInfoHashKey,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
{
if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey))
{
throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败参数异常,-101");
}
decimal? cursor = null;
string member = null;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
do
{
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: page.Items,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
}
);
} while (hasNext);
stopwatch.Stop();
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessageAction(string topicName,
MeterReadingTelemetryPacketInfo taskRecord, int partition)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
await _producerService.ProduceAsync(topicName, partition, taskRecord);
}
#endregion
}

View File

@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.Common.Consts;
@ -27,13 +28,15 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
CronExpression = "* 10 * * * *";
CronExpression = "0 0/1 * * * *";
TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
{
_logger.LogError($"{DateTime.Now}");
// await _scheduledMeterReadingService.CreateToBeIssueTasks();
}
}

View File

@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.ScheduledMeterReading;
@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
CronExpression = "* 15 * * * *";
CronExpression = "0 0/15 * * * *";
TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.ScheduledMeterReading;
@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
CronExpression = "* 5 * * * *";
CronExpression = "0 0/5 * * * *";
TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.ScheduledMeterReading;
@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberOneMinuteWorker);
CronExpression = "* 1 * * * *";
CronExpression = "0 0/1 * * * *";
TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -27,11 +27,6 @@ namespace JiShe.CollectBus.Ammeters
/// </summary>
public string Name { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 集中器地址
/// </summary>

View File

@ -34,7 +34,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary>
/// 任务数据唯一标记
/// </summary>
public string TaskMark { get; set; }
public decimal TaskMark { get; set; }
/// <summary>
/// 时间戳标记IoTDB时间列处理上报通过构建标记获取唯一标记匹配时间戳。
@ -97,6 +97,20 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// </summary>
public string ItemCode { get; set;}
/// <summary>
/// 帧序列域SEQ
/// </summary>
public required Seq Seq { get; set; }
/// <summary>
/// 地址域A3的主站地址MSA
/// </summary>
public int MSA { get; set; }
/// <summary>
/// 是否发送
/// </summary>
public bool IsSend { get; set; }
/// <summary>
/// 创建时间
@ -133,6 +147,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// </summary>
public string ReceivedRemark { get; set; }
/// <summary>
/// 是否已上报
/// </summary>
public bool IsReceived { get; set; }
//public void CreateDataId(Guid Id)
//{
// this.Id = Id;

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Enums;
using FreeSql.DataAnnotations;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
@ -13,6 +14,18 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// </summary>
public class WatermeterInfo: DeviceCacheBasicModel
{
/// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
[Column(IsIgnore = true)]
public override string MemberId => $"{FocusId}:{MeterId}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
[Column(IsIgnore = true)]
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
/// <summary>
/// 水表名称
/// </summary>
@ -22,11 +35,6 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// </summary>
public string Password { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 一个集中器下的[MeteringCode]必须唯一。 PN
/// </summary>

View File

@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
/// <param name="request.FocusAddress"></param>
/// <param name="request.Fn"></param>
/// <param name="request.Pn"></param>
public delegate byte[] AFNDelegate(TelemetryPacketRequest request);
public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request);
/// <summary>
/// 编码与方法的映射表
@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
}
#region AFN_00H
public static byte[] AFN00_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -64,13 +64,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_01H
public static byte[] AFN01_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -89,13 +89,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_02H
public static byte[] AFN02_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -114,12 +114,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_04H
public static byte[] AFN04_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -138,13 +138,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_05H
public static byte[] AFN05_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -163,12 +163,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_09H
public static byte[] AFN09_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -187,13 +187,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0AH
public static byte[] AFN0A_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -212,12 +212,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0CH
public static byte[] AFN0C_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -236,12 +236,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0DH
public static byte[] AFN0D_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -260,12 +260,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN10H
public static byte[] AFN10_Fn_Send(TelemetryPacketRequest request)
public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@ -283,8 +283,8 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Pn = request.Pn,
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter,request.DataUnit);
return bytes;
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit);
return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#region SpecialAmmeter

View File

@ -0,0 +1,30 @@
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.BuildSendDatas
{
/// <summary>
/// 报文构建返回结果
/// </summary>
public class TelemetryPacketResponse
{
/// <summary>
/// 帧序列域SEQ
/// </summary>
public required Seq Seq { get; set; }
/// <summary>
/// 地址域A3的主站地址MSA
/// </summary>
public int MSA { get; set; }
/// <summary>
/// 报文体
/// </summary>
public required byte[] Data { get; set; }
}
}

View File

@ -11,25 +11,31 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public class CommonConst
{
/// <summary>
/// 服务器标识
/// </summary>
public const string ServerTagName = "ServerTagName";
/// <summary>
/// Kafka
/// </summary>
public const string Kafka = "Kafka";
/// <summary>
/// 服务器标识
/// </summary>
public const string ServerTagName = $"{Kafka}:ServerTagName";
/// <summary>
/// Kafka副本数量
/// </summary>
public const string KafkaReplicationFactor = "KafkaReplicationFactor";
public const string KafkaReplicationFactor = $"{Kafka}:KafkaReplicationFactor";
/// <summary>
/// Kafka主题分区数量
/// </summary>
public const string NumPartitions = "NumPartitions";
public const string NumPartitions = $"{Kafka}:NumPartitions";
/// <summary>
/// 首次采集时间
/// </summary>
public const string FirstCollectionTime = "FirstCollectionTime";
}
}

View File

@ -200,5 +200,38 @@ namespace JiShe.CollectBus.Common.Extensions
{
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
}
/// <summary>
/// 采集时间节点计算
/// </summary>
/// <param name="referenceTime">待采集时间</param>
/// <param name="interval"></param>
/// <returns></returns>
public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval)
{
// 计算精确到分钟的基准时间
var baseTime = new DateTime(
referenceTime.Year,
referenceTime.Month,
referenceTime.Day,
referenceTime.Hour,
referenceTime.Minute,
0);
// 计算总分钟数和下一个间隔点
int totalMinutes = baseTime.Hour * 60 + baseTime.Minute;
int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval;
// 处理跨天情况
int daysToAdd = nextTotalMinutes / (24 * 60);
int remainingMinutes = nextTotalMinutes % (24 * 60);
int hours = remainingMinutes / 60;
int minutes = remainingMinutes % 60;
return baseTime.Date
.AddDays(daysToAdd)
.AddHours(hours)
.AddMinutes(minutes);
}
}
}

View File

@ -767,10 +767,13 @@ namespace JiShe.CollectBus.Common.Helpers
/// <param name="afn"></param>
/// <param name="fn"></param>
/// <param name="pn"></param>
/// <param name="msa"></param>
/// <returns></returns>
public static string GetTaskMark(int afn,int fn,int pn)
public static decimal GetTaskMark(int afn, int fn, int pn, int msa)
{
return $"{afn.ToString().PadLeft(2,'0')}{fn}{pn}";
var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
return Convert.ToInt32(makstr) << 32 | msa;
}
}
}

View File

@ -30,5 +30,15 @@ namespace JiShe.CollectBus.Common.Models
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId;
/// <summary>
/// 是否已处理
/// </summary>
public virtual bool IsHandle { get; set; } = false;
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set;}
}
}

View File

@ -84,7 +84,12 @@
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
<<<<<<< HEAD
"ServerTagName": "JiSheCollectBus99"
=======
"ServerTagName": "JiSheCollectBus3",
"FirstCollectionTime": "2025-04-18 00:00:00"
>>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172
},
"IoTDBOptions": {
"UserName": "root",
@ -95,7 +100,10 @@
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
<<<<<<< HEAD
"ServerTagName": "JiSheCollectBus",
=======
>>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172
"Cassandra": {
"ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy