diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 3195961..78bc5eb 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -70,6 +70,7 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); } } diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 7224cdb..ea84d9b 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -225,9 +225,9 @@ namespace JiShe.CollectBus.Plugins //}); - string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); + //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); - await _producerBus.PublishAsync(topicName, new MessageReceived + await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived { ClientId = client.Id, ClientIp = client.IP, diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index a801454..a9cb5f8 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; @@ -31,6 +32,7 @@ namespace JiShe.CollectBus.Subscribers private readonly IRepository _messageReceivedEventRepository; private readonly IRepository _deviceRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; + private readonly IIoTDBProvider _dbProvider; /// /// Initializes a new instance of the class. @@ -48,7 +50,9 @@ namespace JiShe.CollectBus.Subscribers IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository, IRepository messageReceivedEventRepository, - IRepository deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository) + IRepository deviceRepository, + IIoTDBProvider dbProvider, + IMeterReadingRecordRepository meterReadingRecordsRepository) { _logger = logger; _tcpService = tcpService; @@ -58,6 +62,7 @@ namespace JiShe.CollectBus.Subscribers _messageReceivedEventRepository = messageReceivedEventRepository; _deviceRepository = deviceRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository; + _dbProvider = dbProvider; } [CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] @@ -119,6 +124,8 @@ namespace JiShe.CollectBus.Subscribers [CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] public async Task ReceivedEvent(MessageReceived receivedMessage) { + var currentTime = Clock.Now; + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { @@ -126,6 +133,7 @@ namespace JiShe.CollectBus.Subscribers } else { + //todo 会根据不同的协议进行解析,然后做业务处理 TB3761 fN = await protocolPlugin.AnalyzeAsync(receivedMessage); if(fN == null) @@ -140,11 +148,8 @@ namespace JiShe.CollectBus.Subscribers return; } - //todo 查找是否有下发任务 - - - - await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords() + //报文入库 + var entity = new MeterReadingRecords() { ReceivedMessageHexString = receivedMessage.MessageHexString, AFN = fN.Afn, @@ -152,8 +157,18 @@ namespace JiShe.CollectBus.Subscribers Pn = 0, FocusAddress = "", MeterAddress = "", - //DataResult = tb3761FN.Text, - }); + }; + + //如果没数据,则插入,有数据则更新 + var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime); + if (updateEntity == null) + { + await _meterReadingRecordsRepository.InsertAsync(entity, currentTime); + } + + + _dbProvider.InsertAsync(); + //todo 查找是否有下发任务 //await _messageReceivedEventRepository.InsertAsync(receivedMessage); } diff --git a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 904d4b9..7734288 100644 --- a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -175,7 +175,11 @@ namespace JiShe.CollectBus.Common.Extensions /// public static string GetDataTableShardingStrategy(this DateTime dateTime) { +#if DEBUG return $"{dateTime:yyyyMMddHHmm}"; +#else + return $"{dateTime:yyyyMMddHH}"; +#endif } } } diff --git a/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs b/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs new file mode 100644 index 0000000..386baf3 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IotSystems.AFNEntity +{ + /// + /// AFN单项数据实体 + /// + public class AFNDataEntity + { + } +} diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index aabc2ba..31f2651 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host ConfigureHangfire(context); ConfigureCap(context, configuration); //ConfigureMassTransit(context, configuration); - //ConfigureKafkaTopic(context, configuration); + ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); ConfigureCustom(context, configuration); } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 2217c9e..21ae13c 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -87,7 +87,6 @@ "SaslMechanism": "PLAIN", "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", - "ServerTagName": "JiSheCollectBus", "NumPartitions": 50 }, "IoTDBOptions": { @@ -98,5 +97,6 @@ "DataBaseName": "energy", "OpenDebugMode": true, "UseTableSessionPoolByDefault": false - } + }, + "ServerTagName": "JiSheCollectBus" } \ No newline at end of file diff --git a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs index 7e34031..cb35b5c 100644 --- a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs +++ b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs @@ -88,7 +88,7 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord public async Task InsertAsync(MeterReadingRecords entity, DateTime? dateTime) { var collection = await GetShardedCollection(dateTime); - await collection.InsertOneAsync(entity); + await collection.InsertOneAsync(entity); return entity; } @@ -103,8 +103,6 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord { var collection = await GetShardedCollection(entity.CreationTime); - var dbContext = await DbContextProvider.GetDbContextAsync(); - await collection.UpdateOneAsync(filter, update); return entity; } @@ -120,8 +118,8 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord public async Task FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime) { var collection = await GetShardedCollection(dateTime); - //await collection.findon - throw new NotImplementedException(); + var query = await collection.FindAsync(d => d.CreationTime == dateTime.Value && d.AFN == entity.AFN && d.Fn == entity.Fn && d.FocusAddress == entity.FocusAddress); + return await query.FirstOrDefaultAsync(); } /// diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs index 707dfea..e7caa1c 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs @@ -106,7 +106,71 @@ namespace JiShe.CollectBus.Protocol.Contracts /// /// AFN上行主题格式 /// - public const string AFNTopicNameFormat = "received.afn{0}.event"; + public const string AFNTopicNameFormat = "received.afn{0}h.event"; + /// + /// AFN00H上行主题格式 + /// + public const string SubscriberAFN00ReceivedEventNameTemp = "received.afn00h.event"; + + /// + /// AFN01H上行主题格式 + /// + public const string SubscriberAFN00HReceivedEventNameTemp = "received.afn01h.event"; + + /// + /// AFN02H上行主题格式 + /// + public const string SubscriberAFN01HReceivedEventNameTemp = "received.afn02h.event"; + + /// + /// AFN03H上行主题格式 + /// + public const string SubscriberAFN02HReceivedEventNameTemp = "received.afn03h.event"; + + /// + /// AFN04H上行主题格式 + /// + public const string SubscriberAFN04HReceivedEventNameTemp = "received.afn04h.event"; + + /// + /// AFN05H上行主题格式 + /// + public const string SubscriberAFN05HReceivedEventNameTemp = "received.afn05h.event"; + + /// + /// AFN09H上行主题格式 + /// + public const string SubscriberAFN09HReceivedEventNameTemp = "received.afn09h.event"; + + /// + /// AFN0AH上行主题格式 + /// + public const string SubscriberAFN0AHReceivedEventNameTemp = "received.afn10h.event"; + + /// + /// AFN0BH上行主题格式 + /// + public const string SubscriberAFN0BHReceivedEventNameTemp = "received.afn11h.event"; + + /// + /// AFN0CH上行主题格式 + /// + public const string SubscriberAFN0CHReceivedEventNameTemp = "received.afn12h.event"; + + /// + /// AFN0DH上行主题格式 + /// + public const string SubscriberAFN0DHReceivedEventNameTemp = "received.afn13h.event"; + + /// + /// AFN0EH上行主题格式 + /// + public const string SubscriberAFN0EHReceivedEventNameTemp = "received.afn14h.event"; + + /// + /// AFN10H上行主题格式 + /// + public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event"; } }