feat: 更新 Kafka 配置和数据库管理逻辑

- 在 .env.example 中添加 Kafka 配置项:KAFKA_FETCH_MAX_BYTES, KAFKA_FETCH_MIN_BYTES, KAFKA_FETCH_MAX_WAIT_MS。
- 删除 room_status_sync 提案及相关文档。
- 删除 fix_uint64_overflow 提案及相关文档。
- 更新数据库管理器以支持使用 COPY 语句进行高效数据写入,替换批量 INSERT 逻辑。
- 实现心跳数据的整数溢出处理,确保无效数据被持久化到 heartbeat_events_errors 表。
- 更新处理器规范,确保心跳数据成功写入历史表后触发 room_status 同步。
- 添加新文档,描述新的分区方法案例。
- 归档旧的提案和规范文档以保持项目整洁。
This commit is contained in:
2026-03-03 18:22:12 +08:00
parent d0c4940e01
commit c0cdc9ea66
19 changed files with 188 additions and 215 deletions

View File

@@ -1,5 +1,8 @@
// 数据库管理器模块
import { Pool } from 'pg';
import { pipeline } from 'stream/promises';
import { Readable } from 'stream';
import pgCopyStreams from 'pg-copy-streams';
const { from: copyFrom } = pgCopyStreams;
class DatabaseManager {
constructor(config) {
@@ -78,21 +81,6 @@ class DatabaseManager {
async initTables() {
try {
// 兼容保留旧表public.heartbeat避免现有调用路径直接报错。
const legacyTableQuery = `
CREATE TABLE IF NOT EXISTS public.heartbeat (
id SERIAL PRIMARY KEY,
component_id VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON public.heartbeat(component_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON public.heartbeat(timestamp);
`;
// v2高吞吐按天分区表位于 heartbeat schema
const v2SchemaQuery = `
BEGIN;
@@ -210,35 +198,24 @@ class DatabaseManager {
SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD'));
$$;
-- 创建单日分区(幂等);父表索引自动继承到子表,无需手动建索引
CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
start_ms bigint;
end_ms bigint;
end_ms bigint;
part_name text;
BEGIN
start_ms := heartbeat.day_start_ms_shanghai(p_day);
end_ms := start_ms + 86400000;
start_ms := heartbeat.day_start_ms_shanghai(p_day);
end_ms := start_ms + 86400000;
part_name := heartbeat.partition_name_for_day(p_day);
IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN
RETURN;
END IF;
EXECUTE format(
'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);',
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)',
part_name, start_ms, end_ms
);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name);
END;
$$;
@@ -253,10 +230,9 @@ class DatabaseManager {
RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day;
END IF;
d := p_start_day;
WHILE d <= p_end_day LOOP
FOR d IN SELECT generate_series(p_start_day, p_end_day, interval '1 day')::date
LOOP
PERFORM heartbeat.create_daily_partition(d);
d := d + 1;
END LOOP;
END;
$$;
@@ -264,7 +240,6 @@ class DatabaseManager {
COMMIT;
`;
await this.pool.query(legacyTableQuery);
await this.pool.query(v2SchemaQuery);
await this.ensureIpColumnVarchar();
await this.ensureRoomIdColumnVarchar();
@@ -494,20 +469,20 @@ class DatabaseManager {
e.extra ?? null,
];
const values = [];
const placeholders = events
.map((e, rowIndex) => {
const base = rowIndex * columns.length;
values.push(...toRowValues(e));
const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', ');
return `(${row})`;
})
.join(', ');
const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`;
const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES (${columns
.map((_, i) => `$${i + 1}`)
.join(', ')})`;
const formatPgCol = (v) => {
if (v === null || v === undefined) return '\\N';
if (Array.isArray(v)) {
const inner = v.map((item) => {
if (item === null || item === undefined) return 'NULL';
const s = String(item);
return `"${s.replace(/\\/g, '\\\\').replace(/"/g, '\\"')}"`;
});
const arrStr = `{${inner.join(',')}}`;
return arrStr.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t');
}
const s = typeof v === 'object' ? JSON.stringify(v) : String(v);
return s.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t');
};
const runInsertOnce = async () => {
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
@@ -517,20 +492,21 @@ class DatabaseManager {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const res = await client.query(sql, values);
const insertedCount = Number(res?.rowCount ?? 0);
if (insertedCount !== events.length) {
throw new Error(`insert rowCount mismatch: expect=${events.length} actual=${insertedCount}`);
const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`;
const stream = client.query(copyFrom(copySql));
// Use a generator to stream rows directly
async function* generateRows() {
for (const e of events) {
const rowValues = toRowValues(e);
const line = rowValues.map(formatPgCol).join('\t') + '\n';
yield line;
}
}
await client.query('COMMIT');
return { insertedCount };
await pipeline(Readable.from(generateRows()), stream);
return { insertedCount: events.length };
} catch (error) {
try {
await client.query('ROLLBACK');
} catch (rollbackError) {
console.error('[db] rollback failed:', rollbackError);
}
throw error;
} finally {
client.release();
@@ -565,6 +541,10 @@ class DatabaseManager {
let insertedCount = 0;
console.error('[db] 批量写入失败,已切换为逐条写入:', lastError);
const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES (${columns
.map((_, i) => `$${i + 1}`)
.join(', ')})`;
for (const event of events) {
try {
await this.pool.query(singleSql, toRowValues(event));
@@ -621,40 +601,7 @@ class DatabaseManager {
}
}
async insertHeartbeatData(data) {
try {
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0) {
return;
}
// 构建批量插入语句
const values = data.map(item => [
item.component_id,
item.status,
item.timestamp,
item.data
]);
const query = {
text: `
INSERT INTO heartbeat (component_id, status, timestamp, data)
VALUES ${values.map((_, index) => `($${index * 4 + 1}, $${index * 4 + 2}, $${index * 4 + 3}, $${index * 4 + 4})`).join(', ')}
`,
values: values.flat()
};
const res = await this.pool.query(query);
console.log(`成功插入 ${data.length} 条心跳数据`);
return { insertedCount: Number(res?.rowCount ?? data.length) };
} catch (error) {
console.error('插入心跳数据失败:', error);
throw error;
}
}
// 同步更新 room_status.room_status_moment 表
// 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert
@@ -833,45 +780,7 @@ class DatabaseManager {
}
}
async getLatestHeartbeat(componentId) {
try {
const query = {
text: `
SELECT * FROM heartbeat
WHERE component_id = $1
ORDER BY timestamp DESC
LIMIT 1
`,
values: [componentId]
};
const result = await this.pool.query(query);
return result.rows[0] || null;
} catch (error) {
console.error('查询最新心跳数据失败:', error);
throw error;
}
}
async getHeartbeatHistory(componentId, startTime, endTime) {
try {
const query = {
text: `
SELECT * FROM heartbeat
WHERE component_id = $1
AND timestamp BETWEEN $2 AND $3
ORDER BY timestamp DESC
`,
values: [componentId, startTime, endTime]
};
const result = await this.pool.query(query);
return result.rows;
} catch (error) {
console.error('查询心跳历史数据失败:', error);
throw error;
}
}
}
export { DatabaseManager };

View File

@@ -235,9 +235,6 @@ class HeartbeatProcessor {
}
}
} else {
const result = await this.databaseManager.insertHeartbeatData(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0);
}
this.batchQueue.splice(0, batchEventCount);