448 lines
15 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Confluent.Kafka;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.Kafka.Internal;
using JiShe.ServicePro.Kafka.Producer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using TouchSocket.Sockets;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe
{
/// <summary>Logger used by the sample endpoints and Kafka handlers.</summary>
private readonly ILogger<SampleAppService> _logger;

/// <summary>Provider used to obtain IoTDB session pools.</summary>
private readonly IoTDBSessionPoolProvider _iotDBProvider;

/// <summary>IoTDB options, taken from the injected options wrapper's <c>Value</c>.</summary>
private readonly IoTDbOptions _options;

/// <summary>Redis data-cache service (only referenced by disabled sample code in this class).</summary>
private readonly IRedisDataCacheService _redisDataCacheService;

/// <summary>Kafka producer used to publish test frames.</summary>
private readonly IProducerService _producerService;

/// <summary>TCP service used to check for and send to connected clients.</summary>
private readonly ITcpService _tcpService;

/// <summary>Provider exposing the FreeRedis client instance.</summary>
private readonly IFreeRedisProvider _freeRedisProvider;

/// <summary>
/// Creates the sample service and stores the injected dependencies.
/// </summary>
/// <param name="iotDBProvider">IoTDB session-pool provider.</param>
/// <param name="options">Options wrapper; its <c>Value</c> is read once here.</param>
/// <param name="logger">Logger for this service.</param>
/// <param name="redisDataCacheService">Redis data-cache service.</param>
/// <param name="producerService">Kafka producer service.</param>
/// <param name="tcpService">TCP service.</param>
/// <param name="freeRedisProvider">FreeRedis provider.</param>
public SampleAppService(
    IoTDBSessionPoolProvider iotDBProvider,
    IOptions<IoTDbOptions> options,
    ILogger<SampleAppService> logger,
    IRedisDataCacheService redisDataCacheService,
    IProducerService producerService,
    ITcpService tcpService,
    IFreeRedisProvider freeRedisProvider)
{
    // Diagnostics
    _logger = logger;

    // IoTDB
    _iotDBProvider = iotDBProvider;
    _options = options.Value;

    // Redis
    _redisDataCacheService = redisDataCacheService;
    _freeRedisProvider = freeRedisProvider;

    // Messaging / transport
    _producerService = producerService;
    _tcpService = tcpService;
}
/// <summary>
/// Test endpoint for <c>UseSessionPool</c>. The body is currently a stub: the argument is
/// ignored and no work is performed.
/// </summary>
/// <param name="testTime">Currently unused.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task UseSessionPool(long testTime)
{
    // Identifiers noted by the original author as candidates for this experiment:
    //ElectricityMeterTreeModelExtension.GetCurrent()
    //SourceEntityAccessorFactory.SetCurrent(meter);
    //ElectricityMeterTreeModelAccessor.
    //TableModelSingleMeasuringEntityExtension
    //TableModelSingleMeasuringEntityAccessor.GetSystemName(meter);
    //ElectricityMeterAccessor

    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for session switching. The body is currently a stub: the argument is
/// ignored and no work is performed.
/// </summary>
/// <param name="time">Currently unused.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task UseTableSessionPool(DateTime time)
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for session switching (variant 3). The body is currently a stub: the
/// argument is ignored and no work is performed.
/// </summary>
/// <param name="time">Currently unused.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task UseTableSessionPool3(DateTime time)
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for a single measuring-point data item in the tree model (string value).
/// The body is currently a stub: all arguments are ignored.
/// </summary>
/// <param name="measuring">Currently unused.</param>
/// <param name="value">Currently unused.</param>
/// <param name="time">Currently unused.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for a single measuring-point data item in the tree model (integer value).
/// The body is currently a stub: all arguments are ignored.
/// </summary>
/// <param name="measuring">Currently unused.</param>
/// <param name="value">Currently unused.</param>
/// <param name="time">Currently unused.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for querying a single measuring-point data item in the tree model.
/// The body is currently a stub and performs no query.
/// </summary>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntityQuery()
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for a single measuring-point data item in the table model (string value).
/// The body is currently a stub: all arguments are ignored.
/// </summary>
/// <param name="measuring">Currently unused.</param>
/// <param name="value">Currently unused.</param>
/// <param name="time">
/// Currently unused. The earlier implementation overwrote it with the current local time
/// and never read it; that dead store has been dropped.
/// </param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestTableModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for a single measuring-point data item in the table model (integer value).
/// The body is currently a stub: all arguments are ignored.
/// </summary>
/// <param name="measuring">Currently unused.</param>
/// <param name="value">Currently unused.</param>
/// <param name="time">
/// Currently unused. The earlier implementation overwrote it with the current local time
/// and never read it; that dead store has been dropped.
/// </param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestTableModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{
    // Awaiting a completed task keeps the async signature without triggering compiler
    // warning CS1998, matching the other stub endpoints in this class.
    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for the device-group balance-control algorithm. The experiment itself is
/// commented out, so a call currently does nothing.
/// </summary>
/// <param name="deviceCount">
/// Defaults to 200000. Only referenced by the disabled code below (as the number of
/// generated device ids); unused while that code stays commented out.
/// </param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
    // ---- Disabled experiment, kept for reference ----
    //var deviceList = new List<string>();
    //for (int i = 0; i < deviceCount; i++)
    //{
    //    deviceList.Add($"Device_{Guid.NewGuid()}");
    //}
    //// Initialize the cache
    //DeviceGroupBalanceControl.InitializeCache(deviceList);
    //var timeDensity = "15";
    //// Fetch the ammeter info held in the cache
    //var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
    //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
    //var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
    //List<string> focusAddressDataLista = new List<string>();
    //foreach (var item in meterInfos)
    //{
    //    focusAddressDataLista.Add(item.FocusAddress);
    //}
    //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
    //// Print the distribution statistics
    //DeviceGroupBalanceControl.PrintDistributionStats();

    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint that looks up the balance-control group id for a device address and
/// writes it to the console.
/// </summary>
/// <param name="deviceAddress">Device address passed to <c>DeviceGroupBalanceControl.GetDeviceGroupId</c>.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestGetDeviceGroupBalanceControl(string deviceAddress)
{
    // The group id is only printed; nothing is returned to the HTTP caller.
    Console.WriteLine(DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress));

    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for measuring the performance of bulk-reading 100,000 records from Redis.
/// The experiment itself is commented out, so a call currently does nothing.
/// </summary>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestRedisCacheGetAllPagedData()
{
    // ---- Disabled experiment, kept for reference ----
    //var timeDensity = "15";
    //string SystemType = "Energy";
    //string ServerTagName = "JiSheCollectBus2";
    //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
    //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
    //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
    //var timer1 = Stopwatch.StartNew();
    //decimal? cursor = null;
    //string member = null;
    //bool hasNext;
    //List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
    //do
    //{
    //    var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
    //        redisCacheMeterInfoHashKeyTemp,
    //        redisCacheMeterInfoZSetScoresIndexKeyTemp,
    //        pageSize: 1000,
    //        lastScore: cursor,
    //        lastMember: member);
    //    meterInfos.AddRange(page.Items);
    //    cursor = page.HasNext ? page.NextScore : null;
    //    member = page.HasNext ? page.NextMember : null;
    //    hasNext = page.HasNext;
    //} while (hasNext);
    //timer1.Stop();
    //_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
    //List<string> focusAddressDataLista = new List<string>();
    //foreach (var item in meterInfos)
    //{
    //    focusAddressDataLista.Add(item.FocusAddress);
    //}
    //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
    //// Print the distribution statistics
    //DeviceGroupBalanceControl.PrintDistributionStats();

    await Task.CompletedTask;
}
/// <summary>
/// Verification endpoint for the next-collection-time calculation.
/// </summary>
/// <param name="time">Date/time text converted with <c>Convert.ToDateTime</c>.</param>
/// <param name="timeDensity">Value forwarded to <c>CalculateNextCollectionTime</c>.</param>
/// <returns>The result of <c>CalculateNextCollectionTime</c> applied to the parsed time.</returns>
[HttpGet]
public async Task<DateTime> TestCalculateNextCollectionTime(string time, int timeDensity)
{
    // Step 1: turn the textual input into a DateTime.
    var baseTime = Convert.ToDateTime(time);

    // Step 2: delegate to the extension method under test.
    var nextCollectionTime = baseTime.CalculateNextCollectionTime(timeDensity);

    return await Task.FromResult(nextCollectionTime);
}
/// <summary>
/// Returns a fixed sample DTO whose <c>Value</c> is 42.
/// </summary>
/// <returns>A completed task wrapping the sample DTO.</returns>
public Task<SampleDto> GetAsync() =>
    Task.FromResult(new SampleDto { Value = 42 });
/// <summary>
/// Returns a fixed sample DTO whose <c>Value</c> is 42; the method is marked
/// <c>[Authorize]</c>.
/// </summary>
/// <returns>A completed task wrapping the sample DTO.</returns>
[Authorize]
public Task<SampleDto> GetAuthorizedAsync() =>
    Task.FromResult(new SampleDto { Value = 42 });
/// <summary>
/// Sample query against the prepay database: selects up to 10 <c>Vi_BaseAmmeterInfo</c>
/// rows whose <c>TB_CustomerID</c> equals 5.
/// </summary>
/// <returns>The matching rows.</returns>
[AllowAnonymous]
public async Task<List<Vi_BaseAmmeterInfo>> Test()
{
    return await SqlProvider.Instance
        .Change(DbEnum.PrepayDB)
        .Select<Vi_BaseAmmeterInfo>()
        .Where(d => d.TB_CustomerID == 5)
        .Take(10)
        .ToListAsync();
}
/// <summary>
/// Resolves the keyed <c>IProtocolPlugin</c> service registered as "TestProtocolPlugin"
/// and reports whether the lookup came back empty.
/// </summary>
/// <returns>
/// <c>true</c> when no service was resolved; <c>false</c> when one was found.
/// NOTE(review): this reads as inverted relative to the method name — confirm it is intended.
/// </returns>
[AllowAnonymous]
public bool GetTestProtocol() =>
    LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin") is null;
/// <summary>
/// Kafka handler for <c>KafkaTopicConsts.TESTTOPIC</c>: logs the received message at
/// warning level and acknowledges it.
/// </summary>
/// <param name="obj">The received message; only its string form is logged.</param>
/// <returns>A success acknowledgement.</returns>
[KafkaSubscribe(KafkaTopicConsts.TESTTOPIC)]
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{
    _logger.LogWarning($"收到订阅消息: {obj}");

    // There is no asynchronous work here; awaiting a completed task keeps the async
    // signature without triggering compiler warning CS1998.
    await Task.CompletedTask;
    return SubscribeAck.Success();
}
/// <summary>
/// Test endpoint that publishes a downlink frame to Kafka topic
/// <c>KafkaTopicConsts.TESTSENDTOPIC</c>.
/// </summary>
/// <param name="input">
/// Message to publish. <c>Address</c> and <c>Frame</c> must be non-empty. Spaces are removed
/// from <c>Frame</c> in place (the passed object is mutated) before it is published.
/// </param>
/// <returns><c>true</c> once the producer call has completed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="input"/>, or its <c>Address</c> or <c>Frame</c>, is null.</exception>
/// <exception cref="ArgumentException"><c>Address</c> or <c>Frame</c> is empty or whitespace.</exception>
[AllowAnonymous]
public async Task<bool> KafkaSendAsync(KafkaSendDto input)
{
    // Fail with a descriptive argument exception instead of a NullReferenceException
    // when the request body is missing.
    ArgumentNullException.ThrowIfNull(input);
    ArgumentException.ThrowIfNullOrWhiteSpace(input.Address);
    ArgumentException.ThrowIfNullOrWhiteSpace(input.Frame);

    // Accept frames typed as space-separated hex ("68 32 00 ...").
    input.Frame = input.Frame.Replace(" ", "");

    await _producerService.ProduceAsync<KafkaSendDto>(KafkaTopicConsts.TESTSENDTOPIC, input);
    return true;
}
/// <summary>
/// Kafka handler for <c>KafkaTopicConsts.TESTSENDTOPIC</c>: forwards the frame to the TCP
/// client identified by the message address when that client exists, otherwise logs that
/// the concentrator is not online. The message is acknowledged in both cases.
/// </summary>
/// <param name="dto">Message carrying the target address and the hex-encoded frame.</param>
/// <returns>A success acknowledgement.</returns>
[KafkaSubscribe(KafkaTopicConsts.TESTSENDTOPIC), ApiExplorerSettings(IgnoreApi = true)]
public async Task<ISubscribeAck> KafkaSubscribeTestSendAsync(KafkaSendDto dto)
{
    if (!_tcpService.ClientExists(dto.Address))
    {
        _logger.LogWarning($"{dto.Address}集中器未上线: {dto.Serialize()}");

        // For testing, ACK regardless of whether the device is online.
        return SubscribeAck.Success();
    }

    // Forward to the device: the hex text is decoded to raw bytes before sending.
    await _tcpService.SendAsync(dto.Address, Convert.FromHexString(dto.Frame));
    _logger.LogWarning($"{dto.Address}下发消息报文:{dto.Frame}");

    return SubscribeAck.Success();
}
/// <summary>
/// Test endpoint for reading cached data from Redis. The experiment itself is commented
/// out, so a call currently does nothing.
/// NOTE(review): the original summary described a 100,000-record bulk-read benchmark, yet
/// the disabled code below calls <c>GetSingleData</c> — confirm the intended purpose.
/// </summary>
/// <param name="scores">Currently unused.</param>
/// <returns>A task that is already completed when returned.</returns>
[HttpGet]
public async Task TestRedisCacheGetData(string scores)
{
    // ---- Disabled experiment, kept for reference ----
    //var timeDensity = "15";
    //string SystemType = "Energy";
    //string ServerTagName = "JiSheCollectBus5";
    //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
    //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
    //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
    //var page = await _redisDataCacheService.GetSingleData<AmmeterInfo>(
    //    redisCacheMeterInfoHashKeyTemp,
    //    redisCacheMeterInfoZSetScoresIndexKeyTemp,
    //    "973219481:17",
    //    pageSize: 1000,
    //    lastScore: 100,
    //    lastMember: "memberId",
    //    descending: true
    //    );

    await Task.CompletedTask;
}
/// <summary>
/// Test endpoint for querying IoTDB when the target table is empty. It builds a one-row
/// page query filtered on <c>DeviceId = "1111"</c>, initializes the table session model
/// and runs the query. The result is discarded.
/// </summary>
/// <returns>A task that completes when the query has finished.</returns>
[HttpGet]
public async Task TestIoTDBEmptyTableQuery()
{
    // Probe entity: supplies the table name/path and the device id used in the filter.
    var probe = new MeterReadingTelemetryPacketInfo
    {
        DevicePath = "MeterReadingTelemetryPacketInfo",
        DeviceId = "1111"
    };

    var deviceIdFilter = new QueryCondition
    {
        Field = "DeviceId",
        Operator = "=",
        Value = probe.DeviceId
    };

    var queryOptions = new IoTDBQueryOptions
    {
        TableNameOrTreePath = probe.DevicePath,
        PageIndex = 1,
        PageSize = 1,
        Conditions = new List<QueryCondition> { deviceIdFilter },
    };

    // The session pool is requested twice, as in the original flow.
    // NOTE(review): the `true` argument presumably selects the table-model pool — confirm.
    await _iotDBProvider.GetSessionPool(true).InitTableSessionModelAsync();
    var queryResult = await _iotDBProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(queryOptions);

    await Task.CompletedTask;
}
/// <summary>
/// Builds the "set collection items" command for one meter under a concentrator and
/// publishes it to Kafka topic <c>KafkaTopicConsts.TESTSENDTOPIC</c>.
/// </summary>
/// <remarks>
/// The device list is read from the Redis hash <c>RedisConst.CacheAllDeviceInfoHashKey</c>
/// using <paramref name="focusAddress"/> as the field. Only item codes beginning with
/// <c>0D_</c> (after trimming) are included; the text after the first underscore is parsed
/// as an integer.
/// </remarks>
/// <param name="focusAddress">Concentrator address; also the Redis hash field and the Kafka message address.</param>
/// <param name="meterAddress">Address of the meter to look up in the cached device list.</param>
/// <param name="cycle">Collection cycle; per the original note the expected values are 15 or 5 (not enforced here).</param>
/// <returns>The generated frame as a hex string.</returns>
/// <exception cref="ArgumentException"><paramref name="focusAddress"/> or <paramref name="meterAddress"/> is null, empty or whitespace.</exception>
/// <exception cref="Exception">The concentrator, or the meter under it, is not present in the cache.</exception>
[HttpPost]
public async Task<string> AutoCollectionItemsSet(string focusAddress, string meterAddress, int cycle)
{
    // Validate up front, in the same way KafkaSendAsync does, so bad requests fail with a
    // clear argument error rather than a misleading cache miss.
    ArgumentException.ThrowIfNullOrWhiteSpace(focusAddress);
    ArgumentException.ThrowIfNullOrWhiteSpace(meterAddress);

    var key = $"{RedisConst.CacheAllDeviceInfoHashKey}";
    List<DeviceCacheInfo> deviceCacheInfos = await _freeRedisProvider.Instance.HGetAsync<List<DeviceCacheInfo>>(key, focusAddress);
    if (deviceCacheInfos == null || deviceCacheInfos.Count <= 0)
    {
        throw new Exception($"未能在缓存中找到设备{focusAddress}缓存key:{key}");
    }

    DeviceCacheInfo? deviceCacheInfo = deviceCacheInfos.FirstOrDefault(d => d.MeterAddress == meterAddress);
    if (deviceCacheInfo == null)
    {
        throw new Exception($"未能在缓存中找到设备{focusAddress}下的表{meterAddress}缓存key:{key}");
    }

    // StartsWith (ordinal) replaces Substring(0, 3): Substring throws
    // ArgumentOutOfRangeException for codes shorter than three characters, whereas such
    // codes simply do not match here. Null entries are skipped for the same reason.
    var details = deviceCacheInfo.ItemCodes
        .Where(code => code != null && code.Trim().StartsWith("0D_", StringComparison.Ordinal))
        .Select(code => new PnFn(1, Convert.ToInt32(code.Split('_')[1])))
        .ToList();

    var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(focusAddress, 1, 0, cycle, DateTime.Now, 1, details);
    string frame = bytes.ToHexString();

    await _producerService.ProduceAsync<KafkaSendDto>(KafkaTopicConsts.TESTSENDTOPIC, new KafkaSendDto()
    {
        Address = focusAddress,
        Frame = frame
    });

    return frame;
}
}