Files
Web_BLS_Heartbeat_Server/src/index.js
XuJiacheng b72cdde8bf feat(processor): 增加数据库连接断开检测与自动恢复机制
- 在 DatabaseManager 中添加 checkConnection 方法用于检测数据库连接状态
- 当数据库连接断开时,自动暂停 Kafka 消费者,防止消息堆积
- 实现每分钟数据库连接检查,连接恢复后自动恢复消费者处理
- 区分数据库连接错误和其他严重错误,连接错误时保留数据等待重试
- 在 .gitignore 中添加 SQL 文件排除
2026-02-06 13:37:46 +08:00

135 lines
4.4 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 项目入口文件
import config from './config/config.js';
import { KafkaConsumer } from './kafka/consumer.js';
import { HeartbeatProcessor } from './processor/heartbeatProcessor.js';
import { DatabaseManager } from './db/databaseManager.js';
import { RedisIntegration } from './redis/redisIntegration.js';
import { StatsCounters, StatsReporter } from './stats/statsManager.js';
/**
 * Normalizes the configured number of Kafka consumer instances.
 *
 * Non-numeric or non-finite values fall back to 1, so a bad configuration
 * value can neither start zero consumers (NaN) nor loop forever (Infinity).
 * Fractional values are rounded up, which matches how many iterations the
 * previous `i < instances` loop executed for such values.
 *
 * @param {unknown} configured - Raw `consumerInstances` value from the config.
 * @returns {number} A finite integer greater than or equal to 1.
 */
function resolveConsumerInstanceCount(configured) {
  const requested = Number(configured ?? 1);
  if (!Number.isFinite(requested)) {
    return 1;
  }
  return Math.max(1, Math.ceil(requested));
}

/**
 * Application entry object: wires together Redis, the database manager, the
 * shared heartbeat processor and one or more Kafka consumers, and tears them
 * down again on stop.
 */
class WebBLSHeartbeatServer {
  constructor() {
    this.config = config;
    this.kafkaConsumer = null;
    this.heartbeatProcessor = null;
    this.databaseManager = null;
    this.redis = null;
    // Array of `{ consumer }` wrappers once started; null while not running.
    this.consumers = null;
    this.stats = new StatsCounters();
    this.statsReporter = null;
  }

