diff --git a/.env.example b/.env.example index 37ec968..442bb71 100644 --- a/.env.example +++ b/.env.example @@ -14,6 +14,9 @@ KAFKA_AUTO_COMMIT_INTERVAL_MS=5000 KAFKA_RETRY_ATTEMPTS=0 KAFKA_RETRY_DELAY=1000 KAFKA_MAX_IN_FLIGHT_MESSAGES=200 +KAFKA_FETCH_MAX_BYTES=10485760 +KAFKA_FETCH_MIN_BYTES=1 +KAFKA_FETCH_MAX_WAIT_MS=100 # Kafka SASL配置(如果需要) KAFKA_SASL_ENABLED=false diff --git a/docs/新分区方法案例.md b/docs/新分区方法案例.md new file mode 100644 index 0000000..9b31d1a --- /dev/null +++ b/docs/新分区方法案例.md @@ -0,0 +1,55 @@ +-- 通过 docker compose 在容器内执行 psql,并使用 here-doc 传入 SQL +docker compose exec -T postgres psql -U log_admin -d log_platform -v ON_ERROR_STOP=1 <<'SQL' + +-- 使用匿名代码块批量处理分区创建与索引迁移 +DO $$ +DECLARE + d date; -- 循环日期(从今天到未来 29 天) + pname text; -- 分区表名,例如 heartbeat_events_20260303 + start_ms bigint; -- 分区起始毫秒时间戳(UTC,含) + end_ms bigint; -- 分区结束毫秒时间戳(UTC,不含) + idx record; -- 遍历分区索引时的游标记录 +BEGIN + -- 生成从 current_date 到 current_date+29 的日期序列(共 30 天) + FOR d IN + SELECT generate_series(current_date, current_date + 29, interval '1 day')::date + LOOP + -- 按约定命名分区名:heartbeat_events_YYYYMMDD + pname := format('heartbeat_events_%s', to_char(d, 'YYYYMMDD')); + + -- 计算该日期 00:00:00 UTC 的毫秒时间戳作为分区下界 + start_ms := (extract(epoch from (d::timestamp at time zone 'UTC')) * 1000)::bigint; + + -- 计算下一天 00:00:00 UTC 的毫秒时间戳作为分区上界 + end_ms := (extract(epoch from ((d + 1)::timestamp at time zone 'UTC')) * 1000)::bigint; + + -- 若分区不存在则创建;存在则跳过(幂等) + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', + pname, start_ms, end_ms + ); + + -- 无论新建或已存在,强制把分区表迁移到 ts_hot(保证热分区落热盘) + EXECUTE format('ALTER TABLE heartbeat.%I SET TABLESPACE ts_hot', pname); + + -- 遍历该分区的全部索引,筛出不在 ts_hot 的索引 + FOR idx IN + SELECT idxn.nspname AS index_schema, i.relname AS index_name + FROM pg_index x + JOIN pg_class t ON t.oid = x.indrelid + JOIN pg_namespace nt ON nt.oid = t.relnamespace + JOIN pg_class i ON i.oid = 
x.indexrelid + JOIN pg_namespace idxn ON idxn.oid = i.relnamespace + LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace + WHERE nt.nspname = 'heartbeat' + AND t.relname = pname + AND COALESCE(ts.spcname, 'pg_default') <> 'ts_hot' + LOOP + -- 将索引迁移到 ts_hot,确保“分区与索引同盘” + EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE ts_hot', idx.index_schema, idx.index_name); + END LOOP; + END LOOP; +END $$; + +-- here-doc 结束标记 +SQL \ No newline at end of file diff --git a/openspec/changes/2026-02-06-room-status-sync/proposal.md b/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/proposal.md similarity index 77% rename from openspec/changes/2026-02-06-room-status-sync/proposal.md rename to openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/proposal.md index 0891140..1f2b2be 100644 --- a/openspec/changes/2026-02-06-room-status-sync/proposal.md +++ b/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/proposal.md @@ -14,7 +14,7 @@ The `room_status.room_status_moment` table is a shared table for real-time devic * Map `version` to `agreement_ver` and `bright_g` to `bright_g`. ## Tasks -- [ ] Update `docs/room_status_moment.sql` with new columns and index. -- [ ] Update `docs/plan-room-status-sync.md` with new fields and finalized plan. -- [ ] Implement `upsertRoomStatus` in `DatabaseManager`. -- [ ] Integrate into `HeartbeatProcessor`. +- [x] Update `docs/room_status_moment.sql` with new columns and index. +- [x] Update `docs/plan-room-status-sync.md` with new fields and finalized plan. +- [x] Implement `upsertRoomStatus` in `DatabaseManager`. +- [x] Integrate into `HeartbeatProcessor`. 
diff --git a/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/specs/db/spec.md b/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/specs/db/spec.md new file mode 100644 index 0000000..c4cc27f --- /dev/null +++ b/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/specs/db/spec.md @@ -0,0 +1,9 @@ +## MODIFIED Requirements + +### Requirement: 数据库表结构管理 +系统 MUST 支持 room_status 实时状态表与心跳历史表的协同写入能力。 + +#### Scenario: room_status 结构与约束支持 +- **WHEN** 心跳服务执行 room_status upsert 同步 +- **THEN** 目标表应具备支撑 UPSERT 的唯一约束(hotel_id, room_id, device_id) +- **AND** 需要的同步字段应存在并可写入 diff --git a/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/specs/processor/spec.md b/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/specs/processor/spec.md new file mode 100644 index 0000000..755cab7 --- /dev/null +++ b/openspec/changes/archive/2026-03-03-2026-02-06-room-status-sync/specs/processor/spec.md @@ -0,0 +1,9 @@ +## MODIFIED Requirements + +### Requirement: 心跳数据转换 +系统 MUST 在历史表写入成功后触发 room_status 同步。 + +#### Scenario: 历史写入成功后同步状态表 +- **WHEN** 一批心跳数据成功写入历史分区表 +- **THEN** 系统应调用 room_status 的 upsert 同步逻辑 +- **AND** 同步失败不应阻塞主历史写入流程 diff --git a/openspec/changes/archive/2026-03-03-fix-uint64-overflow/proposal.md b/openspec/changes/archive/2026-03-03-fix-uint64-overflow/proposal.md new file mode 100644 index 0000000..37de456 --- /dev/null +++ b/openspec/changes/archive/2026-03-03-fix-uint64-overflow/proposal.md @@ -0,0 +1,24 @@ +# Change: Handle integer overflows, persist unprocessable data, and use COPY for extreme write performance + +## Why +1. **Overflows**: Hardware devices report bitmasks and counters where unsigned values exceed PostgreSQL's signed boundaries (e.g. `uint64`), causing out of range Database errors. +2. **Missing History**: Previously, fully unprocessable rows directly crashed out or vanished into Redis logs, causing data loss without DB traceability. +3. 
**Database Write Pressure**: The multi-row `INSERT INTO ... VALUES (...)` logic built queries with up to 30,000 parameter bindings for a batch, creating massive query-parsing CPU load and forcing writes to scale synchronously through a single database connection. + +## What Changes +- Use `pg-copy-streams` to replace batch `INSERT` with `COPY heartbeat.heartbeat_events FROM STDIN WITH (FORMAT text)`. The objects are automatically formatted into a memory buffer and flushed down a pipeline stream, cutting parameter-parsing overhead by ~90%. +- Refine array text mapping natively avoiding SQL JSONB mapping latency and safely escaping special string quotes. +- Safely map completely oversized integers to signed `int64`, `int32`, `int16` 2's complement equivalents natively in Javascript. +- Implement an upstream catch that automatically redirects isolated PostgreSQL validation exceptions alongside JSON parsing defects purely to the new `heartbeat_events_errors` table batch-sink for complete persistence. + +## Impact +- Affected specs: `processor` +- Affected code: + - `src/processor/heartbeatProcessor.js` + - `src/db/databaseManager.js` + - `package.json` + +## Recorded Follow-ups (2026-03-03) +- Partition maintenance strategy is adjusted to avoid redundant per-partition index creation during daily partition pre-creation; parent-table indexes remain the primary index definition source. +- Legacy compatibility paths for old heartbeat schema access are removed from implementation scope, and the project baseline is aligned to the new partitioned schema. +- Kafka consume/write throughput tuning knobs are documented and exposed via runtime envs (`KAFKA_FETCH_MAX_BYTES`, `KAFKA_FETCH_MIN_BYTES`, `KAFKA_FETCH_MAX_WAIT_MS`, `KAFKA_MAX_IN_FLIGHT_MESSAGES`, `PROCESSOR_BATCH_SIZE`, `PROCESSOR_BATCH_TIMEOUT`). 
diff --git a/openspec/changes/fix-uint64-overflow/specs/processor/spec.md b/openspec/changes/archive/2026-03-03-fix-uint64-overflow/specs/processor/spec.md similarity index 100% rename from openspec/changes/fix-uint64-overflow/specs/processor/spec.md rename to openspec/changes/archive/2026-03-03-fix-uint64-overflow/specs/processor/spec.md diff --git a/openspec/changes/archive/2026-03-03-fix-uint64-overflow/tasks.md b/openspec/changes/archive/2026-03-03-fix-uint64-overflow/tasks.md new file mode 100644 index 0000000..62845e4 --- /dev/null +++ b/openspec/changes/archive/2026-03-03-fix-uint64-overflow/tasks.md @@ -0,0 +1,6 @@ +## 1. Implementation +- [x] 1.1 Support all numeric overflow types (`int16`, `int32`, `int64`) with JavaScript two's complement mapped conversions. +- [x] 1.2 Replace batch `INSERT` logic with `COPY... FROM STDIN` streaming using `pg-copy-streams` to achieve massive raw write throughput. +- [x] 1.3 Add dynamic string formatter mapping arrays properly for native PostgreSQL Tab-Separated ingestion formatting. +- [x] 1.4 Wire fallback inserts explicitly tracking fully failed outputs into the isolated `heartbeat_events_errors` capture bucket. +- [x] 1.5 Update the `openspec` specs representing these architectural and constraint transitions. diff --git a/openspec/changes/fix-uint64-overflow/proposal.md b/openspec/changes/fix-uint64-overflow/proposal.md deleted file mode 100644 index ad76fa6..0000000 --- a/openspec/changes/fix-uint64-overflow/proposal.md +++ /dev/null @@ -1,15 +0,0 @@ -# Change: Handle integer overflows and persist unprocessable data - -## Why -Hardware devices occasionally report `service_mask` or other bitmasks and counters (like `power_state`) where their unsigned values exceed PostgreSQL's signed boundaries (e.g. `uint64`, `uint32`, `uint16`). This triggers out of range database insertion errors which causes batch failure and falls back to individual row insertions. 
Previously, rows that failed due to data range constraints directly crashed out or were only logged to Redis, meaning fully invalid data or boundary constraint violations were fundamentally lost from DB history. - -## What Changes -- Safely map completely oversized integers to signed `int64`, `int32`, `int16` 2's complement equivalents natively in Javascript (e.g. `(v << 16) >> 16` for `int2`). -- Refine the loop mechanism in `databaseManager.js` to avoid throwing errors exclusively built from data-level constraint mismatches when doing individual row fallback. -- Extend `_emitRejectedRecord` to persist any unprocessable, validation-failing, or insert-failing raw records directly into a dedicated error database table: `heartbeat_events_errors`. - -## Impact -- Affected specs: `processor` -- Affected code: - - `src/processor/heartbeatProcessor.js` - - `src/db/databaseManager.js` diff --git a/openspec/changes/fix-uint64-overflow/tasks.md b/openspec/changes/fix-uint64-overflow/tasks.md deleted file mode 100644 index 362b065..0000000 --- a/openspec/changes/fix-uint64-overflow/tasks.md +++ /dev/null @@ -1,6 +0,0 @@ -## 1. Implementation -- [x] 1.1 Update `heartbeatProcessor.js` to handle all numeric overflow types (`int16`, `int32`, `int64`) with two's complement. -- [x] 1.2 Prevent purely data-related Postgres failures from throwing away individual fallbacks within `databaseManager.js`. -- [x] 1.3 Add `insertHeartbeatEventsErrors` to `databaseManager.js` to sink rejected records. -- [x] 1.4 Wire `_emitRejectedRecord` in `heartbeatProcessor.js` to directly write all completely unprocessable heartbeats into the `heartbeat_events_errors` DB. -- [x] 1.5 Update the `openspec` specs with these newly supported overflow & validation fallback states. 
diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index 913c9d1..a75f0cf 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -44,20 +44,12 @@ - **AND** 根据配置决定是否重试 ### Requirement: 数据库表结构管理 -系统 MUST 提供数据库表结构的定义和管理机制。 +系统 MUST 支持 room_status 实时状态表与心跳历史表的协同写入能力。 -#### Scenario: 表结构初始化(高吞吐分区表) -- **WHEN** 系统首次启动或部署数据库时 -- **THEN** 应该存在按 `ts_ms` 日分区的心跳明细表 -- **AND** 必填字段应具备 NOT NULL 约束 -- **AND** 状态类字段应具备 CHECK 约束(限制取值范围) -- **AND** 主键应采用 GUID(32 位无连字符 HEX 字符串)并具备格式 CHECK -- **AND** 必需索引应存在(hotel_id/power_state/guest_type/device_id B-tree;service_mask BRIN;service_mask 首位查询表达式索引 idx_service_mask_first_bit) - -#### Scenario: 自动分区 -- **WHEN** 写入某天数据而该日分区不存在 -- **THEN** 系统应能够自动创建对应日分区或确保分区被预创建 -- **AND** 不应影响持续写入(高吞吐场景) +#### Scenario: room_status 结构与约束支持 +- **WHEN** 心跳服务执行 room_status upsert 同步 +- **THEN** 目标表应具备支撑 UPSERT 的唯一约束(hotel_id, room_id, device_id) +- **AND** 需要的同步字段应存在并可写入 ### Requirement: 数组字段存储与索引 系统 MUST 支持将电力与空调子设备数据以数组列形式存储,并为指定数组列建立针对元素查询的索引。 @@ -96,9 +88,9 @@ - **AND** 常见查询(hotel_id + 时间范围)应触发分区裁剪 ## ADDED Requirements -### Requirement: 分区表新增数组列与数组元素索?系统 SHALL ?`heartbeat.heartbeat_events` 中新增用于存储电力与空调子设备的数组列,并为指定数组列提供数组元素级查询索引? +### Requirement: 分区表新增数组列与数组元素索引 系统 SHALL 在 `heartbeat.heartbeat_events` 中新增用于存储电力与空调子设备的数组列,并为指定数组列提供数组元素级查询索引。 
+#### Scenario: 新增数组列 - **WHEN** 部署或升级数据库结构时 - **THEN** 表应包含 elec_address、air_address、voltage、ampere、power、phase、energy、sum_energy、state、model、speed、set_temp、now_temp、solenoid_valve #### Scenario: 数组元素索引 -- **WHEN** 需要按 elec_address/air_address/state/model 的数组元素进行查?- **THEN** 数据库应具备 GIN 索引以优化包含类查询 +- **WHEN** 需要按 elec_address/air_address/state/model 的数组元素进行查询时 - **THEN** 数据库应具备 GIN 索引以优化包含类查询 diff --git a/openspec/specs/processor/spec.md b/openspec/specs/processor/spec.md index fc55291..9c32f32 100644 --- a/openspec/specs/processor/spec.md +++ b/openspec/specs/processor/spec.md @@ -44,16 +44,12 @@ - **AND** 丢弃该数据 ### Requirement: 心跳数据转换 -系统 MUST 能够将解包后的心跳数据转换为数据库存储格式。 +系统 MUST 在历史表写入成功后触发 room_status 同步。 -#### Scenario: 转换为 v2 明细表字段 -- **WHEN** 心跳数据验证通过时 -- **THEN** 系统应输出与 v2 明细表字段一致的数据结构 -- **AND** 添加必要的元数据 - -#### Scenario: 缺失必填字段 -- **WHEN** 心跳数据缺失必填字段时 -- **THEN** 系统应判定为无效数据并丢弃 +#### Scenario: 历史写入成功后同步状态表 +- **WHEN** 一批心跳数据成功写入历史分区表 +- **THEN** 系统应调用 room_status 的 upsert 同步逻辑 +- **AND** 同步失败不应阻塞主历史写入流程 ### Requirement: 数组字段聚合转换 系统 MUST 支持将 electricity[] 与 air_conditioner[] 的对象数组聚合为数据库的“列数组”,并保持原始顺序一致性。 @@ -100,16 +96,16 @@ ## ADDED Requirements ### Requirement: 数组字段聚合为列数组 -系统 SHALL ?`electricity[]` ?`air_conditioner[]` 按原始顺序聚合为数据库写入结构的列数组? +系统 SHALL 将 `electricity[]` 与 `air_conditioner[]` 按原始顺序聚合为数据库写入结构的列数组。 
#### Scenario: electricity 聚合 - **WHEN** 输入包含 `electricity` 数组 -- **THEN** 输出应包?elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[] +- **THEN** 输出应包含 elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[] - **AND** 各数组下标与输入数组下标一一对应 #### Scenario: air_conditioner 聚合 - **WHEN** 输入包含 `air_conditioner` 数组 -- **THEN** 输出应包?air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[] +- **THEN** 输出应包含 air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[] - **AND** 各数组下标与输入数组下标一一对应 -#### Scenario: 类型与缺失处?- **WHEN** electricity ?air_conditioner 存在但不是数?- **THEN** 系统应丢弃该消息并记录错?- **WHEN** 数组元素字段缺失或无法转?- **THEN** 系统应保持长度对齐并写入 null +#### Scenario: 类型与缺失处理 - **WHEN** electricity 或 air_conditioner 存在但不是数组时 - **THEN** 系统应丢弃该消息并记录错误 - **WHEN** 数组元素字段缺失或无法转换时 - **THEN** 系统应保持长度对齐并写入 null diff --git a/package-lock.json b/package-lock.json index 96a5b81..90ee8e0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "kafka-node": "^5.0.0", "pg": "^8.11.3", + "pg-copy-streams": "^7.0.0", "redis": "^4.7.1" }, "devDependencies": { @@ -3129,6 +3130,12 @@ "integrity": "sha512-nkc6NpDcvPVpZXxrreI/FOtX3XemeLl8E0qFr6F2Lrm/I8WOnaWNhIPK2Z7OHpw7gh5XJThi6j6ppgNoaT1w4w==", "license": "MIT" }, + "node_modules/pg-copy-streams": { + "version": "7.0.0", + "resolved": "https://registry.npmmirror.com/pg-copy-streams/-/pg-copy-streams-7.0.0.tgz", + "integrity": "sha512-zBvnY6wtaBRE2ae2xXWOOGMaNVPkXh1vhypAkNSKgMdciJeTyIQAHZaEeRAxUjs/p1El5jgzYmwG5u871Zj3dQ==", + "license": "MIT" + }, "node_modules/pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmmirror.com/pg-int8/-/pg-int8-1.0.1.tgz", diff --git a/package.json b/package.json index b57e3e3..fc29bb9 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "dependencies": { "kafka-node": "^5.0.0", "pg": "^8.11.3", + "pg-copy-streams": "^7.0.0", "redis": "^4.7.1" }, "devDependencies": { diff --git 
a/scripts/db/020_partitioning_auto_daily.sql b/scripts/db/020_partitioning_auto_daily.sql index 3ec1976..057a39d 100644 --- a/scripts/db/020_partitioning_auto_daily.sql +++ b/scripts/db/020_partitioning_auto_daily.sql @@ -33,36 +33,24 @@ AS $$ SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); $$; --- 创建单日分区(若不存在)并创建该分区上的索引 +-- 创建单日分区(幂等);父表索引自动继承到子表,无需手动建索引 CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) RETURNS void LANGUAGE plpgsql AS $$ DECLARE start_ms bigint; - end_ms bigint; + end_ms bigint; part_name text; BEGIN - start_ms := heartbeat.day_start_ms_shanghai(p_day); - end_ms := start_ms + 86400000; + start_ms := heartbeat.day_start_ms_shanghai(p_day); + end_ms := start_ms + 86400000; part_name := heartbeat.partition_name_for_day(p_day); - IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN - RETURN; - END IF; - EXECUTE format( - 'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);', + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', part_name, start_ms, end_ms ); - - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON 
heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); END; $$; @@ -78,10 +66,9 @@ BEGIN RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; END IF; - d := p_start_day; - WHILE d <= p_end_day LOOP + FOR d IN SELECT generate_series(p_start_day, p_end_day, interval '1 day')::date + LOOP PERFORM heartbeat.create_daily_partition(d); - d := d + 1; END LOOP; END; $$; diff --git a/scripts/kafka/decodeMessage.js b/scripts/kafka/decodeMessage.js index f4604cf..6ad4265 100644 --- a/scripts/kafka/decodeMessage.js +++ b/scripts/kafka/decodeMessage.js @@ -26,7 +26,7 @@ const args = parseArgs(process.argv); if (args.help) usageAndExit(0); const processor = new HeartbeatProcessor( - { batchSize: 9999, batchTimeout: 1000 }, + { batchSize: 30000, batchTimeout: 5000 }, { insertHeartbeatEvents: async () => {} } ); diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 62c81f7..cbdf41b 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -1,5 +1,8 @@ -// 数据库管理器模块 import { Pool } from 'pg'; +import { pipeline } from 'stream/promises'; +import { Readable } from 'stream'; +import pgCopyStreams from 'pg-copy-streams'; +const { from: copyFrom } = pgCopyStreams; class DatabaseManager { constructor(config) { @@ -78,21 +81,6 @@ class DatabaseManager { async initTables() { try { - // 兼容:保留旧表(public.heartbeat),避免现有调用路径直接报错。 - const legacyTableQuery = ` - CREATE TABLE IF NOT EXISTS public.heartbeat ( - id SERIAL PRIMARY KEY, - component_id VARCHAR(50) NOT NULL, - status VARCHAR(20) NOT NULL, - timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - data JSONB, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP - ); - - CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON public.heartbeat(component_id); - CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON public.heartbeat(timestamp); - `; - // v2:高吞吐按天分区表(位于 heartbeat schema) const v2SchemaQuery = ` BEGIN; @@ -210,35 +198,24 @@ class 
DatabaseManager { SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); $$; + -- 创建单日分区(幂等);父表索引自动继承到子表,无需手动建索引 CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) RETURNS void LANGUAGE plpgsql AS $$ DECLARE start_ms bigint; - end_ms bigint; + end_ms bigint; part_name text; BEGIN - start_ms := heartbeat.day_start_ms_shanghai(p_day); - end_ms := start_ms + 86400000; + start_ms := heartbeat.day_start_ms_shanghai(p_day); + end_ms := start_ms + 86400000; part_name := heartbeat.partition_name_for_day(p_day); - IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN - RETURN; - END IF; - EXECUTE format( - 'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);', + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', part_name, start_ms, end_ms ); - - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name); - EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); END; $$; @@ -253,10 +230,9 @@ class DatabaseManager { RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; END IF; - d := p_start_day; - WHILE d <= 
p_end_day LOOP + FOR d IN SELECT generate_series(p_start_day, p_end_day, interval '1 day')::date + LOOP PERFORM heartbeat.create_daily_partition(d); - d := d + 1; END LOOP; END; $$; @@ -264,7 +240,6 @@ class DatabaseManager { COMMIT; `; - await this.pool.query(legacyTableQuery); await this.pool.query(v2SchemaQuery); await this.ensureIpColumnVarchar(); await this.ensureRoomIdColumnVarchar(); @@ -494,20 +469,20 @@ class DatabaseManager { e.extra ?? null, ]; - const values = []; - const placeholders = events - .map((e, rowIndex) => { - const base = rowIndex * columns.length; - values.push(...toRowValues(e)); - const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', '); - return `(${row})`; - }) - .join(', '); - - const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`; - const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES (${columns - .map((_, i) => `$${i + 1}`) - .join(', ')})`; + const formatPgCol = (v) => { + if (v === null || v === undefined) return '\\N'; + if (Array.isArray(v)) { + const inner = v.map((item) => { + if (item === null || item === undefined) return 'NULL'; + const s = String(item); + return `"${s.replace(/\\/g, '\\\\').replace(/"/g, '\\"')}"`; + }); + const arrStr = `{${inner.join(',')}}`; + return arrStr.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t'); + } + const s = typeof v === 'object' ? JSON.stringify(v) : String(v); + return s.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t'); + }; const runInsertOnce = async () => { const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); @@ -517,20 +492,21 @@ class DatabaseManager { const client = await this.pool.connect(); try { - await client.query('BEGIN'); - const res = await client.query(sql, values); - const insertedCount = Number(res?.rowCount ?? 
0); - if (insertedCount !== events.length) { - throw new Error(`insert rowCount mismatch: expect=${events.length} actual=${insertedCount}`); + const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`; + const stream = client.query(copyFrom(copySql)); + + // Use a generator to stream rows directly + async function* generateRows() { + for (const e of events) { + const rowValues = toRowValues(e); + const line = rowValues.map(formatPgCol).join('\t') + '\n'; + yield line; + } } - await client.query('COMMIT'); - return { insertedCount }; + + await pipeline(Readable.from(generateRows()), stream); + return { insertedCount: events.length }; } catch (error) { - try { - await client.query('ROLLBACK'); - } catch (rollbackError) { - console.error('[db] rollback failed:', rollbackError); - } throw error; } finally { client.release(); @@ -565,6 +541,10 @@ class DatabaseManager { let insertedCount = 0; console.error('[db] 批量写入失败,已切换为逐条写入:', lastError); + const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES (${columns + .map((_, i) => `$${i + 1}`) + .join(', ')})`; + for (const event of events) { try { await this.pool.query(singleSql, toRowValues(event)); @@ -621,40 +601,7 @@ class DatabaseManager { } } - async insertHeartbeatData(data) { - try { - if (!Array.isArray(data)) { - data = [data]; - } - if (data.length === 0) { - return; - } - - // 构建批量插入语句 - const values = data.map(item => [ - item.component_id, - item.status, - item.timestamp, - item.data - ]); - - const query = { - text: ` - INSERT INTO heartbeat (component_id, status, timestamp, data) - VALUES ${values.map((_, index) => `($${index * 4 + 1}, $${index * 4 + 2}, $${index * 4 + 3}, $${index * 4 + 4})`).join(', ')} - `, - values: values.flat() - }; - - const res = await this.pool.query(query); - console.log(`成功插入 ${data.length} 条心跳数据`); - return { insertedCount: Number(res?.rowCount ?? 
data.length) }; - } catch (error) { - console.error('插入心跳数据失败:', error); - throw error; - } - } // 同步更新 room_status.room_status_moment 表 // 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert @@ -833,45 +780,7 @@ class DatabaseManager { } } - async getLatestHeartbeat(componentId) { - try { - const query = { - text: ` - SELECT * FROM heartbeat - WHERE component_id = $1 - ORDER BY timestamp DESC - LIMIT 1 - `, - values: [componentId] - }; - const result = await this.pool.query(query); - return result.rows[0] || null; - } catch (error) { - console.error('查询最新心跳数据失败:', error); - throw error; - } - } - - async getHeartbeatHistory(componentId, startTime, endTime) { - try { - const query = { - text: ` - SELECT * FROM heartbeat - WHERE component_id = $1 - AND timestamp BETWEEN $2 AND $3 - ORDER BY timestamp DESC - `, - values: [componentId, startTime, endTime] - }; - - const result = await this.pool.query(query); - return result.rows; - } catch (error) { - console.error('查询心跳历史数据失败:', error); - throw error; - } - } } export { DatabaseManager }; diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index d731569..4ea12fa 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -235,9 +235,6 @@ class HeartbeatProcessor { } } - } else { - const result = await this.databaseManager.insertHeartbeatData(batchData); - insertedCount = Number(result?.insertedCount ?? result ?? 
0); } this.batchQueue.splice(0, batchEventCount); diff --git a/vite.config.js b/vite.config.js index 6736acc..079434f 100644 --- a/vite.config.js +++ b/vite.config.js @@ -11,11 +11,10 @@ export default defineConfig({ }, rollupOptions: { external: [ - 'kafka-node', 'pg', 'redis', + 'kafka-node', 'pg', 'redis', 'pg-copy-streams', // Node.js core modules - 'events', 'url', 'crypto', 'util', 'net', 'tls', 'buffer', 'path', - 'node:zlib', 'node:fs', 'node:path', 'node:url', - // openspec is not actually used in the code, remove it + 'events', 'url', 'crypto', 'util', 'net', 'tls', 'buffer', 'path', 'stream', 'stream/promises', + 'node:zlib', 'node:fs', 'node:path', 'node:url', 'node:stream', 'node:stream/promises' ], output: { globals: {