From 139d74d9443ebed5d6448a53fe5e63b110738ce1 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Wed, 18 Mar 2026 19:42:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=92=8C=E8=A7=A3=E6=9E=90=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=20IP=20=E5=AD=97=E6=AE=B5=EF=BC=9B?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=89=B9=E9=87=8F=E6=8F=92=E5=85=A5=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=BB=A5=E5=A4=84=E7=90=86=E6=96=B0=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=EF=BC=9B=E5=A2=9E=E5=8A=A0=E7=9B=B8=E5=BA=94=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/db/heartbeatDbManager.js | 17 ++++++++++---- .../src/processor/heartbeatParser.js | 12 +++++++--- .../tests/heartbeat_db_manager.test.js | 23 ++++++++++--------- .../tests/heartbeat_parser.test.js | 18 +++++++++++++++ 4 files changed, 51 insertions(+), 19 deletions(-) diff --git a/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js b/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js index ebc56a2..e3b1ab3 100644 --- a/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js +++ b/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js @@ -5,7 +5,7 @@ const { Pool } = pg; const SMALLINT_MIN = -32768; const SMALLINT_MAX = 32767; -const PARAMS_PER_ROW = 4; +const PARAMS_PER_ROW = 5; const PG_MAX_BIND_PARAMS = 65535; const MAX_ROWS_PER_STATEMENT = Math.floor(PG_MAX_BIND_PARAMS / PARAMS_PER_ROW); @@ -48,7 +48,7 @@ export class HeartbeatDbManager { /** * Batch upsert heartbeat rows. * 为避免 PostgreSQL bind 参数上限,按 upsertChunkSize 分片执行。 - * @param {Array<{ts_ms: number, hotel_id: string, room_id: string, device_id: string}>} rows + * @param {Array<{ts_ms: number, hotel_id: string, room_id: string, device_id: string, ip?: string}>} rows */ async upsertBatch(rows) { if (!rows || rows.length === 0) return; @@ -66,14 +66,20 @@ export class HeartbeatDbManager { for (let i = 0; i < rows.length; i++) { const row = rows[i]; const offset = i * PARAMS_PER_ROW; - values.push(normalizeHotelId(row.hotel_id), row.room_id, row.device_id, row.ts_ms); + values.push( + normalizeHotelId(row.hotel_id), + row.room_id, + row.device_id, + row.ts_ms, + typeof row.ip === 'string' && row.ip.trim() ? row.ip.trim() : null + ); placeholders.push( - `($${offset + 1}::smallint, $${offset + 2}, $${offset + 3}, $${offset + 4}, 1)` + `($${offset + 1}::smallint, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, 1)` ); } const sql = ` - INSERT INTO ${this.fullTableName} (hotel_id, room_id, device_id, ts_ms, online_status) + INSERT INTO ${this.fullTableName} (hotel_id, room_id, device_id, ts_ms, ip, online_status) VALUES ${placeholders.join(', ')} ON CONFLICT (hotel_id, room_id) DO UPDATE SET @@ -82,6 +88,7 @@ export class HeartbeatDbManager { WHEN EXCLUDED.ts_ms >= ${this.fullTableName}.ts_ms THEN EXCLUDED.device_id ELSE ${this.fullTableName}.device_id END, + ip = COALESCE(EXCLUDED.ip, ${this.fullTableName}.ip), online_status = 1 `; diff --git a/bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js b/bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js index 71661f8..35ed9a3 100644 --- a/bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js +++ b/bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js @@ -30,7 +30,7 @@ const isDigitsOnly = (value) => { /** * 解析 Kafka 消息并提取心跳字段 * @param {string} raw - JSON string from Kafka - * @returns {{ ts_ms: number, hotel_id: string, room_id: string, device_id: string } | null} + * @returns {{ ts_ms: number, hotel_id: string, room_id: string, device_id: string, ip?: string } | null} */ export const parseHeartbeat = (raw) => { const parsed = JSON.parse(raw); @@ -39,7 +39,7 @@ export const parseHeartbeat = (raw) => { return null; } - const { ts_ms: tsMs, hotel_id: hotelId, room_id: roomId, device_id: deviceId } = parsed; + const { ts_ms: tsMs, hotel_id: hotelId, room_id: roomId, device_id: deviceId, ip } = parsed; if (!Number.isFinite(tsMs) || !isDigitsOnly(hotelId)) { return null; @@ -49,10 +49,16 @@ export const parseHeartbeat = (raw) => { return null; } - return { + const heartbeat = { ts_ms: tsMs, hotel_id: hotelId, room_id: roomId, device_id: deviceId }; + + if (typeof ip === 'string' && ip.trim()) { + heartbeat.ip = ip.trim(); + } + + return heartbeat; }; diff --git a/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js b/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js index d464e7b..ecd2bcc 100644 --- a/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js +++ b/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js @@ -35,7 +35,7 @@ describe('HeartbeatDbManager', () => { }); await manager.upsertBatch([ - { hotel_id: '1', room_id: '101', device_id: 'dev-a', ts_ms: 1000 } + { hotel_id: '1', room_id: '101', device_id: 'dev-a', ts_ms: 1000, ip: '10.0.0.1' } ]); expect(queryMock).toHaveBeenCalledTimes(1); @@ -44,8 +44,9 @@ describe('HeartbeatDbManager', () => { expect(sql).toContain('online_status = 1'); expect(sql).toContain('ts_ms = GREATEST(EXCLUDED.ts_ms, room_status.room_status_moment_g5.ts_ms)'); expect(sql).toContain('WHEN EXCLUDED.ts_ms >= room_status.room_status_moment_g5.ts_ms THEN EXCLUDED.device_id'); + expect(sql).toContain('ip = COALESCE(EXCLUDED.ip, room_status.room_status_moment_g5.ip)'); expect(sql).not.toContain('WHERE EXCLUDED.ts_ms >= room_status.room_status_moment_g5.ts_ms'); - expect(values).toEqual([1, '101', 'dev-a', 1000]); + expect(values).toEqual([1, '101', 'dev-a', 1000, '10.0.0.1']); }); it('should write hotel_id as 0 when it is outside the smallint range', async () => { @@ -68,7 +69,7 @@ describe('HeartbeatDbManager', () => { expect(queryMock).toHaveBeenCalledTimes(1); const [, values] = queryMock.mock.calls[0]; - expect(values).toEqual([0, '101', 'dev-a', 1000]); + expect(values).toEqual([0, '101', 'dev-a', 1000, null]); }); it('should split large batches into multiple upsert queries', async () => { @@ -87,16 +88,16 @@ describe('HeartbeatDbManager', () => { }); await manager.upsertBatch([ - { hotel_id: '1', room_id: '101', device_id: 'dev-a', ts_ms: 1000 }, - { hotel_id: '1', room_id: '102', device_id: 'dev-b', ts_ms: 1001 }, - { hotel_id: '1', room_id: '103', device_id: 'dev-c', ts_ms: 1002 }, - { hotel_id: '1', room_id: '104', device_id: 'dev-d', ts_ms: 1003 }, - { hotel_id: '1', room_id: '105', device_id: 'dev-e', ts_ms: 1004 } + { hotel_id: '1', room_id: '101', device_id: 'dev-a', ts_ms: 1000, ip: '10.0.0.1' }, + { hotel_id: '1', room_id: '102', device_id: 'dev-b', ts_ms: 1001, ip: '10.0.0.2' }, + { hotel_id: '1', room_id: '103', device_id: 'dev-c', ts_ms: 1002, ip: '10.0.0.3' }, + { hotel_id: '1', room_id: '104', device_id: 'dev-d', ts_ms: 1003, ip: '10.0.0.4' }, + { hotel_id: '1', room_id: '105', device_id: 'dev-e', ts_ms: 1004, ip: '10.0.0.5' } ]); expect(queryMock).toHaveBeenCalledTimes(3); - expect(queryMock.mock.calls[0][1]).toEqual([1, '101', 'dev-a', 1000, 1, '102', 'dev-b', 1001]); - expect(queryMock.mock.calls[1][1]).toEqual([1, '103', 'dev-c', 1002, 1, '104', 'dev-d', 1003]); - expect(queryMock.mock.calls[2][1]).toEqual([1, '105', 'dev-e', 1004]); + expect(queryMock.mock.calls[0][1]).toEqual([1, '101', 'dev-a', 1000, '10.0.0.1', 1, '102', 'dev-b', 1001, '10.0.0.2']); + expect(queryMock.mock.calls[1][1]).toEqual([1, '103', 'dev-c', 1002, '10.0.0.3', 1, '104', 'dev-d', 1003, '10.0.0.4']); + expect(queryMock.mock.calls[2][1]).toEqual([1, '105', 'dev-e', 1004, '10.0.0.5']); }); }); \ No newline at end of file diff --git a/bls-oldrcu-heartbeat-backend/tests/heartbeat_parser.test.js b/bls-oldrcu-heartbeat-backend/tests/heartbeat_parser.test.js index 2d5d211..2cd3694 100644 --- a/bls-oldrcu-heartbeat-backend/tests/heartbeat_parser.test.js +++ b/bls-oldrcu-heartbeat-backend/tests/heartbeat_parser.test.js @@ -3,6 +3,24 @@ import { parseHeartbeat } from '../src/processor/heartbeatParser.js'; describe('parseHeartbeat', () => { it('should parse valid heartbeat message', () => { + const raw = JSON.stringify({ + ts_ms: 1710000000000, + hotel_id: '2045', + room_id: '101', + device_id: 'abc123', + ip: '10.1.2.3' + }); + const result = parseHeartbeat(raw); + expect(result).toEqual({ + ts_ms: 1710000000000, + hotel_id: '2045', + room_id: '101', + device_id: 'abc123', + ip: '10.1.2.3' + }); + }); + + it('should allow heartbeat without ip field', () => { const raw = JSON.stringify({ ts_ms: 1710000000000, hotel_id: '2045',