feat: 初始化BLS心跳接收端项目

- 添加基础项目结构,包括.gitignore、vite配置和package.json
- 实现Kafka消费者模块框架
- 添加心跳处理器模块框架
- 实现数据库管理模块框架
- 添加OpenSpec规范文档
- 更新README文档说明项目功能和技术栈
This commit is contained in:
2026-01-08 09:16:53 +08:00
parent 24654c4b90
commit adc3bfd87d
19 changed files with 5549 additions and 1 deletions

View File

@@ -0,0 +1,46 @@
// Example configuration file.
// Copy this file to config.js and fill in the actual values.
export default {
  // Kafka settings
  kafka: {
    brokers: ['localhost:9092'], // Kafka cluster addresses
    groupId: 'bls-heartbeat-consumer', // consumer group ID
    topic: 'bls-heartbeat', // heartbeat message topic
    autoCommit: true, // auto-commit consumer offsets
    autoCommitIntervalMs: 5000, // auto-commit interval (ms)
    retryAttempts: 3, // retry attempts
    retryDelay: 1000 // retry delay (ms)
  },
  // Processor settings
  processor: {
    batchSize: 100, // records per batch write
    batchTimeout: 5000 // batch flush timeout (ms)
  },
  // Database settings
  // NOTE(review): these look like real host/user/password values committed
  // into an example file — confirm they are placeholders; rotate if not.
  db: {
    host: '10.8.8.109', // database host
    port: 5433, // database port
    user: 'log_admin', // database user
    password: 'YourActualStrongPasswordForPostgres!', // database password
    database: 'log_platform', // database name
    maxConnections: 10, // max pool connections
    idleTimeoutMillis: 30000, // idle connection timeout (ms)
    retryAttempts: 3, // retry attempts
    retryDelay: 1000 // retry delay (ms)
  },
  // Logger settings
  logger: {
    level: 'info', // log level
    format: 'json' // log output format
  },
  // Application settings
  app: {
    port: 3000, // application port
    env: 'development' // runtime environment
  }
};

138
src/db/databaseManager.js Normal file
View File

@@ -0,0 +1,138 @@
// 数据库管理器模块
import { Pool } from 'pg';
/**
 * Manages the PostgreSQL connection pool and the heartbeat table.
 *
 * Expects a node-postgres pool configuration object (host, port, user,
 * password, database, ...). All methods reject on failure after logging.
 */
class DatabaseManager {
  /**
   * @param {object} config - node-postgres Pool configuration.
   */
  constructor(config) {
    this.config = config;
    this.pool = null;
  }

  /**
   * Creates the connection pool, verifies connectivity and ensures the
   * heartbeat table and its indexes exist.
   * @throws on connection or DDL failure.
   */
  async connect() {
    try {
      this.pool = new Pool(this.config);
      // Verify connectivity with pool.query(): it checks a client out and
      // releases it automatically. The previous `await this.pool.connect()`
      // checked out a client that was never released, leaking one pooled
      // connection for the lifetime of the process.
      await this.pool.query('SELECT 1');
      console.log('数据库连接池创建成功');
      await this.initTables();
    } catch (error) {
      console.error('数据库连接失败:', error);
      throw error;
    }
  }

  /**
   * Drains and closes the connection pool. Safe to call when connect()
   * was never invoked.
   */
  async disconnect() {
    try {
      if (this.pool) {
        await this.pool.end();
        console.log('数据库连接池已关闭');
      }
    } catch (error) {
      console.error('关闭数据库连接池失败:', error);
      throw error;
    }
  }

  /**
   * Idempotently creates the heartbeat table and its lookup indexes
   * (by component and by timestamp).
   */
  async initTables() {
    try {
      const createTableQuery = `
        CREATE TABLE IF NOT EXISTS heartbeat (
          id SERIAL PRIMARY KEY,
          component_id VARCHAR(50) NOT NULL,
          status VARCHAR(20) NOT NULL,
          timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
          data JSONB,
          created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON heartbeat(component_id);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON heartbeat(timestamp);
      `;
      await this.pool.query(createTableQuery);
      console.log('数据库表初始化成功');
    } catch (error) {
      console.error('数据库表初始化失败:', error);
      throw error;
    }
  }

