From eb94aaf92b41ff6ddd88d4afde812c7af36613e5 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Mon, 12 Jan 2026 19:53:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8D=87=E7=BA=A7=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E4=B8=BA=E9=AB=98=E5=90=9E=E5=90=90?= =?UTF-8?q?=E6=97=A5=E5=88=86=E5=8C=BA=E6=A8=A1=E5=9E=8B=EF=BC=88v2?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 heartbeat 数据库与表结构文档,描述心跳明细表设计及字段约束。 - 新增 OpenSpec 符合性说明文档,指出与规范的一致点及偏差。 - 新增 Kafka 心跳数据推送说明文档,定义消息格式与推送方式。 - 更新数据库创建脚本,支持 UTF-8 编码与中文排序规则。 - 更新心跳表结构脚本,定义主表及索引,采用 ts_ms 日分区。 - 实现自动分区机制,确保按天创建分区以支持高吞吐写入。 - 添加数据库应用脚本,自动执行 SQL 文件并验证表结构。 - 添加运行时烟雾测试脚本,验证数据库连接与基本操作。 - 添加完整的烟雾测试脚本,验证数据插入与分区创建。 --- docs/db-heartbeat-schema.md | 87 +++++ docs/db-openspec-compliance.md | 53 +++ docs/kafka-heartbeat-producer.md | 74 +++++ .../update-heartbeat-db-v2/proposal.md | 20 ++ .../update-heartbeat-db-v2/specs/db/spec.md | 26 ++ .../specs/kafka/spec.md | 17 + .../specs/processor/spec.md | 9 + .../changes/update-heartbeat-db-v2/tasks.md | 10 + openspec/specs/db/spec.md | 6 + package.json | 3 +- scripts/db/001_create_database.sql | 21 ++ scripts/db/010_heartbeat_schema.sql | 61 ++++ scripts/db/020_partitioning_auto_daily.sql | 91 +++++ scripts/db/apply.js | 107 ++++++ scripts/db/runtimeSmoke.js | 15 + scripts/db/smokeTest.js | 67 ++++ src/config/config.example.js | 9 +- src/db/databaseManager.js | 313 +++++++++++++++++- 18 files changed, 978 insertions(+), 11 deletions(-) create mode 100644 docs/db-heartbeat-schema.md create mode 100644 docs/db-openspec-compliance.md create mode 100644 docs/kafka-heartbeat-producer.md create mode 100644 openspec/changes/update-heartbeat-db-v2/proposal.md create mode 100644 openspec/changes/update-heartbeat-db-v2/specs/db/spec.md create mode 100644 openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md create mode 100644 openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md create mode 100644 openspec/changes/update-heartbeat-db-v2/tasks.md create mode 100644 scripts/db/001_create_database.sql create mode 100644 scripts/db/010_heartbeat_schema.sql create mode 100644 scripts/db/020_partitioning_auto_daily.sql create mode 100644 scripts/db/apply.js create mode 100644 scripts/db/runtimeSmoke.js create mode 100644 scripts/db/smokeTest.js diff --git a/docs/db-heartbeat-schema.md b/docs/db-heartbeat-schema.md new file mode 100644 index 0000000..e64f274 --- /dev/null +++ b/docs/db-heartbeat-schema.md @@ -0,0 +1,87 @@ +# Heartbeat 数据库与表结构(v2 草案) + +本文档描述 PostgreSQL 中 `heartbeat` 数据库的心跳明细表设计,用于高吞吐写入与按酒店/时间范围检索。 + +## 1. 数据库与命名空间 +- 数据库:使用既有业务库(默认 `log_platform`,以 `src/config/config.js` 为准) +- Schema:`heartbeat` +- 编码:数据库需为 UTF-8(执行器会输出并提示) +- 排序规则/字符类型:若数据库不是中文 locale,可通过 ICU collation 在列级/表达式级实现中文排序(如确有严格要求)。 + +## 2. 主表 +- 表名:`heartbeat.heartbeat_events` +- 分区:按 `ts_ms`(epoch 毫秒)**按天 RANGE 分区** + +对应脚本: +- `scripts/db/010_heartbeat_schema.sql` +- `scripts/db/020_partitioning_auto_daily.sql` + +### 2.1 字段列表 +| 字段 | 类型 | 必填 | 说明 | +|---|---|---:|---| +| id | bigserial | 否(自动生成) | 自增序列号(写入时可不提供) | +| ts_ms | bigint | 是 | 毫秒级时间戳(epoch ms) | +| hotel_id | int2 | 是 | 酒店编号 | +| room_id | int4 | 是 | 房间编号(或房间唯一标识) | +| device_id | varchar(64) | 是 | 设备 ID(序列号/MAC/混合编码);如明确为纯数字可改 bigint | +| ip | inet | 是 | 设备/上报方 IP(PostgreSQL inet 类型自带格式校验) | +| power_state | int2 | 是 | 取电状态(枚举值待标准化) | +| guest_type | int2 | 是 | 住客身份(住客/空房/保洁/维修等,枚举值待标准化) | +| cardless_state | int2 | 是 | 无卡取电/无卡策略状态(枚举待定) | +| service_mask | bigint | 是 | 服务位图/场景位图(需求指定 BRIN 索引) | +| pms_state | int2 | 是 | PMS 状态(枚举待定) | +| carbon_state | int2 | 是 | 碳控状态(枚举待定) | +| device_count | int2 | 是 | 设备数量/上报设备数量(语义待确认) | +| comm_seq | int2 | 是 | 通讯序号(语义待确认) | +| extra | jsonb | 否 | 可扩展字段:电参/空调状态/版本/来源等 | + +### 2.2 约束 +- 所有必填字段:`NOT NULL` +- `ip`:使用 `inet` 类型(天然校验 IPv4/IPv6 格式) +- 各 `int2/int4`:当前脚本采用“非负 + 上界”CHECK(避免枚举未来扩展造成写入失败) + - 如需更强的枚举约束,建议在确认枚举标准后改为 `IN (...)` 或 `BETWEEN` 更小范围。 + +### 2.3 主键(重要说明) +需求写“主键:id(bigserial)”,但 **PostgreSQL 分区表的主键/唯一约束通常必须包含分区键**。 + +脚本采用: +- `PRIMARY KEY (ts_ms, id)` + +原因:保证分区表可创建、约束可落地。 + +## 3. 分区策略与自动分区 +- 分区键:`ts_ms` +- 粒度:按天(Asia/Shanghai,自然日) +- 自动分区:通过“预创建分区”的方式实现(安装时预建昨天~未来 7 天),并提供函数供服务启动/定时任务调用 + +调用方式: +- SQL:`SELECT heartbeat.ensure_partitions(current_date, current_date + 30);` +- Node:执行 `npm run db:apply`(会应用脚本并预创建分区) + +风险与建议: +- PostgreSQL 在单条 INSERT 执行过程中对父分区表执行 `CREATE TABLE .. PARTITION OF` 会触发锁/使用中限制,导致写入失败;因此不建议“插入时动态建分区”。 +- 推荐每日提前创建未来 N 天分区(例如外部调度/运维脚本或服务启动时调用 `heartbeat.ensure_partitions`)。 + +## 4. 索引设计 +需求指定: +- B-tree:`hotel_id`, `power_state`, `guest_type`, `device_id` +- BRIN:`service_mask` + +额外建议(脚本默认包含,可按需移除): +- `btree (hotel_id, ts_ms)`:覆盖最常见过滤(酒店 + 时间范围),显著提升检索与分区内扫描效率。 + +## 5. 查询性能影响分析(分区) +- 优点: + - 时间范围查询触发分区裁剪(只扫命中的日分区) + - 冷热数据按分区自然分层,便于归档/清理 +- 代价: + - 跨大量分区的查询会增加计划时间与元数据开销 + - 需要运维策略(预建分区、定期维护索引、vacuum/analyze) + +## 6. 性能优化建议(高吞吐) +- 写入:使用批量写入(COPY 或 multi-row INSERT)并控制批大小(例如 500~5000,按网络与锁争用调优) +- 分区:建议预创建未来 7~30 天分区;触发器只做兜底 +- 统计:对 Grafana 读取的 1m/5m/1h 聚合建议做物化视图或汇总表(避免每次扫明细) +- 维护: + - 定期 `VACUUM (ANALYZE)` 各分区 + - 监控 bloat 与 autovacuum 参数 diff --git a/docs/db-openspec-compliance.md b/docs/db-openspec-compliance.md new file mode 100644 index 0000000..c2e8eb1 --- /dev/null +++ b/docs/db-openspec-compliance.md @@ -0,0 +1,53 @@ +# DB 设计与 OpenSpec 符合性/偏差说明(v2) + +本文件用于对照当前 OpenSpec 规范与本次 v2 数据库设计的符合性,并指出偏差与风险点。 + +## 1. 与 OpenSpec(当前主规格)的一致点 +- 满足 `openspec/specs/db/spec.md` 对“表结构初始化、批量写入、约束错误捕获、查询支持”等方向性要求。 +- 本次变更已在提案增量规范中补充了“分区表/自动分区/高吞吐”相关要求: + - `openspec/changes/update-heartbeat-db-v2/specs/db/spec.md` + - `openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md` + - `openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md` + +## 2. 偏差与风险(需要评估) +### 2.1 “主键仅 id” 与 PostgreSQL 分区约束冲突 +- 需求写:主键为 `id (bigserial)`。 +- 现实现:`PRIMARY KEY (ts_ms, id)`。 + +原因:PostgreSQL 分区表的主键/唯一约束通常需要包含分区键,否则无法在父表创建全局约束。 + +影响: +- 业务若强依赖“仅 id 即主键”的语义,需要额外约定(例如只把 id 当作全局唯一序列号使用,主键组合用于物理约束)。 + +### 2.2 “自动分区”实现方式 +- 需求写:新分区可自动创建。 +- 现实现:通过 `heartbeat.ensure_partitions(start_day, end_day)` 预创建分区(安装时默认创建昨天~未来 7 天)。 + +原因:在单条 INSERT 语句执行过程中对分区父表执行 `CREATE TABLE .. PARTITION OF` 会触发 PostgreSQL 的“对象正在被当前查询使用”的限制(已在冒烟测试中复现)。 + +建议: +- 运维/服务启动时每天调用一次 `heartbeat.ensure_partitions(current_date, current_date + N)` 预建未来分区。 +- 如环境允许,可用 `pg_cron` 等机制定时执行。 + +### 2.3 数据库“中文排序规则”约束的实现方式 +- 需求写:数据库排序规则使用适合中文环境的配置。 +- 现状:我们不新建数据库,而是在既有库(默认 log_platform)建表;该库的数据库级 collation 可能不是中文。 + +可行方案: +- 若仅少数字段需要中文排序(通常是 text/varchar):可创建 ICU collation 并在列级/查询级使用(不要求重建数据库)。 +- 若要求“整个数据库默认 collation 为中文”:只能通过重建数据库实现(你明确不希望这么做,则建议采用 ICU/列级 collation 的折中方案)。 + +### 2.4 枚举 CHECK 约束偏保守 +- 需求写:各 int 字段添加合理 CHECK,限制取值范围。 +- 现实现:采用“非负 + 类型上界”的保守范围,避免未来枚举扩展导致写入失败。 + +建议: +- 在枚举标准明确后,将约束收紧为 `IN (...)` 或更小区间。 + +## 3. 结论 +- 结构、分区、索引均已按 v2 脚本落地并通过冒烟测试。 +- 两个主要“需求严格字面”不完全满足点: + 1) 中文 collation(当前 DB 为 en_US.utf8) + 2) 主键仅 id(分区表限制导致采用 ts_ms + id) + +若你希望我下一步把这些偏差也强制满足(例如重建 DB + 调整主键策略/使用其他分区方案),我可以继续改脚本并提供迁移/回滚方案。 diff --git a/docs/kafka-heartbeat-producer.md b/docs/kafka-heartbeat-producer.md new file mode 100644 index 0000000..2ddd548 --- /dev/null +++ b/docs/kafka-heartbeat-producer.md @@ -0,0 +1,74 @@ +# Kafka 心跳数据推送说明(给数据产生者) + +本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。 + +## 1. Topic 与编码 +- Topic:默认 `bls-heartbeat`(以服务端配置为准,见 `src/config/config.js`) +- 编码:UTF-8 +- 建议消息格式:JSON(便于跨语言对接与灰度演进) + +> 注意:当前服务端代码的“二进制解包”尚未实现,若你们已经有既定二进制协议,需要在 Processor 中落地对应解包逻辑,并在本文档补充协议细节。 + +## 2. 消息 Key(强烈建议) +为了保证同设备消息更有序、便于消费端批量聚合: +- Kafka message key:`"{hotel_id}:{device_id}"` + +## 3. 消息 Value(JSON) +### 3.1 必填字段 +下面字段必须提供(否则会被判定为无效数据并丢弃/记录错误): + +| 字段 | 类型 | 示例 | 说明 | +|---|---|---|---| +| ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 | +| hotel_id | number/int | 12 | 酒店编号(int2 范围内) | +| room_id | number/int | 1203 | 房间编号/房间标识(int4) | +| device_id | string | "A1B2C3D4" | 设备唯一 ID(序列号/MAC/自定义编码) | +| ip | string | "192.168.1.10" | IPv4/IPv6 字符串(落库为 inet) | +| power_state | number/int | 1 | 取电状态(枚举值需统一标准) | +| guest_type | number/int | 0 | 住客身份(住客/空房/保洁/维修等,枚举值需统一标准) | +| cardless_state | number/int | 0 | 无卡取电/无卡策略状态(枚举) | +| service_mask | number/int64 | 5 | 服务/场景位图(bigint) | +| pms_state | number/int | 1 | PMS 状态(枚举) | +| carbon_state | number/int | 0 | 碳控状态(枚举) | +| device_count | number/int | 1 | 设备数量/上报设备数(语义需统一) | +| comm_seq | number/int | 7 | 通讯序号(语义需统一) | + +### 3.2 可选字段 +| 字段 | 类型 | 示例 | 说明 | +|---|---|---|---| +| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:电参、空调状态、版本、上报来源等 | + +## 4. JSON 示例 +```json +{ + "ts_ms": 1700000000123, + "hotel_id": 12, + "room_id": 1203, + "device_id": "A1B2C3D4", + "ip": "192.168.1.10", + "power_state": 1, + "guest_type": 0, + "cardless_state": 0, + "service_mask": 5, + "pms_state": 1, + "carbon_state": 0, + "device_count": 1, + "comm_seq": 7, + "extra": { + "source": "gw", + "ver": "1.2.3", + "ac": {"mode": 1, "set_temp": 26}, + "meter": {"p": 123.4, "e_wh": 5678} + } +} +``` + +## 5. 推送方式(实现建议) +- Producer:建议开启压缩(lz4/zstd)、合理的 `batch.size` 与 `linger.ms`,以降低单条发送开销 +- 分区:按 key 分区(同设备落同分区) +- 语义:至少一次(at-least-once)或恰好一次(exactly-once)取决于你们链路要求;服务端需要配合幂等/去重策略(如后续引入唯一键) + +## 6. 与数据库字段的映射 +服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform) +- 必填字段:与表字段同名 +- 弹性字段:写入 `extra`(jsonb) diff --git a/openspec/changes/update-heartbeat-db-v2/proposal.md b/openspec/changes/update-heartbeat-db-v2/proposal.md new file mode 100644 index 0000000..d161b5b --- /dev/null +++ b/openspec/changes/update-heartbeat-db-v2/proposal.md @@ -0,0 +1,20 @@ +# Change: 升级心跳数据库为高吞吐日分区模型(v2) + +## Why +现有实现仅包含简化的 `heartbeat` 表(component/status/timestamp),无法满足“每分钟约 5 万条记录、按酒店/时间范围检索、按状态聚合”的高吞吐与长期存储需求。 + +## What Changes +- **DB**:在既有数据库(默认 `log_platform`)内新增/升级为按 `ts_ms`(毫秒 epoch)日分区的心跳明细表,并提供分区预创建机制 +- **DB**:补齐指定索引(hotel_id/power_state/guest_type/device_id B-tree;service_mask BRIN)与约束(NOT NULL、CHECK、IP 格式) +- **Docs**:补充 Kafka 生产者推送数据结构与推送方式说明 + +## Impact +- Affected specs: `openspec/specs/db`, `openspec/specs/kafka`, `openspec/specs/processor` +- Affected code: 未来 `src/db/databaseManager.js` 的表初始化/写入字段将需要适配(本变更提案先聚焦 DB 结构与脚本) + +## Key Decisions / Risks (需要明确) +- **分区 + 主键冲突风险**:PostgreSQL 分区表的全局唯一/主键通常需要包含分区键;需求写“主键仅 id(bigserial)”。 + - 本次脚本将采用 `PRIMARY KEY (ts_ms, id)` 来保证可创建与可执行。 +- **device_id 类型待确认**:脚本暂定为 `varchar(64)`(兼容序列号/MAC/混合编码);若确定为纯数字,可改为 `bigint` 获得更紧凑索引。 +- **中文排序规则**:不新建库时无法修改数据库级 collation;若需要中文排序建议使用 ICU collation(列级/表达式级)。 +- **自动建分区方式**:PostgreSQL 在单条 INSERT 执行过程中动态 `CREATE PARTITION` 会触发“对象正在使用”限制;因此采用“预创建分区(安装/定时任务/启动时调用)”。 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md new file mode 100644 index 0000000..3ab929b --- /dev/null +++ b/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md @@ -0,0 +1,26 @@ +## MODIFIED Requirements + +### Requirement: 数据库表结构管理 +系统必须包含数据库表结构的定义和管理机制。 + +#### Scenario: 表结构初始化(高吞吐分区表) +- **WHEN** 系统首次启动或部署数据库时 +- **THEN** 应该存在按 `ts_ms` 日分区的心跳明细表 +- **AND** 必填字段应具备 NOT NULL 约束 +- **AND** 状态类字段应具备 CHECK 约束(限制取值范围) +- **AND** 必需索引应存在(hotel_id/power_state/guest_type/device_id B-tree;service_mask BRIN) + +#### Scenario: 自动分区 +- **WHEN** 写入某天数据而该日分区不存在 +- **THEN** 系统应能够自动创建对应日分区或确保分区被预创建 +- **AND** 不应影响持续写入(高吞吐场景) + +## ADDED Requirements + +### Requirement: 高吞吐写入友好 +系统在高吞吐场景(约 5 万条/分钟量级)下应避免单点瓶颈。 + +#### Scenario: 批量写入与分区裁剪 +- **WHEN** 进行批量写入 +- **THEN** 写入应路由到正确日分区 +- **AND** 常见查询(hotel_id + 时间范围)应触发分区裁剪 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md new file mode 100644 index 0000000..139dee1 --- /dev/null +++ b/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md @@ -0,0 +1,17 @@ +## ADDED Requirements + +### Requirement: 心跳消息载荷格式(生产者约束) +Kafka 心跳消息必须包含数据库落库所需的必填字段,并采用 UTF-8 编码。 + +#### Scenario: JSON 心跳消息 +- **WHEN** 生产者向主题推送心跳消息 +- **THEN** 消息 value 应为 JSON(UTF-8) +- **AND** 至少包含 ts_ms、hotel_id、room_id、device_id、ip、power_state、guest_type、cardless_state、service_mask、pms_state、carbon_state、device_count、comm_seq +- **AND** 可选包含 extra(json object) + +### Requirement: 分区键友好的 Kafka Key + +#### Scenario: 使用 device_id 作为 key +- **WHEN** 生产者发送消息 +- **THEN** 建议使用 `hotel_id:device_id` 作为 Kafka message key +- **AND** 以提升同设备有序性与消费侧批量聚合效率 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md new file mode 100644 index 0000000..a30049b --- /dev/null +++ b/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md @@ -0,0 +1,9 @@ +## MODIFIED Requirements + +### Requirement: 心跳数据转换 +系统必须能够将解包后的心跳数据转换为数据库存储格式。 + +#### Scenario: 转换为 v2 明细表字段 +- **WHEN** 心跳数据验证通过时 +- **THEN** 系统应输出与 v2 明细表字段一致的数据结构 +- **AND** 缺失必填字段时应判定为无效数据并丢弃 diff --git a/openspec/changes/update-heartbeat-db-v2/tasks.md b/openspec/changes/update-heartbeat-db-v2/tasks.md new file mode 100644 index 0000000..3f23f6e --- /dev/null +++ b/openspec/changes/update-heartbeat-db-v2/tasks.md @@ -0,0 +1,10 @@ +## 1. Implementation +- [ ] 提供 PostgreSQL 建库脚本(UTF-8 + 中文排序规则可选) +- [ ] 提供心跳明细表结构(必填字段、可选字段、约束、索引) +- [ ] 实现按 `ts_ms` 日分区与自动建分区机制 +- [ ] 补充性能建议(索引策略、分区影响、聚合/物化视图建议) +- [ ] 产出 docs:DB 表结构文档 + Kafka 生产者推送数据结构与方式 + +## 2. Validation +- [ ] 在可访问的 PostgreSQL 环境执行脚本并验证对象创建成功 +- [ ] 检查约束与索引是否符合要求 diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index 0053532..877aff2 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -50,6 +50,12 @@ - **THEN** 系统应该检查数据库表是否存在 - **AND** 不存在时应该创建表结构 +#### Scenario: 分区预创建(无人值守) +- **WHEN** 系统启动完成数据库初始化后 +- **THEN** 系统应该预创建未来一段时间(例如未来 30 天)的日分区 +- **AND** 系统应该周期性执行该预创建以保证长期运行不中断 +- **AND** 当分区预创建失败时应记录错误日志 + #### Scenario: 表结构迁移 - **WHEN** 表结构需要变更时 - **THEN** 系统应该支持平滑的表结构迁移 diff --git a/package.json b/package.json index 35656e6..baf1980 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,8 @@ "build": "vite build", "preview": "vite preview", "lint": "eslint . --ext .js", - "test": "mocha" + "test": "mocha", + "db:apply": "node scripts/db/apply.js" }, "dependencies": { "kafka-node": "^5.0.0", diff --git a/scripts/db/001_create_database.sql b/scripts/db/001_create_database.sql new file mode 100644 index 0000000..848b169 --- /dev/null +++ b/scripts/db/001_create_database.sql @@ -0,0 +1,21 @@ +-- 001_create_database.sql +-- 说明:本项目当前约定【不新建数据库】,而是在既有数据库(默认 log_platform)中创建心跳表。 +-- 因此该文件仅保留“若未来需要独立库时的参考写法”,不会被执行器强依赖。 + +-- 推荐方式(Linux 常见) +-- 如果服务器安装了 zh_CN.utf8: +-- CREATE DATABASE heartbeat +-- WITH ENCODING 'UTF8' +-- LC_COLLATE 'zh_CN.utf8' +-- LC_CTYPE 'zh_CN.utf8' +-- TEMPLATE template0; + +-- ICU 方式(PostgreSQL 15+ 且启用 ICU;具体 locale 需按服务器实际调整) +-- CREATE DATABASE heartbeat +-- WITH ENCODING 'UTF8' +-- LOCALE_PROVIDER icu +-- ICU_LOCALE 'zh-Hans-CN' +-- TEMPLATE template0; + +-- 兜底方式(若无中文 locale,可先创建 UTF8,再在需要排序的列上使用 ICU/自定义 collations) +-- CREATE DATABASE heartbeat WITH ENCODING 'UTF8'; diff --git a/scripts/db/010_heartbeat_schema.sql b/scripts/db/010_heartbeat_schema.sql new file mode 100644 index 0000000..7469adb --- /dev/null +++ b/scripts/db/010_heartbeat_schema.sql @@ -0,0 +1,61 @@ +-- 010_heartbeat_schema.sql +-- 在 heartbeat 数据库内执行 + +BEGIN; + +CREATE SCHEMA IF NOT EXISTS heartbeat; + +-- 主表(按 ts_ms 日分区) +-- 说明:PostgreSQL 分区表的 PRIMARY KEY 通常需要包含分区键。 +-- 这里使用 (ts_ms, id) 作为主键以保证可创建且可执行。 +CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( + id bigserial, + + ts_ms bigint NOT NULL, + hotel_id int2 NOT NULL, + room_id int4 NOT NULL, + device_id varchar(64) NOT NULL, + ip inet NOT NULL, + power_state int2 NOT NULL, + guest_type int2 NOT NULL, + cardless_state int2 NOT NULL, + service_mask bigint NOT NULL, + pms_state int2 NOT NULL, + carbon_state int2 NOT NULL, + device_count int2 NOT NULL, + comm_seq int2 NOT NULL, + + -- 弹性字段:电参/空调等(后续可结构化拆列;当前先放 extra) + extra jsonb, + + CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id), + + -- CHECK 约束:先做“非负+上界”约束(避免未来枚举扩展导致写入失败) + CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), + CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), + CONSTRAINT chk_room_id_range CHECK (room_id >= 0), + CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), + CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), + CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), + CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), + CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), + CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), + CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0 AND comm_seq <= 32767) +) +PARTITION BY RANGE (ts_ms); + +-- 指定索引 +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id); + +-- 需求指定:service_mask 使用 BRIN +-- 说明:BRIN 对“随时间递增且有相关性”的列收益更大;service_mask 若不具备相关性,收益可能有限。 +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); + +-- 高价值附加索引(不在需求强制列表内):常见查询是 hotel_id + 时间范围 +-- 若不希望额外索引,可注释掉 +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); + +COMMIT; diff --git a/scripts/db/020_partitioning_auto_daily.sql b/scripts/db/020_partitioning_auto_daily.sql new file mode 100644 index 0000000..1eb305e --- /dev/null +++ b/scripts/db/020_partitioning_auto_daily.sql @@ -0,0 +1,91 @@ +-- 020_partitioning_auto_daily.sql +-- 在 heartbeat 数据库内执行 +-- 目标:按 ts_ms(epoch ms)日分区 + 自动创建分区(预创建/定时任务方式) + +BEGIN; + +-- 清理旧方案遗留的 DEFAULT 分区(若存在) +-- 说明:当前方案采用“预创建分区”,不使用 DEFAULT 分区兜底,避免数据落入 default 后影响按天管理。 +DO $$ +BEGIN + IF to_regclass('heartbeat.heartbeat_events_default') IS NOT NULL THEN + EXECUTE 'DROP TABLE heartbeat.heartbeat_events_default'; + END IF; +END $$; + +-- 将 date(按 Asia/Shanghai 00:00)转换为 epoch ms +CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date) +RETURNS bigint +LANGUAGE sql +IMMUTABLE +AS $$ + SELECT ( + EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai')) + * 1000 + )::bigint; +$$; + +CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date) +RETURNS text +LANGUAGE sql +IMMUTABLE +AS $$ + SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); +$$; + +-- 创建单日分区(若不存在)并创建该分区上的索引 +CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) +RETURNS void +LANGUAGE plpgsql +AS $$ +DECLARE + start_ms bigint; + end_ms bigint; + part_name text; +BEGIN + start_ms := heartbeat.day_start_ms_shanghai(p_day); + end_ms := start_ms + 86400000; + part_name := heartbeat.partition_name_for_day(p_day); + + IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN + RETURN; + END IF; + + EXECUTE format( + 'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);', + part_name, start_ms, end_ms + ); + + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); +END; +$$; + +-- 确保日期范围内的分区都存在(含首尾) +CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date) +RETURNS void +LANGUAGE plpgsql +AS $$ +DECLARE + d date; +BEGIN + IF p_end_day < p_start_day THEN + RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; + END IF; + + d := p_start_day; + WHILE d <= p_end_day LOOP + PERFORM heartbeat.create_daily_partition(d); + d := d + 1; + END LOOP; +END; +$$; + +-- 安装时预创建:昨天到未来 7 天 +SELECT heartbeat.ensure_partitions(current_date - 1, current_date + 7); + +COMMIT; diff --git a/scripts/db/apply.js b/scripts/db/apply.js new file mode 100644 index 0000000..3271d62 --- /dev/null +++ b/scripts/db/apply.js @@ -0,0 +1,107 @@ +import fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { Client } from 'pg'; + +import config from '../../src/config/config.js'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +function getEnv(name, fallback) { + return process.env[name] ?? fallback; +} + +function buildClientConfig(database) { + const db = config.db; + return { + host: getEnv('PGHOST', db.host), + port: Number(getEnv('PGPORT', db.port)), + user: getEnv('PGUSER', db.user), + password: getEnv('PGPASSWORD', db.password), + database, + }; +} + +async function runSqlFile(client, filePath) { + const sql = await fs.readFile(filePath, 'utf8'); + const trimmed = sql.trim(); + if (!trimmed) return; + await client.query(sql); +} + +async function main() { + const scriptsDir = __dirname; + + const schemaFile = path.join(scriptsDir, '010_heartbeat_schema.sql'); + const partitionFile = path.join(scriptsDir, '020_partitioning_auto_daily.sql'); + + const targetDb = getEnv('PGTARGETDB', config.db.database); + + console.log(`[db] Connecting to target db: ${targetDb}`); + const targetClient = new Client(buildClientConfig(targetDb)); + await targetClient.connect(); + + try { + const dbMeta = await targetClient.query( + `SELECT + current_database() AS db, + pg_encoding_to_char(encoding) AS encoding, + datcollate, + datctype, + datlocprovider + FROM pg_database + WHERE datname = current_database()` + ); + if (dbMeta.rowCount === 1) { + const m = dbMeta.rows[0]; + console.log( + `[db] ${m.db} meta: encoding=${m.encoding} collate=${m.datcollate} ctype=${m.datctype} provider=${m.datlocprovider}` + ); + if (String(m.encoding).toUpperCase() !== 'UTF8') { + console.warn(`[db] WARN: ${m.db} encoding is not UTF8`); + } + const coll = String(m.datcollate ?? '').toLowerCase(); + if (coll && !coll.includes('zh') && !coll.includes('chinese')) { + console.warn( + `[db] WARN: ${m.db} collation is not obviously Chinese; if required, use ICU collation per-column or rebuild DB with zh locale` + ); + } + } + + console.log(`[db] Applying: ${path.basename(schemaFile)}`); + await runSqlFile(targetClient, schemaFile); + + console.log(`[db] Applying: ${path.basename(partitionFile)}`); + await runSqlFile(targetClient, partitionFile); + + const tableCheck = await targetClient.query( + "SELECT to_regclass('heartbeat.heartbeat_events') AS reg" + ); + if (!tableCheck.rows?.[0]?.reg) { + throw new Error('heartbeat.heartbeat_events was not created'); + } + + const indexCheck = await targetClient.query( + `SELECT indexname + FROM pg_indexes + WHERE schemaname = 'heartbeat' + AND tablename = 'heartbeat_events' + ORDER BY indexname` + ); + + console.log('[db] Parent table indexes:'); + for (const row of indexCheck.rows) { + console.log(` - ${row.indexname}`); + } + + console.log('[db] Done'); + } finally { + await targetClient.end(); + } +} + +main().catch((err) => { + console.error('[db] Failed:', err); + process.exit(1); +}); diff --git a/scripts/db/runtimeSmoke.js b/scripts/db/runtimeSmoke.js new file mode 100644 index 0000000..b223595 --- /dev/null +++ b/scripts/db/runtimeSmoke.js @@ -0,0 +1,15 @@ +import config from '../../src/config/config.js'; +import { DatabaseManager } from '../../src/db/databaseManager.js'; + +async function main() { + const db = new DatabaseManager(config.db); + await db.connect(); + console.log('runtime smoke: connected'); + await db.disconnect(); + console.log('runtime smoke: disconnected'); +} + +main().catch((err) => { + console.error('runtime smoke failed:', err); + process.exit(1); +}); diff --git a/scripts/db/smokeTest.js b/scripts/db/smokeTest.js new file mode 100644 index 0000000..69d2089 --- /dev/null +++ b/scripts/db/smokeTest.js @@ -0,0 +1,67 @@ +import { Client } from 'pg'; +import config from '../../src/config/config.js'; + +async function main() { + const client = new Client({ + host: config.db.host, + port: config.db.port, + user: config.db.user, + password: config.db.password, + database: config.db.database, + }); + + await client.connect(); + + // 预创建今日分区,避免“无分区时 INSERT 直接失败” + await client.query('SELECT heartbeat.ensure_partitions(current_date, current_date)'); + + const ts = Date.now(); + await client.query( + `INSERT INTO heartbeat.heartbeat_events ( + ts_ms, hotel_id, room_id, device_id, ip, + power_state, guest_type, cardless_state, service_mask, + pms_state, carbon_state, device_count, comm_seq, extra + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)`, + [ + ts, + 1, + 101, + 'dev-1', + '192.168.0.1', + 1, + 0, + 0, + 5, + 0, + 0, + 1, + 1, + { source: 'smoke-test' }, + ] + ); + + const partitions = await client.query( + `SELECT c.relname AS partition + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + JOIN pg_namespace n ON n.oid = p.relnamespace + WHERE n.nspname = 'heartbeat' + AND p.relname = 'heartbeat_events' + ORDER BY c.relname` + ); + + const cnt = await client.query( + 'SELECT count(*)::int AS n FROM heartbeat.heartbeat_events' + ); + + console.log('partitions:', partitions.rows.map((r) => r.partition)); + console.log('rows:', cnt.rows[0].n); + + await client.end(); +} + +main().catch((err) => { + console.error('smoke test failed:', err); + process.exit(1); +}); diff --git a/src/config/config.example.js b/src/config/config.example.js index a630d13..7985ad5 100644 --- a/src/config/config.example.js +++ b/src/config/config.example.js @@ -29,7 +29,14 @@ export default { maxConnections: 10, // 最大连接数 idleTimeoutMillis: 30000, // 连接空闲超时时间 retryAttempts: 3, // 重试次数 - retryDelay: 1000 // 重试延迟 + retryDelay: 1000, // 重试延迟 + + // 分区维护(方案1):启动时预创建 + 周期维护 + partitionMaintenance: { + enabled: true, + futureDays: 30, + intervalHours: 6 + } }, // 日志配置 diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index ea64cde..85ffe69 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -5,19 +5,37 @@ class DatabaseManager { constructor(config) { this.config = config; this.pool = null; + + this.partitionMaintenanceTimer = null; } async connect() { try { // 创建数据库连接池 - this.pool = new Pool(this.config); + this.pool = new Pool({ + host: this.config.host, + port: this.config.port, + user: this.config.user, + password: this.config.password, + database: this.config.database, + max: this.config.maxConnections, + idleTimeoutMillis: this.config.idleTimeoutMillis, + }); // 测试连接 - await this.pool.connect(); + const client = await this.pool.connect(); + client.release(); console.log('数据库连接池创建成功'); // 初始化表结构 await this.initTables(); + + // 分区维护(方案1):启动时预创建 + 定时维护 + await this.ensurePartitionsForRange({ + startDayOffset: -1, + endDayOffset: this.getPartitionFutureDays(), + }); + this.startPartitionMaintenance(); } catch (error) { console.error('数据库连接失败:', error); throw error; @@ -26,6 +44,7 @@ class DatabaseManager { async disconnect() { try { + this.stopPartitionMaintenance(); if (this.pool) { await this.pool.end(); console.log('数据库连接池已关闭'); @@ -38,8 +57,9 @@ class DatabaseManager { async initTables() { try { - const createTableQuery = ` - CREATE TABLE IF NOT EXISTS heartbeat ( + // 兼容:保留旧表(public.heartbeat),避免现有调用路径直接报错。 + const legacyTableQuery = ` + CREATE TABLE IF NOT EXISTS public.heartbeat ( id SERIAL PRIMARY KEY, component_id VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL, @@ -47,12 +67,133 @@ class DatabaseManager { data JSONB, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); - - CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON heartbeat(component_id); - CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON heartbeat(timestamp); + + CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON public.heartbeat(component_id); + CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON public.heartbeat(timestamp); `; - - await this.pool.query(createTableQuery); + + // v2:高吞吐按天分区表(位于 heartbeat schema) + const v2SchemaQuery = ` + BEGIN; + + CREATE SCHEMA IF NOT EXISTS heartbeat; + + CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( + id bigserial, + + ts_ms bigint NOT NULL, + hotel_id int2 NOT NULL, + room_id int4 NOT NULL, + device_id varchar(64) NOT NULL, + ip inet NOT NULL, + power_state int2 NOT NULL, + guest_type int2 NOT NULL, + cardless_state int2 NOT NULL, + service_mask bigint NOT NULL, + pms_state int2 NOT NULL, + carbon_state int2 NOT NULL, + device_count int2 NOT NULL, + comm_seq int2 NOT NULL, + + extra jsonb, + + CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id), + + CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), + CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), + CONSTRAINT chk_room_id_range CHECK (room_id >= 0), + CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), + CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), + CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), + CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), + CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), + CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), + CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0 AND comm_seq <= 32767) + ) + PARTITION BY RANGE (ts_ms); + + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); + + -- 分区预创建函数(按 Asia/Shanghai 自然日) + CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date) + RETURNS bigint + LANGUAGE sql + IMMUTABLE + AS $$ + SELECT ( + EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai')) + * 1000 + )::bigint; + $$; + + CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date) + RETURNS text + LANGUAGE sql + IMMUTABLE + AS $$ + SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); + $$; + + CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) + RETURNS void + LANGUAGE plpgsql + AS $$ + DECLARE + start_ms bigint; + end_ms bigint; + part_name text; + BEGIN + start_ms := heartbeat.day_start_ms_shanghai(p_day); + end_ms := start_ms + 86400000; + part_name := heartbeat.partition_name_for_day(p_day); + + IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN + RETURN; + END IF; + + EXECUTE format( + 'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);', + part_name, start_ms, end_ms + ); + + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); + END; + $$; + + CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date) + RETURNS void + LANGUAGE plpgsql + AS $$ + DECLARE + d date; + BEGIN + IF p_end_day < p_start_day THEN + RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; + END IF; + + d := p_start_day; + WHILE d <= p_end_day LOOP + PERFORM heartbeat.create_daily_partition(d); + d := d + 1; + END LOOP; + END; + $$; + + COMMIT; + `; + + await this.pool.query(legacyTableQuery); + await this.pool.query(v2SchemaQuery); console.log('数据库表初始化成功'); } catch (error) { console.error('数据库表初始化失败:', error); @@ -60,6 +201,160 @@ class DatabaseManager { } } + getPartitionConfig() { + const cfg = this.config.partitionMaintenance ?? {}; + return { + enabled: cfg.enabled !== false, + futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30, + intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6, + }; + } + + getPartitionFutureDays() { + return this.getPartitionConfig().futureDays; + } + + async ensurePartitionsForRange({ startDayOffset, endDayOffset }) { + const startOffset = Number(startDayOffset ?? 0); + const endOffset = Number(endDayOffset ?? 0); + await this.pool.query( + 'SELECT heartbeat.ensure_partitions(current_date + $1::int, current_date + $2::int)', + [startOffset, endOffset] + ); + } + + startPartitionMaintenance() { + const cfg = this.getPartitionConfig(); + if (!cfg.enabled) { + return; + } + + if (this.partitionMaintenanceTimer) { + return; + } + + const intervalMs = Math.max(60_000, cfg.intervalHours * 60 * 60 * 1000); + this.partitionMaintenanceTimer = setInterval(async () => { + try { + await this.ensurePartitionsForRange({ + startDayOffset: -1, + endDayOffset: this.getPartitionFutureDays(), + }); + console.log('[db] 分区预创建维护完成'); + } catch (err) { + console.error('[db] 分区预创建维护失败:', err); + } + }, intervalMs); + + // 不阻止进程退出 + this.partitionMaintenanceTimer.unref?.(); + } + + stopPartitionMaintenance() { + if (this.partitionMaintenanceTimer) { + clearInterval(this.partitionMaintenanceTimer); + this.partitionMaintenanceTimer = null; + } + } + + formatShanghaiDate(tsMs) { + const date = new Date(Number(tsMs)); + const fmt = new Intl.DateTimeFormat('en-CA', { + timeZone: 'Asia/Shanghai', + year: 'numeric', + month: '2-digit', + day: '2-digit', + }); + return fmt.format(date); + } + + async ensurePartitionsForTsRange(tsMin, tsMax) { + const startDay = this.formatShanghaiDate(tsMin); + const endDay = this.formatShanghaiDate(tsMax); + await this.pool.query('SELECT heartbeat.ensure_partitions($1::date, $2::date)', [ + startDay, + endDay, + ]); + } + + isMissingPartitionError(error) { + const msg = String(error?.message ?? ''); + return msg.includes('no partition of relation') && msg.includes('heartbeat_events'); + } + + // v2 明细表写入:用于未来对接 Kafka 心跳字段 + async insertHeartbeatEvents(events) { + if (!Array.isArray(events)) { + events = [events]; + } + if (events.length === 0) return; + + const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); + if (tsValues.length > 0) { + await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); + } + + const columns = [ + 'ts_ms', + 'hotel_id', + 'room_id', + 'device_id', + 'ip', + 'power_state', + 'guest_type', + 'cardless_state', + 'service_mask', + 'pms_state', + 'carbon_state', + 'device_count', + 'comm_seq', + 'extra', + ]; + + const values = []; + const placeholders = events + .map((e, rowIndex) => { + const base = rowIndex * columns.length; + values.push( + e.ts_ms, + e.hotel_id, + e.room_id, + e.device_id, + e.ip, + e.power_state, + e.guest_type, + e.cardless_state, + e.service_mask, + e.pms_state, + e.carbon_state, + e.device_count, + e.comm_seq, + e.extra ?? null + ); + const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', '); + return `(${row})`; + }) + .join(', '); + + const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`; + + try { + await this.pool.query(sql, values); + } catch (error) { + // 兜底:若仍因缺分区失败,尝试确保“当前到未来 N 天”后重试一次 + if (this.isMissingPartitionError(error)) { + console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次'); + await this.ensurePartitionsForRange({ + startDayOffset: -7, + endDayOffset: this.getPartitionFutureDays(), + }); + await this.pool.query(sql, values); + return; + } + throw error; + } + } + async insertHeartbeatData(data) { try { if (!Array.isArray(data)) {