XuJiacheng 8337c60f98 feat: implement batched Kafka consumption and writes to improve throughput
Introduce a batching mechanism that buffers messages and writes them to the database in batches, significantly improving consumption performance. Tune Kafka configuration parameters to optimize consumer concurrency and the commit strategy. Add automatic creation of partition indexes and refactor the processor to support batch operations. Add fallback (degraded) write logic to handle bad data, and enhance metric collection to monitor the effect of batching.
2026-02-09 10:50:56 +08:00
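
For reference, a minimal sketch of the environment knobs this change reads (names are taken from the config block in the code below; the values shown are the in-code defaults, so treat them as illustrative rather than recommended):

KAFKA_BATCH_SIZE=5000          # messages buffered per topic-partition before a flush
KAFKA_BATCH_TIMEOUT_MS=50      # max wait before flushing a partial batch
KAFKA_MAX_IN_FLIGHT=20000      # pause the consumer above this many unacked messages
KAFKA_COMMIT_INTERVAL_MS=200   # how often tracked offsets are committed
KAFKA_CONSUMER_INSTANCES=1     # number of parallel ConsumerGroup instances
KAFKA_COMMIT_ON_ATTEMPT=false  # when true, offsets are committed even if the write fails (failures go to the Redis error queue)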

import cron from "node-cron";
import dotenv from "dotenv";
import pg from "pg";
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import kafka from "kafka-node";
import { randomUUID } from "crypto";
import { z } from "zod";
import { createClient } from "redis";
dotenv.config();
const parseNumber = (value, defaultValue) => {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : defaultValue;
};
const parseList = (value) => (value || "").split(",").map((item) => item.trim()).filter(Boolean);
const config = {
env: process.env.NODE_ENV || "development",
port: parseNumber(process.env.PORT, 3001),
kafka: {
brokers: parseList(process.env.KAFKA_BROKERS),
topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-onoffline-topic",
groupId: process.env.KAFKA_GROUP_ID || "bls-onoffline-group",
clientId: process.env.KAFKA_CLIENT_ID || "bls-onoffline-client",
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024),
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3),
commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200),
commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true",
batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3),
batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50),
logMessages: process.env.KAFKA_LOG_MESSAGES === "true",
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
mechanism: process.env.KAFKA_SASL_MECHANISM || "plain",
username: process.env.KAFKA_SASL_USERNAME,
password: process.env.KAFKA_SASL_PASSWORD
} : void 0
},
db: {
host: process.env.DB_HOST || process.env.POSTGRES_HOST || "localhost",
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
user: process.env.DB_USER || process.env.POSTGRES_USER || "postgres",
password: process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || "",
database: process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || "log_platform",
max: parseNumber(process.env.DB_MAX_CONNECTIONS || process.env.POSTGRES_MAX_CONNECTIONS, 10),
ssl: process.env.DB_SSL === "true" ? { rejectUnauthorized: false } : void 0,
schema: process.env.DB_SCHEMA || "onoffline",
table: process.env.DB_TABLE || "onoffline_record"
},
redis: {
host: process.env.REDIS_HOST || "localhost",
port: parseNumber(process.env.REDIS_PORT, 6379),
password: process.env.REDIS_PASSWORD || void 0,
db: parseNumber(process.env.REDIS_DB, 0),
projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline",
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}`
}
};
const format = (level, message, context) => {
const payload = {
level,
message,
timestamp: Date.now(),
...context ? { context } : {}
};
return JSON.stringify(payload);
};
const logger = {
info(message, context) {
process.stdout.write(`${format("info", message, context)}
`);
},
error(message, context) {
process.stderr.write(`${format("error", message, context)}
`);
},
warn(message, context) {
process.stderr.write(`${format("warn", message, context)}
`);
}
};
const { Pool } = pg;
const columns = [
"guid",
"ts_ms",
"write_ts_ms",
"hotel_id",
"mac",
"device_id",
"room_id",
"ip",
"current_status",
"launcher_version",
"reboot_reason"
];
class DatabaseManager {
constructor(dbConfig) {
this.pool = new Pool({
host: dbConfig.host,
port: dbConfig.port,
user: dbConfig.user,
password: dbConfig.password,
database: dbConfig.database,
max: dbConfig.max,
ssl: dbConfig.ssl
});
}
async insertRows({ schema, table, rows }) {
if (!rows || rows.length === 0) {
return;
}
const statement = `
INSERT INTO ${schema}.${table} (${columns.join(", ")})
SELECT *
FROM UNNEST(
$1::text[],
$2::int8[],
$3::int8[],
$4::int2[],
$5::text[],
$6::text[],
$7::text[],
$8::text[],
$9::text[],
$10::text[],
$11::text[]
)
ON CONFLICT DO NOTHING
`;
try {
const params = columns.map((column) => rows.map((row) => row[column] ?? null));
await this.pool.query(statement, params);
} catch (error) {
logger.error("Database insert failed", {
error: error?.message,
schema,
table,
rowsLength: rows.length
});
throw error;
}
}
async checkConnection() {
let client;
try {
const connectPromise = this.pool.connect();
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error("Connection timeout")), 5e3);
});
try {
client = await Promise.race([connectPromise, timeoutPromise]);
} catch (raceError) {
connectPromise.then((c) => c.release()).catch(() => {
});
throw raceError;
}
await client.query("SELECT 1");
return true;
} catch (err) {
logger.error("Database check connection failed", { error: err.message });
return false;
} finally {
if (client) {
client.release();
}
}
}
async close() {
await this.pool.end();
}
}
const dbManager = new DatabaseManager(config.db);
class PartitionManager {
/**
* Calculate the start and end timestamps (milliseconds) for a given date.
* @param {Date} date - The date to calculate for.
* @returns {Object} { startMs, endMs, partitionSuffix }
*/
getPartitionInfo(date) {
const yyyy = date.getFullYear();
const mm = String(date.getMonth() + 1).padStart(2, "0");
const dd = String(date.getDate()).padStart(2, "0");
const partitionSuffix = `${yyyy}${mm}${dd}`;
const start = new Date(date);
start.setHours(0, 0, 0, 0);
const startMs = start.getTime();
const end = new Date(date);
end.setDate(end.getDate() + 1);
end.setHours(0, 0, 0, 0);
const endMs = end.getTime();
return { startMs, endMs, partitionSuffix };
}
async ensurePartitionIndexes(client, schema, table, partitionSuffix) {
const startedAt = Date.now();
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const indexBase = `${table}_${partitionSuffix}`;
const indexSpecs = [
{ name: `idx_${indexBase}_ts`, column: "ts_ms" },
{ name: `idx_${indexBase}_hid`, column: "hotel_id" },
{ name: `idx_${indexBase}_mac`, column: "mac" },
{ name: `idx_${indexBase}_did`, column: "device_id" },
{ name: `idx_${indexBase}_rid`, column: "room_id" },
{ name: `idx_${indexBase}_cs`, column: "current_status" }
];
for (const spec of indexSpecs) {
await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`);
}
await client.query(`ANALYZE ${partitionName};`);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs > 1e3) {
logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs });
}
}
async ensureIndexesForExistingPartitions() {
const startedAt = Date.now();
const client = await dbManager.pool.connect();
try {
const schema = config.db.schema;
const table = config.db.table;
const res = await client.query(
`
SELECT c.relname AS relname
FROM pg_inherits i
JOIN pg_class p ON i.inhparent = p.oid
JOIN pg_namespace pn ON pn.oid = p.relnamespace
JOIN pg_class c ON i.inhrelid = c.oid
WHERE pn.nspname = $1 AND p.relname = $2
ORDER BY c.relname;
`,
[schema, table]
);
const suffixes = /* @__PURE__ */ new Set();
const pattern = new RegExp(`^${table}_(\\d{8})$`);
for (const row of res.rows) {
const relname = row?.relname;
if (typeof relname !== "string") continue;
const match = relname.match(pattern);
if (!match) continue;
suffixes.add(match[1]);
}
for (const suffix of suffixes) {
await this.ensurePartitionIndexes(client, schema, table, suffix);
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs > 5e3) {
logger.warn("Ensure existing partition indexes slow", { schema, table, partitions: suffixes.size, elapsedMs });
}
} finally {
client.release();
}
}
/**
* Ensure daily partitions exist for the past `daysBack` days and the next `daysAhead` days.
* @param {number} daysAhead - Number of future days to pre-create.
* @param {number} daysBack - Number of past days to backfill.
*/
async ensurePartitions(daysAhead = 30, daysBack = 15) {
const client = await dbManager.pool.connect();
try {
logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
const now = /* @__PURE__ */ new Date();
for (let i = -daysBack; i < daysAhead; i++) {
const targetDate = new Date(now);
targetDate.setDate(now.getDate() + i);
const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
const schema = config.db.schema;
const table = config.db.table;
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const checkSql = `
SELECT to_regclass($1) as exists;
`;
const checkRes = await client.query(checkSql, [partitionName]);
if (!checkRes.rows[0].exists) {
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
const createSql = `
CREATE TABLE IF NOT EXISTS ${partitionName}
PARTITION OF ${schema}.${table}
FOR VALUES FROM (${startMs}) TO (${endMs});
`;
await client.query(createSql);
}
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
}
logger.info("Partition check completed.");
} catch (err) {
logger.error("Error ensuring partitions:", err);
throw err;
} finally {
client.release();
}
}
async ensurePartitionsForTimestamps(tsMsList) {
if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
const uniqueSuffixes = /* @__PURE__ */ new Set();
for (const ts of tsMsList) {
const numericTs = typeof ts === "string" ? Number(ts) : ts;
if (!Number.isFinite(numericTs)) continue;
const date = new Date(numericTs);
if (Number.isNaN(date.getTime())) continue;
const { partitionSuffix } = this.getPartitionInfo(date);
uniqueSuffixes.add(partitionSuffix);
if (uniqueSuffixes.size >= 400) break;
}
if (uniqueSuffixes.size === 0) return;
const client = await dbManager.pool.connect();
try {
const schema = config.db.schema;
const table = config.db.table;
for (const partitionSuffix of uniqueSuffixes) {
const yyyy = Number(partitionSuffix.slice(0, 4));
const mm = Number(partitionSuffix.slice(4, 6));
const dd = Number(partitionSuffix.slice(6, 8));
if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
const targetDate = new Date(yyyy, mm - 1, dd);
if (Number.isNaN(targetDate.getTime())) continue;
const { startMs, endMs } = this.getPartitionInfo(targetDate);
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
if (!checkRes.rows[0].exists) {
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
await client.query(`
CREATE TABLE IF NOT EXISTS ${partitionName}
PARTITION OF ${schema}.${table}
FOR VALUES FROM (${startMs}) TO (${endMs});
`);
}
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
}
} finally {
client.release();
}
}
}
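// Illustrative example (not executed): for new Date(2026, 1, 9) — i.e. 2026-02-09 in the
// process's local timezone — getPartitionInfo() returns partitionSuffix "20260209" and an
// [startMs, endMs) range spanning local midnight 2026-02-09 to local midnight 2026-02-10,
// so ensurePartitions() creates/targets the daily partition <schema>.<table>_20260209
// (e.g. onoffline.onoffline_record_20260209 with the default schema/table above).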
const partitionManager = new PartitionManager();
const __filename$1 = fileURLToPath(import.meta.url);
const __dirname$1 = path.dirname(__filename$1);
class DatabaseInitializer {
async initialize() {
logger.info("Starting database initialization check...");
await this.ensureDatabaseExists();
await this.ensureSchemaAndTable();
await partitionManager.ensurePartitions(30);
await partitionManager.ensureIndexesForExistingPartitions();
console.log("Database initialization completed successfully.");
logger.info("Database initialization completed successfully.");
}
async ensureDatabaseExists() {
const { host, port, user, password, database, ssl } = config.db;
console.log(`Checking if database '${database}' exists at ${host}:${port}...`);
const client = new pg.Client({
host,
port,
user,
password,
database: "postgres",
ssl: ssl ? { rejectUnauthorized: false } : false
});
try {
await client.connect();
const checkRes = await client.query(
`SELECT 1 FROM pg_database WHERE datname = $1`,
[database]
);
if (checkRes.rowCount === 0) {
logger.info(`Database '${database}' does not exist. Creating...`);
await client.query(`CREATE DATABASE "${database}"`);
console.log(`Database '${database}' created.`);
logger.info(`Database '${database}' created.`);
} else {
console.log(`Database '${database}' already exists.`);
logger.info(`Database '${database}' already exists.`);
}
} catch (err) {
logger.error("Error ensuring database exists:", err);
throw err;
} finally {
await client.end();
}
}
async ensureSchemaAndTable() {
const client = await dbManager.pool.connect();
try {
const sqlPathCandidates = [
path.resolve(process.cwd(), "scripts/init_db.sql"),
path.resolve(__dirname$1, "../scripts/init_db.sql"),
path.resolve(__dirname$1, "../../scripts/init_db.sql")
];
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
if (!sqlPath) {
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(" | ")}`);
}
const sql = fs.readFileSync(sqlPath, "utf8");
console.log(`Executing init_db.sql from ${sqlPath}...`);
logger.info("Executing init_db.sql...");
await client.query(sql);
console.log("Schema and parent table initialized.");
logger.info("Schema and parent table initialized.");
} catch (err) {
logger.error("Error initializing schema and table:", err);
throw err;
} finally {
client.release();
}
}
}
const dbInitializer = new DatabaseInitializer();
class OffsetTracker {
constructor() {
this.partitions = /* @__PURE__ */ new Map();
}
// Called when a message is received (before processing)
add(topic, partition, offset) {
const key = `${topic}-${partition}`;
if (!this.partitions.has(key)) {
this.partitions.set(key, { nextCommitOffset: null, done: /* @__PURE__ */ new Set() });
}
const state = this.partitions.get(key);
const numericOffset = Number(offset);
if (!Number.isFinite(numericOffset)) return;
if (state.nextCommitOffset === null) {
state.nextCommitOffset = numericOffset;
} else if (numericOffset < state.nextCommitOffset) {
state.nextCommitOffset = numericOffset;
}
}
// Called when a message is successfully processed
// Returns the next offset to commit (if any advancement is possible), or null
markDone(topic, partition, offset) {
const key = `${topic}-${partition}`;
const state = this.partitions.get(key);
if (!state) return null;
const numericOffset = Number(offset);
if (!Number.isFinite(numericOffset)) return null;
state.done.add(numericOffset);
if (state.nextCommitOffset === null) {
state.nextCommitOffset = numericOffset;
}
let advanced = false;
while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) {
state.done.delete(state.nextCommitOffset);
state.nextCommitOffset += 1;
advanced = true;
}
if (!advanced) return null;
return state.nextCommitOffset;
}
clear() {
this.partitions.clear();
}
}
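// Illustrative trace (not executed), showing how commits only advance over contiguous done offsets:
// const t = new OffsetTracker();
// t.add("topic", 0, 10); t.add("topic", 0, 11); t.add("topic", 0, 12);
// t.markDone("topic", 0, 11); // => null (offset 10 still in flight, nothing committable yet)
// t.markDone("topic", 0, 10); // => 12   (10 and 11 done; next offset to commit is 12)
// t.markDone("topic", 0, 12); // => 13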
const { ConsumerGroup } = kafka;
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
const kafkaHost = kafkaConfig.brokers.join(",");
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
const id = `${clientId}-${process.pid}-${Date.now()}`;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3;
const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200;
let inFlight = 0;
const tracker = new OffsetTracker();
let pendingCommits = /* @__PURE__ */ new Map();
let commitTimer = null;
const flushCommits = () => {
if (pendingCommits.size === 0) return;
const batch = pendingCommits;
pendingCommits = /* @__PURE__ */ new Map();
consumer.sendOffsetCommitRequest(
Array.from(batch.values()),
(err) => {
if (err) {
for (const [k, v] of batch.entries()) {
pendingCommits.set(k, v);
}
logger.error("Kafka commit failed", { error: err?.message, count: batch.size });
}
}
);
};
const scheduleCommitFlush = () => {
if (commitTimer) return;
commitTimer = setTimeout(() => {
commitTimer = null;
flushCommits();
}, commitIntervalMs);
};
const consumer = new ConsumerGroup(
{
kafkaHost,
groupId: kafkaConfig.groupId,
clientId,
id,
fromOffset: "earliest",
protocol: ["roundrobin"],
outOfRangeOffset: "latest",
autoCommit: false,
autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
fetchMaxBytes: kafkaConfig.fetchMaxBytes,
fetchMinBytes: kafkaConfig.fetchMinBytes,
fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
sasl: kafkaConfig.sasl
},
kafkaConfig.topic
);
const tryResume = () => {
if (inFlight < maxInFlight && consumer.paused) {
consumer.resume();
}
};
consumer.on("message", (message) => {
inFlight += 1;
tracker.add(message.topic, message.partition, message.offset);
if (inFlight >= maxInFlight) {
consumer.pause();
}
Promise.resolve(onMessage(message)).then(() => {
}).catch((error) => {
logger.error("Kafka message handling failed", { error: error?.message });
if (onError) {
onError(error, message);
}
}).finally(() => {
const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
if (commitOffset !== null) {
const key = `${message.topic}-${message.partition}`;
pendingCommits.set(key, {
topic: message.topic,
partition: message.partition,
offset: commitOffset,
metadata: "m"
});
scheduleCommitFlush();
}
inFlight -= 1;
tryResume();
});
});
consumer.on("error", (error) => {
logger.error("Kafka consumer error", { error: error?.message });
if (onError) {
onError(error);
}
});
consumer.on("connect", () => {
logger.info(`Kafka Consumer connected`, {
groupId: kafkaConfig.groupId,
clientId
});
});
consumer.on("rebalancing", () => {
logger.info(`Kafka Consumer rebalancing`, {
groupId: kafkaConfig.groupId,
clientId
});
tracker.clear();
pendingCommits.clear();
if (commitTimer) {
clearTimeout(commitTimer);
commitTimer = null;
}
});
consumer.on("rebalanced", () => {
logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId });
});
consumer.on("error", (err) => {
logger.error("Kafka Consumer Error", { error: err.message });
});
consumer.on("offsetOutOfRange", (err) => {
logger.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition });
});
consumer.on("offsetOutOfRange", (error) => {
logger.warn(`Kafka Consumer offset out of range`, {
error: error?.message,
groupId: kafkaConfig.groupId,
clientId
});
});
consumer.on("close", () => {
if (commitTimer) {
clearTimeout(commitTimer);
commitTimer = null;
}
flushCommits();
logger.warn(`Kafka Consumer closed`, {
groupId: kafkaConfig.groupId,
clientId
});
});
return consumer;
};
const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
const count = Math.max(1, instances);
return Array.from(
{ length: count },
(_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
);
};
const createGuid = () => randomUUID().replace(/-/g, "");
const toNumber = (value) => {
if (value === void 0 || value === null || value === "") {
return value;
}
if (typeof value === "number") {
return value;
}
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : value;
};
const toStringAllowEmpty = (value) => {
if (value === void 0 || value === null) {
return value;
}
return String(value);
};
const kafkaPayloadSchema = z.object({
HotelCode: z.preprocess(toNumber, z.number()),
MAC: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
HostNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
RoomNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
EndPoint: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
CurrentStatus: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
CurrentTime: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
UnixTime: z.preprocess(toNumber, z.number().nullable()).optional().nullable(),
LauncherVersion: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
RebootReason: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable()
});
const normalizeText = (value, maxLength) => {
if (value === void 0 || value === null) {
return null;
}
const str = String(value);
if (maxLength && str.length > maxLength) {
return str.substring(0, maxLength);
}
return str;
};
const buildRowsFromPayload = (rawPayload) => {
const payload = kafkaPayloadSchema.parse(rawPayload);
const rebootReason = normalizeText(payload.RebootReason, 255);
const currentStatusRaw = normalizeText(payload.CurrentStatus, 255);
const hasRebootReason = rebootReason !== null && rebootReason !== "";
const currentStatus = hasRebootReason ? "on" : currentStatusRaw;
let tsMs = payload.UnixTime;
if (typeof tsMs === "number" && tsMs < 1e11) {
tsMs = tsMs * 1e3;
}
if (!tsMs && payload.CurrentTime) {
const parsed = Date.parse(payload.CurrentTime);
if (!isNaN(parsed)) {
tsMs = parsed;
}
}
if (!tsMs) {
tsMs = Date.now();
}
const mac = normalizeText(payload.MAC) || "";
const deviceId = normalizeText(payload.HostNumber) || "";
const roomId = normalizeText(payload.RoomNumber) || "";
const row = {
guid: createGuid(),
ts_ms: tsMs,
write_ts_ms: Date.now(),
hotel_id: payload.HotelCode,
mac,
device_id: deviceId,
room_id: roomId,
ip: normalizeText(payload.EndPoint),
current_status: currentStatus,
launcher_version: normalizeText(payload.LauncherVersion, 255),
reboot_reason: rebootReason
};
return [row];
};
const parseMessageToRows = (message) => {
const rawValue = message.value.toString();
let payload;
try {
payload = JSON.parse(rawValue);
} catch (e) {
const error = new Error(`JSON Parse Error: ${e.message}`);
error.type = "PARSE_ERROR";
throw error;
}
const validationResult = kafkaPayloadSchema.safeParse(payload);
if (!validationResult.success) {
const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
error.type = "VALIDATION_ERROR";
throw error;
}
return buildRowsFromPayload(payload);
};
const createRedisClient = async (config2) => {
const client = createClient({
socket: {
host: config2.host,
port: config2.port
},
password: config2.password,
database: config2.db
});
await client.connect();
return client;
};
class RedisIntegration {
constructor(client, projectName, apiBaseUrl) {
this.client = client;
this.projectName = projectName;
this.apiBaseUrl = apiBaseUrl;
this.heartbeatKey = "项目心跳";
this.logKey = `${projectName}_项目控制台`;
}
async info(message, context) {
const payload = {
timestamp: (/* @__PURE__ */ new Date()).toISOString(),
level: "info",
message,
metadata: context || void 0
};
await this.client.rPush(this.logKey, JSON.stringify(payload));
}
async error(message, context) {
const payload = {
timestamp: (/* @__PURE__ */ new Date()).toISOString(),
level: "error",
message,
metadata: context || void 0
};
await this.client.rPush(this.logKey, JSON.stringify(payload));
}
startHeartbeat() {
setInterval(() => {
const payload = {
projectName: this.projectName,
apiBaseUrl: this.apiBaseUrl,
lastActiveAt: Date.now()
};
this.client.rPush(this.heartbeatKey, JSON.stringify(payload)).catch((error) => {
logger.error("Redis heartbeat push failed", { error: error?.message });
});
}, 3e3);
}
}
const buildErrorQueueKey = (projectName) => `${projectName}_error_queue`;
const enqueueError = async (client, queueKey, payload) => {
try {
await client.rPush(queueKey, JSON.stringify(payload));
} catch (error) {
logger.error("Redis enqueue error failed", { error: error?.message });
throw error;
}
};
const startErrorRetryWorker = async ({
client,
queueKey,
handler,
redisIntegration,
maxAttempts = 5
}) => {
while (true) {
const result = await client.blPop(queueKey, 0);
const raw = result?.element;
if (!raw) {
continue;
}
let item;
try {
item = JSON.parse(raw);
} catch (error) {
logger.error("Invalid error payload", { error: error?.message });
await redisIntegration.error("Invalid error payload", { module: "redis", stack: error?.message });
continue;
}
const attempts = item.attempts || 0;
try {
await handler(item);
} catch (error) {
logger.error("Retry handler failed", { error: error?.message, stack: error?.stack });
const nextPayload = {
...item,
attempts: attempts + 1,
lastError: error?.message,
lastAttemptAt: Date.now()
};
if (nextPayload.attempts >= maxAttempts) {
await redisIntegration.error("Retry attempts exceeded", { module: "retry", stack: JSON.stringify(nextPayload) });
} else {
await enqueueError(client, queueKey, nextPayload);
}
}
}
};
class MetricCollector {
constructor() {
this.reset();
}
reset() {
this.metrics = {
kafka_pulled: 0,
parse_error: 0,
db_inserted: 0,
db_failed: 0,
db_insert_count: 0,
db_insert_ms_sum: 0,
batch_flush_count: 0,
batch_flush_ms_sum: 0
};
this.keyed = {};
}
increment(metric, count = 1) {
if (this.metrics.hasOwnProperty(metric)) {
this.metrics[metric] += count;
}
}
incrementKeyed(metric, key, count = 1) {
if (!key) return;
if (!this.keyed[metric]) {
this.keyed[metric] = {};
}
if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) {
this.keyed[metric][key] = 0;
}
this.keyed[metric][key] += count;
}
getAndReset() {
const current = { ...this.metrics };
const keyed = JSON.parse(JSON.stringify(this.keyed));
this.reset();
return { ...current, keyed };
}
}
const bootstrap = async () => {
logger.info("Starting application with config", {
env: process.env.NODE_ENV,
db: {
host: config.db.host,
port: config.db.port,
user: config.db.user,
database: config.db.database,
schema: config.db.schema
},
kafka: {
brokers: config.kafka.brokers,
topic: config.kafka.topic,
groupId: config.kafka.groupId
},
redis: {
host: config.redis.host,
port: config.redis.port
}
});
await dbInitializer.initialize();
const metricCollector = new MetricCollector();
cron.schedule("0 0 * * *", async () => {
logger.info("Running scheduled partition maintenance...");
try {
await partitionManager.ensurePartitions(30);
} catch (err) {
logger.error("Scheduled partition maintenance failed", err);
}
});
const redisClient = await createRedisClient(config.redis);
const redisIntegration = new RedisIntegration(
redisClient,
config.redis.projectName,
config.redis.apiBaseUrl
);
redisIntegration.startHeartbeat();
cron.schedule("* * * * *", async () => {
const metrics = metricCollector.getAndReset();
const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0";
const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0";
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`;
console.log(report);
logger.info(report, metrics);
try {
await redisIntegration.info("Minute Metrics", metrics);
} catch (err) {
logger.error("Failed to report metrics to Redis", { error: err?.message });
}
});
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
const handleError = async (error, message) => {
logger.error("Kafka processing error", {
error: error?.message,
type: error?.type,
stack: error?.stack
});
try {
await redisIntegration.error("Kafka processing error", {
module: "kafka",
stack: error?.stack || error?.message
});
} catch (redisError) {
logger.error("Redis error log failed", { error: redisError?.message });
}
if (message) {
const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value;
try {
await enqueueError(redisClient, errorQueueKey, {
attempts: 0,
value: messageValue,
meta: {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: message.key
},
timestamp: Date.now()
});
} catch (enqueueError2) {
logger.error("Enqueue error payload failed", { error: enqueueError2?.message });
}
}
};
const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1e3;
const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20;
const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5e3;
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
const commitOnAttempt = config.kafka.commitOnAttempt === true;
const batchStates = /* @__PURE__ */ new Map();
const partitionKeyFromMessage = (message) => {
if (message?.topic !== void 0 && message?.partition !== void 0) {
return `${message.topic}-${message.partition}`;
}
return "retry";
};
const dayKeyFromTsMs = (tsMs) => {
const numeric = typeof tsMs === "string" ? Number(tsMs) : tsMs;
if (!Number.isFinite(numeric)) return null;
const d = new Date(numeric);
if (Number.isNaN(d.getTime())) return null;
const yyyy = d.getFullYear();
const mm = String(d.getMonth() + 1).padStart(2, "0");
const dd = String(d.getDate()).padStart(2, "0");
return `${yyyy}${mm}${dd}`;
};
const getBatchState = (key) => {
if (!batchStates.has(key)) {
batchStates.set(key, { items: [], timer: null, flushing: null });
}
return batchStates.get(key);
};
const isDbConnectionError = (err) => {
const code = err?.code;
if (typeof code === "string") {
const networkCodes = /* @__PURE__ */ new Set([
"ECONNREFUSED",
"ECONNRESET",
"EPIPE",
"ETIMEDOUT",
"ENOTFOUND",
"EHOSTUNREACH",
"ENETUNREACH",
"57P03",
"08006",
"08001",
"08000",
"08003"
]);
if (networkCodes.has(code)) return true;
}
const message = typeof err?.message === "string" ? err.message : "";
if (!message) return false;
const lower = message.toLowerCase();
return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry");
};
const isMissingPartitionError = (err) => err?.code === "23514" || typeof err?.message === "string" && err.message.includes("no partition of relation");
const insertRowsWithRetry = async (rows) => {
const startedAt = Date.now();
let attemptedPartitionFix = false;
while (true) {
try {
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
metricCollector.increment("db_insert_count", 1);
metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
return;
} catch (err) {
if (isDbConnectionError(err)) {
logger.error("Database offline during batch insert. Retrying in 5s...", { error: err.message });
await new Promise((r) => setTimeout(r, 5e3));
while (!await dbManager.checkConnection()) {
logger.warn("Database still offline. Waiting 5s...");
await new Promise((r) => setTimeout(r, 5e3));
}
continue;
}
if (isMissingPartitionError(err) && !attemptedPartitionFix) {
attemptedPartitionFix = true;
try {
await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms));
} catch (partitionErr) {
if (isDbConnectionError(partitionErr)) {
logger.error("Database offline during partition ensure. Retrying in 5s...", { error: partitionErr.message });
await new Promise((r) => setTimeout(r, 5e3));
while (!await dbManager.checkConnection()) {
logger.warn("Database still offline. Waiting 5s...");
await new Promise((r) => setTimeout(r, 5e3));
}
continue;
}
throw partitionErr;
}
continue;
}
throw err;
}
}
};
const insertRowsOnce = async (rows) => {
const startedAt = Date.now();
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
metricCollector.increment("db_insert_count", 1);
metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
};
const resolveInsertedItems = (partitionKey, items) => {
let insertedRows = 0;
for (const p of items) {
insertedRows += p.rows.length;
const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
if (dayKey) {
metricCollector.incrementKeyed("db_inserted_by_day", dayKey, p.rows.length);
}
p.item.resolve();
}
metricCollector.increment("db_inserted", insertedRows);
metricCollector.incrementKeyed("db_inserted_by_partition", partitionKey, insertedRows);
};
const handleFailedItem = async (partitionKey, p, err) => {
metricCollector.increment("db_failed");
metricCollector.incrementKeyed("db_failed_by_partition", partitionKey, 1);
const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
if (dayKey) {
metricCollector.incrementKeyed("db_failed_by_day", dayKey, 1);
}
await handleError(err, p.item.message);
p.item.resolve();
};
const insertItemsDegraded = async (partitionKey, items) => {
if (items.length === 0) return;
const rows = items.flatMap((p) => p.rows);
if (commitOnAttempt) {
try {
await insertRowsOnce(rows);
resolveInsertedItems(partitionKey, items);
} catch (err) {
for (const item of items) {
await handleFailedItem(partitionKey, item, err);
}
}
return;
}
try {
await insertRowsWithRetry(rows);
resolveInsertedItems(partitionKey, items);
return;
} catch (err) {
if (items.length === 1) {
try {
await insertRowsWithRetry(items[0].rows);
resolveInsertedItems(partitionKey, items);
} catch (innerErr) {
await handleFailedItem(partitionKey, items[0], innerErr);
}
return;
}
const mid = Math.floor(items.length / 2);
await insertItemsDegraded(partitionKey, items.slice(0, mid));
await insertItemsDegraded(partitionKey, items.slice(mid));
}
};
const flushBatchForKey = async (partitionKey) => {
const state = getBatchState(partitionKey);
if (state.flushing) return state.flushing;
state.flushing = (async () => {
if (state.timer) {
clearTimeout(state.timer);
state.timer = null;
}
if (state.items.length === 0) return;
const startedAt = Date.now();
const currentBatch = state.items;
state.items = [];
const pendingDbItems = [];
const unresolvedItems = [];
try {
for (const item of currentBatch) {
try {
const rows = parseMessageToRows(item.message);
pendingDbItems.push({ item, rows });
unresolvedItems.push(item);
} catch (err) {
metricCollector.increment("parse_error");
metricCollector.incrementKeyed("parse_error_by_partition", partitionKey, 1);
logger.error("Message processing failed (Parse/Validation)", { error: err.message });
await handleError(err, item.message);
item.resolve();
}
}
if (pendingDbItems.length > 0) {
const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms;
const dayKey = dayKeyFromTsMs(firstTs);
if (dayKey) {
const dayStartMs = Date.now();
await insertItemsDegraded(partitionKey, pendingDbItems);
metricCollector.incrementKeyed("db_insert_ms_sum_by_day", dayKey, Date.now() - dayStartMs);
} else {
await insertItemsDegraded(partitionKey, pendingDbItems);
}
}
metricCollector.increment("batch_flush_count", 1);
metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt);
} catch (err) {
if (!commitOnAttempt && isDbConnectionError(err)) {
state.items = unresolvedItems.concat(state.items);
if (!state.timer) {
state.timer = setTimeout(() => {
state.timer = null;
flushBatchForKey(partitionKey);
}, 5e3);
}
return;
}
logger.error("Batch flush failed (non-network). Marking as consumed", {
error: err?.message,
partitionKey,
batchSize: currentBatch.length
});
for (const item of unresolvedItems) {
try {
await handleError(err, item.message);
} catch {
}
item.resolve();
}
}
})().finally(() => {
state.flushing = null;
if (state.items.length > 0) {
if (state.items.length >= BATCH_SIZE) {
flushBatchForKey(partitionKey);
} else if (!state.timer) {
state.timer = setTimeout(() => {
state.timer = null;
flushBatchForKey(partitionKey);
}, BATCH_TIMEOUT_MS);
}
}
});
return state.flushing;
};
const handleMessage = (message) => {
if (message.topic) {
metricCollector.increment("kafka_pulled");
metricCollector.incrementKeyed("kafka_pulled_by_partition", `${message.topic}-${message.partition}`, 1);
}
const partitionKey = partitionKeyFromMessage(message);
const state = getBatchState(partitionKey);
return new Promise((resolve, reject) => {
state.items.push({ message, resolve, reject });
if (state.items.length >= BATCH_SIZE) {
flushBatchForKey(partitionKey);
} else if (!state.timer) {
state.timer = setTimeout(() => {
state.timer = null;
flushBatchForKey(partitionKey);
}, BATCH_TIMEOUT_MS);
}
});
};
const consumers = createKafkaConsumers({
kafkaConfig: config.kafka,
onMessage: handleMessage,
onError: handleError
});
// Use a dedicated Redis connection for the retry worker: BLPOP with timeout 0 blocks its
// connection, which would otherwise stall heartbeat/metrics commands on the shared client.
const retryQueueClient = redisClient.duplicate();
await retryQueueClient.connect();
startErrorRetryWorker({
client: retryQueueClient,
queueKey: errorQueueKey,
redisIntegration,
handler: async (item) => {
if (!item?.value) {
throw new Error("Missing value in retry payload");
}
await handleMessage({ value: item.value });
}
}).catch((err) => {
logger.error("Retry worker failed", { error: err?.message });
});
const shutdown = async (signal) => {
logger.info(`Received ${signal}, shutting down...`);
try {
if (consumers && consumers.length > 0) {
await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve))));
logger.info("Kafka consumer closed", { count: consumers.length });
}
await redisClient.quit();
logger.info("Redis client closed");
await dbManager.close();
logger.info("Database connection closed");
process.exit(0);
} catch (err) {
logger.error("Error during shutdown", { error: err?.message });
process.exit(1);
}
};
process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
};
bootstrap().catch((error) => {
logger.error("Service bootstrap failed", { error: error?.message });
process.exit(1);
});