feat: add batch processing and a database offline-recovery mechanism to improve reliability

- Add a BatchProcessor class that batches message inserts to improve database write performance
- Disable autoCommit in the consumer and commit offsets manually to ensure data consistency
- Add a database health check that pauses consumption while the database is offline and resumes it automatically
- Handle the 0x0E command word, widening message-type recognition
- Add database connection retry logic to work around port conflicts on Windows
- Update the environment variable template and tune Kafka consumer parameters
- Add unit tests covering the batch processing and reliability features
2026-02-04 20:36:33 +08:00
parent 339db6f95f
commit 680bf6a957
16 changed files with 557 additions and 43 deletions
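Taken together, the changes move the hot path from one INSERT per Kafka message to a shared batched INSERT, and defer each offset commit until the batch holding its rows has landed. A minimal sketch of the intended flow, assuming dbManager and config are initialized as in src/index.js (names taken from the diffs below):

// Sketch of the per-message pipeline after this commit; dbManager and config
// are assumed to be set up as in src/index.js.
import { BatchProcessor } from './src/db/batchProcessor.js';
import { processKafkaMessage } from './src/processor/index.js';

const batchProcessor = new BatchProcessor(dbManager, config, {
  batchSize: config.kafka.maxInFlight
});

const handleMessage = async (message) => {
  const rows = await processKafkaMessage({ message }); // parse only; no DB work here
  return batchProcessor.add({ rows });                 // resolves once the shared batch INSERT succeeds
};
// consumer.js commits the offset only after handleMessage resolves, so an
// offset is acknowledged only once its rows are in the database.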

File diff suppressed because one or more lines are too long

View File

@@ -6,7 +6,9 @@ NODE_ENV=development
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=my-topic-name
KAFKA_GROUP_ID=my-group-id
-KAFKA_CLIENT_ID=my-client-id
+KAFKA_CLIENT_ID=bls-rcu-action-client
+KAFKA_AUTO_COMMIT=false
+KAFKA_AUTO_COMMIT_INTERVAL_MS=5000
KAFKA_CONSUMER_INSTANCES=1
# KAFKA_SASL_USERNAME=
# KAFKA_SASL_PASSWORD=

View File

@@ -22,7 +22,7 @@ export const config = {
groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group',
clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client',
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
-maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50),
+maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 500),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1),
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),

View File

@@ -0,0 +1,69 @@
export class BatchProcessor {
constructor(dbManager, config, options = {}) {
this.dbManager = dbManager;
this.config = config;
this.batchSize = options.batchSize || 500;
this.flushInterval = options.flushInterval || 1000;
this.buffer = [];
this.timer = null;
}
add(item) {
return new Promise((resolve, reject) => {
this.buffer.push({ ...item, resolve, reject });
if (this.buffer.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
});
}
async flush() {
if (this.buffer.length === 0) return;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
const currentBatch = [...this.buffer];
this.buffer = [];
const allRows = currentBatch.flatMap(item => item.rows);
if (allRows.length === 0) {
// No rows to insert (e.g. empty messages), just resolve
currentBatch.forEach(item => item.resolve(0));
return;
}
try {
await this.dbManager.insertRows({
schema: this.config.db.schema,
table: this.config.db.table,
rows: allRows
});
// Resolve each item with its own row count
currentBatch.forEach(item => item.resolve(item.rows.length));
} catch (error) {
// Enrich error with DB context if possible (using first item as sample)
error.type = 'DB_ERROR';
const sample = allRows[0];
error.dbContext = {
batchSize: currentBatch.length,
totalRows: allRows.length,
sampleRow: sample ? {
guid: sample.guid,
ts_ms: sample.ts_ms,
action_type: sample.action_type,
cmd_word: sample.cmd_word
} : null
};
// Reject all items in the batch
currentBatch.forEach(item => item.reject(error));
}
}
}
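For orientation, a minimal usage sketch of the class above; the dbManager stub, schema, and table names here are illustrative stand-ins, not values from the commit:

import { BatchProcessor } from './src/db/batchProcessor.js';

// Illustrative stand-ins; the real dbManager and config come from src/index.js.
const dbManager = {
  insertRows: async ({ schema, table, rows }) =>
    console.log(`INSERT ${rows.length} row(s) into ${schema}.${table}`)
};
const config = { db: { schema: 'demo_schema', table: 'demo_table' } };

const batch = new BatchProcessor(dbManager, config, { batchSize: 3, flushInterval: 1000 });

// Each caller awaits only its own row count; the INSERT itself is shared.
const results = await Promise.all([
  batch.add({ rows: [{ guid: 'a' }, { guid: 'b' }] }),
  batch.add({ rows: [] }),             // resolves with 0; contributes nothing to the INSERT
  batch.add({ rows: [{ guid: 'c' }] }) // third add reaches batchSize and triggers the flush
]);
console.log(results); // [2, 0, 1]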

View File

@@ -72,6 +72,15 @@ export class DatabaseManager {
}
}
async testConnection() {
try {
await this.pool.query('SELECT 1');
return true;
} catch (error) {
return false;
}
}
async close() {
await this.pool.end();
}

View File

@@ -40,9 +40,25 @@ class DatabaseInitializer {
ssl: ssl ? { rejectUnauthorized: false } : false
});
+const maxRetries = 5;
+let retryCount = 0;
+while (retryCount < maxRetries) {
+try {
+await client.connect();
+break;
+} catch (err) {
+if (err.code === 'EADDRINUSE' && retryCount < maxRetries - 1) {
+retryCount++;
+logger.warn(`Port conflict (EADDRINUSE) connecting to database, retrying (${retryCount}/${maxRetries})...`);
+await new Promise(resolve => setTimeout(resolve, 1000));
+} else {
+throw err;
+}
+}
+}
try {
-await client.connect();
const checkRes = await client.query(
`SELECT 1 FROM pg_database WHERE datname = $1`,
[database]
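The loop above retries only EADDRINUSE with a fixed one-second delay, matching the Windows ephemeral-port symptom called out in the commit message. The same logic as a reusable helper might look like this (a sketch, not code the commit ships):

// Sketch: generic connect-with-retry; the defaults mirror the diff above
// (5 attempts, fixed 1 s delay, EADDRINUSE only).
const connectWithRetry = async (client, { maxRetries = 5, delayMs = 1000, retryableCodes = ['EADDRINUSE'] } = {}) => {
  for (let attempt = 1; ; attempt += 1) {
    try {
      await client.connect();
      return;
    } catch (err) {
      if (!retryableCodes.includes(err.code) || attempt >= maxRetries) throw err;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
};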

View File

@@ -11,6 +11,7 @@ import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js';
import { logger } from './utils/logger.js';
import { BatchProcessor } from './db/batchProcessor.js';
const bootstrap = async () => {
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
@@ -72,6 +73,10 @@ const bootstrap = async () => {
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
const batchProcessor = new BatchProcessor(dbManager, config, {
batchSize: config.kafka.maxInFlight
});
const handleMessage = async (message) => {
if (message.topic) {
metricCollector.increment('kafka_pulled');
@@ -100,7 +105,8 @@ const bootstrap = async () => {
valueLength: typeof messageValue === 'string' ? messageValue.length : null
});
}
-const inserted = await processKafkaMessage({ message, dbManager, config });
+const rows = await processKafkaMessage({ message });
+const inserted = await batchProcessor.add({ rows });
metricCollector.increment('db_inserted');
logger.info('Kafka message processed', { inserted });
} catch (error) {
@@ -157,10 +163,24 @@ const bootstrap = async () => {
}
};
const healthCheck = {
shouldPause: async (error) => {
if (error?.type === 'DB_ERROR') {
const isConnected = await dbManager.testConnection();
return !isConnected;
}
return false;
},
check: async () => {
return await dbManager.testConnection();
}
};
const consumers = createKafkaConsumers({
kafkaConfig: config.kafka,
onMessage: handleMessage,
-onError: handleError
+onError: handleError,
+healthCheck
});
// Start retry worker (non-blocking)
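One gap the diff leaves open: rows still sitting in the BatchProcessor buffer are lost if the process exits between flushes. A shutdown hook along these lines would drain the buffer first (a sketch assuming batchProcessor, consumers, dbManager, and logger are in scope as above, and kafka-node's close(force, callback); the commit itself does not add one):

// Sketch: drain buffered rows before exiting (not part of this commit).
const shutdown = async (signal) => {
  logger.info('Shutting down', { signal });
  try {
    await batchProcessor.flush(); // push any buffered rows to the database
    await Promise.all(consumers.map(
      (c) => new Promise((resolve) => c.close(true, resolve))
    ));
    await dbManager.close();
  } finally {
    process.exit(0);
  }
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));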

View File

@@ -3,12 +3,13 @@ import { logger } from '../utils/logger.js';
const { ConsumerGroup } = kafka;
-const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
+const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex, healthCheck }) => {
const kafkaHost = kafkaConfig.brokers.join(',');
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
const id = `${clientId}-${process.pid}-${Date.now()}`;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50;
let inFlight = 0;
let isPausedForHealth = false;
const consumer = new ConsumerGroup(
{
@@ -19,7 +20,7 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
fromOffset: 'earliest',
protocol: ['roundrobin'],
outOfRangeOffset: 'latest',
-autoCommit: true,
+autoCommit: false,
autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
fetchMaxBytes: kafkaConfig.fetchMaxBytes,
fetchMinBytes: kafkaConfig.fetchMinBytes,
@@ -30,7 +31,7 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
);
const tryResume = () => {
-if (inFlight < maxInFlight) {
+if (!isPausedForHealth && inFlight < maxInFlight) {
consumer.resume();
}
};
@@ -40,9 +41,48 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
if (inFlight >= maxInFlight) {
consumer.pause();
}
-Promise.resolve(onMessage(message))
-.catch((error) => {
+return Promise.resolve(onMessage(message))
+.then(() => {
+consumer.commit((err) => {
+if (err) {
+logger.error('Kafka commit failed', { error: err.message });
+}
+});
+})
+.catch(async (error) => {
logger.error('Kafka message handling failed', { error: error?.message });
let shouldCommit = true;
if (!isPausedForHealth && healthCheck && await healthCheck.shouldPause(error)) {
shouldCommit = false;
isPausedForHealth = true;
consumer.pause();
logger.warn('Pausing consumer due to dependency failure. Entering recovery mode...');
const checkInterval = setInterval(async () => {
try {
const isHealthy = await healthCheck.check();
if (isHealthy) {
clearInterval(checkInterval);
isPausedForHealth = false;
consumer.resume();
logger.info('Dependency recovered. Resuming consumer.');
}
} catch (err) {
logger.error('Health check failed', { error: err.message });
}
}, 60000);
}
if (shouldCommit) {
consumer.commit((err) => {
if (err) {
logger.error('Kafka commit failed (error case)', { error: err.message });
}
});
}
if (onError) {
onError(error, message);
}
@@ -63,13 +103,13 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
return consumer;
};
-export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
+export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError, healthCheck }) => {
const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
const count = Math.max(1, instances);
return Array.from({ length: count }, (_, idx) =>
-createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
+createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx, healthCheck })
);
};
-export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) =>
-createKafkaConsumers({ kafkaConfig, onMessage, onError })[0];
+export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError, healthCheck }) =>
+createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck })[0];
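One behavior worth double-checking in review: with maxInFlight raised to 500, many messages are processed concurrently, and kafka-node's commit() persists the consumer group's current position rather than the offset of the message whose handler just resolved. A fast message can therefore commit past a slower in-flight one, and a crash in that window would skip the slower message on restart. A small self-contained illustration of the interleaving (offsets are hypothetical):

// Illustration only (hypothetical offsets): the fast message reaches commit()
// first even though the slow one is still in flight.
const inFlight = [
  { offset: 100, work: new Promise((resolve) => setTimeout(resolve, 50)) }, // slow INSERT
  { offset: 600, work: Promise.resolve() }                                  // fast INSERT
];
for (const msg of inFlight) {
  msg.work.then(() => console.log(`commit() runs after offset ${msg.offset}`));
}
// Logs offset 600 before offset 100: the group offset can move past 100 while
// that message is still unacknowledged.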

View File

@@ -32,7 +32,7 @@ const normalizeCmdWord = (value) => {
};
const resolveMessageType = (direction, cmdWord) => {
-if (cmdWord === '0x36') {
+if (cmdWord === '0x36' || cmdWord === '0x0e') {
return '36上报';
}
if (cmdWord === '0x0f' && direction === '下发') {
@@ -375,11 +375,12 @@ export const buildRowsFromPayload = (rawPayload) => {
return rows;
};
-export const processKafkaMessage = async ({ message, dbManager, config }) => {
+export const processKafkaMessage = async ({ message }) => {
let rows;
try {
const payload = parseKafkaPayload(message.value);
rows = buildRowsFromPayload(payload);
return rows;
} catch (error) {
error.type = 'PARSE_ERROR';
const rawValue = Buffer.isBuffer(message.value)
@@ -391,27 +392,4 @@ export const processKafkaMessage = async ({ message, dbManager, config }) => {
}
throw error;
}
-try {
-await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
-} catch (error) {
-error.type = 'DB_ERROR';
-const sample = rows?.[0];
-error.dbContext = {
-rowsLength: rows?.length || 0,
-sampleRow: sample
-? {
-guid: sample.guid,
-ts_ms: sample.ts_ms,
-action_type: sample.action_type,
-cmd_word: sample.cmd_word,
-direction: sample.direction,
-device_id: sample.device_id
-}
-: null
-};
-throw error;
-}
-return rows.length;
};

View File

@@ -12,6 +12,9 @@ export const logger = {
info(message, context) {
process.stdout.write(`${format('info', message, context)}\n`);
},
warn(message, context) {
process.stdout.write(`${format('warn', message, context)}\n`);
},
error(message, context) {
process.stderr.write(`${format('error', message, context)}\n`);
}

View File

@@ -0,0 +1,97 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { BatchProcessor } from '../src/db/batchProcessor.js';
describe('BatchProcessor', () => {
let dbManager;
let config;
let batchProcessor;
beforeEach(() => {
vi.useFakeTimers();
dbManager = {
insertRows: vi.fn().mockResolvedValue(true)
};
config = {
db: { schema: 'test_schema', table: 'test_table' }
};
batchProcessor = new BatchProcessor(dbManager, config, { batchSize: 3, flushInterval: 1000 });
});
afterEach(() => {
vi.useRealTimers();
});
it('should buffer items and not flush until batch size is reached', async () => {
const p1 = batchProcessor.add({ rows: ['r1'] });
const p2 = batchProcessor.add({ rows: ['r2'] });
expect(dbManager.insertRows).not.toHaveBeenCalled();
const p3 = batchProcessor.add({ rows: ['r3'] });
// Wait for microtasks
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
rows: ['r1', 'r2', 'r3']
});
await expect(p1).resolves.toBe(1);
await expect(p2).resolves.toBe(1);
await expect(p3).resolves.toBe(1);
});
it('should flush when timer expires', async () => {
const p1 = batchProcessor.add({ rows: ['r1'] });
expect(dbManager.insertRows).not.toHaveBeenCalled();
vi.advanceTimersByTime(1000);
// Wait for microtasks
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
rows: ['r1']
});
await expect(p1).resolves.toBe(1);
});
it('should handle db error and reject all pending promises', async () => {
dbManager.insertRows.mockRejectedValue(new Error('DB Fail'));
const p1 = batchProcessor.add({ rows: ['r1'] });
const p2 = batchProcessor.add({ rows: ['r2'] });
const p3 = batchProcessor.add({ rows: ['r3'] }); // Triggers flush
await expect(p1).rejects.toThrow('DB Fail');
await expect(p2).rejects.toThrow('DB Fail');
await expect(p3).rejects.toThrow('DB Fail');
});
it('should handle mixed batch sizes', async () => {
// 3 items with different row counts
const p1 = batchProcessor.add({ rows: ['r1', 'r2'] });
const p2 = batchProcessor.add({ rows: [] }); // Empty rows
const p3 = batchProcessor.add({ rows: ['r3'] });
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
rows: ['r1', 'r2', 'r3']
});
await expect(p1).resolves.toBe(2);
await expect(p2).resolves.toBe(0);
await expect(p3).resolves.toBe(1);
});
});

View File

@@ -0,0 +1,124 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { createKafkaConsumers } from '../src/kafka/consumer.js';
import kafka from 'kafka-node';
// Mock kafka-node
vi.mock('kafka-node', () => {
return {
ConsumerGroup: vi.fn(),
default: { ConsumerGroup: vi.fn() }
};
});
describe('Consumer Reliability', () => {
let mockConsumer;
let onMessage;
let onError;
let healthCheck;
const kafkaConfig = {
brokers: ['localhost:9092'],
groupId: 'test-group',
clientId: 'test-client',
topic: 'test-topic',
autoCommitIntervalMs: 5000
};
beforeEach(() => {
vi.clearAllMocks();
mockConsumer = {
on: vi.fn(),
commit: vi.fn(),
pause: vi.fn(),
resume: vi.fn(),
close: vi.fn()
};
kafka.ConsumerGroup.mockImplementation(function() {
return mockConsumer;
});
onMessage = vi.fn().mockResolvedValue(true);
onError = vi.fn();
healthCheck = {
shouldPause: vi.fn().mockResolvedValue(false),
check: vi.fn().mockResolvedValue(true)
};
});
it('should initialize with autoCommit: false', () => {
createKafkaConsumers({ kafkaConfig, onMessage, onError });
expect(kafka.ConsumerGroup).toHaveBeenCalledWith(
expect.objectContaining({ autoCommit: false }),
expect.anything()
);
});
it('should commit offset after successful message processing', async () => {
createKafkaConsumers({ kafkaConfig, onMessage, onError });
// Simulate 'message' event
const message = { value: 'test' };
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
await messageHandler(message);
expect(onMessage).toHaveBeenCalledWith(message);
expect(mockConsumer.commit).toHaveBeenCalled();
});
it('should NOT commit if processing fails and health check says pause', async () => {
onMessage.mockRejectedValue(new Error('Fail'));
healthCheck.shouldPause.mockResolvedValue(true);
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
await messageHandler({ value: 'test' });
expect(mockConsumer.commit).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalled();
});
it('should commit if processing fails but health check says continue (Data Error)', async () => {
onMessage.mockRejectedValue(new Error('Data Error'));
healthCheck.shouldPause.mockResolvedValue(false); // Do not pause, it's just bad data
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
await messageHandler({ value: 'bad_data' });
expect(mockConsumer.commit).toHaveBeenCalled(); // Should commit to move past bad data
expect(onError).toHaveBeenCalled(); // Should still report error
});
it('should pause and enter recovery mode if healthCheck.shouldPause returns true', async () => {
vi.useFakeTimers();
onMessage.mockRejectedValue(new Error('DB Error'));
healthCheck.shouldPause.mockResolvedValue(true);
healthCheck.check.mockResolvedValueOnce(false).mockResolvedValueOnce(true); // Fail once, then succeed
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
// Trigger error
await messageHandler({ value: 'fail' });
expect(mockConsumer.pause).toHaveBeenCalled();
expect(healthCheck.shouldPause).toHaveBeenCalled();
// Fast-forward time for interval check (1st check - fails)
await vi.advanceTimersByTimeAsync(60000);
expect(healthCheck.check).toHaveBeenCalledTimes(1);
expect(mockConsumer.resume).not.toHaveBeenCalled();
// Fast-forward time for interval check (2nd check - succeeds)
await vi.advanceTimersByTimeAsync(60000);
expect(healthCheck.check).toHaveBeenCalledTimes(2);
expect(mockConsumer.resume).toHaveBeenCalled();
vi.useRealTimers();
});
});

View File

@@ -1,7 +1,19 @@
-import { describe, it, expect } from 'vitest';
+import { describe, it, expect, vi } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js';
import projectMetadata from '../src/cache/projectMetadata.js';
// Mock config to ensure loop name generation is enabled
vi.mock('../src/config/config.js', async (importOriginal) => {
const actual = await importOriginal();
return {
...actual,
config: {
...actual.config,
enableLoopNameAutoGeneration: true,
},
};
});
describe('Processor Logic', () => {
const basePayload = {
ts_ms: 1700000000000,
@@ -227,3 +239,58 @@ describe('Processor Logic', () => {
expect(rows[1].loop_name).toBe('[1强电继电器输出状态-10-2]');
});
});
describe('Processor Logic - 0x0E Support', () => {
const basePayload = {
ts_ms: 1700000000000,
hotel_id: 1001,
room_id: '8001',
device_id: 'dev_001',
direction: '上报',
cmd_word: '0x0E',
frame_id: 1,
udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
sys_lock_status: 0,
report_count: 0,
fault_count: 0
};
it('should handle 0x0E Status Report with device list (same as 0x36)', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x0E',
report_count: 2,
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
{ dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(2);
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_addr).toBe(10);
expect(rows[0].cmd_word).toBe('0x0e'); // Normalized
expect(rows[1].dev_addr).toBe(11);
expect(rows[0].details.device_list).toHaveLength(2);
});
it('should handle 0x0E Fault Report', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x0E',
fault_count: 1,
fault_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].error_type).toBe(2);
expect(rows[0].cmd_word).toBe('0x0e');
});
});