Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.
 
 
 

7.7 KiB

Kafka 模式切换说明

本文档说明 emp-test / emp-uat 如何在两种 Kafka 链路之间切换:

  • 甲方 Kafka 转发方案
  • 部署包自带 Kafka 方案

默认推荐使用甲方 Kafka 转发方案。自带 Kafka 主要用于甲方转发链路未就绪、联调排查或临时内测。

两种链路的区别

甲方 Kafka 转发方案

模拟器先把数据推到甲方接收 topic,甲方服务消费后再转发到 EMP 后端消费 topic。

emp-ws 模拟器 -> 甲方接收 topic -> 甲方转发服务 -> EMP 后端消费 topic -> emp-data

此模式下,模拟器推送 topic 和后端消费 topic 不相同。

环境 Broker 模拟器推送 topic 后端消费 topic 消费组
emp-test ip-cld.cn:29362 test-vehicle-real-data YuanJing-test-vehicle-mock-data emp-test-data-group
emp-uat ip-cld.cn:29362 uat-vehicle-real-data YuanJing-uat-vehicle-mock-data emp-uat-data-group

自带 Kafka 方案

模拟器和后端都连本环境 Docker 内网里的 Kafka。

emp-ws 模拟器 -> kafka:9092 -> emp-data

此模式下没有甲方转发服务,所以模拟器推送 topic 和后端消费 topic 必须相同。

环境 Broker 模拟器推送 topic 后端消费 topic 消费组
emp-test kafka:9092 YuanJing-test-vehicle-mock-data YuanJing-test-vehicle-mock-data emp-test-data-group
emp-uat kafka:9092 YuanJing-uat-vehicle-mock-data YuanJing-uat-vehicle-mock-data emp-uat-data-group

切换前准备

进入目标环境 runtime 目录,并定义 dc

# test
cd /home/admin-x99/emp/emp-test/runtime
dc() { docker compose --env-file .env -f docker-compose.yml -p emp-test "$@"; }
# uat
cd /home/admin-x99/emp/emp-uat/runtime
dc() { docker compose --env-file .env -f docker-compose.yml -p emp-uat "$@"; }

建议每次切换前备份 .env

cp -a .env ".env.bak.kafka.$(date +%Y%m%d%H%M%S)"

下面命令会用到一个写 .env 的辅助函数:

set_env() {
  key="$1"
  val="$2"
  if grep -q "^${key}=" .env; then
    sed -i "s|^${key}=.*|${key}=${val}|" .env
  else
    printf '\n%s=%s\n' "$key" "$val" >> .env
  fi
}

emp-test 切换为自带 Kafka

set_env KAFKA_BROKERS kafka:9092
set_env KAFKA_TOPIC YuanJing-test-vehicle-mock-data
set_env KAFKA_GROUP_ID emp-test-data-group
set_env KAFKA_USER ''
set_env KAFKA_PWD ''

set_env SIMULATOR_KAFKA_BROKERS kafka:9092
set_env SIMULATOR_KAFKA_TOPIC YuanJing-test-vehicle-mock-data
set_env SIMULATOR_KAFKA_USER ''
set_env SIMULATOR_KAFKA_PASSWORD ''
set_env SIMULATOR_KAFKA_CLIENT_ID emp-test-simulator

启动自带 Kafka,并创建 topic:

dc --profile local-kafka up -d kafka
sleep 10
dc --profile local-kafka run --rm kafka-init

重建读取 Kafka 配置的服务:

dc up -d --force-recreate emp-ws emp-data

如果访问 /simulator/ 出现 502,通常是 emp-admin Nginx 缓存了旧的 emp-ws 地址,重启 emp-admin 即可:

dc restart emp-admin

emp-test 切换为甲方 Kafka 转发方案

set_env KAFKA_BROKERS ip-cld.cn:29362
set_env KAFKA_TOPIC YuanJing-test-vehicle-mock-data
set_env KAFKA_GROUP_ID emp-test-data-group
set_env KAFKA_USER ''
set_env KAFKA_PWD ''

set_env SIMULATOR_KAFKA_BROKERS ip-cld.cn:29362
set_env SIMULATOR_KAFKA_TOPIC test-vehicle-real-data
set_env SIMULATOR_KAFKA_USER ''
set_env SIMULATOR_KAFKA_PASSWORD ''
set_env SIMULATOR_KAFKA_CLIENT_ID emp-test-simulator

重建读取 Kafka 配置的服务:

dc up -d --force-recreate emp-ws emp-data
dc restart emp-admin

自带 Kafka 可以保留运行,也可以停止:

dc --profile local-kafka stop kafka kafka-init

停止自带 Kafka 不会影响甲方转发方案,因为服务已经改为连接 ip-cld.cn:29362

emp-uat 切换为自带 Kafka

