| @@ -0,0 +1,307 @@ | |||
| # Kafka 模式切换说明 | |||
| 本文档说明 `emp-test` / `emp-uat` 如何在两种 Kafka 链路之间切换: | |||
| - 甲方 Kafka 转发方案 | |||
| - 部署包自带 Kafka 方案 | |||
| 默认推荐使用甲方 Kafka 转发方案。自带 Kafka 主要用于甲方转发链路未就绪、联调排查或临时内测。 | |||
| ## 两种链路的区别 | |||
| ### 甲方 Kafka 转发方案 | |||
| 模拟器先把数据推到甲方接收 topic,甲方服务消费后再转发到 EMP 后端消费 topic。 | |||
| ```text | |||
| emp-ws 模拟器 -> 甲方接收 topic -> 甲方转发服务 -> EMP 后端消费 topic -> emp-data | |||
| ``` | |||
| 此模式下,模拟器推送 topic 和后端消费 topic 不相同。 | |||
| | 环境 | Broker | 模拟器推送 topic | 后端消费 topic | 消费组 | | |||
| | --- | --- | --- | --- | --- | | |||
| | emp-test | `ip-cld.cn:29362` | `test-vehicle-real-data` | `YuanJing-test-vehicle-mock-data` | `emp-test-data-group` | | |||
| | emp-uat | `ip-cld.cn:29362` | `uat-vehicle-real-data` | `YuanJing-uat-vehicle-mock-data` | `emp-uat-data-group` | | |||
| ### 自带 Kafka 方案 | |||
| 模拟器和后端都连本环境 Docker 内网里的 Kafka。 | |||
| ```text | |||
| emp-ws 模拟器 -> kafka:9092 -> emp-data | |||
| ``` | |||
| 此模式下没有甲方转发服务,所以模拟器推送 topic 和后端消费 topic 必须相同。 | |||
| | 环境 | Broker | 模拟器推送 topic | 后端消费 topic | 消费组 | | |||
| | --- | --- | --- | --- | --- | | |||
| | emp-test | `kafka:9092` | `YuanJing-test-vehicle-mock-data` | `YuanJing-test-vehicle-mock-data` | `emp-test-data-group` | | |||
| | emp-uat | `kafka:9092` | `YuanJing-uat-vehicle-mock-data` | `YuanJing-uat-vehicle-mock-data` | `emp-uat-data-group` | | |||
| ## 切换前准备 | |||
| 进入目标环境 runtime 目录,并定义 `dc`: | |||
| ```bash | |||
| # test | |||
| cd /home/admin-x99/emp/emp-test/runtime | |||
| dc() { docker compose --env-file .env -f docker-compose.yml -p emp-test "$@"; } | |||
| ``` | |||
| ```bash | |||
| # uat | |||
| cd /home/admin-x99/emp/emp-uat/runtime | |||
| dc() { docker compose --env-file .env -f docker-compose.yml -p emp-uat "$@"; } | |||
| ``` | |||
| 建议每次切换前备份 `.env`: | |||
| ```bash | |||
| cp -a .env ".env.bak.kafka.$(date +%Y%m%d%H%M%S)" | |||
| ``` | |||
| 下面命令会用到一个写 `.env` 的辅助函数: | |||
| ```bash | |||
| set_env() { | |||
| key="$1" | |||
| val="$2" | |||
| if grep -q "^${key}=" .env; then | |||
| sed -i "s|^${key}=.*|${key}=${val}|" .env | |||
| else | |||
| printf '\n%s=%s\n' "$key" "$val" >> .env | |||
| fi | |||
| } | |||
| ``` | |||
| ## emp-test 切换为自带 Kafka | |||
| ```bash | |||
| set_env KAFKA_BROKERS kafka:9092 | |||
| set_env KAFKA_TOPIC YuanJing-test-vehicle-mock-data | |||
| set_env KAFKA_GROUP_ID emp-test-data-group | |||
| set_env KAFKA_USER '' | |||
| set_env KAFKA_PWD '' | |||
| set_env SIMULATOR_KAFKA_BROKERS kafka:9092 | |||
| set_env SIMULATOR_KAFKA_TOPIC YuanJing-test-vehicle-mock-data | |||
| set_env SIMULATOR_KAFKA_USER '' | |||
| set_env SIMULATOR_KAFKA_PASSWORD '' | |||
| set_env SIMULATOR_KAFKA_CLIENT_ID emp-test-simulator | |||
| ``` | |||
| 启动自带 Kafka,并创建 topic: | |||
| ```bash | |||
| dc --profile local-kafka up -d kafka | |||
| sleep 10 | |||
| dc --profile local-kafka run --rm kafka-init | |||
| ``` | |||
| 重建读取 Kafka 配置的服务: | |||
| ```bash | |||
| dc up -d --force-recreate emp-ws emp-data | |||
| ``` | |||
| 如果访问 `/simulator/` 出现 502,通常是 `emp-admin` Nginx 缓存了旧的 `emp-ws` 地址,重启 `emp-admin` 即可: | |||
| ```bash | |||
| dc restart emp-admin | |||
| ``` | |||
| ## emp-test 切换为甲方 Kafka 转发方案 | |||
| ```bash | |||
| set_env KAFKA_BROKERS ip-cld.cn:29362 | |||
| set_env KAFKA_TOPIC YuanJing-test-vehicle-mock-data | |||
| set_env KAFKA_GROUP_ID emp-test-data-group | |||
| set_env KAFKA_USER '' | |||
| set_env KAFKA_PWD '' | |||
| set_env SIMULATOR_KAFKA_BROKERS ip-cld.cn:29362 | |||
| set_env SIMULATOR_KAFKA_TOPIC test-vehicle-real-data | |||
| set_env SIMULATOR_KAFKA_USER '' | |||
| set_env SIMULATOR_KAFKA_PASSWORD '' | |||
| set_env SIMULATOR_KAFKA_CLIENT_ID emp-test-simulator | |||
| ``` | |||
| 重建读取 Kafka 配置的服务: | |||
| ```bash | |||
| dc up -d --force-recreate emp-ws emp-data | |||
| dc restart emp-admin | |||
| ``` | |||
| 自带 Kafka 可以保留运行,也可以停止: | |||
| ```bash | |||
| dc --profile local-kafka stop kafka kafka-init | |||
| ``` | |||
| 停止自带 Kafka 不会影响甲方转发方案,因为服务已经改为连接 `ip-cld.cn:29362`。 | |||
| ## emp-uat 切换为自带 Kafka | |||
| ```bash | |||
| set_env KAFKA_BROKERS kafka:9092 | |||
| set_env KAFKA_TOPIC YuanJing-uat-vehicle-mock-data | |||
| set_env KAFKA_GROUP_ID emp-uat-data-group | |||
| set_env KAFKA_USER '' | |||
| set_env KAFKA_PWD '' | |||
| set_env SIMULATOR_KAFKA_BROKERS kafka:9092 | |||
| set_env SIMULATOR_KAFKA_TOPIC YuanJing-uat-vehicle-mock-data | |||
| set_env SIMULATOR_KAFKA_USER '' | |||
| set_env SIMULATOR_KAFKA_PASSWORD '' | |||
| set_env SIMULATOR_KAFKA_CLIENT_ID emp-uat-simulator | |||
| ``` | |||
| 启动自带 Kafka,并创建 topic: | |||
| ```bash | |||
| dc --profile local-kafka up -d kafka | |||
| sleep 10 | |||
| dc --profile local-kafka run --rm kafka-init | |||
| ``` | |||
| 重建读取 Kafka 配置的服务: | |||
| ```bash | |||
| dc up -d --force-recreate emp-ws emp-data | |||
| dc restart emp-admin | |||
| ``` | |||
| ## emp-uat 切换为甲方 Kafka 转发方案 | |||
| ```bash | |||
| set_env KAFKA_BROKERS ip-cld.cn:29362 | |||
| set_env KAFKA_TOPIC YuanJing-uat-vehicle-mock-data | |||
| set_env KAFKA_GROUP_ID emp-uat-data-group | |||
| set_env KAFKA_USER '' | |||
| set_env KAFKA_PWD '' | |||
| set_env SIMULATOR_KAFKA_BROKERS ip-cld.cn:29362 | |||
| set_env SIMULATOR_KAFKA_TOPIC uat-vehicle-real-data | |||
| set_env SIMULATOR_KAFKA_USER '' | |||
| set_env SIMULATOR_KAFKA_PASSWORD '' | |||
| set_env SIMULATOR_KAFKA_CLIENT_ID emp-uat-simulator | |||
| ``` | |||
| 重建读取 Kafka 配置的服务: | |||
| ```bash | |||
| dc up -d --force-recreate emp-ws emp-data | |||
| dc restart emp-admin | |||
| ``` | |||
| 自带 Kafka 可以保留运行,也可以停止: | |||
| ```bash | |||
| dc --profile local-kafka stop kafka kafka-init | |||
| ``` | |||
| ## 验证配置是否生效 | |||
| 查看模拟器 Kafka 配置: | |||
| ```bash | |||
| dc exec emp-ws sh -lc 'echo SIMULATOR_KAFKA_BROKERS=$SIMULATOR_KAFKA_BROKERS; echo SIMULATOR_KAFKA_TOPIC=$SIMULATOR_KAFKA_TOPIC; echo SIMULATOR_KAFKA_CLIENT_ID=$SIMULATOR_KAFKA_CLIENT_ID' | |||
| ``` | |||
| 查看后端消费配置: | |||
| ```bash | |||
| dc exec emp-data sh -lc 'echo KAFKA_BROKERS=$KAFKA_BROKERS; echo KAFKA_TOPIC=$KAFKA_TOPIC; echo KAFKA_GROUP_ID=$KAFKA_GROUP_ID' | |||
| ``` | |||
| 查看服务状态: | |||
| ```bash | |||
| dc ps kafka emp-ws emp-data emp-admin | |||
| ``` | |||
| 自带 Kafka 模式下可以查看 topic: | |||
| ```bash | |||
| dc exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \ | |||
| --bootstrap-server 127.0.0.1:9092 \ | |||
| --list | |||
| ``` | |||
| 查看模拟器发送日志: | |||
| ```bash | |||
| dc logs -f --tail=200 emp-ws | grep -E --line-buffered '\[simulator\]|Kafka|kafka|tick failed|Error|error' | |||
| ``` | |||
| 查看后端消费日志: | |||
| ```bash | |||
| dc logs -f --tail=200 emp-data | grep -E --line-buffered 'Kafka|kafka|ConsumerRecord|vehicle|异常|error|Exception' | |||
| ``` | |||
| 如果看到 `emp-ws` 持续输出类似: | |||
| ```text | |||
| [simulator] ... 上报=... | |||
| ``` | |||
| 说明模拟器 producer 已经发送成功。 | |||
| 如果 `emp-data` 输出类似: | |||
| ```text | |||
| Kafka批量消费车辆数据 | |||
| ``` | |||
| 说明后端已经消费到 Kafka 消息。后续如果报 `TDengine写入失败`,说明 Kafka 链路已通,问题在 TDengine 表结构、密码或写入 SQL,不再是 Kafka 问题。 | |||
| ## 常见问题 | |||
| ### 切换后 `.env` 已修改但容器里还是旧配置 | |||
| `.env` 修改不会自动进入已运行容器,必须重建读取该配置的服务: | |||
| ```bash | |||
| dc up -d --force-recreate emp-ws emp-data | |||
| ``` | |||
| ### 切换后 `/simulator/` 502 | |||
| 通常是 `emp-admin` Nginx 缓存了重建前的 `emp-ws` 容器 IP: | |||
| ```bash | |||
| dc restart emp-admin | |||
| ``` | |||
| ### 自带 Kafka 模式下 `emp-data` 不启动 | |||
| `emp-data` 还依赖 MySQL、Redis、Nacos、TDengine。如果 Compose 提示等待 `tdengine`,先检查 TDengine 健康状态: | |||
| ```bash | |||
| dc ps tdengine | |||
| dc logs --tail=80 tdengine | |||
| ``` | |||
| 这类问题不是 Kafka 问题。 | |||
| ### 甲方转发方案下模拟器发送成功但后端没消费 | |||
| 先确认 EMP 配置: | |||
| ```bash | |||
| dc exec emp-ws sh -lc 'echo $SIMULATOR_KAFKA_BROKERS; echo $SIMULATOR_KAFKA_TOPIC' | |||
| dc exec emp-data sh -lc 'echo $KAFKA_BROKERS; echo $KAFKA_TOPIC; echo $KAFKA_GROUP_ID' | |||
| ``` | |||
| 如果配置正确,说明需要甲方检查转发服务: | |||
| ```text | |||
| 源 topic 是否收到消息 | |||
| 源 topic 到目标 topic 的转发规则是否启用 | |||
| topic 名是否完全一致 | |||
| 是否有车辆关系、权限、过滤规则导致数据被丢弃 | |||
| ``` | |||