refactor: 重构分区索引策略,移除显式创建索引的方法并更新相关流程

This commit is contained in:
2026-03-02 11:49:02 +08:00
parent e7658dd3bd
commit 1eccc2e3aa
7 changed files with 32 additions and 140 deletions

View File

@@ -193,65 +193,6 @@ class PartitionManager {
const endMs = end.getTime();
return { startMs, endMs, partitionSuffix };
}
async ensurePartitionIndexes(client, schema, table, partitionSuffix) {
const startedAt = Date.now();
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const indexBase = `${table}_${partitionSuffix}`;
const indexSpecs = [
{ name: `idx_${indexBase}_ts`, column: "ts_ms" },
{ name: `idx_${indexBase}_hid`, column: "hotel_id" },
{ name: `idx_${indexBase}_mac`, column: "mac" },
{ name: `idx_${indexBase}_did`, column: "device_id" },
{ name: `idx_${indexBase}_rid`, column: "room_id" },
{ name: `idx_${indexBase}_cs`, column: "current_status" }
];
for (const spec of indexSpecs) {
await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`);
}
await client.query(`ANALYZE ${partitionName};`);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs > 1e3) {
logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs });
}
}
async ensureIndexesForExistingPartitions() {
const startedAt = Date.now();
const client = await dbManager.pool.connect();
try {
const schema = config.db.schema;
const table = config.db.table;
const res = await client.query(
`
SELECT c.relname AS relname
FROM pg_inherits i
JOIN pg_class p ON i.inhparent = p.oid
JOIN pg_namespace pn ON pn.oid = p.relnamespace
JOIN pg_class c ON i.inhrelid = c.oid
WHERE pn.nspname = $1 AND p.relname = $2
ORDER BY c.relname;
`,
[schema, table]
);
const suffixes = /* @__PURE__ */ new Set();
const pattern = new RegExp(`^${table}_(\\d{8})$`);
for (const row of res.rows) {
const relname = row?.relname;
if (typeof relname !== "string") continue;
const match = relname.match(pattern);
if (!match) continue;
suffixes.add(match[1]);
}
for (const suffix of suffixes) {
await this.ensurePartitionIndexes(client, schema, table, suffix);
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs > 5e3) {
logger.warn("Ensure existing partition indexes slow", { schema, table, partitions: suffixes.size, elapsedMs });
}
} finally {
client.release();
}
}
/**
* Ensure partitions exist for the past M days and next N days.
* @param {number} daysAhead - Number of days to pre-create.
@@ -284,7 +225,6 @@ class PartitionManager {
`;
await client.query(createSql);
}
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
}
logger.info("Partition check completed.");
} catch (err) {
@@ -329,7 +269,6 @@ class PartitionManager {
FOR VALUES FROM (${startMs}) TO (${endMs});
`);
}
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
}
} finally {
client.release();
@@ -345,7 +284,6 @@ class DatabaseInitializer {
await this.ensureDatabaseExists();
await this.ensureSchemaAndTable();
await partitionManager.ensurePartitions(30);
await partitionManager.ensureIndexesForExistingPartitions();
console.log("Database initialization completed successfully.");
logger.info("Database initialization completed successfully.");
}
@@ -875,9 +813,9 @@ const bootstrap = async () => {
const metrics = metricCollector.getAndReset();
const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0";
const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0";
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`;
const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
console.log(report);
logger.info(report, metrics);
logger.info(report);
try {
await redisIntegration.info("Minute Metrics", metrics);
} catch (err) {