From 33c9bf0e073d1c4b726fde614cf73ba73216d103 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Wed, 4 Mar 2026 11:52:12 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=A7=BB=E9=99=A4=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E6=97=B6=E6=95=B0=E6=8D=AE=E5=BA=93=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E4=B8=8E=E5=88=86=E5=8C=BA=E7=BB=B4=E6=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 删除了服务启动阶段的数据库初始化逻辑,包括创建数据库、表和分区的相关代码。 - 移除了定时分区维护任务,确保服务职责更清晰。 - 更新了数据库分区策略,明确分区由外部脚本管理,服务不再自动创建缺失分区。 - 修改了相关文档,确保数据库结构与分区维护的责任转移到 `SQL_Script/` 目录下的外部脚本。 - 更新了需求和场景,确保符合新的设计规范。 --- SQL_Script/create_database.sql | 6 + SQL_Script/create_partition_for_day.sql | 12 + SQL_Script/create_schema_and_parent_table.sql | 27 ++ SQL_Script/generate_init_sql.js | 26 ++ SQL_Script/generate_partition_range_sql.js | 64 ++++ SQL_Script/generate_partition_sql.js | 49 +++ bls-onoffline-backend/README.md | 8 +- bls-onoffline-backend/dist/index.js | 288 ------------------ .../proposal.md | 0 .../specs/onoffline/spec.md | 0 .../tasks.md | 0 .../proposal.md | 14 + .../specs/onoffline/spec.md | 32 ++ .../tasks.md | 12 + .../openspec/specs/onoffline/spec.md | 11 +- bls-onoffline-backend/scripts/init_db.sql | 23 -- .../scripts/verify_partitions.js | 61 ---- bls-onoffline-backend/src/db/initializer.js | 100 ------ .../src/db/partitionManager.js | 136 --------- bls-onoffline-backend/src/index.js | 51 ---- 20 files changed, 257 insertions(+), 663 deletions(-) create mode 100644 SQL_Script/create_database.sql create mode 100644 SQL_Script/create_partition_for_day.sql create mode 100644 SQL_Script/create_schema_and_parent_table.sql create mode 100644 SQL_Script/generate_init_sql.js create mode 100644 SQL_Script/generate_partition_range_sql.js create mode 100644 SQL_Script/generate_partition_sql.js rename bls-onoffline-backend/openspec/changes/{2026-03-03-refactor-partition-indexes => archive/2026-03-04-2026-03-03-refactor-partition-indexes}/proposal.md (100%) rename bls-onoffline-backend/openspec/changes/{2026-03-03-refactor-partition-indexes => archive/2026-03-04-2026-03-03-refactor-partition-indexes}/specs/onoffline/spec.md (100%) rename bls-onoffline-backend/openspec/changes/{2026-03-03-refactor-partition-indexes => archive/2026-03-04-2026-03-03-refactor-partition-indexes}/tasks.md (100%) create mode 100644 bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/proposal.md create mode 100644 bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/specs/onoffline/spec.md create mode 100644 bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/tasks.md delete mode 100644 bls-onoffline-backend/scripts/init_db.sql delete mode 100644 bls-onoffline-backend/scripts/verify_partitions.js delete mode 100644 bls-onoffline-backend/src/db/initializer.js delete mode 100644 bls-onoffline-backend/src/db/partitionManager.js diff --git a/SQL_Script/create_database.sql b/SQL_Script/create_database.sql new file mode 100644 index 0000000..c87c519 --- /dev/null +++ b/SQL_Script/create_database.sql @@ -0,0 +1,6 @@ +-- Replace {{DATABASE_NAME}} before execution +-- Requires psql (uses \gexec) +SELECT format('CREATE DATABASE %I', '{{DATABASE_NAME}}') +WHERE NOT EXISTS ( + SELECT 1 FROM pg_database WHERE datname = '{{DATABASE_NAME}}' +)\gexec; \ No newline at end of file diff --git a/SQL_Script/create_partition_for_day.sql b/SQL_Script/create_partition_for_day.sql new file mode 100644 index 0000000..9de567a --- /dev/null +++ b/SQL_Script/create_partition_for_day.sql @@ -0,0 +1,12 @@ +-- Replace placeholders before execution: +-- {{SCHEMA_NAME}} default: onoffline +-- {{TABLE_NAME}} default: onoffline_record +-- {{PARTITION_SUFFIX}} format: YYYYMMDD (example: 20260304) +-- {{START_TS_MS}} unix ms at 00:00:00.000 +-- {{END_TS_MS}} unix ms at next day 00:00:00.000 +-- {{TABLESPACE_CLAUSE}} either empty string or: TABLESPACE ts_hot + +CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}}_{{PARTITION_SUFFIX}} +PARTITION OF {{SCHEMA_NAME}}.{{TABLE_NAME}} +FOR VALUES FROM ({{START_TS_MS}}) TO ({{END_TS_MS}}) +{{TABLESPACE_CLAUSE}}; \ No newline at end of file diff --git a/SQL_Script/create_schema_and_parent_table.sql b/SQL_Script/create_schema_and_parent_table.sql new file mode 100644 index 0000000..208bd98 --- /dev/null +++ b/SQL_Script/create_schema_and_parent_table.sql @@ -0,0 +1,27 @@ +-- Replace placeholders before execution: +-- {{SCHEMA_NAME}} default: onoffline +-- {{TABLE_NAME}} default: onoffline_record + +CREATE SCHEMA IF NOT EXISTS {{SCHEMA_NAME}}; + +CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}} ( + guid TEXT NOT NULL, + ts_ms BIGINT NOT NULL, + write_ts_ms BIGINT NOT NULL, + hotel_id SMALLINT, + mac TEXT, + device_id TEXT, + room_id TEXT, + ip TEXT, + current_status TEXT, + launcher_version TEXT, + reboot_reason TEXT, + PRIMARY KEY (ts_ms, guid) +) PARTITION BY RANGE (ts_ms); + +CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_guid ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (guid); +CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_device_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (device_id); +CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_hotel_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (hotel_id); +CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_mac ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (mac); +CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_room_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (room_id); +CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_status ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (current_status); \ No newline at end of file diff --git a/SQL_Script/generate_init_sql.js b/SQL_Script/generate_init_sql.js new file mode 100644 index 0000000..40e7511 --- /dev/null +++ b/SQL_Script/generate_init_sql.js @@ -0,0 +1,26 @@ +#!/usr/bin/env node + +import fs from 'fs'; +import path from 'path'; + +const args = Object.fromEntries( + process.argv.slice(2).map((arg) => { + const [key, value] = arg.split('='); + return [key.replace(/^--/, ''), value]; + }) +); + +const database = args.database || 'log_platform'; +const schema = args.schema || 'onoffline'; +const table = args.table || 'onoffline_record'; +const output = args.output; + +const sql = `SELECT format('CREATE DATABASE %I', '${database}')\nWHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = '${database}')\\gexec;\n\nCREATE SCHEMA IF NOT EXISTS ${schema};\n\nCREATE TABLE IF NOT EXISTS ${schema}.${table} (\n guid TEXT NOT NULL,\n ts_ms BIGINT NOT NULL,\n write_ts_ms BIGINT NOT NULL,\n hotel_id SMALLINT,\n mac TEXT,\n device_id TEXT,\n room_id TEXT,\n ip TEXT,\n current_status TEXT,\n launcher_version TEXT,\n reboot_reason TEXT,\n PRIMARY KEY (ts_ms, guid)\n) PARTITION BY RANGE (ts_ms);\n\nCREATE INDEX IF NOT EXISTS idx_${table}_guid ON ${schema}.${table} (guid);\nCREATE INDEX IF NOT EXISTS idx_${table}_device_id ON ${schema}.${table} (device_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_hotel_id ON ${schema}.${table} (hotel_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_mac ON ${schema}.${table} (mac);\nCREATE INDEX IF NOT EXISTS idx_${table}_room_id ON ${schema}.${table} (room_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_status ON ${schema}.${table} (current_status);\n`; + +if (output) { + const outputPath = path.resolve(output); + fs.writeFileSync(outputPath, sql, 'utf8'); + console.log(`Init SQL written to ${outputPath}`); +} else { + process.stdout.write(sql); +} diff --git a/SQL_Script/generate_partition_range_sql.js b/SQL_Script/generate_partition_range_sql.js new file mode 100644 index 0000000..4604b8c --- /dev/null +++ b/SQL_Script/generate_partition_range_sql.js @@ -0,0 +1,64 @@ +#!/usr/bin/env node + +import fs from 'fs'; +import path from 'path'; + +const args = Object.fromEntries( + process.argv.slice(2).map((arg) => { + const [key, value] = arg.split('='); + return [key.replace(/^--/, ''), value]; + }) +); + +const schema = args.schema || 'onoffline'; +const table = args.table || 'onoffline_record'; +const start = args.start; +const days = Number(args.days || '30'); +const tablespace = args.tablespace || ''; +const output = args.output; + +if (!start) { + console.error('Missing required argument: --start=YYYY-MM-DD'); + process.exit(1); +} +if (!Number.isFinite(days) || days <= 0) { + console.error('Invalid --days value. It must be a positive integer.'); + process.exit(1); +} + +const base = new Date(`${start}T00:00:00`); +if (Number.isNaN(base.getTime())) { + console.error('Invalid start date. Use format YYYY-MM-DD'); + process.exit(1); +} + +const statements = []; +for (let i = 0; i < days; i += 1) { + const date = new Date(base); + date.setDate(base.getDate() + i); + + const startMs = date.getTime(); + const endDate = new Date(date); + endDate.setDate(endDate.getDate() + 1); + const endMs = endDate.getTime(); + + const yyyy = date.getFullYear(); + const mm = String(date.getMonth() + 1).padStart(2, '0'); + const dd = String(date.getDate()).padStart(2, '0'); + const suffix = `${yyyy}${mm}${dd}`; + + const tablespaceClause = tablespace ? ` TABLESPACE ${tablespace}` : ''; + statements.push( + `CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};` + ); +} + +const sql = `${statements.join('\n\n')}\n`; + +if (output) { + const outputPath = path.resolve(output); + fs.writeFileSync(outputPath, sql, 'utf8'); + console.log(`Partition range SQL written to ${outputPath}`); +} else { + process.stdout.write(sql); +} diff --git a/SQL_Script/generate_partition_sql.js b/SQL_Script/generate_partition_sql.js new file mode 100644 index 0000000..551ef31 --- /dev/null +++ b/SQL_Script/generate_partition_sql.js @@ -0,0 +1,49 @@ +#!/usr/bin/env node + +import fs from 'fs'; +import path from 'path'; + +const args = Object.fromEntries( + process.argv.slice(2).map((arg) => { + const [key, value] = arg.split('='); + return [key.replace(/^--/, ''), value]; + }) +); + +const schema = args.schema || 'onoffline'; +const table = args.table || 'onoffline_record'; +const dateInput = args.date; +const tablespace = args.tablespace || ''; +const output = args.output; + +if (!dateInput) { + console.error('Missing required argument: --date=YYYY-MM-DD'); + process.exit(1); +} + +const date = new Date(`${dateInput}T00:00:00`); +if (Number.isNaN(date.getTime())) { + console.error('Invalid date. Use format YYYY-MM-DD'); + process.exit(1); +} + +const startMs = date.getTime(); +const endDate = new Date(date); +endDate.setDate(endDate.getDate() + 1); +const endMs = endDate.getTime(); + +const yyyy = date.getFullYear(); +const mm = String(date.getMonth() + 1).padStart(2, '0'); +const dd = String(date.getDate()).padStart(2, '0'); +const suffix = `${yyyy}${mm}${dd}`; +const tablespaceClause = tablespace ? `TABLESPACE ${tablespace}` : ''; + +const sql = `CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})\n${tablespaceClause};\n`; + +if (output) { + const outputPath = path.resolve(output); + fs.writeFileSync(outputPath, sql, 'utf8'); + console.log(`Partition SQL written to ${outputPath}`); +} else { + process.stdout.write(sql); +} diff --git a/bls-onoffline-backend/README.md b/bls-onoffline-backend/README.md index 68782fd..4180ac8 100644 --- a/bls-onoffline-backend/README.md +++ b/bls-onoffline-backend/README.md @@ -18,7 +18,13 @@ bls-onoffline-backend - 复制 .env.example 为 .env 并按实际环境配置 数据库初始化 -- 启动时自动执行 scripts/init_db.sql 并预创建未来 30 天分区 +- 运行服务前请先通过根目录 SQL_Script 下脚本完成建库与分区维护 +- `../SQL_Script/create_database.sql`:建库(psql) +- `../SQL_Script/create_schema_and_parent_table.sql`:建 schema 与主分区表 +- `../SQL_Script/create_partition_for_day.sql`:按日建分区模板 +- `../SQL_Script/generate_init_sql.js`:生成建库+建表 SQL +- `../SQL_Script/generate_partition_sql.js`:生成单日分区 SQL +- `../SQL_Script/generate_partition_range_sql.js`:生成批量分区 SQL 规范说明 - 规格文件位于 spec/onoffline-spec.md diff --git a/bls-onoffline-backend/dist/index.js b/bls-onoffline-backend/dist/index.js index 43d3505..5296636 100644 --- a/bls-onoffline-backend/dist/index.js +++ b/bls-onoffline-backend/dist/index.js @@ -1,9 +1,6 @@ import cron from "node-cron"; import dotenv from "dotenv"; import pg from "pg"; -import fs from "fs"; -import path from "path"; -import { fileURLToPath } from "url"; import kafka from "kafka-node"; import { randomUUID } from "crypto"; import { z } from "zod"; @@ -173,262 +170,6 @@ class DatabaseManager { } } const dbManager = new DatabaseManager(config.db); -class PartitionManager { - isCurrentOrFutureDate(date) { - const normalizedDate = new Date(date); - normalizedDate.setHours(0, 0, 0, 0); - const today = /* @__PURE__ */ new Date(); - today.setHours(0, 0, 0, 0); - return normalizedDate.getTime() >= today.getTime(); - } - escapeSqlLiteral(value) { - return String(value).replace(/'/g, "''"); - } - buildForceHotTablespaceSql(schema, partitionName, hotTablespace = "ts_hot") { - const schemaLiteral = this.escapeSqlLiteral(schema); - const partitionLiteral = this.escapeSqlLiteral(partitionName); - const hotLiteral = this.escapeSqlLiteral(hotTablespace); - return ` -DO $$ -DECLARE - v_schema text := '${schemaLiteral}'; - v_partition text := '${partitionLiteral}'; - v_hot text := '${hotLiteral}'; - v_part_oid oid; - v_toast_oid oid; - r record; -BEGIN - SELECT c.oid INTO v_part_oid - FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace - WHERE n.nspname=v_schema AND c.relname=v_partition AND c.relkind='r'; - - IF v_part_oid IS NULL THEN - RAISE EXCEPTION 'partition %.% not found', v_schema, v_partition; - END IF; - - EXECUTE format('ALTER TABLE %I.%I SET TABLESPACE %I', v_schema, v_partition, v_hot); - - FOR r IN - SELECT idxn.nspname AS index_schema, i.relname AS index_name - FROM pg_index x - JOIN pg_class t ON t.oid=x.indrelid - JOIN pg_namespace nt ON nt.oid=t.relnamespace - JOIN pg_class i ON i.oid=x.indexrelid - JOIN pg_namespace idxn ON idxn.oid=i.relnamespace - LEFT JOIN pg_tablespace ts ON ts.oid=i.reltablespace - WHERE nt.nspname=v_schema - AND t.relname=v_partition - AND COALESCE(ts.spcname,'pg_default')<>v_hot - LOOP - EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, v_hot); - END LOOP; - - SELECT reltoastrelid INTO v_toast_oid FROM pg_class WHERE oid=v_part_oid; - IF v_toast_oid IS NOT NULL AND v_toast_oid<>0 THEN - EXECUTE format('ALTER TABLE %s SET TABLESPACE %I', v_toast_oid::regclass, v_hot); - - FOR r IN - SELECT idxn.nspname AS index_schema, i.relname AS index_name - FROM pg_index x - JOIN pg_class i ON i.oid=x.indexrelid - JOIN pg_namespace idxn ON idxn.oid=i.relnamespace - LEFT JOIN pg_tablespace ts ON ts.oid=i.reltablespace - WHERE x.indrelid=v_toast_oid - AND COALESCE(ts.spcname,'pg_default')<>v_hot - LOOP - EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, v_hot); - END LOOP; - END IF; - - EXECUTE format('ANALYZE %I.%I', v_schema, v_partition); -END $$; -`; - } - /** - * Calculate the start and end timestamps (milliseconds) for a given date. - * @param {Date} date - The date to calculate for. - * @returns {Object} { startMs, endMs, partitionSuffix } - */ - getPartitionInfo(date) { - const yyyy = date.getFullYear(); - const mm = String(date.getMonth() + 1).padStart(2, "0"); - const dd = String(date.getDate()).padStart(2, "0"); - const partitionSuffix = `${yyyy}${mm}${dd}`; - const start = new Date(date); - start.setHours(0, 0, 0, 0); - const startMs = start.getTime(); - const end = new Date(date); - end.setDate(end.getDate() + 1); - end.setHours(0, 0, 0, 0); - const endMs = end.getTime(); - return { startMs, endMs, partitionSuffix }; - } - /** - * Ensure partitions exist for the past M days and next N days. - * @param {number} daysAhead - Number of days to pre-create. - * @param {number} daysBack - Number of days to look back. - */ - async ensurePartitions(daysAhead = 30, daysBack = 15) { - const client = await dbManager.pool.connect(); - try { - logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); - console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); - const now = /* @__PURE__ */ new Date(); - for (let i = -daysBack; i < daysAhead; i++) { - const targetDate = new Date(now); - targetDate.setDate(now.getDate() + i); - const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate); - const schema = config.db.schema; - const table = config.db.table; - const partitionName = `${schema}.${table}_${partitionSuffix}`; - const checkSql = ` - SELECT to_regclass($1) as exists; - `; - const checkRes = await client.query(checkSql, [partitionName]); - if (!checkRes.rows[0].exists) { - logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); - console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); - const shouldUseHotTablespace = this.isCurrentOrFutureDate(targetDate); - const tablespaceClause = shouldUseHotTablespace ? " TABLESPACE ts_hot" : ""; - const partitionTableName = `${table}_${partitionSuffix}`; - const createSql = ` - CREATE TABLE IF NOT EXISTS ${partitionName} - PARTITION OF ${schema}.${table} - FOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause}; - `; - await client.query(createSql); - if (shouldUseHotTablespace) { - await client.query(this.buildForceHotTablespaceSql(schema, partitionTableName)); - } - } - } - logger.info("Partition check completed."); - } catch (err) { - logger.error("Error ensuring partitions:", err); - throw err; - } finally { - client.release(); - } - } - async ensurePartitionsForTimestamps(tsMsList) { - if (!Array.isArray(tsMsList) || tsMsList.length === 0) return; - const uniqueSuffixes = /* @__PURE__ */ new Set(); - for (const ts of tsMsList) { - const numericTs = typeof ts === "string" ? Number(ts) : ts; - if (!Number.isFinite(numericTs)) continue; - const date = new Date(numericTs); - if (Number.isNaN(date.getTime())) continue; - const { partitionSuffix } = this.getPartitionInfo(date); - uniqueSuffixes.add(partitionSuffix); - if (uniqueSuffixes.size >= 400) break; - } - if (uniqueSuffixes.size === 0) return; - const client = await dbManager.pool.connect(); - try { - const schema = config.db.schema; - const table = config.db.table; - for (const partitionSuffix of uniqueSuffixes) { - const yyyy = Number(partitionSuffix.slice(0, 4)); - const mm = Number(partitionSuffix.slice(4, 6)); - const dd = Number(partitionSuffix.slice(6, 8)); - if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue; - const targetDate = new Date(yyyy, mm - 1, dd); - if (Number.isNaN(targetDate.getTime())) continue; - const { startMs, endMs } = this.getPartitionInfo(targetDate); - const partitionName = `${schema}.${table}_${partitionSuffix}`; - const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]); - if (!checkRes.rows[0].exists) { - logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); - const shouldUseHotTablespace = this.isCurrentOrFutureDate(targetDate); - const tablespaceClause = shouldUseHotTablespace ? " TABLESPACE ts_hot" : ""; - const partitionTableName = `${table}_${partitionSuffix}`; - await client.query(` - CREATE TABLE IF NOT EXISTS ${partitionName} - PARTITION OF ${schema}.${table} - FOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause}; - `); - if (shouldUseHotTablespace) { - await client.query(this.buildForceHotTablespaceSql(schema, partitionTableName)); - } - } - } - } finally { - client.release(); - } - } -} -const partitionManager = new PartitionManager(); -const __filename$1 = fileURLToPath(import.meta.url); -const __dirname$1 = path.dirname(__filename$1); -class DatabaseInitializer { - async initialize() { - logger.info("Starting database initialization check..."); - await this.ensureDatabaseExists(); - await this.ensureSchemaAndTable(); - await partitionManager.ensurePartitions(30); - console.log("Database initialization completed successfully."); - logger.info("Database initialization completed successfully."); - } - async ensureDatabaseExists() { - const { host, port, user, password, database, ssl } = config.db; - console.log(`Checking if database '${database}' exists at ${host}:${port}...`); - const client = new pg.Client({ - host, - port, - user, - password, - database: "postgres", - ssl: ssl ? { rejectUnauthorized: false } : false - }); - try { - await client.connect(); - const checkRes = await client.query( - `SELECT 1 FROM pg_database WHERE datname = $1`, - [database] - ); - if (checkRes.rowCount === 0) { - logger.info(`Database '${database}' does not exist. Creating...`); - await client.query(`CREATE DATABASE "${database}"`); - console.log(`Database '${database}' created.`); - logger.info(`Database '${database}' created.`); - } else { - console.log(`Database '${database}' already exists.`); - logger.info(`Database '${database}' already exists.`); - } - } catch (err) { - logger.error("Error ensuring database exists:", err); - throw err; - } finally { - await client.end(); - } - } - async ensureSchemaAndTable() { - const client = await dbManager.pool.connect(); - try { - const sqlPathCandidates = [ - path.resolve(process.cwd(), "scripts/init_db.sql"), - path.resolve(__dirname$1, "../scripts/init_db.sql"), - path.resolve(__dirname$1, "../../scripts/init_db.sql") - ]; - const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate)); - if (!sqlPath) { - throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(" | ")}`); - } - const sql = fs.readFileSync(sqlPath, "utf8"); - console.log(`Executing init_db.sql from ${sqlPath}...`); - logger.info("Executing init_db.sql..."); - await client.query(sql); - console.log("Schema and parent table initialized."); - logger.info("Schema and parent table initialized."); - } catch (err) { - logger.error("Error initializing schema and table:", err); - throw err; - } finally { - client.release(); - } - } -} -const dbInitializer = new DatabaseInitializer(); class OffsetTracker { constructor() { this.partitions = /* @__PURE__ */ new Map(); @@ -874,16 +615,7 @@ const bootstrap = async () => { port: config.redis.port } }); - await dbInitializer.initialize(); const metricCollector = new MetricCollector(); - cron.schedule("0 0 * * *", async () => { - logger.info("Running scheduled partition maintenance..."); - try { - await partitionManager.ensurePartitions(30); - } catch (err) { - logger.error("Scheduled partition maintenance failed", err); - } - }); const redisClient = await createRedisClient(config.redis); const redisIntegration = new RedisIntegration( redisClient, @@ -991,10 +723,8 @@ const bootstrap = async () => { const lower = message.toLowerCase(); return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry"); }; - const isMissingPartitionError = (err) => err?.code === "23514" || typeof err?.message === "string" && err.message.includes("no partition of relation"); const insertRowsWithRetry = async (rows) => { const startedAt = Date.now(); - let attemptedPartitionFix = false; while (true) { try { await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); @@ -1011,24 +741,6 @@ const bootstrap = async () => { } continue; } - if (isMissingPartitionError(err) && !attemptedPartitionFix) { - attemptedPartitionFix = true; - try { - await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms)); - } catch (partitionErr) { - if (isDbConnectionError(partitionErr)) { - logger.error("Database offline during partition ensure. Retrying in 5s...", { error: partitionErr.message }); - await new Promise((r) => setTimeout(r, 5e3)); - while (!await dbManager.checkConnection()) { - logger.warn("Database still offline. Waiting 5s..."); - await new Promise((r) => setTimeout(r, 5e3)); - } - continue; - } - throw partitionErr; - } - continue; - } throw err; } } diff --git a/bls-onoffline-backend/openspec/changes/2026-03-03-refactor-partition-indexes/proposal.md b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-2026-03-03-refactor-partition-indexes/proposal.md similarity index 100% rename from bls-onoffline-backend/openspec/changes/2026-03-03-refactor-partition-indexes/proposal.md rename to bls-onoffline-backend/openspec/changes/archive/2026-03-04-2026-03-03-refactor-partition-indexes/proposal.md diff --git a/bls-onoffline-backend/openspec/changes/2026-03-03-refactor-partition-indexes/specs/onoffline/spec.md b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-2026-03-03-refactor-partition-indexes/specs/onoffline/spec.md similarity index 100% rename from bls-onoffline-backend/openspec/changes/2026-03-03-refactor-partition-indexes/specs/onoffline/spec.md rename to bls-onoffline-backend/openspec/changes/archive/2026-03-04-2026-03-03-refactor-partition-indexes/specs/onoffline/spec.md diff --git a/bls-onoffline-backend/openspec/changes/2026-03-03-refactor-partition-indexes/tasks.md b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-2026-03-03-refactor-partition-indexes/tasks.md similarity index 100% rename from bls-onoffline-backend/openspec/changes/2026-03-03-refactor-partition-indexes/tasks.md rename to bls-onoffline-backend/openspec/changes/archive/2026-03-04-2026-03-03-refactor-partition-indexes/tasks.md diff --git a/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/proposal.md b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/proposal.md new file mode 100644 index 0000000..31905ce --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/proposal.md @@ -0,0 +1,14 @@ +# Change: remove runtime db provisioning + +## Why +当前服务在运行时承担了建库、建表和分区维护职责,导致服务职责边界不清晰,也会引入启动阶段 DDL 风险。现已将该能力剥离到根目录 `SQL_Script/`,需要通过 OpenSpec 正式记录为规范变更。 + +## What Changes +- 移除服务启动阶段的数据库初始化与定时分区维护要求。 +- 移除服务在写入失败时自动创建缺失分区的要求。 +- 明确数据库结构与分区维护由外部脚本(`SQL_Script/`)负责。 +- 保留服务的核心职责:Kafka 消费、解析、写库、重试与监控。 + +## Impact +- Affected specs: `openspec/specs/onoffline/spec.md` +- Affected code: `src/index.js`, `src/config/config.js`, `src/db/initializer.js`, `src/db/partitionManager.js`, `scripts/init_db.sql`, `scripts/verify_partitions.js`, `../SQL_Script/*` diff --git a/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/specs/onoffline/spec.md b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/specs/onoffline/spec.md new file mode 100644 index 0000000..4fa7887 --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/specs/onoffline/spec.md @@ -0,0 +1,32 @@ +## MODIFIED Requirements + +### Requirement: 数据库分区策略 +系统 SHALL 使用 Range Partitioning 按天分区;运行服务本身 SHALL NOT 执行建库、建表、分区创建或定时分区维护。 + +#### Scenario: 服务启动不执行 DDL +- **GIVEN** 服务进程启动 +- **WHEN** 进入 bootstrap 过程 +- **THEN** 仅初始化消费、处理、监控相关能力,不执行数据库创建、表结构初始化与分区创建 + +#### Scenario: 分区由外部脚本维护 +- **GIVEN** 需要创建数据库对象或新增未来分区 +- **WHEN** 执行外部 SQL/JS 工具 +- **THEN** 通过根目录 `SQL_Script/` 完成建库和分区维护,而不是由服务运行时自动执行 + +### Requirement: 批量消费与写入 +系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。 + +#### Scenario: 批量写入 +- **GIVEN** 短时间内收到多条消息 (e.g., 500条) +- **WHEN** 缓冲区满或超时 (e.g., 200ms) +- **THEN** 执行一次批量数据库插入操作 + +#### Scenario: 写入失败降级 +- **GIVEN** 批量写入因数据错误失败 (非连接错误) +- **WHEN** 捕获异常 +- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库 + +#### Scenario: 分区缺失错误处理 +- **GIVEN** 写入时数据库返回分区缺失错误 +- **WHEN** 服务处理该错误 +- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建 diff --git a/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/tasks.md b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/tasks.md new file mode 100644 index 0000000..bc99b8c --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/archive/2026-03-04-remove-runtime-db-provisioning/tasks.md @@ -0,0 +1,12 @@ +## 1. Implementation +- [x] 1.1 Remove runtime DB initialization from bootstrap flow (`src/index.js`). +- [x] 1.2 Remove scheduled partition maintenance job from runtime service. +- [x] 1.3 Remove runtime missing-partition auto-fix behavior. +- [x] 1.4 Remove legacy DB provisioning modules and scripts from service project. +- [x] 1.5 Add external SQL/JS provisioning scripts under root `SQL_Script/` for DB/schema/partition management. +- [x] 1.6 Update project docs to point DB provisioning to `SQL_Script/`. + +## 2. Validation +- [x] 2.1 Run `npm run lint` in `bls-onoffline-backend`. +- [x] 2.2 Run `npm run build` in `bls-onoffline-backend`. +- [x] 2.3 Run `openspec validate remove-runtime-db-provisioning --strict`. diff --git a/bls-onoffline-backend/openspec/specs/onoffline/spec.md b/bls-onoffline-backend/openspec/specs/onoffline/spec.md index 93ce015..a3bc8b0 100644 --- a/bls-onoffline-backend/openspec/specs/onoffline/spec.md +++ b/bls-onoffline-backend/openspec/specs/onoffline/spec.md @@ -28,12 +28,12 @@ - **THEN** 数据库存储值为对应的空字符串 ### Requirement: 数据库分区策略 -系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表。 +系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表,子表依赖 PostgreSQL 原生机制继承主表索引。 #### Scenario: 分区预创建 - **GIVEN** 系统启动或每日凌晨 - **WHEN** 运行分区维护任务 -- **THEN** 确保数据库中存在未来 30 天的分区表 +- **THEN** 确保数据库中存在未来 30 天的分区表,无需对子表显式创建单列表索引 ### Requirement: 消费可靠性 (At-Least-Once) 系统 SHALL 仅在数据成功写入数据库后,才向 Kafka 提交消费位点。 @@ -84,7 +84,7 @@ - **THEN** 自动乘以 1000 转换为毫秒 ### Requirement: 批量消费与写入 -系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量。 +系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。 #### Scenario: 批量写入 - **GIVEN** 短时间内收到多条消息 (e.g., 500条) @@ -96,3 +96,8 @@ - **WHEN** 捕获异常 - **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库 +#### Scenario: 分区缺失错误处理 +- **GIVEN** 写入时数据库返回分区缺失错误 +- **WHEN** 服务处理该错误 +- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建 + diff --git a/bls-onoffline-backend/scripts/init_db.sql b/bls-onoffline-backend/scripts/init_db.sql deleted file mode 100644 index b0b1503..0000000 --- a/bls-onoffline-backend/scripts/init_db.sql +++ /dev/null @@ -1,23 +0,0 @@ -CREATE SCHEMA IF NOT EXISTS onoffline; - -CREATE TABLE IF NOT EXISTS onoffline.onoffline_record ( - guid VARCHAR(32) NOT NULL, - ts_ms BIGINT NOT NULL, - write_ts_ms BIGINT NOT NULL, - hotel_id SMALLINT NOT NULL, - mac VARCHAR(21) NOT NULL, - device_id VARCHAR(64) NOT NULL, - room_id VARCHAR(64) NOT NULL, - ip VARCHAR(21), - current_status VARCHAR(255), - launcher_version VARCHAR(255), - reboot_reason VARCHAR(255), - PRIMARY KEY (ts_ms, mac, device_id, room_id) -) PARTITION BY RANGE (ts_ms); - -CREATE INDEX IF NOT EXISTS idx_onoffline_ts_ms ON onoffline.onoffline_record (ts_ms); -CREATE INDEX IF NOT EXISTS idx_onoffline_hotel_id ON onoffline.onoffline_record (hotel_id); -CREATE INDEX IF NOT EXISTS idx_onoffline_mac ON onoffline.onoffline_record (mac); -CREATE INDEX IF NOT EXISTS idx_onoffline_device_id ON onoffline.onoffline_record (device_id); -CREATE INDEX IF NOT EXISTS idx_onoffline_room_id ON onoffline.onoffline_record (room_id); -CREATE INDEX IF NOT EXISTS idx_onoffline_current_status ON onoffline.onoffline_record (current_status); diff --git a/bls-onoffline-backend/scripts/verify_partitions.js b/bls-onoffline-backend/scripts/verify_partitions.js deleted file mode 100644 index 68db13f..0000000 --- a/bls-onoffline-backend/scripts/verify_partitions.js +++ /dev/null @@ -1,61 +0,0 @@ -import pg from 'pg'; -import { config } from '../src/config/config.js'; - -const { Pool } = pg; - -const verify = async () => { - const pool = new Pool({ - host: config.db.host, - port: config.db.port, - user: config.db.user, - password: config.db.password, - database: config.db.database, - ssl: config.db.ssl, - }); - - try { - console.log('Verifying partitions for table onoffline_record...'); - const client = await pool.connect(); - - // Check parent table - const parentRes = await client.query(` - SELECT to_regclass('onoffline.onoffline_record') as oid; - `); - if (!parentRes.rows[0].oid) { - console.error('Parent table onoffline.onoffline_record DOES NOT EXIST.'); - return; - } - console.log('Parent table onoffline.onoffline_record exists.'); - - // Check partitions - const res = await client.query(` - SELECT - child.relname AS partition_name - FROM pg_inherits - JOIN pg_class parent ON pg_inherits.inhparent = parent.oid - JOIN pg_class child ON pg_inherits.inhrelid = child.oid - JOIN pg_namespace ns ON parent.relnamespace = ns.oid - WHERE parent.relname = 'onoffline_record' AND ns.nspname = 'onoffline' - ORDER BY child.relname; - `); - - console.log(`Found ${res.rowCount} partitions.`); - res.rows.forEach(row => { - console.log(`- ${row.partition_name}`); - }); - - if (res.rowCount >= 30) { - console.log('SUCCESS: At least 30 partitions exist.'); - } else { - console.warn(`WARNING: Expected 30+ partitions, found ${res.rowCount}.`); - } - - client.release(); - } catch (err) { - console.error('Verification failed:', err); - } finally { - await pool.end(); - } -}; - -verify(); diff --git a/bls-onoffline-backend/src/db/initializer.js b/bls-onoffline-backend/src/db/initializer.js deleted file mode 100644 index 1f83cc2..0000000 --- a/bls-onoffline-backend/src/db/initializer.js +++ /dev/null @@ -1,100 +0,0 @@ -import pg from 'pg'; -import fs from 'fs'; -import path from 'path'; -import { fileURLToPath } from 'url'; -import { logger } from '../utils/logger.js'; -import partitionManager from './partitionManager.js'; -import dbManager from './databaseManager.js'; -import { config } from '../config/config.js'; - -const __filename = fileURLToPath(import.meta.url); -const __dirname = path.dirname(__filename); - -class DatabaseInitializer { - async initialize() { - logger.info('Starting database initialization check...'); - - // 1. Check if database exists, create if not - await this.ensureDatabaseExists(); - - // 2. Initialize Schema and Parent Table (if not exists) - // Note: We need to use dbManager because it connects to the target database - await this.ensureSchemaAndTable(); - - // 3. Ensure Partitions for the next month - await partitionManager.ensurePartitions(30); - - console.log('Database initialization completed successfully.'); - logger.info('Database initialization completed successfully.'); - } - - async ensureDatabaseExists() { - const { host, port, user, password, database, ssl } = config.db; - console.log(`Checking if database '${database}' exists at ${host}:${port}...`); - - // Connect to 'postgres' database to check/create target database - const client = new pg.Client({ - host, - port, - user, - password, - database: 'postgres', - ssl: ssl ? { rejectUnauthorized: false } : false - }); - - try { - await client.connect(); - - const checkRes = await client.query( - `SELECT 1 FROM pg_database WHERE datname = $1`, - [database] - ); - - if (checkRes.rowCount === 0) { - logger.info(`Database '${database}' does not exist. Creating...`); - // CREATE DATABASE cannot run inside a transaction block - await client.query(`CREATE DATABASE "${database}"`); - console.log(`Database '${database}' created.`); - logger.info(`Database '${database}' created.`); - } else { - console.log(`Database '${database}' already exists.`); - logger.info(`Database '${database}' already exists.`); - } - } catch (err) { - logger.error('Error ensuring database exists:', err); - throw err; - } finally { - await client.end(); - } - } - - async ensureSchemaAndTable() { - // dbManager connects to the target database - const client = await dbManager.pool.connect(); - try { - const sqlPathCandidates = [ - path.resolve(process.cwd(), 'scripts/init_db.sql'), - path.resolve(__dirname, '../scripts/init_db.sql'), - path.resolve(__dirname, '../../scripts/init_db.sql') - ]; - const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate)); - if (!sqlPath) { - throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`); - } - const sql = fs.readFileSync(sqlPath, 'utf8'); - - console.log(`Executing init_db.sql from ${sqlPath}...`); - logger.info('Executing init_db.sql...'); - await client.query(sql); - console.log('Schema and parent table initialized.'); - logger.info('Schema and parent table initialized.'); - } catch (err) { - logger.error('Error initializing schema and table:', err); - throw err; - } finally { - client.release(); - } - } -} - -export default new DatabaseInitializer(); diff --git a/bls-onoffline-backend/src/db/partitionManager.js b/bls-onoffline-backend/src/db/partitionManager.js deleted file mode 100644 index 7a3651e..0000000 --- a/bls-onoffline-backend/src/db/partitionManager.js +++ /dev/null @@ -1,136 +0,0 @@ -import { logger } from '../utils/logger.js'; -import { config } from '../config/config.js'; -import dbManager from './databaseManager.js'; - -class PartitionManager { - isCurrentOrFutureDate(date) { - const normalizedDate = new Date(date); - normalizedDate.setHours(0, 0, 0, 0); - - const today = new Date(); - today.setHours(0, 0, 0, 0); - - return normalizedDate.getTime() >= today.getTime(); - } - - /** - * Calculate the start and end timestamps (milliseconds) for a given date. - * @param {Date} date - The date to calculate for. - * @returns {Object} { startMs, endMs, partitionSuffix } - */ - getPartitionInfo(date) { - const yyyy = date.getFullYear(); - const mm = String(date.getMonth() + 1).padStart(2, '0'); - const dd = String(date.getDate()).padStart(2, '0'); - const partitionSuffix = `${yyyy}${mm}${dd}`; - - const start = new Date(date); - start.setHours(0, 0, 0, 0); - const startMs = start.getTime(); - - const end = new Date(date); - end.setDate(end.getDate() + 1); - end.setHours(0, 0, 0, 0); - const endMs = end.getTime(); - - return { startMs, endMs, partitionSuffix }; - } - - /** - * Ensure partitions exist for the past M days and next N days. - * @param {number} daysAhead - Number of days to pre-create. - * @param {number} daysBack - Number of days to look back. - */ - async ensurePartitions(daysAhead = 30, daysBack = 15) { - const client = await dbManager.pool.connect(); - try { - logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); - console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); - const now = new Date(); - - for (let i = -daysBack; i < daysAhead; i++) { - const targetDate = new Date(now); - targetDate.setDate(now.getDate() + i); - - const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate); - const schema = config.db.schema; - const table = config.db.table; - const partitionName = `${schema}.${table}_${partitionSuffix}`; - - // Check if partition exists - const checkSql = ` - SELECT to_regclass($1) as exists; - `; - const checkRes = await client.query(checkSql, [partitionName]); - - if (!checkRes.rows[0].exists) { - logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); - console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); - const tablespaceClause = this.isCurrentOrFutureDate(targetDate) ? ' TABLESPACE ts_hot' : ''; - const createSql = ` - CREATE TABLE IF NOT EXISTS ${partitionName} - PARTITION OF ${schema}.${table} - FOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause}; - `; - await client.query(createSql); - } - } - logger.info('Partition check completed.'); - } catch (err) { - logger.error('Error ensuring partitions:', err); - throw err; - } finally { - client.release(); - } - } - - async ensurePartitionsForTimestamps(tsMsList) { - if (!Array.isArray(tsMsList) || tsMsList.length === 0) return; - - const uniqueSuffixes = new Set(); - for (const ts of tsMsList) { - const numericTs = typeof ts === 'string' ? Number(ts) : ts; - if (!Number.isFinite(numericTs)) continue; - const date = new Date(numericTs); - if (Number.isNaN(date.getTime())) continue; - const { partitionSuffix } = this.getPartitionInfo(date); - uniqueSuffixes.add(partitionSuffix); - if (uniqueSuffixes.size >= 400) break; - } - - if (uniqueSuffixes.size === 0) return; - - const client = await dbManager.pool.connect(); - try { - const schema = config.db.schema; - const table = config.db.table; - - for (const partitionSuffix of uniqueSuffixes) { - const yyyy = Number(partitionSuffix.slice(0, 4)); - const mm = Number(partitionSuffix.slice(4, 6)); - const dd = Number(partitionSuffix.slice(6, 8)); - if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue; - const targetDate = new Date(yyyy, mm - 1, dd); - if (Number.isNaN(targetDate.getTime())) continue; - - const { startMs, endMs } = this.getPartitionInfo(targetDate); - const partitionName = `${schema}.${table}_${partitionSuffix}`; - - const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]); - if (!checkRes.rows[0].exists) { - logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); - const tablespaceClause = this.isCurrentOrFutureDate(targetDate) ? ' TABLESPACE ts_hot' : ''; - await client.query(` - CREATE TABLE IF NOT EXISTS ${partitionName} - PARTITION OF ${schema}.${table} - FOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause}; - `); - } - } - } finally { - client.release(); - } - } -} - -export default new PartitionManager(); diff --git a/bls-onoffline-backend/src/index.js b/bls-onoffline-backend/src/index.js index 0451cba..5259900 100644 --- a/bls-onoffline-backend/src/index.js +++ b/bls-onoffline-backend/src/index.js @@ -1,8 +1,6 @@ import cron from 'node-cron'; import { config } from './config/config.js'; import dbManager from './db/databaseManager.js'; -import dbInitializer from './db/initializer.js'; -import partitionManager from './db/partitionManager.js'; import { createKafkaConsumers } from './kafka/consumer.js'; import { parseMessageToRows } from './processor/index.js'; import { createRedisClient } from './redis/redisClient.js'; @@ -33,38 +31,12 @@ const bootstrap = async () => { } }); - // 0. Initialize Database (Create DB, Schema, Table, Partitions) - await dbInitializer.initialize(); - // Metric Collector const metricCollector = new MetricCollector(); - // 1. Setup Partition Maintenance Cron Job (Every day at 00:00) - cron.schedule('0 0 * * *', async () => { - logger.info('Running scheduled partition maintenance...'); - try { - await partitionManager.ensurePartitions(30); - } catch (err) { - logger.error('Scheduled partition maintenance failed', err); - } - }); - // 1.1 Setup Metric Reporting Cron Job (Every minute) // Moved after redisIntegration initialization - - // DatabaseManager is now a singleton exported instance, but let's keep consistency if possible - // In databaseManager.js it exports `dbManager` instance by default. - // The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export. - // Let's check `databaseManager.js` content. - // Wait, I imported `dbManager` from `./db/databaseManager.js`. - // If `databaseManager.js` exports an instance as default, I should use that. - // If it exports a class, I should instantiate it. - - // Let's assume the previous code `new DatabaseManager` was correct if it was a class. - // BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance. - // I need to verify `databaseManager.js`. - const redisClient = await createRedisClient(config.redis); const redisIntegration = new RedisIntegration( redisClient, @@ -199,13 +171,8 @@ const bootstrap = async () => { ); }; - const isMissingPartitionError = (err) => - err?.code === '23514' || - (typeof err?.message === 'string' && err.message.includes('no partition of relation')); - const insertRowsWithRetry = async (rows) => { const startedAt = Date.now(); - let attemptedPartitionFix = false; while (true) { try { await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); @@ -222,24 +189,6 @@ const bootstrap = async () => { } continue; } - if (isMissingPartitionError(err) && !attemptedPartitionFix) { - attemptedPartitionFix = true; - try { - await partitionManager.ensurePartitionsForTimestamps(rows.map(r => r.ts_ms)); - } catch (partitionErr) { - if (isDbConnectionError(partitionErr)) { - logger.error('Database offline during partition ensure. Retrying in 5s...', { error: partitionErr.message }); - await new Promise(r => setTimeout(r, 5000)); - while (!(await dbManager.checkConnection())) { - logger.warn('Database still offline. Waiting 5s...'); - await new Promise(r => setTimeout(r, 5000)); - } - continue; - } - throw partitionErr; - } - continue; - } throw err; } }