feat: 添加对 Kafka CurrentStatusrestart 值支持,更新 G5 入库逻辑及相关测试

This commit is contained in:
2026-03-18 09:47:33 +08:00
parent 1329eca99e
commit 381080fee0
9 changed files with 97 additions and 16 deletions

View File

@@ -0,0 +1,13 @@
# Change: add restart current_status mapping
## Why
上游 Kafka 的 `CurrentStatus` 新增了 `restart` 值。现有处理逻辑仍按 `on/off` 处理,导致 `restart` 在入库时无法被正确标记为 3。
## What Changes
- 让 Kafka 解析链路接受 `CurrentStatus=restart`
- 将 G5 入库链路中的 `current_status=restart` 映射为 `3`
- 更新相关测试与 OpenSpec 说明,确保 `restart` 是受支持状态。
## Impact
- Affected specs: `openspec/specs/onoffline/spec.md`
- Affected code: `src/processor/index.js`, `src/db/g5DatabaseManager.js`, `tests/processor.test.js`

View File

@@ -0,0 +1,14 @@
## MODIFIED Requirements
### Requirement: 重启数据处理
系统 SHALL 在 `CurrentStatus``restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`
#### Scenario: restart 状态写入
- **GIVEN** Kafka 消息中的 `CurrentStatus``restart`
- **WHEN** 消息被处理并写入数据库
- **THEN** 普通入库链路保留 `restart`G5 入库链路将其写入为 `3`
#### Scenario: 其他状态保持原样
- **GIVEN** Kafka 消息中的 `CurrentStatus``on``off`
- **WHEN** 消息被处理并写入数据库
- **THEN** 系统按既有规则处理该状态值

View File

@@ -0,0 +1,10 @@
## 1. Implementation
- [x] 1.1 Update Kafka row building logic to preserve `restart` as a valid `current_status` value.
- [x] 1.2 Update G5 database mapping so `restart` maps to `3`.
- [x] 1.3 Update processor tests for the `restart` case.
- [x] 1.4 Update OpenSpec requirements for supported current status values.
## 2. Validation
- [x] 2.1 Run `npm run test`.
- [x] 2.2 Run `npm run build`.
- [x] 2.3 Run `openspec validate add-restart-current-status-mapping --strict`.

View File

@@ -12,12 +12,17 @@
- **THEN** current_status 等于 CurrentStatus (截断至 255 字符) - **THEN** current_status 等于 CurrentStatus (截断至 255 字符)
### Requirement: 重启数据处理 ### Requirement: 重启数据处理
系统 SHALL 在 RebootReason 非空时强制 current_status 为 on 系统 SHALL 在 `CurrentStatus``restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`
#### Scenario: 重启数据写入 #### Scenario: restart 状态写入
- **GIVEN** RebootReason 为非空值 - **GIVEN** Kafka 消息中的 `CurrentStatus``restart`
- **WHEN** 消息被处理 - **WHEN** 消息被处理并写入数据库
- **THEN** current_status 等于 on - **THEN** 普通入库链路保留 `restart`G5 入库链路将其写入为 `3`
#### Scenario: 其他状态保持原样
- **GIVEN** Kafka 消息中的 `CurrentStatus``on``off`
- **WHEN** 消息被处理并写入数据库
- **THEN** 系统按既有规则处理该状态值
### Requirement: 空值保留 ### Requirement: 空值保留
系统 SHALL 保留上游空值,不对字段进行补 0。 系统 SHALL 保留上游空值,不对字段进行补 0。

View File

@@ -41,10 +41,10 @@ G5库结构双写临时接入
差异字段: 差异字段:
- guid 为 int4由库自己生成。 - guid 为 int4由库自己生成。
- record_source 固定为 CRICS。 - record_source 固定为 CRICS。
- current_status 为 int2on映射为1off映射为2其余为0。 - current_status 为 int2on映射为1off映射为2restart映射为3其余为0。
支持通过环境变量开关双写。 支持通过环境变量开关双写。
4. 数据处理规则 4. 数据处理规则
非重启数据reboot_reason 为空或不存在current_status 取 CurrentStatus 非重启数据reboot_reason 为空或不存在current_status 取 CurrentStatus
重启数据reboot_reason 不为空current_status 固定为 on 重启数据reboot_reason 不为空时保留 Kafka 上游 current_status 值;若上游值为 restart则入库标记为 restartG5 库映射为 3
其余字段直接按 Kafka 原值落库,空值不补 0 其余字段直接按 Kafka 原值落库,空值不补 0

