diff --git a/bls-rcu-action-backend/src/db/roomStatusManager.js b/bls-rcu-action-backend/src/db/roomStatusManager.js index 9784d83..75708b9 100644 --- a/bls-rcu-action-backend/src/db/roomStatusManager.js +++ b/bls-rcu-action-backend/src/db/roomStatusManager.js @@ -36,7 +36,7 @@ export class RoomStatusManager { * Uses ON CONFLICT for atomic merge. * * @param {Array} rows - Array of merged status objects - * Each: { hotel_id, room_id, device_id, ts_ms, sys_lock_status, dev_loops, faulty_device_count } + * Each: { hotel_id, room_id, device_id, ts_ms, ip, sys_lock_status, dev_loops, faulty_device_count } */ async upsertBatch(rows) { if (!rows || rows.length === 0) return; @@ -52,7 +52,7 @@ export class RoomStatusManager { for (let i = 0; i < rows.length; i++) { const row = rows[i]; - const paramsPerRow = this.omitGuid ? 8 : 9; + const paramsPerRow = this.omitGuid ? 9 : 10; const offset = i * paramsPerRow; if (this.omitGuid) { @@ -61,13 +61,14 @@ export class RoomStatusManager { row.hotel_id, // $2 row.room_id, // $3 row.device_id, // $4 - row.sys_lock_status, // $5 - row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $6 - row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $7 - 1 // $8 online_status + row.ip ?? null, // $5 + row.sys_lock_status, // $6 + row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7 + row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8 + 1 // $9 online_status ); const p = (n) => `$${offset + n}`; - placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}::jsonb, ${p(7)}::jsonb, ${p(8)})`); + placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`); } else { values.push( row.guid || randomUUID(), // $1 @@ -75,19 +76,20 @@ export class RoomStatusManager { row.hotel_id, // $3 row.room_id, // $4 row.device_id, // $5 - row.sys_lock_status, // $6 - row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7 - row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8 - 1 // $9 online_status + row.ip ?? null, // $6 + row.sys_lock_status, // $7 + row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $8 + row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $9 + 1 // $10 online_status ); const p = (n) => `$${offset + n}`; - placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`); + placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}, ${p(8)}::jsonb, ${p(9)}::jsonb, ${p(10)})`); } } const insertColumns = this.omitGuid - ? 'ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status' - : 'guid, ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status'; + ? 'ts_ms, hotel_id, room_id, device_id, ip, sys_lock_status, dev_loops, faulty_device_count, online_status' + : 'guid, ts_ms, hotel_id, room_id, device_id, ip, sys_lock_status, dev_loops, faulty_device_count, online_status'; const conflictTarget = this.omitGuid ? '(hotel_id, room_id)' @@ -98,6 +100,7 @@ export class RoomStatusManager { ON CONFLICT ${conflictTarget} DO UPDATE SET ts_ms = EXCLUDED.ts_ms, + ip = COALESCE(EXCLUDED.ip, ${this.fullTableName}.ip), online_status = 1, device_id = EXCLUDED.device_id, sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, ${this.fullTableName}.sys_lock_status), diff --git a/bls-rcu-action-backend/src/db/statusBatchProcessor.js b/bls-rcu-action-backend/src/db/statusBatchProcessor.js index 73a1705..904a007 100644 --- a/bls-rcu-action-backend/src/db/statusBatchProcessor.js +++ b/bls-rcu-action-backend/src/db/statusBatchProcessor.js @@ -47,6 +47,17 @@ export class StatusBatchProcessor { add(update) { if (!update) return; + if (this.targetName.startsWith('g5:') && !update.ip) { + logger.info('Status update skipped for empty ip', { + target: this.targetName, + hotel_id: update.hotel_id, + room_id: update.room_id, + device_id: update.device_id, + ts_ms: update.ts_ms + }); + return; + } + const key = this._key(update); const existing = this.buffer.get(key); @@ -64,6 +75,11 @@ export class StatusBatchProcessor { existing.sys_lock_status = update.sys_lock_status; } + // ip: prefer newer non-null value + if (update.ip != null) { + existing.ip = update.ip; + } + // dev_loops: merge keys (new overwrites old for same key) if (update.dev_loops) { existing.dev_loops = existing.dev_loops diff --git a/bls-rcu-action-backend/src/processor/statusExtractor.js b/bls-rcu-action-backend/src/processor/statusExtractor.js index 74a4ed4..bd238e9 100644 --- a/bls-rcu-action-backend/src/processor/statusExtractor.js +++ b/bls-rcu-action-backend/src/processor/statusExtractor.js @@ -20,6 +20,12 @@ const pad3 = (val) => String(val).padStart(3, '0'); const buildLoopKey = (devType, devAddr, devLoop) => `${pad3(devType)}${pad3(devAddr)}${pad3(devLoop)}`; +const normalizeIp = (value) => { + if (value == null) return null; + const text = String(value).trim(); + return text.length > 0 ? text : null; +}; + /** * Extract a status update object from a validated Kafka payload. * @@ -34,14 +40,18 @@ export const extractStatusUpdate = (payload) => { room_id, device_id, ts_ms, + ip, sys_lock_status, device_list = [], fault_list = [], control_list = [], + extra = {}, direction, cmd_word } = payload; + const resolvedIp = normalizeIp(ip ?? extra?.ip ?? null); + // Must have identity fields if (hotel_id == null || !room_id || !device_id || !ts_ms) { return null; @@ -98,6 +108,7 @@ export const extractStatusUpdate = (payload) => { room_id: String(room_id), device_id: String(device_id), ts_ms, + ip: resolvedIp, sys_lock_status: sys_lock_status ?? null, dev_loops: devLoops, faulty_device_count: faultyDeviceCount diff --git a/bls-rcu-action-backend/src/schema/kafkaPayload.js b/bls-rcu-action-backend/src/schema/kafkaPayload.js index faaf839..4509956 100644 --- a/bls-rcu-action-backend/src/schema/kafkaPayload.js +++ b/bls-rcu-action-backend/src/schema/kafkaPayload.js @@ -47,6 +47,7 @@ export const kafkaPayloadSchema = z.object({ cmd_word: z.union([z.string(), z.number()]).transform(val => String(val)), frame_id: z.number(), udp_raw: z.string(), + ip: z.string().optional().nullable(), // Optional Statistical/Status Fields sys_lock_status: z.number().optional().nullable(), diff --git a/bls-rcu-action-backend/tests/status_batch_processor.test.js b/bls-rcu-action-backend/tests/status_batch_processor.test.js index 10bc7fc..4b770c9 100644 --- a/bls-rcu-action-backend/tests/status_batch_processor.test.js +++ b/bls-rcu-action-backend/tests/status_batch_processor.test.js @@ -25,6 +25,7 @@ describe('StatusBatchProcessor', () => { room_id: '8001', device_id: 'dev_001', ts_ms: 1700000000000, + ip: '10.1.2.3', sys_lock_status: null, dev_loops: null, faulty_device_count: null, @@ -105,6 +106,29 @@ describe('StatusBatchProcessor', () => { expect(processor.buffer.size).toBe(2); }); + it('should skip empty ip for g5 target', () => { + processor = new StatusBatchProcessor(mockManager, { + flushInterval: 50000, + maxBufferSize: 100, + targetName: 'g5:room_status.room_status_moment_g5' + }); + + processor.add(makeUpdate({ ip: null })); + + expect(processor.buffer.size).toBe(0); + expect(mockManager.upsertBatch).not.toHaveBeenCalled(); + }); + + it('should preserve ip when flushing rows', async () => { + processor.add(makeUpdate({ ip: '10.9.8.7' })); + + await processor.flush(); + + expect(mockManager.upsertBatch).toHaveBeenCalledTimes(1); + const rows = mockManager.upsertBatch.mock.calls[0][0]; + expect(rows[0].ip).toBe('10.9.8.7'); + }); + it('should clear buffer after flush', async () => { processor.add(makeUpdate()); expect(processor.buffer.size).toBe(1); diff --git a/bls-rcu-action-backend/tests/status_extractor.test.js b/bls-rcu-action-backend/tests/status_extractor.test.js index 485d997..fdb6739 100644 --- a/bls-rcu-action-backend/tests/status_extractor.test.js +++ b/bls-rcu-action-backend/tests/status_extractor.test.js @@ -45,6 +45,27 @@ describe('StatusExtractor', () => { expect(result.device_id).toBe('dev_001'); }); + it('should normalize empty ip to null', () => { + const result = extractStatusUpdate({ + ...base, + ip: ' ', + sys_lock_status: 1 + }); + + expect(result).not.toBeNull(); + expect(result.ip).toBeNull(); + }); + + it('should preserve non-empty ip', () => { + const result = extractStatusUpdate({ + ...base, + ip: '10.1.2.3', + sys_lock_status: 1 + }); + + expect(result.ip).toBe('10.1.2.3'); + }); + it('should build dev_loops from device_list with 9-digit padded keys', () => { const result = extractStatusUpdate({ ...base, diff --git a/openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md b/openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md new file mode 100644 index 0000000..85e2fe8 --- /dev/null +++ b/openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md @@ -0,0 +1,27 @@ +# G5 room_status 空 IP 跳过写入 + +## 背景 +Kafka payload 中包含 `ip` 字段,但该字段可能为空。对于 `room_status_moment_g5`,当 `ip` 为空时,应直接跳过该次状态写入,避免更新 `ts_ms`,也避免写入不完整数据。 + +## 目标 +1. G5 `room_status_moment_g5` 写入时补充 `ip` 字段。 +2. 当 `ip` 为空或空白时,G5 直接跳过这条状态,不进入缓冲、不触发 upsert、不更新 `ts_ms`。 +3. G4 逻辑保持现状,不受 G5 空 IP 策略影响。 +4. 保持 `ip` 作为有效值时的正常 upsert 与字段合并。 + +## 变更范围 +- `src/processor/statusExtractor.js` + - 规范化 `ip`,将空字符串/空白字符串视为 `null` +- `src/db/statusBatchProcessor.js` + - G5 目标在 `ip` 为空时直接跳过 +- `src/db/roomStatusManager.js` + - 在 upsert 中增加 `ip` 写入与更新逻辑 +- `tests/status_extractor.test.js` + - 增加 `ip` 归一化测试 +- `tests/status_batch_processor.test.js` + - 增加 G5 空 `ip` 直接跳过测试 + +## 验收标准 +1. G5 表 `room_status_moment_g5` 的记录可写入 `ip` 字段。 +2. `ip` 为空的 G5 状态不产生任何数据库写入。 +3. 现有测试通过。 diff --git a/openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md b/openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md new file mode 100644 index 0000000..31d4f64 --- /dev/null +++ b/openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md @@ -0,0 +1,16 @@ +# 2026-03-18 G5 空 IP 跳过写入修正 + +## 结果 +已修正 room_status_moment_g5 的写入规则: +- 现在会写入 `ip` 字段 +- `ip` 为空或空白时,G5 状态直接跳过,不更新 `ts_ms` +- G4 逻辑不受影响 + +## 关键实现 +- `statusExtractor` 规范化 `ip` +- `statusBatchProcessor` 对 G5 空 `ip` 直接丢弃 +- `roomStatusManager` 的 upsert 增加 `ip` 列 + +## 验证 +- 新增测试覆盖 `ip` 归一化与 G5 空 `ip` 跳过 +- 现有测试应保持通过