refactor: 移除运行时数据库初始化与分区维护

- 删除了服务启动阶段的数据库初始化逻辑,包括创建数据库、表和分区的相关代码。
- 移除了定时分区维护任务,确保服务职责更清晰。
- 更新了数据库分区策略,明确分区由外部脚本管理,服务不再自动创建缺失分区。
- 修改了相关文档,确保数据库结构与分区维护的责任转移到 `SQL_Script/` 目录下的外部脚本。
- 更新了需求和场景,确保符合新的设计规范。
This commit is contained in:
2026-03-04 11:52:12 +08:00
parent 3d80ad8710
commit 33c9bf0e07
20 changed files with 257 additions and 663 deletions

View File

@@ -1,9 +1,6 @@
import cron from "node-cron";
import dotenv from "dotenv";
import pg from "pg";
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import kafka from "kafka-node";
import { randomUUID } from "crypto";
import { z } from "zod";
@@ -173,262 +170,6 @@ class DatabaseManager {
}
}
const dbManager = new DatabaseManager(config.db);
/**
 * Creates daily range partitions (`<table>_YYYYMMDD`) of the configured
 * parent table. Each partition covers one local-time calendar day expressed
 * as a half-open [startMs, endMs) range of millisecond timestamps.
 * Partitions for today or future days are forced into the "ts_hot"
 * tablespace (table, indexes, and TOAST relations included).
 *
 * NOTE(review): schema/table names and the numeric range bounds are
 * interpolated directly into DDL (CREATE TABLE cannot take bind
 * parameters); this assumes config.db.schema / config.db.table are
 * trusted values — verify upstream.
 */
