Compare commits
16 Commits
76ae008e01
...
7704c6374e
| Author | SHA1 | Date | |
|---|---|---|---|
| 7704c6374e | |||
| 26e361075d | |||
| c93feb631d | |||
| 02d189358e | |||
| ca1e4e28e5 | |||
| db6e314aca | |||
| 444b01095f | |||
| 532acc575f | |||
| 19bc05c601 | |||
| f11e514ef4 | |||
| c4d4078bd9 | |||
| b064ff3d14 | |||
|
|
ac546dd029 | ||
|
|
3020a49672 | ||
|
|
d002472854 | ||
| 48b91183c2 |
5
.gitignore
vendored
5
.gitignore
vendored
@ -400,5 +400,6 @@ FodyWeavers.xsd
|
|||||||
|
|
||||||
# ABP Studio
|
# ABP Studio
|
||||||
**/.abpstudio/
|
**/.abpstudio/
|
||||||
/src/JiShe.CollectBus.Host/Plugins/*.dll
|
/web/JiShe.CollectBus.Host/Plugins/*.dll
|
||||||
JiShe.CollectBus.Kafka.Test
|
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.dll
|
||||||
|
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.Test.dll
|
||||||
|
|||||||
@ -3,41 +3,49 @@ Microsoft Visual Studio Solution File, Format Version 12.00
|
|||||||
# Visual Studio Version 17
|
# Visual Studio Version 17
|
||||||
VisualStudioVersion = 17.9.34728.123
|
VisualStudioVersion = 17.9.34728.123
|
||||||
MinimumVisualStudioVersion = 10.0.40219.1
|
MinimumVisualStudioVersion = 10.0.40219.1
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "src\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "src\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "src\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "src\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{649A3FFA-182F-4E56-9717-E6A9A2BEC545}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "src\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "web\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "src\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "web\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "src\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "src\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "src\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "src\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "services\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "src\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "modules\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "src\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeRedis", "modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Kafka", "modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.IoTDB", "modules\JiShe.CollectBus.IoTDB\JiShe.CollectBus.IoTDB.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Test", "protocols\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Cassandra", "modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "1.Web", "1.Web", "{A02F7D8A-04DC-44D6-94D4-3F65712D6B94}"
|
||||||
|
EndProject
|
||||||
|
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "4.Modules", "4.Modules", "{2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}"
|
||||||
|
EndProject
|
||||||
|
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "3.Protocols", "3.Protocols", "{3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}"
|
||||||
|
EndProject
|
||||||
|
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services", "{BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}"
|
||||||
|
EndProject
|
||||||
|
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
|
||||||
EndProject
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
@ -118,23 +126,23 @@ Global
|
|||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(NestedProjects) = preSolution
|
GlobalSection(NestedProjects) = preSolution
|
||||||
{D64C1577-4929-4B60-939E-96DE1534891A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{D64C1577-4929-4B60-939E-96DE1534891A} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
|
||||||
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
||||||
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
||||||
{78040F9E-3501-4A40-82DF-00A597710F35} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{78040F9E-3501-4A40-82DF-00A597710F35} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
||||||
{F1C58097-4C08-4D88-8976-6B3389391481} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{F1C58097-4C08-4D88-8976-6B3389391481} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
||||||
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
|
||||||
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
|
||||||
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
|
||||||
{C62EFF95-5C32-435F-BD78-6977E828F894} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{C62EFF95-5C32-435F-BD78-6977E828F894} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
|
||||||
{38C1808B-009A-418B-B17B-AB3626341B5D} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{38C1808B-009A-418B-B17B-AB3626341B5D} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
|
||||||
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
|
||||||
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
||||||
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{C06C4082-638F-2996-5FED-7784475766C1} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{F0288175-F0EC-48BD-945F-CF1512850943} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -3,7 +3,6 @@ using Cassandra.Data.Linq;
|
|||||||
using Cassandra.Mapping;
|
using Cassandra.Mapping;
|
||||||
using JiShe.CollectBus.Cassandra.Extensions;
|
using JiShe.CollectBus.Cassandra.Extensions;
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
using JiShe.CollectBus.Common.Attributes;
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
|
||||||
using Microsoft.AspNetCore.Http;
|
using Microsoft.AspNetCore.Http;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using Thrift.Protocol.Entities;
|
using Thrift.Protocol.Entities;
|
||||||
@ -15,8 +15,8 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -1,9 +1,9 @@
|
|||||||
using JiShe.CollectBus.FreeRedisProvider.Options;
|
using JiShe.CollectBus.FreeRedis.Options;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.FreeRedisProvider
|
namespace JiShe.CollectBus.FreeRedis
|
||||||
{
|
{
|
||||||
public class CollectBusFreeRedisModule : AbpModule
|
public class CollectBusFreeRedisModule : AbpModule
|
||||||
{
|
{
|
||||||
@ -1,16 +1,11 @@
|
|||||||
using FreeRedis;
|
using System.Diagnostics;
|
||||||
|
using FreeRedis;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.FreeRedis.Options;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
|
||||||
using JiShe.CollectBus.FreeRedisProvider.Options;
|
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System.Diagnostics;
|
|
||||||
using System.Text.Json;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
using static System.Runtime.InteropServices.JavaScript.JSType;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.FreeRedisProvider
|
namespace JiShe.CollectBus.FreeRedis
|
||||||
{
|
{
|
||||||
|
|
||||||
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency
|
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency
|
||||||
@ -1,7 +1,6 @@
|
|||||||
using FreeRedis;
|
using FreeRedis;
|
||||||
using JiShe.CollectBus.Common.Models;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.FreeRedisProvider
|
namespace JiShe.CollectBus.FreeRedis
|
||||||
{
|
{
|
||||||
public interface IFreeRedisProvider
|
public interface IFreeRedisProvider
|
||||||
{
|
{
|
||||||
@ -10,6 +10,6 @@
|
|||||||
<PackageReference Include="Volo.Abp" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp" Version="8.3.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
@ -1,10 +1,4 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.FreeRedis.Options
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.FreeRedisProvider.Options
|
|
||||||
{
|
{
|
||||||
public class FreeRedisOptions
|
public class FreeRedisOptions
|
||||||
{
|
{
|
||||||
@ -0,0 +1,10 @@
|
|||||||
|
namespace JiShe.CollectBus.IoTDB.Attribute
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Column分类标记特性(ATTRIBUTE字段),也就是属性字段
|
||||||
|
/// </summary>
|
||||||
|
[AttributeUsage(AttributeTargets.Property)]
|
||||||
|
public class ATTRIBUTEColumnAttribute : System.Attribute
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,10 @@
|
|||||||
|
namespace JiShe.CollectBus.IoTDB.Attribute
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Column分类标记特性(FIELD字段),数据列字段
|
||||||
|
/// </summary>
|
||||||
|
[AttributeUsage(AttributeTargets.Property)]
|
||||||
|
public class FIELDColumnAttribute : System.Attribute
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,16 +1,10 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.IoTDB.Attribute
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 用于标识当前实体为单侧点模式,单侧点模式只有一个Filed标识字段,类型是Tuple<string,object>,Item1=>测点名称,Item2=>测点值,泛型
|
/// 用于标识当前实体为单侧点模式,单侧点模式只有一个Filed标识字段,类型是Tuple<string,object>,Item1=>测点名称,Item2=>测点值,泛型
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[AttributeUsage(AttributeTargets.Property)]
|
[AttributeUsage(AttributeTargets.Property)]
|
||||||
public class SingleMeasuringAttribute : Attribute
|
public class SingleMeasuringAttribute : System.Attribute
|
||||||
{
|
{
|
||||||
public string FieldName { get; set;}
|
public string FieldName { get; set;}
|
||||||
|
|
||||||
@ -0,0 +1,10 @@
|
|||||||
|
namespace JiShe.CollectBus.IoTDB.Attribute
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Column分类标记特性(TAG字段),标签字段
|
||||||
|
/// </summary>
|
||||||
|
[AttributeUsage(AttributeTargets.Property)]
|
||||||
|
public class TAGColumnAttribute : System.Attribute
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,17 +1,12 @@
|
|||||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IoTDBProvider.Provider;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
|
using JiShe.CollectBus.IoTDB.Provider;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
using static Thrift.Server.TThreadPoolAsyncServer;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
namespace JiShe.CollectBus.IoTDB
|
||||||
{
|
{
|
||||||
public class CollectBusIoTDBModule : AbpModule
|
public class CollectBusIoTDBModule : AbpModule
|
||||||
{
|
{
|
||||||
@ -1,11 +1,7 @@
|
|||||||
using Microsoft.Extensions.Options;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using System;
|
using Microsoft.Extensions.Options;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider.Context
|
namespace JiShe.CollectBus.IoTDB.Context
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IoTDB SessionPool 运行时上下文
|
/// IoTDB SessionPool 运行时上下文
|
||||||
@ -1,6 +1,8 @@
|
|||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
|
using JiShe.CollectBus.IoTDB.Provider;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
namespace JiShe.CollectBus.IoTDB.Interface
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置
|
/// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置
|
||||||
@ -1,10 +1,4 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.IoTDB.Interface
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider.Interface
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Session 工厂接口
|
/// Session 工厂接口
|
||||||
@ -1,11 +1,6 @@
|
|||||||
using Apache.IoTDB.DataStructure;
|
using Apache.IoTDB.DataStructure;
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider.Interface
|
namespace JiShe.CollectBus.IoTDB.Interface
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Session 连接池
|
/// Session 连接池
|
||||||
@ -11,6 +11,6 @@
|
|||||||
<PackageReference Include="Volo.Abp" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp" Version="8.3.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
@ -1,10 +1,4 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.IoTDB.Options
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IOTDB配置
|
/// IOTDB配置
|
||||||
@ -1,10 +1,4 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.IoTDB.Options
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 查询条件
|
/// 查询条件
|
||||||
@ -1,10 +1,4 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.IoTDB.Options
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 查询条件
|
/// 查询条件
|
||||||
@ -1,11 +1,6 @@
|
|||||||
using Apache.IoTDB;
|
using Apache.IoTDB;
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备元数据
|
/// 设备元数据
|
||||||
@ -1,10 +1,4 @@
|
|||||||
using System;
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备路径构建器
|
/// 设备路径构建器
|
||||||
@ -1,16 +1,16 @@
|
|||||||
using Apache.IoTDB;
|
using System.Collections.Concurrent;
|
||||||
using Apache.IoTDB.DataStructure;
|
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
|
||||||
using JiShe.CollectBus.Common.Models;
|
|
||||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
|
||||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using Apache.IoTDB;
|
||||||
|
using Apache.IoTDB.DataStructure;
|
||||||
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.IoTDB.Attribute;
|
||||||
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
namespace JiShe.CollectBus.IoTDBProvider
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IoTDB数据源
|
/// IoTDB数据源
|
||||||
@ -1,13 +1,9 @@
|
|||||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
using System.Collections.Concurrent;
|
||||||
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider.Provider
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
{
|
{
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -1,4 +1,6 @@
|
|||||||
namespace JiShe.CollectBus.IoTDBProvider
|
using JiShe.CollectBus.IoTDB.Attribute;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IoT实体基类
|
/// IoT实体基类
|
||||||
@ -1,14 +1,10 @@
|
|||||||
using Apache.IoTDB.DataStructure;
|
using Apache.IoTDB;
|
||||||
using Apache.IoTDB;
|
using Apache.IoTDB.DataStructure;
|
||||||
using System;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using System.Collections.Generic;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider.Provider
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 树模型连接池
|
/// 树模型连接池
|
||||||
@ -1,14 +1,10 @@
|
|||||||
using Apache.IoTDB.DataStructure;
|
using Apache.IoTDB;
|
||||||
using Apache.IoTDB;
|
using Apache.IoTDB.DataStructure;
|
||||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using System;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDBProvider.Provider
|
namespace JiShe.CollectBus.IoTDB.Provider
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 表模型Session连接池
|
/// 表模型Session连接池
|
||||||
@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 订阅的主题
|
/// 订阅的主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string Topic { get; set; }
|
public string Topic { get; set; } = null!;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 分区
|
/// 分区
|
||||||
@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 消费者组
|
/// 消费者组
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string GroupId { get; set; }
|
public string GroupId { get; set; } = "default";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 任务数(默认是多少个分区多少个任务)
|
/// 任务数(默认是多少个分区多少个任务)
|
||||||
@ -42,35 +42,27 @@ namespace JiShe.CollectBus.Kafka.Attributes
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 批次超时时间
|
/// 批次超时时间
|
||||||
|
/// 格式:("00:05:00")
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public TimeSpan? BatchTimeout { get; set; }=null;
|
public TimeSpan? BatchTimeout { get; set; }=null;
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 订阅主题
|
/// 订阅主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param>
|
/// <param name="batchTimeout"></param>
|
||||||
public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
|
public KafkaSubscribeAttribute(string topic)
|
||||||
{
|
{
|
||||||
this.Topic = topic;
|
this.Topic = topic;
|
||||||
this.GroupId = groupId;
|
|
||||||
this.EnableBatch = enableBatch;
|
|
||||||
this.BatchSize = batchSize;
|
|
||||||
this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 订阅主题
|
/// 订阅主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param>
|
public KafkaSubscribeAttribute(string topic, int partition)
|
||||||
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
|
|
||||||
{
|
{
|
||||||
this.Topic = topic;
|
this.Topic = topic;
|
||||||
this.Partition = partition;
|
this.Partition = partition;
|
||||||
this.GroupId = groupId;
|
|
||||||
this.TaskCount = 1;
|
|
||||||
this.EnableBatch = enableBatch;
|
|
||||||
this.BatchSize = batchSize;
|
|
||||||
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -39,6 +39,8 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
context.Services.AddSingleton<IProducerService, ProducerService>();
|
context.Services.AddSingleton<IProducerService, ProducerService>();
|
||||||
// 注册Consumer
|
// 注册Consumer
|
||||||
context.Services.AddSingleton<IConsumerService, ConsumerService>();
|
context.Services.AddSingleton<IConsumerService, ConsumerService>();
|
||||||
|
|
||||||
|
context.Services.AddHostedService<HostedService>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
||||||
@ -46,7 +48,7 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
var app = context.GetApplicationBuilder();
|
var app = context.GetApplicationBuilder();
|
||||||
|
|
||||||
// 注册Subscriber
|
// 注册Subscriber
|
||||||
app.ApplicationServices.UseKafkaSubscribers();
|
//app.ApplicationServices.UseKafkaSubscribe();
|
||||||
|
|
||||||
// 获取程序集
|
// 获取程序集
|
||||||
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
||||||
@ -1,4 +1,5 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@ -50,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
EnableAutoCommit = false, // 禁止AutoCommit
|
EnableAutoCommit = false, // 禁止AutoCommit
|
||||||
EnablePartitionEof = true, // 启用分区末尾标记
|
EnablePartitionEof = true, // 启用分区末尾标记
|
||||||
//AllowAutoCreateTopics= true, // 启用自动创建
|
AllowAutoCreateTopics = true, // 启用自动创建
|
||||||
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
|
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -105,27 +106,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
|
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
(
|
//(
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
// CreateConsumer<TKey, TValue>(groupId),
|
||||||
cts
|
// cts
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
//)).Consumer as IConsumer<TKey, TValue>;
|
||||||
|
var consumer = CreateConsumer<TKey, TValue>(groupId);
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
await Task.Run(async () =>
|
||||||
{
|
{
|
||||||
while (!cts.IsCancellationRequested)
|
while (!cts.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
|
||||||
|
|
||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
if (result == null || result.Message==null || result.Message.Value == null)
|
||||||
{
|
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
|
||||||
//consumer.Commit(result); // 手动提交
|
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
if (result.IsPartitionEOF)
|
if (result.IsPartitionEOF)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
||||||
@ -151,7 +151,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -170,30 +170,38 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||||
{
|
{
|
||||||
|
try {
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
|
||||||
|
//{
|
||||||
|
// string ssss = "";
|
||||||
|
//}
|
||||||
|
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
|
//(
|
||||||
|
// CreateConsumer<string, TValue>(groupId),
|
||||||
|
// cts
|
||||||
|
//)).Consumer as IConsumer<string, TValue>;
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
var consumer = CreateConsumer<Ignore, TValue>(groupId);
|
||||||
(
|
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
_ = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
|
int count = 0;
|
||||||
while (!cts.IsCancellationRequested)
|
while (!cts.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
|
||||||
|
count++;
|
||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message == null || result.Message.Value == null)
|
if (result == null || result.Message == null || result.Message.Value == null)
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
await Task.Delay(500, cts.Token);
|
||||||
//consumer.Commit(result); // 手动提交
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.IsPartitionEOF)
|
if (result.IsPartitionEOF)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
||||||
@ -206,6 +214,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
// 检查 Header 是否符合条件
|
// 检查 Header 是否符合条件
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
if (!headersFilter.Match(result.Message.Headers))
|
||||||
{
|
{
|
||||||
|
await Task.Delay(500, cts.Token);
|
||||||
//consumer.Commit(result); // 提交偏移量
|
//consumer.Commit(result); // 提交偏移量
|
||||||
// 跳过消息
|
// 跳过消息
|
||||||
continue;
|
continue;
|
||||||
@ -214,13 +223,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
bool sucess = await messageHandler(result.Message.Value);
|
bool sucess = await messageHandler(result.Message.Value);
|
||||||
if (sucess)
|
if (sucess)
|
||||||
consumer.Commit(result); // 手动提交
|
consumer.Commit(result); // 手动提交
|
||||||
|
else
|
||||||
|
consumer.StoreOffset(result);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
} catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"Kafka消费异常: {ex.Message}");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
await Task.CompletedTask;
|
await Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -339,7 +356,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
{
|
{
|
||||||
@ -385,14 +402,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <param name="consumeTimeout">消费等待时间</param>
|
/// <param name="consumeTimeout">消费等待时间</param>
|
||||||
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<string, TValue>(groupId),
|
||||||
cts
|
cts
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
)).Consumer as IConsumer<string, TValue>;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
45
modules/JiShe.CollectBus.Kafka/HostedService.cs
Normal file
45
modules/JiShe.CollectBus.Kafka/HostedService.cs
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Kafka
|
||||||
|
{
|
||||||
|
public class HostedService : IHostedService, IDisposable
|
||||||
|
{
|
||||||
|
private readonly ILogger _logger;
|
||||||
|
private readonly IServiceProvider _provider;
|
||||||
|
public HostedService(ILogger<HostedService> logger, IServiceProvider provider)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_provider = provider;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("程序启动");
|
||||||
|
Task.Run(() =>
|
||||||
|
{
|
||||||
|
_provider.UseKafkaSubscribe();
|
||||||
|
});
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StopAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("结束");
|
||||||
|
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -13,7 +13,7 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -1,6 +1,7 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
using JiShe.CollectBus.Kafka.AdminClient;
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
using JiShe.CollectBus.Kafka.Attributes;
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
using JiShe.CollectBus.Kafka.Consumer;
|
||||||
@ -10,7 +11,10 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
namespace JiShe.CollectBus.Kafka
|
||||||
{
|
{
|
||||||
@ -21,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="app"></param>
|
/// <param name="app"></param>
|
||||||
/// <param name="assembly"></param>
|
/// <param name="assembly"></param>
|
||||||
public static void UseKafkaSubscribers(this IServiceProvider provider)
|
public static async Task UseKafkaSubscribe(this IServiceProvider provider)
|
||||||
{
|
{
|
||||||
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
//var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
||||||
|
|
||||||
//初始化主题信息
|
//初始化主题信息
|
||||||
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
|
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
|
||||||
@ -36,9 +40,9 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
{
|
{
|
||||||
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
|
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
}
|
}
|
||||||
|
List<Task> tasks = new List<Task>();
|
||||||
lifetime.ApplicationStarted.Register(() =>
|
//lifetime.ApplicationStarted.Register(async() =>
|
||||||
{
|
//{
|
||||||
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
||||||
int threadCount = 0;
|
int threadCount = 0;
|
||||||
int topicCount = 0;
|
int topicCount = 0;
|
||||||
@ -62,14 +66,15 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
!type.IsAbstract && !type.IsInterface).ToList(); ;
|
!type.IsAbstract && !type.IsInterface).ToList(); ;
|
||||||
if (subscribeTypes.Count == 0)
|
if (subscribeTypes.Count == 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
foreach (var subscribeType in subscribeTypes)
|
foreach (var subscribeType in subscribeTypes)
|
||||||
{
|
{
|
||||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||||
subscribes.ForEach(subscribe =>
|
subscribes.ForEach(async subscribe =>
|
||||||
{
|
{
|
||||||
if (subscribe!=null)
|
if (subscribe!=null)
|
||||||
{
|
{
|
||||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
|
Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
|
||||||
threadCount += tuple.Item1;
|
threadCount += tuple.Item1;
|
||||||
topicCount += tuple.Item2;
|
topicCount += tuple.Item2;
|
||||||
}
|
}
|
||||||
@ -77,58 +82,59 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
||||||
});
|
//});
|
||||||
|
await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
|
//public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
|
||||||
{
|
//{
|
||||||
var provider = app.ApplicationServices;
|
// var provider = app.ApplicationServices;
|
||||||
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
// var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
||||||
//初始化主题信息
|
// //初始化主题信息
|
||||||
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
|
// var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
|
||||||
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
|
// var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
|
||||||
|
|
||||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
// List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||||
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
// topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
||||||
|
|
||||||
foreach (var item in topics)
|
// foreach (var item in topics)
|
||||||
{
|
// {
|
||||||
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
|
// kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
}
|
// }
|
||||||
|
|
||||||
lifetime.ApplicationStarted.Register(() =>
|
// lifetime.ApplicationStarted.Register(async () =>
|
||||||
{
|
// {
|
||||||
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
// var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
||||||
int threadCount = 0;
|
// int threadCount = 0;
|
||||||
int topicCount = 0;
|
// int topicCount = 0;
|
||||||
var subscribeTypes = assembly.GetTypes()
|
// var subscribeTypes = assembly.GetTypes()
|
||||||
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
|
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
|
||||||
.ToList();
|
// .ToList();
|
||||||
|
|
||||||
if (subscribeTypes.Count == 0) return;
|
// if (subscribeTypes.Count == 0) return;
|
||||||
foreach (var subscribeType in subscribeTypes)
|
// foreach (var subscribeType in subscribeTypes)
|
||||||
{
|
// {
|
||||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
// var subscribes = provider.GetServices(subscribeType).ToList();
|
||||||
subscribes.ForEach(subscribe => {
|
// subscribes.ForEach(async subscribe => {
|
||||||
|
|
||||||
if (subscribe != null)
|
// if (subscribe != null)
|
||||||
{
|
// {
|
||||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
|
// Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||||
threadCount += tuple.Item1;
|
// threadCount += tuple.Item1;
|
||||||
topicCount += tuple.Item2;
|
// topicCount += tuple.Item2;
|
||||||
}
|
// }
|
||||||
});
|
// });
|
||||||
}
|
// }
|
||||||
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
||||||
});
|
// });
|
||||||
}
|
//}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 构建Kafka订阅
|
/// 构建Kafka订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="subscribe"></param>
|
/// <param name="subscribe"></param>
|
||||||
/// <param name="provider"></param>
|
/// <param name="provider"></param>
|
||||||
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig, List<Task> tasks)
|
||||||
{
|
{
|
||||||
var subscribedMethods = subscribe.GetType().GetMethods()
|
var subscribedMethods = subscribe.GetType().GetMethods()
|
||||||
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
||||||
@ -136,20 +142,26 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
.ToArray();
|
.ToArray();
|
||||||
//var configuration = provider.GetRequiredService<IConfiguration>();
|
//var configuration = provider.GetRequiredService<IConfiguration>();
|
||||||
int threadCount = 0;
|
int threadCount = 0;
|
||||||
|
|
||||||
foreach (var sub in subscribedMethods)
|
foreach (var sub in subscribedMethods)
|
||||||
{
|
{
|
||||||
int partitionCount = kafkaOptionConfig.NumPartitions;
|
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
|
||||||
//var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
#if DEBUG
|
||||||
|
var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
||||||
|
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
|
||||||
|
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
|
||||||
|
#endif
|
||||||
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
|
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
|
||||||
if (partitionCount <= 0)
|
if (partitionCount <= 0)
|
||||||
partitionCount = 1;
|
partitionCount = 1;
|
||||||
for (int i = 0; i < partitionCount; i++)
|
for (int i = 0; i < partitionCount; i++)
|
||||||
{
|
{
|
||||||
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
|
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
|
||||||
|
tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
|
||||||
threadCount++;
|
threadCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Tuple.Create(threadCount, subscribedMethods.Length);
|
return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -166,12 +178,15 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
|
|
||||||
if (attr.EnableBatch)
|
if (attr.EnableBatch)
|
||||||
{
|
{
|
||||||
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
|
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
|
logger.LogInformation($"kafka批量消费消息:{message}");
|
||||||
|
#endif
|
||||||
// 处理消息
|
// 处理消息
|
||||||
return await ProcessMessageAsync(message, method, subscribe);
|
return await ProcessMessageAsync(message.ToList(), method, subscribe);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
@ -183,12 +198,15 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
|
await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
|
logger.LogInformation($"kafka消费消息:{message}");
|
||||||
|
#endif
|
||||||
// 处理消息
|
// 处理消息
|
||||||
return await ProcessMessageAsync(message, method, subscribe);
|
return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
@ -209,34 +227,40 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// <param name="method"></param>
|
/// <param name="method"></param>
|
||||||
/// <param name="subscribe"></param>
|
/// <param name="subscribe"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
|
private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
|
||||||
{
|
{
|
||||||
var parameters = method.GetParameters();
|
var parameters = method.GetParameters();
|
||||||
bool isGenericTask = method.ReturnType.IsGenericType
|
bool isGenericTask = method.ReturnType.IsGenericType
|
||||||
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
|
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
|
||||||
bool existParameters = parameters.Length > 0;
|
bool existParameters = parameters.Length > 0;
|
||||||
//dynamic? messageObj= null;
|
List<object>? messageObj = null;
|
||||||
//if (existParameters)
|
if (existParameters)
|
||||||
//{
|
|
||||||
// var paramType = parameters[0].ParameterType;
|
|
||||||
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
|
|
||||||
//}
|
|
||||||
if (isGenericTask)
|
|
||||||
{
|
{
|
||||||
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
|
messageObj = new List<object>();
|
||||||
if (result is ISubscribeAck ackResult)
|
var paramType = parameters[0].ParameterType;
|
||||||
|
foreach (var msg in messages)
|
||||||
|
{
|
||||||
|
var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
|
||||||
|
if (data != null)
|
||||||
|
messageObj.Add(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var result = method.Invoke(subscribe, messageObj?.ToArray());
|
||||||
|
if (result is Task<ISubscribeAck> genericTask)
|
||||||
|
{
|
||||||
|
await genericTask.ConfigureAwait(false);
|
||||||
|
return genericTask.Result.Ack;
|
||||||
|
}
|
||||||
|
else if (result is Task nonGenericTask)
|
||||||
|
{
|
||||||
|
await nonGenericTask.ConfigureAwait(false);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else if (result is ISubscribeAck ackResult)
|
||||||
{
|
{
|
||||||
return ackResult.Ack;
|
return ackResult.Ack;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
|
|
||||||
if (result is ISubscribeAck ackResult)
|
|
||||||
{
|
|
||||||
return ackResult.Ack;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
|
|
||||||
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
||||||
|
|
||||||
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class;
|
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -126,9 +126,10 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
||||||
{
|
{
|
||||||
var typeKey = typeof(KafkaProducer<string, TValue>);
|
var typeKey = typeof(KafkaProducer<string, TValue>);
|
||||||
var producer = GetProducer<string, TValue>(typeKey);
|
var producer = GetProducer<Null, TValue>(typeKey);
|
||||||
var message = new Message<string, TValue>
|
var message = new Message<Null, TValue>
|
||||||
{
|
{
|
||||||
|
//Key= _kafkaOptionConfig.ServerTagName,
|
||||||
Value = value,
|
Value = value,
|
||||||
Headers = new Headers{
|
Headers = new Headers{
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
||||||
@ -183,17 +184,18 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <param name="partition"></param>
|
/// <param name="partition"></param>
|
||||||
/// <param name="deliveryHandler"></param>
|
/// <param name="deliveryHandler"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class
|
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
|
||||||
{
|
{
|
||||||
var message = new Message<string, TValue>
|
var message = new Message<Null, TValue>
|
||||||
{
|
{
|
||||||
|
//Key = _kafkaOptionConfig.ServerTagName,
|
||||||
Value = value,
|
Value = value,
|
||||||
Headers = new Headers{
|
Headers = new Headers{
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
var typeKey = typeof(KafkaProducer<string, TValue>);
|
var typeKey = typeof(KafkaProducer<Null, TValue>);
|
||||||
var producer = GetProducer<string, TValue>(typeKey);
|
var producer = GetProducer<Null, TValue>(typeKey);
|
||||||
if (partition.HasValue)
|
if (partition.HasValue)
|
||||||
{
|
{
|
||||||
var topicPartition = new TopicPartition(topic, partition.Value);
|
var topicPartition = new TopicPartition(topic, partition.Value);
|
||||||
@ -17,7 +17,7 @@
|
|||||||
<PackageReference Include="Volo.Abp.MongoDB" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.MongoDB" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.BackgroundJobs.MongoDB" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.BackgroundJobs.MongoDB" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.AuditLogging.MongoDB" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AuditLogging.MongoDB" Version="8.3.3" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -6,6 +6,12 @@
|
|||||||
<Nullable>enable</Nullable>
|
<Nullable>enable</Nullable>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<Compile Remove="Extensions\**" />
|
||||||
|
<EmbeddedResource Remove="Extensions\**" />
|
||||||
|
<None Remove="Extensions\**" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
|
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
|
||||||
@ -15,13 +21,9 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<Folder Include="Extensions\" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -16,13 +16,13 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<Target Name="PostBuild" AfterTargets="PostBuildEvent">
|
<Target Name="PostBuild" AfterTargets="PostBuildEvent">
|
||||||
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.Test.dll $(ProjectDir)..\JiShe.CollectBus.Host\Plugins\" />
|
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.Test.dll $(ProjectDir)..\..\web\JiShe.CollectBus.Host\Plugins\" />
|
||||||
</Target>
|
</Target>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -16,13 +16,13 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<Target Name="PostBuild" AfterTargets="PostBuildEvent">
|
<Target Name="PostBuild" AfterTargets="PostBuildEvent">
|
||||||
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.dll $(ProjectDir)..\JiShe.CollectBus.Host\Plugins\" />
|
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.dll $(ProjectDir)..\..\web\JiShe.CollectBus.Host\Plugins\" />
|
||||||
</Target>
|
</Target>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user