  /**
   * Inserts one heartbeat record or an array of records with a single
   * parameterized multi-row INSERT.
   * @param {object|object[]} data - record(s) with component_id, status,
   *   timestamp and data fields. An empty array is a no-op.
   */
  async insertHeartbeatData(data) {
    try {
      // Normalize to an array without reassigning/mutating the caller's
      // argument (the original rebound the parameter in place).
      const records = Array.isArray(data) ? data : [data];
      if (records.length === 0) {
        return;
      }
      const values = records.map((item) => [
        item.component_id,
        item.status,
        item.timestamp,
        item.data
      ]);
      // One ($1, $2, $3, $4) placeholder group per row, numbered globally.
      const placeholders = values
        .map((_, i) => `($${i * 4 + 1}, $${i * 4 + 2}, $${i * 4 + 3}, $${i * 4 + 4})`)
        .join(', ');
      const query = {
        text: `
          INSERT INTO heartbeat (component_id, status, timestamp, data)
          VALUES ${placeholders}
        `,
        values: values.flat()
      };
      await this.pool.query(query);
      console.log(`成功插入 ${records.length} 条心跳数据`);
    } catch (error) {
      console.error('插入心跳数据失败:', error);
      throw error;
    }
  }

  /**
   * Returns the most recent heartbeat row for a component, or null when
   * none exists.
   * @param {string} componentId
   */
  async getLatestHeartbeat(componentId) {
    try {
      const query = {
        text: `
          SELECT * FROM heartbeat
          WHERE component_id = $1
          ORDER BY timestamp DESC
          LIMIT 1
        `,
        values: [componentId]
      };
      const result = await this.pool.query(query);
      return result.rows[0] || null;
    } catch (error) {
      console.error('查询最新心跳数据失败:', error);
      throw error;
    }
  }

  /**
   * Returns heartbeat rows for a component within [startTime, endTime],
   * newest first.
   * @param {string} componentId
   * @param {Date|string} startTime - inclusive lower bound.
   * @param {Date|string} endTime - inclusive upper bound.
   */
  async getHeartbeatHistory(componentId, startTime, endTime) {
    try {
      const query = {
        text: `
          SELECT * FROM heartbeat
          WHERE component_id = $1
          AND timestamp BETWEEN $2 AND $3
          ORDER BY timestamp DESC
        `,
        values: [componentId, startTime, endTime]
      };
      const result = await this.pool.query(query);
      return result.rows;
    } catch (error) {
      console.error('查询心跳历史数据失败:', error);
      throw error;
    }
  }
}
export { DatabaseManager };

78
src/index.js Normal file
View File

@@ -0,0 +1,78 @@
// 项目入口文件
import config from './config/config.js';
import { KafkaConsumer } from './kafka/consumer.js';
import { HeartbeatProcessor } from './processor/heartbeatProcessor.js';
import { DatabaseManager } from './db/databaseManager.js';
/**
 * Top-level orchestrator: wires the database manager, heartbeat
 * processor and Kafka consumer together for the BLS heartbeat service.
 */
class WebBLSHeartbeatServer {
  constructor() {
    this.config = config;
    this.kafkaConsumer = null;
    this.heartbeatProcessor = null;
    this.databaseManager = null;
  }

  /**
   * Brings subsystems up in dependency order: database first, then the
   * processor that writes to it, then the Kafka consumer that feeds the
   * processor. Any startup failure is logged and terminates the process.
   */
  async start() {
    try {
      // The database must be reachable before anything can persist.
      this.databaseManager = new DatabaseManager(this.config.db);
      await this.databaseManager.connect();
      console.log('数据库连接成功');

      // The processor batches heartbeats into the database manager.
      this.heartbeatProcessor = new HeartbeatProcessor(
        this.config.processor,
        this.databaseManager
      );

      // Bind the handler so `this` inside processMessage stays the processor.
      const handleMessage =
        this.heartbeatProcessor.processMessage.bind(this.heartbeatProcessor);
      this.kafkaConsumer = new KafkaConsumer(this.config.kafka, handleMessage);
      await this.kafkaConsumer.connect();
      await this.kafkaConsumer.subscribe();
      await this.kafkaConsumer.startConsuming();
      console.log('Kafka消费者启动成功');
      console.log('BLS心跳接收端启动成功');
    } catch (error) {
      console.error('启动失败:', error);
      process.exit(1);
    }
  }

  /**
   * Shuts down in reverse order: the consumer first (so no new messages
   * arrive), then the database pool. Never rethrows — errors are logged.
   */
  async stop() {
    try {
      const consumer = this.kafkaConsumer;
      if (consumer) {
        await consumer.stopConsuming();
        await consumer.disconnect();
      }
      const db = this.databaseManager;
      if (db) {
        await db.disconnect();
      }
      console.log('BLS心跳接收端已停止');
    } catch (error) {
      console.error('停止失败:', error);
    }
  }
}
// 启动服务器
const server = new WebBLSHeartbeatServer();
server.start();
// 处理进程终止信号
process.on('SIGINT', () => {
server.stop();
process.exit(0);
});
process.on('SIGTERM', () => {
server.stop();
process.exit(0);
});
export { WebBLSHeartbeatServer };

