This commit is contained in:
cli 2025-03-18 13:56:38 +08:00
parent 798c48650a
commit 9cda1af5fc
4 changed files with 52 additions and 21 deletions

View File

@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.EnergySystems.TableViews; using JiShe.CollectBus.EnergySystems.TableViews;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
@ -26,8 +27,16 @@ namespace JiShe.CollectBus.EnergySystem
foreach (var group in groupData) foreach (var group in groupData)
{ {
var aa = data.ToDictionary(a => a.Address, b => b); try
await FreeRedisProvider.Instance.HSetAsync(group.Key, aa); {
var aa = group.ToDictionary(a => $"{a.ID}_{a.Address}" , b => b);
await FreeRedisProvider.Instance.HSetAsync($"{RedisConst.CacheAmmeterFocusKey}:{group.Key}", aa);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
} }
} }

View File

@ -5,6 +5,7 @@ using System.Linq;
using System.Net; using System.Net;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
using FreeSql;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
@ -24,6 +25,7 @@ using MassTransit;
using MassTransit.Internals.GraphValidation; using MassTransit.Internals.GraphValidation;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
using Volo.Abp.Uow;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -35,16 +37,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _capBus; private readonly ICapPublisher _capBus;
private readonly IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> _meterReadingIssuedRepository; private readonly IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> _meterReadingIssuedRepository;
private readonly IUnitOfWorkManager _unitOfWorkManager;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher capBus, ICapPublisher capBus,
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository, IUnitOfWorkManager unitOfWorkManager)
{ {
_capBus = capBus; _capBus = capBus;
_logger = logger; _logger = logger;
_meterReadingIssuedRepository = meterReadingIssuedRepository; _meterReadingIssuedRepository = meterReadingIssuedRepository;
_unitOfWorkManager = unitOfWorkManager;
} }
/// <summary> /// <summary>
@ -116,7 +120,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>(); Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
foreach (var ammeter in item) foreach (var ammeter in item)
{ {
@ -178,7 +182,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task AmmeterScheduledMeterOneMinuteReading() public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{ {
//获取缓存中的电表信息 //获取缓存中的电表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{ {
@ -207,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task AmmeterScheduledMeterFiveMinuteReading() public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
{ {
//获取缓存中的电表信息 //获取缓存中的电表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{ {
@ -234,7 +238,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{ {
//获取缓存中的电表信息 //获取缓存中的电表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{ {
@ -307,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>(); Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
foreach (var subItem in item) foreach (var subItem in item)
{ {
@ -327,7 +331,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task WatermeterScheduledMeterOneMinuteReading() public virtual async Task WatermeterScheduledMeterOneMinuteReading()
{ {
//获取缓存中的水表信息 //获取缓存中的水表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{ {
@ -354,7 +358,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
//获取缓存中的水表信息 //获取缓存中的水表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{ {
@ -380,7 +384,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task WatermeterScheduledMeterFifteenMinuteReading() public virtual async Task WatermeterScheduledMeterFifteenMinuteReading()
{ {
//获取缓存中的水表信息 //获取缓存中的水表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{ {
@ -434,7 +438,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
string key = (string)item[0];//集中器地址对应的Redis缓存Key string key = (string)item[0];//集中器地址对应的Redis缓存Key
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, minute)}"; var redisCacheKey = $"{string.Format(RedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, minute)}";
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
var meterHashs = new Dictionary<string, T>(); var meterHashs = new Dictionary<string, T>();
@ -515,7 +519,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var group in focusHashGroups) foreach (var group in focusHashGroups)
{ {
//TODO _meterReadingIssuedRepository 需要优化 //TODO _meterReadingIssuedRepository 需要优化
//_ = Task.Run(async () => { await CreatePublishTask(eventName, group.Value); });
await Task.Factory.StartNew(async () =>
{
using var uow = _unitOfWorkManager.Begin();
await CreatePublishTask(eventName, group.Value);
await uow.SaveChangesAsync();
});
//await Task.Run(async () => { await CreatePublishTask(eventName, group.Value); });
await CreatePublishTask(eventName, group.Value); await CreatePublishTask(eventName, group.Value);
} }

View File

@ -11,6 +11,7 @@ using JiShe.CollectBus.IotSystems.Watermeter;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
using Volo.Abp.Uow;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -22,7 +23,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, ICapPublisher capBus, IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository) public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher capBus,
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository,
IUnitOfWorkManager unitOfWorkManager) :base(logger, capBus, meterReadingIssuedRepository, unitOfWorkManager)
{ {
} }

View File

@ -6,36 +6,41 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts namespace JiShe.CollectBus.Common.Consts
{ {
public class FreeRedisConst public class RedisConst
{ {
/// <summary> /// <summary>
/// 缓存基础目录 /// 缓存基础目录
/// </summary> /// </summary>
public const string CacheBasicDirectoryKey = "CollectBus:"; public const string CacheBasicDirectoryKey = "CollectBus";
/// <summary> /// <summary>
/// 1分钟采集间隔 /// 1分钟采集间隔
/// </summary> /// </summary>
public const string OneMinuteAcquisitionTimeInterval = $"one"; public const string OneMinuteAcquisitionTimeInterval = "One";
/// <summary> /// <summary>
/// 5分钟采集间隔 /// 5分钟采集间隔
/// </summary> /// </summary>
public const string FiveMinuteAcquisitionTimeInterval = $"Five"; public const string FiveMinuteAcquisitionTimeInterval = "Five";
/// <summary> /// <summary>
/// 15分钟采集间隔 /// 15分钟采集间隔
/// </summary> /// </summary>
public const string FifteenMinuteAcquisitionTimeInterval = $"Fifteen"; public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen";
/// <summary> /// <summary>
/// 缓存电表信息 /// 缓存电表信息
/// </summary> /// </summary>
public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:AmmeterInfo:"; public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}:{"{0}"}:{"{1}"}:AmmeterInfo:";
/// <summary> /// <summary>
/// 缓存水表信息 /// 缓存水表信息
/// </summary> /// </summary>
public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:WatermeterInfo:"; public const string CacheAmmeterFocusKey = $"{CacheBasicDirectoryKey}:AmmeterFocus";
/// <summary>
/// 缓存水表信息
/// </summary>
public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}:{"{0}"}:{"{1}"}:WatermeterInfo:";
} }
} }