修改代码

This commit is contained in:
ChenYi 2025-04-10 17:06:53 +08:00
parent 8257d6b129
commit 3a61b46036
9 changed files with 116 additions and 19 deletions

View File

@ -70,6 +70,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息 //默认初始化表计信息
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
} }
} }

View File

@ -225,9 +225,9 @@ namespace JiShe.CollectBus.Plugins
//}); //});
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
await _producerBus.PublishAsync(topicName, new MessageReceived await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{ {
ClientId = client.Id, ClientId = client.Id,
ClientIp = client.IP, ClientIp = client.IP,

View File

@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
@ -31,6 +32,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository; private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDBProvider _dbProvider;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class. /// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
@ -48,7 +50,9 @@ namespace JiShe.CollectBus.Subscribers
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository, IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository, IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IRepository<MessageReceived, Guid> messageReceivedEventRepository, IRepository<MessageReceived, Guid> messageReceivedEventRepository,
IRepository<Device, Guid> deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository) IRepository<Device, Guid> deviceRepository,
IIoTDBProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{ {
_logger = logger; _logger = logger;
_tcpService = tcpService; _tcpService = tcpService;
@ -58,6 +62,7 @@ namespace JiShe.CollectBus.Subscribers
_messageReceivedEventRepository = messageReceivedEventRepository; _messageReceivedEventRepository = messageReceivedEventRepository;
_deviceRepository = deviceRepository; _deviceRepository = deviceRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider;
} }
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] [CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
@ -119,6 +124,8 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] [CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task ReceivedEvent(MessageReceived receivedMessage) public async Task ReceivedEvent(MessageReceived receivedMessage)
{ {
var currentTime = Clock.Now;
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
@ -126,6 +133,7 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
//todo 会根据不同的协议进行解析,然后做业务处理 //todo 会根据不同的协议进行解析,然后做业务处理
TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage); TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage);
if(fN == null) if(fN == null)
@ -140,11 +148,8 @@ namespace JiShe.CollectBus.Subscribers
return; return;
} }
//todo 查找是否有下发任务 //报文入库
var entity = new MeterReadingRecords()
await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords()
{ {
ReceivedMessageHexString = receivedMessage.MessageHexString, ReceivedMessageHexString = receivedMessage.MessageHexString,
AFN = fN.Afn, AFN = fN.Afn,
@ -152,8 +157,18 @@ namespace JiShe.CollectBus.Subscribers
Pn = 0, Pn = 0,
FocusAddress = "", FocusAddress = "",
MeterAddress = "", MeterAddress = "",
//DataResult = tb3761FN.Text, };
});
//如果没数据,则插入,有数据则更新
var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
if (updateEntity == null)
{
await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
}
_dbProvider.InsertAsync();
//todo 查找是否有下发任务
//await _messageReceivedEventRepository.InsertAsync(receivedMessage); //await _messageReceivedEventRepository.InsertAsync(receivedMessage);
} }

View File

@ -175,7 +175,11 @@ namespace JiShe.CollectBus.Common.Extensions
/// <returns></returns> /// <returns></returns>
public static string GetDataTableShardingStrategy(this DateTime dateTime) public static string GetDataTableShardingStrategy(this DateTime dateTime)
{ {
#if DEBUG
return $"{dateTime:yyyyMMddHHmm}"; return $"{dateTime:yyyyMMddHHmm}";
#else
return $"{dateTime:yyyyMMddHH}";
#endif
} }
} }
} }

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IotSystems.AFNEntity
{
/// <summary>
/// AFN单项数据实体
/// </summary>
public class AFNDataEntity
{
}
}

View File

@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host
ConfigureHangfire(context); ConfigureHangfire(context);
ConfigureCap(context, configuration); ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration); //ConfigureMassTransit(context, configuration);
//ConfigureKafkaTopic(context, configuration); ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
} }

View File

@ -87,7 +87,6 @@
"SaslMechanism": "PLAIN", "SaslMechanism": "PLAIN",
"SaslUserName": "lixiao", "SaslUserName": "lixiao",
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"ServerTagName": "JiSheCollectBus",
"NumPartitions": 50 "NumPartitions": 50
}, },
"IoTDBOptions": { "IoTDBOptions": {
@ -98,5 +97,6 @@
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
} },
"ServerTagName": "JiSheCollectBus"
} }

View File

@ -103,8 +103,6 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
{ {
var collection = await GetShardedCollection(entity.CreationTime); var collection = await GetShardedCollection(entity.CreationTime);
var dbContext = await DbContextProvider.GetDbContextAsync();
await collection.UpdateOneAsync(filter, update); await collection.UpdateOneAsync(filter, update);
return entity; return entity;
} }
@ -120,8 +118,8 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime) public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
{ {
var collection = await GetShardedCollection(dateTime); var collection = await GetShardedCollection(dateTime);
//await collection.findon var query = await collection.FindAsync(d => d.CreationTime == dateTime.Value && d.AFN == entity.AFN && d.Fn == entity.Fn && d.FocusAddress == entity.FocusAddress);
throw new NotImplementedException(); return await query.FirstOrDefaultAsync();
} }
/// <summary> /// <summary>

View File

@ -106,7 +106,71 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// <summary> /// <summary>
/// AFN上行主题格式 /// AFN上行主题格式
/// </summary> /// </summary>
public const string AFNTopicNameFormat = "received.afn{0}.event"; public const string AFNTopicNameFormat = "received.afn{0}h.event";
/// <summary>
/// AFN00H上行主题格式
/// </summary>
public const string SubscriberAFN00ReceivedEventNameTemp = "received.afn00h.event";
/// <summary>
/// AFN01H上行主题格式
/// </summary>
public const string SubscriberAFN00HReceivedEventNameTemp = "received.afn01h.event";
/// <summary>
/// AFN02H上行主题格式
/// </summary>
public const string SubscriberAFN01HReceivedEventNameTemp = "received.afn02h.event";
/// <summary>
/// AFN03H上行主题格式
/// </summary>
public const string SubscriberAFN02HReceivedEventNameTemp = "received.afn03h.event";
/// <summary>
/// AFN04H上行主题格式
/// </summary>
public const string SubscriberAFN04HReceivedEventNameTemp = "received.afn04h.event";
/// <summary>
/// AFN05H上行主题格式
/// </summary>
public const string SubscriberAFN05HReceivedEventNameTemp = "received.afn05h.event";
/// <summary>
/// AFN09H上行主题格式
/// </summary>
public const string SubscriberAFN09HReceivedEventNameTemp = "received.afn09h.event";
/// <summary>
/// AFN0AH上行主题格式
/// </summary>
public const string SubscriberAFN0AHReceivedEventNameTemp = "received.afn10h.event";
/// <summary>
/// AFN0BH上行主题格式
/// </summary>
public const string SubscriberAFN0BHReceivedEventNameTemp = "received.afn11h.event";
/// <summary>
/// AFN0CH上行主题格式
/// </summary>
public const string SubscriberAFN0CHReceivedEventNameTemp = "received.afn12h.event";
/// <summary>
/// AFN0DH上行主题格式
/// </summary>
public const string SubscriberAFN0DHReceivedEventNameTemp = "received.afn13h.event";
/// <summary>
/// AFN0EH上行主题格式
/// </summary>
public const string SubscriberAFN0EHReceivedEventNameTemp = "received.afn14h.event";
/// <summary>
/// AFN10H上行主题格式
/// </summary>
public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event";
} }
} }