优化json序列化,移除EventBus组件
This commit is contained in:
parent
3d56d351d3
commit
4c94859286
@ -1,36 +1,24 @@
|
|||||||
using System.Linq;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Volo.Abp.AutoMapper;
|
|
||||||
using Volo.Abp.Modularity;
|
|
||||||
using Volo.Abp.Application;
|
|
||||||
using Volo.Abp.BackgroundWorkers;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp;
|
|
||||||
using System.Reflection;
|
|
||||||
using JiShe.CollectBus.FreeSql;
|
|
||||||
using System;
|
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using Volo.Abp.AspNetCore.Mvc.AntiForgery;
|
|
||||||
using JiShe.CollectBus.FreeRedisProvider;
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using JiShe.CollectBus.Workers;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using Volo.Abp.BackgroundWorkers.Hangfire;
|
|
||||||
using JiShe.CollectBus.MongoDB;
|
|
||||||
using JiShe.CollectBus.ScheduledMeterReading;
|
|
||||||
using AutoMapper.Configuration.Annotations;
|
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using Confluent.Kafka.Admin;
|
using JiShe.CollectBus.Kafka;
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using Thrift;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Volo.Abp.EventBus.Kafka;
|
|
||||||
using Volo.Abp.Kafka;
|
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
using JiShe.CollectBus.Kafka.AdminClient;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
|
using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Reflection;
|
||||||
|
using Volo.Abp;
|
||||||
|
using Volo.Abp.Application;
|
||||||
|
using Volo.Abp.AutoMapper;
|
||||||
|
using Volo.Abp.BackgroundWorkers;
|
||||||
|
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
using Volo.Abp.EventBus;
|
||||||
|
using Volo.Abp.Modularity;
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
namespace JiShe.CollectBus;
|
||||||
|
|
||||||
@ -42,8 +30,7 @@ namespace JiShe.CollectBus;
|
|||||||
typeof(AbpBackgroundWorkersHangfireModule),
|
typeof(AbpBackgroundWorkersHangfireModule),
|
||||||
typeof(CollectBusFreeRedisModule),
|
typeof(CollectBusFreeRedisModule),
|
||||||
typeof(CollectBusFreeSqlModule),
|
typeof(CollectBusFreeSqlModule),
|
||||||
typeof(AbpEventBusModule),
|
typeof(CollectBusKafkaModule),
|
||||||
typeof(AbpKafkaModule),
|
|
||||||
typeof(CollectBusIoTDBModule)
|
typeof(CollectBusIoTDBModule)
|
||||||
)]
|
)]
|
||||||
public class CollectBusApplicationModule : AbpModule
|
public class CollectBusApplicationModule : AbpModule
|
||||||
@ -56,44 +43,12 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
Configure<AbpAutoMapperOptions>(options =>
|
Configure<AbpAutoMapperOptions>(options =>
|
||||||
{
|
{
|
||||||
options.AddMaps<CollectBusApplicationModule>(validate: true);
|
options.AddMaps<CollectBusApplicationModule>(validate: true);
|
||||||
});
|
});
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(configuration.GetSection("Kafka"));
|
|
||||||
Configure<AbpKafkaEventBusOptions>(configuration.GetSection("Kafka:EventBus"));
|
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(options =>
|
|
||||||
{
|
|
||||||
options.ConfigureConsumer = config =>
|
|
||||||
{
|
|
||||||
config.GroupId = configuration.GetValue<string>("Kafka:Consumer:GroupId");
|
|
||||||
config.EnableAutoCommit = configuration.GetValue<bool>("Kafka:Consumer:EnableAutoCommit");
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(options =>
|
|
||||||
{
|
|
||||||
options.ConfigureProducer = config =>
|
|
||||||
{
|
|
||||||
config.MessageTimeoutMs = configuration.GetValue<int>("Kafka:Producer:MessageTimeoutMs");
|
|
||||||
config.Acks = (Acks)configuration.GetValue<int>("Kafka:Producer:Acks");
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(options =>
|
|
||||||
{
|
|
||||||
options.ConfigureTopic = specification =>
|
|
||||||
{
|
|
||||||
specification.ReplicationFactor = configuration.GetValue<short>("Kafka:Topic:ReplicationFactor");
|
|
||||||
specification.NumPartitions = configuration.GetValue<int>("Kafka:Topic:NumPartitions");
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void OnApplicationInitialization(
|
public override void OnApplicationInitialization(
|
||||||
ApplicationInitializationContext context)
|
ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
context.ServiceProvider.GetRequiredService<CustomKafkaDistributedEventBus>().Initialize();
|
|
||||||
|
|
||||||
var assembly = Assembly.GetExecutingAssembly();
|
var assembly = Assembly.GetExecutingAssembly();
|
||||||
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
|
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
|
||||||
foreach (var type in types)
|
foreach (var type in types)
|
||||||
|
|||||||
@ -1,251 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Reflection;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using Volo.Abp;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
using Volo.Abp.EventBus.Distributed;
|
|
||||||
using Volo.Abp.EventBus.Kafka;
|
|
||||||
using Volo.Abp.EventBus.Local;
|
|
||||||
using Volo.Abp.Guids;
|
|
||||||
using Volo.Abp.Kafka;
|
|
||||||
using Volo.Abp.MultiTenancy;
|
|
||||||
using Volo.Abp.Timing;
|
|
||||||
using Volo.Abp.Tracing;
|
|
||||||
using Volo.Abp.Uow;
|
|
||||||
using Volo.Abp.Threading;
|
|
||||||
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
|
||||||
|
|
||||||
[Dependency(ReplaceServices = true)]
|
|
||||||
[ExposeServices(typeof(IDistributedEventBus), typeof(CustomKafkaDistributedEventBus))]
|
|
||||||
public class CustomKafkaDistributedEventBus : DistributedEventBusBase, ISingletonDependency
|
|
||||||
{
|
|
||||||
|
|
||||||
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
|
|
||||||
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
|
|
||||||
protected IKafkaSerializer Serializer { get; }
|
|
||||||
protected IProducerPool ProducerPool { get; }
|
|
||||||
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
|
|
||||||
protected ConcurrentDictionary<string, Type> EventTypes { get; }
|
|
||||||
protected IKafkaMessageConsumer Consumer { get; private set; } = default!;
|
|
||||||
|
|
||||||
|
|
||||||
public CustomKafkaDistributedEventBus(IServiceScopeFactory serviceScopeFactory,
|
|
||||||
ICurrentTenant currentTenant,
|
|
||||||
IUnitOfWorkManager unitOfWorkManager,
|
|
||||||
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
|
|
||||||
IGuidGenerator guidGenerator,
|
|
||||||
IClock clock,
|
|
||||||
IEventHandlerInvoker eventHandlerInvoker,
|
|
||||||
ILocalEventBus localEventBus,
|
|
||||||
ICorrelationIdProvider correlationIdProvider,
|
|
||||||
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions,
|
|
||||||
IKafkaMessageConsumerFactory messageConsumerFactory,
|
|
||||||
IKafkaSerializer serializer,
|
|
||||||
IProducerPool producerPool)
|
|
||||||
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider)
|
|
||||||
{
|
|
||||||
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
|
|
||||||
MessageConsumerFactory = messageConsumerFactory;
|
|
||||||
Serializer = serializer;
|
|
||||||
ProducerPool = producerPool;
|
|
||||||
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
|
|
||||||
EventTypes = new ConcurrentDictionary<string, Type>();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void Initialize()
|
|
||||||
{
|
|
||||||
|
|
||||||
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
|
|
||||||
}
|
|
||||||
|
|
||||||
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
|
|
||||||
{
|
|
||||||
var handlerFactories = GetOrCreateHandlerFactories(eventType);
|
|
||||||
|
|
||||||
if (factory.IsInFactories(handlerFactories))
|
|
||||||
{
|
|
||||||
return NullDisposable.Instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
handlerFactories.Add(factory);
|
|
||||||
|
|
||||||
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
|
|
||||||
{
|
|
||||||
Check.NotNull(action, nameof(action));
|
|
||||||
|
|
||||||
GetOrCreateHandlerFactories(typeof(TEvent))
|
|
||||||
.Locking(factories =>
|
|
||||||
{
|
|
||||||
factories.RemoveAll(
|
|
||||||
factory =>
|
|
||||||
{
|
|
||||||
var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
|
|
||||||
if (singleInstanceFactory == null)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
|
|
||||||
if (actionHandler == null)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return actionHandler.Action == action;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Unsubscribe(Type eventType, IEventHandler handler)
|
|
||||||
{
|
|
||||||
GetOrCreateHandlerFactories(eventType)
|
|
||||||
.Locking(factories =>
|
|
||||||
{
|
|
||||||
factories.RemoveAll(
|
|
||||||
factory =>
|
|
||||||
factory is SingleInstanceHandlerFactory handlerFactory &&
|
|
||||||
handlerFactory.HandlerInstance == handler
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
|
|
||||||
{
|
|
||||||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void UnsubscribeAll(Type eventType)
|
|
||||||
{
|
|
||||||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
|
|
||||||
{
|
|
||||||
var headers = new Headers
|
|
||||||
{
|
|
||||||
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
|
|
||||||
};
|
|
||||||
|
|
||||||
if (CorrelationIdProvider.Get() != null)
|
|
||||||
{
|
|
||||||
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!));
|
|
||||||
}
|
|
||||||
|
|
||||||
var topicAttribute = eventType.GetCustomAttribute<TopicNameAttribute>();
|
|
||||||
|
|
||||||
await PublishAsync(
|
|
||||||
topicAttribute==null?AbpKafkaEventBusOptions.TopicName:topicAttribute.Name,
|
|
||||||
eventType,
|
|
||||||
eventData,
|
|
||||||
headers
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
|
|
||||||
{
|
|
||||||
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
|
|
||||||
{
|
|
||||||
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
|
|
||||||
|
|
||||||
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
|
|
||||||
{
|
|
||||||
handlerFactoryList.Add(
|
|
||||||
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
|
|
||||||
}
|
|
||||||
|
|
||||||
return handlerFactoryList.ToArray();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override byte[] Serialize(object eventData)
|
|
||||||
{
|
|
||||||
return Serializer.Serialize(eventData);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
|
|
||||||
{
|
|
||||||
return HandlerFactories.GetOrAdd(
|
|
||||||
eventType,
|
|
||||||
type =>
|
|
||||||
{
|
|
||||||
var eventName = EventNameAttribute.GetNameOrDefault(type);
|
|
||||||
EventTypes.GetOrAdd(eventName, eventType);
|
|
||||||
return new List<IEventHandlerFactory>();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
|
|
||||||
{
|
|
||||||
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
|
|
||||||
var body = Serializer.Serialize(eventData);
|
|
||||||
|
|
||||||
return PublishAsync(topicName, eventName, body, headers);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Task<DeliveryResult<string, byte[]>> PublishAsync(
|
|
||||||
string topicName,
|
|
||||||
string eventName,
|
|
||||||
byte[] body,
|
|
||||||
Headers headers)
|
|
||||||
{
|
|
||||||
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
|
|
||||||
|
|
||||||
return producer.ProduceAsync(
|
|
||||||
topicName,
|
|
||||||
new Message<string, byte[]>
|
|
||||||
{
|
|
||||||
Key = eventName,
|
|
||||||
Value = body,
|
|
||||||
Headers = headers
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
|
|
||||||
{
|
|
||||||
//Should trigger same type
|
|
||||||
if (handlerEventType == targetEventType)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
//Should trigger for inherited types
|
|
||||||
if (handlerEventType.IsAssignableFrom(targetEventType))
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -24,7 +24,6 @@
|
|||||||
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
||||||
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
||||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||||
|
|||||||
@ -12,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
|
using JiShe.CollectBus.Serializer;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
@ -95,7 +96,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
//检查任务时间节点
|
//检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过
|
||||||
|
if (!IsGennerateCmd(tasksToBeIssueModel.NextTaskTime))
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
||||||
var tempArryay = item.Split(":");
|
var tempArryay = item.Split(":");
|
||||||
@ -107,7 +113,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-103");
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,7 +123,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
|
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
|
await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
|
||||||
@ -129,13 +135,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
|
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
|
||||||
|
|
||||||
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
|
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
|
||||||
tasksToBeIssueModel.NextTask = tasksToBeIssueModel.NextTask.AddMinutes(timeDensity);
|
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
|
||||||
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
|
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -253,7 +259,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
TimeDensity = itemTimeDensity.Key,
|
TimeDensity = itemTimeDensity.Key,
|
||||||
NextTask = DateTime.Now.AddMinutes(1)
|
NextTaskTime = DateTime.Now.AddMinutes(1)
|
||||||
};
|
};
|
||||||
|
|
||||||
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
|
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
|
||||||
@ -546,6 +552,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||||
{
|
{
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
|
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||||
@ -782,7 +789,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
TimeDensity = itemTimeDensity.Key,
|
TimeDensity = itemTimeDensity.Key,
|
||||||
NextTask = DateTime.Now.AddMinutes(1)
|
NextTaskTime = DateTime.Now.AddMinutes(1)
|
||||||
};
|
};
|
||||||
|
|
||||||
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
|
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
|
||||||
|
|||||||
@ -1,11 +1,5 @@
|
|||||||
using System;
|
using DotNetCore.CAP;
|
||||||
using System.Linq;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using DeviceDetectorNET.Parser.Device;
|
|
||||||
using DotNetCore.CAP;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
@ -15,8 +9,12 @@ using JiShe.CollectBus.Protocol.Contracts;
|
|||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
|
using JiShe.CollectBus.Serializer;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using TouchSocket.Sockets;
|
using TouchSocket.Sockets;
|
||||||
using Volo.Abp.Domain.Repositories;
|
using Volo.Abp.Domain.Repositories;
|
||||||
|
|
||||||
|
|||||||
@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 下个任务时间
|
/// 下个任务时间
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTime NextTask { get; set; }
|
public DateTime NextTaskTime { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 采集时间间隔,1分钟,5分钟,15分钟
|
/// 采集时间间隔,1分钟,5分钟,15分钟
|
||||||
|
|||||||
@ -7,12 +7,12 @@ using System.Text.Json.Serialization;
|
|||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.Helpers
|
namespace JiShe.CollectBus.Serializer
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// json帮助类
|
/// json帮助类
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static class JsonHelper
|
public static class BusJsonSerializer
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// json对象转换成字符串
|
/// json对象转换成字符串
|
||||||
@ -28,18 +28,19 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
jsonSerializerOptions = new JsonSerializerOptions
|
jsonSerializerOptions = new JsonSerializerOptions
|
||||||
{
|
{
|
||||||
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
||||||
WriteIndented = false,
|
WriteIndented = false,// 设置格式化输出
|
||||||
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
|
||||||
IgnoreReadOnlyFields = true,
|
IgnoreReadOnlyFields = true,
|
||||||
IgnoreReadOnlyProperties = true,
|
IgnoreReadOnlyProperties = true,
|
||||||
|
NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
|
||||||
|
AllowTrailingCommas = true, // 忽略尾随逗号
|
||||||
|
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
|
||||||
|
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
|
||||||
|
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
|
||||||
|
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (jsonSerializerOptions.Converters != null)
|
|
||||||
{
|
|
||||||
jsonSerializerOptions.Converters.Add(new DateTimeJsonConverter());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IsIgnore == true)
|
if (IsIgnore == true)
|
||||||
{
|
{
|
||||||
jsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; // 忽略循环引用
|
jsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; // 忽略循环引用
|
||||||
@ -67,18 +68,19 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
jsonSerializerOptions = new JsonSerializerOptions
|
jsonSerializerOptions = new JsonSerializerOptions
|
||||||
{
|
{
|
||||||
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
||||||
WriteIndented = false,
|
WriteIndented = false,// 设置格式化输出
|
||||||
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
|
||||||
IgnoreReadOnlyFields = true,
|
IgnoreReadOnlyFields = true,
|
||||||
IgnoreReadOnlyProperties = true,
|
IgnoreReadOnlyProperties = true,
|
||||||
|
NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
|
||||||
|
AllowTrailingCommas = true, // 忽略尾随逗号
|
||||||
|
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
|
||||||
|
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
|
||||||
|
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
|
||||||
|
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (jsonSerializerOptions.Converters != null)
|
|
||||||
{
|
|
||||||
jsonSerializerOptions.Converters.Add(new DateTimeJsonConverter());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IsIgnore == true)
|
if (IsIgnore == true)
|
||||||
{
|
{
|
||||||
jsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; // 忽略循环引用
|
jsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; // 忽略循环引用
|
||||||
@ -91,6 +93,48 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// json字符串转换成json对象
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T"></typeparam>
|
||||||
|
/// <param name="json"></param>
|
||||||
|
/// <param name="IsIgnore">是否忽略实体中实体,不再序列化里面包含的实体</param>
|
||||||
|
/// <param name="jsonSerializerOptions">配置</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static object? Deserialize(this string json, Type type, bool IsIgnore = false, JsonSerializerOptions jsonSerializerOptions = null)
|
||||||
|
{
|
||||||
|
if (jsonSerializerOptions == null)
|
||||||
|
{
|
||||||
|
jsonSerializerOptions = new JsonSerializerOptions
|
||||||
|
{
|
||||||
|
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
||||||
|
WriteIndented = false,// 设置格式化输出
|
||||||
|
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
|
||||||
|
IgnoreReadOnlyFields = true,
|
||||||
|
IgnoreReadOnlyProperties = true,
|
||||||
|
NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
|
||||||
|
AllowTrailingCommas = true, // 忽略尾随逗号
|
||||||
|
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
|
||||||
|
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
|
||||||
|
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
|
||||||
|
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if (IsIgnore == true)
|
||||||
|
{
|
||||||
|
jsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; // 忽略循环引用
|
||||||
|
|
||||||
|
return json == null ? null : JsonSerializer.Deserialize(json, type, jsonSerializerOptions);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return json == null ? null : JsonSerializer.Deserialize(json, type, jsonSerializerOptions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// list json字符串转换成list
|
/// list json字符串转换成list
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -1,13 +1,16 @@
|
|||||||
using FreeRedis;
|
using FreeRedis;
|
||||||
using JetBrains.Annotations;
|
using JetBrains.Annotations;
|
||||||
using JiShe.CollectBus.FreeRedisProvider.Options;
|
using JiShe.CollectBus.FreeRedisProvider.Options;
|
||||||
|
using JiShe.CollectBus.Serializer;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using System.Text.Encodings.Web;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
|
using System.Text.Json.Serialization;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
@ -36,10 +39,11 @@ namespace JiShe.CollectBus.FreeRedisProvider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public IRedisClient GetInstance()
|
public IRedisClient GetInstance()
|
||||||
{
|
{
|
||||||
|
|
||||||
var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}";
|
var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}";
|
||||||
Instance = new RedisClient(connectionString);
|
Instance = new RedisClient(connectionString);
|
||||||
Instance.Serialize = obj => JsonSerializer.Serialize(obj);
|
Instance.Serialize = obj => BusJsonSerializer.Serialize(obj);
|
||||||
Instance.Deserialize = (json, type) => JsonSerializer.Deserialize(json, type);
|
Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
|
||||||
Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
|
Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
|
||||||
return Instance;
|
return Instance;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user