diff --git a/.env.example b/.env.example index 769e28d..aeaa8ea 100644 --- a/.env.example +++ b/.env.example @@ -67,12 +67,19 @@ DB_RETRY_DELAY=1000 DB_LEGACY_HEARTBEAT_ENABLED=true # 新热表:heartbeat.heartbeat_events_g4_hot DB_G4_HOT_HEARTBEAT_ENABLED=false +# G5 临时热表:heartbeat.heartbeat_events_g5 +DB_G5_HEARTBEAT_ENABLED=false # room_status 写入开关 DB_ROOM_STATUS_ENABLED=true +# G5 room_status 写入开关 +DB_G5_ROOM_STATUS_ENABLED=false # 如无特殊需要,保持默认表名即可 DB_LEGACY_TABLE=heartbeat.heartbeat_events DB_G4_HOT_TABLE=heartbeat.heartbeat_events_g4_hot +DB_G5_TABLE=heartbeat.heartbeat_events_g5 +DB_ROOM_STATUS_TABLE=room_status.room_status_moment +DB_G5_ROOM_STATUS_TABLE=room_status.room_status_moment_g5 # ========================= # Redis 配置 diff --git a/docs/heartbeat_events_g5.sql b/docs/heartbeat_events_g5.sql new file mode 100644 index 0000000..7e98a2b --- /dev/null +++ b/docs/heartbeat_events_g5.sql @@ -0,0 +1,193 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 80 + Source Server Type : PostgreSQL + Source Server Version : 150017 (150017) + Source Host : 10.8.8.80:5434 + Source Catalog : log_platform + Source Schema : heartbeat + + Target Server Type : PostgreSQL + Target Server Version : 150017 (150017) + File Encoding : 65001 + + Date: 10/03/2026 10:18:37 +*/ + + +-- ---------------------------- +-- Table structure for heartbeat_events_g5 +-- ---------------------------- +DROP TABLE IF EXISTS "heartbeat"."heartbeat_events_g5"; +CREATE TABLE "heartbeat"."heartbeat_events_g5" ( + "guid" int4 NOT NULL DEFAULT nextval('"heartbeat".heartbeat_events_g5_guid_seq'::regclass), + "ts_ms" int8 NOT NULL, + "hotel_id" int2 NOT NULL, + "room_id" varchar(50) COLLATE "pg_catalog"."default" NOT NULL, + "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL, + "ip" varchar(21) COLLATE "pg_catalog"."default" NOT NULL, + "power_state" int2 NOT NULL, + "guest_type" int2 NOT NULL, + "cardless_state" int2 NOT NULL, + "service_mask" int8, + "pms_state" int2 NOT NULL, + "carbon_state" int2 NOT NULL, + "device_count" int2 NOT NULL, + "comm_seq" int4 NOT NULL, + "elec_address" _text COLLATE "pg_catalog"."default", + "air_address" _text COLLATE "pg_catalog"."default", + "voltage" _float8, + "ampere" _float8, + "power" _float8, + "phase" _text COLLATE "pg_catalog"."default", + "energy" _float8, + "sum_energy" _float8, + "state" _int2, + "model" _int2, + "speed" _int2, + "set_temp" _int2, + "now_temp" _int2, + "solenoid_valve" _int2, + "extra" jsonb, + "write_ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "insert_card" int2, + "bright_g" int2, + "version" int2, + "svc_01" bool, + "svc_02" bool, + "svc_03" bool, + "svc_04" bool, + "svc_05" bool, + "svc_06" bool, + "svc_07" bool, + "svc_08" bool, + "svc_09" bool, + "svc_10" bool, + "svc_11" bool, + "svc_12" bool, + "svc_13" bool, + "svc_14" bool, + "svc_15" bool, + "svc_16" bool, + "svc_17" bool, + "svc_18" bool, + "svc_19" bool, + "svc_20" bool, + "svc_21" bool, + "svc_22" bool, + "svc_23" bool, + "svc_24" bool, + "svc_25" bool, + "svc_26" bool, + "svc_27" bool, + "svc_28" bool, + "svc_29" bool, + "svc_30" bool, + "svc_31" bool, + "svc_32" bool, + "svc_33" bool, + "svc_34" bool, + "svc_35" bool, + "svc_36" bool, + "svc_37" bool, + "svc_38" bool, + "svc_39" bool, + "svc_40" bool, + "svc_41" bool, + "svc_42" bool, + "svc_43" bool, + "svc_44" bool, + "svc_45" bool, + "svc_46" bool, + "svc_47" bool, + "svc_48" bool, + "svc_49" bool, + "svc_50" bool, + "svc_51" bool, + "svc_52" bool, + "svc_53" bool, + "svc_54" bool, + "svc_55" bool, + "svc_56" bool, + "svc_57" bool, + "svc_58" bool, + "svc_59" bool, + "svc_60" bool, + "svc_61" bool, + "svc_62" bool, + "svc_63" bool, + "svc_64" bool, + "air_address_1" text COLLATE "pg_catalog"."default", + "air_address_2" text COLLATE "pg_catalog"."default", + "air_address_residual" _text COLLATE "pg_catalog"."default", + "state_1" int2, + "state_2" int2, + "state_residual" _int2, + "model_1" int2, + "model_2" int2, + "model_residual" _int2, + "speed_1" int2, + "speed_2" int2, + "speed_residual" _int2, + "set_temp_1" int2, + "set_temp_2" int2, + "set_temp_residual" _int2, + "now_temp_1" int2, + "now_temp_2" int2, + "now_temp_residual" _int2, + "solenoid_valve_1" int2, + "solenoid_valve_2" int2, + "solenoid_valve_residual" _int2, + "elec_address_1" text COLLATE "pg_catalog"."default", + "elec_address_2" text COLLATE "pg_catalog"."default", + "elec_address_residual" _text COLLATE "pg_catalog"."default", + "voltage_1" float8, + "voltage_2" float8, + "voltage_residual" _float8, + "ampere_1" float8, + "ampere_2" float8, + "ampere_residual" _float8, + "power_1" float8, + "power_2" float8, + "power_residual" _float8, + "phase_1" text COLLATE "pg_catalog"."default", + "phase_2" text COLLATE "pg_catalog"."default", + "phase_residual" _text COLLATE "pg_catalog"."default", + "energy_1" float8, + "energy_2" float8, + "energy_residual" _float8, + "sum_energy_1" float8, + "sum_energy_2" float8, + "sum_energy_residual" _float8, + "power_carbon_on" float8, + "power_carbon_off" float8, + "power_person_exist" float8, + "power_person_left" float8 +) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Indexes structure for table heartbeat_events_g5 +-- ---------------------------- +CREATE INDEX "heartbeat_events_g5_ts_ms_idx" ON "heartbeat"."heartbeat_events_g5" USING btree ( + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +) TABLESPACE "ts_hot"; +CREATE INDEX "idx_hb_g5_device_ts" ON "heartbeat"."heartbeat_events_g5" USING btree ( + "device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +); +CREATE INDEX "idx_hb_g5_room_query" ON "heartbeat"."heartbeat_events_g5" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "room_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST, + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +); +CREATE INDEX "idx_hb_g5_ts_ms" ON "heartbeat"."heartbeat_events_g5" USING btree ( + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +); + +-- ---------------------------- +-- Primary Key structure for table heartbeat_events_g5 +-- ---------------------------- +ALTER TABLE "heartbeat"."heartbeat_events_g5" ADD CONSTRAINT "heartbeat_events_g5_pkey" PRIMARY KEY ("ts_ms", "guid"); diff --git a/docs/room_status_moment_g5.sql b/docs/room_status_moment_g5.sql new file mode 100644 index 0000000..06d8d0d --- /dev/null +++ b/docs/room_status_moment_g5.sql @@ -0,0 +1,88 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 80 + Source Server Type : PostgreSQL + Source Server Version : 150017 (150017) + Source Host : 10.8.8.80:5434 + Source Catalog : log_platform + Source Schema : room_status + + Target Server Type : PostgreSQL + Target Server Version : 150017 (150017) + File Encoding : 65001 + + Date: 10/03/2026 10:32:13 +*/ + + +-- ---------------------------- +-- Table structure for room_status_moment_g5 +-- ---------------------------- +DROP TABLE IF EXISTS "room_status"."room_status_moment_g5"; +CREATE TABLE "room_status"."room_status_moment_g5" ( + "hotel_id" int2 NOT NULL, + "room_id" text COLLATE "pg_catalog"."default" NOT NULL, + "device_id" text COLLATE "pg_catalog"."default" NOT NULL, + "ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "sys_lock_status" int2, + "online_status" int2, + "launcher_version" text COLLATE "pg_catalog"."default", + "app_version" text COLLATE "pg_catalog"."default", + "config_version" text COLLATE "pg_catalog"."default", + "register_ts_ms" int8, + "upgrade_ts_ms" int8, + "config_ts_ms" int8, + "ip" text COLLATE "pg_catalog"."default", + "pms_status" int2, + "power_state" int2, + "cardless_state" int2, + "service_mask" int8, + "insert_card" int2, + "bright_g" int2, + "agreement_ver" text COLLATE "pg_catalog"."default", + "air_address" _text COLLATE "pg_catalog"."default", + "air_state" _int2, + "air_model" _int2, + "air_speed" _int2, + "air_set_temp" _int2, + "air_now_temp" _int2, + "air_solenoid_valve" _int2, + "elec_address" _text COLLATE "pg_catalog"."default", + "elec_voltage" _float8, + "elec_ampere" _float8, + "elec_power" _float8, + "elec_phase" _float8, + "elec_energy" _float8, + "elec_sum_energy" _float8, + "carbon_state" int2, + "dev_loops" jsonb, + "energy_carbon_sum" float8, + "energy_nocard_sum" float8, + "external_device" jsonb DEFAULT '{}'::jsonb, + "faulty_device_count" jsonb DEFAULT '{}'::jsonb +) +WITH (fillfactor=90) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Indexes structure for table room_status_moment_g5 +-- ---------------------------- +CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "online_status" "pg_catalog"."int2_ops" ASC NULLS LAST, + "power_state" "pg_catalog"."int2_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Triggers structure for table room_status_moment_g5 +-- ---------------------------- +CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5" +FOR EACH ROW +EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"(); + +-- ---------------------------- +-- Primary Key structure for table room_status_moment_g5 +-- ---------------------------- +ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id"); diff --git a/openspec/changes/add-g5-independent-write/proposal.md b/openspec/changes/add-g5-independent-write/proposal.md new file mode 100644 index 0000000..edf4f2c --- /dev/null +++ b/openspec/changes/add-g5-independent-write/proposal.md @@ -0,0 +1,15 @@ +# Change: add g5 independent write + +## Why +当前服务已经支持 legacy 与 G4 热表双写,但需要在不影响现有主链路的前提下,再额外向一个临时 G5 库独立写入同构数据,便于过渡期联调与验证。 + +## What Changes +- 新增 G5 独立数据库连接配置与可关闭的写入开关 +- 在现有 legacy/G4 写入成功路径后,追加独立的 G5 写入流程 +- G5 使用与 G4 相同的数据结构映射,但不写入 guid,由数据库自生成 int4 guid +- room_status 新增 G5 独立 upsert 写入路径,并保留旧表与 G5 表的独立开关 +- 新增 G5 写入统计与启动摘要输出 + +## Impact +- Affected specs: db, processor +- Affected code: src/config/config.js, src/index.js, src/db/databaseManager.js, src/processor/heartbeatProcessor.js, src/stats/statsManager.js, test/*.test.js \ No newline at end of file diff --git a/openspec/changes/add-g5-independent-write/specs/db/spec.md b/openspec/changes/add-g5-independent-write/specs/db/spec.md new file mode 100644 index 0000000..6ecdc62 --- /dev/null +++ b/openspec/changes/add-g5-independent-write/specs/db/spec.md @@ -0,0 +1,63 @@ +## MODIFIED Requirements +### Requirement: 双明细独立编排 +系统 MUST 提供多目标明细写入编排能力,按启动配置分别控制旧表、G4 热表和临时 G5 热表的写入,各路写入结果相互独立。 + +#### Scenario: 旧表、G4 与 G5 可独立组合 +- **WHEN** `legacyHeartbeatEnabled`、`g4HotHeartbeatEnabled`、`g5HeartbeatEnabled` 任意组合启停 +- **THEN** 系统应只对开启的目标执行写入 +- **AND** 任一路写入失败都不应阻塞其他已开启目标的写入 + +#### Scenario: G5 写入不影响主链路暂停判定 +- **WHEN** G5 临时库写入失败或连接异常 +- **THEN** 系统不应因此暂停 legacy/G4 主链路消费 +- **AND** 应记录日志与统计以便排查 + +### Requirement: 写入目标启动配置 +系统 MUST 通过启动配置(环境变量)分别控制旧表、G4 热表、临时 G5 热表和 room_status 的写入开关与目标表名。 + +#### Scenario: 读取 G5 独立连接与表配置 +- **WHEN** 系统启动时 +- **THEN** 应读取 `DB_G5_HEARTBEAT_ENABLED` 作为 G5 写入开关 +- **AND** 应读取 `DB_G5_TABLE` 作为 G5 目标表名,默认 `heartbeat.heartbeat_events_g5` +- **AND** 应读取 `POSTGRES_HOST_G5`、`POSTGRES_PORT_G5`、`POSTGRES_DATABASE_G5`、`POSTGRES_USER_G5`、`POSTGRES_PASSWORD_G5`、`POSTGRES_IDLE_TIMEOUT_MS_G5` 作为 G5 独立连接参数 + +### Requirement: G5 热表独立写入能力 +系统 MUST 支持向 `heartbeat.heartbeat_events_g5` 执行批量 COPY 写入,并与 G4 热表共享同一套字段展开逻辑。 + +#### Scenario: G5 写入复用 G4 字段映射 +- **WHEN** `g5HeartbeatEnabled=true` 且有一批心跳数据待写入 +- **THEN** 系统应使用与 G4 热表一致的字段展开规则写入 `heartbeat.heartbeat_events_g5` +- **AND** `service_mask` 应展开为 `svc_01` 至 `svc_64` +- **AND** 电力与空调数组应展开为 `_1`、`_2`、`_residual` 列 + +#### Scenario: G5 guid 由数据库生成 +- **WHEN** 系统写入 `heartbeat.heartbeat_events_g5` +- **THEN** 系统不应为 `guid` 字段赋值 +- **AND** 应依赖数据库默认值生成 `int4` 类型 guid + +#### Scenario: G5 原始数组列固定写空 +- **WHEN** 系统写入 `heartbeat.heartbeat_events_g5` +- **THEN** `service_mask`、`elec_address`、`air_address`、`voltage`、`ampere`、`power`、`phase`、`energy`、`sum_energy`、`state`、`model`、`speed`、`set_temp`、`now_temp`、`solenoid_valve`、`extra` 应统一写入 `null` +- **AND** `_1`、`_2`、`_residual` 展开列仍应按来源数据正常写入 + +### Requirement: room_status 新旧双表独立 upsert +系统 MUST 支持将 room_status 同时写入旧表与 G5 表,并允许分别开关。 + +#### Scenario: 旧表与 G5 表可独立开关 +- **WHEN** `DB_ROOM_STATUS_ENABLED` 与 `DB_G5_ROOM_STATUS_ENABLED` 任意组合启停 +- **THEN** 系统应仅对开启的 room_status 目标表执行 upsert +- **AND** 任一 room_status 目标失败不应阻塞另一目标 + +#### Scenario: room_status 全盘使用 ON CONFLICT DO UPDATE +- **WHEN** 系统写入旧 room_status 表或 G5 room_status 表 +- **THEN** 系统应统一使用 `INSERT ... ON CONFLICT DO UPDATE` +- **AND** 不应退回为单独的 `INSERT` 或 `UPDATE` 路径 + +#### Scenario: room_status SQL 不更新 ts_ms +- **WHEN** 系统执行旧 room_status 或 G5 room_status 的 upsert SQL +- **THEN** `DO UPDATE SET` 子句中不应包含 `ts_ms = EXCLUDED.ts_ms` + +#### Scenario: G5 room_status 使用 hotel_id 与 room_id 冲突键 +- **WHEN** 系统写入 `room_status.room_status_moment_g5` +- **THEN** 应使用 `(hotel_id, room_id)` 作为 `ON CONFLICT` 的冲突键 +- **AND** 应将 `device_id` 作为普通可更新列写入 \ No newline at end of file diff --git a/openspec/changes/add-g5-independent-write/specs/processor/spec.md b/openspec/changes/add-g5-independent-write/specs/processor/spec.md new file mode 100644 index 0000000..e30bbc5 --- /dev/null +++ b/openspec/changes/add-g5-independent-write/specs/processor/spec.md @@ -0,0 +1,19 @@ +## MODIFIED Requirements +### Requirement: 按 sink 维度的统计与监控 +系统 MUST 按写入目标维度分别统计成功数、失败数与降级事件。 + +#### Scenario: legacy、g4Hot 与 g5 分别统计 +- **WHEN** 系统完成一批次的多目标写入编排 +- **THEN** 应分别统计 legacy 写入成功数、失败数 +- **AND** 应分别统计 g4Hot 写入成功数、失败数 +- **AND** 应分别统计 g5 写入成功数、失败数 +- **AND** 统计项应在 Redis 控制台输出或 stats 汇总中可见 + +### Requirement: room_status 新旧双表独立执行 +系统 MUST 在批次处理中独立执行旧 room_status 与 G5 room_status 的 upsert,不依赖任意明细写入结果。 + +#### Scenario: 两个 room_status 目标独立执行 +- **WHEN** 一批心跳数据完成验证与转换 +- **THEN** 只要 `DB_ROOM_STATUS_ENABLED=true` 就应执行旧 room_status upsert +- **AND** 只要 `DB_G5_ROOM_STATUS_ENABLED=true` 就应执行 G5 room_status upsert +- **AND** 任一路 room_status 写入失败都不应阻塞另一条 room_status 路径或主处理流程 \ No newline at end of file diff --git a/openspec/changes/add-g5-independent-write/tasks.md b/openspec/changes/add-g5-independent-write/tasks.md new file mode 100644 index 0000000..2c3576d --- /dev/null +++ b/openspec/changes/add-g5-independent-write/tasks.md @@ -0,0 +1,6 @@ +## 1. Implementation +- [x] 1.1 增加 G5 独立连接与开关配置读取 +- [x] 1.2 增加 G5 独立写库实现,复用 G4 热表映射逻辑并省略 guid 写入 +- [x] 1.3 将 G5 写入接入现有批处理流程,保持与 legacy/G4/room_status 相互独立 +- [x] 1.4 增加 G5 统计、启动摘要与测试覆盖 +- [x] 1.5 增加旧 room_status 与 G5 room_status 的独立 ON CONFLICT DO UPDATE 写入路径 \ No newline at end of file diff --git a/src/config/config.example.js b/src/config/config.example.js index bf4e77a..0f5a048 100644 --- a/src/config/config.example.js +++ b/src/config/config.example.js @@ -54,9 +54,28 @@ export default { retryDelay: 1000, // 重试延迟 legacyHeartbeatEnabled: true, // 旧明细表写入开关 g4HotHeartbeatEnabled: false, // 新明细表(g4_hot)写入开关 + g5HeartbeatEnabled: (env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true', // 临时 G5 库写入开关 roomStatusEnabled: true, // room_status 写入开关 legacyTable: 'heartbeat.heartbeat_events', g4HotTable: 'heartbeat.heartbeat_events_g4_hot', + roomStatusTable: env.DB_ROOM_STATUS_TABLE ?? 'room_status.room_status_moment', + }, + + g5db: { + enabled: (env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true', + host: env.POSTGRES_HOST_G5 ?? '10.8.8.80', + port: Number(env.POSTGRES_PORT_G5 ?? 5434), + user: env.POSTGRES_USER_G5 ?? 'log_admin', + password: env.POSTGRES_PASSWORD_G5 ?? 'YourActualStrongPasswordForG5!', + database: env.POSTGRES_DATABASE_G5 ?? 'log_platform', + maxConnections: Number(env.POSTGRES_MAX_CONNECTIONS_G5 ?? 1), + idleTimeoutMillis: Number(env.POSTGRES_IDLE_TIMEOUT_MS_G5 ?? 30000), + retryAttempts: Number(env.DB_G5_RETRY_ATTEMPTS ?? 3), + retryDelay: Number(env.DB_G5_RETRY_DELAY ?? 1000), + g5HeartbeatEnabled: (env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true', + g5Table: env.DB_G5_TABLE ?? 'heartbeat.heartbeat_events_g5', + roomStatusEnabled: (env.DB_G5_ROOM_STATUS_ENABLED ?? env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true', + roomStatusTable: env.DB_G5_ROOM_STATUS_TABLE ?? 'room_status.room_status_moment_g5', }, // 日志配置 diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index d069c14..bbb6944 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -184,6 +184,14 @@ class DatabaseManager { // ---- 新表 G4 Hot 列定义 ---- _getG4HotColumns() { + return this._getExpandedHotColumns({ includeGuid: true }); + } + + _getG5Columns() { + return this._getExpandedHotColumns({ includeGuid: false }); + } + + _getExpandedHotColumns({ includeGuid }) { const base = [ 'ts_ms', 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip', 'power_state', 'guest_type', 'cardless_state', 'service_mask', @@ -213,8 +221,9 @@ class DatabaseManager { 'sum_energy_1', 'sum_energy_2', 'sum_energy_residual', ]; const power = ['power_carbon_on', 'power_carbon_off', 'power_person_exist', 'power_person_left']; - const tail = ['guid']; - return [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power, ...tail]; + return includeGuid + ? [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power, 'guid'] + : [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power]; } _unpackArrElement(arr, idx) { @@ -228,6 +237,14 @@ class DatabaseManager { } _g4HotToRowValues(e) { + return this._expandedHotToRowValues(e, { includeGuid: true, nullifyArrayColumns: false, nullifyG5BaseColumns: false }); + } + + _g5ToRowValues(e) { + return this._expandedHotToRowValues(e, { includeGuid: false, nullifyArrayColumns: true, nullifyG5BaseColumns: true }); + } + + _expandedHotToRowValues(e, { includeGuid, nullifyArrayColumns, nullifyG5BaseColumns }) { const values = [ e.ts_ms, e.write_ts_ms ?? Date.now(), @@ -238,7 +255,7 @@ class DatabaseManager { e.power_state, e.guest_type, e.cardless_state, - e.service_mask, + nullifyG5BaseColumns ? null : e.service_mask, e.pms_state, e.carbon_state, e.device_count, @@ -246,21 +263,21 @@ class DatabaseManager { e.insert_card ?? null, (e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null), e.version ?? null, - Array.isArray(e.elec_address) ? e.elec_address : null, - Array.isArray(e.air_address) ? e.air_address : null, - Array.isArray(e.voltage) ? e.voltage : null, - Array.isArray(e.ampere) ? e.ampere : null, - Array.isArray(e.power) ? e.power : null, - Array.isArray(e.phase) ? e.phase : null, - Array.isArray(e.energy) ? e.energy : null, - Array.isArray(e.sum_energy) ? e.sum_energy : null, - Array.isArray(e.state) ? e.state : null, - Array.isArray(e.model) ? e.model : null, - Array.isArray(e.speed) ? e.speed : null, - Array.isArray(e.set_temp) ? e.set_temp : null, - Array.isArray(e.now_temp) ? e.now_temp : null, - Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, - e.extra ?? null, + nullifyArrayColumns ? null : (Array.isArray(e.elec_address) ? e.elec_address : null), + nullifyArrayColumns ? null : (Array.isArray(e.air_address) ? e.air_address : null), + nullifyArrayColumns ? null : (Array.isArray(e.voltage) ? e.voltage : null), + nullifyArrayColumns ? null : (Array.isArray(e.ampere) ? e.ampere : null), + nullifyArrayColumns ? null : (Array.isArray(e.power) ? e.power : null), + nullifyArrayColumns ? null : (Array.isArray(e.phase) ? e.phase : null), + nullifyArrayColumns ? null : (Array.isArray(e.energy) ? e.energy : null), + nullifyArrayColumns ? null : (Array.isArray(e.sum_energy) ? e.sum_energy : null), + nullifyArrayColumns ? null : (Array.isArray(e.state) ? e.state : null), + nullifyArrayColumns ? null : (Array.isArray(e.model) ? e.model : null), + nullifyArrayColumns ? null : (Array.isArray(e.speed) ? e.speed : null), + nullifyArrayColumns ? null : (Array.isArray(e.set_temp) ? e.set_temp : null), + nullifyArrayColumns ? null : (Array.isArray(e.now_temp) ? e.now_temp : null), + nullifyArrayColumns ? null : (Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null), + nullifyG5BaseColumns ? null : (e.extra ?? null), ]; // svc_01 .. svc_64 布尔展开 @@ -305,7 +322,9 @@ class DatabaseManager { values.push(null); values.push(null); - values.push(this._normalizeGuid(e.guid)); + if (includeGuid) { + values.push(this._normalizeGuid(e.guid)); + } return values; } @@ -456,6 +475,24 @@ class DatabaseManager { }; } + async insertHeartbeatEventsG5(events) { + if (!Array.isArray(events)) events = [events]; + if (events.length === 0) { + return { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null }; + } + + const result = await this._insertEventsToTarget(events, { + tableName: this.config.g5Table ?? 'heartbeat.heartbeat_events_g5', + columns: this._getG5Columns(), + toRowValues: (e) => this._g5ToRowValues(e), + ensurePartitions: false, + logPrefix: '[g5]', + missingPartitionTable: null, + }); + + return { ...result, enabled: true }; + } + // v2 明细表写入(向后兼容封装,仅旧表,抛出连接错误) async insertHeartbeatEvents(events) { if (!Array.isArray(events)) events = [events]; @@ -683,34 +720,8 @@ class DatabaseManager { } } - - - // 同步更新 room_status.room_status_moment 表 - // 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert - async upsertRoomStatus(events) { - if (!Array.isArray(events)) { - events = [events]; - } - if (events.length === 0) return { insertedCount: 0, updatedCount: 0 }; - - // 批次内去重:按 (hotel_id, room_id, device_id) 分组,只保留 ts_ms 最大的一条 - // 原因:PostgreSQL ON CONFLICT 不允许同一语句中多次更新同一行 - const uniqueEventsMap = new Map(); - for (const e of events) { - if (!e.hotel_id || !e.room_id || !e.device_id) continue; - const key = `${e.hotel_id}_${e.room_id}_${e.device_id}`; - const existing = uniqueEventsMap.get(key); - // 如果没有记录,或者当前记录时间更新,则覆盖 - if (!existing || (BigInt(e.ts_ms || 0) > BigInt(existing.ts_ms || 0))) { - uniqueEventsMap.set(key, e); - } - } - const uniqueEvents = Array.from(uniqueEventsMap.values()); - if (uniqueEvents.length === 0) return { insertedCount: 0, updatedCount: 0 }; - - // 字段映射:心跳字段 -> room_status 字段 - // 注意:只更新心跳包里有的字段 - const columns = [ + _getRoomStatusBaseColumns() { + return [ 'ts_ms', 'hotel_id', 'room_id', @@ -723,7 +734,7 @@ class DatabaseManager { 'insert_card', 'carbon_state', 'bright_g', - 'agreement_ver', // map from version + 'agreement_ver', 'air_address', 'air_state', 'air_model', @@ -739,77 +750,103 @@ class DatabaseManager { 'elec_energy', 'elec_sum_energy', ]; + } - const toRowValues = (e) => [ - e.ts_ms, - e.hotel_id, - e.room_id, - e.device_id, - e.ip, - e.pms_state, // pms_status - e.power_state, - e.cardless_state, - e.service_mask, - e.insert_card ?? null, - e.carbon_state, - e.bright_g === -1 ? null : (e.bright_g ?? null), - e.version ?? null, // agreement_ver - Array.isArray(e.air_address) ? e.air_address : null, - Array.isArray(e.state) ? e.state : null, // air_state - Array.isArray(e.model) ? e.model : null, // air_model - Array.isArray(e.speed) ? e.speed : null, // air_speed - Array.isArray(e.set_temp) ? e.set_temp : null, // air_set_temp - Array.isArray(e.now_temp) ? e.now_temp : null, // air_now_temp - Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, // air_solenoid_valve - Array.isArray(e.elec_address) ? e.elec_address : null, - Array.isArray(e.voltage) ? e.voltage : null, // elec_voltage - Array.isArray(e.ampere) ? e.ampere : null, // elec_ampere - Array.isArray(e.power) ? e.power : null, // elec_power - Array.isArray(e.phase) ? e.phase : null, // elec_phase - Array.isArray(e.energy) ? e.energy : null, // elec_energy - Array.isArray(e.sum_energy) ? e.sum_energy : null, // elec_sum_energy + _roomStatusToRowValues(event) { + return [ + event.ts_ms, + event.hotel_id, + event.room_id, + event.device_id, + event.ip, + event.pms_state, + event.power_state, + event.cardless_state, + event.service_mask, + event.insert_card ?? null, + event.carbon_state, + event.bright_g === -1 ? null : (event.bright_g ?? null), + event.version === null || event.version === undefined ? null : String(event.version), + Array.isArray(event.air_address) ? event.air_address : null, + Array.isArray(event.state) ? event.state : null, + Array.isArray(event.model) ? event.model : null, + Array.isArray(event.speed) ? event.speed : null, + Array.isArray(event.set_temp) ? event.set_temp : null, + Array.isArray(event.now_temp) ? event.now_temp : null, + Array.isArray(event.solenoid_valve) ? event.solenoid_valve : null, + Array.isArray(event.elec_address) ? event.elec_address : null, + Array.isArray(event.voltage) ? event.voltage : null, + Array.isArray(event.ampere) ? event.ampere : null, + Array.isArray(event.power) ? event.power : null, + Array.isArray(event.phase) ? event.phase : null, + Array.isArray(event.energy) ? event.energy : null, + Array.isArray(event.sum_energy) ? event.sum_energy : null, ]; + } - // 构建 UPDATE SET 子句(排除主键和 guid) - // 使用 EXCLUDED.col 引用新值 - // 使用 IS DISTINCT FROM 避免无意义更新 - const updateColumns = columns.filter(c => !['hotel_id', 'room_id', 'device_id'].includes(c)); - const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', '); + _buildRoomStatusUpsertQuery(events, target) { + const { tableName, conflictColumns, includeGuid, tableRef } = target; + const columns = this._getRoomStatusBaseColumns(); + const uniqueEventsMap = new Map(); - // 构建 WHERE 子句:仅当至少一个字段发生变化,且时间戳未回退时才更新 - // 注意:room_status.room_status_moment.ts_ms 是 bigint,EXCLUDED.ts_ms 也是 bigint - const whereConditions = updateColumns.map(col => `room_status.room_status_moment.${col} IS DISTINCT FROM EXCLUDED.${col}`).join(' OR '); + for (const event of events) { + const keyValues = conflictColumns.map((column) => event?.[column]); + if (keyValues.some((value) => value === undefined || value === null || value === '')) continue; + const key = keyValues.join('_'); + const existing = uniqueEventsMap.get(key); + if (!existing || BigInt(event.ts_ms || 0) > BigInt(existing.ts_ms || 0)) { + uniqueEventsMap.set(key, event); + } + } + + const uniqueEvents = Array.from(uniqueEventsMap.values()); + if (uniqueEvents.length === 0) { + return { sql: null, values: [], uniqueEvents: [] }; + } + + const allColumns = includeGuid ? [...columns, 'guid'] : [...columns]; + const updateColumns = columns.filter((column) => !['ts_ms', ...conflictColumns].includes(column)); + const updateSet = updateColumns.map((column) => `${column} = EXCLUDED.${column}`).join(', '); + const whereConditions = updateColumns.map((column) => `${tableRef}.${column} IS DISTINCT FROM EXCLUDED.${column}`).join(' OR '); - // 生成批量插入 SQL - // 注意:ON CONFLICT (hotel_id, room_id, device_id) 依赖于唯一索引 idx_room_status_unique_device const values = []; - const placeholders = uniqueEvents.map((e, idx) => { - const rowVals = toRowValues(e); - values.push(...rowVals); - // 额外插入 gen_random_uuid() 作为 guid - const p = rowVals.map((_, i) => `$${idx * rowVals.length + i + 1}`).join(', '); - return `(${p}, gen_random_uuid())`; + const placeholders = uniqueEvents.map((event, eventIndex) => { + const rowValues = this._roomStatusToRowValues(event); + values.push(...rowValues); + const start = eventIndex * rowValues.length; + const params = rowValues.map((_, valueIndex) => `$${start + valueIndex + 1}`).join(', '); + return includeGuid ? `(${params}, gen_random_uuid())` : `(${params})`; }).join(', '); - const allCols = [...columns, 'guid'].join(', '); - const sql = ` - INSERT INTO room_status.room_status_moment (${allCols}) + INSERT INTO ${tableName} (${allColumns.join(', ')}) VALUES ${placeholders} - ON CONFLICT (hotel_id, room_id, device_id) + ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updateSet} - WHERE - room_status.room_status_moment.ts_ms <= EXCLUDED.ts_ms + WHERE + ${tableRef}.ts_ms <= EXCLUDED.ts_ms AND (${whereConditions}) `; + return { sql, values, uniqueEvents }; + } + + async _upsertRoomStatusToTarget(events, target) { + if (!Array.isArray(events)) { + events = [events]; + } + if (events.length === 0) return { rowCount: 0 }; + + const { sql, values, uniqueEvents } = this._buildRoomStatusUpsertQuery(events, target); + if (!sql || uniqueEvents.length === 0) return { rowCount: 0 }; + try { const res = await this.pool.query(sql, values); - return { rowCount: res.rowCount }; // 包括插入和更新的行数 + return { rowCount: res.rowCount }; } catch (error) { - if (this.isRoomStatusMissingPartitionError(error)) { - const hotelIds = [...new Set(uniqueEvents.map(e => e.hotel_id).filter(id => id != null))]; + if (target.autoCreatePartitions && this.isRoomStatusMissingPartitionError(error)) { + const hotelIds = [...new Set(uniqueEvents.map((event) => event.hotel_id).filter((id) => id != null))]; if (hotelIds.length > 0) { console.log(`[db] 检测到 room_status 分区缺失,尝试自动创建分区,hotelIds: ${hotelIds.join(', ')}`); await this.ensureRoomStatusPartitions(hotelIds); @@ -817,18 +854,43 @@ class DatabaseManager { const res = await this.pool.query(sql, values); return { rowCount: res.rowCount }; } catch (retryError) { - console.warn('[db] upsertRoomStatus retry failed:', retryError.message); + console.warn(`[db] ${target.logPrefix} retry failed:`, retryError.message); return { error: retryError }; } } } - // 不抛出错误,只记录日志,避免影响主流程(Heartbeat History 写入已成功) - console.warn('[db] upsertRoomStatus failed:', error.message); + console.warn(`[db] ${target.logPrefix} failed:`, error.message); return { error }; } } + + + // 同步更新 room_status.room_status_moment 表 + // 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert + async upsertRoomStatus(events) { + return this._upsertRoomStatusToTarget(events, { + tableName: this.config.roomStatusTable ?? 'room_status.room_status_moment', + conflictColumns: ['hotel_id', 'room_id', 'device_id'], + includeGuid: true, + autoCreatePartitions: true, + tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment', + logPrefix: 'upsertRoomStatus', + }); + } + + async upsertRoomStatusG5(events) { + return this._upsertRoomStatusToTarget(events, { + tableName: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5', + conflictColumns: ['hotel_id', 'room_id'], + includeGuid: false, + autoCreatePartitions: false, + tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5', + logPrefix: 'upsertRoomStatusG5', + }); + } + isRoomStatusMissingPartitionError(error) { const msg = String(error?.message ?? ''); // 错误码 23514 (check_violation) 通常在插入分区表且无对应分区时触发 diff --git a/src/index.js b/src/index.js index 36034bc..141f799 100644 --- a/src/index.js +++ b/src/index.js @@ -12,6 +12,7 @@ class WebBLSHeartbeatServer { this.kafkaConsumer = null; this.heartbeatProcessor = null; this.databaseManager = null; + this.g5DatabaseManager = null; this.redis = null; this.consumers = null; this.stats = new StatsCounters(); @@ -33,12 +34,31 @@ class WebBLSHeartbeatServer { console.log('数据库连接成功'); await this.redis?.info('数据库连接成功', { module: 'db' }); + if (this.config.g5db?.enabled) { + try { + this.g5DatabaseManager = new DatabaseManager({ ...this.config.g5db, maxConnections: 1 }); + await this.g5DatabaseManager.connect(); + console.log('G5数据库连接成功'); + await this.redis?.info('G5数据库连接成功', { module: 'db', table: this.config.g5db?.g5Table }); + } catch (error) { + this.g5DatabaseManager = null; + console.warn('G5数据库连接失败,已跳过 G5 写入:', error); + await this.redis?.warn('G5数据库连接失败,已跳过 G5 写入', { + module: 'db', + table: this.config.g5db?.g5Table, + error: String(error?.message ?? error), + }); + } + } + // 打印双写配置摘要 const dbCfg = this.config.db; const dualWriteSummary = { legacyHeartbeat: dbCfg.legacyHeartbeatEnabled ? `ON → ${dbCfg.legacyTable}` : 'OFF', g4HotHeartbeat: dbCfg.g4HotHeartbeatEnabled ? `ON → ${dbCfg.g4HotTable}` : 'OFF', - roomStatus: dbCfg.roomStatusEnabled !== false ? 'ON' : 'OFF', + g5Heartbeat: this.config.g5db?.enabled ? `ON → ${this.config.g5db?.g5Table}` : 'OFF', + roomStatus: dbCfg.roomStatusEnabled !== false ? `ON → ${dbCfg.roomStatusTable}` : 'OFF', + g5RoomStatus: this.config.g5db?.roomStatusEnabled ? `ON → ${this.config.g5db?.roomStatusTable}` : 'OFF', }; console.log('双写配置摘要:', dualWriteSummary); await this.redis?.info('双写配置摘要', { module: 'db', ...dualWriteSummary }); @@ -56,6 +76,7 @@ class WebBLSHeartbeatServer { // 初始化处理器(共享批处理队列) this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, { + g5DatabaseManager: this.g5DatabaseManager, redis: this.redis, stats: this.stats, onDbOffline: () => { @@ -114,6 +135,10 @@ class WebBLSHeartbeatServer { await this.databaseManager.disconnect(); } + if (this.g5DatabaseManager) { + await this.g5DatabaseManager.disconnect(); + } + if (this.redis) { await this.redis.info('BLS心跳接收端已停止', { module: 'app' }); await this.redis.disconnect(); diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index b33a63b..4d1e832 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -6,6 +6,7 @@ class HeartbeatProcessor { constructor(config, databaseManager, deps = {}) { this.config = config; this.databaseManager = databaseManager; + this.g5DatabaseManager = deps?.g5DatabaseManager ?? null; this.redis = deps?.redis ?? null; this.stats = deps?.stats ?? null; this.batchQueue = []; @@ -218,14 +219,7 @@ class HeartbeatProcessor { const { legacy: legacyResult, g4Hot: g4HotResult } = dualResult; // B. room_status 始终独立执行(不依赖明细写入结果) - const roomStatusEnabled = this.databaseManager.config?.roomStatusEnabled !== false; - if (roomStatusEnabled && batchData.length > 0) { - this.databaseManager.upsertRoomStatus(batchData).catch(err => { - console.warn('异步同步 room_status 失败 (忽略):', err); - this.stats?.incRoomStatusFailed?.(batchData.length); - }); - this.stats?.incRoomStatusWritten?.(batchData.length); - } + this._writeRoomStatus(batchData); // C. 暂停消费判定(基于当前启用的关键 sink) const shouldPause = this._shouldPauseConsumption(legacyResult, g4HotResult); @@ -240,14 +234,17 @@ class HeartbeatProcessor { return; } - // D. 清理队列、resolve deferreds + // D. G5 临时库独立写入,不参与主链路暂停判定 + const g5Result = await this._writeToG5(batchData); + + // E. 清理队列、resolve deferreds this.batchQueue.splice(0, batchEventCount); this.batchMessageQueue.splice(0, batchMessageCount); for (const entry of batchMessages) { entry.deferred.resolve({ insertedCount: entry.eventCount }); } - // E. 统计 & 日志 + // F. 统计 & 日志 if (legacyResult.enabled) { this.stats?.incDbWritten?.(legacyResult.insertedCount); if (legacyResult.failedRecords.length > 0) { @@ -263,7 +260,25 @@ class HeartbeatProcessor { } } - // F. 错误表:仅 g4Hot 失败记录(旧表失败不写错误表) + if (g5Result.enabled) { + const g5FailedCount = g5Result.failedRecords.length > 0 + ? g5Result.failedRecords.length + : (g5Result.isConnectionError ? batchData.length : 0); + this.stats?.incG5Written?.(g5Result.insertedCount); + if (g5FailedCount > 0) { + this.stats?.incG5WriteFailed?.(g5FailedCount); + console.warn(`[g5] 批次写入失败:成功 ${g5Result.insertedCount},失败 ${g5FailedCount}`); + await this.redis?.warn('G5批次写入失败', { + module: 'db', + table: this.g5DatabaseManager?.config?.g5Table, + insertedCount: g5Result.insertedCount, + failedCount: g5FailedCount, + error: String(g5Result.error?.message ?? g5Result.error ?? 'g5 write failed'), + }); + } + } + + // G. 错误表:仅 g4Hot 失败记录(旧表失败不写错误表) if (g4HotResult.enabled && g4HotResult.failedRecords.length > 0) { const dbPayload = g4HotResult.failedRecords.map(item => ({ hotel_id: item.record?.hotel_id ?? null, @@ -276,7 +291,7 @@ class HeartbeatProcessor { this.stats?.incG4HotErrorTableInserted?.(dbPayload.length); } - // G. Legacy 失败仅日志(不写错误表) + // H. Legacy 失败仅日志(不写错误表) if (legacyResult.enabled && legacyResult.failedRecords.length > 0) { for (const item of legacyResult.failedRecords.slice(0, 10)) { console.warn('[legacy] 单条写入失败:', item.error?.message); @@ -379,6 +394,54 @@ class HeartbeatProcessor { }, 5000); } + async _writeToG5(batchData) { + if (!this.g5DatabaseManager?.config?.enabled) { + return { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null }; + } + + try { + return await this.g5DatabaseManager.insertHeartbeatEventsG5(batchData); + } catch (error) { + const isConnectionError = typeof this.g5DatabaseManager?._isDbConnectionError === 'function' + ? this.g5DatabaseManager._isDbConnectionError(error) + : this._isConnectionError(error); + return { + enabled: true, + success: false, + insertedCount: 0, + failedRecords: [], + error, + isConnectionError, + batchError: error, + }; + } + } + + _writeRoomStatus(batchData) { + const oldRoomStatusEnabled = this.databaseManager.config?.roomStatusEnabled !== false; + if (oldRoomStatusEnabled && batchData.length > 0) { + this.databaseManager.upsertRoomStatus(batchData).catch(err => { + console.warn('异步同步 room_status 失败 (忽略):', err); + this.stats?.incRoomStatusFailed?.(batchData.length); + }); + this.stats?.incRoomStatusWritten?.(batchData.length); + } + + const g5RoomStatusEnabled = this.g5DatabaseManager?.config?.roomStatusEnabled === true; + if (g5RoomStatusEnabled && batchData.length > 0 && typeof this.g5DatabaseManager?.upsertRoomStatusG5 === 'function') { + this.g5DatabaseManager.upsertRoomStatusG5(batchData).catch(async (err) => { + console.warn('异步同步 G5 room_status 失败 (忽略):', err); + this.stats?.incRoomStatusFailed?.(batchData.length); + await this.redis?.warn('G5 room_status 同步失败', { + module: 'db', + table: this.g5DatabaseManager?.config?.roomStatusTable, + error: String(err?.message ?? err), + }); + }); + this.stats?.incRoomStatusWritten?.(batchData.length); + } + } + _emitDbWriteError(error, rawData) { const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : []; if (list.length > 0) { diff --git a/src/stats/statsManager.js b/src/stats/statsManager.js index b3a3de3..c22adfa 100644 --- a/src/stats/statsManager.js +++ b/src/stats/statsManager.js @@ -1,10 +1,13 @@ class StatsCounters { constructor() { - // 原有 4 槽 + 新增 7 槽 = 11 槽 + // [0] dbWritten, [1] filtered, [2] kafkaPulled, [3] dbWriteFailed, + // [4] g4HotWritten, [5] g4HotWriteFailed, [6] roomStatusWritten, + // [7] roomStatusFailed, [8] g4HotErrorTableInserted, + // [9] g5Written, [10] g5WriteFailed // [0] dbWritten, [1] filtered, [2] kafkaPulled, [3] dbWriteFailed, // [4] g4HotWritten, [5] g4HotWriteFailed, [6] roomStatusWritten, // [7] roomStatusFailed, [8] g4HotErrorTableInserted - this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 9); + this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 11); this._minute = new BigInt64Array(this._minuteBuf); } @@ -23,6 +26,8 @@ class StatsCounters { incRoomStatusWritten(n = 1) { this._inc(6, n); } incRoomStatusFailed(n = 1) { this._inc(7, n); } incG4HotErrorTableInserted(n = 1) { this._inc(8, n); } + incG5Written(n = 1) { this._inc(9, n); } + incG5WriteFailed(n = 1) { this._inc(10, n); } snapshotAndResetMinute() { const dbWritten = Atomics.exchange(this._minute, 0, 0n); @@ -34,7 +39,9 @@ class StatsCounters { const roomStatusWritten = Atomics.exchange(this._minute, 6, 0n); const roomStatusFailed = Atomics.exchange(this._minute, 7, 0n); const g4HotErrorTableInserted = Atomics.exchange(this._minute, 8, 0n); - return { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted }; + const g5Written = Atomics.exchange(this._minute, 9, 0n); + const g5WriteFailed = Atomics.exchange(this._minute, 10, 0n); + return { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted, g5Written, g5WriteFailed }; } } @@ -82,13 +89,15 @@ class StatsReporter { if (this._lastFlushMinute === minuteKey) { return; } - const { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted } = this.stats.snapshotAndResetMinute(); + const { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted, g5Written, g5WriteFailed } = this.stats.snapshotAndResetMinute(); this._lastFlushMinute = minuteKey; const ts = formatTimestamp(new Date()); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Legacy写入量: ${dbWritten}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Legacy写入失败量: ${dbWriteFailed}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot写入量: ${g4HotWritten}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot写入失败量: ${g4HotWriteFailed}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G5写入量: ${g5Written}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G5写入失败量: ${g5WriteFailed}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} RoomStatus写入量: ${roomStatusWritten}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} RoomStatus失败量: ${roomStatusFailed}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot错误表插入量: ${g4HotErrorTableInserted}条`, metadata: { module: 'stats' } }); diff --git a/test/dualWrite.test.js b/test/dualWrite.test.js index cbdd448..e79f12b 100644 --- a/test/dualWrite.test.js +++ b/test/dualWrite.test.js @@ -22,6 +22,7 @@ const buildBasePayload = () => ({ function makeDualResult({ legacyEnabled = true, legacySuccess = true, legacyConn = false, g4HotEnabled = false, g4HotSuccess = true, g4HotConn = false, + g5Enabled = false, g5Success = true, g5Conn = false, insertedCount = 1, failedRecords = [] } = {}) { return { legacy: { @@ -34,6 +35,11 @@ function makeDualResult({ legacyEnabled = true, legacySuccess = true, legacyConn failedRecords: g4HotSuccess ? [] : failedRecords, error: g4HotSuccess ? null : new Error('g4hot fail'), isConnectionError: g4HotConn, batchError: g4HotSuccess ? null : new Error('g4hot fail'), }, + g5: { + enabled: g5Enabled, success: g5Success, insertedCount: g5Success ? insertedCount : 0, + failedRecords: g5Success ? [] : failedRecords, error: g5Success ? null : new Error('g5 fail'), + isConnectionError: g5Conn, batchError: g5Success ? null : new Error('g5 fail'), + }, }; } @@ -45,6 +51,7 @@ function buildMockDb(overrides = {}) { roomStatusEnabled: true, legacyTable: 'heartbeat.heartbeat_events', g4HotTable: 'heartbeat.heartbeat_events_g4_hot', + roomStatusTable: 'room_status.room_status_moment', ...overrides.config, }, insertHeartbeatEventsDual: overrides.insertHeartbeatEventsDual ?? (async () => makeDualResult()), @@ -55,11 +62,40 @@ function buildMockDb(overrides = {}) { }; } -function buildProcessor(dbOverrides = {}, processorConfig = {}) { +function buildMockG5Db(overrides = {}) { + return { + config: { + enabled: true, + g5HeartbeatEnabled: true, + g5Table: 'heartbeat.heartbeat_events_g5', + roomStatusEnabled: true, + roomStatusTable: 'room_status.room_status_moment_g5', + ...overrides.config, + }, + insertHeartbeatEventsG5: overrides.insertHeartbeatEventsG5 ?? (async () => ({ + enabled: true, + success: true, + insertedCount: 1, + failedRecords: [], + error: null, + isConnectionError: false, + batchError: null, + })), + upsertRoomStatusG5: overrides.upsertRoomStatusG5 ?? (async () => ({ rowCount: 1 })), + _isDbConnectionError: overrides._isDbConnectionError ?? (() => false), + }; +} + +function buildProcessor(dbOverrides = {}, processorConfig = {}, g5Overrides = null) { const db = buildMockDb(dbOverrides); + const deps = {}; + if (g5Overrides) { + deps.g5DatabaseManager = buildMockG5Db(g5Overrides); + } return new HeartbeatProcessor( { batchSize: 1, batchTimeout: 1000, ...processorConfig }, - db + db, + deps ); } @@ -218,6 +254,112 @@ describe('Dual-write: room_status 始终执行', () => { }); }); +describe('RoomStatus dual-write', () => { + it('should write old and g5 room_status independently', async () => { + let oldCalled = false; + let g5Called = false; + const processor = buildProcessor( + { + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }), + upsertRoomStatus: async () => { oldCalled = true; return { rowCount: 1 }; }, + }, + {}, + { + config: { roomStatusEnabled: true }, + upsertRoomStatusG5: async () => { g5Called = true; return { rowCount: 1 }; }, + } + ); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + await new Promise(r => setTimeout(r, 50)); + assert.equal(oldCalled, true); + assert.equal(g5Called, true); + }); + + it('should allow old room_status off while g5 room_status stays on', async () => { + let oldCalled = false; + let g5Called = false; + const processor = buildProcessor( + { + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: false }, + insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }), + upsertRoomStatus: async () => { oldCalled = true; return { rowCount: 1 }; }, + }, + {}, + { + config: { roomStatusEnabled: true }, + upsertRoomStatusG5: async () => { g5Called = true; return { rowCount: 1 }; }, + } + ); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + await processor.processMessage(msg); + await new Promise(r => setTimeout(r, 50)); + assert.equal(oldCalled, false); + assert.equal(g5Called, true); + }); +}); + +describe('G5-write: 独立写库', () => { + it('should write to g5 independently after legacy/g4 succeed', async () => { + let g5Captured = null; + const processor = buildProcessor( + { + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true }, + insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: true }), + }, + {}, + { + insertHeartbeatEventsG5: async (events) => { + g5Captured = events; + return { + enabled: true, + success: true, + insertedCount: events.length, + failedRecords: [], + error: null, + isConnectionError: false, + batchError: null, + }; + }, + } + ); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + const res = await processor.processMessage(msg); + assert.deepEqual(res, { insertedCount: 1 }); + assert.ok(Array.isArray(g5Captured)); + assert.equal(g5Captured.length, 1); + }); + + it('should not block main flow when g5 write fails', async () => { + const processor = buildProcessor( + { + config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false }, + insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }), + }, + {}, + { + insertHeartbeatEventsG5: async () => ({ + enabled: true, + success: false, + insertedCount: 0, + failedRecords: [{ error: new Error('g5 fail'), record: buildBasePayload() }], + error: new Error('g5 fail'), + isConnectionError: false, + batchError: new Error('g5 fail'), + }), + } + ); + + const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') }; + const res = await processor.processMessage(msg); + assert.deepEqual(res, { insertedCount: 1 }); + }); +}); + describe('Dual-write: 暂停消费策略', () => { it('should NOT pause when both detail writes disabled but room_status enabled', async () => { let paused = false; @@ -371,6 +513,48 @@ describe('DatabaseManager: _g4HotToRowValues', () => { const guid = values[cols.indexOf('guid')]; assert.equal(guid, 'a0b1c2d3e4f56789abcdef0123456789'); }); + + it('omits guid column and value for g5 rows', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const cols = dm._getG5Columns(); + const values = dm._g5ToRowValues(buildBasePayload()); + assert.equal(cols.includes('guid'), false); + assert.equal(values.length, cols.length); + }); + + it('writes g5 base array columns as null', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' }); + const cols = dm._getG5Columns(); + const values = dm._g5ToRowValues({ + ...buildBasePayload(), + service_mask: 7, + elec_address: ['e1', 'e2'], + air_address: ['ac1', 'ac2'], + voltage: [220.5, 221.5], + ampere: [1.1, 1.2], + power: [100, 200], + phase: ['A', 'B'], + energy: [10, 20], + sum_energy: [100, 200], + state: [1, 0], + model: [2, 3], + speed: [1, 2], + set_temp: [24, 25], + now_temp: [26, 27], + solenoid_valve: [1, 0], + extra: { source: 'test' }, + }); + + for (const column of ['service_mask', 'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase', 'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp', 'now_temp', 'solenoid_valve', 'extra']) { + assert.equal(values[cols.indexOf(column)], null); + } + + assert.equal(values[cols.indexOf('svc_01')], true); + assert.equal(values[cols.indexOf('svc_02')], true); + assert.equal(values[cols.indexOf('svc_03')], true); + assert.equal(values[cols.indexOf('air_address_1')], 'ac1'); + assert.equal(values[cols.indexOf('elec_address_1')], 'e1'); + }); }); describe('DatabaseManager: _formatPgCol', () => { @@ -382,6 +566,37 @@ describe('DatabaseManager: _formatPgCol', () => { }); }); +describe('DatabaseManager: room_status upsert SQL', () => { + it('does not update ts_ms in old room_status DO UPDATE SET', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment' }); + const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], { + tableName: 'room_status.room_status_moment', + conflictColumns: ['hotel_id', 'room_id', 'device_id'], + includeGuid: true, + autoCreatePartitions: true, + tableRef: 'room_status.room_status_moment', + logPrefix: 'upsertRoomStatus', + }); + assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id, device_id\)/); + assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/); + }); + + it('uses hotel_id and room_id as conflict key for g5 room_status', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' }); + const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], { + tableName: 'room_status.room_status_moment_g5', + conflictColumns: ['hotel_id', 'room_id'], + includeGuid: false, + autoCreatePartitions: false, + tableRef: 'room_status.room_status_moment_g5', + logPrefix: 'upsertRoomStatusG5', + }); + assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id\)/); + assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/); + assert.equal(/guid/.test(built.sql), false); + }); +}); + describe('DatabaseManager: insertHeartbeatEventsDual', () => { it('returns empty results when both targets disabled', async () => { const dm = new DatabaseManager({ diff --git a/test/stats.test.js b/test/stats.test.js index fcd8152..417f018 100644 --- a/test/stats.test.js +++ b/test/stats.test.js @@ -9,28 +9,36 @@ describe('StatsCounters', () => { stats.incFiltered(2); stats.incKafkaPulled(5); stats.incDbWriteFailed(4); + stats.incG5Written(6); + stats.incG5WriteFailed(1); const first = stats.snapshotAndResetMinute(); assert.equal(first.dbWritten, 3n); assert.equal(first.filtered, 2n); assert.equal(first.kafkaPulled, 5n); assert.equal(first.dbWriteFailed, 4n); + assert.equal(first.g5Written, 6n); + assert.equal(first.g5WriteFailed, 1n); const second = stats.snapshotAndResetMinute(); assert.equal(second.dbWritten, 0n); assert.equal(second.filtered, 0n); assert.equal(second.kafkaPulled, 0n); assert.equal(second.dbWriteFailed, 0n); + assert.equal(second.g5Written, 0n); + assert.equal(second.g5WriteFailed, 0n); }); }); describe('StatsReporter', () => { - it('writes three [STATS] info logs to redis console', () => { + it('writes all [STATS] info logs to redis console', () => { const stats = new StatsCounters(); stats.incDbWritten(7); stats.incFiltered(8); stats.incKafkaPulled(9); stats.incDbWriteFailed(2); + stats.incG5Written(5); + stats.incG5WriteFailed(1); const calls = { push: [] }; const redis = { @@ -43,17 +51,19 @@ describe('StatsReporter', () => { const reporter = new StatsReporter({ redis, stats }); reporter.flushOnce(); - assert.equal(calls.push.length, 9); + assert.equal(calls.push.length, 11); for (const c of calls.push) assert.equal(c.level, 'info'); assert.match(calls.push[0].message, /Legacy写入量: 7条$/); assert.match(calls.push[1].message, /Legacy写入失败量: 2条$/); assert.match(calls.push[2].message, /G4Hot写入量: 0条$/); assert.match(calls.push[3].message, /G4Hot写入失败量: 0条$/); - assert.match(calls.push[4].message, /RoomStatus写入量: 0条$/); - assert.match(calls.push[5].message, /RoomStatus失败量: 0条$/); - assert.match(calls.push[6].message, /G4Hot错误表插入量: 0条$/); - assert.match(calls.push[7].message, /数据过滤量: 8条$/); - assert.match(calls.push[8].message, /Kafka拉取量: 9条$/); + assert.match(calls.push[4].message, /G5写入量: 5条$/); + assert.match(calls.push[5].message, /G5写入失败量: 1条$/); + assert.match(calls.push[6].message, /RoomStatus写入量: 0条$/); + assert.match(calls.push[7].message, /RoomStatus失败量: 0条$/); + assert.match(calls.push[8].message, /G4Hot错误表插入量: 0条$/); + assert.match(calls.push[9].message, /数据过滤量: 8条$/); + assert.match(calls.push[10].message, /Kafka拉取量: 9条$/); }); });