修改gitigonre
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1,3 @@
|
|||||||
/bls-onoffline-backend/node_modules
|
/bls-onoffline-backend/node_modules
|
||||||
/template
|
/template
|
||||||
|
/bls-onoffline-backend/dist
|
||||||
|
|||||||
950
bls-onoffline-backend/dist/index.js
vendored
950
bls-onoffline-backend/dist/index.js
vendored
@@ -1,950 +0,0 @@
|
|||||||
import cron from "node-cron";
|
|
||||||
import dotenv from "dotenv";
|
|
||||||
import pg from "pg";
|
|
||||||
import kafka from "kafka-node";
|
|
||||||
import { randomUUID } from "crypto";
|
|
||||||
import { z } from "zod";
|
|
||||||
import { createClient } from "redis";
|
|
||||||
dotenv.config();
|
|
||||||
const parseNumber = (value, defaultValue) => {
|
|
||||||
const parsed = Number(value);
|
|
||||||
return Number.isFinite(parsed) ? parsed : defaultValue;
|
|
||||||
};
|
|
||||||
const parseList = (value) => (value || "").split(",").map((item) => item.trim()).filter(Boolean);
|
|
||||||
const config = {
|
|
||||||
env: process.env.NODE_ENV || "development",
|
|
||||||
port: parseNumber(process.env.PORT, 3001),
|
|
||||||
kafka: {
|
|
||||||
brokers: parseList(process.env.KAFKA_BROKERS),
|
|
||||||
topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-onoffline-topic",
|
|
||||||
groupId: process.env.KAFKA_GROUP_ID || "bls-onoffline-group",
|
|
||||||
clientId: process.env.KAFKA_CLIENT_ID || "bls-onoffline-client",
|
|
||||||
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
|
|
||||||
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4),
|
|
||||||
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024),
|
|
||||||
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024),
|
|
||||||
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
|
|
||||||
autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3),
|
|
||||||
commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200),
|
|
||||||
commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true",
|
|
||||||
batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3),
|
|
||||||
batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50),
|
|
||||||
logMessages: process.env.KAFKA_LOG_MESSAGES === "true",
|
|
||||||
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
|
|
||||||
mechanism: process.env.KAFKA_SASL_MECHANISM || "plain",
|
|
||||||
username: process.env.KAFKA_SASL_USERNAME,
|
|
||||||
password: process.env.KAFKA_SASL_PASSWORD
|
|
||||||
} : void 0
|
|
||||||
},
|
|
||||||
db: {
|
|
||||||
host: process.env.DB_HOST || process.env.POSTGRES_HOST || "localhost",
|
|
||||||
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
|
|
||||||
user: process.env.DB_USER || process.env.POSTGRES_USER || "postgres",
|
|
||||||
password: process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || "",
|
|
||||||
database: process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || "log_platform",
|
|
||||||
max: parseNumber(process.env.DB_MAX_CONNECTIONS || process.env.POSTGRES_MAX_CONNECTIONS, 10),
|
|
||||||
ssl: process.env.DB_SSL === "true" ? { rejectUnauthorized: false } : void 0,
|
|
||||||
schema: process.env.DB_SCHEMA || "onoffline",
|
|
||||||
table: process.env.DB_TABLE || "onoffline_record"
|
|
||||||
},
|
|
||||||
redis: {
|
|
||||||
host: process.env.REDIS_HOST || "localhost",
|
|
||||||
port: parseNumber(process.env.REDIS_PORT, 6379),
|
|
||||||
password: process.env.REDIS_PASSWORD || void 0,
|
|
||||||
db: parseNumber(process.env.REDIS_DB, 0),
|
|
||||||
projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline",
|
|
||||||
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}`
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const format = (level, message, context) => {
|
|
||||||
const payload = {
|
|
||||||
level,
|
|
||||||
message,
|
|
||||||
timestamp: Date.now(),
|
|
||||||
...context ? { context } : {}
|
|
||||||
};
|
|
||||||
return JSON.stringify(payload);
|
|
||||||
};
|
|
||||||
const logger = {
|
|
||||||
info(message, context) {
|
|
||||||
process.stdout.write(`${format("info", message, context)}
|
|
||||||
`);
|
|
||||||
},
|
|
||||||
error(message, context) {
|
|
||||||
process.stderr.write(`${format("error", message, context)}
|
|
||||||
`);
|
|
||||||
},
|
|
||||||
warn(message, context) {
|
|
||||||
process.stderr.write(`${format("warn", message, context)}
|
|
||||||
`);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const { Pool } = pg;
|
|
||||||
const columns = [
|
|
||||||
"guid",
|
|
||||||
"ts_ms",
|
|
||||||
"write_ts_ms",
|
|
||||||
"hotel_id",
|
|
||||||
"mac",
|
|
||||||
"device_id",
|
|
||||||
"room_id",
|
|
||||||
"ip",
|
|
||||||
"current_status",
|
|
||||||
"launcher_version",
|
|
||||||
"reboot_reason"
|
|
||||||
];
|
|
||||||
class DatabaseManager {
|
|
||||||
constructor(dbConfig) {
|
|
||||||
this.pool = new Pool({
|
|
||||||
host: dbConfig.host,
|
|
||||||
port: dbConfig.port,
|
|
||||||
user: dbConfig.user,
|
|
||||||
password: dbConfig.password,
|
|
||||||
database: dbConfig.database,
|
|
||||||
max: dbConfig.max,
|
|
||||||
ssl: dbConfig.ssl
|
|
||||||
});
|
|
||||||
}
|
|
||||||
async insertRows({ schema, table, rows }) {
|
|
||||||
if (!rows || rows.length === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const statement = `
|
|
||||||
INSERT INTO ${schema}.${table} (${columns.join(", ")})
|
|
||||||
SELECT *
|
|
||||||
FROM UNNEST(
|
|
||||||
$1::text[],
|
|
||||||
$2::int8[],
|
|
||||||
$3::int8[],
|
|
||||||
$4::int2[],
|
|
||||||
$5::text[],
|
|
||||||
$6::text[],
|
|
||||||
$7::text[],
|
|
||||||
$8::text[],
|
|
||||||
$9::text[],
|
|
||||||
$10::text[],
|
|
||||||
$11::text[]
|
|
||||||
)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
`;
|
|
||||||
try {
|
|
||||||
const params = columns.map((column) => rows.map((row) => row[column] ?? null));
|
|
||||||
await this.pool.query(statement, params);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Database insert failed", {
|
|
||||||
error: error?.message,
|
|
||||||
schema,
|
|
||||||
table,
|
|
||||||
rowsLength: rows.length
|
|
||||||
});
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async checkConnection() {
|
|
||||||
let client;
|
|
||||||
try {
|
|
||||||
const connectPromise = this.pool.connect();
|
|
||||||
const timeoutPromise = new Promise((_, reject) => {
|
|
||||||
setTimeout(() => reject(new Error("Connection timeout")), 5e3);
|
|
||||||
});
|
|
||||||
try {
|
|
||||||
client = await Promise.race([connectPromise, timeoutPromise]);
|
|
||||||
} catch (raceError) {
|
|
||||||
connectPromise.then((c) => c.release()).catch(() => {
|
|
||||||
});
|
|
||||||
throw raceError;
|
|
||||||
}
|
|
||||||
await client.query("SELECT 1");
|
|
||||||
return true;
|
|
||||||
} catch (err) {
|
|
||||||
logger.error("Database check connection failed", { error: err.message });
|
|
||||||
return false;
|
|
||||||
} finally {
|
|
||||||
if (client) {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async close() {
|
|
||||||
await this.pool.end();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const dbManager = new DatabaseManager(config.db);
|
|
||||||
class OffsetTracker {
|
|
||||||
constructor() {
|
|
||||||
this.partitions = /* @__PURE__ */ new Map();
|
|
||||||
}
|
|
||||||
// Called when a message is received (before processing)
|
|
||||||
add(topic, partition, offset) {
|
|
||||||
const key = `${topic}-${partition}`;
|
|
||||||
if (!this.partitions.has(key)) {
|
|
||||||
this.partitions.set(key, { nextCommitOffset: null, done: /* @__PURE__ */ new Set() });
|
|
||||||
}
|
|
||||||
const state = this.partitions.get(key);
|
|
||||||
const numericOffset = Number(offset);
|
|
||||||
if (!Number.isFinite(numericOffset)) return;
|
|
||||||
if (state.nextCommitOffset === null) {
|
|
||||||
state.nextCommitOffset = numericOffset;
|
|
||||||
} else if (numericOffset < state.nextCommitOffset) {
|
|
||||||
state.nextCommitOffset = numericOffset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Called when a message is successfully processed
|
|
||||||
// Returns the next offset to commit (if any advancement is possible), or null
|
|
||||||
markDone(topic, partition, offset) {
|
|
||||||
const key = `${topic}-${partition}`;
|
|
||||||
const state = this.partitions.get(key);
|
|
||||||
if (!state) return null;
|
|
||||||
const numericOffset = Number(offset);
|
|
||||||
if (!Number.isFinite(numericOffset)) return null;
|
|
||||||
state.done.add(numericOffset);
|
|
||||||
if (state.nextCommitOffset === null) {
|
|
||||||
state.nextCommitOffset = numericOffset;
|
|
||||||
}
|
|
||||||
let advanced = false;
|
|
||||||
while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) {
|
|
||||||
state.done.delete(state.nextCommitOffset);
|
|
||||||
state.nextCommitOffset += 1;
|
|
||||||
advanced = true;
|
|
||||||
}
|
|
||||||
if (!advanced) return null;
|
|
||||||
return state.nextCommitOffset;
|
|
||||||
}
|
|
||||||
clear() {
|
|
||||||
this.partitions.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const { ConsumerGroup } = kafka;
|
|
||||||
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
|
|
||||||
const kafkaHost = kafkaConfig.brokers.join(",");
|
|
||||||
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
|
|
||||||
const id = `${clientId}-${process.pid}-${Date.now()}`;
|
|
||||||
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3;
|
|
||||||
const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200;
|
|
||||||
let inFlight = 0;
|
|
||||||
const tracker = new OffsetTracker();
|
|
||||||
let pendingCommits = /* @__PURE__ */ new Map();
|
|
||||||
let commitTimer = null;
|
|
||||||
const flushCommits = () => {
|
|
||||||
if (pendingCommits.size === 0) return;
|
|
||||||
const batch = pendingCommits;
|
|
||||||
pendingCommits = /* @__PURE__ */ new Map();
|
|
||||||
consumer.sendOffsetCommitRequest(
|
|
||||||
Array.from(batch.values()),
|
|
||||||
(err) => {
|
|
||||||
if (err) {
|
|
||||||
for (const [k, v] of batch.entries()) {
|
|
||||||
pendingCommits.set(k, v);
|
|
||||||
}
|
|
||||||
logger.error("Kafka commit failed", { error: err?.message, count: batch.size });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
};
|
|
||||||
const scheduleCommitFlush = () => {
|
|
||||||
if (commitTimer) return;
|
|
||||||
commitTimer = setTimeout(() => {
|
|
||||||
commitTimer = null;
|
|
||||||
flushCommits();
|
|
||||||
}, commitIntervalMs);
|
|
||||||
};
|
|
||||||
const consumer = new ConsumerGroup(
|
|
||||||
{
|
|
||||||
kafkaHost,
|
|
||||||
groupId: kafkaConfig.groupId,
|
|
||||||
clientId,
|
|
||||||
id,
|
|
||||||
fromOffset: "earliest",
|
|
||||||
protocol: ["roundrobin"],
|
|
||||||
outOfRangeOffset: "latest",
|
|
||||||
autoCommit: false,
|
|
||||||
autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
|
|
||||||
fetchMaxBytes: kafkaConfig.fetchMaxBytes,
|
|
||||||
fetchMinBytes: kafkaConfig.fetchMinBytes,
|
|
||||||
fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
|
|
||||||
sasl: kafkaConfig.sasl
|
|
||||||
},
|
|
||||||
kafkaConfig.topic
|
|
||||||
);
|
|
||||||
const tryResume = () => {
|
|
||||||
if (inFlight < maxInFlight && consumer.paused) {
|
|
||||||
consumer.resume();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
consumer.on("message", (message) => {
|
|
||||||
inFlight += 1;
|
|
||||||
tracker.add(message.topic, message.partition, message.offset);
|
|
||||||
if (inFlight >= maxInFlight) {
|
|
||||||
consumer.pause();
|
|
||||||
}
|
|
||||||
Promise.resolve(onMessage(message)).then(() => {
|
|
||||||
}).catch((error) => {
|
|
||||||
logger.error("Kafka message handling failed", { error: error?.message });
|
|
||||||
if (onError) {
|
|
||||||
onError(error, message);
|
|
||||||
}
|
|
||||||
}).finally(() => {
|
|
||||||
const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
|
|
||||||
if (commitOffset !== null) {
|
|
||||||
const key = `${message.topic}-${message.partition}`;
|
|
||||||
pendingCommits.set(key, {
|
|
||||||
topic: message.topic,
|
|
||||||
partition: message.partition,
|
|
||||||
offset: commitOffset,
|
|
||||||
metadata: "m"
|
|
||||||
});
|
|
||||||
scheduleCommitFlush();
|
|
||||||
}
|
|
||||||
inFlight -= 1;
|
|
||||||
tryResume();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
consumer.on("error", (error) => {
|
|
||||||
logger.error("Kafka consumer error", { error: error?.message });
|
|
||||||
if (onError) {
|
|
||||||
onError(error);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
consumer.on("connect", () => {
|
|
||||||
logger.info(`Kafka Consumer connected`, {
|
|
||||||
groupId: kafkaConfig.groupId,
|
|
||||||
clientId
|
|
||||||
});
|
|
||||||
});
|
|
||||||
consumer.on("rebalancing", () => {
|
|
||||||
logger.info(`Kafka Consumer rebalancing`, {
|
|
||||||
groupId: kafkaConfig.groupId,
|
|
||||||
clientId
|
|
||||||
});
|
|
||||||
tracker.clear();
|
|
||||||
pendingCommits.clear();
|
|
||||||
if (commitTimer) {
|
|
||||||
clearTimeout(commitTimer);
|
|
||||||
commitTimer = null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
consumer.on("rebalanced", () => {
|
|
||||||
logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId });
|
|
||||||
});
|
|
||||||
consumer.on("error", (err) => {
|
|
||||||
logger.error("Kafka Consumer Error", { error: err.message });
|
|
||||||
});
|
|
||||||
consumer.on("offsetOutOfRange", (err) => {
|
|
||||||
logger.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition });
|
|
||||||
});
|
|
||||||
consumer.on("offsetOutOfRange", (error) => {
|
|
||||||
logger.warn(`Kafka Consumer offset out of range`, {
|
|
||||||
error: error?.message,
|
|
||||||
groupId: kafkaConfig.groupId,
|
|
||||||
clientId
|
|
||||||
});
|
|
||||||
});
|
|
||||||
consumer.on("close", () => {
|
|
||||||
if (commitTimer) {
|
|
||||||
clearTimeout(commitTimer);
|
|
||||||
commitTimer = null;
|
|
||||||
}
|
|
||||||
flushCommits();
|
|
||||||
logger.warn(`Kafka Consumer closed`, {
|
|
||||||
groupId: kafkaConfig.groupId,
|
|
||||||
clientId
|
|
||||||
});
|
|
||||||
});
|
|
||||||
return consumer;
|
|
||||||
};
|
|
||||||
const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
|
|
||||||
const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
|
|
||||||
const count = Math.max(1, instances);
|
|
||||||
return Array.from(
|
|
||||||
{ length: count },
|
|
||||||
(_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
|
|
||||||
);
|
|
||||||
};
|
|
||||||
const createGuid = () => randomUUID().replace(/-/g, "");
|
|
||||||
const toNumber = (value) => {
|
|
||||||
if (value === void 0 || value === null || value === "") {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
if (typeof value === "number") {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
const parsed = Number(value);
|
|
||||||
return Number.isFinite(parsed) ? parsed : value;
|
|
||||||
};
|
|
||||||
const toStringAllowEmpty = (value) => {
|
|
||||||
if (value === void 0 || value === null) {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
return String(value);
|
|
||||||
};
|
|
||||||
const kafkaPayloadSchema = z.object({
|
|
||||||
HotelCode: z.preprocess(toNumber, z.number()),
|
|
||||||
MAC: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
HostNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
RoomNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
EndPoint: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
CurrentStatus: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
CurrentTime: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
UnixTime: z.preprocess(toNumber, z.number().nullable()).optional().nullable(),
|
|
||||||
LauncherVersion: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
|
|
||||||
RebootReason: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable()
|
|
||||||
});
|
|
||||||
const normalizeText = (value, maxLength) => {
|
|
||||||
if (value === void 0 || value === null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
const str = String(value);
|
|
||||||
if (maxLength && str.length > maxLength) {
|
|
||||||
return str.substring(0, maxLength);
|
|
||||||
}
|
|
||||||
return str;
|
|
||||||
};
|
|
||||||
const buildRowsFromPayload = (rawPayload) => {
|
|
||||||
const payload = kafkaPayloadSchema.parse(rawPayload);
|
|
||||||
const rebootReason = normalizeText(payload.RebootReason, 255);
|
|
||||||
const currentStatusRaw = normalizeText(payload.CurrentStatus, 255);
|
|
||||||
const hasRebootReason = rebootReason !== null && rebootReason !== "";
|
|
||||||
const currentStatus = hasRebootReason ? "on" : currentStatusRaw;
|
|
||||||
let tsMs = payload.UnixTime;
|
|
||||||
if (typeof tsMs === "number" && tsMs < 1e11) {
|
|
||||||
tsMs = tsMs * 1e3;
|
|
||||||
}
|
|
||||||
if (!tsMs && payload.CurrentTime) {
|
|
||||||
const parsed = Date.parse(payload.CurrentTime);
|
|
||||||
if (!isNaN(parsed)) {
|
|
||||||
tsMs = parsed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!tsMs) {
|
|
||||||
tsMs = Date.now();
|
|
||||||
}
|
|
||||||
const mac = normalizeText(payload.MAC) || "";
|
|
||||||
const deviceId = normalizeText(payload.HostNumber) || "";
|
|
||||||
const roomId = normalizeText(payload.RoomNumber) || "";
|
|
||||||
let hotelId = payload.HotelCode;
|
|
||||||
if (typeof hotelId !== "number" || Number.isNaN(hotelId) || hotelId < -32768 || hotelId > 32767) {
|
|
||||||
hotelId = 0;
|
|
||||||
}
|
|
||||||
const row = {
|
|
||||||
guid: createGuid(),
|
|
||||||
ts_ms: tsMs,
|
|
||||||
write_ts_ms: Date.now(),
|
|
||||||
hotel_id: hotelId,
|
|
||||||
mac,
|
|
||||||
device_id: deviceId,
|
|
||||||
room_id: roomId,
|
|
||||||
ip: normalizeText(payload.EndPoint),
|
|
||||||
current_status: currentStatus,
|
|
||||||
launcher_version: normalizeText(payload.LauncherVersion, 255),
|
|
||||||
reboot_reason: rebootReason
|
|
||||||
};
|
|
||||||
return [row];
|
|
||||||
};
|
|
||||||
const parseMessageToRows = (message) => {
|
|
||||||
const rawValue = message.value.toString();
|
|
||||||
let payload;
|
|
||||||
try {
|
|
||||||
payload = JSON.parse(rawValue);
|
|
||||||
} catch (e) {
|
|
||||||
const error = new Error(`JSON Parse Error: ${e.message}`);
|
|
||||||
error.type = "PARSE_ERROR";
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
const validationResult = kafkaPayloadSchema.safeParse(payload);
|
|
||||||
if (!validationResult.success) {
|
|
||||||
const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
|
|
||||||
error.type = "VALIDATION_ERROR";
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
return buildRowsFromPayload(payload);
|
|
||||||
};
|
|
||||||
const createRedisClient = async (config2) => {
|
|
||||||
const client = createClient({
|
|
||||||
socket: {
|
|
||||||
host: config2.host,
|
|
||||||
port: config2.port
|
|
||||||
},
|
|
||||||
password: config2.password,
|
|
||||||
database: config2.db
|
|
||||||
});
|
|
||||||
await client.connect();
|
|
||||||
return client;
|
|
||||||
};
|
|
||||||
class RedisIntegration {
|
|
||||||
constructor(client, projectName, apiBaseUrl) {
|
|
||||||
this.client = client;
|
|
||||||
this.projectName = projectName;
|
|
||||||
this.apiBaseUrl = apiBaseUrl;
|
|
||||||
this.heartbeatKey = "项目心跳";
|
|
||||||
this.logKey = `${projectName}_项目控制台`;
|
|
||||||
}
|
|
||||||
async info(message, context) {
|
|
||||||
const payload = {
|
|
||||||
timestamp: (/* @__PURE__ */ new Date()).toISOString(),
|
|
||||||
level: "info",
|
|
||||||
message,
|
|
||||||
metadata: context || void 0
|
|
||||||
};
|
|
||||||
await this.client.rPush(this.logKey, JSON.stringify(payload));
|
|
||||||
}
|
|
||||||
async error(message, context) {
|
|
||||||
const payload = {
|
|
||||||
timestamp: (/* @__PURE__ */ new Date()).toISOString(),
|
|
||||||
level: "error",
|
|
||||||
message,
|
|
||||||
metadata: context || void 0
|
|
||||||
};
|
|
||||||
await this.client.rPush(this.logKey, JSON.stringify(payload));
|
|
||||||
}
|
|
||||||
startHeartbeat() {
|
|
||||||
setInterval(() => {
|
|
||||||
const payload = {
|
|
||||||
projectName: this.projectName,
|
|
||||||
apiBaseUrl: this.apiBaseUrl,
|
|
||||||
lastActiveAt: Date.now()
|
|
||||||
};
|
|
||||||
this.client.rPush(this.heartbeatKey, JSON.stringify(payload));
|
|
||||||
}, 3e3);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const buildErrorQueueKey = (projectName) => `${projectName}_error_queue`;
|
|
||||||
const enqueueError = async (client, queueKey, payload) => {
|
|
||||||
try {
|
|
||||||
await client.rPush(queueKey, JSON.stringify(payload));
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Redis enqueue error failed", { error: error?.message });
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const startErrorRetryWorker = async ({
|
|
||||||
client,
|
|
||||||
queueKey,
|
|
||||||
handler,
|
|
||||||
redisIntegration,
|
|
||||||
maxAttempts = 5
|
|
||||||
}) => {
|
|
||||||
while (true) {
|
|
||||||
const result = await client.blPop(queueKey, 0);
|
|
||||||
const raw = result?.element;
|
|
||||||
if (!raw) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let item;
|
|
||||||
try {
|
|
||||||
item = JSON.parse(raw);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Invalid error payload", { error: error?.message });
|
|
||||||
await redisIntegration.error("Invalid error payload", { module: "redis", stack: error?.message });
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const attempts = item.attempts || 0;
|
|
||||||
try {
|
|
||||||
await handler(item);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Retry handler failed", { error: error?.message, stack: error?.stack });
|
|
||||||
const nextPayload = {
|
|
||||||
...item,
|
|
||||||
attempts: attempts + 1,
|
|
||||||
lastError: error?.message,
|
|
||||||
lastAttemptAt: Date.now()
|
|
||||||
};
|
|
||||||
if (nextPayload.attempts >= maxAttempts) {
|
|
||||||
await redisIntegration.error("Retry attempts exceeded", { module: "retry", stack: JSON.stringify(nextPayload) });
|
|
||||||
} else {
|
|
||||||
await enqueueError(client, queueKey, nextPayload);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
class MetricCollector {
|
|
||||||
constructor() {
|
|
||||||
this.reset();
|
|
||||||
}
|
|
||||||
reset() {
|
|
||||||
this.metrics = {
|
|
||||||
kafka_pulled: 0,
|
|
||||||
parse_error: 0,
|
|
||||||
db_inserted: 0,
|
|
||||||
db_failed: 0,
|
|
||||||
db_insert_count: 0,
|
|
||||||
db_insert_ms_sum: 0,
|
|
||||||
batch_flush_count: 0,
|
|
||||||
batch_flush_ms_sum: 0
|
|
||||||
};
|
|
||||||
this.keyed = {};
|
|
||||||
}
|
|
||||||
increment(metric, count = 1) {
|
|
||||||
if (this.metrics.hasOwnProperty(metric)) {
|
|
||||||
this.metrics[metric] += count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
incrementKeyed(metric, key, count = 1) {
|
|
||||||
if (!key) return;
|
|
||||||
if (!this.keyed[metric]) {
|
|
||||||
this.keyed[metric] = {};
|
|
||||||
}
|
|
||||||
if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) {
|
|
||||||
this.keyed[metric][key] = 0;
|
|
||||||
}
|
|
||||||
this.keyed[metric][key] += count;
|
|
||||||
}
|
|
||||||
getAndReset() {
|
|
||||||
const current = { ...this.metrics };
|
|
||||||
const keyed = JSON.parse(JSON.stringify(this.keyed));
|
|
||||||
this.reset();
|
|
||||||
return { ...current, keyed };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const bootstrap = async () => {
|
|
||||||
logger.info("Starting application with config", {
|
|
||||||
env: process.env.NODE_ENV,
|
|
||||||
db: {
|
|
||||||
host: config.db.host,
|
|
||||||
port: config.db.port,
|
|
||||||
user: config.db.user,
|
|
||||||
database: config.db.database,
|
|
||||||
schema: config.db.schema
|
|
||||||
},
|
|
||||||
kafka: {
|
|
||||||
brokers: config.kafka.brokers,
|
|
||||||
topic: config.kafka.topic,
|
|
||||||
groupId: config.kafka.groupId
|
|
||||||
},
|
|
||||||
redis: {
|
|
||||||
host: config.redis.host,
|
|
||||||
port: config.redis.port
|
|
||||||
}
|
|
||||||
});
|
|
||||||
const metricCollector = new MetricCollector();
|
|
||||||
const redisClient = await createRedisClient(config.redis);
|
|
||||||
const redisIntegration = new RedisIntegration(
|
|
||||||
redisClient,
|
|
||||||
config.redis.projectName,
|
|
||||||
config.redis.apiBaseUrl
|
|
||||||
);
|
|
||||||
redisIntegration.startHeartbeat();
|
|
||||||
cron.schedule("* * * * *", async () => {
|
|
||||||
const metrics = metricCollector.getAndReset();
|
|
||||||
const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0";
|
|
||||||
const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0";
|
|
||||||
const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
|
|
||||||
console.log(report);
|
|
||||||
logger.info(report);
|
|
||||||
try {
|
|
||||||
await redisIntegration.info("Minute Metrics", metrics);
|
|
||||||
} catch (err) {
|
|
||||||
logger.error("Failed to report metrics to Redis", { error: err?.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
|
|
||||||
const handleError = async (error, message) => {
|
|
||||||
logger.error("Kafka processing error", {
|
|
||||||
error: error?.message,
|
|
||||||
type: error?.type,
|
|
||||||
stack: error?.stack
|
|
||||||
});
|
|
||||||
try {
|
|
||||||
await redisIntegration.error("Kafka processing error", {
|
|
||||||
module: "kafka",
|
|
||||||
stack: error?.stack || error?.message
|
|
||||||
});
|
|
||||||
} catch (redisError) {
|
|
||||||
logger.error("Redis error log failed", { error: redisError?.message });
|
|
||||||
}
|
|
||||||
if (message) {
|
|
||||||
const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value;
|
|
||||||
try {
|
|
||||||
await enqueueError(redisClient, errorQueueKey, {
|
|
||||||
attempts: 0,
|
|
||||||
value: messageValue,
|
|
||||||
meta: {
|
|
||||||
topic: message.topic,
|
|
||||||
partition: message.partition,
|
|
||||||
offset: message.offset,
|
|
||||||
key: message.key
|
|
||||||
},
|
|
||||||
timestamp: Date.now()
|
|
||||||
});
|
|
||||||
} catch (enqueueError2) {
|
|
||||||
logger.error("Enqueue error payload failed", { error: enqueueError2?.message });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1e3;
|
|
||||||
const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20;
|
|
||||||
const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5e3;
|
|
||||||
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
|
|
||||||
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
|
|
||||||
const commitOnAttempt = config.kafka.commitOnAttempt === true;
|
|
||||||
const batchStates = /* @__PURE__ */ new Map();
|
|
||||||
const partitionKeyFromMessage = (message) => {
|
|
||||||
if (message?.topic !== void 0 && message?.partition !== void 0) {
|
|
||||||
return `${message.topic}-${message.partition}`;
|
|
||||||
}
|
|
||||||
return "retry";
|
|
||||||
};
|
|
||||||
const dayKeyFromTsMs = (tsMs) => {
|
|
||||||
const numeric = typeof tsMs === "string" ? Number(tsMs) : tsMs;
|
|
||||||
if (!Number.isFinite(numeric)) return null;
|
|
||||||
const d = new Date(numeric);
|
|
||||||
if (Number.isNaN(d.getTime())) return null;
|
|
||||||
const yyyy = d.getFullYear();
|
|
||||||
const mm = String(d.getMonth() + 1).padStart(2, "0");
|
|
||||||
const dd = String(d.getDate()).padStart(2, "0");
|
|
||||||
return `${yyyy}${mm}${dd}`;
|
|
||||||
};
|
|
||||||
const getBatchState = (key) => {
|
|
||||||
if (!batchStates.has(key)) {
|
|
||||||
batchStates.set(key, { items: [], timer: null, flushing: null });
|
|
||||||
}
|
|
||||||
return batchStates.get(key);
|
|
||||||
};
|
|
||||||
const isDbConnectionError = (err) => {
|
|
||||||
const code = err?.code;
|
|
||||||
if (typeof code === "string") {
|
|
||||||
const networkCodes = /* @__PURE__ */ new Set([
|
|
||||||
"ECONNREFUSED",
|
|
||||||
"ECONNRESET",
|
|
||||||
"EPIPE",
|
|
||||||
"ETIMEDOUT",
|
|
||||||
"ENOTFOUND",
|
|
||||||
"EHOSTUNREACH",
|
|
||||||
"ENETUNREACH",
|
|
||||||
"57P03",
|
|
||||||
"08006",
|
|
||||||
"08001",
|
|
||||||
"08000",
|
|
||||||
"08003"
|
|
||||||
]);
|
|
||||||
if (networkCodes.has(code)) return true;
|
|
||||||
}
|
|
||||||
const message = typeof err?.message === "string" ? err.message : "";
|
|
||||||
if (!message) return false;
|
|
||||||
const lower = message.toLowerCase();
|
|
||||||
return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry");
|
|
||||||
};
|
|
||||||
const insertRowsWithRetry = async (rows) => {
|
|
||||||
const startedAt = Date.now();
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
|
|
||||||
metricCollector.increment("db_insert_count", 1);
|
|
||||||
metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
|
|
||||||
return;
|
|
||||||
} catch (err) {
|
|
||||||
if (isDbConnectionError(err)) {
|
|
||||||
logger.error("Database offline during batch insert. Retrying in 5s...", { error: err.message });
|
|
||||||
await new Promise((r) => setTimeout(r, 5e3));
|
|
||||||
while (!await dbManager.checkConnection()) {
|
|
||||||
logger.warn("Database still offline. Waiting 5s...");
|
|
||||||
await new Promise((r) => setTimeout(r, 5e3));
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const insertRowsOnce = async (rows) => {
|
|
||||||
const startedAt = Date.now();
|
|
||||||
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
|
|
||||||
metricCollector.increment("db_insert_count", 1);
|
|
||||||
metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
|
|
||||||
};
|
|
||||||
const resolveInsertedItems = (partitionKey, items) => {
|
|
||||||
let insertedRows = 0;
|
|
||||||
for (const p of items) {
|
|
||||||
insertedRows += p.rows.length;
|
|
||||||
const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
|
|
||||||
if (dayKey) {
|
|
||||||
metricCollector.incrementKeyed("db_inserted_by_day", dayKey, p.rows.length);
|
|
||||||
}
|
|
||||||
p.item.resolve();
|
|
||||||
}
|
|
||||||
metricCollector.increment("db_inserted", insertedRows);
|
|
||||||
metricCollector.incrementKeyed("db_inserted_by_partition", partitionKey, insertedRows);
|
|
||||||
};
|
|
||||||
const handleFailedItem = async (partitionKey, p, err) => {
|
|
||||||
metricCollector.increment("db_failed");
|
|
||||||
metricCollector.incrementKeyed("db_failed_by_partition", partitionKey, 1);
|
|
||||||
const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
|
|
||||||
if (dayKey) {
|
|
||||||
metricCollector.incrementKeyed("db_failed_by_day", dayKey, 1);
|
|
||||||
}
|
|
||||||
await handleError(err, p.item.message);
|
|
||||||
p.item.resolve();
|
|
||||||
};
|
|
||||||
const insertItemsDegraded = async (partitionKey, items) => {
|
|
||||||
if (items.length === 0) return;
|
|
||||||
const rows = items.flatMap((p) => p.rows);
|
|
||||||
if (commitOnAttempt) {
|
|
||||||
try {
|
|
||||||
await insertRowsOnce(rows);
|
|
||||||
resolveInsertedItems(partitionKey, items);
|
|
||||||
} catch (err) {
|
|
||||||
for (const item of items) {
|
|
||||||
await handleFailedItem(partitionKey, item, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
await insertRowsWithRetry(rows);
|
|
||||||
resolveInsertedItems(partitionKey, items);
|
|
||||||
return;
|
|
||||||
} catch (err) {
|
|
||||||
if (items.length === 1) {
|
|
||||||
try {
|
|
||||||
await insertRowsWithRetry(items[0].rows);
|
|
||||||
resolveInsertedItems(partitionKey, items);
|
|
||||||
} catch (innerErr) {
|
|
||||||
await handleFailedItem(partitionKey, items[0], innerErr);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const mid = Math.floor(items.length / 2);
|
|
||||||
await insertItemsDegraded(partitionKey, items.slice(0, mid));
|
|
||||||
await insertItemsDegraded(partitionKey, items.slice(mid));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const flushBatchForKey = async (partitionKey) => {
|
|
||||||
const state = getBatchState(partitionKey);
|
|
||||||
if (state.flushing) return state.flushing;
|
|
||||||
state.flushing = (async () => {
|
|
||||||
if (state.timer) {
|
|
||||||
clearTimeout(state.timer);
|
|
||||||
state.timer = null;
|
|
||||||
}
|
|
||||||
if (state.items.length === 0) return;
|
|
||||||
const startedAt = Date.now();
|
|
||||||
const currentBatch = state.items;
|
|
||||||
state.items = [];
|
|
||||||
const pendingDbItems = [];
|
|
||||||
const unresolvedItems = [];
|
|
||||||
try {
|
|
||||||
for (const item of currentBatch) {
|
|
||||||
try {
|
|
||||||
const rows = parseMessageToRows(item.message);
|
|
||||||
pendingDbItems.push({ item, rows });
|
|
||||||
unresolvedItems.push(item);
|
|
||||||
} catch (err) {
|
|
||||||
metricCollector.increment("parse_error");
|
|
||||||
metricCollector.incrementKeyed("parse_error_by_partition", partitionKey, 1);
|
|
||||||
logger.error("Message processing failed (Parse/Validation)", { error: err.message });
|
|
||||||
await handleError(err, item.message);
|
|
||||||
item.resolve();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pendingDbItems.length > 0) {
|
|
||||||
const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms;
|
|
||||||
const dayKey = dayKeyFromTsMs(firstTs);
|
|
||||||
if (dayKey) {
|
|
||||||
const dayStartMs = Date.now();
|
|
||||||
await insertItemsDegraded(partitionKey, pendingDbItems);
|
|
||||||
metricCollector.incrementKeyed("db_insert_ms_sum_by_day", dayKey, Date.now() - dayStartMs);
|
|
||||||
} else {
|
|
||||||
await insertItemsDegraded(partitionKey, pendingDbItems);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
metricCollector.increment("batch_flush_count", 1);
|
|
||||||
metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt);
|
|
||||||
} catch (err) {
|
|
||||||
if (!commitOnAttempt && isDbConnectionError(err)) {
|
|
||||||
state.items = unresolvedItems.concat(state.items);
|
|
||||||
if (!state.timer) {
|
|
||||||
state.timer = setTimeout(() => {
|
|
||||||
state.timer = null;
|
|
||||||
flushBatchForKey(partitionKey);
|
|
||||||
}, 5e3);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
logger.error("Batch flush failed (non-network). Marking as consumed", {
|
|
||||||
error: err?.message,
|
|
||||||
partitionKey,
|
|
||||||
batchSize: currentBatch.length
|
|
||||||
});
|
|
||||||
for (const item of unresolvedItems) {
|
|
||||||
try {
|
|
||||||
await handleError(err, item.message);
|
|
||||||
} catch {
|
|
||||||
}
|
|
||||||
item.resolve();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})().finally(() => {
|
|
||||||
state.flushing = null;
|
|
||||||
if (state.items.length > 0) {
|
|
||||||
if (state.items.length >= BATCH_SIZE) {
|
|
||||||
flushBatchForKey(partitionKey);
|
|
||||||
} else if (!state.timer) {
|
|
||||||
state.timer = setTimeout(() => {
|
|
||||||
state.timer = null;
|
|
||||||
flushBatchForKey(partitionKey);
|
|
||||||
}, BATCH_TIMEOUT_MS);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return state.flushing;
|
|
||||||
};
|
|
||||||
const handleMessage = (message) => {
|
|
||||||
if (message.topic) {
|
|
||||||
metricCollector.increment("kafka_pulled");
|
|
||||||
metricCollector.incrementKeyed("kafka_pulled_by_partition", `${message.topic}-${message.partition}`, 1);
|
|
||||||
}
|
|
||||||
const partitionKey = partitionKeyFromMessage(message);
|
|
||||||
const state = getBatchState(partitionKey);
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
state.items.push({ message, resolve, reject });
|
|
||||||
if (state.items.length >= BATCH_SIZE) {
|
|
||||||
flushBatchForKey(partitionKey);
|
|
||||||
} else if (!state.timer) {
|
|
||||||
state.timer = setTimeout(() => {
|
|
||||||
state.timer = null;
|
|
||||||
flushBatchForKey(partitionKey);
|
|
||||||
}, BATCH_TIMEOUT_MS);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
const consumers = createKafkaConsumers({
|
|
||||||
kafkaConfig: config.kafka,
|
|
||||||
onMessage: handleMessage,
|
|
||||||
onError: handleError
|
|
||||||
});
|
|
||||||
startErrorRetryWorker({
|
|
||||||
client: redisClient,
|
|
||||||
queueKey: errorQueueKey,
|
|
||||||
redisIntegration,
|
|
||||||
handler: async (item) => {
|
|
||||||
if (!item?.value) {
|
|
||||||
throw new Error("Missing value in retry payload");
|
|
||||||
}
|
|
||||||
await handleMessage({ value: item.value });
|
|
||||||
}
|
|
||||||
}).catch((err) => {
|
|
||||||
logger.error("Retry worker failed", { error: err?.message });
|
|
||||||
});
|
|
||||||
const shutdown = async (signal) => {
|
|
||||||
logger.info(`Received ${signal}, shutting down...`);
|
|
||||||
try {
|
|
||||||
if (consumers && consumers.length > 0) {
|
|
||||||
await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve))));
|
|
||||||
logger.info("Kafka consumer closed", { count: consumers.length });
|
|
||||||
}
|
|
||||||
await redisClient.quit();
|
|
||||||
logger.info("Redis client closed");
|
|
||||||
await dbManager.close();
|
|
||||||
logger.info("Database connection closed");
|
|
||||||
process.exit(0);
|
|
||||||
} catch (err) {
|
|
||||||
logger.error("Error during shutdown", { error: err?.message });
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
process.on("SIGTERM", () => shutdown("SIGTERM"));
|
|
||||||
process.on("SIGINT", () => shutdown("SIGINT"));
|
|
||||||
};
|
|
||||||
bootstrap().catch((error) => {
|
|
||||||
logger.error("Service bootstrap failed", { error: error?.message });
|
|
||||||
process.exit(1);
|
|
||||||
});
|
|
||||||
Reference in New Issue
Block a user