44
src/kafka/consumer.js Normal file
View File

@@ -0,0 +1,44 @@
// Kafka consumer module
/**
 * Skeleton consumer for the BLS heartbeat topic.
 *
 * All broker interaction is still TODO: each method currently only logs
 * its intent; startConsuming/stopConsuming additionally maintain the
 * isRunning flag.
 */
class KafkaConsumer {
  /**
   * @param {object} config - Kafka settings (brokers, topic, groupId, ...).
   * @param {Function} messageHandler - called for each consumed message.
   */
  constructor(config, messageHandler) {
    this.config = config;
    this.messageHandler = messageHandler;
    this.consumer = null;
    this.isRunning = false;
  }

  /** Establishes the connection to the Kafka cluster. */
  async connect() {
    console.log('连接到Kafka集群:', this.config.brokers);
    // TODO: implement the Kafka connection
  }

  /** Tears down the connection to the Kafka cluster. */
  async disconnect() {
    console.log('断开与Kafka集群的连接');
    // TODO: implement Kafka disconnection
  }

  /** Subscribes to the configured heartbeat topic. */
  async subscribe() {
    console.log('订阅Kafka主题:', this.config.topic);
    // TODO: implement topic subscription
  }

  /** Begins the consume loop and marks the consumer as running. */
  async startConsuming() {
    console.log('开始消费Kafka消息');
    this.isRunning = true;
    // TODO: implement message consumption
  }

  /** Stops the consume loop and clears the running flag. */
  async stopConsuming() {
    console.log('停止消费Kafka消息');
    this.isRunning = false;
    // TODO: implement stopping consumption
  }
}
export { KafkaConsumer };

View File

@@ -0,0 +1,90 @@
// Heartbeat processor module
/**
 * Buffers incoming heartbeat messages and flushes them to the database
 * in batches — either when the batch reaches config.batchSize or when
 * config.batchTimeout elapses after the first queued record.
 */
class HeartbeatProcessor {
  /**
   * @param {object} config - { batchSize, batchTimeout } settings.
   * @param {object} databaseManager - persistence layer exposing
   *   insertHeartbeatData(records).
   */
  constructor(config, databaseManager) {
    this.config = config;
    this.databaseManager = databaseManager;
    this.batchQueue = [];
    this.batchTimer = null;
  }

  /**
   * Handles one raw message: unpack → validate → transform → enqueue,
   * flushing immediately when the batch fills up, otherwise arming the
   * flush timer. Errors are logged, never rethrown, so one bad message
   * cannot stop consumption.
   */
  async processMessage(message) {
    try {
      const payload = this.unpackMessage(message);
      if (!this.validateData(payload)) {
        console.error('无效的心跳数据:', payload);
        return;
      }
      this.batchQueue.push(this.transformData(payload));
      if (this.batchQueue.length >= this.config.batchSize) {
        await this.processBatch();
        return;
      }
      // Arm a single flush timer so a partial batch never waits forever.
      if (this.batchTimer === null) {
        this.batchTimer = setTimeout(
          () => this.processBatch(),
          this.config.batchTimeout
        );
      }
    } catch (error) {
      console.error('处理消息失败:', error);
    }
  }

  /** Deserializes a raw heartbeat message (deserialization still TODO). */
  unpackMessage(message) {
    console.log('解包心跳消息:', message);
    // TODO: implement message unpacking
    return {};
  }

  /** Validates unpacked heartbeat data (real checks still TODO). */
  validateData(data) {
    console.log('验证心跳数据:', data);
    // TODO: implement data validation
    return true;
  }

  /** Converts unpacked data to the storage format (conversion still TODO). */
  transformData(data) {
    console.log('转换心跳数据:', data);
    // TODO: implement data transformation
    return data;
  }

  /**
   * Flushes all queued records to the database in one call. Cancels any
   * pending flush timer first. Failures are logged only — note the
   * failed batch is dropped, not re-queued.
   */
  async processBatch() {
    if (this.batchQueue.length === 0) {
      return;
    }
    if (this.batchTimer !== null) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    const pending = this.batchQueue.splice(0);
    try {
      await this.databaseManager.insertHeartbeatData(pending);
      console.log(`成功处理批次数据,共 ${pending.length}`);
    } catch (error) {
      console.error('批量处理失败:', error);
    }
  }
}
export { HeartbeatProcessor };