feat: 实现Redis集成与Kafka消息处理优化

- 新增Redis集成模块,支持心跳写入与控制台日志队列
- 优化Kafka消费者实现,支持多实例与自动重连
- 改进消息处理器,支持批量处理与多层解码
- 更新数据库表结构,调整字段类型与约束
- 添加Redis与Kafka的配置项和环境变量支持
- 补充测试用例和文档说明
This commit is contained in:
2026-01-14 17:58:45 +08:00
parent eb94aaf92b
commit 910f1c353f
28 changed files with 1691 additions and 177 deletions

View File

@@ -1,16 +1,40 @@
// 配置文件示例
// 复制此文件为 config.js 并填写实际配置
const env = process.env;
const envList = (v) =>
String(v ?? '')
.split(',')
.map((s) => s.trim())
.filter(Boolean);
export default {
// Redis 对接(严格按 docs/redis-integration-protocol.md
redis: {
enabled: true,
projectName: 'Web_BLS_Heartbeat_Server',
url: 'redis://10.8.8.109:6379',
apiBaseUrl: `http://127.0.0.1:${env.PORT ?? 3001}`,
heartbeatIntervalMs: 3000,
heartbeatTtlSeconds: 30,
consoleMaxLen: null,
},
// Kafka配置
kafka: {
brokers: ['localhost:9092'], // Kafka集群地址
groupId: 'bls-heartbeat-consumer', // 消费者组ID
topic: 'bls-heartbeat', // 心跳消息主题
autoCommit: true, // 自动提交偏移量
autoCommitIntervalMs: 5000, // 自动提交间隔
retryAttempts: 3, // 重试次数
retryDelay: 1000 // 重试延迟
brokers: envList(env.KAFKA_BROKERS).length ? envList(env.KAFKA_BROKERS) : ['kafka.blv-oa.com:9092'],
clientId: env.KAFKA_CLIENT_ID ?? 'bls-heartbeat',
groupId: env.KAFKA_GROUP_ID ?? 'bls-heartbeat-consumer',
topics: envList(env.KAFKA_TOPICS).length ? envList(env.KAFKA_TOPICS) : ['blwlog4Nodejs-rcu-heartbeat-topic'],
autoCommit: (env.KAFKA_AUTO_COMMIT ?? 'true') === 'true',
autoCommitIntervalMs: Number(env.KAFKA_AUTO_COMMIT_INTERVAL_MS ?? 5000),
retryAttempts: 3,
retryDelay: 1000,
saslEnabled: (env.KAFKA_SASL_ENABLED ?? 'false') === 'true',
saslMechanism: env.KAFKA_SASL_MECHANISM ?? 'plain',
saslUsername: env.KAFKA_SASL_USERNAME,
saslPassword: env.KAFKA_SASL_PASSWORD,
sslEnabled: (env.KAFKA_SSL_ENABLED ?? 'false') === 'true',
},
// 处理器配置
@@ -21,13 +45,13 @@ export default {
// 数据库配置
db: {
host: '10.8.8.109', // 数据库主机
port: 5433, // 数据库端口
user: 'log_admin', // 数据库用户名
password: 'YourActualStrongPasswordForPostgres!', // 数据库密码
database: 'log_platform', // 数据库名称
maxConnections: 10, // 最大连接数
idleTimeoutMillis: 30000, // 连接空闲超时时间
host: env.POSTGRES_HOST ?? '10.8.8.109',
port: Number(env.POSTGRES_PORT ?? 5433),
user: env.POSTGRES_USER ?? 'log_admin',
password: env.POSTGRES_PASSWORD ?? 'YourActualStrongPasswordForPostgres!',
database: env.POSTGRES_DATABASE ?? 'log_platform',
maxConnections: Number(env.POSTGRES_MAX_CONNECTIONS ?? 6),
idleTimeoutMillis: Number(env.POSTGRES_IDLE_TIMEOUT_MS ?? 30000),
retryAttempts: 3, // 重试次数
retryDelay: 1000, // 重试延迟
@@ -41,13 +65,13 @@ export default {
// 日志配置
logger: {
level: 'info', // 日志级别
level: env.LOG_LEVEL ?? 'info',
format: 'json' // 日志格式
},
// 应用配置
app: {
port: 3000, // 应用端口
env: 'development' // 运行环境
port: Number(env.PORT ?? 3001),
env: env.NODE_ENV ?? 'development'
}
};
};

View File

@@ -83,9 +83,9 @@ class DatabaseManager {
ts_ms bigint NOT NULL,
hotel_id int2 NOT NULL,
room_id int4 NOT NULL,
room_id varchar(50) NOT NULL,
device_id varchar(64) NOT NULL,
ip inet NOT NULL,
ip varchar(21) NOT NULL,
power_state int2 NOT NULL,
guest_type int2 NOT NULL,
cardless_state int2 NOT NULL,
@@ -93,7 +93,7 @@ class DatabaseManager {
pms_state int2 NOT NULL,
carbon_state int2 NOT NULL,
device_count int2 NOT NULL,
comm_seq int2 NOT NULL,
comm_seq int4 NOT NULL,
extra jsonb,
@@ -101,14 +101,14 @@ class DatabaseManager {
CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0),
CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767),
CONSTRAINT chk_room_id_range CHECK (room_id >= 0),
CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50),
CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767),
CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767),
CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767),
CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767),
CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767),
CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767),
CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0 AND comm_seq <= 32767)
CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0)
)
PARTITION BY RANGE (ts_ms);
@@ -194,6 +194,8 @@ class DatabaseManager {
await this.pool.query(legacyTableQuery);
await this.pool.query(v2SchemaQuery);
await this.ensureIpColumnVarchar();
await this.ensureRoomIdColumnVarchar();
console.log('数据库表初始化成功');
} catch (error) {
console.error('数据库表初始化失败:', error);
@@ -201,6 +203,116 @@ class DatabaseManager {
}
}
async ensureRoomIdColumnVarchar() {
const res = await this.pool.query(
`
SELECT format_type(a.atttypid, a.atttypmod) AS type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'heartbeat'
AND c.relname = 'heartbeat_events'
AND a.attname = 'room_id'
AND a.attnum > 0
AND NOT a.attisdropped
`
);
const type = String(res?.rows?.[0]?.type ?? '').toLowerCase();
if (!type) return;
if (type.startsWith('character varying')) return;
await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range');
await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len');
await this.pool.query(
`ALTER TABLE heartbeat.heartbeat_events
ALTER COLUMN room_id TYPE varchar(50)
USING room_id::text`
);
await this.pool.query(
'ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50)'
);
const parts = await this.pool.query(
`
SELECT c.relname AS partition
FROM pg_inherits i
JOIN pg_class c ON c.oid = i.inhrelid
JOIN pg_class p ON p.oid = i.inhparent
JOIN pg_namespace n ON n.oid = p.relnamespace
WHERE n.nspname = 'heartbeat'
AND p.relname = 'heartbeat_events'
ORDER BY c.relname
`
);
for (const row of parts.rows ?? []) {
const name = row?.partition;
if (!name) continue;
await this.pool.query(
`ALTER TABLE heartbeat.${this.escapeIdentifier(name)}
ALTER COLUMN room_id TYPE varchar(50)
USING room_id::text`
);
}
}
async ensureIpColumnVarchar() {
const res = await this.pool.query(
`
SELECT format_type(a.atttypid, a.atttypmod) AS type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'heartbeat'
AND c.relname = 'heartbeat_events'
AND a.attname = 'ip'
AND a.attnum > 0
AND NOT a.attisdropped
`
);
const type = String(res?.rows?.[0]?.type ?? '').toLowerCase();
if (!type) return;
if (type.startsWith('character varying')) return;
if (!type.startsWith('inet')) return;
await this.pool.query(
`ALTER TABLE heartbeat.heartbeat_events
ALTER COLUMN ip TYPE varchar(21)
USING ip::text`
);
const parts = await this.pool.query(
`
SELECT c.relname AS partition
FROM pg_inherits i
JOIN pg_class c ON c.oid = i.inhrelid
JOIN pg_class p ON p.oid = i.inhparent
JOIN pg_namespace n ON n.oid = p.relnamespace
WHERE n.nspname = 'heartbeat'
AND p.relname = 'heartbeat_events'
ORDER BY c.relname
`
);
for (const row of parts.rows ?? []) {
const name = row?.partition;
if (!name) continue;
await this.pool.query(
`ALTER TABLE heartbeat.${this.escapeIdentifier(name)}
ALTER COLUMN ip TYPE varchar(21)
USING ip::text`
);
}
}
escapeIdentifier(id) {
return `"${String(id).replace(/"/g, '""')}"`;
}
getPartitionConfig() {
const cfg = this.config.partitionMaintenance ?? {};
return {
@@ -218,7 +330,7 @@ class DatabaseManager {
const startOffset = Number(startDayOffset ?? 0);
const endOffset = Number(endDayOffset ?? 0);
await this.pool.query(
'SELECT heartbeat.ensure_partitions(current_date + $1::int, current_date + $2::int)',
"SELECT heartbeat.ensure_partitions(((now() AT TIME ZONE 'Asia/Shanghai')::date) + $1::int, ((now() AT TIME ZONE 'Asia/Shanghai')::date) + $2::int)",
[startOffset, endOffset]
);
}
@@ -245,9 +357,6 @@ class DatabaseManager {
console.error('[db] 分区预创建维护失败:', err);
}
}, intervalMs);
// 不阻止进程退出
this.partitionMaintenanceTimer.unref?.();
}
stopPartitionMaintenance() {
@@ -289,11 +398,6 @@ class DatabaseManager {
}
if (events.length === 0) return;
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
if (tsValues.length > 0) {
await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues));
}
const columns = [
'ts_ms',
'hotel_id',
@@ -338,21 +442,59 @@ class DatabaseManager {
const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`;
try {
await this.pool.query(sql, values);
} catch (error) {
// 兜底:若仍因缺分区失败,尝试确保“当前到未来 N 天”后重试一次
if (this.isMissingPartitionError(error)) {
console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次');
await this.ensurePartitionsForRange({
startDayOffset: -7,
endDayOffset: this.getPartitionFutureDays(),
});
await this.pool.query(sql, values);
return;
const runInsertOnce = async () => {
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
if (tsValues.length > 0) {
await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues));
}
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const res = await client.query(sql, values);
const insertedCount = Number(res?.rowCount ?? 0);
if (insertedCount !== events.length) {
throw new Error(`insert rowCount mismatch: expect=${events.length} actual=${insertedCount}`);
}
await client.query('COMMIT');
return { insertedCount };
} catch (error) {
try {
await client.query('ROLLBACK');
} catch (rollbackError) {
console.error('[db] rollback failed:', rollbackError);
}
throw error;
} finally {
client.release();
}
};
const retryAttempts = Number(this.config?.retryAttempts ?? 0);
const retryDelay = Math.max(250, Number(this.config?.retryDelay ?? 1000));
const maxAttempts = retryAttempts > 0 ? retryAttempts : 1;
let lastError = null;
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
return await runInsertOnce();
} catch (error) {
lastError = error;
if (this.isMissingPartitionError(error)) {
console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次');
await this.ensurePartitionsForRange({
startDayOffset: -7,
endDayOffset: this.getPartitionFutureDays(),
});
}
if (attempt < maxAttempts) {
await new Promise((r) => setTimeout(r, retryDelay));
continue;
}
}
throw error;
}
throw lastError;
}
async insertHeartbeatData(data) {
@@ -381,8 +523,9 @@ class DatabaseManager {
values: values.flat()
};
await this.pool.query(query);
const res = await this.pool.query(query);
console.log(`成功插入 ${data.length} 条心跳数据`);
return { insertedCount: Number(res?.rowCount ?? data.length) };
} catch (error) {
console.error('插入心跳数据失败:', error);
throw error;
@@ -430,4 +573,4 @@ class DatabaseManager {
}
}
export { DatabaseManager };
export { DatabaseManager };

View File

@@ -3,6 +3,7 @@ import config from './config/config.js';
import { KafkaConsumer } from './kafka/consumer.js';
import { HeartbeatProcessor } from './processor/heartbeatProcessor.js';
import { DatabaseManager } from './db/databaseManager.js';
import { RedisIntegration } from './redis/redisIntegration.js';
class WebBLSHeartbeatServer {
constructor() {
@@ -10,48 +11,69 @@ class WebBLSHeartbeatServer {
this.kafkaConsumer = null;
this.heartbeatProcessor = null;
this.databaseManager = null;
this.redis = null;
this.consumers = null;
}
async start() {
try {
// 初始化 Redis按协议写入心跳与控制台日志
this.redis = new RedisIntegration(this.config.redis);
await this.redis.connect();
this.redis.startHeartbeat();
// 初始化数据库连接
this.databaseManager = new DatabaseManager(this.config.db);
this.databaseManager = new DatabaseManager({ ...this.config.db, maxConnections: 1 });
await this.databaseManager.connect();
console.log('数据库连接成功');
await this.redis?.info('数据库连接成功', { module: 'db' });
// 初始化处理器
this.heartbeatProcessor = new HeartbeatProcessor(
this.config.processor,
this.databaseManager
);
// 初始化处理器(共享批处理队列)
this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager);
// 初始化Kafka消费者
this.kafkaConsumer = new KafkaConsumer(
this.config.kafka,
this.heartbeatProcessor.processMessage.bind(this.heartbeatProcessor)
);
await this.kafkaConsumer.connect();
await this.kafkaConsumer.subscribe();
await this.kafkaConsumer.startConsuming();
console.log('Kafka消费者启动成功');
// 在单进程内启动 N 个消费者实例(与分区数匹配)
const instances = Math.max(1, Number(this.config.kafka?.consumerInstances ?? 1));
this.consumers = [];
for (let i = 0; i < instances; i++) {
const consumer = new KafkaConsumer(
{ ...this.config.kafka, consumerInstanceIndex: i },
this.heartbeatProcessor.processMessage.bind(this.heartbeatProcessor)
);
await consumer.connect();
await consumer.subscribe();
await consumer.startConsuming();
this.consumers.push({ consumer });
}
console.log(`Kafka消费者启动成功${instances} 个实例`);
await this.redis?.info('Kafka消费者启动成功', { module: 'kafka', topic: this.config.kafka?.topic, instances });
console.log('BLS心跳接收端启动成功');
await this.redis?.info('BLS心跳接收端启动成功', { module: 'app' });
} catch (error) {
console.error('启动失败:', error);
await this.redis?.error('启动失败', { module: 'app', error: String(error?.message ?? error) });
process.exit(1);
}
}
async stop() {
try {
if (this.kafkaConsumer) {
await this.kafkaConsumer.stopConsuming();
await this.kafkaConsumer.disconnect();
if (this.consumers && Array.isArray(this.consumers)) {
for (const { consumer } of this.consumers) {
await consumer.stopConsuming();
await consumer.disconnect();
}
this.consumers = null;
}
if (this.databaseManager) {
await this.databaseManager.disconnect();
}
if (this.redis) {
await this.redis.info('BLS心跳接收端已停止', { module: 'app' });
await this.redis.disconnect();
}
console.log('BLS心跳接收端已停止');
} catch (error) {
@@ -75,4 +97,4 @@ process.on('SIGTERM', () => {
process.exit(0);
});
export { WebBLSHeartbeatServer };
export { WebBLSHeartbeatServer };

View File

@@ -1,44 +1,217 @@
// Kafka消费者模块
import kafka from 'kafka-node';
const { ConsumerGroupStream } = kafka;
class KafkaConsumer {
constructor(config, messageHandler) {
this.config = config;
this.messageHandler = messageHandler;
this.consumer = null;
this.consumerGroupStream = null;
this.isRunning = false;
this._reconnectTimer = null;
this._reconnectAttempts = 0;
this._inFlight = new Set();
this._paused = false;
}
async connect() {
// 实现Kafka连接逻辑
// ConsumerGroup 会在创建时建立连接
console.log('连接到Kafka集群:', this.config.brokers);
// TODO: 实现Kafka连接
}
async disconnect() {
// 实现Kafka断开连接逻辑
console.log('断开与Kafka集群的连接');
// TODO: 实现Kafka断开连接
this._clearReconnectTimer();
if (this.consumerGroupStream) {
await new Promise((resolve) => {
this.consumerGroupStream.close(() => resolve());
});
this.consumerGroupStream = null;
}
}
async subscribe() {
// 实现Kafka订阅逻辑
console.log('订阅Kafka主题:', this.config.topic);
// TODO: 实现Kafka订阅
const topics = this.getTopics();
console.log('订阅Kafka主题:', topics.join(', '));
const kafkaHost = Array.isArray(this.config.brokers)
? this.config.brokers.join(',')
: String(this.config.brokers ?? '');
const idx = Number(this.config?.consumerInstanceIndex ?? 0);
const memberId = `${this.config.clientId ?? 'bls-heartbeat'}-${process.pid}-${Number.isFinite(idx) ? idx : 0}`;
const options = {
kafkaHost,
id: memberId,
clientId: memberId,
groupId: this.config.groupId,
protocol: ['range', 'roundrobin'],
autoCommit: false,
autoCommitIntervalMs: this.config.autoCommitIntervalMs,
// 从最新开始(生产环境常见);需要历史消费可改 earliest
fromOffset: 'latest',
encoding: 'buffer',
keyEncoding: 'buffer',
};
if (this.config?.sslEnabled === true) {
options.ssl = true;
}
if (this.config?.saslEnabled === true) {
options.sasl = {
mechanism: this.config?.saslMechanism ?? 'plain',
username: this.config?.saslUsername,
password: this.config?.saslPassword,
};
}
this.consumerGroupStream = new ConsumerGroupStream({ ...options, autoCommit: false }, topics);
this.consumerGroupStream.on('error', (err) => {
console.error('[kafka] consumer error:', err);
if (this.isRunning) {
this._scheduleReconnect('consumer error');
}
});
this.consumerGroupStream.on('connect', () => {
console.log('[kafka] connected');
this._reconnectAttempts = 0;
});
}
async startConsuming() {
// 实现Kafka消息消费逻辑
console.log('开始消费Kafka消息');
if (!this.consumerGroupStream) {
throw new Error('KafkaConsumer 未 subscribe');
}
this.isRunning = true;
// TODO: 实现Kafka消息消费
// 若已挂了重连定时器,说明上一轮失败,先清掉
this._clearReconnectTimer();
this.consumerGroupStream.on('data', (message) => {
if (!this.isRunning) return;
const p = this._handleMessage(message);
this._inFlight.add(p);
this._updateBackpressure();
p.finally(() => {
this._inFlight.delete(p);
this._updateBackpressure();
});
});
}
async stopConsuming() {
// 实现停止Kafka消息消费逻辑
console.log('停止消费Kafka消息');
this.isRunning = false;
// TODO: 实现停止Kafka消息消费
this._clearReconnectTimer();
await Promise.allSettled(Array.from(this._inFlight));
}
getTopics() {
const topics = this.config?.topics;
if (Array.isArray(topics) && topics.length) {
return topics.map((t) => String(t)).filter(Boolean);
}
const topic = this.config?.topic;
return [String(topic ?? '')].filter(Boolean);
}
_getRetryConfig() {
const attempts = Number(this.config?.retryAttempts ?? 0);
const delayMs = Number(this.config?.retryDelay ?? 1000);
return {
// attempts <= 0 表示无限重试
attempts: Number.isFinite(attempts) ? attempts : 0,
delayMs: Number.isFinite(delayMs) && delayMs > 0 ? delayMs : 1000,
};
}
_clearReconnectTimer() {
if (this._reconnectTimer) {
clearTimeout(this._reconnectTimer);
this._reconnectTimer = null;
}
}
async _handleMessage(message) {
try {
await this.messageHandler(message);
await this._commitMessage(message);
} catch (err) {
console.error('[kafka] messageHandler failed:', err);
}
}
async _commitMessage(message) {
if (!this.consumerGroupStream) return;
await new Promise((resolve, reject) => {
this.consumerGroupStream.commit(message, false, (err) => {
if (err) return reject(err);
resolve();
});
}).catch((err) => {
console.error('[kafka] commit failed:', err);
});
}
_updateBackpressure() {
if (!this.consumerGroupStream) return;
const max = Number(this.config?.maxInFlightMessages ?? 0);
if (!Number.isFinite(max) || max <= 0) return;
const shouldPause = this._inFlight.size >= max;
if (shouldPause && !this._paused) {
this.consumerGroupStream.pause();
this._paused = true;
console.warn(`[kafka] paused: inFlight=${this._inFlight.size} max=${max}`);
return;
}
if (!shouldPause && this._paused) {
this.consumerGroupStream.resume();
this._paused = false;
console.warn(`[kafka] resumed: inFlight=${this._inFlight.size} max=${max}`);
}
}
_scheduleReconnect(reason) {
this._clearReconnectTimer();
const { attempts, delayMs } = this._getRetryConfig();
this._reconnectAttempts += 1;
if (attempts > 0 && this._reconnectAttempts > attempts) {
console.error(`[kafka] reached max reconnect attempts (${attempts}); stop reconnecting`);
return;
}
const wait = Math.min(delayMs * this._reconnectAttempts, 30_000);
console.warn(`[kafka] scheduling reconnect in ${wait}ms (attempt ${this._reconnectAttempts}) reason=${reason}`);
// 不 unrefKafka 不可用时也要保持进程存活并持续重连
this._reconnectTimer = setTimeout(async () => {
if (!this.isRunning) return;
try {
await this.disconnect();
} catch (err) {
console.error('[kafka] disconnect during reconnect failed:', err);
}
try {
await this.subscribe();
await this.startConsuming();
} catch (err) {
console.error('[kafka] reconnect failed:', err);
this._scheduleReconnect('reconnect failed');
}
}, wait);
}
}
export { KafkaConsumer };
export { KafkaConsumer };

View File

@@ -1,90 +1,462 @@
// 心跳处理器模块
import { brotliDecompressSync, gunzipSync, inflateRawSync, inflateSync } from 'node:zlib';
class HeartbeatProcessor {
constructor(config, databaseManager) {
this.config = config;
this.databaseManager = databaseManager;
this.batchQueue = [];
this.batchMessageQueue = [];
this.batchTimer = null;
this._batchInFlight = false;
}
async processMessage(message) {
try {
// 解包心跳消息
const unpackedData = this.unpackMessage(message);
const deferred = this.createDeferred();
// 解包心跳消息
const unpackedData = this.unpackMessage(message);
// 支持批量上报message.value 可能是 JSON 数组
const items = Array.isArray(unpackedData) ? unpackedData : [unpackedData];
let addedCount = 0;
for (const item of items) {
const effective = this.unwrapPayload(item);
// 验证心跳数据
const isValid = this.validateData(unpackedData);
const isValid = this.validateData(effective);
if (!isValid) {
console.error('无效的心跳数据:', unpackedData);
return;
console.error('无效的心跳数据:', effective);
continue;
}
// 转换数据格式
const transformedData = this.transformData(unpackedData);
const transformedData = this.transformData(effective);
// 添加到批量队列
this.batchQueue.push(transformedData);
// 检查是否需要立即处理
if (this.batchQueue.length >= this.config.batchSize) {
await this.processBatch();
} else if (!this.batchTimer) {
// 设置批量处理定时器
this.batchTimer = setTimeout(
() => this.processBatch(),
this.config.batchTimeout
);
}
} catch (error) {
console.error('处理消息失败:', error);
addedCount += 1;
}
if (addedCount === 0) {
deferred.resolve({ insertedCount: 0 });
return deferred.promise;
}
this.batchMessageQueue.push({ deferred, eventCount: addedCount, message });
if (this.shouldFlushNow()) {
this.processBatch();
return deferred.promise;
}
if (!this.batchTimer) {
this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout);
}
return deferred.promise;
}
unpackMessage(message) {
// 实现心跳消息解包逻辑
console.log('解包心跳消息:', message);
// TODO: 实现消息解包
return {};
// kafka-node message: { value: Buffer|string, key: Buffer|string, ... }
const raw = message?.value;
const obj = this.decodeToObject(raw);
return obj;
}
unwrapPayload(obj) {
if (!obj || typeof obj !== 'object') return obj;
// 常见的包装结构:{ data: {...} } / { payload: {...} } / { body: {...} }
const candidates = ['data', 'payload', 'body', 'message'];
for (const k of candidates) {
const v = obj[k];
if (v && typeof v === 'object') return v;
}
return obj;
}
validateData(data) {
// 实现心跳数据验证逻辑
console.log('验证心跳数据:', data);
// TODO: 实现数据验证
if (!data || typeof data !== 'object') return false;
// v2 必填字段校验(宽松:允许上游使用 camelCase/PascalCase
const normalized = this.normalizeHeartbeat(data);
const required = [
'ts_ms',
'hotel_id',
'room_id',
'device_id',
'ip',
'power_state',
'guest_type',
'cardless_state',
'service_mask',
'pms_state',
'carbon_state',
'device_count',
'comm_seq',
];
for (const k of required) {
if (normalized[k] === undefined || normalized[k] === null) {
return false;
}
}
const isDigits = (v) => typeof v === 'string' && /^-?\d+$/.test(v);
const isFiniteNumber = (v) => typeof v === 'number' && Number.isFinite(v);
if (!isFiniteNumber(normalized.hotel_id)) return false;
if (!isFiniteNumber(normalized.power_state)) return false;
if (!isFiniteNumber(normalized.guest_type)) return false;
if (!isFiniteNumber(normalized.cardless_state)) return false;
if (!isFiniteNumber(normalized.pms_state)) return false;
if (!isFiniteNumber(normalized.carbon_state)) return false;
if (!isFiniteNumber(normalized.device_count)) return false;
if (!isFiniteNumber(normalized.comm_seq)) return false;
if (!(isFiniteNumber(normalized.ts_ms) || isDigits(normalized.ts_ms))) return false;
if (!(isFiniteNumber(normalized.service_mask) || isDigits(normalized.service_mask))) return false;
if (typeof normalized.device_id !== 'string' || normalized.device_id.length === 0) return false;
if (typeof normalized.room_id !== 'string' || normalized.room_id.length === 0 || normalized.room_id.length > 50) {
return false;
}
if (typeof normalized.ip !== 'string' || normalized.ip.length === 0) return false;
return true;
}
transformData(data) {
// 实现心跳数据转换逻辑
console.log('转换心跳数据:', data);
// TODO: 实现数据转换
return data;
return this.normalizeHeartbeat(data);
}
async processBatch() {
if (this.batchQueue.length === 0) {
return;
}
// 清除定时器
if (this._batchInFlight) return;
if (this.batchQueue.length === 0) return;
if (this.batchMessageQueue.length === 0) return;
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}
this._batchInFlight = true;
let hasMore = false;
try {
// 获取当前批次数据
const batchData = [...this.batchQueue];
this.batchQueue = [];
// 写入数据库
await this.databaseManager.insertHeartbeatData(batchData);
const { batchEventCount, batchMessageCount } = this.computeNextBatchWindow();
const batchData = this.batchQueue.slice(0, batchEventCount);
const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount);
let insertedCount = 0;
if (typeof this.databaseManager.insertHeartbeatEvents === 'function') {
const result = await this.databaseManager.insertHeartbeatEvents(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0);
} else {
const result = await this.databaseManager.insertHeartbeatData(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0);
}
if (insertedCount !== batchData.length) {
throw new Error(`落库结果校验失败expect=${batchData.length} actual=${insertedCount}`);
}
this.batchQueue.splice(0, batchEventCount);
this.batchMessageQueue.splice(0, batchMessageCount);
for (const entry of batchMessages) {
entry.deferred.resolve({ insertedCount: entry.eventCount });
}
console.log(`成功处理批次数据,共 ${batchData.length}`);
hasMore = this.batchQueue.length > 0;
} catch (error) {
console.error('批量处理失败:', error);
if (!this.batchTimer) {
const retryDelay = Math.max(250, Number(this.config.batchTimeout ?? 1000));
this.batchTimer = setTimeout(() => this.processBatch(), retryDelay);
}
} finally {
this._batchInFlight = false;
if (hasMore && this.shouldFlushNow()) {
setImmediate(() => this.processBatch());
} else if (!this.batchTimer && this.batchQueue.length > 0) {
this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout);
}
}
}
shouldFlushNow() {
const max = Math.max(1, Number(this.config.batchSize ?? 1));
return this.batchQueue.length >= max;
}
computeNextBatchWindow() {
const maxEvents = Math.max(1, Number(this.config.batchSize ?? 1));
let batchEventCount = 0;
let batchMessageCount = 0;
for (const entry of this.batchMessageQueue) {
const cnt = Math.max(0, Number(entry?.eventCount ?? 0));
if (batchMessageCount === 0 && cnt > maxEvents) {
batchEventCount = cnt;
batchMessageCount = 1;
break;
}
if (batchEventCount + cnt > maxEvents) break;
batchEventCount += cnt;
batchMessageCount += 1;
}
if (batchMessageCount === 0) {
const first = this.batchMessageQueue[0];
batchEventCount = Math.max(0, Number(first?.eventCount ?? 0));
batchMessageCount = 1;
}
return { batchEventCount, batchMessageCount };
}
createDeferred() {
let resolve = null;
let reject = null;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
decodeToObject(input) {
let buf = this.toBuffer(input);
// 最多尝试 3 轮兼容“2层压缩”+base64
for (let i = 0; i < 3; i++) {
// 1) 直接当 UTF-8 文本
const text = this.tryDecodeUtf8(buf);
if (text) {
const trimmed = text.trim();
if (trimmed.startsWith('{') || trimmed.startsWith('[')) {
return JSON.parse(trimmed);
}
// 2) base64有些上游会把二进制压缩结果再 base64
if (this.looksLikeBase64(trimmed)) {
try {
buf = Buffer.from(trimmed, 'base64');
continue;
} catch {
// ignore
}
}
}
// 3) gzip / deflate
const decompressed = this.tryDecompress(buf);
if (decompressed) {
buf = decompressed;
continue;
}
break;
}
// 最后再试一次 JSON.parse
const finalText = this.tryDecodeUtf8(buf);
if (finalText) {
return JSON.parse(finalText);
}
throw new Error('无法解码 Kafka message.value 为 JSON 对象');
}
toBuffer(input) {
if (Buffer.isBuffer(input)) return input;
if (input === undefined || input === null) return Buffer.from('');
if (typeof input === 'string') return Buffer.from(input, 'utf8');
// kafka-node 在 encoding=buffer 时通常给 Buffer兜底
return Buffer.from(String(input), 'utf8');
}
tryDecodeUtf8(buf) {
if (!buf || buf.length === 0) return null;
try {
const text = buf.toString('utf8');
let nonPrintableCount = 0;
for (let i = 0; i < text.length; i++) {
const code = text.charCodeAt(i);
const allowed =
code === 0x09 ||
code === 0x0a ||
code === 0x0d ||
(code >= 0x20 && code <= 0x7e) ||
(code >= 0x4e00 && code <= 0x9fff);
if (!allowed) {
nonPrintableCount++;
}
}
if (nonPrintableCount > Math.max(8, text.length * 0.2)) {
return null;
}
return text;
} catch {
return null;
}
}
looksLikeBase64(str) {
if (!str || str.length < 16) return false;
const s = str.replace(/\s+/g, '');
if (s.length % 4 !== 0) return false;
return /^[A-Za-z0-9+/=]+$/.test(s);
}
tryDecompress(buf) {
if (!buf || buf.length < 2) return null;
// gzip magic: 1f 8b
if (buf[0] === 0x1f && buf[1] === 0x8b) {
try {
return gunzipSync(buf);
} catch {
return null;
}
}
// zlib/deflate 常见头0x78 0x01/0x9c/0xda
if (buf[0] === 0x78) {
try {
return inflateSync(buf);
} catch {
return null;
}
}
// .NET DeflateStream 常见为 raw deflate无 zlib 头)
try {
const out = inflateRawSync(buf);
// 粗略判定:解出来应当是可读文本或至少有长度
if (out && out.length > 0) return out;
} catch {
// ignore
}
// brotli若上游用 br 压缩)
try {
const out = brotliDecompressSync(buf);
if (out && out.length > 0) return out;
} catch {
// ignore
}
return null;
}
normalizeHeartbeat(obj) {
// 支持 snake_case / camelCase / PascalCase
const pick = (keys) => {
for (const k of keys) {
if (obj[k] !== undefined && obj[k] !== null) return obj[k];
}
return undefined;
};
const normalized = {
ts_ms: pick(['ts_ms', 'tsMs', 'TsMs', 'timestampMs', 'TimestampMs', 'timestamp', 'Timestamp', 'ts', 'Ts']),
hotel_id: pick(['hotel_id', 'hotelId', 'HotelId']),
room_id: pick(['room_id', 'roomId', 'RoomId']),
device_id: pick(['device_id', 'deviceId', 'DeviceId', 'device', 'Device']),
ip: pick(['ip', 'Ip', 'IP']),
power_state: pick(['power_state', 'powerState', 'PowerState']),
guest_type: pick(['guest_type', 'guestType', 'GuestType']),
cardless_state: pick(['cardless_state', 'cardlessState', 'CardlessState']),
service_mask: pick(['service_mask', 'serviceMask', 'ServiceMask']),
pms_state: pick(['pms_state', 'pmsState', 'PmsState']),
carbon_state: pick(['carbon_state', 'carbonState', 'CarbonState']),
device_count: pick(['device_count', 'deviceCount', 'DeviceCount']),
comm_seq: pick(['comm_seq', 'commSeq', 'CommSeq']),
extra: pick(['extra', 'Extra']),
};
const toTrimmedStringOrUndefined = (v) => {
if (v === undefined || v === null) return v;
const s = String(v).trim();
return s.length === 0 ? undefined : s;
};
const toIntOrUndefined = (v) => {
if (v === undefined || v === null) return v;
if (typeof v === 'number') {
if (!Number.isFinite(v)) return undefined;
return Math.trunc(v);
}
const s = String(v).trim();
if (s.length === 0) return undefined;
if (!/^-?\d+$/.test(s)) return undefined;
const n = Number(s);
if (!Number.isFinite(n)) return undefined;
return Math.trunc(n);
};
const toBigintParamOrUndefined = (v) => {
if (v === undefined || v === null) return v;
if (typeof v === 'number') {
if (!Number.isFinite(v)) return undefined;
const n = Math.trunc(v);
return Number.isSafeInteger(n) ? n : String(n);
}
const s = String(v).trim();
if (s.length === 0) return undefined;
if (!/^-?\d+$/.test(s)) return undefined;
return s;
};
normalized.ts_ms = toBigintParamOrUndefined(normalized.ts_ms);
normalized.hotel_id = toIntOrUndefined(normalized.hotel_id);
normalized.room_id = toTrimmedStringOrUndefined(normalized.room_id);
normalized.device_id = toTrimmedStringOrUndefined(normalized.device_id);
normalized.ip = toTrimmedStringOrUndefined(normalized.ip);
normalized.power_state = toIntOrUndefined(normalized.power_state);
normalized.guest_type = toIntOrUndefined(normalized.guest_type);
normalized.cardless_state = toIntOrUndefined(normalized.cardless_state);
normalized.service_mask = toBigintParamOrUndefined(normalized.service_mask);
normalized.pms_state = toIntOrUndefined(normalized.pms_state);
normalized.carbon_state = toIntOrUndefined(normalized.carbon_state);
normalized.device_count = toIntOrUndefined(normalized.device_count);
normalized.comm_seq = toIntOrUndefined(normalized.comm_seq);
// 其余未知字段塞进 extra避免丢信息但不覆盖显式 extra
if (!normalized.extra || typeof normalized.extra !== 'object') {
normalized.extra = {};
}
for (const [k, v] of Object.entries(obj)) {
if (
[
'ts_ms','tsMs','TsMs','timestampMs','TimestampMs','timestamp','Timestamp','ts','Ts',
'hotel_id','hotelId','HotelId',
'room_id','roomId','RoomId',
'device_id','deviceId','DeviceId','device','Device',
'ip','Ip','IP',
'power_state','powerState','PowerState',
'guest_type','guestType','GuestType',
'cardless_state','cardlessState','CardlessState',
'service_mask','serviceMask','ServiceMask',
'pms_state','pmsState','PmsState',
'carbon_state','carbonState','CarbonState',
'device_count','deviceCount','DeviceCount',
'comm_seq','commSeq','CommSeq',
'extra','Extra'
].includes(k)
) {
continue;
}
normalized.extra[k] = v;
}
return normalized;
}
}
export { HeartbeatProcessor };
export { HeartbeatProcessor };

View File

@@ -0,0 +1,263 @@
import { createClient } from 'redis';
class RedisIntegration {
constructor(config) {
this.config = config;
this.client = null;
this.heartbeatTimer = null;
this._connectPromise = null;
this._lastErrorLogAt = 0;
}
isEnabled() {
return Boolean(this.config?.enabled);
}
getProjectName() {
const projectName = this.config?.projectName;
if (!projectName || typeof projectName !== 'string') {
throw new Error('Redis projectName 未配置');
}
return projectName;
}
getHeartbeatKey() {
return `${this.getProjectName()}_项目心跳`;
}
getConsoleKey() {
return `${this.getProjectName()}_项目控制台`;
}
getApiBaseUrl() {
const apiBaseUrl = this.config?.apiBaseUrl;
if (!apiBaseUrl || typeof apiBaseUrl !== 'string') {
throw new Error('Redis apiBaseUrl 未配置');
}
return apiBaseUrl;
}
getHeartbeatIntervalMs() {
const ms = Number(this.config?.heartbeatIntervalMs ?? 3000);
return Number.isFinite(ms) && ms > 0 ? ms : 3000;
}
getHeartbeatTtlSeconds() {
const ttl = this.config?.heartbeatTtlSeconds;
if (ttl === undefined || ttl === null) return null;
const n = Number(ttl);
return Number.isFinite(n) && n > 0 ? Math.floor(n) : null;
}
getConsoleMaxLen() {
const v = this.config?.consoleMaxLen;
if (v === undefined || v === null) return null;
const n = Number(v);
return Number.isFinite(n) && n > 0 ? Math.floor(n) : null;
}
async connect() {
if (!this.isEnabled()) {
console.log('[redis] disabled');
return;
}
if (this.client) {
// 已创建 client则后台保证连接
this.ensureConnectedInBackground();
return;
}
const url = this.config?.url;
const host = this.config?.host;
const port = this.config?.port;
const password = this.config?.password;
const database = this.config?.db;
const connectTimeout = this.config?.connectTimeoutMs;
const socket = this.config?.socket;
const reconnectStrategy =
socket?.reconnectStrategy ??
((retries) => Math.min(1000 + retries * 500, 10_000));
const clientOptions = {};
if (url) {
clientOptions.url = url;
}
clientOptions.socket =
socket ??
{
host,
port,
connectTimeout,
reconnectStrategy,
};
if (typeof password === 'string' && password.length > 0) {
clientOptions.password = password;
}
if (database !== undefined && database !== null) {
const n = Number(database);
if (Number.isFinite(n) && n >= 0) {
clientOptions.database = Math.floor(n);
}
}
this.client = createClient(clientOptions);
this.client.on('error', (err) => {
const now = Date.now();
// 节流:最多每 10 秒打一条,避免长期无人值守刷屏
if (now - this._lastErrorLogAt > 10_000) {
this._lastErrorLogAt = now;
console.error('[redis] client error:', err);
}
});
// 不要 await避免 Redis 短暂不可用导致主服务启动失败
this.ensureConnectedInBackground();
}
ensureConnectedInBackground() {
if (!this.isEnabled()) return;
if (!this.client) return;
if (this.client.isReady) {
return;
}
if (this._connectPromise) {
return;
}
this._connectPromise = this.client
.connect()
.then(() => {
console.log('[redis] connected');
})
.catch((err) => {
// connect 失败不抛出到上层;依赖 redis 内建重连策略或下次调用再触发
const now = Date.now();
if (now - this._lastErrorLogAt > 10_000) {
this._lastErrorLogAt = now;
console.error('[redis] connect failed:', err);
}
})
.finally(() => {
this._connectPromise = null;
});
}
async disconnect() {
this.stopHeartbeat();
if (!this.client) {
return;
}
try {
await this.client.quit();
} finally {
this.client = null;
this._connectPromise = null;
console.log('[redis] disconnected');
}
}
async writeHeartbeat() {
if (!this.isEnabled()) return;
if (!this.client || !this.client.isReady) return;
const payload = {
apiBaseUrl: this.getApiBaseUrl(),
lastActiveAt: Date.now(),
};
const key = this.getHeartbeatKey();
const value = JSON.stringify(payload);
const ttl = this.getHeartbeatTtlSeconds();
if (ttl) {
await this.client.set(key, value, { EX: ttl });
} else {
await this.client.set(key, value);
}
}
startHeartbeat() {
if (!this.isEnabled()) return;
if (this.heartbeatTimer) return;
const intervalMs = this.getHeartbeatIntervalMs();
// 立即写一次,随后按间隔写
this.writeHeartbeat().catch((err) => {
console.error('[redis] writeHeartbeat failed:', err);
});
this.heartbeatTimer = setInterval(() => {
this.writeHeartbeat().catch((err) => {
console.error('[redis] writeHeartbeat failed:', err);
});
}, intervalMs);
console.log(`[redis] heartbeat started: every ${intervalMs}ms`);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
async pushConsoleLog({ level, message, metadata }) {
if (!this.isEnabled()) return;
if (!this.client || !this.client.isReady) return;
const entry = {
timestamp: new Date().toISOString(),
level,
message,
metadata: metadata ?? undefined,
};
const value = JSON.stringify(entry);
// 建议 < 64KB超出则丢弃避免 Redis 阻塞/异常
if (Buffer.byteLength(value, 'utf8') > 64 * 1024) {
console.warn('[redis] console log too large; dropped');
return;
}
const key = this.getConsoleKey();
await this.client.rPush(key, value);
const maxLen = this.getConsoleMaxLen();
if (maxLen) {
// 保留最新 maxLen 条
await this.client.lTrim(key, -maxLen, -1);
}
}
info(message, metadata) {
return this.pushConsoleLog({ level: 'info', message, metadata });
}
warn(message, metadata) {
return this.pushConsoleLog({ level: 'warn', message, metadata });
}
error(message, metadata) {
return this.pushConsoleLog({ level: 'error', message, metadata });
}
debug(message, metadata) {
return this.pushConsoleLog({ level: 'debug', message, metadata });
}
}
export { RedisIntegration };