Redis实现特性订阅方式处理

This commit is contained in:
ChenYi 2025-08-21 11:34:16 +08:00
parent 5b0a85b882
commit 51938481e9
15 changed files with 4666 additions and 32 deletions

View File

@ -0,0 +1,93 @@
{
"App": {
"SelfUrl": "http://localhost:44315",
"CorsOrigins": "https://*.IoT.com,http://localhost:4200,http://localhost:3100,http://localhost:80,http://10.10.70.11:4200,http://iot-admin-web:30711"
},
"ConnectionStrings": {
"Default": "Data Source=mysql;Port=3306;Database=JiSheIoTProDB;uid=root;pwd=JiShe!aqG#5kGgh&0;charset=utf8mb4;Allow User Variables=true;AllowLoadLocalInfile=true;"
},
"Hangfire": {
"Redis": {
"Host": "redis:6379,password=1q3J@BGf!yhTaD46nS#",
"DB": "2"
}
},
"Redis": {
"Configuration": "redis:6379,defaultdatabase=5,password=1q3J@BGf!yhTaD46nS#"
},
"Kafka": {
"BootstrapServers": "47.110.62.104:9094,47.110.53.196:9094,47.110.60.222:9094",
"EnableFilter": true,
"EnableAuthorization": false,
"SaslUserName": "lixiao",
"SaslPassword": "lixiao@1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"TaskThreadCount": -1
},
"Pulsar": {
"ServiceUrl": "pulsar://pulsar-broker:9093",
"WebUrl": "http://pulsar-broker:9094",
"UserName": "admin",
"TenantName": "1YMVZZkAkRArjxSD8457",
"Namespace": "OneNET",
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
"PulsarSubscriptionCustomName": "sub",
"EnableTls": false,
"ValidateServerCertificate": false,
"ConnectionTimeout": 30,
"OperationTimeout": 30,
"KeepAliveInterval": 30,
"TaskThreadCount": 1,
"IsSubscriber": true,
"DefaultPartitions": 16,
"DefaultBundles": 16,
"EnableAutoCreation": true, //Topic
"TopicMode": "Static", //Dynamic
"EnableTopicTypeFilter": true, //Topic
"AllowedTopicTypes": [ "Static" ], //Topic
"AllowedClusters": [ "pulsar-cluster-1" ], //
"AdminRoles": [ "admin" ]
},
"IoTDBOptions": {
"UserName": "root",
"Password": "Lixiao@1980",
"TreeModelClusterList": [ "iotdb-standalone:6667" ],
"TableModelClusterList": [ "iotdb-standalone:6667" ],
"PoolSize": 32,
"DataBaseName": "jisheiotdata",
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus01",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",
"AutomaticTerminalVersionTime": "17:07:00",
"AutomaticTelematicsModuleTime": "17:30:00",
"AutomaticDayFreezeTime": "02:30:00",
"AutomaticMonthFreezeTime": "03:30:00",
"DefaultProtocolPlugin": "T37612012ProtocolPlugin",
"VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"DistributedMessage": 2
},
"Jwt": {
"Audience": "JiShe.IoT",
"SecurityKey": "dzehzRz9a8asdfaf43ghVD@d#fasdfaf567sdadfasdf=",
"Issuer": "JiShe.IoT",
"ExpirationTime": 2
},
"FreeRedisOptions": {
"ConnectionString": "redis:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14",
"UseDistributedCache": true
},
"FreeSqlProviderOptions": {
"UsePrepayDB": false,
"UseEnergyDB": false,
"PrintLog": false
},
"OneNETSecureReceiveOptions": {
"OneNETVerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"OneNETAesKey": "RPTEIGCA1KvDEXS1"
}
}

View File

