feat: 实现 G5 room_status 状态更新时跳过空 IP,增加 IP 归一化逻辑,更新相关测试
This commit is contained in:
@@ -36,7 +36,7 @@ export class RoomStatusManager {
|
||||
* Uses ON CONFLICT for atomic merge.
|
||||
*
|
||||
* @param {Array<Object>} rows - Array of merged status objects
|
||||
* Each: { hotel_id, room_id, device_id, ts_ms, sys_lock_status, dev_loops, faulty_device_count }
|
||||
* Each: { hotel_id, room_id, device_id, ts_ms, ip, sys_lock_status, dev_loops, faulty_device_count }
|
||||
*/
|
||||
async upsertBatch(rows) {
|
||||
if (!rows || rows.length === 0) return;
|
||||
@@ -52,7 +52,7 @@ export class RoomStatusManager {
|
||||
|
||||
for (let i = 0; i < rows.length; i++) {
|
||||
const row = rows[i];
|
||||
const paramsPerRow = this.omitGuid ? 8 : 9;
|
||||
const paramsPerRow = this.omitGuid ? 9 : 10;
|
||||
const offset = i * paramsPerRow;
|
||||
|
||||
if (this.omitGuid) {
|
||||
@@ -61,13 +61,14 @@ export class RoomStatusManager {
|
||||
row.hotel_id, // $2
|
||||
row.room_id, // $3
|
||||
row.device_id, // $4
|
||||
row.sys_lock_status, // $5
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $6
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $7
|
||||
1 // $8 online_status
|
||||
row.ip ?? null, // $5
|
||||
row.sys_lock_status, // $6
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8
|
||||
1 // $9 online_status
|
||||
);
|
||||
const p = (n) => `$${offset + n}`;
|
||||
placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}::jsonb, ${p(7)}::jsonb, ${p(8)})`);
|
||||
placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`);
|
||||
} else {
|
||||
values.push(
|
||||
row.guid || randomUUID(), // $1
|
||||
@@ -75,19 +76,20 @@ export class RoomStatusManager {
|
||||
row.hotel_id, // $3
|
||||
row.room_id, // $4
|
||||
row.device_id, // $5
|
||||
row.sys_lock_status, // $6
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8
|
||||
1 // $9 online_status
|
||||
row.ip ?? null, // $6
|
||||
row.sys_lock_status, // $7
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $8
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $9
|
||||
1 // $10 online_status
|
||||
);
|
||||
const p = (n) => `$${offset + n}`;
|
||||
placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`);
|
||||
placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}, ${p(8)}::jsonb, ${p(9)}::jsonb, ${p(10)})`);
|
||||
}
|
||||
}
|
||||
|
||||
const insertColumns = this.omitGuid
|
||||
? 'ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status'
|
||||
: 'guid, ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status';
|
||||
? 'ts_ms, hotel_id, room_id, device_id, ip, sys_lock_status, dev_loops, faulty_device_count, online_status'
|
||||
: 'guid, ts_ms, hotel_id, room_id, device_id, ip, sys_lock_status, dev_loops, faulty_device_count, online_status';
|
||||
|
||||
const conflictTarget = this.omitGuid
|
||||
? '(hotel_id, room_id)'
|
||||
@@ -98,6 +100,7 @@ export class RoomStatusManager {
|
||||
ON CONFLICT ${conflictTarget}
|
||||
DO UPDATE SET
|
||||
ts_ms = EXCLUDED.ts_ms,
|
||||
ip = COALESCE(EXCLUDED.ip, ${this.fullTableName}.ip),
|
||||
online_status = 1,
|
||||
device_id = EXCLUDED.device_id,
|
||||
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, ${this.fullTableName}.sys_lock_status),
|
||||
|
||||
@@ -47,6 +47,17 @@ export class StatusBatchProcessor {
|
||||
add(update) {
|
||||
if (!update) return;
|
||||
|
||||
if (this.targetName.startsWith('g5:') && !update.ip) {
|
||||
logger.info('Status update skipped for empty ip', {
|
||||
target: this.targetName,
|
||||
hotel_id: update.hotel_id,
|
||||
room_id: update.room_id,
|
||||
device_id: update.device_id,
|
||||
ts_ms: update.ts_ms
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const key = this._key(update);
|
||||
const existing = this.buffer.get(key);
|
||||
|
||||
@@ -64,6 +75,11 @@ export class StatusBatchProcessor {
|
||||
existing.sys_lock_status = update.sys_lock_status;
|
||||
}
|
||||
|
||||
// ip: prefer newer non-null value
|
||||
if (update.ip != null) {
|
||||
existing.ip = update.ip;
|
||||
}
|
||||
|
||||
// dev_loops: merge keys (new overwrites old for same key)
|
||||
if (update.dev_loops) {
|
||||
existing.dev_loops = existing.dev_loops
|
||||
|
||||
@@ -20,6 +20,12 @@ const pad3 = (val) => String(val).padStart(3, '0');
|
||||
const buildLoopKey = (devType, devAddr, devLoop) =>
|
||||
`${pad3(devType)}${pad3(devAddr)}${pad3(devLoop)}`;
|
||||
|
||||
const normalizeIp = (value) => {
|
||||
if (value == null) return null;
|
||||
const text = String(value).trim();
|
||||
return text.length > 0 ? text : null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Extract a status update object from a validated Kafka payload.
|
||||
*
|
||||
@@ -34,14 +40,18 @@ export const extractStatusUpdate = (payload) => {
|
||||
room_id,
|
||||
device_id,
|
||||
ts_ms,
|
||||
ip,
|
||||
sys_lock_status,
|
||||
device_list = [],
|
||||
fault_list = [],
|
||||
control_list = [],
|
||||
extra = {},
|
||||
direction,
|
||||
cmd_word
|
||||
} = payload;
|
||||
|
||||
const resolvedIp = normalizeIp(ip ?? extra?.ip ?? null);
|
||||
|
||||
// Must have identity fields
|
||||
if (hotel_id == null || !room_id || !device_id || !ts_ms) {
|
||||
return null;
|
||||
@@ -98,6 +108,7 @@ export const extractStatusUpdate = (payload) => {
|
||||
room_id: String(room_id),
|
||||
device_id: String(device_id),
|
||||
ts_ms,
|
||||
ip: resolvedIp,
|
||||
sys_lock_status: sys_lock_status ?? null,
|
||||
dev_loops: devLoops,
|
||||
faulty_device_count: faultyDeviceCount
|
||||
|
||||
@@ -47,6 +47,7 @@ export const kafkaPayloadSchema = z.object({
|
||||
cmd_word: z.union([z.string(), z.number()]).transform(val => String(val)),
|
||||
frame_id: z.number(),
|
||||
udp_raw: z.string(),
|
||||
ip: z.string().optional().nullable(),
|
||||
|
||||
// Optional Statistical/Status Fields
|
||||
sys_lock_status: z.number().optional().nullable(),
|
||||
|
||||
@@ -25,6 +25,7 @@ describe('StatusBatchProcessor', () => {
|
||||
room_id: '8001',
|
||||
device_id: 'dev_001',
|
||||
ts_ms: 1700000000000,
|
||||
ip: '10.1.2.3',
|
||||
sys_lock_status: null,
|
||||
dev_loops: null,
|
||||
faulty_device_count: null,
|
||||
@@ -105,6 +106,29 @@ describe('StatusBatchProcessor', () => {
|
||||
expect(processor.buffer.size).toBe(2);
|
||||
});
|
||||
|
||||
it('should skip empty ip for g5 target', () => {
|
||||
processor = new StatusBatchProcessor(mockManager, {
|
||||
flushInterval: 50000,
|
||||
maxBufferSize: 100,
|
||||
targetName: 'g5:room_status.room_status_moment_g5'
|
||||
});
|
||||
|
||||
processor.add(makeUpdate({ ip: null }));
|
||||
|
||||
expect(processor.buffer.size).toBe(0);
|
||||
expect(mockManager.upsertBatch).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should preserve ip when flushing rows', async () => {
|
||||
processor.add(makeUpdate({ ip: '10.9.8.7' }));
|
||||
|
||||
await processor.flush();
|
||||
|
||||
expect(mockManager.upsertBatch).toHaveBeenCalledTimes(1);
|
||||
const rows = mockManager.upsertBatch.mock.calls[0][0];
|
||||
expect(rows[0].ip).toBe('10.9.8.7');
|
||||
});
|
||||
|
||||
it('should clear buffer after flush', async () => {
|
||||
processor.add(makeUpdate());
|
||||
expect(processor.buffer.size).toBe(1);
|
||||
|
||||
@@ -45,6 +45,27 @@ describe('StatusExtractor', () => {
|
||||
expect(result.device_id).toBe('dev_001');
|
||||
});
|
||||
|
||||
it('should normalize empty ip to null', () => {
|
||||
const result = extractStatusUpdate({
|
||||
...base,
|
||||
ip: ' ',
|
||||
sys_lock_status: 1
|
||||
});
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result.ip).toBeNull();
|
||||
});
|
||||
|
||||
it('should preserve non-empty ip', () => {
|
||||
const result = extractStatusUpdate({
|
||||
...base,
|
||||
ip: '10.1.2.3',
|
||||
sys_lock_status: 1
|
||||
});
|
||||
|
||||
expect(result.ip).toBe('10.1.2.3');
|
||||
});
|
||||
|
||||
it('should build dev_loops from device_list with 9-digit padded keys', () => {
|
||||
const result = extractStatusUpdate({
|
||||
...base,
|
||||
|
||||
27
openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md
Normal file
27
openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md
Normal file
@@ -0,0 +1,27 @@
|
||||
# G5 room_status 空 IP 跳过写入
|
||||
|
||||
## 背景
|
||||
Kafka payload 中包含 `ip` 字段,但该字段可能为空。对于 `room_status_moment_g5`,当 `ip` 为空时,应直接跳过该次状态写入,避免更新 `ts_ms`,也避免写入不完整数据。
|
||||
|
||||
## 目标
|
||||
1. G5 `room_status_moment_g5` 写入时补充 `ip` 字段。
|
||||
2. 当 `ip` 为空或空白时,G5 直接跳过这条状态,不进入缓冲、不触发 upsert、不更新 `ts_ms`。
|
||||
3. G4 逻辑保持现状,不受 G5 空 IP 策略影响。
|
||||
4. 保持 `ip` 作为有效值时的正常 upsert 与字段合并。
|
||||
|
||||
## 变更范围
|
||||
- `src/processor/statusExtractor.js`
|
||||
- 规范化 `ip`,将空字符串/空白字符串视为 `null`
|
||||
- `src/db/statusBatchProcessor.js`
|
||||
- G5 目标在 `ip` 为空时直接跳过
|
||||
- `src/db/roomStatusManager.js`
|
||||
- 在 upsert 中增加 `ip` 写入与更新逻辑
|
||||
- `tests/status_extractor.test.js`
|
||||
- 增加 `ip` 归一化测试
|
||||
- `tests/status_batch_processor.test.js`
|
||||
- 增加 G5 空 `ip` 直接跳过测试
|
||||
|
||||
## 验收标准
|
||||
1. G5 表 `room_status_moment_g5` 的记录可写入 `ip` 字段。
|
||||
2. `ip` 为空的 G5 状态不产生任何数据库写入。
|
||||
3. 现有测试通过。
|
||||
16
openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md
Normal file
16
openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md
Normal file
@@ -0,0 +1,16 @@
|
||||
# 2026-03-18 G5 空 IP 跳过写入修正
|
||||
|
||||
## 结果
|
||||
已修正 room_status_moment_g5 的写入规则:
|
||||
- 现在会写入 `ip` 字段
|
||||
- `ip` 为空或空白时,G5 状态直接跳过,不更新 `ts_ms`
|
||||
- G4 逻辑不受影响
|
||||
|
||||
## 关键实现
|
||||
- `statusExtractor` 规范化 `ip`
|
||||
- `statusBatchProcessor` 对 G5 空 `ip` 直接丢弃
|
||||
- `roomStatusManager` 的 upsert 增加 `ip` 列
|
||||
|
||||
## 验证
|
||||
- 新增测试覆盖 `ip` 归一化与 G5 空 `ip` 跳过
|
||||
- 现有测试应保持通过
|
||||
Reference in New Issue
Block a user