2025-05-15 14:34:44 +08:00

593 lines
20 KiB
C#

using Confluent.Kafka;
using DeviceDetectorNET.Parser.Device;
using FreeSql.Internal.CommonProvider;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Sockets;
using static IdentityModel.ClaimComparer;
namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe
{
private readonly ILogger<SampleAppService> _logger;
private readonly IIoTDbProvider _iotDBProvider;
private readonly IoTDBRuntimeContext _dbContext;
private readonly IoTDbOptions _options;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly IProducerService _producerService;
private readonly ITcpService _tcpService;
public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options,
IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService, IProducerService producerService, ITcpService tcpService)
{
_iotDBProvider = iotDBProvider;
_options = options.Value;
_dbContext = dbContext;
_logger = logger;
_redisDataCacheService = redisDataCacheService;
_producerService =producerService;
_tcpService=tcpService;
}
/// <summary>
/// 测试 UseSessionPool
/// </summary>
/// <param name="testTime"></param>
/// <returns></returns>
[HttpGet]
public async Task UseSessionPool(long testTime)
{
var dataTime = DateTime.Now;
List<string> values = new List<string>() { $"{dataTime:yy}", $"{dataTime:MM}", $"{dataTime:dd}", $"{dataTime:HH}", $"{dataTime:mm}", };
ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel()
{
SystemName = "energy",
DeviceId = "402440506s",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectId = "10059",
Voltage = 10,
IssuedMessageHexString = "messageHexString",
Timestamps = testTime// DateTimeOffset.UtcNow.ToUnixTimeNanoseconds()//testTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
};
//ElectricityMeterTreeModelExtension.GetCurrent()
//SourceEntityAccessorFactory.SetCurrent(meter);
//ElectricityMeterTreeModelAccessor.
//TableModelSingleMeasuringEntityExtension
//TableModelSingleMeasuringEntityAccessor.GetSystemName(meter);
//ElectricityMeterAccessor
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试Session切换
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task UseTableSessionPool(DateTime time)
{
var testTime = time;
ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectId = "10059",
Voltage = 10,
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
};
await _iotDBProvider.InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter = new ElectricityMeter()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectId = "10059",
Voltage = 10,
CurrentdDateTime = DateTime.Now,
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
};
await _iotDBProvider.InsertAsync(meter);
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
Operator = "=",
Value = meter.DeviceId
};
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = nameof(ElectricityMeter),
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
var pageResult = await _iotDBProvider.QueryAsync<ElectricityMeter>(query);
}
/// <summary>
/// 测试Session切换3
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task UseTableSessionPool3(DateTime time)
{
var testTime = time;
ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectId = "10059",
Voltage = 10,
IssuedMessageHexString = "dsdfsfd",
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(),
};
await _iotDBProvider.InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter3 = new ElectricityMeter()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectId = "10059",
Voltage = 10,
Currentd = 22,
IssuedMessageHexString = "dsdfsfd",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
};
//var dd = DateTimeOffset.Now.ToUnixTimeMilliseconds();
//var dd3 = DateTimeOffset.Now.ToUnixTimeMicroseconds();
//var dd2 = DateTimeOffset.Now.ToUnixTimeNanoseconds();
await _iotDBProvider.InsertAsync(meter3);
}
/// <summary>
/// 测试树模型单个测点数据项
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{
time = DateTime.Now;
//System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo
var meter = new TreeModelSingleMeasuringEntity<DateTime>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "1",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = (measuring, time)
};
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试树模型单个测点数据项2
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{
time = DateTime.Now;
var meter = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = (measuring, true)
};
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试树模型单个测点数据项查询
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntityQuery()
{
var time = DateTime.Now;
var meter = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = ("measuring", true)
};
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
Operator = "=",
Value = meter.DeviceId
};
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = meter.DevicePath,
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
var pageResult = await _iotDBProvider.QueryAsync<DeviceTreeModelDataInfo>(query);
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试表模型单个测点数据项
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestTableModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{
time = DateTime.Now;
var meter = new TableModelSingleMeasuringEntity<string>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = (measuring, value)
};
_dbContext.UseTableSessionPool = true;
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试表模型单个测点数据项2
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestTableModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{
time = DateTime.Now;
var meter = new TableModelSingleMeasuringEntity<int>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = (measuring, value)
};
_dbContext.UseTableSessionPool = true;
await _iotDBProvider.InsertAsync(meter);
var meter3 = new TableModelSingleMeasuringEntity<bool>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = ("DeviceResult", true)
};
_dbContext.UseTableSessionPool = true;
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
Operator = "=",
Value = meter.DeviceId
};
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = meter.DevicePath,
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
var pageResult = await _iotDBProvider.QueryAsync<DeviceTreeModelDataInfo>(query);
}
/// <summary>
/// 测试设备分组均衡控制算法
/// </summary>
/// <param name="deviceCount"></param>
/// <returns></returns>
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
//var deviceList = new List<string>();
//for (int i = 0; i < deviceCount; i++)
//{
// deviceList.Add($"Device_{Guid.NewGuid()}");
//}
//// 初始化缓存
//DeviceGroupBalanceControl.InitializeCache(deviceList);
//var timeDensity = "15";
////获取缓存中的电表信息
//var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
//var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
//List<string> focusAddressDataLista = new List<string>();
//foreach (var item in meterInfos)
//{
// focusAddressDataLista.Add(item.FocusAddress);
//}
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//// 打印分布统计
//DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
/// <summary>
/// 测试设备分组均衡控制算法获取分组Id
/// </summary>
/// <param name="deviceAddress"></param>
/// <returns></returns>
[HttpGet]
public async Task TestGetDeviceGroupBalanceControl(string deviceAddress)
{
var groupId = DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress);
Console.WriteLine(groupId);
await Task.CompletedTask;
}
/// <summary>
/// 测试Redis批量读取10万条数据性能
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task TestRedisCacheGetAllPagedData()
{
//var timeDensity = "15";
//string SystemType = "Energy";
//string ServerTagName = "JiSheCollectBus2";
//var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
//var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
//var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
//var timer1 = Stopwatch.StartNew();
//decimal? cursor = null;
//string member = null;
//bool hasNext;
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
//do
//{
// var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
//timer1.Stop();
//_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
//List<string> focusAddressDataLista = new List<string>();
//foreach (var item in meterInfos)
//{
// focusAddressDataLista.Add(item.FocusAddress);
//}
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//// 打印分布统计
//DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
/// <summary>
/// 下一个采集时间点验证
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<DateTime> TestCalculateNextCollectionTime(string time, int timeDensity)
{
DateTime nextTaskTime = Convert.ToDateTime(time);
return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity));
}
public Task<SampleDto> GetAsync()
{
return Task.FromResult(
new SampleDto
{
Value = 42
}
);
}
[Authorize]
public Task<SampleDto> GetAuthorizedAsync()
{
return Task.FromResult(
new SampleDto
{
Value = 42
}
);
}
[AllowAnonymous]
public async Task<List<Vi_BaseAmmeterInfo>> Test()
{
var ammeterList = await SqlProvider.Instance.Change(DbEnum.PrepayDB).Select<Vi_BaseAmmeterInfo>().Where(d => d.TB_CustomerID == 5).Take(10).ToListAsync();
return ammeterList;
}
[AllowAnonymous]
public bool GetTestProtocol()
{
var aa = LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin");
return aa == null;
}
[KafkaSubscribe(ProtocolConst.TESTTOPIC)]
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{
_logger.LogWarning($"收到订阅消息: {obj}");
return SubscribeAck.Success();
}
/// <summary>
/// 测试Kafka下发报文
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[AllowAnonymous]
public async Task<bool> KafkaSendAsync(KafkaSendDto input)
{
ArgumentException.ThrowIfNullOrWhiteSpace(input.Address);
ArgumentException.ThrowIfNullOrWhiteSpace(input.Frame);
input.Frame = input.Frame.Replace(" ", "");
await _producerService.ProduceAsync<KafkaSendDto>(ProtocolConst.TESTSENDTOPIC, input);
return await Task.FromResult(true);
}
/// <summary>
/// 订阅下发的数据
/// </summary>
/// <param name="dto"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.TESTSENDTOPIC), ApiExplorerSettings(IgnoreApi = true)]
public async Task<ISubscribeAck> KafkaSubscribeTestSendAsync(KafkaSendDto dto)
{
if (_tcpService.ClientExists(dto.Address))
{
// 发送给设备
await _tcpService.SendAsync(dto.Address, Convert.FromHexString(dto.Frame));
_logger.LogWarning($"{dto.Address}下发消息报文:{dto.Frame}");
}
else
{
_logger.LogWarning($"{dto.Address}集中器未上线: {dto.Serialize()}");
}
// 测试不管是否上线都ACK
return SubscribeAck.Success();
}
/// <summary>
/// 测试Redis批量读取10万条数据性能
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task TestRedisCacheGetData(string scores)
{
//var timeDensity = "15";
//string SystemType = "Energy";
//string ServerTagName = "JiSheCollectBus5";
//var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
//var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
//var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
//var page = await _redisDataCacheService.GetSingleData<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// "973219481:17",
// pageSize: 1000,
// lastScore: 100,
// lastMember: "memberId",
// descending: true
// );
await Task.CompletedTask;
}
}