@ -0,0 +1,36 @@
services:
iotdb-standalone:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/iotdb:2.0.4-standalone
hostname: iotdb-standalone
container_name: iotdb-standalone
restart: "no"
deploy:
resources:
limits:
cpus: "8"
memory: 16g
ports:
- "30710:6667"
environment:
- cn_internal_address=iotdb-standalone
- cn_internal_port=10710
- cn_consensus_port=10720
- cn_seed_config_node=iotdb-standalone:10710
- dn_rpc_address=iotdb-standalone
- dn_internal_address=iotdb-standalone
- dn_rpc_port=6667
- dn_internal_port=10730
- dn_mpp_data_exchange_port=10740
- dn_schema_region_consensus_port=10750
- dn_data_region_consensus_port=10760
- dn_seed_config_node=iotdb-standalone:10710
privileged: true
volumes:
- ./iotdb/data:/iotdb/data
- ./iotdb/logs:/iotdb/logs
networks:
- iotdb-net
networks:
iotdb-net:
driver: bridge

View File

@ -0,0 +1,319 @@
services:
zookeeper:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/pulsar:4.0.6
container_name: zookeeper
hostname: zookeeper
restart: unless-stopped
deploy:
resources:
limits:
cpus: "2"
memory: 2g
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/zookeeper.conf &&
bin/pulsar zookeeper"
environment:
- PULSAR_MEM=-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -Djute.maxbuffer=10485760
ports:
- "2181:2181"
networks:
- pulsar-net
volumes:
- pulsar-zookeeper-data:/pulsar/data/zookeeper
healthcheck:
test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
pulsar-cluster-init:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/pulsar:4.0.6
container_name: pulsar-cluster-init
hostname: pulsar-cluster-init
command: >
/bin/bash -c
"bin/pulsar initialize-cluster-metadata --cluster pulsar-cluster-1 --zookeeper zookeeper:2181 --configuration-store zookeeper:2181 --web-service-url http://47.110.53.196:9094 --broker-service-url pulsar://47.110.53.196:9093 || echo 'Cluster metadata already exists'"
depends_on:
zookeeper:
condition: service_healthy
networks:
- pulsar-net
bookie:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/pulsar:4.0.6
container_name: pulsar-bookie
hostname: pulsar-bookie
restart: unless-stopped
deploy:
resources:
limits:
cpus: "2"
memory: 2g
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/bookkeeper.conf &&
(bin/bookkeeper shell metaformat -n --force || echo 'Bookie already formatted') &&
bin/pulsar bookie"
environment:
- PULSAR_MEM=-Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g
- BOOKIE_MEM=-Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g
- PULSAR_PREFIX_zkServers=zookeeper:2181
depends_on:
zookeeper:
condition: service_healthy
pulsar-cluster-init:
condition: service_completed_successfully
networks:
- pulsar-net
volumes:
- pulsar-bookkeeper-data:/pulsar/data/bookkeeper
broker:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/pulsar:4.0.6
container_name: pulsar-broker
hostname: pulsar-broker
restart: unless-stopped
deploy:
resources:
limits:
cpus: "2"
memory: 2g
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/broker.conf && bin/pulsar broker"
environment:
- PULSAR_MEM=-Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g
- PULSAR_PREFIX_clusterName=pulsar-cluster-1
- PULSAR_PREFIX_zookeeperServers=zookeeper:2181
- PULSAR_PREFIX_configurationStoreServers=zookeeper:2181
- PULSAR_PREFIX_bookkeeperClientPort=3181
- PULSAR_PREFIX_brokerServicePort=9093
- PULSAR_PREFIX_webServicePort=9094
- PULSAR_PREFIX_advertisedAddress=47.110.53.196
- PULSAR_PREFIX_managedLedgerDefaultEnsembleSize=1
- PULSAR_PREFIX_managedLedgerDefaultWriteQuorum=1
- PULSAR_PREFIX_managedLedgerDefaultAckQuorum=1
- PULSAR_PREFIX_allowAutoTopicCreation=true
# 启用非持久化主题
- PULSAR_PREFIX_enableNonPersistentTopics=true
# 基本认证配置
- PULSAR_PREFIX_authenticationEnabled=true
- PULSAR_PREFIX_authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderBasic
- PULSAR_PREFIX_basicAuthConf=file:///pulsar/auth/.htpasswd
# Broker自身的认证配置
- PULSAR_PREFIX_brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationBasic
- PULSAR_PREFIX_brokerClientAuthenticationParameters={"userId":"admin","password":"0fd7afb8b0d04e6abc4fdfdac2190a79"}
# 授权配置
- PULSAR_PREFIX_authorizationEnabled=false
- PULSAR_PREFIX_allowAutoTopicCreationType=partitioned
- PULSAR_PREFIX_allowAnonymousAccess=false
# 非持久化主题特殊配置
- PULSAR_PREFIX_allowNonPersistentTopics=true
- PULSAR_PREFIX_nonPersistentTopicsEnabled=true
ports:
- "9093:9093"
- "9094:9094"
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
pulsar-cluster-init:
condition: service_completed_successfully
networks:
- pulsar-net
volumes:
- pulsar-broker-data:/pulsar/data
- ./pulsar/auth:/pulsar/auth
pulsar-init:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/pulsar:4.0.6
container_name: pulsar-init
hostname: pulsar-init
command: >
/bin/bash -c
"
# 等待broker就绪
until bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' clusters list; do
echo 'Waiting for Pulsar broker to be ready...'
sleep 5
done;
# 创建租户和命名空间
bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' tenants create public --allowed-clusters pulsar-cluster-1 -r admin || echo 'Tenant public already exists';
bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' namespaces create public/default || echo 'Namespace public/default already exists';
# 创建非持久化主题
bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' topics create non-persistent://public/default/default-topic || echo 'Topic non-persistent://public/default/default-topic already exists';
# 设置命名空间策略
echo 'Setting namespace policies...';
bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' namespaces set-subscription-types-enabled public/default --types NonDurable;
bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' namespaces set-message-ttl public/default --messageTTL 0;
bin/pulsar-admin --admin-url http://pulsar-broker:9094 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationBasic --auth-params '{\"userId\":\"admin\",\"password\":\"0fd7afb8b0d04e6abc4fdfdac2190a79\"}' namespaces set-retention public/default --sizeLimit -1 --timeLimit -1;
echo 'Pulsar initialization completed with Basic Authentication!';
"
depends_on:
- broker
networks:
- pulsar-net
iotdb-standalone-service:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/iotdb:2.0.4-standalone
hostname: iotdb-standalone
container_name: iotdb-standalone
restart: always
deploy:
resources:
limits:
cpus: "8"
memory: 16g
ports:
- "30710:6667"
environment:
- cn_internal_address=iotdb-standalone
- cn_internal_port=10710
- cn_consensus_port=10720
- cn_seed_config_node=iotdb-standalone:10710
- dn_rpc_address=iotdb-standalone
- dn_internal_address=iotdb-standalone
- dn_rpc_port=6667
- dn_internal_port=10730
- dn_mpp_data_exchange_port=10740
- dn_schema_region_consensus_port=10750
- dn_data_region_consensus_port=10760
- dn_seed_config_node=iotdb-standalone:10710
privileged: true
volumes:
- ./iotdb/conf:/iotdb/conf
- ./iotdb/data:/iotdb/data
- ./iotdb/logs:/iotdb/logs
networks:
- pulsar-net
redis-service:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/redis:8.0.2-alpine3.21
container_name: redis
restart: always
hostname: redis
deploy:
resources:
limits:
cpus: "2.0"
memory: 2g
ports:
- "30712:6379"
volumes:
- ./redis/outdata:/data
- ./redis/conf:/etc/redis/conf
command: redis-server /etc/redis/conf/redis.conf
stdin_open: true
tty: true
networks:
- pulsar-net
mysql-service:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/mysql:8.4.6
container_name: mysql
hostname: mysql
restart: always
privileged: true
user: "1000:1000" # 使用宿主机用户ID
ports:
- "13306:3306"
environment:
MYSQL_ROOT_PASSWORD: JiShe!aqG#5kGgh&0
TZ: Asia/Shanghai
volumes:
- ./mysql/log:/var/log/mysql
- ./mysql/data:/var/lib/mysql
# - ./mysql/init:/docker-entrypoint-initdb.d
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
# - --user=mysql # 确保MySQL以mysql用户运行
deploy:
resources:
limits:
cpus: "2.0"
memory: 2g
stdin_open: true
tty: true
networks:
- pulsar-net
admin-api-service:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/jishe.iot.admin:2025.0820.1140
container_name: admin-api
hostname: admin-api
restart: always
deploy:
resources:
limits:
cpus: "2.0"
memory: 4g
ports:
- "28080:10500"
volumes:
- ./adminapi/conf:/app/configs
- ./adminapi/logs:/app/logs
stdin_open: true
tty: true
depends_on:
- mysql-service
- redis-service
- iotdb-standalone-service
- pulsar-init
networks:
- pulsar-net
admin-web-service:
image: registry.cn-qingdao.aliyuncs.com/jisheyun/jishe.iot.ui:2025.0819.2313
container_name: admin-web
hostname: admin-web
restart: always
deploy:
resources:
limits:
cpus: "1.0"
memory: 2g
ports:
- "30711:8080"
stdin_open: true
tty: true
depends_on:
- admin-api-service
networks:
- pulsar-net
networks:
pulsar-net:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.23.0.0/16
gateway: 172.23.0.1
volumes:
pulsar-zookeeper-data:
driver: local
driver_opts:
type: none
o: bind
device: ./pulsar/zookeeper-data
pulsar-bookkeeper-data:
driver: local
driver_opts:
type: none
o: bind
device: ./pulsar/bookkeeper-data
pulsar-broker-data:
driver: local
driver_opts:
type: none
o: bind
device: ./pulsar/pulsar-data