View File

@@ -18,6 +18,18 @@ const g5Columns = [
'record_source' 'record_source'
]; ];
export const mapCurrentStatusToG5Code = (value) => {
if (value === 1 || value === 2 || value === 3) {
return value;
}
const normalized = typeof value === 'string' ? value.trim().toLowerCase() : '';
if (normalized === 'on') return 1;
if (normalized === 'off') return 2;
if (normalized === 'restart') return 3;
return 0;
};
export class G5DatabaseManager { export class G5DatabaseManager {
constructor(dbConfig) { constructor(dbConfig) {
if (!dbConfig.enabled) return; if (!dbConfig.enabled) return;
@@ -64,9 +76,7 @@ export class G5DatabaseManager {
} }
if (column === 'current_status') { if (column === 'current_status') {
// current_status in G5 is int2 // current_status in G5 is int2
if (row.current_status === 'on') return 1; return mapCurrentStatusToG5Code(row.current_status);
if (row.current_status === 'off') return 2;
return 0;
} }
return row[column] ?? null; return row[column] ?? null;
}); });

View File

@@ -20,6 +20,16 @@ const normalizeText = (value, maxLength) => {
return str; return str;
}; };
const normalizeCurrentStatus = (value) => {
const currentStatus = normalizeText(value, 255);
if (currentStatus === null) return null;
const normalized = currentStatus.toLowerCase();
if (normalized === 'on' || normalized === 'off' || normalized === 'restart') {
return normalized;
}
return currentStatus;
};
export const buildRowsFromMessageValue = (value) => { export const buildRowsFromMessageValue = (value) => {
const payload = parseKafkaPayload(value); const payload = parseKafkaPayload(value);
return buildRowsFromPayload(payload); return buildRowsFromPayload(payload);
@@ -30,9 +40,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Database limit is VARCHAR(255) // Database limit is VARCHAR(255)
const rebootReason = normalizeText(payload.RebootReason, 255); const rebootReason = normalizeText(payload.RebootReason, 255);
const currentStatusRaw = normalizeText(payload.CurrentStatus, 255); const currentStatus = normalizeCurrentStatus(payload.CurrentStatus);
const hasRebootReason = rebootReason !== null && rebootReason !== '';
const currentStatus = hasRebootReason ? 'on' : currentStatusRaw;
// Derive timestamp: UnixTime -> CurrentTime -> Date.now() // Derive timestamp: UnixTime -> CurrentTime -> Date.now()
let tsMs = payload.UnixTime; let tsMs = payload.UnixTime;

View File

@@ -0,0 +1,15 @@
import { describe, it, expect } from 'vitest';
import { mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js';
describe('G5 current_status mapping', () => {
it('maps on/off/restart to numeric codes', () => {
expect(mapCurrentStatusToG5Code('on')).toBe(1);
expect(mapCurrentStatusToG5Code('off')).toBe(2);
expect(mapCurrentStatusToG5Code('restart')).toBe(3);
});
it('returns 0 for unknown values', () => {
expect(mapCurrentStatusToG5Code('idle')).toBe(0);
expect(mapCurrentStatusToG5Code(null)).toBe(0);
});
});

View File

@@ -26,13 +26,19 @@ describe('Processor Logic', () => {
expect(rows[0].reboot_reason).toBeNull(); expect(rows[0].reboot_reason).toBeNull();
}); });
it('should override current_status to on for reboot data', () => { it('should preserve restart current_status for reboot data', () => {
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'off', RebootReason: '0x01' }); const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: '0x01' });
expect(rows).toHaveLength(1); expect(rows).toHaveLength(1);
expect(rows[0].current_status).toBe('on'); expect(rows[0].current_status).toBe('restart');
expect(rows[0].reboot_reason).toBe('0x01'); expect(rows[0].reboot_reason).toBe('0x01');
}); });
it('should preserve restart current_status for non-reboot data', () => {
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: null });
expect(rows).toHaveLength(1);
expect(rows[0].current_status).toBe('restart');
});
it('should keep empty optional fields as empty strings', () => { it('should keep empty optional fields as empty strings', () => {
const rows = buildRowsFromPayload({ const rows = buildRowsFromPayload({
...basePayload, ...basePayload,