dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
7 changed files with 73 additions and 38 deletions
Showing only changes of commit 5a294b437c - Show all commits

View File

@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
private readonly ICapPublisher _capBus;
private readonly IPublishEndpoint _producerBus;
private readonly ILogger<TcpMonitor> _logger;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
@ -32,16 +32,16 @@ namespace JiShe.CollectBus.Plugins
/// <summary>
///
/// </summary>
/// <param name="capBus"></param>
/// <param name="producerBus"></param>
/// <param name="logger"></param>
/// <param name="deviceRepository"></param>
/// <param name="ammeterInfoCache"></param>
public TcpMonitor(ICapPublisher capBus,
public TcpMonitor(IPublishEndpoint producerBus,
ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository,
IDistributedCache<AmmeterInfo> ammeterInfoCache)
{
_capBus = capBus;
_producerBus = producerBus;
_logger = logger;
_deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache;
@ -161,7 +161,9 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
await _producerBus.Publish( messageReceivedLoginEvent);
}
private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
@ -198,12 +200,13 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
await _producerBus.Publish(new MessageReceived
{
ClientId = client.Id,
ClientIp = client.IP,
@ -212,6 +215,16 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
});
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
}
}
}

View File

@ -39,15 +39,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _capBus;
private readonly IPublishEndpoint _producerBus;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher capBus,
IPublishEndpoint producerBus,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{
_capBus = capBus;
_producerBus = producerBus;
_logger = logger;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
}
@ -298,8 +298,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
_= _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
@ -363,7 +365,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
//_= _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
@ -433,9 +437,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//await _massTransitBus.Publish(tempMsg);
_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
@ -823,7 +827,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
@ -889,7 +896,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
@ -954,7 +964,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}

View File

@ -29,7 +29,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository)
IPublishEndpoint producerBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, producerBus, meterReadingRecordsRepository)
{
}

View File

@ -62,19 +62,18 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
public async Task IssuedEvent(IssuedEventMessage issuedEventMessage)
{
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 回复下发内容IssuedEvent:{issuedEventMessage.MessageId}");
{
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
_logger.LogInformation($"IssuedEvent:{issuedEventMessage.MessageId}");
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
break;
case IssuedEventType.Login:
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;

View File

@ -21,6 +21,7 @@ using JiShe.CollectBus.Consumers;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Confluent.Kafka;
namespace JiShe.CollectBus.Host
@ -279,9 +280,13 @@ namespace JiShe.CollectBus.Host
/// <param name="configuration">The configuration.</param>
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
//context.Services.AddSingleton<IBus>();
context.Services.AddMassTransit(x =>
{
x.UsingInMemory();
x.AddConfigureEndpointsCallback((c, name, cfg) =>
{
@ -308,37 +313,38 @@ namespace JiShe.CollectBus.Host
rider.UsingKafka((c, cfg) =>
{
cfg.Host(configuration.GetConnectionString("Kafka"));
cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberReceivedHeartbeatEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer<ReceivedHeartbeatConsumer>(c);
configurator.ConfigureConsumeTopology = false;
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberReceivedLoginEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer<ReceivedLoginConsumer>(c);
configurator.ConfigureConsumeTopology = false;
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer<ReceivedConsumer>(c);
configurator.ConfigureConsumeTopology = false;
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer<IssuedConsumer>(c);
configurator.ConfigureConsumeTopology = false;
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
//cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
//{
// configurator.ConfigureConsumer<ScheduledMeterReadingConsumer>(c);
// configurator.ConfigureConsumeTopology = false;
//});
cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer<ScheduledMeterReadingConsumer>(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
});
});
});

View File

@ -43,8 +43,8 @@ namespace JiShe.CollectBus.Host
ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration);
ConfigureHangfire(context);
ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration);
//ConfigureCap(context, configuration);
ConfigureMassTransit(context, configuration);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
}

View File

@ -11,12 +11,13 @@ using JiShe.CollectBus.Protocol.Contracts.AnalysisData;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class BaseProtocolPlugin : IProtocolPlugin
{
private readonly ICapPublisher _capBus;
private readonly IPublishEndpoint _producerBus;
private readonly ILogger<BaseProtocolPlugin> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
@ -36,7 +37,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_capBus = serviceProvider.GetRequiredService<ICapPublisher>();
_producerBus = serviceProvider.GetRequiredService<IPublishEndpoint>();
}
public abstract ProtocolInfo Info { get; }
@ -86,7 +87,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
//await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
/// <summary>
@ -124,7 +126,9 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
//await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}
}