diff --git a/.env.example b/.env.example index 442bb71..769e28d 100644 --- a/.env.example +++ b/.env.example @@ -1,13 +1,29 @@ -# 应用配置 +# ========================= +# 应用基础配置 +# ========================= NODE_ENV=production PORT=3000 -# Kafka配置 +# 日志级别:debug | info | warn | error +LOG_LEVEL=info +LOG_FORMAT=json + +# ========================= +# Kafka 配置 +# ========================= +# 多个 broker 用英文逗号分隔 KAFKA_BROKERS=localhost:9092 -KAFKA_TOPIC=blwlog4Nodejs-rcu-heartbeat-topic -KAFKA_TOPICS= -KAFKA_GROUP_ID=bls-heartbeat-consumer KAFKA_CLIENT_ID=bls-heartbeat +KAFKA_GROUP_ID=bls-heartbeat-consumer + +# 推荐使用 KAFKA_TOPICS;多个 topic 用英文逗号分隔 +KAFKA_TOPICS=blwlog4Nodejs-rcu-heartbeat-topic + +# 兼容旧写法;如已使用 KAFKA_TOPICS,可不填写 +# KAFKA_TOPIC=blwlog4Nodejs-rcu-heartbeat-topic + +# latest / earliest +KAFKA_FROM_OFFSET=latest KAFKA_CONSUMER_INSTANCES=6 KAFKA_AUTO_COMMIT=true KAFKA_AUTO_COMMIT_INTERVAL_MS=5000 @@ -18,16 +34,16 @@ KAFKA_FETCH_MAX_BYTES=10485760 KAFKA_FETCH_MIN_BYTES=1 KAFKA_FETCH_MAX_WAIT_MS=100 -# Kafka SASL配置(如果需要) +# SASL/SSL 按实际环境开启 KAFKA_SASL_ENABLED=false KAFKA_SASL_MECHANISM=plain KAFKA_SASL_USERNAME= KAFKA_SASL_PASSWORD= - -# Kafka SSL配置(如果需要) KAFKA_SSL_ENABLED=false -# PostgreSQL配置 +# ========================= +# PostgreSQL 配置 +# ========================= POSTGRES_HOST=127.0.0.1 POSTGRES_PORT=5432 POSTGRES_DATABASE=log_platform @@ -35,38 +51,47 @@ POSTGRES_USER=log_admin POSTGRES_PASSWORD=your_password POSTGRES_IDLE_TIMEOUT_MS=30000 -# PostgreSQL环境变量(兼容性) -PGHOST= -PGPORT= -PGTARGETDB= -PGUSER= -PGPASSWORD= +# 可选:兼容标准 PG 环境变量;留空则优先使用上面的 POSTGRES_* +# PGHOST= +# PGPORT= +# PGTARGETDB= +# PGUSER= +# PGPASSWORD= -# 数据库重试配置 +# 数据库失败重试 DB_RETRY_ATTEMPTS=3 DB_RETRY_DELAY=1000 -DB_PARTITION_MAINTENANCE_ENABLED=true -DB_PARTITION_FUTURE_DAYS=30 -DB_PARTITION_INTERVAL_HOURS=6 -# Redis配置 +# 双写开关 +# 旧明细表:heartbeat.heartbeat_events +DB_LEGACY_HEARTBEAT_ENABLED=true +# 新热表:heartbeat.heartbeat_events_g4_hot +DB_G4_HOT_HEARTBEAT_ENABLED=false +# room_status 写入开关 +DB_ROOM_STATUS_ENABLED=true + +# 如无特殊需要,保持默认表名即可 +DB_LEGACY_TABLE=heartbeat.heartbeat_events +DB_G4_HOT_TABLE=heartbeat.heartbeat_events_g4_hot + +# ========================= +# Redis 配置 +# ========================= REDIS_ENABLED=true REDIS_HOST=10.8.8.109 REDIS_PORT=6379 REDIS_PASSWORD= REDIS_DB=15 + +# 若提供 REDIS_URL,则业务侧可优先使用统一连接串 REDIS_URL= REDIS_CONNECT_TIMEOUT_MS=5000 REDIS_PROJECT_NAME=BLS主机心跳日志 REDIS_HEARTBEAT_INTERVAL_MS=3000 -REDIS_HEARTBEAT_TTL_SECONDS=30 REDIS_API_BASE_URL=http://127.0.0.1:3000 -REDIS_CONSOLE_MAX_LEN= -# 处理器配置 +# ========================= +# 批处理配置 +# ========================= PROCESSOR_BATCH_SIZE=100 -PROCESSOR_BATCH_TIMEOUT=5000 - -# 日志配置 -LOG_LEVEL=info -LOG_FORMAT=json \ No newline at end of file +PROCESSOR_BATCH_TIMEOUT=5000 \ No newline at end of file diff --git a/.github/prompts/plan-finalExecutableIndependentDualWriteNewHotTable.prompt.md b/.github/prompts/plan-finalExecutableIndependentDualWriteNewHotTable.prompt.md new file mode 100644 index 0000000..c849e25 --- /dev/null +++ b/.github/prompts/plan-finalExecutableIndependentDualWriteNewHotTable.prompt.md @@ -0,0 +1,419 @@ +## 最终可执行的实施清单 + +以下清单基于你已经确认的最终约束整理,可直接作为实施顺序使用。 + +--- + +## 一、实施目标 + +在**不改变数据来源**、**不改变现有批次处理节奏**、**不改变 `room_status` 写法与目标** 的前提下,实现: + +- 旧明细表 `heartbeat.heartbeat_events` 可独立开关 +- 新明细表 `heartbeat.heartbeat_events_g4_hot` 可独立开关 +- 新明细表继续使用**批量 `COPY`** +- 旧/新明细写入**完全独立** +- `room_status` 始终独立执行,不受明细写入开关影响 +- 错误表 `heartbeat.heartbeat_events_errors` 只保留一份,**仅记录新表写入失败** +- 支持未来**关闭旧表,仅保留新表** + +--- + +## 二、最终行为规则 + +### 1. 明细写入规则 +- `legacyHeartbeatEnabled=true` 时,写旧表 `heartbeat.heartbeat_events` +- `g4HotHeartbeatEnabled=true` 时,写新表 `heartbeat.heartbeat_events_g4_hot` +- 两者可同时开启,也可单独开启,也可同时关闭 + +### 2. `room_status` 规则 +- 始终执行现有 `upsertRoomStatus()` +- 目标表不变 +- 写法不变 +- 不依赖旧明细是否开启 +- 不依赖新明细是否开启 +- 即使旧/新明细都关闭,仍然写 `room_status` + +### 3. 错误表规则 +- 继续使用现有 `heartbeat.heartbeat_events_errors` +- 字段完全不变 +- 只记录**新表写入失败** +- 旧表写入失败**不再写错误表**,只记录日志/统计 + +### 4. 消费暂停规则 +- 不能因为“两路明细都关闭”而暂停消费 +- 不能因为“新表表级错误”直接当成全局数据库离线 +- 只有在**当前启用且关键的写入路径**发生连接级不可恢复故障时,才触发暂停消费 + +--- + +## 三、代码改造清单 + +### Task 1:扩展配置项 +**目标文件** +- `src/config/config.js` +- `src/config/config.example.js` +- `README.md` 或部署文档 + +**新增配置建议** +- `DB_LEGACY_HEARTBEAT_ENABLED` +- `DB_G4_HOT_HEARTBEAT_ENABLED` +- `DB_ROOM_STATUS_ENABLED` +- `DB_G4_HOT_TABLE` +- `DB_LEGACY_TABLE` + +**建议默认值** +- `DB_LEGACY_HEARTBEAT_ENABLED=true` +- `DB_G4_HOT_HEARTBEAT_ENABLED=false` +- `DB_ROOM_STATUS_ENABLED=true` +- `DB_LEGACY_TABLE=heartbeat.heartbeat_events` +- `DB_G4_HOT_TABLE=heartbeat.heartbeat_events_g4_hot` + +**实施要求** +- 保持与当前 `config.js` 风格一致 +- 布尔值走现有 `parseBoolean` +- 表名配置单独放在 `db` 配置节点下 + +**完成标准** +- 应用启动时可从环境变量读取旧/新明细开关 +- `room_status` 开关独立可控 +- 新旧目标表名可配置 + +--- + +### Task 2:抽象通用明细写入内核 +**目标文件** +- `src/db/databaseManager.js` + +**当前基础** +- 现有 `insertHeartbeatEvents()` 已实现: + - 分区预创建 + - `COPY ... FROM STDIN` + - 缺分区补建重试 + - fallback 逐条 `INSERT` + +**要做的事** +把现有写旧表的强绑定逻辑抽象为通用方法,例如: +- `_insertHeartbeatEventsToTarget(events, targetConfig)` +- `_buildHeartbeatCopySql(targetTable, columns)` +- `_buildHeartbeatInsertSql(targetTable, columns)` + +**目标参数至少包含** +- `tableName` +- `columns` +- `logPrefix` +- `enablePartitionEnsure` +- 缺分区识别规则 + +**实施要求** +- 不能复制两份几乎相同的 `COPY` 代码 +- 保证旧表和新表复用同一套批量写入能力 +- 保证新表也走 `COPY` + +**完成标准** +- 写旧表和写新表都可通过同一个通用写入内核完成 +- 旧逻辑行为不变 +- 新表能力复用旧逻辑 + +--- + +### Task 3:实现双明细独立编排 +**目标文件** +- `src/db/databaseManager.js` + +**新增编排方法建议** +- `writeHeartbeatDetails(events)` +- 或 `insertHeartbeatEventsDual(events)` + +**它需要负责** +- 判断旧表是否开启 +- 判断新表是否开启 +- 分别调用: + - `legacy writer` + - `g4Hot writer` +- 聚合结果并返回结构化结果 + +**返回结果建议** +至少包含: +- `legacy.enabled` +- `legacy.success` +- `legacy.insertedCount` +- `legacy.failedRecords` +- `legacy.error` +- `g4Hot.enabled` +- `g4Hot.success` +- `g4Hot.insertedCount` +- `g4Hot.failedRecords` +- `g4Hot.error` + +**实施要求** +- 两路执行逻辑互不影响 +- 一路失败不能吞掉另一路结果 +- 两路都关闭时返回“跳过写入”的明确结果,不报错 + +**完成标准** +- 系统能正确识别四种模式: + - 仅旧 + - 仅新 + - 双写 + - 双关 + +--- + +### Task 4:调整 `HeartbeatProcessor.processBatch()` 的主流程 +**目标文件** +- `src/processor/heartbeatProcessor.js` + +**当前逻辑问题** +当前流程中: +- 先写 `insertHeartbeatEvents(batchData)` +- 只有成功后才 best-effort 写 `room_status` + +这与当前最终要求不一致。 + +**改造目标** +把流程改成三段独立逻辑: + +#### A. 明细写入 +调用新的双写编排方法,获取旧/新明细写入结果 + +#### B. `room_status` 写入 +无论旧/新明细写入开关状态如何,都执行: +- `upsertRoomStatus(batchData)` + +注意: +- 只要 `DB_ROOM_STATUS_ENABLED=true` +- 就执行当前逻辑 +- 不依赖明细成功与否 + +#### C. 错误表写入 +仅把**新表失败记录**送入 `insertHeartbeatEventsErrors()` + +**完成标准** +- `room_status` 从“依赖旧表成功”变成“独立执行” +- 错误表只接新表失败 +- 旧表失败不进错误表 + +--- + +### Task 5:重新定义错误表调用策略 +**目标文件** +- `src/db/databaseManager.js` +- `src/processor/heartbeatProcessor.js` + +**要求** +- 保留 `insertHeartbeatEventsErrors()` 表结构和 SQL +- 仅调整调用来源: + - 新表失败 → 写错误表 + - 旧表失败 → 不写错误表 + +**实施要求** +建议在双写结果聚合后,只提取: +- `g4Hot.failedRecords` + +转换成当前错误表 payload 后调用 `insertHeartbeatEventsErrors()` + +**完成标准** +- 错误表字段、表结构、SQL 全不变 +- 旧表失败不会污染错误表 +- 新表失败可追踪 + +--- + +### Task 6:调整连接状态与暂停消费策略 +**目标文件** +- `src/processor/heartbeatProcessor.js` +- `src/db/databaseManager.js` + +**当前问题** +目前 `_isConnectionError()` 和 `_scheduleDbCheck()` 基本围绕单一 DB 写入路径设计。 + +**改造要求** +把“是否需要暂停 Kafka 消费”改成基于**启用中的关键 sink**判断。 + +**建议规则** +- 仅旧表开启:旧表连接故障才可能触发暂停 +- 仅新表开启:新表连接故障才可能触发暂停 +- 双开:如果两路都因连接级错误不可写,可触发暂停 +- 双关但 `room_status` 开启:只要 `room_status` 还能正常写,就不应因明细关闭而暂停 + +**重要区分** +以下不能都算成“数据库离线”: +- 连接失败 +- 缺分区 +- 表不存在 +- 表字段不匹配 +- 权限不足 + +其中真正应触发离线处理的优先是: +- `08006` +- `08001` +- `08003` +- `08004` +- `08007` +- `57P03` +- 以及明显的网络连接错误 + +**完成标准** +- 不会因为新表单独异常而误停整个消费 +- 不会因为两路明细关闭而误停消费 + +--- + +### Task 7:补充启动日志与配置摘要 +**目标文件** +- `src/index.js` + +**新增日志建议** +启动时输出: +- 旧明细写入是否开启 +- 新明细写入是否开启 +- `room_status` 是否开启 +- 旧表目标 +- 新表目标 + +**完成标准** +- 运行日志中能直接看出当前处于哪种模式 +- 运维排障时不用翻配置文件 + +--- + +### Task 8:补充统计项 +**目标文件** +- `src/stats/statsManager.js` +- 如有 Redis 控制台输出,也同步补充 + +**新增统计建议** +- legacy detail success count +- legacy detail failed count +- g4Hot detail success count +- g4Hot detail failed count +- room_status success count +- room_status failed count +- g4Hot error-table inserted count + +**完成标准** +- 双写观察期可以快速对比旧表/新表健康状态 +- 能单独看出新表失败是否升高 + +--- + +## 四、测试清单 + +### Task 9:补充单元测试 +**目标文件** +- `test/smoke.test.js` +- 如有必要新增专门的 `databaseManager` 测试文件 + +**必须覆盖的场景** +1. 仅旧表开启 +2. 仅新表开启 +3. 旧新双开 +4. 旧新双关,但 `room_status` 开启 +5. 旧成功、新失败 +6. 旧失败、新成功 +7. 双失败 +8. 新表失败时写错误表 +9. 旧表失败时不写错误表 +10. `room_status` 始终执行 +11. `COPY` 失败时新表降级逐条 `INSERT` +12. 缺分区时自动补分区重试 + +**完成标准** +- `npm test` 可覆盖新旧双写与独立逻辑 +- `room_status` 独立性有明确断言 + +--- + +### Task 10:补充数据库 smoke 验证 +**目标文件** +- `scripts/db/smokeTest.js` +- 或新增 `scripts/db/smokeG4Hot.js` + +**验证项** +- 新表 `heartbeat.heartbeat_events_g4_hot` 能写入 +- 新表分区能正确附着 +- 新表唯一键/索引存在 +- 新表可按主查询维度检索 +- 旧表关闭、新表开启时仍正常 +- 旧新都关但 `room_status` 开启时系统不报配置错误 + +**完成标准** +- 有一套上线前可重复执行的 smoke 脚本 +- 可快速确认新表接入正确 + +--- + +## 五、上线实施顺序 + +### Step 1:代码部署,保持现状 +配置: +- 旧表开启 +- 新表关闭 +- `room_status` 开启 + +目标: +- 确认改造后代码在“旧模式”下行为不变 + +### Step 2:开启新表双写 +配置: +- 旧表开启 +- 新表开启 +- `room_status` 开启 + +目标: +- 观察新表写入是否稳定 +- 检查新表失败是否进入错误表 +- 比对旧/新明细数据量 + +### Step 3:观察稳定性 +重点观察: +- 新表 `COPY` 成功率 +- 新表 fallback 频率 +- 错误表记录量 +- `room_status` 是否持续正常 + +### Step 4:关闭旧表 +配置: +- 旧表关闭 +- 新表开启 +- `room_status` 开启 + +目标: +- 验证“仅保留新表”时系统稳定运行 + +### Step 5:保留过渡观察期 +目标: +- 持续观察新表、错误表、`room_status` + +--- + +## 六、验收标准 + +满足以下全部条件即可验收: + +1. 新表 `heartbeat.heartbeat_events_g4_hot` 接入成功 +2. 新表仍使用批量 `COPY` +3. 旧/新明细可独立开关 +4. `room_status` 始终独立执行 +5. 两路明细都关闭时,系统仍允许继续写 `room_status` +6. 错误表仅记录新表失败 +7. 旧表失败不会写错误表 +8. 不修改 Kafka 数据来源 +9. 不修改现有数据转换来源 +10. 可支持后续关闭旧表,仅保留新表 + +--- + +## 七、建议的实施顺序编号版 + +1. 扩展 `config.js` 配置项 +2. 重构 `databaseManager.js`,抽通用 `COPY` writer +3. 在 `databaseManager.js` 增加旧/新明细双写编排 +4. 调整 `heartbeatProcessor.js`,让 `room_status` 独立执行 +5. 调整错误表调用,仅接新表失败 +6. 重构连接异常与暂停消费判定 +7. 增加启动配置摘要日志 +8. 增加 stats 指标 +9. 补测试 +10. 补 smoke 验证脚本 +11. 按“旧开新关 → 双开 → 关旧留新”上线 diff --git a/docs/heartbeat_events_g4_hot.sql b/docs/heartbeat_events_g4_hot.sql new file mode 100644 index 0000000..8fb19aa --- /dev/null +++ b/docs/heartbeat_events_g4_hot.sql @@ -0,0 +1,530 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 109 + Source Server Type : PostgreSQL + Source Server Version : 150014 (150014) + Source Host : 10.8.8.109:5433 + Source Catalog : log_platform + Source Schema : heartbeat + + Target Server Type : PostgreSQL + Target Server Version : 150014 (150014) + File Encoding : 65001 + + Date: 09/03/2026 10:11:03 +*/ + + +-- ---------------------------- +-- Table structure for heartbeat_events_g4_hot_d20260301 +-- ---------------------------- +DROP TABLE IF EXISTS "heartbeat"."heartbeat_events_g4_hot_d20260301"; +CREATE TABLE "heartbeat"."heartbeat_events_g4_hot_d20260301" ( + "ts_ms" int8 NOT NULL, + "hotel_id" int2 NOT NULL, + "room_id" varchar(50) COLLATE "pg_catalog"."default" NOT NULL, + "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL, + "ip" varchar(21) COLLATE "pg_catalog"."default" NOT NULL, + "power_state" int2 NOT NULL, + "guest_type" int2 NOT NULL, + "cardless_state" int2 NOT NULL, + "service_mask" int8 NOT NULL, + "pms_state" int2 NOT NULL, + "carbon_state" int2 NOT NULL, + "device_count" int2 NOT NULL, + "comm_seq" int4 NOT NULL, + "elec_address" _text COLLATE "pg_catalog"."default", + "air_address" _text COLLATE "pg_catalog"."default", + "voltage" _float8, + "ampere" _float8, + "power" _float8, + "phase" _text COLLATE "pg_catalog"."default", + "energy" _float8, + "sum_energy" _float8, + "state" _int2, + "model" _int2, + "speed" _int2, + "set_temp" _int2, + "now_temp" _int2, + "solenoid_valve" _int2, + "extra" jsonb, + "write_ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "insert_card" int2, + "bright_g" int2, + "version" int2, + "svc_01" bool, + "svc_02" bool, + "svc_03" bool, + "svc_04" bool, + "svc_05" bool, + "svc_06" bool, + "svc_07" bool, + "svc_08" bool, + "svc_09" bool, + "svc_10" bool, + "svc_11" bool, + "svc_12" bool, + "svc_13" bool, + "svc_14" bool, + "svc_15" bool, + "svc_16" bool, + "svc_17" bool, + "svc_18" bool, + "svc_19" bool, + "svc_20" bool, + "svc_21" bool, + "svc_22" bool, + "svc_23" bool, + "svc_24" bool, + "svc_25" bool, + "svc_26" bool, + "svc_27" bool, + "svc_28" bool, + "svc_29" bool, + "svc_30" bool, + "svc_31" bool, + "svc_32" bool, + "svc_33" bool, + "svc_34" bool, + "svc_35" bool, + "svc_36" bool, + "svc_37" bool, + "svc_38" bool, + "svc_39" bool, + "svc_40" bool, + "svc_41" bool, + "svc_42" bool, + "svc_43" bool, + "svc_44" bool, + "svc_45" bool, + "svc_46" bool, + "svc_47" bool, + "svc_48" bool, + "svc_49" bool, + "svc_50" bool, + "svc_51" bool, + "svc_52" bool, + "svc_53" bool, + "svc_54" bool, + "svc_55" bool, + "svc_56" bool, + "svc_57" bool, + "svc_58" bool, + "svc_59" bool, + "svc_60" bool, + "svc_61" bool, + "svc_62" bool, + "svc_63" bool, + "svc_64" bool, + "air_address_1" text COLLATE "pg_catalog"."default", + "air_address_2" text COLLATE "pg_catalog"."default", + "air_address_residual" _text COLLATE "pg_catalog"."default", + "state_1" int2, + "state_2" int2, + "state_residual" _int2, + "model_1" int2, + "model_2" int2, + "model_residual" _int2, + "speed_1" int2, + "speed_2" int2, + "speed_residual" _int2, + "set_temp_1" int2, + "set_temp_2" int2, + "set_temp_residual" _int2, + "now_temp_1" int2, + "now_temp_2" int2, + "now_temp_residual" _int2, + "solenoid_valve_1" int2, + "solenoid_valve_2" int2, + "solenoid_valve_residual" _int2, + "elec_address_1" text COLLATE "pg_catalog"."default", + "elec_address_2" text COLLATE "pg_catalog"."default", + "elec_address_residual" _text COLLATE "pg_catalog"."default", + "voltage_1" float8, + "voltage_2" float8, + "voltage_residual" _float8, + "ampere_1" float8, + "ampere_2" float8, + "ampere_residual" _float8, + "power_1" float8, + "power_2" float8, + "power_residual" _float8, + "phase_1" text COLLATE "pg_catalog"."default", + "phase_2" text COLLATE "pg_catalog"."default", + "phase_residual" _text COLLATE "pg_catalog"."default", + "energy_1" float8, + "energy_2" float8, + "energy_residual" _float8, + "sum_energy_1" float8, + "sum_energy_2" float8, + "sum_energy_residual" _float8, + "power_carbon_on" float8, + "power_carbon_off" float8, + "power_person_exist" float8, + "power_person_left" float8, + "guid" varchar(32) COLLATE "pg_catalog"."default" +) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Table structure for heartbeat_events_g4_hot_d20260302 +-- ---------------------------- +DROP TABLE IF EXISTS "heartbeat"."heartbeat_events_g4_hot_d20260302"; +CREATE TABLE "heartbeat"."heartbeat_events_g4_hot_d20260302" ( + "ts_ms" int8 NOT NULL, + "hotel_id" int2 NOT NULL, + "room_id" varchar(50) COLLATE "pg_catalog"."default" NOT NULL, + "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL, + "ip" varchar(21) COLLATE "pg_catalog"."default" NOT NULL, + "power_state" int2 NOT NULL, + "guest_type" int2 NOT NULL, + "cardless_state" int2 NOT NULL, + "service_mask" int8 NOT NULL, + "pms_state" int2 NOT NULL, + "carbon_state" int2 NOT NULL, + "device_count" int2 NOT NULL, + "comm_seq" int4 NOT NULL, + "elec_address" _text COLLATE "pg_catalog"."default", + "air_address" _text COLLATE "pg_catalog"."default", + "voltage" _float8, + "ampere" _float8, + "power" _float8, + "phase" _text COLLATE "pg_catalog"."default", + "energy" _float8, + "sum_energy" _float8, + "state" _int2, + "model" _int2, + "speed" _int2, + "set_temp" _int2, + "now_temp" _int2, + "solenoid_valve" _int2, + "extra" jsonb, + "write_ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "insert_card" int2, + "bright_g" int2, + "version" int2, + "svc_01" bool, + "svc_02" bool, + "svc_03" bool, + "svc_04" bool, + "svc_05" bool, + "svc_06" bool, + "svc_07" bool, + "svc_08" bool, + "svc_09" bool, + "svc_10" bool, + "svc_11" bool, + "svc_12" bool, + "svc_13" bool, + "svc_14" bool, + "svc_15" bool, + "svc_16" bool, + "svc_17" bool, + "svc_18" bool, + "svc_19" bool, + "svc_20" bool, + "svc_21" bool, + "svc_22" bool, + "svc_23" bool, + "svc_24" bool, + "svc_25" bool, + "svc_26" bool, + "svc_27" bool, + "svc_28" bool, + "svc_29" bool, + "svc_30" bool, + "svc_31" bool, + "svc_32" bool, + "svc_33" bool, + "svc_34" bool, + "svc_35" bool, + "svc_36" bool, + "svc_37" bool, + "svc_38" bool, + "svc_39" bool, + "svc_40" bool, + "svc_41" bool, + "svc_42" bool, + "svc_43" bool, + "svc_44" bool, + "svc_45" bool, + "svc_46" bool, + "svc_47" bool, + "svc_48" bool, + "svc_49" bool, + "svc_50" bool, + "svc_51" bool, + "svc_52" bool, + "svc_53" bool, + "svc_54" bool, + "svc_55" bool, + "svc_56" bool, + "svc_57" bool, + "svc_58" bool, + "svc_59" bool, + "svc_60" bool, + "svc_61" bool, + "svc_62" bool, + "svc_63" bool, + "svc_64" bool, + "air_address_1" text COLLATE "pg_catalog"."default", + "air_address_2" text COLLATE "pg_catalog"."default", + "air_address_residual" _text COLLATE "pg_catalog"."default", + "state_1" int2, + "state_2" int2, + "state_residual" _int2, + "model_1" int2, + "model_2" int2, + "model_residual" _int2, + "speed_1" int2, + "speed_2" int2, + "speed_residual" _int2, + "set_temp_1" int2, + "set_temp_2" int2, + "set_temp_residual" _int2, + "now_temp_1" int2, + "now_temp_2" int2, + "now_temp_residual" _int2, + "solenoid_valve_1" int2, + "solenoid_valve_2" int2, + "solenoid_valve_residual" _int2, + "elec_address_1" text COLLATE "pg_catalog"."default", + "elec_address_2" text COLLATE "pg_catalog"."default", + "elec_address_residual" _text COLLATE "pg_catalog"."default", + "voltage_1" float8, + "voltage_2" float8, + "voltage_residual" _float8, + "ampere_1" float8, + "ampere_2" float8, + "ampere_residual" _float8, + "power_1" float8, + "power_2" float8, + "power_residual" _float8, + "phase_1" text COLLATE "pg_catalog"."default", + "phase_2" text COLLATE "pg_catalog"."default", + "phase_residual" _text COLLATE "pg_catalog"."default", + "energy_1" float8, + "energy_2" float8, + "energy_residual" _float8, + "sum_energy_1" float8, + "sum_energy_2" float8, + "sum_energy_residual" _float8, + "power_carbon_on" float8, + "power_carbon_off" float8, + "power_person_exist" float8, + "power_person_left" float8, + "guid" varchar(32) COLLATE "pg_catalog"."default" +) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Table structure for heartbeat_events_g4_hot +-- ---------------------------- +DROP TABLE IF EXISTS "heartbeat"."heartbeat_events_g4_hot"; +CREATE TABLE "heartbeat"."heartbeat_events_g4_hot" ( + "ts_ms" int8 NOT NULL, + "hotel_id" int2 NOT NULL, + "room_id" varchar(50) COLLATE "pg_catalog"."default" NOT NULL, + "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL, + "ip" varchar(21) COLLATE "pg_catalog"."default" NOT NULL, + "power_state" int2 NOT NULL, + "guest_type" int2 NOT NULL, + "cardless_state" int2 NOT NULL, + "service_mask" int8 NOT NULL, + "pms_state" int2 NOT NULL, + "carbon_state" int2 NOT NULL, + "device_count" int2 NOT NULL, + "comm_seq" int4 NOT NULL, + "elec_address" text[] COLLATE "pg_catalog"."default", + "air_address" text[] COLLATE "pg_catalog"."default", + "voltage" float8[], + "ampere" float8[], + "power" float8[], + "phase" text[] COLLATE "pg_catalog"."default", + "energy" float8[], + "sum_energy" float8[], + "state" int2[], + "model" int2[], + "speed" int2[], + "set_temp" int2[], + "now_temp" int2[], + "solenoid_valve" int2[], + "extra" jsonb, + "write_ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "insert_card" int2, + "bright_g" int2, + "version" int2, + "svc_01" bool, + "svc_02" bool, + "svc_03" bool, + "svc_04" bool, + "svc_05" bool, + "svc_06" bool, + "svc_07" bool, + "svc_08" bool, + "svc_09" bool, + "svc_10" bool, + "svc_11" bool, + "svc_12" bool, + "svc_13" bool, + "svc_14" bool, + "svc_15" bool, + "svc_16" bool, + "svc_17" bool, + "svc_18" bool, + "svc_19" bool, + "svc_20" bool, + "svc_21" bool, + "svc_22" bool, + "svc_23" bool, + "svc_24" bool, + "svc_25" bool, + "svc_26" bool, + "svc_27" bool, + "svc_28" bool, + "svc_29" bool, + "svc_30" bool, + "svc_31" bool, + "svc_32" bool, + "svc_33" bool, + "svc_34" bool, + "svc_35" bool, + "svc_36" bool, + "svc_37" bool, + "svc_38" bool, + "svc_39" bool, + "svc_40" bool, + "svc_41" bool, + "svc_42" bool, + "svc_43" bool, + "svc_44" bool, + "svc_45" bool, + "svc_46" bool, + "svc_47" bool, + "svc_48" bool, + "svc_49" bool, + "svc_50" bool, + "svc_51" bool, + "svc_52" bool, + "svc_53" bool, + "svc_54" bool, + "svc_55" bool, + "svc_56" bool, + "svc_57" bool, + "svc_58" bool, + "svc_59" bool, + "svc_60" bool, + "svc_61" bool, + "svc_62" bool, + "svc_63" bool, + "svc_64" bool, + "air_address_1" text COLLATE "pg_catalog"."default", + "air_address_2" text COLLATE "pg_catalog"."default", + "air_address_residual" text[] COLLATE "pg_catalog"."default", + "state_1" int2, + "state_2" int2, + "state_residual" int2[], + "model_1" int2, + "model_2" int2, + "model_residual" int2[], + "speed_1" int2, + "speed_2" int2, + "speed_residual" int2[], + "set_temp_1" int2, + "set_temp_2" int2, + "set_temp_residual" int2[], + "now_temp_1" int2, + "now_temp_2" int2, + "now_temp_residual" int2[], + "solenoid_valve_1" int2, + "solenoid_valve_2" int2, + "solenoid_valve_residual" int2[], + "elec_address_1" text COLLATE "pg_catalog"."default", + "elec_address_2" text COLLATE "pg_catalog"."default", + "elec_address_residual" text[] COLLATE "pg_catalog"."default", + "voltage_1" float8, + "voltage_2" float8, + "voltage_residual" float8[], + "ampere_1" float8, + "ampere_2" float8, + "ampere_residual" float8[], + "power_1" float8, + "power_2" float8, + "power_residual" float8[], + "phase_1" text COLLATE "pg_catalog"."default", + "phase_2" text COLLATE "pg_catalog"."default", + "phase_residual" text[] COLLATE "pg_catalog"."default", + "energy_1" float8, + "energy_2" float8, + "energy_residual" float8[], + "sum_energy_1" float8, + "sum_energy_2" float8, + "sum_energy_residual" float8[], + "power_carbon_on" float8, + "power_carbon_off" float8, + "power_person_exist" float8, + "power_person_left" float8, + "guid" varchar(32) COLLATE "pg_catalog"."default" +) +PARTITION BY RANGE ( + "ts_ms" "pg_catalog"."int8_ops" +) +TABLESPACE "ts_hot" +; +ALTER TABLE "heartbeat"."heartbeat_events_g4_hot" ATTACH PARTITION "heartbeat"."heartbeat_events_g4_hot_d20260301" FOR VALUES FROM ( +'1772294400000' +) TO ( +'1772380800000' +) +; +ALTER TABLE "heartbeat"."heartbeat_events_g4_hot" ATTACH PARTITION "heartbeat"."heartbeat_events_g4_hot_d20260302" FOR VALUES FROM ( +'1772380800000' +) TO ( +'1772467200000' +) +; + +-- ---------------------------- +-- Indexes structure for table heartbeat_events_g4_hot_d20260301 +-- ---------------------------- +CREATE UNIQUE INDEX "heartbeat_events_g4_hot_d20260301_guid_ts_ms_idx" ON "heartbeat"."heartbeat_events_g4_hot_d20260301" USING btree ( + "guid" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" ASC NULLS LAST +) TABLESPACE "ts_hot"; +CREATE INDEX "heartbeat_events_g4_hot_d2026_hotel_id_room_id_device_id_ts_idx" ON "heartbeat"."heartbeat_events_g4_hot_d20260301" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "room_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +) WITH (FILLFACTOR = 100) TABLESPACE "ts_hot"; + +-- ---------------------------- +-- Indexes structure for table heartbeat_events_g4_hot_d20260302 +-- ---------------------------- +CREATE UNIQUE INDEX "heartbeat_events_g4_hot_d20260302_guid_ts_ms_idx" ON "heartbeat"."heartbeat_events_g4_hot_d20260302" USING btree ( + "guid" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" ASC NULLS LAST +) TABLESPACE "ts_hot"; +CREATE INDEX "heartbeat_events_g4_hot_d2026_hotel_id_room_id_device_id_t_idx1" ON "heartbeat"."heartbeat_events_g4_hot_d20260302" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "room_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +) WITH (FILLFACTOR = 100) TABLESPACE "ts_hot"; + +-- ---------------------------- +-- Indexes structure for table heartbeat_events_g4_hot +-- ---------------------------- +CREATE UNIQUE INDEX "idx_g4_hot_guid_unique" ON "heartbeat"."heartbeat_events_g4_hot" USING btree ( + "guid" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" ASC NULLS LAST +) TABLESPACE "ts_hot"; +CREATE INDEX "idx_g4_hot_lookup" ON "heartbeat"."heartbeat_events_g4_hot" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "room_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +) WITH (FILLFACTOR = 100) TABLESPACE "ts_hot"; diff --git a/docs/新分区方法案例.md b/docs/新分区方法案例.md deleted file mode 100644 index 9b31d1a..0000000 --- a/docs/新分区方法案例.md +++ /dev/null @@ -1,55 +0,0 @@ --- 通过 docker compose 在容器内执行 psql,并使用 here-doc 传入 SQL -docker compose exec -T postgres psql -U log_admin -d log_platform -v ON_ERROR_STOP=1 <<'SQL' - --- 使用匿名代码块批量处理分区创建与索引迁移 -DO $$ -DECLARE - d date; -- 循环日期(从今天到未来 29 天) - pname text; -- 分区表名,例如 heartbeat_events_20260303 - start_ms bigint; -- 分区起始毫秒时间戳(UTC,含) - end_ms bigint; -- 分区结束毫秒时间戳(UTC,不含) - idx record; -- 遍历分区索引时的游标记录 -BEGIN - -- 生成从 current_date 到 current_date+29 的日期序列(共 30 天) - FOR d IN - SELECT generate_series(current_date, current_date + 29, interval '1 day')::date - LOOP - -- 按约定命名分区名:heartbeat_events_YYYYMMDD - pname := format('heartbeat_events_%s', to_char(d, 'YYYYMMDD')); - - -- 计算该日期 00:00:00 UTC 的毫秒时间戳作为分区下界 - start_ms := (extract(epoch from (d::timestamp at time zone 'UTC')) * 1000)::bigint; - - -- 计算下一天 00:00:00 UTC 的毫秒时间戳作为分区上界 - end_ms := (extract(epoch from ((d + 1)::timestamp at time zone 'UTC')) * 1000)::bigint; - - -- 若分区不存在则创建;存在则跳过(幂等) - EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', - pname, start_ms, end_ms - ); - - -- 无论新建或已存在,强制把分区表迁移到 ts_hot(保证热分区落热盘) - EXECUTE format('ALTER TABLE heartbeat.%I SET TABLESPACE ts_hot', pname); - - -- 遍历该分区的全部索引,筛出不在 ts_hot 的索引 - FOR idx IN - SELECT idxn.nspname AS index_schema, i.relname AS index_name - FROM pg_index x - JOIN pg_class t ON t.oid = x.indrelid - JOIN pg_namespace nt ON nt.oid = t.relnamespace - JOIN pg_class i ON i.oid = x.indexrelid - JOIN pg_namespace idxn ON idxn.oid = i.relnamespace - LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace - WHERE nt.nspname = 'heartbeat' - AND t.relname = pname - AND COALESCE(ts.spcname, 'pg_default') <> 'ts_hot' - LOOP - -- 将索引迁移到 ts_hot,确保“分区与索引同盘” - EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE ts_hot', idx.index_schema, idx.index_name); - END LOOP; - END LOOP; -END $$; - --- here-doc 结束标记 -SQL \ No newline at end of file diff --git a/openspec/changes/add-g4-hot-dual-write/design.md b/openspec/changes/add-g4-hot-dual-write/design.md new file mode 100644 index 0000000..e89fd9c --- /dev/null +++ b/openspec/changes/add-g4-hot-dual-write/design.md @@ -0,0 +1,76 @@ +## Context +当前系统只有一个明细写入目标 `heartbeat.heartbeat_events`。需要新增 `heartbeat.heartbeat_events_g4_hot` 作为第二个独立写入目标,两路完全解耦,支持平滑迁移。 + +约束: +- 同一 PostgreSQL 实例/数据库,共享同一连接池 +- 数据来源(Kafka)、解包、校验、转换逻辑完全不变 +- `room_status` 始终独立执行,不依赖任何明细写入 +- 错误表仅服务新表写入失败 + +## Goals / Non-Goals +- Goals: + - 旧/新明细表可通过启动配置独立开关 + - 新表复用现有批量 COPY 写入内核 + - 两路写入互不影响(错误隔离、重试隔离、fallback 隔离) + - `room_status` 始终执行,不受明细开关影响 + - 错误表仅记录新表失败 + - 支持"旧开新关 → 双开 → 关旧留新"迁移路径 + +- Non-Goals: + - 不引入第二套 PostgreSQL 连接配置(同库新表) + - 不改变 Kafka 消费链路、消息格式、数据转换 + - 不做运行时热开关(重启生效即可) + - 不在本次改造中删除旧表相关代码 + +## Decisions + +### Decision 1: 共享连接池,不新建第二个 Pool +- **What**: 旧表和新表都使用同一个 `pg.Pool` +- **Why**: 两个目标表在同一 PostgreSQL 实例/数据库下,共享连接池避免资源浪费 +- **Alternatives**: 为新表创建独立连接池 → 复杂度高,同库下无必要 + +### Decision 2: 抽象通用写入内核而非复制代码 +- **What**: 把现有 `insertHeartbeatEvents()` 中与表名绑定的逻辑抽象为 `_insertEventsToTarget(events, targetConfig)` +- **Why**: 避免维护两份几乎相同的 COPY/INSERT 代码;新表和旧表的列清单不同但写入流程一致 +- **Alternatives**: 复制一份改表名 → 双维护成本高 + +### Decision 3: 编排层聚合结果,Processor 不感知双写细节 +- **What**: `DatabaseManager.insertHeartbeatEventsDual()` 负责编排两路写入并聚合结果;`HeartbeatProcessor` 只消费聚合结果 +- **Why**: 保持 Processor 职责单一,不引入 DB 路由逻辑 + +### Decision 4: room_status 独立于明细写入 +- **What**: `upsertRoomStatus()` 始终在 `processBatch()` 中独立执行,不依赖旧/新明细写入成功 +- **Why**: 用户明确要求即使两路明细都关闭,仍需写 `room_status` + +### Decision 5: 错误表仅服务新表 +- **What**: `insertHeartbeatEventsErrors()` 仅接收新表 (`g4Hot`) 写入失败的记录 +- **Why**: 旧表即将下线,错误追踪聚焦新表;旧表失败仅记日志/统计 + +### Decision 6: 消费暂停基于"启用中的关键 sink" +- **What**: 只有当前启用的写入路径全部发生连接级不可恢复故障时,才暂停 Kafka 消费 +- **Why**: 避免新表结构/权限问题误停全局消费 + +## Risks / Trade-offs + +### Risk 1: 新表列映射不完整 +- 新表字段比旧表更宽,旧数据可能缺少部分新表列(如 `svc_*`, `*_1`, `*_2`, `*_residual`, `guid`) +- **Mitigation**: 旧数据确认可完整映射到新表必填字段与唯一键;新增列允许 NULL 或有默认值 + +### Risk 2: 同库双写增加连接池压力 +- 单批数据写两次,连接池使用量翻倍 +- **Mitigation**: 两路顺序执行而非并发,复用同一连接池;必要时可调大 `maxConnections` + +### Risk 3: 新表分区函数可能与旧表不同 +- `heartbeat.ensure_partitions` 是否也涵盖新表的分区维护 +- **Mitigation**: 实现时需确认分区保障函数对新表的覆盖,或为新表配置独立的分区保障调用 + +## Migration Plan +1. 部署代码,保持 `DB_LEGACY_HEARTBEAT_ENABLED=true`、`DB_G4_HOT_HEARTBEAT_ENABLED=false` +2. 验证旧模式下行为不变 +3. 开启 `DB_G4_HOT_HEARTBEAT_ENABLED=true`,双写观测 +4. 确认新表写入稳定后,关闭 `DB_LEGACY_HEARTBEAT_ENABLED=false` +5. 过渡观察期后清理旧表相关配置 + +## Open Questions +- 新表 `heartbeat.heartbeat_events_g4_hot` 的分区预创建是否由同一个 `heartbeat.ensure_partitions` 函数覆盖?还是需要独立的分区保障函数? +- 新表的 `guid` 列是否需要在写入时由应用层生成?还是有数据库默认值? diff --git a/openspec/changes/add-g4-hot-dual-write/proposal.md b/openspec/changes/add-g4-hot-dual-write/proposal.md new file mode 100644 index 0000000..fff09c5 --- /dev/null +++ b/openspec/changes/add-g4-hot-dual-write/proposal.md @@ -0,0 +1,25 @@ +# Change: 新增 G4 热表独立双写能力 + +## Why +当前系统仅写入旧明细表 `heartbeat.heartbeat_events`。需要新增对 `heartbeat.heartbeat_events_g4_hot` 的独立写入能力,两路写入完全解耦、可分别开关,支持后续平滑关闭旧表、仅保留新表。 + +## What Changes +- 新增配置项:旧/新明细表写入开关、目标表名、`room_status` 独立开关 +- 重构 `DatabaseManager`:抽象通用批量 COPY 写入内核,支持面向不同目标表的复用 +- 新增双明细写入编排器:旧/新表各自独立执行、独立重试、独立 fallback +- 调整 `HeartbeatProcessor.processBatch()`:`room_status` 从"依赖旧表成功"改为"始终独立执行" +- 错误表 `heartbeat_events_errors` 仅记录新表写入失败,旧表失败不再写错误表 +- 重新定义消费暂停策略:基于"当前启用的关键 sink"判断,非全局一刀切 +- 补充按 sink 维度的统计项与启动日志 + +## Impact +- Affected specs: `db`, `processor` +- Affected code: + - `src/config/config.js` — 新增配置项 + - `src/db/databaseManager.js` — 核心重构 + - `src/processor/heartbeatProcessor.js` — 主流程调整 + - `src/index.js` — 启动日志 + - `src/stats/statsManager.js` — 统计项 + - `src/config/config.example.js` — 配置示例 +- No breaking changes to external APIs or Kafka consumer behavior +- Data source and transformation logic remain unchanged diff --git a/openspec/changes/add-g4-hot-dual-write/specs/db/spec.md b/openspec/changes/add-g4-hot-dual-write/specs/db/spec.md new file mode 100644 index 0000000..483b120 --- /dev/null +++ b/openspec/changes/add-g4-hot-dual-write/specs/db/spec.md @@ -0,0 +1,129 @@ +## ADDED Requirements + +### Requirement: G4 热表独立写入能力 +系统 SHALL 支持向 `heartbeat.heartbeat_events_g4_hot` 分区表执行批量 COPY 写入,写入内核(COPY 流式写入、失败逐条 INSERT 降级)SHALL 与旧表 `heartbeat.heartbeat_events` 共享同一套通用实现。 + +#### Scenario: G4 热表批量 COPY 写入 +- **WHEN** `g4HotHeartbeatEnabled=true` 且有一批心跳数据待写入 +- **THEN** 系统应使用批量 COPY 将数据写入 `heartbeat.heartbeat_events_g4_hot` +- **AND** 写入流程应与旧表共享同一通用写入内核 + +#### Scenario: G4 热表分区由外部维护 +- **WHEN** 写入 `heartbeat.heartbeat_events_g4_hot` +- **THEN** 系统不应在运行时为该表执行分区预创建、缺分区补建或其他分区维护逻辑 +- **AND** 分区应由外部脚本或外部调度系统维护 + +#### Scenario: G4 热表 COPY 失败降级 +- **WHEN** G4 热表批量 COPY 写入失败且重试次数耗尽 +- **THEN** 系统应降级为逐条 INSERT 写入 +- **AND** 单条失败不影响同批次其他记录 + +### Requirement: G4 热表 guid 写入规则 +系统 SHALL 在写入 `heartbeat.heartbeat_events_g4_hot` 时为每条记录提供 `guid` 字段值,格式为无连接符的小写 GUID。 + +#### Scenario: 缺失 guid 时自动生成 +- **WHEN** 待写入记录未提供 `guid` +- **THEN** 系统应为该记录生成一个新的 GUID +- **AND** 生成结果应为 32 位十六进制小写字符串 +- **AND** 结果中不应包含 `-` + +#### Scenario: 已提供 guid 时规范化 +- **WHEN** 待写入记录已提供 `guid` +- **THEN** 系统应将该值转换为小写 +- **AND** 应去除其中的 `-` +- **AND** 规范化后的值应写入 `heartbeat.heartbeat_events_g4_hot` + +### Requirement: G4 热表 power 辅助字段当前阶段写空 +系统 SHALL 在当前阶段将 `power_carbon_on`、`power_carbon_off`、`power_person_exist`、`power_person_left` 固定写为 `null`,待后续计算逻辑接入后再启用实际赋值。 + +#### Scenario: 当前阶段统一写 null +- **WHEN** 系统写入 `heartbeat.heartbeat_events_g4_hot` +- **THEN** `power_carbon_on` 应写入 `null` +- **AND** `power_carbon_off` 应写入 `null` +- **AND** `power_person_exist` 应写入 `null` +- **AND** `power_person_left` 应写入 `null` +- **AND** 不应从来源数据的 `extra` 或其他字段提取这 4 个值 + +### Requirement: 双明细独立编排 +系统 SHALL 提供双明细写入编排能力,按启动配置分别控制旧表与 G4 热表的写入,两路写入结果完全独立。 + +#### Scenario: 仅旧表开启 +- **WHEN** `legacyHeartbeatEnabled=true` 且 `g4HotHeartbeatEnabled=false` +- **THEN** 系统应仅写入旧表 `heartbeat.heartbeat_events` +- **AND** 不应尝试写入 G4 热表 + +#### Scenario: 仅新表开启 +- **WHEN** `legacyHeartbeatEnabled=false` 且 `g4HotHeartbeatEnabled=true` +- **THEN** 系统应仅写入 G4 热表 `heartbeat.heartbeat_events_g4_hot` +- **AND** 不应尝试写入旧表 + +#### Scenario: 双写模式 +- **WHEN** `legacyHeartbeatEnabled=true` 且 `g4HotHeartbeatEnabled=true` +- **THEN** 系统应分别写入旧表与 G4 热表 +- **AND** 旧表写入失败不影响 G4 热表写入 +- **AND** G4 热表写入失败不影响旧表写入 + +#### Scenario: 双关模式 +- **WHEN** `legacyHeartbeatEnabled=false` 且 `g4HotHeartbeatEnabled=false` +- **THEN** 系统应跳过所有明细写入 +- **AND** 不应报错或阻止消费 + +### Requirement: 写入目标启动配置 +系统 SHALL 通过启动配置(环境变量)分别控制旧表、G4 热表和 room_status 的写入开关与目标表名。 + +#### Scenario: 配置加载 +- **WHEN** 系统启动时 +- **THEN** 应从环境变量读取 `DB_LEGACY_HEARTBEAT_ENABLED`(默认 true)、`DB_G4_HOT_HEARTBEAT_ENABLED`(默认 false)、`DB_ROOM_STATUS_ENABLED`(默认 true) +- **AND** 应从环境变量读取 `DB_LEGACY_TABLE`(默认 `heartbeat.heartbeat_events`)、`DB_G4_HOT_TABLE`(默认 `heartbeat.heartbeat_events_g4_hot`) + +#### Scenario: 启动日志输出配置摘要 +- **WHEN** 系统启动完成时 +- **THEN** 应在日志中输出旧/新明细写入开关状态、目标表名和 room_status 开关状态 + +### Requirement: 错误表仅服务 G4 热表 +系统 SHALL 将 `heartbeat.heartbeat_events_errors` 仅用于记录 G4 热表写入失败的记录,旧表写入失败不写入错误表。 + +#### Scenario: G4 热表写入失败记录错误 +- **WHEN** G4 热表写入产生失败记录 +- **THEN** 系统应将失败记录写入 `heartbeat.heartbeat_events_errors` +- **AND** 错误表字段和结构不变 + +#### Scenario: 旧表写入失败不记录错误表 +- **WHEN** 旧表写入产生失败记录 +- **THEN** 系统不应将失败记录写入 `heartbeat.heartbeat_events_errors` +- **AND** 应仅记录日志/统计 + +## MODIFIED Requirements + +### Requirement: 心跳数据写入 +系统 MUST 能够将处理后的心跳数据写入 PostgreSQL 数据库,支持多目标表独立写入。 + +#### Scenario: 写入单条心跳数据 +- **WHEN** 接收到单条处理后的心跳数据时 +- **THEN** 系统应该将数据写入已启用的目标表 +- **AND** 返回写入结果 + +#### Scenario: 批量写入心跳数据 +- **WHEN** 接收到批量处理后的心跳数据时 +- **THEN** 系统应该对每个已启用的目标表使用批量 COPY 写入机制 +- **AND** 各目标表写入结果独立 +- **AND** 提高写入效率 + +### Requirement: 数据库连接管理 +系统 MUST 能够建立和维护与 PostgreSQL 数据库的连接。 + +#### Scenario: 成功连接数据库 +- **WHEN** 系统启动时 +- **THEN** 应该成功连接到配置的PostgreSQL数据库 +- **AND** 应该监控连接状态 + +#### Scenario: 数据库连接断开重连 +- **WHEN** 数据库连接断开时 +- **THEN** 系统应该自动尝试重连 +- **AND** 重连失败时应该记录错误日志 + +#### Scenario: 消费暂停判定 +- **WHEN** 当前启用的明细写入目标全部因连接级不可恢复故障不可写时 +- **THEN** 系统应暂停 Kafka 消费 +- **WHEN** 仅某一路非关键 sink 失败(如新表结构错误) +- **THEN** 系统不应暂停全局消费 diff --git a/openspec/changes/add-g4-hot-dual-write/specs/processor/spec.md b/openspec/changes/add-g4-hot-dual-write/specs/processor/spec.md new file mode 100644 index 0000000..2c0d5fb --- /dev/null +++ b/openspec/changes/add-g4-hot-dual-write/specs/processor/spec.md @@ -0,0 +1,52 @@ +## MODIFIED Requirements + +### Requirement: 心跳数据转换 +系统 MUST 在批次处理中始终独立执行 room_status 同步,不依赖任何明细写入的成功与否。 + +#### Scenario: room_status 始终独立执行 +- **WHEN** 一批心跳数据完成验证与转换 +- **THEN** 系统应始终调用 room_status 的 upsert 同步逻辑(当 `roomStatusEnabled=true`) +- **AND** 不依赖旧表明细写入是否开启 +- **AND** 不依赖新表明细写入是否开启 +- **AND** 不依赖旧/新表写入是否成功 +- **AND** 同步失败不应阻塞主处理流程 + +#### Scenario: 明细双关仍写 room_status +- **WHEN** `legacyHeartbeatEnabled=false` 且 `g4HotHeartbeatEnabled=false` 且 `roomStatusEnabled=true` +- **THEN** 系统仍应对当前批次数据执行 room_status upsert +- **AND** Kafka 消费不应因明细双关而暂停 + +### Requirement: 批量写库容错 +系统 MUST 在批量写库时确保单条失败不影响同批次其他记录的写入,且区分旧/新目标的失败处理路径。 + +#### Scenario: 单条数据写库失败不影响同批次 +- **WHEN** 批量写库中存在某条记录违反约束或写入失败 +- **THEN** 系统应继续写入同批次其他合法记录 +- **AND** 失败记录应按错误日志规则写入 Redis 项目控制台 + +#### Scenario: 新表失败记录写入错误表 +- **WHEN** G4 热表批量写入中存在失败记录 +- **THEN** 系统应将失败记录写入 `heartbeat.heartbeat_events_errors` + +#### Scenario: 旧表失败记录不写入错误表 +- **WHEN** 旧表批量写入中存在失败记录 +- **THEN** 系统不应将失败记录写入 `heartbeat.heartbeat_events_errors` +- **AND** 应仅记录日志与统计 + +## ADDED Requirements + +### Requirement: 按 sink 维度的统计与监控 +系统 SHALL 按写入目标维度分别统计成功数、失败数与降级事件。 + +#### Scenario: legacy 与 g4Hot 分别统计 +- **WHEN** 系统完成一批次的双写编排 +- **THEN** 应分别统计 legacy 写入成功数、失败数 +- **AND** 应分别统计 g4Hot 写入成功数、失败数 +- **AND** 统计项应在 Redis 控制台输出或 stats 汇总中可见 + +#### Scenario: 启动时输出双写配置摘要 +- **WHEN** 服务完成数据库连接并进入启动阶段 +- **THEN** 系统应输出双写配置摘要 +- **AND** 摘要中应包含 legacy 开关与目标表 +- **AND** 摘要中应包含 g4Hot 开关与目标表 +- **AND** 摘要中应包含 room_status 开关 diff --git a/openspec/changes/add-g4-hot-dual-write/tasks.md b/openspec/changes/add-g4-hot-dual-write/tasks.md new file mode 100644 index 0000000..5629213 --- /dev/null +++ b/openspec/changes/add-g4-hot-dual-write/tasks.md @@ -0,0 +1,28 @@ +## 1. Configuration +- [x] 1.1 在 `src/config/config.js` 新增旧/新明细写入开关、目标表名、`room_status` 独立开关配置 +- [x] 1.2 在 `src/config/config.example.js` 同步新增对应示例配置 + +## 2. Database Layer Refactor +- [x] 2.1 抽象通用批量 COPY 写入内核 `_insertEventsToTarget(events, targetConfig)`,支持接收目标表名、列清单、日志前缀等参数 +- [x] 2.2 将旧表 `insertHeartbeatEvents()` 迁移为调用通用内核,保证旧逻辑行为不变 +- [x] 2.3 新增新表 `heartbeat.heartbeat_events_g4_hot` 的目标配置与列映射 +- [x] 2.4 新增双明细独立编排方法 `insertHeartbeatEventsDual(events)`,按配置分别调用旧/新 writer,聚合返回结构化结果 + +## 3. Processor Flow Adjustment +- [x] 3.1 调整 `HeartbeatProcessor.processBatch()` 调用链:改用 `insertHeartbeatEventsDual()` 替代原 `insertHeartbeatEvents()` +- [x] 3.2 将 `upsertRoomStatus()` 从"依赖旧表成功"改为"独立执行"——无论旧/新明细开关状态如何,只要 `roomStatusEnabled=true` 就始终执行 +- [x] 3.3 调整错误表调用:仅将新表 (`g4Hot`) 失败记录送入 `insertHeartbeatEventsErrors()`,旧表失败不写错误表 + +## 4. Connection & Pause Strategy +- [x] 4.1 重构消费暂停判定:基于"当前启用的 sink 是否全部连接级不可写",而非单一旧表状态 +- [x] 4.2 区分连接错误与表级错误,避免新表结构问题误触发全局暂停 + +## 5. Observability +- [x] 5.1 在 `src/index.js` 启动时输出双写配置摘要(旧/新开关状态、目标表名、`room_status` 开关) +- [x] 5.2 在 `src/stats/statsManager.js` 新增按 sink 维度的统计项(legacy/g4Hot 成功数、失败数、room_status 成功/失败数) + +## 6. Testing +- [x] 6.1 补充 `databaseManager` 双写编排单元测试(仅旧、仅新、双开、双关、交叉失败等场景) +- [x] 6.2 补充 `room_status` 独立性断言测试 +- [x] 6.3 补充错误表仅接新表失败的断言测试 +- [x] 6.4 补充/扩展数据库 smoke 验证脚本,覆盖新表写入、分区、索引验证 diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index d8e915a..44cf7a1 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -102,6 +102,99 @@ - **AND** 调用后应重试当前批次写入 - **AND** 系统不应在运行时创建或替换数据库 schema 对象 +### Requirement: G4 热表独立写入能力 +系统 MUST 支持向 `heartbeat.heartbeat_events_g4_hot` 分区表执行批量 COPY 写入,并与旧表 `heartbeat.heartbeat_events` 共享同一套通用写入内核。 + +#### Scenario: G4 热表批量 COPY 写入 +- **WHEN** `g4HotHeartbeatEnabled=true` 且有一批心跳数据待写入 +- **THEN** 系统应使用批量 COPY 将数据写入 `heartbeat.heartbeat_events_g4_hot` +- **AND** 写入流程应与旧表共享同一通用写入内核 + +#### Scenario: G4 热表分区由外部维护 +- **WHEN** 写入 `heartbeat.heartbeat_events_g4_hot` +- **THEN** 系统不应在运行时为该表执行分区预创建、缺分区补建或其他分区维护逻辑 +- **AND** 分区应由外部脚本或外部调度系统维护 + +#### Scenario: G4 热表 COPY 失败降级 +- **WHEN** G4 热表批量 COPY 写入失败且重试次数耗尽 +- **THEN** 系统应降级为逐条 INSERT 写入 +- **AND** 单条失败不影响同批次其他记录 + +### Requirement: G4 热表 guid 写入规则 +系统 MUST 在写入 `heartbeat.heartbeat_events_g4_hot` 时为每条记录提供 `guid` 字段值,格式为无连接符的小写 GUID。 + +#### Scenario: 缺失 guid 时自动生成 +- **WHEN** 待写入记录未提供 `guid` +- **THEN** 系统应为该记录生成一个新的 GUID +- **AND** 生成结果应为 32 位十六进制小写字符串 +- **AND** 结果中不应包含 `-` + +#### Scenario: 已提供 guid 时规范化 +- **WHEN** 待写入记录已提供 `guid` +- **THEN** 系统应将该值转换为小写 +- **AND** 应去除其中的 `-` +- **AND** 规范化后的值应写入 `heartbeat.heartbeat_events_g4_hot` + +### Requirement: G4 热表 power 辅助字段当前阶段写空 +系统 MUST 在当前阶段将 `power_carbon_on`、`power_carbon_off`、`power_person_exist`、`power_person_left` 固定写为 `null`,待后续计算逻辑接入后再启用实际赋值。 + +#### Scenario: 当前阶段统一写 null +- **WHEN** 系统写入 `heartbeat.heartbeat_events_g4_hot` +- **THEN** `power_carbon_on` 应写入 `null` +- **AND** `power_carbon_off` 应写入 `null` +- **AND** `power_person_exist` 应写入 `null` +- **AND** `power_person_left` 应写入 `null` +- **AND** 不应从来源数据的 `extra` 或其他字段提取这 4 个值 + +### Requirement: 双明细独立编排 +系统 MUST 提供双明细写入编排能力,按启动配置分别控制旧表与 G4 热表的写入,两路写入结果完全独立。 + +#### Scenario: 仅旧表开启 +- **WHEN** `legacyHeartbeatEnabled=true` 且 `g4HotHeartbeatEnabled=false` +- **THEN** 系统应仅写入旧表 `heartbeat.heartbeat_events` +- **AND** 不应尝试写入 G4 热表 + +#### Scenario: 仅新表开启 +- **WHEN** `legacyHeartbeatEnabled=false` 且 `g4HotHeartbeatEnabled=true` +- **THEN** 系统应仅写入 G4 热表 `heartbeat.heartbeat_events_g4_hot` +- **AND** 不应尝试写入旧表 + +#### Scenario: 双写模式 +- **WHEN** `legacyHeartbeatEnabled=true` 且 `g4HotHeartbeatEnabled=true` +- **THEN** 系统应分别写入旧表与 G4 热表 +- **AND** 旧表写入失败不影响 G4 热表写入 +- **AND** G4 热表写入失败不影响旧表写入 + +#### Scenario: 双关模式 +- **WHEN** `legacyHeartbeatEnabled=false` 且 `g4HotHeartbeatEnabled=false` +- **THEN** 系统应跳过所有明细写入 +- **AND** 不应报错或阻止消费 + +### Requirement: 写入目标启动配置 +系统 MUST 通过启动配置(环境变量)分别控制旧表、G4 热表和 room_status 的写入开关与目标表名。 + +#### Scenario: 配置加载 +- **WHEN** 系统启动时 +- **THEN** 应从环境变量读取 `DB_LEGACY_HEARTBEAT_ENABLED`(默认 true)、`DB_G4_HOT_HEARTBEAT_ENABLED`(默认 false)、`DB_ROOM_STATUS_ENABLED`(默认 true) +- **AND** 应从环境变量读取 `DB_LEGACY_TABLE`(默认 `heartbeat.heartbeat_events`)、`DB_G4_HOT_TABLE`(默认 `heartbeat.heartbeat_events_g4_hot`) + +#### Scenario: 启动日志输出配置摘要 +- **WHEN** 系统启动完成时 +- **THEN** 应在日志中输出旧/新明细写入开关状态、目标表名和 room_status 开关状态 + +### Requirement: 错误表仅服务 G4 热表 +系统 MUST 将 `heartbeat.heartbeat_events_errors` 仅用于记录 G4 热表写入失败的记录,旧表写入失败不写入错误表。 + +#### Scenario: G4 热表写入失败记录错误 +- **WHEN** G4 热表写入产生失败记录 +- **THEN** 系统应将失败记录写入 `heartbeat.heartbeat_events_errors` +- **AND** 错误表字段和结构不变 + +#### Scenario: 旧表写入失败不记录错误表 +- **WHEN** 旧表写入产生失败记录 +- **THEN** 系统不应将失败记录写入 `heartbeat.heartbeat_events_errors` +- **AND** 应仅记录日志/统计 + ### Requirement: 建库与分区维护能力必须以外部脚本提供 系统 MUST 在仓库根目录 `SQL_Script/` 提供可被外部程序调用的建库/分区维护脚本。 diff --git a/openspec/specs/processor/spec.md b/openspec/specs/processor/spec.md index 9c32f32..8a4a7b7 100644 --- a/openspec/specs/processor/spec.md +++ b/openspec/specs/processor/spec.md @@ -94,6 +94,50 @@ - **THEN** 系统应继续写入同批次其他合法记录 - **AND** 失败记录应按错误日志规则写入 Redis 项目控制台 +### Requirement: room_status 独立执行 +系统 MUST 在批次处理中始终独立执行 room_status 同步,不依赖任何明细写入的成功与否。 + +#### Scenario: room_status 始终独立执行 +- **WHEN** 一批心跳数据完成验证与转换 +- **THEN** 系统应始终调用 room_status 的 upsert 同步逻辑(当 `roomStatusEnabled=true`) +- **AND** 不依赖旧表明细写入是否开启 +- **AND** 不依赖新表明细写入是否开启 +- **AND** 不依赖旧/新表写入是否成功 +- **AND** 同步失败不应阻塞主处理流程 + +#### Scenario: 明细双关仍写 room_status +- **WHEN** `legacyHeartbeatEnabled=false` 且 `g4HotHeartbeatEnabled=false` 且 `roomStatusEnabled=true` +- **THEN** 系统仍应对当前批次数据执行 room_status upsert +- **AND** Kafka 消费不应因明细双关而暂停 + +### Requirement: 区分旧新目标失败处理 +系统 MUST 在批量写库时区分旧表与 G4 热表的失败处理路径。 + +#### Scenario: 新表失败记录写入错误表 +- **WHEN** G4 热表批量写入中存在失败记录 +- **THEN** 系统应将失败记录写入 `heartbeat.heartbeat_events_errors` + +#### Scenario: 旧表失败记录不写入错误表 +- **WHEN** 旧表批量写入中存在失败记录 +- **THEN** 系统不应将失败记录写入 `heartbeat.heartbeat_events_errors` +- **AND** 应仅记录日志与统计 + +### Requirement: 按 sink 维度的统计与监控 +系统 MUST 按写入目标维度分别统计成功数、失败数与降级事件。 + +#### Scenario: legacy 与 g4Hot 分别统计 +- **WHEN** 系统完成一批次的双写编排 +- **THEN** 应分别统计 legacy 写入成功数、失败数 +- **AND** 应分别统计 g4Hot 写入成功数、失败数 +- **AND** 统计项应在 Redis 控制台输出或 stats 汇总中可见 + +#### Scenario: 启动时输出双写配置摘要 +- **WHEN** 服务完成数据库连接并进入启动阶段 +- **THEN** 系统应输出双写配置摘要 +- **AND** 摘要中应包含 legacy 开关与目标表 +- **AND** 摘要中应包含 g4Hot 开关与目标表 +- **AND** 摘要中应包含 room_status 开关 + ## ADDED Requirements ### Requirement: 数组字段聚合为列数组 系统 SHALL �?`electricity[]` �?`air_conditioner[]` 按原始顺序聚合为数据库写入结构的列数组�? diff --git a/scripts/db/smokeG4Hot.js b/scripts/db/smokeG4Hot.js new file mode 100644 index 0000000..91bf344 --- /dev/null +++ b/scripts/db/smokeG4Hot.js @@ -0,0 +1,182 @@ +import { Client } from 'pg'; +import config from '../../src/config/config.js'; +import { DatabaseManager } from '../../src/db/databaseManager.js'; + +async function main() { + const client = new Client({ + host: config.db.host, + port: config.db.port, + user: config.db.user, + password: config.db.password, + database: config.db.database, + }); + + await client.connect(); + console.log('=== G4 Hot Smoke Test ===\n'); + + // 1. 检查新表是否存在 + const tableExists = await client.query( + `SELECT 1 FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' AND c.relname = 'heartbeat_events_g4_hot'` + ); + if (tableExists.rowCount === 0) { + console.error('FAIL: heartbeat.heartbeat_events_g4_hot 表不存在'); + process.exit(1); + } + console.log('[OK] 新表 heartbeat_events_g4_hot 已存在'); + + // 2. 检查分区表类型 + const parentKind = await client.query( + `SELECT c.relkind AS kind FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' AND c.relname = 'heartbeat_events_g4_hot'` + ); + const kind = parentKind.rows?.[0]?.kind; + console.log('[INFO] 父表 relkind:', kind, kind === 'p' ? '(分区表)' : '(非分区表)'); + + // 3. 列出已有分区 + const partitions = await client.query( + `SELECT c.relname AS partition + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + JOIN pg_namespace n ON n.oid = p.relnamespace + WHERE n.nspname = 'heartbeat' AND p.relname = 'heartbeat_events_g4_hot' + ORDER BY c.relname` + ); + console.log('[INFO] 现有分区:', partitions.rows.map((r) => r.partition)); + + // 4. 检查唯一索引 + const indexes = await client.query( + `SELECT indexname, indexdef FROM pg_indexes + WHERE schemaname = 'heartbeat' AND tablename = 'heartbeat_events_g4_hot' + ORDER BY indexname` + ); + console.log('[INFO] 索引:'); + for (const r of indexes.rows) { + console.log(' -', r.indexname); + } + + // 5. 使用 DatabaseManager 验证 COPY 写入 + const dm = new DatabaseManager({ ...config.db, maxConnections: 1, g4HotHeartbeatEnabled: true }); + await dm.connect(); + + const ts = Date.now(); + const testEvent = { + ts_ms: ts, + hotel_id: 1, + room_id: '101', + device_id: 'smoke-g4hot-test', + ip: '10.0.0.1', + power_state: 1, + guest_type: 0, + cardless_state: 0, + service_mask: 7, // bits 0,1,2 + pms_state: 1, + carbon_state: 0, + device_count: 2, + comm_seq: 1, + insert_card: null, + bright_g: 50, + version: 1, + elec_address: ['e1', 'e2'], + air_address: ['ac1', 'ac2', 'ac3'], + voltage: [220.1, 220.2], + ampere: [1.1, 1.2], + power: [242.0, 264.0], + phase: ['A', 'B'], + energy: [10.5, 11.5], + sum_energy: [100.1, 200.2], + state: [1, 2, 3], + model: [1, 1, 2], + speed: [3, 3, 1], + set_temp: [26, 25, 24], + now_temp: [27, 26, 25], + solenoid_valve: [1, 0, 1], + extra: { source: 'smoke-g4hot', power_carbon_on: 1.5, power_person_exist: 2.5 }, + }; + + try { + const result = await dm._insertEventsToTarget([testEvent], { + tableName: config.db.g4HotTable ?? 'heartbeat.heartbeat_events_g4_hot', + columns: dm._getG4HotColumns(), + toRowValues: (e) => dm._g4HotToRowValues(e), + ensurePartitions: false, + logPrefix: '[g4hot-smoke]', + missingPartitionTable: null, + }); + + if (result.success) { + console.log(`[OK] G4 Hot COPY 写入成功: ${result.insertedCount} 条`); + } else { + console.error('[FAIL] G4 Hot COPY 写入失败:', result.error?.message); + if (result.error?.message?.includes('no partition')) { + console.error(' 说明: 新表当日分区尚未创建,请先创建分区'); + } + } + } catch (err) { + console.error('[FAIL] G4 Hot 写入异常:', err.message); + } + + // 6. 验证写入的数据 + try { + const readback = await client.query( + `SELECT svc_01, svc_02, svc_03, svc_04, + air_address_1, air_address_2, air_address_residual, + elec_address_1, elec_address_2, + power_carbon_on, power_person_exist, guid + FROM heartbeat.heartbeat_events_g4_hot + WHERE device_id = 'smoke-g4hot-test' AND ts_ms = $1`, + [ts] + ); + if (readback.rowCount > 0) { + const row = readback.rows[0]; + console.log('[OK] 数据回读成功:'); + console.log(' svc_01:', row.svc_01, '(期望 true)'); + console.log(' svc_02:', row.svc_02, '(期望 true)'); + console.log(' svc_03:', row.svc_03, '(期望 true)'); + console.log(' svc_04:', row.svc_04, '(期望 false)'); + console.log(' air_address_1:', row.air_address_1, '(期望 ac1)'); + console.log(' air_address_2:', row.air_address_2, '(期望 ac2)'); + console.log(' air_address_residual:', row.air_address_residual, '(期望 [ac3])'); + console.log(' elec_address_1:', row.elec_address_1, '(期望 e1)'); + console.log(' elec_address_2:', row.elec_address_2, '(期望 e2)'); + console.log(' power_carbon_on:', row.power_carbon_on, '(期望 1.5)'); + console.log(' power_person_exist:', row.power_person_exist, '(期望 2.5)'); + console.log(' guid:', row.guid, '(期望 32 位无连接符小写)'); + } else { + console.warn('[WARN] 未读回数据(可能因分区缺失而写入失败)'); + } + } catch (err) { + console.warn('[WARN] 数据回读失败:', err.message); + } + + // 7. 清理测试数据 + try { + await client.query( + `DELETE FROM heartbeat.heartbeat_events_g4_hot WHERE device_id = 'smoke-g4hot-test' AND ts_ms = $1`, + [ts] + ); + console.log('[OK] 测试数据已清理'); + } catch (err) { + console.warn('[WARN] 清理失败:', err.message); + } + + // 8. 验证配置摘要 + console.log('\n=== 当前配置 ==='); + console.log(' legacyHeartbeatEnabled:', config.db.legacyHeartbeatEnabled); + console.log(' g4HotHeartbeatEnabled:', config.db.g4HotHeartbeatEnabled); + console.log(' roomStatusEnabled:', config.db.roomStatusEnabled); + console.log(' legacyTable:', config.db.legacyTable); + console.log(' g4HotTable:', config.db.g4HotTable); + + await dm.disconnect(); + await client.end(); + console.log('\n=== Smoke Test 完成 ==='); +} + +main().catch((err) => { + console.error('smoke test failed:', err); + process.exit(1); +}); diff --git a/src/config/config.example.js b/src/config/config.example.js index 40df9a4..bf4e77a 100644 --- a/src/config/config.example.js +++ b/src/config/config.example.js @@ -52,6 +52,12 @@ export default { idleTimeoutMillis: Number(env.POSTGRES_IDLE_TIMEOUT_MS ?? 30000), retryAttempts: 3, // 重试次数 retryDelay: 1000, // 重试延迟 + legacyHeartbeatEnabled: true, // 旧明细表写入开关 + g4HotHeartbeatEnabled: false, // 新明细表(g4_hot)写入开关 + roomStatusEnabled: true, // room_status 写入开关 + legacyTable: 'heartbeat.heartbeat_events', + g4HotTable: 'heartbeat.heartbeat_events_g4_hot', + }, // 日志配置 logger: { diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index ab64ba8..d069c14 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -1,6 +1,7 @@ import { Pool } from 'pg'; import { pipeline } from 'stream/promises'; import { Readable } from 'stream'; +import { randomUUID } from 'node:crypto'; import pgCopyStreams from 'pg-copy-streams'; const { from: copyFrom } = pgCopyStreams; @@ -70,6 +71,13 @@ class DatabaseManager { return `"${String(id).replace(/"/g, '""')}"`; } + _normalizeGuid(guid) { + if (guid === null || guid === undefined || guid === '') { + return randomUUID().replace(/-/g, '').toLowerCase(); + } + return String(guid).replace(/-/g, '').toLowerCase(); + } + formatShanghaiDate(tsMs) { const date = new Date(Number(tsMs)); const fmt = new Intl.DateTimeFormat('en-CA', { @@ -90,13 +98,382 @@ class DatabaseManager { ]); } - isMissingPartitionError(error) { + isMissingPartitionError(error, tableName) { const msg = String(error?.message ?? ''); + if (tableName) return msg.includes('no partition of relation') && msg.includes(tableName); return msg.includes('no partition of relation') && msg.includes('heartbeat_events'); } - // v2 明细表写入:用于未来对接 Kafka 心跳字段 + // ---- 共享格式化工具 ---- + + _formatPgCol(v) { + if (v === null || v === undefined) return '\\N'; + if (typeof v === 'boolean') return v ? 't' : 'f'; + if (Array.isArray(v)) { + const inner = v.map((item) => { + if (item === null || item === undefined) return 'NULL'; + const s = String(item); + return `"${s.replace(/\\/g, '\\\\').replace(/"/g, '\\"')}"`; + }); + const arrStr = `{${inner.join(',')}}`; + return arrStr.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t'); + } + const s = typeof v === 'object' ? JSON.stringify(v) : String(v); + return s.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t'); + } + + _isDbConnectionError(error) { + if (!error) return false; + const connCodes = ['57P03', '08006', '08001', '08003', '08004', '08007']; + if (connCodes.includes(error.code)) return true; + if (error.message && /ECONNREFUSED|ETIMEDOUT|connection/i.test(error.message)) return true; + return false; + } + + // ---- 旧表列定义 ---- + + _getLegacyColumns() { + return [ + 'ts_ms', 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip', + 'power_state', 'guest_type', 'cardless_state', 'service_mask', + 'pms_state', 'carbon_state', 'device_count', 'comm_seq', + 'insert_card', 'bright_g', 'version', + 'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase', + 'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp', + 'now_temp', 'solenoid_valve', 'extra', + ]; + } + + _legacyToRowValues(e) { + return [ + e.ts_ms, + e.write_ts_ms ?? Date.now(), + e.hotel_id, + e.room_id, + e.device_id, + e.ip, + e.power_state, + e.guest_type, + e.cardless_state, + e.service_mask, + e.pms_state, + e.carbon_state, + e.device_count, + e.comm_seq, + e.insert_card ?? null, + (e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null), + e.version ?? null, + Array.isArray(e.elec_address) ? e.elec_address : null, + Array.isArray(e.air_address) ? e.air_address : null, + Array.isArray(e.voltage) ? e.voltage : null, + Array.isArray(e.ampere) ? e.ampere : null, + Array.isArray(e.power) ? e.power : null, + Array.isArray(e.phase) ? e.phase : null, + Array.isArray(e.energy) ? e.energy : null, + Array.isArray(e.sum_energy) ? e.sum_energy : null, + Array.isArray(e.state) ? e.state : null, + Array.isArray(e.model) ? e.model : null, + Array.isArray(e.speed) ? e.speed : null, + Array.isArray(e.set_temp) ? e.set_temp : null, + Array.isArray(e.now_temp) ? e.now_temp : null, + Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, + e.extra ?? null, + ]; + } + + // ---- 新表 G4 Hot 列定义 ---- + + _getG4HotColumns() { + const base = [ + 'ts_ms', 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip', + 'power_state', 'guest_type', 'cardless_state', 'service_mask', + 'pms_state', 'carbon_state', 'device_count', 'comm_seq', + 'insert_card', 'bright_g', 'version', + 'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase', + 'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp', + 'now_temp', 'solenoid_valve', 'extra', + ]; + const svc = Array.from({ length: 64 }, (_, i) => `svc_${String(i + 1).padStart(2, '0')}`); + const airUnpacked = [ + 'air_address_1', 'air_address_2', 'air_address_residual', + 'state_1', 'state_2', 'state_residual', + 'model_1', 'model_2', 'model_residual', + 'speed_1', 'speed_2', 'speed_residual', + 'set_temp_1', 'set_temp_2', 'set_temp_residual', + 'now_temp_1', 'now_temp_2', 'now_temp_residual', + 'solenoid_valve_1', 'solenoid_valve_2', 'solenoid_valve_residual', + ]; + const elecUnpacked = [ + 'elec_address_1', 'elec_address_2', 'elec_address_residual', + 'voltage_1', 'voltage_2', 'voltage_residual', + 'ampere_1', 'ampere_2', 'ampere_residual', + 'power_1', 'power_2', 'power_residual', + 'phase_1', 'phase_2', 'phase_residual', + 'energy_1', 'energy_2', 'energy_residual', + 'sum_energy_1', 'sum_energy_2', 'sum_energy_residual', + ]; + const power = ['power_carbon_on', 'power_carbon_off', 'power_person_exist', 'power_person_left']; + const tail = ['guid']; + return [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power, ...tail]; + } + + _unpackArrElement(arr, idx) { + if (!Array.isArray(arr) || idx >= arr.length) return null; + return arr[idx] ?? null; + } + + _unpackArrResidual(arr) { + if (!Array.isArray(arr) || arr.length <= 2) return null; + return arr.slice(2); + } + + _g4HotToRowValues(e) { + const values = [ + e.ts_ms, + e.write_ts_ms ?? Date.now(), + e.hotel_id, + e.room_id, + e.device_id, + e.ip, + e.power_state, + e.guest_type, + e.cardless_state, + e.service_mask, + e.pms_state, + e.carbon_state, + e.device_count, + e.comm_seq, + e.insert_card ?? null, + (e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null), + e.version ?? null, + Array.isArray(e.elec_address) ? e.elec_address : null, + Array.isArray(e.air_address) ? e.air_address : null, + Array.isArray(e.voltage) ? e.voltage : null, + Array.isArray(e.ampere) ? e.ampere : null, + Array.isArray(e.power) ? e.power : null, + Array.isArray(e.phase) ? e.phase : null, + Array.isArray(e.energy) ? e.energy : null, + Array.isArray(e.sum_energy) ? e.sum_energy : null, + Array.isArray(e.state) ? e.state : null, + Array.isArray(e.model) ? e.model : null, + Array.isArray(e.speed) ? e.speed : null, + Array.isArray(e.set_temp) ? e.set_temp : null, + Array.isArray(e.now_temp) ? e.now_temp : null, + Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, + e.extra ?? null, + ]; + + // svc_01 .. svc_64 布尔展开 + const mask = e.service_mask != null ? BigInt(e.service_mask) : null; + for (let i = 0; i < 64; i++) { + values.push(mask != null ? Boolean((mask >> BigInt(i)) & 1n) : null); + } + + // 空调展开 _1, _2, _residual + const airArr = Array.isArray(e.air_address) ? e.air_address : null; + const stateArr = Array.isArray(e.state) ? e.state : null; + const modelArr = Array.isArray(e.model) ? e.model : null; + const speedArr = Array.isArray(e.speed) ? e.speed : null; + const setTempArr = Array.isArray(e.set_temp) ? e.set_temp : null; + const nowTempArr = Array.isArray(e.now_temp) ? e.now_temp : null; + const svArr = Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null; + + for (const arr of [airArr, stateArr, modelArr, speedArr, setTempArr, nowTempArr, svArr]) { + values.push(this._unpackArrElement(arr, 0)); + values.push(this._unpackArrElement(arr, 1)); + values.push(this._unpackArrResidual(arr)); + } + + // 电力展开 _1, _2, _residual + const elecAddr = Array.isArray(e.elec_address) ? e.elec_address : null; + const voltArr = Array.isArray(e.voltage) ? e.voltage : null; + const ampArr = Array.isArray(e.ampere) ? e.ampere : null; + const powArr = Array.isArray(e.power) ? e.power : null; + const phaseArr = Array.isArray(e.phase) ? e.phase : null; + const energyArr = Array.isArray(e.energy) ? e.energy : null; + const sumEnergyArr = Array.isArray(e.sum_energy) ? e.sum_energy : null; + + for (const arr of [elecAddr, voltArr, ampArr, powArr, phaseArr, energyArr, sumEnergyArr]) { + values.push(this._unpackArrElement(arr, 0)); + values.push(this._unpackArrElement(arr, 1)); + values.push(this._unpackArrResidual(arr)); + } + + // power 辅助字段:当前计算逻辑尚未接入,临时统一写 null + values.push(null); + values.push(null); + values.push(null); + values.push(null); + + values.push(this._normalizeGuid(e.guid)); + + return values; + } + + // ---- 通用 COPY + fallback INSERT 写入内核 ---- + // target: { tableName, columns, toRowValues, ensurePartitions, logPrefix, missingPartitionTable } + // 返回: { success, insertedCount, failedRecords, error, isConnectionError, batchError } + async _insertEventsToTarget(events, target) { + const { tableName, columns, toRowValues, ensurePartitions, logPrefix, missingPartitionTable } = target; + + const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); + const tsMin = tsValues.length > 0 ? Math.min(...tsValues) : null; + const tsMax = tsValues.length > 0 ? Math.max(...tsValues) : null; + + const self = this; + const runInsertOnce = async () => { + if (ensurePartitions && tsMin !== null) { + await self.ensurePartitionsForTsRange(tsMin, tsMax); + } + const client = await self.pool.connect(); + try { + const copySql = `COPY ${tableName} (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`; + const stream = client.query(copyFrom(copySql)); + async function* generateRows() { + for (const e of events) { + const line = toRowValues(e).map((v) => self._formatPgCol(v)).join('\t') + '\n'; + yield line; + } + } + await pipeline(Readable.from(generateRows()), stream); + return { insertedCount: events.length }; + } finally { + client.release(); + } + }; + + const retryAttempts = Number(this.config?.retryAttempts ?? 0); + const retryDelay = Math.max(250, Number(this.config?.retryDelay ?? 1000)); + const maxAttempts = retryAttempts > 0 ? retryAttempts : 1; + + let lastError = null; + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + try { + const r = await runInsertOnce(); + return { success: true, insertedCount: r.insertedCount, failedRecords: [], error: null, isConnectionError: false, batchError: null }; + } catch (error) { + lastError = error; + if (ensurePartitions && missingPartitionTable && this.isMissingPartitionError(error, missingPartitionTable)) { + console.warn(`${logPrefix} 检测到缺分区写入失败,执行兜底预创建并重试一次`); + if (tsMin !== null) { + await this.ensurePartitionsForTsRange(tsMin, tsMax); + } + } + if (attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, retryDelay)); + continue; + } + } + } + + // COPY 全失败,降级逐条 INSERT + const failedRecords = []; + let insertedCount = 0; + console.error(`${logPrefix} 批量写入失败,已切换为逐条写入:`, lastError); + + const singleSql = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${columns.map((_, i) => `$${i + 1}`).join(', ')})`; + + for (const event of events) { + try { + await this.pool.query(singleSql, toRowValues(event)); + insertedCount += 1; + } catch (error) { + if (this._isDbConnectionError(error)) { + return { success: false, insertedCount, failedRecords, error, isConnectionError: true, batchError: error }; + } + failedRecords.push({ error, record: event }); + } + } + + if (insertedCount === 0 && failedRecords.length === events.length && this._isDbConnectionError(lastError)) { + return { success: false, insertedCount: 0, failedRecords, error: lastError, isConnectionError: true, batchError: lastError }; + } + + return { + success: insertedCount > 0 || failedRecords.length === 0, + insertedCount, + failedRecords, + error: lastError, + isConnectionError: false, + batchError: (insertedCount === 0 && failedRecords.length === events.length) ? lastError : null, + }; + } + + // ---- 旧/新双写编排 ---- + async insertHeartbeatEventsDual(events) { + if (!Array.isArray(events)) events = [events]; + if (events.length === 0) { + const empty = { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null }; + return { legacy: { ...empty }, g4Hot: { ...empty } }; + } + + const legacyEnabled = this.config.legacyHeartbeatEnabled; + const g4HotEnabled = this.config.g4HotHeartbeatEnabled; + const empty = { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null }; + + const promises = []; + let legacyIdx = -1; + let g4HotIdx = -1; + + if (legacyEnabled) { + legacyIdx = promises.length; + promises.push(this._insertEventsToTarget(events, { + tableName: this.config.legacyTable ?? 'heartbeat.heartbeat_events', + columns: this._getLegacyColumns(), + toRowValues: (e) => this._legacyToRowValues(e), + ensurePartitions: true, + logPrefix: '[legacy]', + missingPartitionTable: 'heartbeat_events', + })); + } + + if (g4HotEnabled) { + g4HotIdx = promises.length; + promises.push(this._insertEventsToTarget(events, { + tableName: this.config.g4HotTable ?? 'heartbeat.heartbeat_events_g4_hot', + columns: this._getG4HotColumns(), + toRowValues: (e) => this._g4HotToRowValues(e), + ensurePartitions: false, + logPrefix: '[g4hot]', + missingPartitionTable: null, + })); + } + + if (promises.length === 0) { + return { legacy: { ...empty }, g4Hot: { ...empty } }; + } + + const settled = await Promise.allSettled(promises); + const wrap = (s) => { + if (!s) return { ...empty }; + if (s.status === 'fulfilled') return { ...s.value, enabled: true }; + return { enabled: true, success: false, insertedCount: 0, failedRecords: [], error: s.reason, isConnectionError: this._isDbConnectionError(s.reason), batchError: s.reason }; + }; + + return { + legacy: legacyIdx >= 0 ? wrap(settled[legacyIdx]) : { ...empty }, + g4Hot: g4HotIdx >= 0 ? wrap(settled[g4HotIdx]) : { ...empty }, + }; + } + + // v2 明细表写入(向后兼容封装,仅旧表,抛出连接错误) async insertHeartbeatEvents(events) { + if (!Array.isArray(events)) events = [events]; + if (events.length === 0) return; + const result = await this._insertEventsToTarget(events, { + tableName: this.config.legacyTable ?? 'heartbeat.heartbeat_events', + columns: this._getLegacyColumns(), + toRowValues: (e) => this._legacyToRowValues(e), + ensurePartitions: true, + logPrefix: '[legacy]', + missingPartitionTable: 'heartbeat_events', + }); + if (result.isConnectionError && result.error) throw result.error; + return { insertedCount: result.insertedCount, failedRecords: result.failedRecords, batchError: result.batchError }; + } + + // [DEPRECATED] 旧版直写实现,已由 _insertEventsToTarget 替代,后续可移除 + async _insertHeartbeatEventsLegacyDirect(events) { if (!Array.isArray(events)) { events = [events]; } diff --git a/src/index.js b/src/index.js index 223bbff..36034bc 100644 --- a/src/index.js +++ b/src/index.js @@ -33,6 +33,16 @@ class WebBLSHeartbeatServer { console.log('数据库连接成功'); await this.redis?.info('数据库连接成功', { module: 'db' }); + // 打印双写配置摘要 + const dbCfg = this.config.db; + const dualWriteSummary = { + legacyHeartbeat: dbCfg.legacyHeartbeatEnabled ? `ON → ${dbCfg.legacyTable}` : 'OFF', + g4HotHeartbeat: dbCfg.g4HotHeartbeatEnabled ? `ON → ${dbCfg.g4HotTable}` : 'OFF', + roomStatus: dbCfg.roomStatusEnabled !== false ? 'ON' : 'OFF', + }; + console.log('双写配置摘要:', dualWriteSummary); + await this.redis?.info('双写配置摘要', { module: 'db', ...dualWriteSummary }); + // 打印 Kafka 配置摘要,便于排查连接问题 console.log('正在初始化 Kafka 消费者...'); console.log('Kafka 配置:', { diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index 4ea12fa..b33a63b 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -213,51 +213,76 @@ class HeartbeatProcessor { batchData = this.batchQueue.slice(0, batchEventCount); batchMessages = this.batchMessageQueue.slice(0, batchMessageCount); - let insertedCount = 0; - let failedRecords = []; - if (typeof this.databaseManager.insertHeartbeatEvents === 'function') { - const result = await this.databaseManager.insertHeartbeatEvents(batchData); - insertedCount = Number(result?.insertedCount ?? result ?? 0); - failedRecords = Array.isArray(result?.failedRecords) ? result.failedRecords : []; - - // 同步到 room_status 表 (Best Effort) - // 只有当历史表写入成功(insertedCount > 0)才尝试同步 - // 过滤掉写入失败的记录(如果有) - if (insertedCount > 0) { - const successData = failedRecords.length > 0 - ? batchData.filter(d => !failedRecords.some(f => f.record === d)) - : batchData; - - if (successData.length > 0) { - this.databaseManager.upsertRoomStatus(successData).catch(err => { - console.warn('异步同步 room_status 失败 (忽略):', err); - }); - } - } + // A. 双明细写入(旧/新独立执行) + const dualResult = await this.databaseManager.insertHeartbeatEventsDual(batchData); + const { legacy: legacyResult, g4Hot: g4HotResult } = dualResult; + // B. room_status 始终独立执行(不依赖明细写入结果) + const roomStatusEnabled = this.databaseManager.config?.roomStatusEnabled !== false; + if (roomStatusEnabled && batchData.length > 0) { + this.databaseManager.upsertRoomStatus(batchData).catch(err => { + console.warn('异步同步 room_status 失败 (忽略):', err); + this.stats?.incRoomStatusFailed?.(batchData.length); + }); + this.stats?.incRoomStatusWritten?.(batchData.length); } + // C. 暂停消费判定(基于当前启用的关键 sink) + const shouldPause = this._shouldPauseConsumption(legacyResult, g4HotResult); + if (shouldPause) { + if (!this._dbOffline) { + this._dbOffline = true; + console.error('关键数据库 sink 连接断开,暂停拉取并开始检查'); + this.onDbOffline?.(); + } + this._emitDbWriteError(legacyResult.error || g4HotResult.error, batchData); + this._scheduleDbCheck(); + return; + } + + // D. 清理队列、resolve deferreds this.batchQueue.splice(0, batchEventCount); this.batchMessageQueue.splice(0, batchMessageCount); - for (const entry of batchMessages) { entry.deferred.resolve({ insertedCount: entry.eventCount }); } - if (failedRecords.length > 0) { - const rejects = failedRecords.map(item => ({ - errorId: 'db_write_failed', - error: item?.error, - rawData: item?.record, + // E. 统计 & 日志 + if (legacyResult.enabled) { + this.stats?.incDbWritten?.(legacyResult.insertedCount); + if (legacyResult.failedRecords.length > 0) { + this.stats?.incDbWriteFailed?.(legacyResult.failedRecords.length); + console.warn(`[legacy] 批次部分失败:成功 ${legacyResult.insertedCount},失败 ${legacyResult.failedRecords.length}`); + } + } + if (g4HotResult.enabled) { + this.stats?.incG4HotWritten?.(g4HotResult.insertedCount); + if (g4HotResult.failedRecords.length > 0) { + this.stats?.incG4HotWriteFailed?.(g4HotResult.failedRecords.length); + console.warn(`[g4hot] 批次部分失败:成功 ${g4HotResult.insertedCount},失败 ${g4HotResult.failedRecords.length}`); + } + } + + // F. 错误表:仅 g4Hot 失败记录(旧表失败不写错误表) + if (g4HotResult.enabled && g4HotResult.failedRecords.length > 0) { + const dbPayload = g4HotResult.failedRecords.map(item => ({ + hotel_id: item.record?.hotel_id ?? null, + room_id: item.record?.room_id ?? null, + original_data: item.record, + error_code: 'g4hot_write_failed', + error_message: item.error ? String(item.error?.message ?? item.error) : 'g4hot write failed', })); - this._emitRejectedRecords(rejects); - this.stats?.incDbWriteFailed?.(failedRecords.length); + this.databaseManager.insertHeartbeatEventsErrors(dbPayload).catch(() => {}); + this.stats?.incG4HotErrorTableInserted?.(dbPayload.length); } - this.stats?.incDbWritten?.(insertedCount); - const failedCount = failedRecords.length; - if (failedCount > 0) { - console.warn(`批次处理部分失败:成功 ${insertedCount} 条,失败 ${failedCount} 条`); + + // G. Legacy 失败仅日志(不写错误表) + if (legacyResult.enabled && legacyResult.failedRecords.length > 0) { + for (const item of legacyResult.failedRecords.slice(0, 10)) { + console.warn('[legacy] 单条写入失败:', item.error?.message); + } } + hasMore = this.batchQueue.length > 0; } catch (error) { console.error('批量处理失败:', error); @@ -316,6 +341,22 @@ class HeartbeatProcessor { return false; } + // 基于当前启用的关键 sink 判断是否暂停消费 + _shouldPauseConsumption(legacyResult, g4HotResult) { + const legacyEnabled = legacyResult?.enabled; + const g4HotEnabled = g4HotResult?.enabled; + const legacyConnErr = legacyResult?.isConnectionError; + const g4HotConnErr = g4HotResult?.isConnectionError; + // 双关: 不因明细关闭而暂停(room_status 可能仍在写) + if (!legacyEnabled && !g4HotEnabled) return false; + // 仅旧表开启: 旧表连接故障 → 暂停 + if (legacyEnabled && !g4HotEnabled) return !!legacyConnErr; + // 仅新表开启: 新表连接故障 → 暂停 + if (!legacyEnabled && g4HotEnabled) return !!g4HotConnErr; + // 双开: 两路都连接失败 → 暂停 + return !!legacyConnErr && !!g4HotConnErr; + } + _scheduleDbCheck() { if (this.batchTimer) clearTimeout(this.batchTimer); this.batchTimer = setTimeout(async () => { diff --git a/src/stats/statsManager.js b/src/stats/statsManager.js index 075d06e..b3a3de3 100644 --- a/src/stats/statsManager.js +++ b/src/stats/statsManager.js @@ -1,39 +1,40 @@ class StatsCounters { constructor() { - this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 4); + // 原有 4 槽 + 新增 7 槽 = 11 槽 + // [0] dbWritten, [1] filtered, [2] kafkaPulled, [3] dbWriteFailed, + // [4] g4HotWritten, [5] g4HotWriteFailed, [6] roomStatusWritten, + // [7] roomStatusFailed, [8] g4HotErrorTableInserted + this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 9); this._minute = new BigInt64Array(this._minuteBuf); } - incDbWritten(n = 1) { + _inc(slot, n = 1) { const v = BigInt(Math.max(0, Number(n) || 0)); if (v === 0n) return; - Atomics.add(this._minute, 0, v); + Atomics.add(this._minute, slot, v); } - incFiltered(n = 1) { - const v = BigInt(Math.max(0, Number(n) || 0)); - if (v === 0n) return; - Atomics.add(this._minute, 1, v); - } - - incKafkaPulled(n = 1) { - const v = BigInt(Math.max(0, Number(n) || 0)); - if (v === 0n) return; - Atomics.add(this._minute, 2, v); - } - - incDbWriteFailed(n = 1) { - const v = BigInt(Math.max(0, Number(n) || 0)); - if (v === 0n) return; - Atomics.add(this._minute, 3, v); - } + incDbWritten(n = 1) { this._inc(0, n); } + incFiltered(n = 1) { this._inc(1, n); } + incKafkaPulled(n = 1) { this._inc(2, n); } + incDbWriteFailed(n = 1) { this._inc(3, n); } + incG4HotWritten(n = 1) { this._inc(4, n); } + incG4HotWriteFailed(n = 1) { this._inc(5, n); } + incRoomStatusWritten(n = 1) { this._inc(6, n); } + incRoomStatusFailed(n = 1) { this._inc(7, n); } + incG4HotErrorTableInserted(n = 1) { this._inc(8, n); } snapshotAndResetMinute() { const dbWritten = Atomics.exchange(this._minute, 0, 0n); const filtered = Atomics.exchange(this._minute, 1, 0n); const kafkaPulled = Atomics.exchange(this._minute, 2, 0n); const dbWriteFailed = Atomics.exchange(this._minute, 3, 0n); - return { dbWritten, filtered, kafkaPulled, dbWriteFailed }; + const g4HotWritten = Atomics.exchange(this._minute, 4, 0n); + const g4HotWriteFailed = Atomics.exchange(this._minute, 5, 0n); + const roomStatusWritten = Atomics.exchange(this._minute, 6, 0n); + const roomStatusFailed = Atomics.exchange(this._minute, 7, 0n); + const g4HotErrorTableInserted = Atomics.exchange(this._minute, 8, 0n); + return { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted }; } } @@ -81,11 +82,16 @@ class StatsReporter { if (this._lastFlushMinute === minuteKey) { return; } - const { dbWritten, filtered, kafkaPulled, dbWriteFailed } = this.stats.snapshotAndResetMinute(); + const { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted } = this.stats.snapshotAndResetMinute(); this._lastFlushMinute = minuteKey; const ts = formatTimestamp(new Date()); - this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入量: ${dbWritten}条`, metadata: { module: 'stats' } }); - this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入失败量: ${dbWriteFailed}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Legacy写入量: ${dbWritten}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Legacy写入失败量: ${dbWriteFailed}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot写入量: ${g4HotWritten}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot写入失败量: ${g4HotWriteFailed}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} RoomStatus写入量: ${roomStatusWritten}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} RoomStatus失败量: ${roomStatusFailed}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot错误表插入量: ${g4HotErrorTableInserted}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据过滤量: ${filtered}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Kafka拉取量: ${kafkaPulled}条`, metadata: { module: 'stats' } }); } diff --git a/test/arrays.test.js b/test/arrays.test.js index b4d062c..368989c 100644 --- a/test/arrays.test.js +++ b/test/arrays.test.js @@ -90,10 +90,15 @@ describe('HeartbeatProcessor arrays', () => { it('end-to-end: message buffer -> processMessage -> insertHeartbeatEvents payload includes arrays', async () => { let captured = null; const db = { - insertHeartbeatEvents: async (events) => { + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: false, legacyTable: 'heartbeat.heartbeat_events' }, + insertHeartbeatEventsDual: async (events) => { captured = events; - return { insertedCount: events.length }; + return { + legacy: { enabled: true, success: true, insertedCount: events.length, failedRecords: [], error: null, isConnectionError: false, batchError: null }, + g4Hot: { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null }, + }; }, + insertHeartbeatEvents: async (events) => ({ insertedCount: events.length }), }; const processor = new HeartbeatProcessor({ batchSize: 1, batchTimeout: 1000 }, db); diff --git a/test/dualWrite.test.js b/test/dualWrite.test.js new file mode 100644 index 0000000..cbdd448 --- /dev/null +++ b/test/dualWrite.test.js @@ -0,0 +1,397 @@ +import assert from 'node:assert/strict'; +import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js'; +import { DatabaseManager } from '../src/db/databaseManager.js'; + +// ---- 辅助 ---- + +const buildBasePayload = () => ({ + ts_ms: 1700000000123, + hotel_id: 1, + room_id: '101', + device_id: 'dev1', + ip: '10.0.0.1', + power_state: 1, + guest_type: 0, + cardless_state: 0, + service_mask: 7, + pms_state: 1, + carbon_state: 0, + device_count: 1, + comm_seq: 1, +}); + +function makeDualResult({ legacyEnabled = true, legacySuccess = true, legacyConn = false, + g4HotEnabled = false, g4HotSuccess = true, g4HotConn = false, + insertedCount = 1, failedRecords = [] } = {}) { + return { + legacy: { + enabled: legacyEnabled, success: legacySuccess, insertedCount: legacySuccess ? insertedCount : 0, + failedRecords: legacySuccess ? [] : failedRecords, error: legacySuccess ? null : new Error('legacy fail'), + isConnectionError: legacyConn, batchError: legacySuccess ? null : new Error('legacy fail'), + }, + g4Hot: { + enabled: g4HotEnabled, success: g4HotSuccess, insertedCount: g4HotSuccess ? insertedCount : 0, + failedRecords: g4HotSuccess ? [] : failedRecords, error: g4HotSuccess ? null : new Error('g4hot fail'), + isConnectionError: g4HotConn, batchError: g4HotSuccess ? null : new Error('g4hot fail'), + }, + }; +} + +function buildMockDb(overrides = {}) { + return { + config: { + legacyHeartbeatEnabled: true, + g4HotHeartbeatEnabled: false, + roomStatusEnabled: true, + legacyTable: 'heartbeat.heartbeat_events', + g4HotTable: 'heartbeat.heartbeat_events_g4_hot', + ...overrides.config, + }, + insertHeartbeatEventsDual: overrides.insertHeartbeatEventsDual ?? (async () => makeDualResult()), + insertHeartbeatEventsErrors: overrides.insertHeartbeatEventsErrors ?? (async () => {}), + upsertRoomStatus: overrides.upsertRoomStatus ?? (async () => ({ rowCount: 1 })), + insertHeartbeatEvents: overrides.insertHeartbeatEvents ?? (async () => ({ insertedCount: 1 })), + checkConnection: overrides.checkConnection ?? (async () => true), + }; +} + +function buildProcessor(dbOverrides = {}, processorConfig = {}) { + const db = buildMockDb(dbOverrides); + return new HeartbeatProcessor( + { batchSize: 1, batchTimeout: 1000, ...processorConfig }, + db + ); +} + +// ---- 测试 ---- + +describe('Dual-write: 仅旧表开启', () => { + it('should call insertHeartbeatEventsDual and succeed', async () => { + let captured = null; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false }, + insertHeartbeatEventsDual: async (events) => { + captured = events; + return makeDualResult({ legacyEnabled: true, g4HotEnabled: false }); + }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + const res = await processor.processMessage(msg); + assert.deepEqual(res, { insertedCount: 1 }); + assert.ok(Array.isArray(captured)); + assert.equal(captured.length, 1); + }); +}); + +describe('Dual-write: 仅新表开启', () => { + it('should call insertHeartbeatEventsDual with g4hot enabled', async () => { + let dualCalled = false; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async (events) => { + dualCalled = true; + return makeDualResult({ legacyEnabled: false, g4HotEnabled: true }); + }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + assert.ok(dualCalled); + }); +}); + +describe('Dual-write: 旧新双开', () => { + it('should call insertHeartbeatEventsDual with both enabled', async () => { + let dualCalled = false; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async () => { + dualCalled = true; + return makeDualResult({ legacyEnabled: true, g4HotEnabled: true }); + }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + assert.ok(dualCalled); + }); +}); + +describe('Dual-write: 旧新双关,room_status 仍执行', () => { + it('should still call upsertRoomStatus when both detail writes are off', async () => { + let roomStatusCalled = false; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, roomStatusEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: false, g4HotEnabled: false }), + upsertRoomStatus: async () => { roomStatusCalled = true; return { rowCount: 1 }; }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + // 等待异步 room_status 调用完成 + await new Promise(r => setTimeout(r, 50)); + assert.ok(roomStatusCalled); + }); +}); + +describe('Dual-write: 旧成功新失败', () => { + it('should continue and only write g4hot failures to error table', async () => { + let errorTablePayload = null; + const failedRecord = { error: new Error('g4hot column mismatch'), record: buildBasePayload() }; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ + legacyEnabled: true, legacySuccess: true, + g4HotEnabled: true, g4HotSuccess: false, failedRecords: [failedRecord], + }), + insertHeartbeatEventsErrors: async (payload) => { errorTablePayload = payload; }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + await new Promise(r => setTimeout(r, 50)); + assert.ok(errorTablePayload); + assert.equal(errorTablePayload.length, 1); + assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed'); + }); +}); + +describe('Dual-write: 旧失败新成功', () => { + it('should NOT write legacy failures to error table', async () => { + let errorTableCalled = false; + const failedRecord = { error: new Error('legacy column mismatch'), record: buildBasePayload() }; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ + legacyEnabled: true, legacySuccess: false, failedRecords: [failedRecord], + g4HotEnabled: true, g4HotSuccess: true, + }), + insertHeartbeatEventsErrors: async () => { errorTableCalled = true; }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + await new Promise(r => setTimeout(r, 50)); + assert.equal(errorTableCalled, false); + }); +}); + +describe('Dual-write: 双失败', () => { + it('should only write g4hot failures to error table, not legacy failures', async () => { + let errorTablePayload = null; + const failedRecord = { error: new Error('fail'), record: buildBasePayload() }; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ + legacyEnabled: true, legacySuccess: false, failedRecords: [failedRecord], + g4HotEnabled: true, g4HotSuccess: false, failedRecords: [failedRecord], + }), + insertHeartbeatEventsErrors: async (payload) => { errorTablePayload = payload; }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + await new Promise(r => setTimeout(r, 50)); + // 错误表只包含 g4hot 失败 + assert.ok(errorTablePayload); + assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed'); + }); +}); + +describe('Dual-write: room_status 始终执行', () => { + it('should call upsertRoomStatus regardless of detail write success/failure', async () => { + let roomStatusCalled = false; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ + legacyEnabled: true, legacySuccess: false, + failedRecords: [{ error: new Error('fail'), record: buildBasePayload() }], + }), + upsertRoomStatus: async () => { roomStatusCalled = true; return { rowCount: 1 }; }, + }); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + await new Promise(r => setTimeout(r, 50)); + assert.ok(roomStatusCalled); + }); +}); + +describe('Dual-write: 暂停消费策略', () => { + it('should NOT pause when both detail writes disabled but room_status enabled', async () => { + let paused = false; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, roomStatusEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: false, g4HotEnabled: false }), + }); + processor.onDbOffline = () => { paused = true; }; + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + assert.equal(paused, false); + }); + + it('should pause when only legacy enabled and legacy has connection error', () => { + const processor = buildProcessor({}); + const result = processor._shouldPauseConsumption( + { enabled: true, isConnectionError: true }, + { enabled: false, isConnectionError: false } + ); + assert.ok(result); + }); + + it('should NOT pause in dual-write when only g4hot has connection error', async () => { + let paused = false; + const processor = buildProcessor({ + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ + legacyEnabled: true, legacySuccess: true, + g4HotEnabled: true, g4HotSuccess: false, g4HotConn: true, + }), + }); + processor.onDbOffline = () => { paused = true; }; + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + assert.equal(paused, false); + }); + + it('should pause in dual-write when both have connection errors', () => { + const processor = buildProcessor({}); + const result = processor._shouldPauseConsumption( + { enabled: true, isConnectionError: true }, + { enabled: true, isConnectionError: true } + ); + assert.ok(result); + }); + + it('should NOT pause when only g4hot has connection error in dual mode', () => { + const processor = buildProcessor({}); + const result = processor._shouldPauseConsumption( + { enabled: true, isConnectionError: false }, + { enabled: true, isConnectionError: true } + ); + assert.equal(result, false); + }); + + it('should pause when only g4hot enabled and g4hot has connection error', () => { + const processor = buildProcessor({}); + const result = processor._shouldPauseConsumption( + { enabled: false, isConnectionError: false }, + { enabled: true, isConnectionError: true } + ); + assert.ok(result); + }); +}); + +describe('DatabaseManager: _g4HotToRowValues', () => { + it('unpacks svc booleans from service_mask', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const e = { ...buildBasePayload(), service_mask: 7 }; // bits 0,1,2 set + const values = dm._g4HotToRowValues(e); + const cols = dm._getG4HotColumns(); + const svc01Idx = cols.indexOf('svc_01'); + assert.equal(values[svc01Idx], true); // bit 0 + assert.equal(values[svc01Idx + 1], true); // bit 1 (svc_02) + assert.equal(values[svc01Idx + 2], true); // bit 2 (svc_03) + assert.equal(values[svc01Idx + 3], false); // bit 3 (svc_04) + assert.equal(values[svc01Idx + 63], false); // svc_64 + }); + + it('unpacks array fields into _1, _2, _residual', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const e = { + ...buildBasePayload(), + air_address: ['ac1', 'ac2', 'ac3'], + state: [1, 2, 3], + elec_address: ['e1'], + voltage: [220.5], + }; + const values = dm._g4HotToRowValues(e); + const cols = dm._getG4HotColumns(); + + const airAddr1Idx = cols.indexOf('air_address_1'); + assert.equal(values[airAddr1Idx], 'ac1'); + assert.equal(values[airAddr1Idx + 1], 'ac2'); // air_address_2 + assert.deepEqual(values[airAddr1Idx + 2], ['ac3']); // air_address_residual + + const state1Idx = cols.indexOf('state_1'); + assert.equal(values[state1Idx], 1); + assert.equal(values[state1Idx + 1], 2); + assert.deepEqual(values[state1Idx + 2], [3]); + + const elecAddr1Idx = cols.indexOf('elec_address_1'); + assert.equal(values[elecAddr1Idx], 'e1'); + assert.equal(values[elecAddr1Idx + 1], null); // only 1 element + assert.equal(values[elecAddr1Idx + 2], null); // no residual + }); + + it('produces correct column count', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const e = buildBasePayload(); + const cols = dm._getG4HotColumns(); + const values = dm._g4HotToRowValues(e); + assert.equal(values.length, cols.length); + }); + + it('writes power helper fields as null temporarily', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const e = { + ...buildBasePayload(), + extra: { + power_carbon_on: 1.5, + power_carbon_off: 2.5, + power_person_exist: 3.5, + power_person_left: 4.5, + }, + }; + const cols = dm._getG4HotColumns(); + const values = dm._g4HotToRowValues(e); + assert.equal(values[cols.indexOf('power_carbon_on')], null); + assert.equal(values[cols.indexOf('power_carbon_off')], null); + assert.equal(values[cols.indexOf('power_person_exist')], null); + assert.equal(values[cols.indexOf('power_person_left')], null); + }); + + it('generates lowercase guid without hyphens when missing', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const e = buildBasePayload(); + const cols = dm._getG4HotColumns(); + const values = dm._g4HotToRowValues(e); + const guid = values[cols.indexOf('guid')]; + assert.match(guid, /^[0-9a-f]{32}$/); + }); + + it('normalizes provided guid to lowercase without hyphens', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const e = { ...buildBasePayload(), guid: 'A0B1C2D3-E4F5-6789-ABCD-EF0123456789' }; + const cols = dm._getG4HotColumns(); + const values = dm._g4HotToRowValues(e); + const guid = values[cols.indexOf('guid')]; + assert.equal(guid, 'a0b1c2d3e4f56789abcdef0123456789'); + }); +}); + +describe('DatabaseManager: _formatPgCol', () => { + it('formats boolean values correctly', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + assert.equal(dm._formatPgCol(true), 't'); + assert.equal(dm._formatPgCol(false), 'f'); + assert.equal(dm._formatPgCol(null), '\\N'); + }); +}); + +describe('DatabaseManager: insertHeartbeatEventsDual', () => { + it('returns empty results when both targets disabled', async () => { + const dm = new DatabaseManager({ + host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', + legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, + }); + const result = await dm.insertHeartbeatEventsDual([buildBasePayload()]); + assert.equal(result.legacy.enabled, false); + assert.equal(result.g4Hot.enabled, false); + assert.equal(result.legacy.insertedCount, 0); + assert.equal(result.g4Hot.insertedCount, 0); + }); +}); diff --git a/test/stats.test.js b/test/stats.test.js index cc3593f..fcd8152 100644 --- a/test/stats.test.js +++ b/test/stats.test.js @@ -43,15 +43,17 @@ describe('StatsReporter', () => { const reporter = new StatsReporter({ redis, stats }); reporter.flushOnce(); - assert.equal(calls.push.length, 4); - assert.equal(calls.push[0].level, 'info'); - assert.equal(calls.push[1].level, 'info'); - assert.equal(calls.push[2].level, 'info'); - assert.equal(calls.push[3].level, 'info'); - assert.match(calls.push[0].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入量: 7条$/); - assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入失败量: 2条$/); - assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/); - assert.match(calls.push[3].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/); + assert.equal(calls.push.length, 9); + for (const c of calls.push) assert.equal(c.level, 'info'); + assert.match(calls.push[0].message, /Legacy写入量: 7条$/); + assert.match(calls.push[1].message, /Legacy写入失败量: 2条$/); + assert.match(calls.push[2].message, /G4Hot写入量: 0条$/); + assert.match(calls.push[3].message, /G4Hot写入失败量: 0条$/); + assert.match(calls.push[4].message, /RoomStatus写入量: 0条$/); + assert.match(calls.push[5].message, /RoomStatus失败量: 0条$/); + assert.match(calls.push[6].message, /G4Hot错误表插入量: 0条$/); + assert.match(calls.push[7].message, /数据过滤量: 8条$/); + assert.match(calls.push[8].message, /Kafka拉取量: 9条$/); }); }); diff --git a/vite.config.js b/vite.config.js index 079434f..b810cdf 100644 --- a/vite.config.js +++ b/vite.config.js @@ -14,7 +14,7 @@ export default defineConfig({ 'kafka-node', 'pg', 'redis', 'pg-copy-streams', // Node.js core modules 'events', 'url', 'crypto', 'util', 'net', 'tls', 'buffer', 'path', 'stream', 'stream/promises', - 'node:zlib', 'node:fs', 'node:path', 'node:url', 'node:stream', 'node:stream/promises' + 'node:crypto', 'node:zlib', 'node:fs', 'node:path', 'node:url', 'node:stream', 'node:stream/promises' ], output: { globals: {