View File

@ -0,0 +1,38 @@
[mysqld]
# 基本配置
user = mysql
port = 3306
basedir = /usr
datadir = /var/lib/mysql
socket = /var/run/mysqld/mysqld.sock
pid-file = /var/run/mysqld/mysqld.pid
# 字符集配置
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
init_connect = 'SET NAMES utf8mb4'
# 基本连接配置
max_connections = 1000
max_connect_errors = 1000
# 日志配置
log-error = /var/log/mysql/error.log
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
# InnoDB基本配置
innodb_buffer_pool_size = 1G
innodb_log_file_size = 256M
innodb_log_buffer_size = 16M
innodb_flush_log_at_trx_commit = 1
# 时区配置
default-time-zone = '+8:00'
[mysql]
default-character-set = utf8mb4
[client]
default-character-set = utf8mb4

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,184 @@
#!/bin/bash
echo "=== 采集端一键启动脚本 ==="
# 检查htpasswd是否安装
if ! command -v htpasswd &> /dev/null; then
echo "❌ htpasswd 未安装,正在安装..."
if command -v apt &> /dev/null; then
sudo apt update && sudo apt install -y apache2-utils
elif command -v yum &> /dev/null; then
sudo yum install -y httpd-tools
else
echo "❌ 无法自动安装htpasswd请手动安装后重试"
exit 1
fi
fi
# 创建认证配置
echo "🔐 创建Pulsar基本认证配置..."
mkdir -p ./pulsar/auth
htpasswd -cmb ./pulsar/auth/.htpasswd admin 0fd7afb8b0d04e6abc4fdfdac2190a79
echo "✅ 认证配置完成 - 用户名: admin, 密码: 0fd7afb8b0d04e6abc4fdfdac2190a79"
# 创建数据目录
echo "📁 创建数据目录..."
sudo mkdir -p /mnt/dockerdata/appservice/{pulsar,redis,mysql,iotdb,adminapi}
sudo mkdir -p /mnt/dockerdata/appservice/pulsar/{zookeeper-data,bookkeeper-data,pulsar-data}
sudo mkdir -p /mnt/dockerdata/appservice/{redis/{outdata,conf},mysql/{conf,log,data},iotdb/{conf,data,logs},adminapi/{conf,logs}}
sudo chown -R 10000:0 /mnt/dockerdata/appservice/pulsar/
# 创建本地目录的软链接(如果不存在)
echo "🔗 创建本地目录链接..."
mkdir -p ./pulsar/{zookeeper-data,bookkeeper-data,pulsar-data,auth}
mkdir -p ./redis/{outdata,conf}
mkdir -p ./mysql/{conf,log,data}
mkdir -p ./iotdb/{conf,data,logs}
mkdir -p ./adminapi/{conf,logs}
# 修复MySQL目录权限
echo "🔧 修复MySQL目录权限..."
sudo chown -R 1000:1000 ./mysql/data
sudo chown -R 1000:1000 ./mysql/log
sudo chmod -R 755 ./mysql/data
sudo chmod -R 755 ./mysql/log
sudo chmod -R 755 ./adminapi/logs
# 第一步启动IoTDB独立服务获取配置
echo "🚀 第一步启动IoTDB独立服务获取配置..."
docker-compose -f docker-compose-iotdb-standalone.yml up -d
# 等待IoTDB启动完成
echo "⏳ 等待IoTDB服务启动完成..."
sleep 10
# 第二步:拷贝配置目录
echo "📁 第二步拷贝IoTDB配置目录..."
docker cp iotdb-standalone:/iotdb/conf ./iotdb
echo "✅ IoTDB配置目录拷贝完成"
# 第三步:修改时间戳精度配置
echo "⚙️ 第三步修改IoTDB时间戳精度配置..."
if [ -f "./iotdb/conf/iotdb-system.properties" ]; then
# 检查是否已经包含配置
if ! grep -q "timestamp_precision=ns" ./iotdb/conf/iotdb-system.properties; then
# 添加换行和说明
echo "" >> ./iotdb/conf/iotdb-system.properties
echo "# ms 毫秒 us 微秒 ns 纳秒" >> ./iotdb/conf/iotdb-system.properties
echo "timestamp_precision=ns" >> ./iotdb/conf/iotdb-system.properties
echo "✅ 时间戳精度配置已添加timestamp_precision=ns"
else
echo " 时间戳精度配置已存在"
fi
else
echo "❌ 配置文件不存在,请检查配置目录"
fi
# 第四步停止IoTDB独立服务
echo "🛑 第四步停止IoTDB独立服务..."
docker-compose -f docker-compose-iotdb-standalone.yml down
# 第五步清理IoTDB数据目录保留配置目录
echo "🧹 第五步清理IoTDB数据目录保留配置目录..."
sudo rm -rf ./iotdb/data/*
sudo rm -rf ./iotdb/logs/*
echo "✅ IoTDB数据目录已清理配置目录已保留"
# MySQL配置检查
#echo "⚙️ 检查MySQL配置..."
#if [ ! -d "./mysql/init" ]; then
# echo "❌ MySQL初始化脚本目录不存在请检查"
# exit 1
#fi
#
#if [ ! -f "./mysql/init/init-db.sql" ]; then
# echo "❌ MySQL初始化脚本不存在请检查"
# exit 1
#fi
#
#echo "✅ MySQL配置检查通过"
sleep 15
# 启动主服务
echo "🚀 启动 部署主脚本 ..."
# 检查是否是Pulsar重启
if [ -d "./pulsar/zookeeper-data" ] && [ "$(ls -A ./pulsar/zookeeper-data)" ]; then
echo "⚠️ 检测到Pulsar数据目录已存在可能是重启操作"
echo "🧹 清理Pulsar状态数据以确保正常启动..."
# 停止可能运行的服务
docker-compose -f docker-compose.yml down
# 清理Pulsar状态数据保留数据目录
echo "清理Zookeeper状态..."
sudo rm -rf ./pulsar/zookeeper-data/*
echo "清理BookKeeper状态..."
sudo rm -rf ./pulsar/bookkeeper-data/*
echo "清理Pulsar broker状态..."
sudo rm -rf ./pulsar/pulsar-data/*
echo "✅ Pulsar状态数据已清理"
sleep 5
fi
docker-compose -f docker-compose.yml up -d
# 等待IoTDB启动完成
echo "⏳ 等待IoTDB服务启动完成..."
sleep 10
# 设置IoTDB密码
echo "🔐 设置IoTDB root用户密码..."
docker exec -i iotdb-standalone /bin/bash << 'EOF'
# 等待IoTDB完全启动
echo "等待IoTDB服务就绪..."
sleep 10
echo "IoTDB服务已就绪开始设置密码和创建数据库..."
# 使用非交互式方式创建表模型数据库
echo "CREATE DATABASE jisheiotdata;" | ./start-cli.sh -h iotdb-standalone -sql_dialect table
# 使用非交互式方式设置密码
echo "ALTER USER root SET PASSWORD 'Lixiao@1980';" | ./start-cli.sh -h iotdb-standalone
# 等待一下再执行确认密码
sleep 2
echo "Lixiao@1980" | ./start-cli.sh -h iotdb-standalone
echo "IoTDB密码设置完成"
EOF
echo "✅ IoTDB密码设置完成 - 用户名: root, 密码: Lixiao@1980"
echo ""
echo "🎉 Pulsar 基本认证集群启动完成!"
echo ""
echo "📋 连接信息:"
echo " Pulsar Broker: 47.110.53.196:9093"
echo " Pulsar Admin: http://47.110.53.196:9094"
echo " IoTDB: 47.110.53.196:30710 (root/Lixiao@1980)"
echo " Redis: 47.110.53.196:6379"
echo " MySQL: 47.110.53.196:13306"
echo " Admin API: http://47.110.53.196:28080"
echo " Admin UI: http://47.110.53.196:30711"
echo " 用户名: admin"
echo " 密码: 0fd7afb8b0d04e6abc4fdfdac2190a79"
echo ""
echo "🔧 常用命令:"
echo " 查看状态: docker-compose -f docker-compose.yml ps"
echo " 查看日志: docker-compose -f docker-compose.yml logs -f"
echo " 停止服务: docker-compose -f docker-compose.yml down"
echo ""
echo "🧪 测试连接:"
echo " Pulsar: curl -u admin:0fd7afb8b0d04e6abc4fdfdac2190a79 http://47.110.53.196:9094/admin/v2/clusters"
echo " Redis: redis-cli -h 47.110.53.196 -p 6379 ping"
echo " MySQL: mysql -h 47.110.53.196 -P 13306 -u root -pJiShe!aqG#5kGgh&0 -e 'SELECT 1'"
echo " IoTDB: docker exec -it iotdb-standalone ./start-cli.sh -h iotdb-standalone -u root -p Lixiao@1980"
echo ""
echo "🎯 等待服务启动完成..."
echo "⏳ 请等待几分钟让所有服务完全启动"

View File

@ -64,9 +64,13 @@
<ItemGroup>
<Compile Remove="Logs\**" />
<Compile Remove="moduleSwagger\**" />
<Content Remove="Logs\**" />
<Content Remove="moduleSwagger\**" />
<EmbeddedResource Remove="Logs\**" />
<EmbeddedResource Remove="moduleSwagger\**" />
<None Remove="Logs\**" />
<None Remove="moduleSwagger\**" />
</ItemGroup>
<ItemGroup>
@ -74,7 +78,6 @@
</ItemGroup>
<ItemGroup>
<Folder Include="moduleSwagger\" />
<Folder Include="UploadFile\" />
</ItemGroup>

View File

@ -76,7 +76,9 @@
"AutomaticDayFreezeTime": "02:30:00",
"AutomaticMonthFreezeTime": "03:30:00",
"DefaultProtocolPlugin": "T37612012ProtocolPlugin",
"VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"SignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"AesSecurityKey": "RPTEIGCA1KvDEXS1",
"IsAesEncrypted": true,
"DistributedMessage": 2
},
"Jwt": {

View File

@ -1,10 +1,12 @@
using JiShe.IoT.OneNETAggregation.Dto;
using JiShe.IoT.Workshops;
using JiShe.ServicePro;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.Kafka.Consts;
using JiShe.ServicePro.Kafka.Producer;
using JiShe.ServicePro.ServerOptions;
@ -21,7 +23,7 @@ namespace JiShe.IoT.BusinessSystemAggregation
/// <summary>
/// 业务系统聚合服务
/// </summary>
public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IKafkaProducerService _producerService, IDeviceAppService deviceAppService) : IoTAppService, IBusinessSystemAggregationService
public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService) : IoTAppService, IBusinessSystemAggregationService
{
ServerApplicationOptions srverOptions = options.Value;
@ -34,7 +36,7 @@ namespace JiShe.IoT.BusinessSystemAggregation
try
{
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.VerifySignatureToken);
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.SignatureToken);
if (verifySignatureReult == false)//签名校验失败
{
return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
@ -45,13 +47,27 @@ namespace JiShe.IoT.BusinessSystemAggregation
return HttpDataResultExtensions.Failed("指令下发内容不能为空", -102, ResponeResultEnum.Fail);
}
//查询设备信息,判断设备在哪个平台
var messageBody = input.Message.Deserialize<ReceiveCommandInfoDto>();
//判断是否需要解密
ReceiveCommandInfoDto messageBody = null;
if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey))
{
string tempRaw = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
messageBody = tempRaw.Deserialize<ReceiveCommandInfoDto>();
}
else
{
messageBody = input.Message.Deserialize<ReceiveCommandInfoDto>();
}
if (messageBody == null)
{
return HttpDataResultExtensions.Failed("指令下发内容不能为空", -103, ResponeResultEnum.Fail);
}
//限定来源类型必须为业务系统
if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
{
return HttpDataResultExtensions.Failed("设备指令来源类型错误业务系统传固定值2", -103, ResponeResultEnum.Fail);
return HttpDataResultExtensions.Failed("设备指令来源类型错误业务系统传固定值2", -104, ResponeResultEnum.Fail);
}
var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
@ -59,23 +75,22 @@ namespace JiShe.IoT.BusinessSystemAggregation
//将指令存储Kafka的OneNET主题中
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
{
await _producerService.ProduceAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, $"{GuidGenerator.Create()}", input);
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, input);
}
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
{
await _producerService.ProduceAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName, $"{GuidGenerator.Create()}", input);
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName, input);
}
else
{
return HttpDataResultExtensions.Failed("指令处理失败,当前设备平台类型异常",-104);
return HttpDataResultExtensions.Failed("指令处理失败,当前设备平台类型异常", -105);
}
return HttpDataResultExtensions.Success("指令下发Kafka成功");
return HttpDataResultExtensions.Success("指令下发成功");
}
catch (Exception)
catch (Exception ex)
{
throw;
return HttpDataResultExtensions.Failed($"指令处理失败,发送异常:{ex.Message}", -106);
}
}
}

View File

@ -39,7 +39,7 @@ namespace JiShe.IoT.CTWingAggregation
{
try
{
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.VerifySignatureToken);
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.SignatureToken);
if (verifySignatureReult == false)//签名校验失败
{
return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
@ -89,7 +89,7 @@ namespace JiShe.IoT.CTWingAggregation
{
try
{
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.VerifySignatureToken);
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.SignatureToken);
if (verifySignatureReult == false)//签名校验失败
{
return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
@ -113,7 +113,7 @@ namespace JiShe.IoT.CTWingAggregation
}
var rawMessage = pageList.Serialize();
var signatureResult = EncryptUtil.OpenApiSignature(rawMessage, srverOptions.VerifySignatureToken);
var signatureResult = EncryptUtil.OpenApiSignature(rawMessage, srverOptions.SignatureToken);
return HttpDataResultExtensions.Success(rawMessage, signatureResult.Item1, signatureResult.Item2);
}

View File

@ -24,8 +24,8 @@ namespace JiShe.IoT.DeviceAggregation
/// <param name="logger"></param>
/// <param name="deviceAppService">设备服务</param>
/// <param name="oneNETDeviceService">OneNET设备服务</param>
/// <param name="producerService">KafKa生产服务</param>
public class DeviceAggregationService(ILogger<DeviceAggregationService> logger, IDeviceAppService deviceAppService, IOneNETDeviceService oneNETDeviceService, IKafkaProducerService producerService) : IoTAppService, IDeviceAggregationService
/// <param name="redisPubSubService">Redis发布订阅服务</param>
public class DeviceAggregationService(ILogger<DeviceAggregationService> logger, IDeviceAppService deviceAppService, IOneNETDeviceService oneNETDeviceService, IReliableRedisPubSubService redisPubSubService) : IoTAppService, IDeviceAggregationService
{
/// <summary>
/// 管理后台创建设备信息
@ -300,7 +300,7 @@ namespace JiShe.IoT.DeviceAggregation
}
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
{
await producerService.ProduceAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName, $"{GuidGenerator.Create()}", commandRequest);
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName,commandRequest);
return true;
}
else
@ -599,7 +599,7 @@ namespace JiShe.IoT.DeviceAggregation
throw new UserFriendlyException("设备不在线");
}
await producerService.ProduceAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, $"{GuidGenerator.Create()}", commandRequest);
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, commandRequest);
return true;
}
catch (Exception)

View File

@ -60,8 +60,8 @@ namespace JiShe.IoT
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
commonService.InitSelectTypetList();
var communicationChannelService = context.ServiceProvider.GetRequiredService<IServiceCommunicationChannelSubscriberService>();
communicationChannelService.ServiceCommunicationDeviceStatusSubscriber();
var issueSubscriberService = context.ServiceProvider.GetRequiredService<IOneNetIssueSubscriberService>();
issueSubscriberService.IssueCommandRedisSubscriber();
}
}
}

View File

@ -22,7 +22,7 @@ namespace JiShe.IoT.OneNETAggregation
/// <param name="logger"></param>
public class OneNETAggregationService(IDeviceAggregationService deviceAggregationService, IOptions<ServerApplicationOptions> options, ILogger<OneNETAggregationService> logger) : IoTAppService, IOneNETAggregationService
{
ServerApplicationOptions srverOptions = options.Value;
ServerApplicationOptions serverOptions = options.Value;
/// <summary>
/// 接收车间生产信息
@ -35,14 +35,23 @@ namespace JiShe.IoT.OneNETAggregation
try
{
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.VerifySignatureToken);
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, serverOptions.SignatureToken);
if (verifySignatureReult == false)//签名校验失败
{
return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
}
ProductionEquipmentMessageBody productionEquipmentMessageBody = input.Message.Deserialize<ProductionEquipmentMessageBody>();
ProductionEquipmentMessageBody productionEquipmentMessageBody = null;
if (serverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(serverOptions.AesSecurityKey))
{
string tempRaw = EncryptUtil.OpenApiDecrypto(input.Message, serverOptions.AesSecurityKey);
productionEquipmentMessageBody = tempRaw.Deserialize<ProductionEquipmentMessageBody>();
}
else
{
productionEquipmentMessageBody = input.Message.Deserialize<ProductionEquipmentMessageBody>();
}
if (string.IsNullOrWhiteSpace(productionEquipmentMessageBody.IoTPlatformProductId))
{
@ -89,12 +98,18 @@ namespace JiShe.IoT.OneNETAggregation
try
{
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.VerifySignatureToken);
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, serverOptions.SignatureToken);
if (verifySignatureReult == false)//签名校验失败
{
return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
}
if (serverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(serverOptions.AesSecurityKey))
{
string tempRaw = EncryptUtil.OpenApiDecrypto(input.Message, serverOptions.AesSecurityKey);
}
var pageListQuery = FreeSqlDbContext.Instance.Select<OneNETProductInfos>()
.Where(e => e.IsEnabled == true)
.OrderByDescending(e => e.CreationTime);
@ -114,7 +129,7 @@ namespace JiShe.IoT.OneNETAggregation
}
var rawMessage = pageList.Serialize();
var signatureResult = EncryptUtil.OpenApiSignature(rawMessage, srverOptions.VerifySignatureToken);
var signatureResult = EncryptUtil.OpenApiSignature(rawMessage, serverOptions.SignatureToken);
return HttpDataResultExtensions.Success(rawMessage, signatureResult.Item1, signatureResult.Item2);
}

View File

@ -49,14 +49,14 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
try
{
_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
//_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult();
return true;
}
catch (Exception ex)
{
_logger.LogWarning($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
_logger.LogError($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message);
return false;
}