Compare commits
No commits in common. "7704c6374ed9f6a3989d2a96dde8c2d1d4cdf217" and "76ae008e015a7f2aa6776c12d5d57783b26b11f7" have entirely different histories.
7704c6374e
...
76ae008e01
5
.gitignore
vendored
5
.gitignore
vendored
@ -400,6 +400,5 @@ FodyWeavers.xsd
|
|||||||
|
|
||||||
# ABP Studio
|
# ABP Studio
|
||||||
**/.abpstudio/
|
**/.abpstudio/
|
||||||
/web/JiShe.CollectBus.Host/Plugins/*.dll
|
/src/JiShe.CollectBus.Host/Plugins/*.dll
|
||||||
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.dll
|
JiShe.CollectBus.Kafka.Test
|
||||||
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.Test.dll
|
|
||||||
|
|||||||
@ -3,49 +3,41 @@ Microsoft Visual Studio Solution File, Format Version 12.00
|
|||||||
# Visual Studio Version 17
|
# Visual Studio Version 17
|
||||||
VisualStudioVersion = 17.9.34728.123
|
VisualStudioVersion = 17.9.34728.123
|
||||||
MinimumVisualStudioVersion = 10.0.40219.1
|
MinimumVisualStudioVersion = 10.0.40219.1
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "src\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "src\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "src\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "src\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
|
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{649A3FFA-182F-4E56-9717-E6A9A2BEC545}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "web\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "src\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "web\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "src\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "src\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "src\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "src\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "services\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "src\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "modules\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "src\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeRedis", "modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "src\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Kafka", "modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.IoTDB", "modules\JiShe.CollectBus.IoTDB\JiShe.CollectBus.IoTDB.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Test", "protocols\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Cassandra", "modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "1.Web", "1.Web", "{A02F7D8A-04DC-44D6-94D4-3F65712D6B94}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
||||||
EndProject
|
|
||||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "4.Modules", "4.Modules", "{2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}"
|
|
||||||
EndProject
|
|
||||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "3.Protocols", "3.Protocols", "{3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}"
|
|
||||||
EndProject
|
|
||||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services", "{BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}"
|
|
||||||
EndProject
|
|
||||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
|
|
||||||
EndProject
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
@ -126,23 +118,23 @@ Global
|
|||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(NestedProjects) = preSolution
|
GlobalSection(NestedProjects) = preSolution
|
||||||
{D64C1577-4929-4B60-939E-96DE1534891A} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
|
{D64C1577-4929-4B60-939E-96DE1534891A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{78040F9E-3501-4A40-82DF-00A597710F35} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
{78040F9E-3501-4A40-82DF-00A597710F35} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{F1C58097-4C08-4D88-8976-6B3389391481} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
{F1C58097-4C08-4D88-8976-6B3389391481} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
|
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
|
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
|
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{C62EFF95-5C32-435F-BD78-6977E828F894} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
|
{C62EFF95-5C32-435F-BD78-6977E828F894} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{38C1808B-009A-418B-B17B-AB3626341B5D} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
|
{38C1808B-009A-418B-B17B-AB3626341B5D} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{C06C4082-638F-2996-5FED-7784475766C1} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
|
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -1,10 +0,0 @@
|
|||||||
namespace JiShe.CollectBus.IoTDB.Attribute
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Column分类标记特性(ATTRIBUTE字段),也就是属性字段
|
|
||||||
/// </summary>
|
|
||||||
[AttributeUsage(AttributeTargets.Property)]
|
|
||||||
public class ATTRIBUTEColumnAttribute : System.Attribute
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,10 +0,0 @@
|
|||||||
namespace JiShe.CollectBus.IoTDB.Attribute
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Column分类标记特性(FIELD字段),数据列字段
|
|
||||||
/// </summary>
|
|
||||||
[AttributeUsage(AttributeTargets.Property)]
|
|
||||||
public class FIELDColumnAttribute : System.Attribute
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,10 +0,0 @@
|
|||||||
namespace JiShe.CollectBus.IoTDB.Attribute
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Column分类标记特性(TAG字段),标签字段
|
|
||||||
/// </summary>
|
|
||||||
[AttributeUsage(AttributeTargets.Property)]
|
|
||||||
public class TAGColumnAttribute : System.Attribute
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,45 +0,0 @@
|
|||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
public class HostedService : IHostedService, IDisposable
|
|
||||||
{
|
|
||||||
private readonly ILogger _logger;
|
|
||||||
private readonly IServiceProvider _provider;
|
|
||||||
public HostedService(ILogger<HostedService> logger, IServiceProvider provider)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
_provider = provider;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task StartAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("程序启动");
|
|
||||||
Task.Run(() =>
|
|
||||||
{
|
|
||||||
_provider.UseKafkaSubscribe();
|
|
||||||
});
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task StopAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("结束");
|
|
||||||
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -23,10 +23,10 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Volo.Abp.Ddd.Application.Contracts" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Ddd.Application.Contracts" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Authorization" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Authorization" Version="8.3.3" />
|
||||||
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -133,6 +133,17 @@ namespace JiShe.CollectBus.Application.Contracts
|
|||||||
where T : DeviceCacheBasicModel;
|
where T : DeviceCacheBasicModel;
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 优化后的分页获取方法(支持百万级数据)
|
||||||
|
/// </summary>
|
||||||
|
Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
|
||||||
|
string redisHashCacheKey,
|
||||||
|
string redisZSetScoresIndexCacheKey,
|
||||||
|
int pageSize = 1000,
|
||||||
|
decimal? lastScore = null,
|
||||||
|
string lastMember = null,
|
||||||
|
bool descending = true) where T : DeviceCacheBasicModel;
|
||||||
|
|
||||||
///// <summary>
|
///// <summary>
|
||||||
///// 游标分页查询
|
///// 游标分页查询
|
||||||
///// </summary>
|
///// </summary>
|
||||||
@ -2,6 +2,7 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.Localization;
|
using JiShe.CollectBus.Localization;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
@ -9,7 +10,6 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.FreeRedis;
|
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
namespace JiShe.CollectBus;
|
||||||
@ -1,6 +1,8 @@
|
|||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.Kafka;
|
using JiShe.CollectBus.Kafka;
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
using JiShe.CollectBus.Kafka.AdminClient;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
@ -12,8 +14,6 @@ using System.Linq;
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.Cassandra;
|
using JiShe.CollectBus.Cassandra;
|
||||||
using JiShe.CollectBus.FreeRedis;
|
|
||||||
using JiShe.CollectBus.IoTDB;
|
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
using Volo.Abp.Application;
|
using Volo.Abp.Application;
|
||||||
using Volo.Abp.Autofac;
|
using Volo.Abp.Autofac;
|
||||||
@ -25,14 +25,14 @@
|
|||||||
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
||||||
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
||||||
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
||||||
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
|
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj" />
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
|
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -4,6 +4,7 @@ using JiShe.CollectBus.Application.Contracts;
|
|||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
@ -11,7 +12,6 @@ using System.Collections.Generic;
|
|||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.FreeRedis;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
using static FreeSql.Internal.GlobalFilter;
|
||||||
using static System.Runtime.InteropServices.JavaScript.JSType;
|
using static System.Runtime.InteropServices.JavaScript.JSType;
|
||||||
@ -366,6 +366,181 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
throw new Exception();
|
throw new Exception();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 优化后的分页获取方法(支持百万级数据)
|
||||||
|
/// </summary>
|
||||||
|
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
|
||||||
|
string redisHashCacheKey,
|
||||||
|
string redisZSetScoresIndexCacheKey,
|
||||||
|
int pageSize = 1000,
|
||||||
|
decimal? lastScore = null,
|
||||||
|
string lastMember = null,
|
||||||
|
bool descending = true) where T : DeviceCacheBasicModel
|
||||||
|
{
|
||||||
|
// 参数校验
|
||||||
|
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
||||||
|
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
||||||
|
{
|
||||||
|
_logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
|
||||||
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
||||||
|
}
|
||||||
|
|
||||||
|
pageSize = Math.Clamp(pageSize, 1, 10000);
|
||||||
|
|
||||||
|
const string luaScript = @"
|
||||||
|
local command = ARGV[1]
|
||||||
|
local range_start = ARGV[2]
|
||||||
|
local range_end = ARGV[3]
|
||||||
|
local limit = tonumber(ARGV[4])
|
||||||
|
local last_score = ARGV[5]
|
||||||
|
local last_member = ARGV[6]
|
||||||
|
|
||||||
|
-- 获取扩展数据(5倍分页大小)
|
||||||
|
local members
|
||||||
|
if command == 'ZRANGEBYSCORE' then
|
||||||
|
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
|
||||||
|
'WITHSCORES', 'LIMIT', 0, limit * 5)
|
||||||
|
else
|
||||||
|
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
|
||||||
|
'WITHSCORES', 'LIMIT', 0, limit * 5)
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 精确分页过滤
|
||||||
|
local filtered = {}
|
||||||
|
local count = 0
|
||||||
|
local start_index = 1
|
||||||
|
|
||||||
|
-- 存在锚点时寻找起始位置
|
||||||
|
if last_score ~= '' and last_member ~= '' then
|
||||||
|
for i=1,#members,2 do
|
||||||
|
local score = members[i+1]
|
||||||
|
local member = members[i]
|
||||||
|
|
||||||
|
if command == 'ZRANGEBYSCORE' then
|
||||||
|
if tonumber(score) > tonumber(last_score) then
|
||||||
|
start_index = i
|
||||||
|
break
|
||||||
|
elseif tonumber(score) == tonumber(last_score) then
|
||||||
|
if member > last_member then
|
||||||
|
start_index = i
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
else
|
||||||
|
if tonumber(score) < tonumber(last_score) then
|
||||||
|
start_index = i
|
||||||
|
break
|
||||||
|
elseif tonumber(score) == tonumber(last_score) then
|
||||||
|
if member < last_member then
|
||||||
|
start_index = i
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 收集有效数据
|
||||||
|
for i=start_index,#members,2 do
|
||||||
|
if count >= limit then break end
|
||||||
|
table.insert(filtered, members[i])
|
||||||
|
table.insert(filtered, members[i+1])
|
||||||
|
count = count + 1
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 提取有效数据
|
||||||
|
local result_members = {}
|
||||||
|
local result_scores = {}
|
||||||
|
for i=1,#filtered,2 do
|
||||||
|
table.insert(result_members, filtered[i])
|
||||||
|
table.insert(result_scores, filtered[i+1])
|
||||||
|
end
|
||||||
|
|
||||||
|
if #result_members == 0 then return {0,{},{},{}} end
|
||||||
|
|
||||||
|
-- 获取Hash数据
|
||||||
|
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
||||||
|
return {#result_members, result_members, result_scores, hash_data}";
|
||||||
|
|
||||||
|
// 构造查询范围(包含等于)
|
||||||
|
string rangeStart, rangeEnd;
|
||||||
|
if (descending)
|
||||||
|
{
|
||||||
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
|
||||||
|
rangeEnd = "-inf";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
|
||||||
|
rangeEnd = "+inf";
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var scriptResult = (object[])await Instance.EvalAsync(
|
||||||
|
luaScript,
|
||||||
|
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
|
||||||
|
new object[]
|
||||||
|
{
|
||||||
|
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
|
||||||
|
rangeStart,
|
||||||
|
rangeEnd,
|
||||||
|
pageSize,
|
||||||
|
lastScore?.ToString() ?? "",
|
||||||
|
lastMember ?? ""
|
||||||
|
});
|
||||||
|
|
||||||
|
var itemCount = (long)scriptResult[0];
|
||||||
|
if (itemCount == 0)
|
||||||
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
||||||
|
|
||||||
|
// 处理结果集
|
||||||
|
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
|
||||||
|
var scores = ((object[])scriptResult[2]).Cast<string>()
|
||||||
|
.Select(decimal.Parse).ToList();
|
||||||
|
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
||||||
|
|
||||||
|
var validItems = members.AsParallel()
|
||||||
|
.Select((m, i) =>
|
||||||
|
{
|
||||||
|
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
|
||||||
|
catch { return null; }
|
||||||
|
})
|
||||||
|
.Where(x => x != null)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
// 精确分页控制
|
||||||
|
var hasNext = validItems.Count >= pageSize;
|
||||||
|
var actualItems = validItems.Take(pageSize).ToList();
|
||||||
|
|
||||||
|
// 计算下一页锚点(必须基于原始排序)
|
||||||
|
decimal? nextScore = null;
|
||||||
|
string nextMember = null;
|
||||||
|
if (hasNext && actualItems.Count > 0)
|
||||||
|
{
|
||||||
|
var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
|
||||||
|
nextScore = scores[lastValidIndex];
|
||||||
|
nextMember = members[lastValidIndex];
|
||||||
|
}
|
||||||
|
|
||||||
|
return new BusCacheGlobalPagedResult<T>
|
||||||
|
{
|
||||||
|
Items = actualItems,
|
||||||
|
HasNext = hasNext,
|
||||||
|
NextScore = nextScore,
|
||||||
|
NextMember = nextMember,
|
||||||
|
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
|
||||||
|
PageSize = pageSize
|
||||||
|
};
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "分页查询异常");
|
||||||
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
|
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -387,14 +562,17 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
where T : DeviceCacheBasicModel
|
where T : DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
// 参数校验增强
|
// 参数校验增强
|
||||||
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
||||||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
|
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
|
||||||
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
pageSize = Math.Clamp(pageSize, 1, 10000);
|
if (pageSize < 1 || pageSize > 10000)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
var luaScript = @"
|
var luaScript = @"
|
||||||
local command = ARGV[1]
|
local command = ARGV[1]
|
||||||
@ -6,10 +6,12 @@ using Apache.IoTDB;
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.PrepayModel;
|
using JiShe.CollectBus.IotSystems.PrepayModel;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider.Context;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using JiShe.CollectBus.IotSystems.AFNEntity;
|
using JiShe.CollectBus.IotSystems.AFNEntity;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
@ -24,9 +26,6 @@ using JiShe.CollectBus.Kafka;
|
|||||||
using JiShe.CollectBus.Application.Contracts;
|
using JiShe.CollectBus.Application.Contracts;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using JiShe.CollectBus.IoTDB.Context;
|
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
|
||||||
using JiShe.CollectBus.IoTDB.Options;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Samples;
|
namespace JiShe.CollectBus.Samples;
|
||||||
|
|
||||||
@ -227,19 +226,6 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
|
|
||||||
timer1.Stop();
|
timer1.Stop();
|
||||||
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
List<string> focusAddressDataLista = new List<string>();
|
|
||||||
foreach (var item in meterInfos)
|
|
||||||
{
|
|
||||||
focusAddressDataLista.Add(item.FocusAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
|
|
||||||
|
|
||||||
// 打印分布统计
|
|
||||||
DeviceGroupBalanceControl.PrintDistributionStats();
|
|
||||||
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -279,7 +265,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
return aa == null;
|
return aa == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.TESTTOPIC)]
|
//[KafkaSubscribe("test-topic1")]
|
||||||
|
|
||||||
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
|
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
|
||||||
{
|
{
|
||||||
@ -6,10 +6,12 @@ using Apache.IoTDB;
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.PrepayModel;
|
using JiShe.CollectBus.IotSystems.PrepayModel;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider.Context;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.IotSystems.AFNEntity;
|
using JiShe.CollectBus.IotSystems.AFNEntity;
|
||||||
@ -9,6 +9,7 @@ using JiShe.CollectBus.Common.Extensions;
|
|||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
@ -24,9 +25,7 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
using static FreeSql.Internal.GlobalFilter;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
@ -170,9 +169,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
|
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
|
||||||
// items: meterInfos,
|
// items: meterInfos,
|
||||||
// deviceIdSelector: data => data.FocusAddress,
|
// deviceIdSelector: data => data.FocusAddress,
|
||||||
// processor: (data, groupIndex) =>
|
// processor: (data, threadId) =>
|
||||||
// {
|
// {
|
||||||
// _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
|
// _ = AmmerterCreatePublishTask(timeDensity, data);
|
||||||
// }
|
// }
|
||||||
//);
|
//);
|
||||||
|
|
||||||
@ -181,9 +180,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||||
items: meterInfos,
|
items: meterInfos,
|
||||||
deviceIdSelector: data => data.FocusAddress,
|
deviceIdSelector: data => data.FocusAddress,
|
||||||
processor: (data, groupIndex) =>
|
processor: (data,groupIndex) =>
|
||||||
{
|
{
|
||||||
AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
|
_ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -283,7 +282,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
|
var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
|
||||||
redisCacheMeterInfoHashKeyTemp,
|
redisCacheMeterInfoHashKeyTemp,
|
||||||
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
||||||
pageSize: 1000,
|
pageSize: 1000,
|
||||||
@ -291,13 +290,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
lastMember: member);
|
lastMember: member);
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
meterInfos.AddRange(page.Items);
|
||||||
focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
|
focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
|
||||||
foreach (var item in page.Items)
|
foreach (var item in page.Items)
|
||||||
{
|
{
|
||||||
if (!allIds.Add(item.MemberId))
|
if (!allIds.Add(item.MemberId))
|
||||||
{
|
throw new Exception("Duplicate data found!");
|
||||||
_logger.LogError($"{item.MemberId}Duplicate data found!");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (!page.HasNext) break;
|
if (!page.HasNext) break;
|
||||||
score = page.NextScore;
|
score = page.NextScore;
|
||||||
@ -412,7 +409,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
|
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
|
||||||
redisCacheMeterInfoHashKey,
|
redisCacheMeterInfoHashKey,
|
||||||
redisCacheMeterInfoSetIndexKey,
|
redisCacheMeterInfoSetIndexKey,
|
||||||
redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
|
redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos);
|
||||||
|
|
||||||
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
|
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
@ -583,104 +580,126 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
int timeDensity = 15;
|
int timeDensity = 15;
|
||||||
var currentDateTime = DateTime.Now;
|
var currentDateTime = DateTime.Now;
|
||||||
|
|
||||||
// 自动计算最佳并发度
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
|
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
|
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
||||||
var options = new ParallelOptions
|
|
||||||
{
|
{
|
||||||
MaxDegreeOfParallelism = recommendedThreads,
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//获取下发任务缓存数据
|
||||||
|
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
||||||
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
||||||
|
|
||||||
|
//将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
|
foreach (var focusItem in meterTaskInfos)
|
||||||
|
{
|
||||||
|
foreach (var ammerterItem in focusItem.Value)
|
||||||
|
{
|
||||||
|
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
||||||
|
{
|
||||||
|
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
||||||
|
MessageId = ammerterItem.Value.IssuedMessageId,
|
||||||
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
string taskBatch = "20250417155016";
|
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
|
|
||||||
{
|
|
||||||
Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
|
|
||||||
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
|
|
||||||
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>();
|
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
decimal? cursor = null;
|
|
||||||
string member = null;
|
|
||||||
bool hasNext;
|
|
||||||
do
|
|
||||||
{
|
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
|
|
||||||
redisCacheTelemetryPacketInfoHashKey,
|
|
||||||
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
|
||||||
pageSize: 1000,
|
|
||||||
lastScore: cursor,
|
|
||||||
lastMember: member);
|
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
cursor = page.HasNext ? page.NextScore : null;
|
|
||||||
member = page.HasNext ? page.NextMember : null;
|
|
||||||
hasNext = page.HasNext;
|
|
||||||
|
|
||||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
items: meterInfos,
|
|
||||||
deviceIdSelector: data => data.FocusAddress,
|
|
||||||
processor: (data, groupIndex) =>
|
|
||||||
{
|
|
||||||
_= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
|
|
||||||
}
|
}
|
||||||
);
|
}
|
||||||
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
} while (hasNext);
|
{
|
||||||
});
|
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
|
||||||
//var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
|
||||||
//if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
|
||||||
//{
|
|
||||||
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
|
||||||
// return;
|
|
||||||
//}
|
|
||||||
|
|
||||||
////获取下发任务缓存数据
|
|
||||||
//Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
|
||||||
//if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
|
||||||
//{
|
|
||||||
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
|
||||||
// return;
|
|
||||||
//}
|
|
||||||
|
|
||||||
//List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
|
||||||
|
|
||||||
////将取出的缓存任务数据发送到Kafka消息队列中
|
|
||||||
//foreach (var focusItem in meterTaskInfos)
|
|
||||||
//{
|
|
||||||
// foreach (var ammerterItem in focusItem.Value)
|
|
||||||
// {
|
|
||||||
// var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
|
||||||
// {
|
|
||||||
// MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
// MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
// FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
// TimeDensity = timeDensity.ToString(),
|
|
||||||
// };
|
|
||||||
// //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
// _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
// //_ = _producerBus.Publish(tempMsg);
|
|
||||||
|
|
||||||
// meterTaskInfosList.Add(ammerterItem.Value);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
//if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
|
||||||
//{
|
|
||||||
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
|
|
||||||
//}
|
|
||||||
|
|
||||||
|
|
||||||
//stopwatch.Stop();
|
|
||||||
|
|
||||||
//_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
stopwatch.Stop();
|
||||||
|
|
||||||
|
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表采集任务指令创建
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
|
||||||
|
/// <param name="focusGroup">集中器数据分组</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||||
|
{
|
||||||
|
if (timeDensity <= 0)
|
||||||
|
{
|
||||||
|
timeDensity = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeDensity > 15)
|
||||||
|
{
|
||||||
|
timeDensity = 15;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (focusGroup == null || focusGroup.Count <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try
|
||||||
|
{
|
||||||
|
//将采集器编号的hash值取模分组
|
||||||
|
const int TotalShards = 1024;
|
||||||
|
var focusHashGroups = new Dictionary<int, Dictionary<string, Dictionary<string, AmmeterInfo>>>();
|
||||||
|
|
||||||
|
foreach (var (collectorId, ammetersDictionary) in focusGroup)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(collectorId))
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算哈希分组ID
|
||||||
|
int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards);
|
||||||
|
|
||||||
|
// 获取或创建分组(避免重复查找)
|
||||||
|
if (!focusHashGroups.TryGetValue(hashGroupId, out var group))
|
||||||
|
{
|
||||||
|
group = new Dictionary<string, Dictionary<string, AmmeterInfo>>();
|
||||||
|
focusHashGroups[hashGroupId] = group;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 将当前集中器数据加入分组
|
||||||
|
group[collectorId] = ammetersDictionary;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (focusHashGroups == null)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//根据分组创建线程批处理集中器
|
||||||
|
foreach (var group in focusHashGroups)
|
||||||
|
{
|
||||||
|
await AmmerterCreatePublishTask2(timeDensity, group.Value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 电表创建发布任务
|
/// 电表创建发布任务
|
||||||
@ -690,15 +709,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="groupIndex">集中器所在分组</param>
|
/// <param name="groupIndex">集中器所在分组</param>
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private void AmmerterCreatePublishTask(int timeDensity
|
private async Task AmmerterCreatePublishTask(int timeDensity
|
||||||
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
|
, AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
|
||||||
{
|
{
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||||
|
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
||||||
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
@ -880,31 +899,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
|
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
await _redisDataCacheService.BatchInsertDataAsync(
|
||||||
using (var pipe = FreeRedisProvider.Instance.StartPipe())
|
redisCacheTelemetryPacketInfoHashKey,
|
||||||
{
|
redisCacheTelemetryPacketInfoSetIndexKey,
|
||||||
foreach (var item in taskList)
|
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
||||||
{
|
taskList);
|
||||||
// 主数据存储Hash
|
|
||||||
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize());
|
|
||||||
|
|
||||||
// Set索引缓存
|
|
||||||
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId);
|
|
||||||
|
|
||||||
// ZSET索引缓存Key
|
|
||||||
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId);
|
|
||||||
}
|
|
||||||
pipe.EndPipe();
|
|
||||||
}
|
|
||||||
|
|
||||||
//await _redisDataCacheService.BatchInsertDataAsync(
|
|
||||||
// redisCacheTelemetryPacketInfoHashKey,
|
|
||||||
// redisCacheTelemetryPacketInfoSetIndexKey,
|
|
||||||
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
|
||||||
// taskList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -912,15 +914,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="topicName">主题名称</param>
|
/// <param name="topicName">主题名称</param>
|
||||||
/// <param name="taskRecord">任务记录</param>
|
/// <param name="taskRecord">任务记录</param>
|
||||||
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task KafkaProducerIssuedMessage(string topicName,
|
private async Task KafkaProducerIssuedMessage(string topicName,
|
||||||
MeterReadingTelemetryPacketInfo taskRecord,int partition)
|
MeterReadingRecords taskRecord)
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
||||||
{
|
{
|
||||||
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
|
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
|
||||||
}
|
}
|
||||||
|
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
|
||||||
|
|
||||||
await _producerService.ProduceAsync(topicName, partition, taskRecord);
|
await _producerService.ProduceAsync(topicName, partition, taskRecord);
|
||||||
}
|
}
|
||||||
@ -977,6 +979,191 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表创建发布任务
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeDensity">采集频率</param>
|
||||||
|
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
private async Task AmmerterCreatePublishTask2(int timeDensity
|
||||||
|
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||||
|
{
|
||||||
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
|
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
||||||
|
|
||||||
|
var currentTime = DateTime.Now;
|
||||||
|
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||||
|
foreach (var focusInfo in focusGroup)
|
||||||
|
{
|
||||||
|
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
||||||
|
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
|
||||||
|
|
||||||
|
foreach (var ammeterInfo in focusInfo.Value)
|
||||||
|
{
|
||||||
|
var ammeter = ammeterInfo.Value;
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
//载波的不处理
|
||||||
|
if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ammeter.State.Equals(2))
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
|
||||||
|
//if (!IsGennerateCmd(ammeter.LastTime, -1))
|
||||||
|
//{
|
||||||
|
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令");
|
||||||
|
// continue;
|
||||||
|
//}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (string.IsNullOrWhiteSpace(ammeter.Address))
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (Convert.ToInt32(ammeter.Address) > 65535)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<string> tempCodes = ammeter.ItemCodes.Deserialize<List<string>>()!;
|
||||||
|
|
||||||
|
//TODO:自动上报数据只主动采集1类数据。
|
||||||
|
if (ammeter.AutomaticReport.Equals(1))
|
||||||
|
{
|
||||||
|
var tempSubCodes = new List<string>();
|
||||||
|
if (tempCodes.Contains("0C_49"))
|
||||||
|
{
|
||||||
|
tempSubCodes.Add("0C_49");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tempSubCodes.Contains("0C_149"))
|
||||||
|
{
|
||||||
|
tempSubCodes.Add("0C_149");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ammeter.ItemCodes.Contains("10_97"))
|
||||||
|
{
|
||||||
|
tempSubCodes.Add("10_97");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tempSubCodes == null || tempSubCodes.Count <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tempCodes = tempSubCodes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
|
||||||
|
|
||||||
|
foreach (var tempItem in tempCodes)
|
||||||
|
{
|
||||||
|
//排除已发送日冻结和月冻结采集项配置
|
||||||
|
if (DayFreezeCodes.Contains(tempItem))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (MonthFreezeCodes.Contains(tempItem))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var itemCodeArr = tempItem.Split('_');
|
||||||
|
var aFNStr = itemCodeArr[0];
|
||||||
|
var aFN = (AFN)aFNStr.HexToDec();
|
||||||
|
var fn = int.Parse(itemCodeArr[1]);
|
||||||
|
byte[] dataInfos = null;
|
||||||
|
if (ammeter.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据)
|
||||||
|
{
|
||||||
|
//实时数据
|
||||||
|
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
||||||
|
//特殊表暂不处理
|
||||||
|
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
|
||||||
|
, out var handler))
|
||||||
|
{
|
||||||
|
dataInfos = handler(new TelemetryPacketRequest()
|
||||||
|
{
|
||||||
|
FocusAddress = ammeter.FocusAddress,
|
||||||
|
Fn = fn,
|
||||||
|
Pn = ammeter.MeteringCode
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}无效编码。");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//TODO:特殊表
|
||||||
|
|
||||||
|
if (dataInfos == null || dataInfos.Length <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
var meterReadingRecords = new MeterReadingRecords()
|
||||||
|
{
|
||||||
|
ProjectID = ammeter.ProjectID,
|
||||||
|
DatabaseBusiID = ammeter.DatabaseBusiID,
|
||||||
|
PendingCopyReadTime = pendingCopyReadTime,
|
||||||
|
CreationTime = currentTime,
|
||||||
|
MeterAddress = ammeter.AmmerterAddress,
|
||||||
|
MeterId = ammeter.MeterId,
|
||||||
|
MeterType = MeterTypeEnum.Ammeter,
|
||||||
|
FocusAddress = ammeter.FocusAddress,
|
||||||
|
FocusID = ammeter.FocusId,
|
||||||
|
AFN = aFN,
|
||||||
|
Fn = fn,
|
||||||
|
ItemCode = tempItem,
|
||||||
|
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
|
||||||
|
ManualOrNot = false,
|
||||||
|
Pn = ammeter.MeteringCode,
|
||||||
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||||
|
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
||||||
|
};
|
||||||
|
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
||||||
|
|
||||||
|
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
|
||||||
|
}
|
||||||
|
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
@ -10,7 +10,7 @@ using JiShe.CollectBus.Common.DeviceBalanceControl;
|
|||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
@ -3,6 +3,7 @@ using JiShe.CollectBus.Common.Consts;
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
@ -17,7 +18,6 @@ using Microsoft.Extensions.Logging;
|
|||||||
using System;
|
using System;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
|
||||||
using TouchSocket.Sockets;
|
using TouchSocket.Sockets;
|
||||||
using Volo.Abp.Domain.Repositories;
|
using Volo.Abp.Domain.Repositories;
|
||||||
|
|
||||||
@ -3,6 +3,7 @@ using Cassandra.Data.Linq;
|
|||||||
using Cassandra.Mapping;
|
using Cassandra.Mapping;
|
||||||
using JiShe.CollectBus.Cassandra.Extensions;
|
using JiShe.CollectBus.Cassandra.Extensions;
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
using JiShe.CollectBus.Common.Attributes;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using Microsoft.AspNetCore.Http;
|
using Microsoft.AspNetCore.Http;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using Thrift.Protocol.Entities;
|
using Thrift.Protocol.Entities;
|
||||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user