Compare commits

...

16 Commits

405 changed files with 598 additions and 1566 deletions

5
.gitignore vendored
View File

@ -400,5 +400,6 @@ FodyWeavers.xsd
# ABP Studio # ABP Studio
**/.abpstudio/ **/.abpstudio/
/src/JiShe.CollectBus.Host/Plugins/*.dll /web/JiShe.CollectBus.Host/Plugins/*.dll
JiShe.CollectBus.Kafka.Test /web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.dll
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.Test.dll

View File

@ -3,41 +3,49 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17 # Visual Studio Version 17
VisualStudioVersion = 17.9.34728.123 VisualStudioVersion = 17.9.34728.123
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "src\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "src\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "src\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "src\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{649A3FFA-182F-4E56-9717-E6A9A2BEC545}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "src\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "web\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "src\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "web\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "src\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "src\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "src\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "src\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "services\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "src\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "modules\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "src\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeRedis", "modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Kafka", "modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.IoTDB", "modules\JiShe.CollectBus.IoTDB\JiShe.CollectBus.IoTDB.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Test", "protocols\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Cassandra", "modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "1.Web", "1.Web", "{A02F7D8A-04DC-44D6-94D4-3F65712D6B94}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "4.Modules", "4.Modules", "{2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "3.Protocols", "3.Protocols", "{3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services", "{BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -118,23 +126,23 @@ Global
HideSolutionNode = FALSE HideSolutionNode = FALSE
EndGlobalSection EndGlobalSection
GlobalSection(NestedProjects) = preSolution GlobalSection(NestedProjects) = preSolution
{D64C1577-4929-4B60-939E-96DE1534891A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {D64C1577-4929-4B60-939E-96DE1534891A} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{78040F9E-3501-4A40-82DF-00A597710F35} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {78040F9E-3501-4A40-82DF-00A597710F35} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{F1C58097-4C08-4D88-8976-6B3389391481} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {F1C58097-4C08-4D88-8976-6B3389391481} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {35829A15-4127-4F69-8BDE-9405DEAACA9A} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {AD2F1928-4411-4511-B564-5FB996EC08B9} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
{C62EFF95-5C32-435F-BD78-6977E828F894} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {C62EFF95-5C32-435F-BD78-6977E828F894} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{38C1808B-009A-418B-B17B-AB3626341B5D} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {38C1808B-009A-418B-B17B-AB3626341B5D} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {C06C4082-638F-2996-5FED-7784475766C1} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {F0288175-F0EC-48BD-945F-CF1512850943} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -3,7 +3,6 @@ using Cassandra.Data.Linq;
using Cassandra.Mapping; using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Extensions; using JiShe.CollectBus.Cassandra.Extensions;
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDBProvider;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using System.Reflection; using System.Reflection;
using Thrift.Protocol.Entities; using Thrift.Protocol.Entities;

View File

@ -15,8 +15,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,9 +1,9 @@
using JiShe.CollectBus.FreeRedisProvider.Options; using JiShe.CollectBus.FreeRedis.Options;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedis
{ {
public class CollectBusFreeRedisModule : AbpModule public class CollectBusFreeRedisModule : AbpModule
{ {

View File

@ -1,16 +1,11 @@
using FreeRedis; using System.Diagnostics;
using FreeRedis;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.FreeRedis.Options;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedisProvider.Options;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text.Json;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Collections.Concurrent;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedis
{ {
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency

View File

@ -1,7 +1,6 @@
using FreeRedis; using FreeRedis;
using JiShe.CollectBus.Common.Models;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedis
{ {
public interface IFreeRedisProvider public interface IFreeRedisProvider
{ {

View File

@ -10,6 +10,6 @@
<PackageReference Include="Volo.Abp" Version="8.3.3" /> <PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,10 +1,4 @@
using System; namespace JiShe.CollectBus.FreeRedis.Options
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.FreeRedisProvider.Options
{ {
public class FreeRedisOptions public class FreeRedisOptions
{ {

View File

@ -0,0 +1,10 @@
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// Column分类标记特性ATTRIBUTE字段,也就是属性字段
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class ATTRIBUTEColumnAttribute : System.Attribute
{
}
}

View File

@ -0,0 +1,10 @@
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// Column分类标记特性FIELD字段数据列字段
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class FIELDColumnAttribute : System.Attribute
{
}
}

View File

@ -1,16 +1,10 @@
using System; namespace JiShe.CollectBus.IoTDB.Attribute
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// 用于标识当前实体为单侧点模式单侧点模式只有一个Filed标识字段,类型是Tuple<string,object>,Item1=>测点名称Item2=>测点值,泛型 /// 用于标识当前实体为单侧点模式单侧点模式只有一个Filed标识字段,类型是Tuple<string,object>,Item1=>测点名称Item2=>测点值,泛型
/// </summary> /// </summary>
[AttributeUsage(AttributeTargets.Property)] [AttributeUsage(AttributeTargets.Property)]
public class SingleMeasuringAttribute : Attribute public class SingleMeasuringAttribute : System.Attribute
{ {
public string FieldName { get; set;} public string FieldName { get; set;}

View File

@ -0,0 +1,10 @@
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// Column分类标记特性TAG字段标签字段
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class TAGColumnAttribute : System.Attribute
{
}
}

View File

@ -1,17 +1,12 @@
using JiShe.CollectBus.IoTDBProvider.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDBProvider.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using static Thrift.Server.TThreadPoolAsyncServer;
namespace JiShe.CollectBus.IoTDBProvider namespace JiShe.CollectBus.IoTDB
{ {
public class CollectBusIoTDBModule : AbpModule public class CollectBusIoTDBModule : AbpModule
{ {

View File

@ -1,11 +1,7 @@
using Microsoft.Extensions.Options; using JiShe.CollectBus.IoTDB.Options;
using System; using Microsoft.Extensions.Options;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Context namespace JiShe.CollectBus.IoTDB.Context
{ {
/// <summary> /// <summary>
/// IoTDB SessionPool 运行时上下文 /// IoTDB SessionPool 运行时上下文

View File

@ -1,6 +1,8 @@
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
namespace JiShe.CollectBus.IoTDBProvider namespace JiShe.CollectBus.IoTDB.Interface
{ {
/// <summary> /// <summary>
/// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置 /// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置

View File

@ -1,10 +1,4 @@
using System; namespace JiShe.CollectBus.IoTDB.Interface
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Interface
{ {
/// <summary> /// <summary>
/// Session 工厂接口 /// Session 工厂接口

View File

@ -1,11 +1,6 @@
using Apache.IoTDB.DataStructure; using Apache.IoTDB.DataStructure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Interface namespace JiShe.CollectBus.IoTDB.Interface
{ {
/// <summary> /// <summary>
/// Session 连接池 /// Session 连接池

View File

@ -11,6 +11,6 @@
<PackageReference Include="Volo.Abp" Version="8.3.3" /> <PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,10 +1,4 @@
using System; namespace JiShe.CollectBus.IoTDB.Options
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// IOTDB配置 /// IOTDB配置

View File

@ -1,10 +1,4 @@
using System; namespace JiShe.CollectBus.IoTDB.Options
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// 查询条件 /// 查询条件

View File

@ -1,10 +1,4 @@
using System; namespace JiShe.CollectBus.IoTDB.Options
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// 查询条件 /// 查询条件

View File

@ -1,11 +1,6 @@
using Apache.IoTDB; using Apache.IoTDB;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider namespace JiShe.CollectBus.IoTDB.Provider
{ {
/// <summary> /// <summary>
/// 设备元数据 /// 设备元数据

View File

@ -1,10 +1,4 @@
using System; namespace JiShe.CollectBus.IoTDB.Provider
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// 设备路径构建器 /// 设备路径构建器

View File

@ -1,16 +1,16 @@
using Apache.IoTDB; using System.Collections.Concurrent;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Reflection; using System.Reflection;
using System.Text; using System.Text;
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDB.Provider
namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// IoTDB数据源 /// IoTDB数据源

View File

@ -1,13 +1,9 @@
using JiShe.CollectBus.IoTDBProvider.Interface; using System.Collections.Concurrent;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Provider namespace JiShe.CollectBus.IoTDB.Provider
{ {
/// <summary> /// <summary>

View File

@ -1,4 +1,6 @@
namespace JiShe.CollectBus.IoTDBProvider using JiShe.CollectBus.IoTDB.Attribute;
namespace JiShe.CollectBus.IoTDB.Provider
{ {
/// <summary> /// <summary>
/// IoT实体基类 /// IoT实体基类

View File

@ -1,14 +1,10 @@
using Apache.IoTDB.DataStructure; using Apache.IoTDB;
using Apache.IoTDB; using Apache.IoTDB.DataStructure;
using System; using JiShe.CollectBus.IoTDB.Interface;
using System.Collections.Generic; using JiShe.CollectBus.IoTDB.Options;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDBProvider.Interface;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider.Provider namespace JiShe.CollectBus.IoTDB.Provider
{ {
/// <summary> /// <summary>
/// 树模型连接池 /// 树模型连接池

View File

@ -1,14 +1,10 @@
using Apache.IoTDB.DataStructure; using Apache.IoTDB;
using Apache.IoTDB; using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.IoTDBProvider.Interface; using JiShe.CollectBus.IoTDB.Interface;
using System; using JiShe.CollectBus.IoTDB.Options;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider.Provider namespace JiShe.CollectBus.IoTDB.Provider
{ {
/// <summary> /// <summary>
/// 表模型Session连接池 /// 表模型Session连接池

View File

@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 订阅的主题 /// 订阅的主题
/// </summary> /// </summary>
public string Topic { get; set; } public string Topic { get; set; } = null!;
/// <summary> /// <summary>
/// 分区 /// 分区
@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 消费者组 /// 消费者组
/// </summary> /// </summary>
public string GroupId { get; set; } public string GroupId { get; set; } = "default";
/// <summary> /// <summary>
/// 任务数(默认是多少个分区多少个任务) /// 任务数(默认是多少个分区多少个任务)
@ -42,35 +42,27 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 批次超时时间 /// 批次超时时间
/// 格式:("00:05:00")
/// </summary> /// </summary>
public TimeSpan? BatchTimeout { get; set; }=null; public TimeSpan? BatchTimeout { get; set; }=null;
/// <summary> /// <summary>
/// 订阅主题 /// 订阅主题
/// </summary> /// </summary>
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param> /// <param name="batchTimeout"></param>
public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null) public KafkaSubscribeAttribute(string topic)
{ {
this.Topic = topic; this.Topic = topic;
this.GroupId = groupId;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
} }
/// <summary> /// <summary>
/// 订阅主题 /// 订阅主题
/// </summary> /// </summary>
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param> public KafkaSubscribeAttribute(string topic, int partition)
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
{ {
this.Topic = topic; this.Topic = topic;
this.Partition = partition; this.Partition = partition;
this.GroupId = groupId;
this.TaskCount = 1;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
} }
} }
} }

View File

@ -39,6 +39,8 @@ namespace JiShe.CollectBus.Kafka
context.Services.AddSingleton<IProducerService, ProducerService>(); context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer // 注册Consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>(); context.Services.AddSingleton<IConsumerService, ConsumerService>();
context.Services.AddHostedService<HostedService>();
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
@ -46,7 +48,7 @@ namespace JiShe.CollectBus.Kafka
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
// 注册Subscriber // 注册Subscriber
app.ApplicationServices.UseKafkaSubscribers(); //app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集 // 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));

View File

@ -1,4 +1,5 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -50,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记 EnablePartitionEof = true, // 启用分区末尾标记
//AllowAutoCreateTopics= true, // 启用自动创建 AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
}; };
@ -105,27 +106,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
var consumerKey = typeof(KafkaConsumer<TKey, TValue>); var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( //(
CreateConsumer<TKey, TValue>(groupId), // CreateConsumer<TKey, TValue>(groupId),
cts // cts
)).Consumer as IConsumer<TKey, TValue>; //)).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<TKey, TValue>(groupId);
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
_ = Task.Run(async () => await Task.Run(async () =>
{ {
while (!cts.IsCancellationRequested) while (!cts.IsCancellationRequested)
{ {
try try
{ {
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token); var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null) if (result == null || result.Message==null || result.Message.Value == null)
{
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
//consumer.Commit(result); // 手动提交
continue; continue;
}
if (result.IsPartitionEOF) if (result.IsPartitionEOF)
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
@ -151,7 +151,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); _logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
} }
} }
}); });
@ -170,30 +170,38 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{ {
try {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>); var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
//{
// string ssss = "";
//}
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<string, TValue>(groupId),
// cts
//)).Consumer as IConsumer<string, TValue>;
var consumer = _consumerStore.GetOrAdd(consumerKey, _=> var consumer = CreateConsumer<Ignore, TValue>(groupId);
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
int count = 0;
while (!cts.IsCancellationRequested) while (!cts.IsCancellationRequested)
{ {
try try
{ {
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
count++;
var result = consumer.Consume(cts.Token); var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null) if (result == null || result.Message == null || result.Message.Value == null)
{ {
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); await Task.Delay(500, cts.Token);
//consumer.Commit(result); // 手动提交
continue; continue;
} }
if (result.IsPartitionEOF) if (result.IsPartitionEOF)
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
@ -206,6 +214,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
await Task.Delay(500, cts.Token);
//consumer.Commit(result); // 提交偏移量 //consumer.Commit(result); // 提交偏移量
// 跳过消息 // 跳过消息
continue; continue;
@ -214,13 +223,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
bool sucess = await messageHandler(result.Message.Value); bool sucess = await messageHandler(result.Message.Value);
if (sucess) if (sucess)
consumer.Commit(result); // 手动提交 consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); _logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
} }
} }
}); });
} catch (Exception ex)
{
_logger.LogWarning($"Kafka消费异常: {ex.Message}");
}
await Task.CompletedTask; await Task.CompletedTask;
} }
@ -339,7 +356,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); _logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -385,14 +402,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="consumeTimeout">消费等待时间</param> /// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
{ {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>); var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( (
CreateConsumer<Ignore, TValue>(groupId), CreateConsumer<string, TValue>(groupId),
cts cts
)).Consumer as IConsumer<Ignore, TValue>; )).Consumer as IConsumer<string, TValue>;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);

View File

@ -0,0 +1,45 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class HostedService : IHostedService, IDisposable
{
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
public HostedService(ILogger<HostedService> logger, IServiceProvider provider)
{
_logger = logger;
_provider = provider;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(() =>
{
_provider.UseKafkaSubscribe();
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
public void Dispose()
{
}
}
}

View File

@ -13,7 +13,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,6 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
@ -10,7 +11,10 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System.Collections.Generic;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka
{ {
@ -21,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="app"></param> /// <param name="app"></param>
/// <param name="assembly"></param> /// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IServiceProvider provider) public static async Task UseKafkaSubscribe(this IServiceProvider provider)
{ {
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); //var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息 //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
@ -36,9 +40,9 @@ namespace JiShe.CollectBus.Kafka
{ {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} }
List<Task> tasks = new List<Task>();
lifetime.ApplicationStarted.Register(() => //lifetime.ApplicationStarted.Register(async() =>
{ //{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; int threadCount = 0;
int topicCount = 0; int topicCount = 0;
@ -62,14 +66,15 @@ namespace JiShe.CollectBus.Kafka
!type.IsAbstract && !type.IsInterface).ToList(); ; !type.IsAbstract && !type.IsInterface).ToList(); ;
if (subscribeTypes.Count == 0) if (subscribeTypes.Count == 0)
continue; continue;
foreach (var subscribeType in subscribeTypes) foreach (var subscribeType in subscribeTypes)
{ {
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => subscribes.ForEach(async subscribe =>
{ {
if (subscribe!=null) if (subscribe!=null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -77,58 +82,59 @@ namespace JiShe.CollectBus.Kafka
} }
} }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); //});
await Task.WhenAll(tasks);
} }
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{ //{
var provider = app.ApplicationServices; // var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); // var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息 // //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); // var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>(); // var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); // List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); // topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics) // foreach (var item in topics)
{ // {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); // kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} // }
lifetime.ApplicationStarted.Register(() => // lifetime.ApplicationStarted.Register(async () =>
{ // {
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); // var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; // int threadCount = 0;
int topicCount = 0; // int topicCount = 0;
var subscribeTypes = assembly.GetTypes() // var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList(); // .ToList();
if (subscribeTypes.Count == 0) return; // if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes) // foreach (var subscribeType in subscribeTypes)
{ // {
var subscribes = provider.GetServices(subscribeType).ToList(); // var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => { // subscribes.ForEach(async subscribe => {
if (subscribe != null) // if (subscribe != null)
{ // {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); // Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; // threadCount += tuple.Item1;
topicCount += tuple.Item2; // topicCount += tuple.Item2;
} // }
}); // });
} // }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); // });
} //}
/// <summary> /// <summary>
/// 构建Kafka订阅 /// 构建Kafka订阅
/// </summary> /// </summary>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig) private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig, List<Task> tasks)
{ {
var subscribedMethods = subscribe.GetType().GetMethods() var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() }) .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -136,20 +142,26 @@ namespace JiShe.CollectBus.Kafka
.ToArray(); .ToArray();
//var configuration = provider.GetRequiredService<IConfiguration>(); //var configuration = provider.GetRequiredService<IConfiguration>();
int threadCount = 0; int threadCount = 0;
foreach (var sub in subscribedMethods) foreach (var sub in subscribedMethods)
{ {
int partitionCount = kafkaOptionConfig.NumPartitions; int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService<IAdminClientService>(); #if DEBUG
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
#endif
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)
partitionCount = 1; partitionCount = 1;
for (int i = 0; i < partitionCount; i++) for (int i = 0; i < partitionCount; i++)
{ {
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)); //if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
threadCount++; threadCount++;
} }
} }
return Tuple.Create(threadCount, subscribedMethods.Length); return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length));
} }
/// <summary> /// <summary>
@ -166,12 +178,15 @@ namespace JiShe.CollectBus.Kafka
if (attr.EnableBatch) if (attr.EnableBatch)
{ {
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
{ {
try try
{ {
#if DEBUG
logger.LogInformation($"kafka批量消费消息:{message}");
#endif
// 处理消息 // 处理消息
return await ProcessMessageAsync(message, method, subscribe); return await ProcessMessageAsync(message.ToList(), method, subscribe);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
@ -179,16 +194,19 @@ namespace JiShe.CollectBus.Kafka
logger.LogError($"kafka批量消费异常:{ex.Message}"); logger.LogError($"kafka批量消费异常:{ex.Message}");
} }
return await Task.FromResult(false); return await Task.FromResult(false);
}, attr.GroupId, attr.BatchSize,attr.BatchTimeout); }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
} }
else else
{ {
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
{ {
try try
{ {
#if DEBUG
logger.LogInformation($"kafka消费消息:{message}");
#endif
// 处理消息 // 处理消息
return await ProcessMessageAsync(message, method, subscribe); return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
@ -209,34 +227,40 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param> /// <param name="method"></param>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <returns></returns> /// <returns></returns>
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe) private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
{ {
var parameters = method.GetParameters(); var parameters = method.GetParameters();
bool isGenericTask = method.ReturnType.IsGenericType bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0; bool existParameters = parameters.Length > 0;
//dynamic? messageObj= null; List<object>? messageObj = null;
//if (existParameters) if (existParameters)
//{
// var paramType = parameters[0].ParameterType;
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
//}
if (isGenericTask)
{ {
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!; messageObj = new List<object>();
if (result is ISubscribeAck ackResult) var paramType = parameters[0].ParameterType;
foreach (var msg in messages)
{
var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
if (data != null)
messageObj.Add(data);
}
}
var result = method.Invoke(subscribe, messageObj?.ToArray());
if (result is Task<ISubscribeAck> genericTask)
{
await genericTask.ConfigureAwait(false);
return genericTask.Result.Ack;
}
else if (result is Task nonGenericTask)
{
await nonGenericTask.ConfigureAwait(false);
return true;
}
else if (result is ISubscribeAck ackResult)
{ {
return ackResult.Ack; return ackResult.Ack;
} }
}
else
{
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
}
return false; return false;
} }

View File

@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class; Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class; Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
} }
} }

View File

@ -126,9 +126,10 @@ namespace JiShe.CollectBus.Kafka.Producer
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{ {
var typeKey = typeof(KafkaProducer<string, TValue>); var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey); var producer = GetProducer<Null, TValue>(typeKey);
var message = new Message<string, TValue> var message = new Message<Null, TValue>
{ {
//Key= _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
@ -183,17 +184,18 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <param name="partition"></param> /// <param name="partition"></param>
/// <param name="deliveryHandler"></param> /// <param name="deliveryHandler"></param>
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{ {
var message = new Message<string, TValue> var message = new Message<Null, TValue>
{ {
//Key = _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }
}; };
var typeKey = typeof(KafkaProducer<string, TValue>); var typeKey = typeof(KafkaProducer<Null, TValue>);
var producer = GetProducer<string, TValue>(typeKey); var producer = GetProducer<Null, TValue>(typeKey);
if (partition.HasValue) if (partition.HasValue)
{ {
var topicPartition = new TopicPartition(topic, partition.Value); var topicPartition = new TopicPartition(topic, partition.Value);

View File

@ -17,7 +17,7 @@
<PackageReference Include="Volo.Abp.MongoDB" Version="8.3.3" /> <PackageReference Include="Volo.Abp.MongoDB" Version="8.3.3" />
<PackageReference Include="Volo.Abp.BackgroundJobs.MongoDB" Version="8.3.3" /> <PackageReference Include="Volo.Abp.BackgroundJobs.MongoDB" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AuditLogging.MongoDB" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AuditLogging.MongoDB" Version="8.3.3" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -6,6 +6,12 @@
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<Compile Remove="Extensions\**" />
<EmbeddedResource Remove="Extensions\**" />
<None Remove="Extensions\**" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" /> <PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" /> <PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
@ -15,13 +21,9 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Extensions\" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -16,13 +16,13 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent"> <Target Name="PostBuild" AfterTargets="PostBuildEvent">
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.Test.dll $(ProjectDir)..\JiShe.CollectBus.Host\Plugins\" /> <Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.Test.dll $(ProjectDir)..\..\web\JiShe.CollectBus.Host\Plugins\" />
</Target> </Target>
</Project> </Project>

View File

@ -16,13 +16,13 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent"> <Target Name="PostBuild" AfterTargets="PostBuildEvent">
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.dll $(ProjectDir)..\JiShe.CollectBus.Host\Plugins\" /> <Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.dll $(ProjectDir)..\..\web\JiShe.CollectBus.Host\Plugins\" />
</Target> </Target>
</Project> </Project>

Some files were not shown because too many files have changed in this diff Show More