Compare commits

...

16 Commits

405 changed files with 598 additions and 1566 deletions

5
.gitignore vendored
View File

@ -400,5 +400,6 @@ FodyWeavers.xsd
# ABP Studio
**/.abpstudio/
/src/JiShe.CollectBus.Host/Plugins/*.dll
JiShe.CollectBus.Kafka.Test
/web/JiShe.CollectBus.Host/Plugins/*.dll
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.dll
/web/JiShe.CollectBus.Host/Plugins/JiShe.CollectBus.Protocol.Test.dll

View File

@ -3,41 +3,49 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.9.34728.123
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "src\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain.Shared", "shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj", "{D64C1577-4929-4B60-939E-96DE1534891A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "src\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Domain", "services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj", "{F2840BC7-0188-4606-9126-DADD0F5ABF7A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "src\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application.Contracts", "services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj", "{BD65D04F-08D5-40C1-8C24-77CA0BACB877}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "src\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Application", "services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj", "{78040F9E-3501-4A40-82DF-00A597710F35}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{649A3FFA-182F-4E56-9717-E6A9A2BEC545}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.MongoDB", "src\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{F1C58097-4C08-4D88-8976-6B3389391481}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "web\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.HttpApi", "src\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj", "{077AA5F8-8B61-420C-A6B5-0150A66FDB34}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "web\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Host", "src\JiShe.CollectBus.Host\JiShe.CollectBus.Host.csproj", "{35829A15-4127-4F69-8BDE-9405DEAACA9A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Common", "src\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj", "{AD2F1928-4411-4511-B564-5FB996EC08B9}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol", "src\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj", "{C62EFF95-5C32-435F-BD78-6977E828F894}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Contracts", "src\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj", "{38C1808B-009A-418B-B17B-AB3626341B5D}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "services\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator", "src\JiShe.CollectBus.DbMigrator\JiShe.CollectBus.DbMigrator.csproj", "{8BA01C3D-297D-42DF-BD63-EF07202A0A67}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "modules\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "src\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeRedis", "modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Kafka", "modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.IoTDB", "modules\JiShe.CollectBus.IoTDB\JiShe.CollectBus.IoTDB.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.Test", "protocols\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Cassandra", "modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "1.Web", "1.Web", "{A02F7D8A-04DC-44D6-94D4-3F65712D6B94}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "4.Modules", "4.Modules", "{2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "3.Protocols", "3.Protocols", "{3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services", "{BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -118,23 +126,23 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{D64C1577-4929-4B60-939E-96DE1534891A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{78040F9E-3501-4A40-82DF-00A597710F35} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{F1C58097-4C08-4D88-8976-6B3389391481} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{C62EFF95-5C32-435F-BD78-6977E828F894} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{38C1808B-009A-418B-B17B-AB3626341B5D} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{D64C1577-4929-4B60-939E-96DE1534891A} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
{F2840BC7-0188-4606-9126-DADD0F5ABF7A} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{BD65D04F-08D5-40C1-8C24-77CA0BACB877} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{78040F9E-3501-4A40-82DF-00A597710F35} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{F1C58097-4C08-4D88-8976-6B3389391481} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{077AA5F8-8B61-420C-A6B5-0150A66FDB34} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
{35829A15-4127-4F69-8BDE-9405DEAACA9A} = {A02F7D8A-04DC-44D6-94D4-3F65712D6B94}
{AD2F1928-4411-4511-B564-5FB996EC08B9} = {EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}
{C62EFF95-5C32-435F-BD78-6977E828F894} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{38C1808B-009A-418B-B17B-AB3626341B5D} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {BA4DA3E7-9AD0-47AD-A0E6-A0BB6700DA23}
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{C06C4082-638F-2996-5FED-7784475766C1} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{F0288175-F0EC-48BD-945F-CF1512850943} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -3,7 +3,6 @@ using Cassandra.Data.Linq;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Extensions;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDBProvider;
using Microsoft.AspNetCore.Http;
using System.Reflection;
using Thrift.Protocol.Entities;

View File

@ -15,8 +15,8 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -1,9 +1,9 @@
using JiShe.CollectBus.FreeRedisProvider.Options;
using JiShe.CollectBus.FreeRedis.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.FreeRedisProvider
namespace JiShe.CollectBus.FreeRedis
{
public class CollectBusFreeRedisModule : AbpModule
{

View File

@ -1,16 +1,11 @@
using FreeRedis;
using System.Diagnostics;
using FreeRedis;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedisProvider.Options;
using JiShe.CollectBus.FreeRedis.Options;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text.Json;
using Volo.Abp.DependencyInjection;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Collections.Concurrent;
namespace JiShe.CollectBus.FreeRedisProvider
namespace JiShe.CollectBus.FreeRedis
{
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency

View File

@ -1,7 +1,6 @@
using FreeRedis;
using JiShe.CollectBus.Common.Models;
namespace JiShe.CollectBus.FreeRedisProvider
namespace JiShe.CollectBus.FreeRedis
{
public interface IFreeRedisProvider
{

View File

@ -10,6 +10,6 @@
<PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.FreeRedisProvider.Options
namespace JiShe.CollectBus.FreeRedis.Options
{
public class FreeRedisOptions
{

View File

@ -0,0 +1,10 @@
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// Column分类标记特性ATTRIBUTE字段,也就是属性字段
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class ATTRIBUTEColumnAttribute : System.Attribute
{
}
}

View File

@ -0,0 +1,10 @@
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// Column分类标记特性FIELD字段数据列字段
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class FIELDColumnAttribute : System.Attribute
{
}
}

View File

@ -1,16 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// 用于标识当前实体为单侧点模式单侧点模式只有一个Filed标识字段,类型是Tuple<string,object>,Item1=>测点名称Item2=>测点值,泛型
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class SingleMeasuringAttribute : Attribute
public class SingleMeasuringAttribute : System.Attribute
{
public string FieldName { get; set;}

View File

@ -0,0 +1,10 @@
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// Column分类标记特性TAG字段标签字段
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class TAGColumnAttribute : System.Attribute
{
}
}

View File

@ -1,17 +1,12 @@
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Modularity;
using static Thrift.Server.TThreadPoolAsyncServer;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB
{
public class CollectBusIoTDBModule : AbpModule
{

View File

@ -1,11 +1,7 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Options;
namespace JiShe.CollectBus.IoTDBProvider.Context
namespace JiShe.CollectBus.IoTDB.Context
{
/// <summary>
/// IoTDB SessionPool 运行时上下文

View File

@ -1,6 +1,8 @@
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Interface
{
/// <summary>
/// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置

View File

@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Interface
namespace JiShe.CollectBus.IoTDB.Interface
{
/// <summary>
/// Session 工厂接口

View File

@ -1,11 +1,6 @@
using Apache.IoTDB.DataStructure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Interface
namespace JiShe.CollectBus.IoTDB.Interface
{
/// <summary>
/// Session 连接池

View File

@ -11,6 +11,6 @@
<PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Options
{
/// <summary>
/// IOTDB配置

View File

@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Options
{
/// <summary>
/// 查询条件

View File

@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Options
{
/// <summary>
/// 查询条件

View File

@ -1,11 +1,6 @@
using Apache.IoTDB;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// 设备元数据

View File

@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// 设备路径构建器

View File

@ -1,16 +1,16 @@
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// IoTDB数据源

View File

@ -1,13 +1,9 @@
using JiShe.CollectBus.IoTDBProvider.Interface;
using System.Collections.Concurrent;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Provider
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>

View File

@ -1,4 +1,6 @@
namespace JiShe.CollectBus.IoTDBProvider
using JiShe.CollectBus.IoTDB.Attribute;
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// IoT实体基类

View File

@ -1,14 +1,10 @@
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDBProvider.Interface;
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider.Provider
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// 树模型连接池

View File

@ -1,14 +1,10 @@
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using JiShe.CollectBus.IoTDBProvider.Interface;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider.Provider
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// 表模型Session连接池

View File

@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary>
/// 订阅的主题
/// </summary>
public string Topic { get; set; }
public string Topic { get; set; } = null!;
/// <summary>
/// 分区
@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary>
/// 消费者组
/// </summary>
public string GroupId { get; set; }
public string GroupId { get; set; } = "default";
/// <summary>
/// 任务数(默认是多少个分区多少个任务)
@ -42,35 +42,27 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary>
/// 批次超时时间
/// 格式:("00:05:00")
/// </summary>
public TimeSpan? BatchTimeout { get; set; }=null;
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param>
public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
/// <param name="batchTimeout"></param>
public KafkaSubscribeAttribute(string topic)
{
this.Topic = topic;
this.GroupId = groupId;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
}
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param>
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
public KafkaSubscribeAttribute(string topic, int partition)
{
this.Topic = topic;
this.Partition = partition;
this.GroupId = groupId;
this.TaskCount = 1;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
}
}
}

View File

@ -39,15 +39,17 @@ namespace JiShe.CollectBus.Kafka
context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>();
context.Services.AddHostedService<HostedService>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var app = context.GetApplicationBuilder();
// 注册Subscriber
app.ApplicationServices.UseKafkaSubscribers();
//app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
}

View File

@ -1,4 +1,5 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -50,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
//AllowAutoCreateTopics= true, // 启用自动创建
AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
};
@ -105,27 +106,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<TKey, TValue>(groupId),
// cts
//)).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<TKey, TValue>(groupId);
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
await Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
{
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
//consumer.Commit(result); // 手动提交
continue;
}
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
@ -151,7 +151,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
}
}
});
@ -170,57 +170,74 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var cts = new CancellationTokenSource();
try {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var cts = new CancellationTokenSource();
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
//{
// string ssss = "";
//}
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<string, TValue>(groupId),
// cts
//)).Consumer as IConsumer<string, TValue>;
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
var consumer = CreateConsumer<Ignore, TValue>(groupId);
consumer!.Subscribe(topics);
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
_ = Task.Run(async () =>
{
try
int count = 0;
while (!cts.IsCancellationRequested)
{
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
try
{
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
//consumer.Commit(result); // 手动提交
continue;
}
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(100, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
count++;
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
await Task.Delay(500, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(100, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
await Task.Delay(500, cts.Token);
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
}
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
}
});
});
} catch (Exception ex)
{
_logger.LogWarning($"Kafka消费异常: {ex.Message}");
}
await Task.CompletedTask;
}
@ -339,7 +356,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
@ -385,14 +402,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
{
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
CreateConsumer<string, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
)).Consumer as IConsumer<string, TValue>;
consumer!.Subscribe(topics);

View File

@ -0,0 +1,45 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class HostedService : IHostedService, IDisposable
{
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
public HostedService(ILogger<HostedService> logger, IServiceProvider provider)
{
_logger = logger;
_provider = provider;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(() =>
{
_provider.UseKafkaSubscribe();
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
public void Dispose()
{
}
}
}

View File

@ -13,7 +13,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -1,6 +1,7 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
@ -10,7 +11,10 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
@ -21,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
/// </summary>
/// <param name="app"></param>
/// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IServiceProvider provider)
public static async Task UseKafkaSubscribe(this IServiceProvider provider)
{
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
@ -36,10 +40,10 @@ namespace JiShe.CollectBus.Kafka
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
List<Task> tasks = new List<Task>();
//lifetime.ApplicationStarted.Register(async() =>
//{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
@ -62,14 +66,15 @@ namespace JiShe.CollectBus.Kafka
!type.IsAbstract && !type.IsInterface).ToList(); ;
if (subscribeTypes.Count == 0)
continue;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe =>
subscribes.ForEach(async subscribe =>
{
if (subscribe!=null)
{
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
@ -77,58 +82,59 @@ namespace JiShe.CollectBus.Kafka
}
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
//});
await Task.WhenAll(tasks);
}
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{
var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
//public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
//{
// var provider = app.ApplicationServices;
// var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
// //初始化主题信息
// var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
// var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
// List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
// topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
// foreach (var item in topics)
// {
// kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
// }
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
// lifetime.ApplicationStarted.Register(async () =>
// {
// var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
// int threadCount = 0;
// int topicCount = 0;
// var subscribeTypes = assembly.GetTypes()
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
// .ToList();
if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => {
// if (subscribeTypes.Count == 0) return;
// foreach (var subscribeType in subscribeTypes)
// {
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(async subscribe => {
if (subscribe != null)
{
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
});
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
// if (subscribe != null)
// {
// Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
// }
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
// });
//}
/// <summary>
/// 构建Kafka订阅
/// </summary>
/// <param name="subscribe"></param>
/// <param name="provider"></param>
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig, List<Task> tasks)
{
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -136,20 +142,26 @@ namespace JiShe.CollectBus.Kafka
.ToArray();
//var configuration = provider.GetRequiredService<IConfiguration>();
int threadCount = 0;
foreach (var sub in subscribedMethods)
{
int partitionCount = kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService<IAdminClientService>();
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
#if DEBUG
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
#endif
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
partitionCount = 1;
for (int i = 0; i < partitionCount; i++)
{
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
threadCount++;
}
}
return Tuple.Create(threadCount, subscribedMethods.Length);
return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length));
}
/// <summary>
@ -166,12 +178,15 @@ namespace JiShe.CollectBus.Kafka
if (attr.EnableBatch)
{
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
{
try
{
#if DEBUG
logger.LogInformation($"kafka批量消费消息:{message}");
#endif
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
return await ProcessMessageAsync(message.ToList(), method, subscribe);
}
catch (ConsumeException ex)
{
@ -179,16 +194,19 @@ namespace JiShe.CollectBus.Kafka
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
return await Task.FromResult(false);
}, attr.GroupId, attr.BatchSize,attr.BatchTimeout);
}, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
}
else
{
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
{
try
{
#if DEBUG
logger.LogInformation($"kafka消费消息:{message}");
#endif
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
}
catch (ConsumeException ex)
{
@ -198,7 +216,7 @@ namespace JiShe.CollectBus.Kafka
return await Task.FromResult(false);
}, attr.GroupId);
}
}
@ -209,36 +227,42 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param>
/// <param name="subscribe"></param>
/// <returns></returns>
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0;
//dynamic? messageObj= null;
//if (existParameters)
//{
// var paramType = parameters[0].ParameterType;
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
//}
if (isGenericTask)
List<object>? messageObj = null;
if (existParameters)
{
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
if (result is ISubscribeAck ackResult)
messageObj = new List<object>();
var paramType = parameters[0].ParameterType;
foreach (var msg in messages)
{
return ackResult.Ack;
var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
if (data != null)
messageObj.Add(data);
}
}
else
var result = method.Invoke(subscribe, messageObj?.ToArray());
if (result is Task<ISubscribeAck> genericTask)
{
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
await genericTask.ConfigureAwait(false);
return genericTask.Result.Ack;
}
else if (result is Task nonGenericTask)
{
await nonGenericTask.ConfigureAwait(false);
return true;
}
else if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
return false;
}
}
}

View File

@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
}
}

View File

@ -126,9 +126,10 @@ namespace JiShe.CollectBus.Kafka.Producer
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey);
var message = new Message<string, TValue>
var producer = GetProducer<Null, TValue>(typeKey);
var message = new Message<Null, TValue>
{
//Key= _kafkaOptionConfig.ServerTagName,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
@ -183,17 +184,18 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{
var message = new Message<string, TValue>
var message = new Message<Null, TValue>
{
//Key = _kafkaOptionConfig.ServerTagName,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
}
};
var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey);
var typeKey = typeof(KafkaProducer<Null, TValue>);
var producer = GetProducer<Null, TValue>(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);

View File

@ -17,7 +17,7 @@
<PackageReference Include="Volo.Abp.MongoDB" Version="8.3.3" />
<PackageReference Include="Volo.Abp.BackgroundJobs.MongoDB" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AuditLogging.MongoDB" Version="8.3.3" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
</ItemGroup>
</Project>

View File

@ -6,6 +6,12 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Compile Remove="Extensions\**" />
<EmbeddedResource Remove="Extensions\**" />
<None Remove="Extensions\**" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
@ -15,13 +21,9 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Extensions\" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -16,13 +16,13 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent">
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.Test.dll $(ProjectDir)..\JiShe.CollectBus.Host\Plugins\" />
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.Test.dll $(ProjectDir)..\..\web\JiShe.CollectBus.Host\Plugins\" />
</Target>
</Project>

View File

@ -16,13 +16,13 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent">
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.dll $(ProjectDir)..\JiShe.CollectBus.Host\Plugins\" />
<Exec Command="copy $(TargetDir)JiShe.CollectBus.Protocol.dll $(ProjectDir)..\..\web\JiShe.CollectBus.Host\Plugins\" />
</Target>
</Project>

Some files were not shown because too many files have changed in this diff Show More