class PartitionManager {
/**
 * True when `date` falls on today or a later calendar day (local time).
 * Both sides are normalized to local midnight before comparing.
 * @param {Date|string|number} date - Anything `new Date()` accepts.
 * @returns {boolean}
 */
isCurrentOrFutureDate(date) {
const normalizedDate = new Date(date);
normalizedDate.setHours(0, 0, 0, 0);
const today = /* @__PURE__ */ new Date();
today.setHours(0, 0, 0, 0);
return normalizedDate.getTime() >= today.getTime();
}
/**
 * Escapes a value for embedding inside a single-quoted SQL string literal
 * by doubling single quotes. Used only to build the DO block below;
 * regular queries use $n placeholders instead.
 * @param {*} value - Coerced to string before escaping.
 * @returns {string}
 */
escapeSqlLiteral(value) {
return String(value).replace(/'/g, "''");
}
/**
 * Builds a PL/pgSQL DO block that moves one partition — plus all of its
 * indexes, its TOAST table (if any), and the TOAST table's indexes — into
 * the hot tablespace, then ANALYZEs the partition. The DO block raises if
 * the partition does not exist, so callers get a query error rather than a
 * silent no-op.
 * @param {string} schema - Schema containing the partition.
 * @param {string} partitionName - Bare partition table name (no schema prefix).
 * @param {string} [hotTablespace="ts_hot"] - Target tablespace name.
 * @returns {string} SQL text ready to pass to `client.query`.
 */
buildForceHotTablespaceSql(schema, partitionName, hotTablespace = "ts_hot") {
const schemaLiteral = this.escapeSqlLiteral(schema);
const partitionLiteral = this.escapeSqlLiteral(partitionName);
const hotLiteral = this.escapeSqlLiteral(hotTablespace);
return `
DO $$
DECLARE
v_schema text := '${schemaLiteral}';
v_partition text := '${partitionLiteral}';
v_hot text := '${hotLiteral}';
v_part_oid oid;
v_toast_oid oid;
r record;
BEGIN
SELECT c.oid INTO v_part_oid
FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace
WHERE n.nspname=v_schema AND c.relname=v_partition AND c.relkind='r';
IF v_part_oid IS NULL THEN
RAISE EXCEPTION 'partition %.% not found', v_schema, v_partition;
END IF;
EXECUTE format('ALTER TABLE %I.%I SET TABLESPACE %I', v_schema, v_partition, v_hot);
FOR r IN
SELECT idxn.nspname AS index_schema, i.relname AS index_name
FROM pg_index x
JOIN pg_class t ON t.oid=x.indrelid
JOIN pg_namespace nt ON nt.oid=t.relnamespace
JOIN pg_class i ON i.oid=x.indexrelid
JOIN pg_namespace idxn ON idxn.oid=i.relnamespace
LEFT JOIN pg_tablespace ts ON ts.oid=i.reltablespace
WHERE nt.nspname=v_schema
AND t.relname=v_partition
AND COALESCE(ts.spcname,'pg_default')<>v_hot
LOOP
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, v_hot);
END LOOP;
SELECT reltoastrelid INTO v_toast_oid FROM pg_class WHERE oid=v_part_oid;
IF v_toast_oid IS NOT NULL AND v_toast_oid<>0 THEN
EXECUTE format('ALTER TABLE %s SET TABLESPACE %I', v_toast_oid::regclass, v_hot);
FOR r IN
SELECT idxn.nspname AS index_schema, i.relname AS index_name
FROM pg_index x
JOIN pg_class i ON i.oid=x.indexrelid
JOIN pg_namespace idxn ON idxn.oid=i.relnamespace
LEFT JOIN pg_tablespace ts ON ts.oid=i.reltablespace
WHERE x.indrelid=v_toast_oid
AND COALESCE(ts.spcname,'pg_default')<>v_hot
LOOP
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, v_hot);
END LOOP;
END IF;
EXECUTE format('ANALYZE %I.%I', v_schema, v_partition);
END $$;
`;
}
/**
 * Calculate the start and end timestamps (milliseconds) for a given date.
 * The range is half-open [startMs, endMs): local midnight of `date` up to
 * local midnight of the next day.
 * @param {Date} date - The date to calculate for.
 * @returns {Object} { startMs, endMs, partitionSuffix } where
 *   partitionSuffix is the zero-padded local-time "YYYYMMDD" string.
 */
getPartitionInfo(date) {
const yyyy = date.getFullYear();
const mm = String(date.getMonth() + 1).padStart(2, "0");
const dd = String(date.getDate()).padStart(2, "0");
const partitionSuffix = `${yyyy}${mm}${dd}`;
const start = new Date(date);
start.setHours(0, 0, 0, 0);
const startMs = start.getTime();
const end = new Date(date);
end.setDate(end.getDate() + 1);
end.setHours(0, 0, 0, 0);
const endMs = end.getTime();
return { startMs, endMs, partitionSuffix };
}
/**
 * Ensure partitions exist for the past M days and next N days.
 * Iterates day offsets in [-daysBack, daysAhead), i.e. daysBack past days,
 * today, and daysAhead-1 future days; the farthest future partition is
 * therefore daysAhead-1 days out, not daysAhead.
 * Today/future partitions are created in ts_hot and then forcibly migrated
 * (indexes + TOAST) via buildForceHotTablespaceSql.
 * @param {number} daysAhead - Number of days to pre-create.
 * @param {number} daysBack - Number of days to look back.
 * @throws Rethrows any query error after logging it.
 */
async ensurePartitions(daysAhead = 30, daysBack = 15) {
const client = await dbManager.pool.connect();
try {
logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
const now = /* @__PURE__ */ new Date();
for (let i = -daysBack; i < daysAhead; i++) {
const targetDate = new Date(now);
targetDate.setDate(now.getDate() + i);
const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
const schema = config.db.schema;
const table = config.db.table;
const partitionName = `${schema}.${table}_${partitionSuffix}`;
// to_regclass returns NULL (not an error) when the relation is absent.
const checkSql = `
SELECT to_regclass($1) as exists;
`;
const checkRes = await client.query(checkSql, [partitionName]);
if (!checkRes.rows[0].exists) {
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
const shouldUseHotTablespace = this.isCurrentOrFutureDate(targetDate);
const tablespaceClause = shouldUseHotTablespace ? " TABLESPACE ts_hot" : "";
const partitionTableName = `${table}_${partitionSuffix}`;
const createSql = `
CREATE TABLE IF NOT EXISTS ${partitionName}
PARTITION OF ${schema}.${table}
FOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};
`;
await client.query(createSql);
if (shouldUseHotTablespace) {
// Also move indexes/TOAST, in case the partition pre-existed elsewhere.
await client.query(this.buildForceHotTablespaceSql(schema, partitionTableName));
}
}
}
logger.info("Partition check completed.");
} catch (err) {
logger.error("Error ensuring partitions:", err);
throw err;
} finally {
client.release();
}
}
/**
 * Ensure a partition exists for every distinct calendar day referenced by
 * the given millisecond timestamps (used as a repair path when an insert
 * hits a missing partition). Non-numeric or unparseable entries are
 * silently skipped; at most 400 distinct days are processed per call as a
 * safety cap. Errors propagate to the caller (no logging here beyond the
 * create message).
 * @param {(number|string)[]} tsMsList - Epoch-millisecond timestamps.
 */
async ensurePartitionsForTimestamps(tsMsList) {
if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
const uniqueSuffixes = /* @__PURE__ */ new Set();
for (const ts of tsMsList) {
const numericTs = typeof ts === "string" ? Number(ts) : ts;
if (!Number.isFinite(numericTs)) continue;
const date = new Date(numericTs);
if (Number.isNaN(date.getTime())) continue;
const { partitionSuffix } = this.getPartitionInfo(date);
uniqueSuffixes.add(partitionSuffix);
if (uniqueSuffixes.size >= 400) break;
}
if (uniqueSuffixes.size === 0) return;
const client = await dbManager.pool.connect();
try {
const schema = config.db.schema;
const table = config.db.table;
for (const partitionSuffix of uniqueSuffixes) {
// Rebuild the date from the YYYYMMDD suffix so start/end are computed
// from local midnight, matching ensurePartitions' ranges exactly.
const yyyy = Number(partitionSuffix.slice(0, 4));
const mm = Number(partitionSuffix.slice(4, 6));
const dd = Number(partitionSuffix.slice(6, 8));
if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
const targetDate = new Date(yyyy, mm - 1, dd);
if (Number.isNaN(targetDate.getTime())) continue;
const { startMs, endMs } = this.getPartitionInfo(targetDate);
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
if (!checkRes.rows[0].exists) {
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
const shouldUseHotTablespace = this.isCurrentOrFutureDate(targetDate);
const tablespaceClause = shouldUseHotTablespace ? " TABLESPACE ts_hot" : "";
const partitionTableName = `${table}_${partitionSuffix}`;
await client.query(`
CREATE TABLE IF NOT EXISTS ${partitionName}
PARTITION OF ${schema}.${table}
FOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};
`);
if (shouldUseHotTablespace) {
await client.query(this.buildForceHotTablespaceSql(schema, partitionTableName));
}
}
}
} finally {
client.release();
}
}
}
// Shared singleton used by DatabaseInitializer and the insert retry path.
const partitionManager = new PartitionManager();
// ESM equivalents of CommonJS __filename/__dirname; used below to locate
// scripts/init_db.sql relative to the compiled module.
const __filename$1 = fileURLToPath(import.meta.url);
const __dirname$1 = path.dirname(__filename$1);
/**
 * One-shot startup initializer: makes sure the target database, the
 * schema/parent table, and an initial window of partitions all exist
 * before the service starts consuming.
 */
class DatabaseInitializer {
  /**
   * Runs the full initialization sequence in dependency order:
   * database -> schema/parent table -> 30 days of partitions.
   * @throws Propagates any connection or DDL error so bootstrap can abort.
   */
  async initialize() {
    logger.info("Starting database initialization check...");
    await this.ensureDatabaseExists();
    await this.ensureSchemaAndTable();
    await partitionManager.ensurePartitions(30);
    console.log("Database initialization completed successfully.");
    logger.info("Database initialization completed successfully.");
  }
  /**
   * Creates the configured database if it does not exist. Connects to the
   * maintenance database "postgres" because CREATE DATABASE cannot run
   * against a database that does not exist yet.
   * @throws On connection failure or DDL error (after logging).
   */
  async ensureDatabaseExists() {
    const { host, port, user, password, database, ssl } = config.db;
    console.log(`Checking if database '${database}' exists at ${host}:${port}...`);
    const client = new pg.Client({
      host,
      port,
      user,
      password,
      database: "postgres",
      ssl: ssl ? { rejectUnauthorized: false } : false
    });
    try {
      await client.connect();
      const checkRes = await client.query(
        `SELECT 1 FROM pg_database WHERE datname = $1`,
        [database]
      );
      if (checkRes.rowCount === 0) {
        logger.info(`Database '${database}' does not exist. Creating...`);
        // CREATE DATABASE cannot take a bind parameter, so quote the
        // identifier manually: doubling embedded double quotes keeps a
        // malformed (or hostile) name from breaking out of the identifier.
        const safeIdentifier = `"${String(database).replace(/"/g, '""')}"`;
        await client.query(`CREATE DATABASE ${safeIdentifier}`);
        console.log(`Database '${database}' created.`);
        logger.info(`Database '${database}' created.`);
      } else {
        console.log(`Database '${database}' already exists.`);
        logger.info(`Database '${database}' already exists.`);
      }
    } catch (err) {
      logger.error("Error ensuring database exists:", err);
      throw err;
    } finally {
      await client.end();
    }
  }
  /**
   * Applies scripts/init_db.sql (schema and parent-table DDL) through the
   * shared pool. Several candidate paths are probed so the script is found
   * whether the service runs from the repo root or a build directory.
   * @throws If no candidate path exists on disk, or on any SQL error.
   */
  async ensureSchemaAndTable() {
    const client = await dbManager.pool.connect();
    try {
      const sqlPathCandidates = [
        path.resolve(process.cwd(), "scripts/init_db.sql"),
        path.resolve(__dirname$1, "../scripts/init_db.sql"),
        path.resolve(__dirname$1, "../../scripts/init_db.sql")
      ];
      const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
      if (!sqlPath) {
        throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(" | ")}`);
      }
      const sql = fs.readFileSync(sqlPath, "utf8");
      console.log(`Executing init_db.sql from ${sqlPath}...`);
      logger.info("Executing init_db.sql...");
      await client.query(sql);
      console.log("Schema and parent table initialized.");
      logger.info("Schema and parent table initialized.");
    } catch (err) {
      logger.error("Error initializing schema and table:", err);
      throw err;
    } finally {
      client.release();
    }
  }
}
const dbInitializer = new DatabaseInitializer();
class OffsetTracker {
constructor() {
this.partitions = /* @__PURE__ */ new Map();
@@ -874,16 +615,7 @@ const bootstrap = async () => {
port: config.redis.port
}
});
await dbInitializer.initialize();
const metricCollector = new MetricCollector();
cron.schedule("0 0 * * *", async () => {
logger.info("Running scheduled partition maintenance...");
try {
await partitionManager.ensurePartitions(30);
} catch (err) {
logger.error("Scheduled partition maintenance failed", err);
}
});
const redisClient = await createRedisClient(config.redis);
const redisIntegration = new RedisIntegration(
redisClient,
@@ -991,10 +723,8 @@ const bootstrap = async () => {
const lower = message.toLowerCase();
return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry");
};
const isMissingPartitionError = (err) => err?.code === "23514" || typeof err?.message === "string" && err.message.includes("no partition of relation");
const insertRowsWithRetry = async (rows) => {
const startedAt = Date.now();
let attemptedPartitionFix = false;
while (true) {
try {
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
@@ -1011,24 +741,6 @@ const bootstrap = async () => {
}
continue;
}
if (isMissingPartitionError(err) && !attemptedPartitionFix) {
attemptedPartitionFix = true;
try {
await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms));
} catch (partitionErr) {
if (isDbConnectionError(partitionErr)) {
logger.error("Database offline during partition ensure. Retrying in 5s...", { error: partitionErr.message });
await new Promise((r) => setTimeout(r, 5e3));
while (!await dbManager.checkConnection()) {
logger.warn("Database still offline. Waiting 5s...");
await new Promise((r) => setTimeout(r, 5e3));
}
continue;
}
throw partitionErr;
}
continue;
}
throw err;
}
}