Files
Web_BLS_Heartbeat_Server/scripts/kafka/decodeMessage.js
XuJiacheng c0cdc9ea66 feat: 更新 Kafka 配置和数据库管理逻辑
- 在 .env.example 中添加 Kafka 配置项:KAFKA_FETCH_MAX_BYTES, KAFKA_FETCH_MIN_BYTES, KAFKA_FETCH_MAX_WAIT_MS。
- 删除 room_status_sync 提案及相关文档。
- 删除 fix_uint64_overflow 提案及相关文档。
- 更新数据库管理器以支持使用 COPY 语句进行高效数据写入,替换批量 INSERT 逻辑。
- 实现心跳数据的整数溢出处理,确保无效数据被持久化到 heartbeat_events_errors 表。
- 更新处理器规范,确保心跳数据成功写入历史表后触发 room_status 同步。
- 添加新文档,描述新的分区方法案例。
- 归档旧的提案和规范文档以保持项目整洁。
2026-03-03 18:22:12 +08:00

67 lines
2.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import fs from 'node:fs';
import path from 'node:path';
import process from 'node:process';
import { HeartbeatProcessor } from '../../src/processor/heartbeatProcessor.js';
/**
 * Print the command-line usage text to stdout and terminate the process.
 *
 * The help text (kept in Chinese, as it is user-facing output) lists the three
 * supported input modes (--base64, --hex, --file with optional --encoding) and
 * a short description of what the script attempts to decode.
 *
 * @param {number} [code=1] - Exit code handed to process.exit.
 */
function usageAndExit(code = 1) {
  // Assembled line by line so the text is reviewable; joining with '\n' yields
  // one leading and one trailing newline around the message.
  const usageLines = [
    '',
    '用法:',
    ' node scripts/kafka/decodeMessage.js --base64 <str>',
    ' node scripts/kafka/decodeMessage.js --hex <str>',
    ' node scripts/kafka/decodeMessage.js --file <path> [--encoding base64|hex|raw]',
    '',
    '说明:',
    // The closing parenthesis after "UTF-8 bytes" was missing from the help text.
    '- 用于验证 Kafka message.value 的反向解码结果(对端为 JSON + UTF-8 bytes)',
    '- 会尝试UTF-8 JSON / base64 -> (gzip|deflate|raw deflate|brotli) 循环解压(兼容但对端当前未用)',
    '',
  ];
  console.log(usageLines.join('\n'));
  process.exit(code);
}
/**
 * Turn a process.argv-style array into a plain options object.
 *
 * Scanning starts at index 2 (the first two entries are skipped). The flags
 * --base64, --hex, --file and --encoding each consume the following entry as
 * their value; if there is no following entry the key is still set, with the
 * value `undefined`. --help / -h set `help: true`. Every other token is
 * appended, in order, to the `_` array (which only exists if such a token
 * was seen).
 *
 * @param {string[]} argv - Argument vector, typically process.argv.
 * @returns {{base64?: string, hex?: string, file?: string, encoding?: string, help?: boolean, _?: string[]}}
 */
function parseArgs(argv) {
  // Flags that take a value, mapped to the option key they populate.
  const valueFlags = new Map([
    ['--base64', 'base64'],
    ['--hex', 'hex'],
    ['--file', 'file'],
    ['--encoding', 'encoding'],
  ]);

  const parsed = {};
  let index = 2;
  while (index < argv.length) {
    const token = argv[index];
    const optionKey = valueFlags.get(token);

    if (optionKey !== undefined) {
      // Step onto the value slot; it is consumed whatever it contains.
      index += 1;
      parsed[optionKey] = argv[index];
    } else if (token === '--help' || token === '-h') {
      parsed.help = true;
    } else {
      parsed._ = (parsed._ ?? []).concat([token]);
    }

    index += 1;
  }
  return parsed;
}
// ---------------------------------------------------------------------------
// Script body: read one message payload from the command line (inline base64,
// inline hex, or a file), run it through HeartbeatProcessor's decode helpers,
// and print the decoded JSON plus how many items pass validateData.
// ---------------------------------------------------------------------------
const args = parseArgs(process.argv);
if (args.help) {
  usageAndExit(0);
}

// The second constructor argument only provides an insertHeartbeatEvents that
// does nothing (presumably it stands in for the database layer — confirm
// against HeartbeatProcessor).
const processor = new HeartbeatProcessor(
  { batchSize: 30000, batchTimeout: 5000 },
  { insertHeartbeatEvents: async () => {} }
);

// Text-to-bytes helpers shared by the inline and file-based input modes.
const bytesFromBase64 = (text) => Buffer.from(String(text).trim(), 'base64');
// All whitespace is removed from hex text before decoding.
const bytesFromHex = (text) => Buffer.from(String(text).trim().replace(/\s+/g, ''), 'hex');

let buf;
if (args.base64) {
  buf = bytesFromBase64(args.base64);
} else if (args.hex) {
  buf = bytesFromHex(args.hex);
} else if (args.file) {
  // The file is read before the encoding is checked, so a read error surfaces first.
  const absolutePath = path.resolve(process.cwd(), args.file);
  const fileBytes = fs.readFileSync(absolutePath);
  const encoding = (args.encoding ?? 'raw').toLowerCase();
  switch (encoding) {
    case 'raw':
      buf = fileBytes;
      break;
    case 'base64':
      buf = bytesFromBase64(fileBytes.toString('utf8'));
      break;
    case 'hex':
      buf = bytesFromHex(fileBytes.toString('utf8'));
      break;
    default:
      console.error('未知 encoding:', encoding);
      usageAndExit(1);
  }
} else {
  // No input source was supplied.
  usageAndExit(1);
}

try {
  const decoded = processor.decodeToObject(buf);
  // A single decoded object is treated as a one-element list.
  const items = Array.isArray(decoded) ? decoded : [decoded];
  console.log('[decode] ok; items:', items.length);
  console.log(JSON.stringify(decoded, null, 2));

  // Normalize every item first, then count how many pass validation.
  const normalized = items.map((item) => {
    const payload = processor.unwrapPayload(item);
    return processor.normalizeHeartbeat(payload);
  });
  const validCount = normalized.reduce(
    (count, heartbeat) => (processor.validateData(heartbeat) ? count + 1 : count),
    0
  );
  console.log('[normalize] valid (required fields present):', validCount, '/', items.length);
} catch (err) {
  console.error('[decode] failed:', err);
  // Set the exit code instead of exiting immediately so pending output can flush.
  process.exitCode = 1;
}