  /**
   * Connects Redis and the database, builds the heartbeat processor and
   * starts the configured number of Kafka consumers.
   *
   * Any failure is logged (to the console and, best effort, to Redis) and the
   * process is terminated with exit code 1.
   *
   * @returns {Promise<void>}
   */
  async start() {
    try {
      // Initialize Redis; it is used for the heartbeat and console-style logs.
      this.redis = new RedisIntegration(this.config.redis);
      await this.redis.connect();
      this.redis.startHeartbeat();
      this.statsReporter = new StatsReporter({ redis: this.redis, stats: this.stats });
      this.statsReporter.start();

      // Initialize the database connection (maxConnections is forced to 1 here,
      // overriding whatever the config specifies).
      this.databaseManager = new DatabaseManager({ ...this.config.db, maxConnections: 1 });
      await this.databaseManager.connect();
      console.log('数据库连接成功');
      await this.redis?.info('数据库连接成功', { module: 'db' });

      // Print a summary of the Kafka configuration to ease connection debugging.
      console.log('正在初始化 Kafka 消费者...');
      console.log('Kafka 配置:', {
        brokers: this.config.kafka?.brokers,
        topics: this.config.kafka?.topics,
        groupId: this.config.kafka?.groupId,
        fromOffset: this.config.kafka?.fromOffset ?? 'latest',
        ssl: !!this.config.kafka?.sslEnabled,
        sasl: this.config.kafka?.saslEnabled ? `enabled (mechanism: ${this.config.kafka?.saslMechanism})` : 'disabled'
      });

      // Initialize the processor (one instance shared by every consumer).
      // The two callbacks pause/resume all consumers; they are no-ops until
      // `this.consumers` has been populated below.
      this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, {
        redis: this.redis,
        stats: this.stats,
        onDbOffline: () => {
          if (this.consumers) {
            this.consumers.forEach(c => c.consumer.pause());
          }
        },
        onDbOnline: () => {
          if (this.consumers) {
            this.consumers.forEach(c => c.consumer.resume());
          }
        }
      });

      // Start N consumer instances inside this single process (intended to
      // match the partition count).
      const instances = resolveConsumerInstanceCount(this.config.kafka?.consumerInstances);
      this.consumers = [];
      for (let i = 0; i < instances; i++) {
        const consumer = new KafkaConsumer(
          { ...this.config.kafka, consumerInstanceIndex: i },
          this.heartbeatProcessor.processMessage.bind(this.heartbeatProcessor)
        );
        await consumer.connect();
        await consumer.subscribe();
        await consumer.startConsuming();
        this.consumers.push({ consumer });
      }
      console.log(`Kafka消费者启动成功${instances} 个实例`);
      // NOTE(review): this reads `kafka.topic` while the summary above reads
      // `kafka.topics` — confirm which key the config actually defines.
      await this.redis?.info('Kafka消费者启动成功', { module: 'kafka', topic: this.config.kafka?.topic, instances });
      console.log('BLS心跳接收端启动成功');
      await this.redis?.info('BLS心跳接收端启动成功', { module: 'app' });
    } catch (error) {
      console.error('启动失败:', error);
      try {
        await this.redis?.error('启动失败', { module: 'app', error: String(error?.message ?? error) });
      } catch (reportError) {
        // Reporting to Redis is best effort; it must not prevent the exit below.
        console.error('启动失败:', reportError);
      }
      process.exit(1);
    }
  }

  /**
   * Stops the stats reporter, every Kafka consumer, the database connection
   * and Redis, in that order.
   *
   * Each teardown step is attempted independently: a failure is logged and
   * the remaining resources are still released. References are cleared before
   * teardown so that calling stop() again is a no-op. This method never
   * rejects.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    // Runs one teardown step, logging (not propagating) any failure.
    const attempt = async (step) => {
      try {
        await step();
      } catch (error) {
        console.error('停止失败:', error);
      }
    };

    if (this.statsReporter) {
      const reporter = this.statsReporter;
      this.statsReporter = null;
      await attempt(() => reporter.stop());
    }

    if (Array.isArray(this.consumers)) {
      const consumers = this.consumers;
      this.consumers = null;
      for (const { consumer } of consumers) {
        await attempt(() => consumer.stopConsuming());
        // Disconnect even if stopConsuming failed, to avoid leaking the client.
        await attempt(() => consumer.disconnect());
      }
    }

    if (this.databaseManager) {
      const databaseManager = this.databaseManager;
      this.databaseManager = null;
      await attempt(() => databaseManager.disconnect());
    }

    if (this.redis) {
      const redis = this.redis;
      this.redis = null;
      await attempt(() => redis.info('BLS心跳接收端已停止', { module: 'app' }));
      await attempt(() => redis.disconnect());
    }

    console.log('BLS心跳接收端已停止');
  }
}
// Start the server. start() handles its own failures, but attach a rejection
// handler so an error thrown from inside its error path cannot become an
// unhandled rejection.
const server = new WebBLSHeartbeatServer();
server.start().catch((error) => {
  console.error('启动失败:', error);
  process.exit(1);
});

// Handle process termination signals.
// stop() is asynchronous, so it must be awaited before exiting; calling
// process.exit() right after invoking it would terminate the process before
// consumers, the database and Redis have been released.
let isShuttingDown = false;
const shutdownGracefully = async () => {
  if (isShuttingDown) {
    // A second signal while shutdown is in progress forces an immediate exit,
    // so a hung teardown cannot make the process ignore SIGINT/SIGTERM.
    process.exit(1);
  }
  isShuttingDown = true;
  try {
    await server.stop();
  } finally {
    process.exit(0);
  }
};
process.on('SIGINT', shutdownGracefully);
process.on('SIGTERM', shutdownGracefully);
export { WebBLSHeartbeatServer };