set_env KAFKA_BROKERS kafka:9092
set_env KAFKA_TOPIC YuanJing-uat-vehicle-mock-data
set_env KAFKA_GROUP_ID emp-uat-data-group
set_env KAFKA_USER ''
set_env KAFKA_PWD ''

set_env SIMULATOR_KAFKA_BROKERS kafka:9092
set_env SIMULATOR_KAFKA_TOPIC YuanJing-uat-vehicle-mock-data
set_env SIMULATOR_KAFKA_USER ''
set_env SIMULATOR_KAFKA_PASSWORD ''
set_env SIMULATOR_KAFKA_CLIENT_ID emp-uat-simulator

启动自带 Kafka,并创建 topic:

dc --profile local-kafka up -d kafka
sleep 10
dc --profile local-kafka run --rm kafka-init

重建读取 Kafka 配置的服务:

dc up -d --force-recreate emp-ws emp-data
dc restart emp-admin

emp-uat 切换为甲方 Kafka 转发方案

set_env KAFKA_BROKERS ip-cld.cn:29362
set_env KAFKA_TOPIC YuanJing-uat-vehicle-mock-data
set_env KAFKA_GROUP_ID emp-uat-data-group
set_env KAFKA_USER ''
set_env KAFKA_PWD ''

set_env SIMULATOR_KAFKA_BROKERS ip-cld.cn:29362
set_env SIMULATOR_KAFKA_TOPIC uat-vehicle-real-data
set_env SIMULATOR_KAFKA_USER ''
set_env SIMULATOR_KAFKA_PASSWORD ''
set_env SIMULATOR_KAFKA_CLIENT_ID emp-uat-simulator

重建读取 Kafka 配置的服务:

dc up -d --force-recreate emp-ws emp-data
dc restart emp-admin

自带 Kafka 可以保留运行,也可以停止:

dc --profile local-kafka stop kafka kafka-init

验证配置是否生效

查看模拟器 Kafka 配置:

dc exec emp-ws sh -lc 'echo SIMULATOR_KAFKA_BROKERS=$SIMULATOR_KAFKA_BROKERS; echo SIMULATOR_KAFKA_TOPIC=$SIMULATOR_KAFKA_TOPIC; echo SIMULATOR_KAFKA_CLIENT_ID=$SIMULATOR_KAFKA_CLIENT_ID'

查看后端消费配置:

dc exec emp-data sh -lc 'echo KAFKA_BROKERS=$KAFKA_BROKERS; echo KAFKA_TOPIC=$KAFKA_TOPIC; echo KAFKA_GROUP_ID=$KAFKA_GROUP_ID'

查看服务状态:

dc ps kafka emp-ws emp-data emp-admin

自带 Kafka 模式下可以查看 topic:

dc exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --list

查看模拟器发送日志:

dc logs -f --tail=200 emp-ws | grep -E --line-buffered '\[simulator\]|Kafka|kafka|tick failed|Error|error'

查看后端消费日志:

dc logs -f --tail=200 emp-data | grep -E --line-buffered 'Kafka|kafka|ConsumerRecord|vehicle|异常|error|Exception'

如果看到 emp-ws 持续输出类似:

[simulator] ... 上报=...

说明模拟器 producer 已经发送成功。

如果 emp-data 输出类似:

Kafka批量消费车辆数据

说明后端已经消费到 Kafka 消息。后续如果报 TDengine写入失败,说明 Kafka 链路已通,问题在 TDengine 表结构、密码或写入 SQL,不再是 Kafka 问题。

常见问题

切换后 .env 已修改但容器里还是旧配置

.env 修改不会自动进入已运行容器,必须重建读取该配置的服务:

dc up -d --force-recreate emp-ws emp-data

切换后 /simulator/ 502

通常是 emp-admin Nginx 缓存了重建前的 emp-ws 容器 IP:

dc restart emp-admin

自带 Kafka 模式下 emp-data 不启动

emp-data 还依赖 MySQL、Redis、Nacos、TDengine。如果 Compose 提示等待 tdengine,先检查 TDengine 健康状态:

dc ps tdengine
dc logs --tail=80 tdengine

这类问题不是 Kafka 问题。

甲方转发方案下模拟器发送成功但后端没消费

先确认 EMP 配置:

dc exec emp-ws sh -lc 'echo $SIMULATOR_KAFKA_BROKERS; echo $SIMULATOR_KAFKA_TOPIC'
dc exec emp-data sh -lc 'echo $KAFKA_BROKERS; echo $KAFKA_TOPIC; echo $KAFKA_GROUP_ID'

如果配置正确,说明需要甲方检查转发服务:

源 topic 是否收到消息
源 topic 到目标 topic 的转发规则是否启用
topic 名是否完全一致
是否有车辆关系、权限、过滤规则导致数据被丢弃