diff --git a/scripts/db/010_heartbeat_schema.sql b/scripts/db/010_heartbeat_schema.sql index d57eb8f..945f407 100644 --- a/scripts/db/010_heartbeat_schema.sql +++ b/scripts/db/010_heartbeat_schema.sql @@ -14,6 +14,7 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''), ts_ms bigint NOT NULL, + write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint, hotel_id int2 NOT NULL, room_id varchar(50) NOT NULL, device_id varchar(64) NOT NULL, @@ -62,6 +63,9 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( ) PARTITION BY RANGE (ts_ms); +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint; +ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[]; ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[]; ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[]; diff --git a/src/config/config.example.js b/src/config/config.example.js index 24b2004..67ee750 100644 --- a/src/config/config.example.js +++ b/src/config/config.example.js @@ -16,8 +16,6 @@ export default { url: 'redis://10.8.8.109:6379', apiBaseUrl: `http://127.0.0.1:${env.PORT ?? 3001}`, heartbeatIntervalMs: 3000, - heartbeatTtlSeconds: 30, - consoleMaxLen: null, }, // Kafka配置 diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 2d8d0bb..fd789a9 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -84,6 +84,7 @@ class DatabaseManager { guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''), ts_ms bigint NOT NULL, + write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint, hotel_id int2 NOT NULL, room_id varchar(50) NOT NULL, device_id varchar(64) NOT NULL, @@ -130,6 +131,9 @@ class DatabaseManager { ) PARTITION BY RANGE (ts_ms); + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint; + ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[]; ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[]; ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[]; @@ -396,6 +400,7 @@ class DatabaseManager { const columns = [ 'ts_ms', + 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', @@ -427,6 +432,7 @@ class DatabaseManager { const toRowValues = (e) => [ e.ts_ms, + e.write_ts_ms ?? Date.now(), e.hotel_id, e.room_id, e.device_id, diff --git a/src/redis/redisIntegration.js b/src/redis/redisIntegration.js index 7c120c7..6555b7b 100644 --- a/src/redis/redisIntegration.js +++ b/src/redis/redisIntegration.js @@ -46,28 +46,6 @@ class RedisIntegration { return Number.isFinite(ms) && ms > 0 ? ms : 3000; } - getHeartbeatTtlSeconds() { - const ttl = this.config?.heartbeatTtlSeconds; - if (ttl === undefined || ttl === null) return null; - const n = Number(ttl); - return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; - } - - getHeartbeatMaxLen() { - const v = this.config?.heartbeatMaxLen; - if (v === undefined) return 2000; - if (v === null) return null; - const n = Number(v); - return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; - } - - getConsoleMaxLen() { - const v = this.config?.consoleMaxLen; - if (v === undefined || v === null) return null; - const n = Number(v); - return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; - } - async connect() { if (!this.isEnabled()) { console.log('[redis] disabled'); @@ -191,10 +169,6 @@ class RedisIntegration { this._pendingHeartbeat = null; await this.client.rPush(key, value); - const maxLen = this.getHeartbeatMaxLen(); - if (maxLen) { - await this.client.lTrim(key, -maxLen, -1); - } } async writeHeartbeat() { @@ -218,10 +192,6 @@ class RedisIntegration { for (let attempt = 1; attempt <= 2; attempt += 1) { try { await this.client.rPush(key, value); - const maxLen = this.getHeartbeatMaxLen(); - if (maxLen) { - await this.client.lTrim(key, -maxLen, -1); - } this._pendingHeartbeat = null; return; } catch (err) { @@ -277,11 +247,6 @@ class RedisIntegration { while (this._pendingConsoleLogs.length) { const batch = this._pendingConsoleLogs.splice(0, 200); await this.client.rPush(key, ...batch); - - const maxLen = this.getConsoleMaxLen(); - if (maxLen) { - await this.client.lTrim(key, -maxLen, -1); - } } } finally { this._flushingConsoleLogs = false; @@ -325,12 +290,6 @@ class RedisIntegration { const key = this.getConsoleKey(); await this.client.rPush(key, value); - - const maxLen = this.getConsoleMaxLen(); - if (maxLen) { - // 保留最新 maxLen 条 - await this.client.lTrim(key, -maxLen, -1); - } } info(message, metadata) { diff --git a/test/smoke.test.js b/test/smoke.test.js index 73b53a7..e95a31a 100644 --- a/test/smoke.test.js +++ b/test/smoke.test.js @@ -34,15 +34,12 @@ describe('RedisIntegration protocol', () => { apiBaseUrl: 'http://127.0.0.1:3000', }); - const calls = { rPush: [], lTrim: [] }; + const calls = { rPush: [] }; redis.client = { isReady: true, rPush: async (key, value) => { calls.rPush.push({ key, value }); }, - lTrim: async (key, start, stop) => { - calls.lTrim.push({ key, start, stop }); - }, }; const before = Date.now(); @@ -56,9 +53,6 @@ describe('RedisIntegration protocol', () => { assert.equal(payload.apiBaseUrl, 'http://127.0.0.1:3000'); assert.equal(typeof payload.lastActiveAt, 'number'); assert.ok(payload.lastActiveAt >= before && payload.lastActiveAt <= after); - - assert.equal(calls.lTrim.length, 1); - assert.deepEqual(calls.lTrim[0], { key: '项目心跳', start: -2000, stop: -1 }); }); it('caches heartbeat when redis is not ready and flushes later', async () => { @@ -68,16 +62,13 @@ describe('RedisIntegration protocol', () => { apiBaseUrl: 'http://127.0.0.1:3000', }); - const calls = { rPush: [], lTrim: [] }; + const calls = { rPush: [] }; redis.client = { isReady: false, connect: async () => {}, rPush: async (key, value) => { calls.rPush.push({ key, value }); }, - lTrim: async (key, start, stop) => { - calls.lTrim.push({ key, start, stop }); - }, }; await redis.writeHeartbeat(); @@ -93,7 +84,6 @@ describe('RedisIntegration protocol', () => { assert.equal(payload.projectName, 'BLS主机心跳日志'); assert.equal(payload.apiBaseUrl, 'http://127.0.0.1:3000'); assert.equal(typeof payload.lastActiveAt, 'number'); - assert.equal(calls.lTrim.length, 1); }); it('buffers console logs when redis is not ready', async () => { @@ -103,16 +93,13 @@ describe('RedisIntegration protocol', () => { apiBaseUrl: 'http://127.0.0.1:3000', }); - const calls = { rPush: [], lTrim: [] }; + const calls = { rPush: [] }; redis.client = { isReady: false, connect: async () => {}, rPush: async (key, ...values) => { calls.rPush.push({ key, values }); }, - lTrim: async (key, start, stop) => { - calls.lTrim.push({ key, start, stop }); - }, }; await redis.info('hello', { module: 'test' });