Files
Web_BLS_Heartbeat_Server/scripts/kafka/decodeMessage.js
XuJiacheng 910f1c353f feat: 实现Redis集成与Kafka消息处理优化
- 新增Redis集成模块,支持心跳写入与控制台日志队列
- 优化Kafka消费者实现,支持多实例与自动重连
- 改进消息处理器,支持批量处理与多层解码
- 更新数据库表结构,调整字段类型与约束
- 添加Redis与Kafka的配置项和环境变量支持
- 补充测试用例和文档说明
2026-01-14 17:58:45 +08:00

67 lines
2.5 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import fs from 'node:fs';
import path from 'node:path';
import process from 'node:process';
import { HeartbeatProcessor } from '../../src/processor/heartbeatProcessor.js';
/**
 * Prints the command-line usage text and terminates the process.
 *
 * When the exit code is 0 (explicit --help) the text goes to stdout; for any
 * other code it goes to stderr so that error-path usage output does not mix
 * with the decoded JSON this tool prints on stdout.
 *
 * @param {number} [code=1] - Process exit code.
 */
function usageAndExit(code = 1) {
  const usage = [
    '',
    '用法:',
    ' node scripts/kafka/decodeMessage.js --base64 <str>',
    ' node scripts/kafka/decodeMessage.js --hex <str>',
    ' node scripts/kafka/decodeMessage.js --file <path> [--encoding base64|hex|raw]',
    '',
    '说明:',
    '- 用于验证 Kafka message.value 的反向解码结果(对端为 JSON + UTF-8 bytes)',
    '- 会尝试UTF-8 JSON / base64 -> (gzip|deflate|raw deflate|brotli) 循环解压(兼容但对端当前未用)',
    '',
  ].join('\n');

  if (code === 0) {
    console.log(usage);
  } else {
    console.error(usage);
  }
  process.exit(code);
}
/**
 * Parses the command-line flags understood by this script.
 *
 * The first two argv entries are skipped. `--base64`, `--hex`, `--file` and
 * `--encoding` each consume the following token as their value (which is
 * `undefined` when the flag is the last token). `--help` / `-h` set
 * `help: true`. Every other token is collected, in order, under `_`.
 *
 * @param {string[]} argv - Full argument vector, e.g. `process.argv`.
 * @returns {object} Parsed options; only keys that were seen are present.
 */
function parseArgs(argv) {
  // Flags that take a value, mapped to the option key they populate.
  const valueFlags = new Map([
    ['--base64', 'base64'],
    ['--hex', 'hex'],
    ['--file', 'file'],
    ['--encoding', 'encoding'],
  ]);

  const pending = argv.slice(2);
  const parsed = {};

  while (pending.length > 0) {
    const token = pending.shift();
    const optionKey = valueFlags.get(token);

    if (optionKey !== undefined) {
      // shift() on an empty queue yields undefined, matching a missing value.
      parsed[optionKey] = pending.shift();
      continue;
    }

    if (token === '--help' || token === '-h') {
      parsed.help = true;
      continue;
    }

    if (parsed._ === undefined) {
      parsed._ = [];
    }
    parsed._.push(token);
  }

  return parsed;
}
/**
 * Converts hex text into a Buffer, refusing malformed input.
 *
 * Buffer.from(str, 'hex') stops silently at the first non-hex character and
 * drops a trailing odd digit, which would make this tool decode a truncated
 * payload without any warning. All whitespace is removed first, then the
 * remainder must be an even number of hex digits; otherwise an error is
 * printed and the process exits with code 1.
 *
 * @param {string} text - Hex digits, optionally separated by whitespace.
 * @returns {Buffer} The decoded bytes.
 */
function hexToBuffer(text) {
  const compact = String(text).replace(/\s+/g, '');
  if (compact.length % 2 !== 0 || !/^[0-9a-fA-F]*$/.test(compact)) {
    console.error('[input] invalid hex string: expected an even number of hex digits (0-9, a-f)');
    process.exit(1);
  }
  return Buffer.from(compact, 'hex');
}

// Script entry: parse flags, load the message bytes, then decode and report.
const args = parseArgs(process.argv);
if (args.help) usageAndExit(0);

// Only the processor's decode/normalize/validate helpers are used below.
// insertHeartbeatEvents is stubbed with a no-op, presumably so nothing is
// persisted by this tool (confirm against HeartbeatProcessor).
const processor = new HeartbeatProcessor(
  { batchSize: 9999, batchTimeout: 1000 },
  { insertHeartbeatEvents: async () => {} }
);

// Resolve the input bytes; precedence is --base64, then --hex, then --file.
let buf;
if (args.base64) {
  buf = Buffer.from(String(args.base64).trim(), 'base64');
} else if (args.hex) {
  buf = hexToBuffer(args.hex);
} else if (args.file) {
  const p = path.resolve(process.cwd(), args.file);
  const raw = fs.readFileSync(p);
  // --encoding says how the file content is encoded; 'raw' uses the bytes as-is.
  const enc = (args.encoding ?? 'raw').toLowerCase();
  if (enc === 'raw') buf = raw;
  else if (enc === 'base64') buf = Buffer.from(raw.toString('utf8').trim(), 'base64');
  else if (enc === 'hex') buf = hexToBuffer(raw.toString('utf8'));
  else {
    console.error('未知 encoding:', enc);
    usageAndExit(1);
  }
} else {
  // No input source was given.
  usageAndExit(1);
}

try {
  const obj = processor.decodeToObject(buf);
  // A decoded message may be a single object or an array of them.
  const items = Array.isArray(obj) ? obj : [obj];
  console.log('[decode] ok; items:', items.length);
  console.log(JSON.stringify(obj, null, 2));
  // Run each item through unwrap -> normalize, then count how many validate.
  const normalized = items.map((x) => processor.normalizeHeartbeat(processor.unwrapPayload(x)));
  const validCount = normalized.filter((x) => processor.validateData(x)).length;
  console.log('[normalize] valid (required fields present):', validCount, '/', items.length);
} catch (err) {
  console.error('[decode] failed:', err);
  // Set the exit code instead of exiting so pending output can flush.
  process.exitCode = 1;
}