feat(数据库): 实现双数据库写入功能并优化数据同步

添加对G5数据库的支持,实现双数据库同时写入功能
重构数据库配置和连接管理,支持主库和G5库的独立配置
优化数据同步逻辑,添加缺失字段并保持现有字段不变
改进数据过滤和事务处理,增强错误处理和日志记录
This commit is contained in:
2026-04-03 18:47:31 +08:00
parent a13414ffad
commit 286b605e4f
7 changed files with 317 additions and 131 deletions

View File

@@ -7,13 +7,19 @@ PORT=3000
# 数据库配置
POSTGRES_HOST=10.8.8.109
POSTGRES_PORT=5433
POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres!
POSTGRES_DATABASE=log_platform
POSTGRES_USER=log_admin
POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres!
POSTGRES_MAX_CONNECTIONS=2
POSTGRES_IDLE_TIMEOUT_MS=30000
POSTGRES_HOST_G5=10.8.8.80
POSTGRES_PORT_G5=5434
POSTGRES_PASSWORD_G5=H3IkLUt8K!x
# 启用的酒店ID列表
ENABLED_HOTEL_IDS=1000-2500 #1001,1003,1068,1085,1865,2000-2500
ENABLED_HOTEL_IDS=1000-2500
# 接口启用配置 (true/false)
ENABLE_API_HOTEL_LIST=true # 酒店列表

View File

@@ -0,0 +1,17 @@
-- Make G5 (10.8.8.80) loops table compatible with primary (10.8.8.109)
-- Scope: temporary_project.loops
-- Strategy: add missing columns only, keep existing columns unchanged
-- Idempotent: ADD COLUMN IF NOT EXISTS lets this script be re-run safely.
-- All columns are nullable with no DEFAULT, so existing rows are left
-- untouched and simply read NULL in the new columns.
ALTER TABLE temporary_project.loops
ADD COLUMN IF NOT EXISTS type varchar(254),
ADD COLUMN IF NOT EXISTS name varchar(254),
ADD COLUMN IF NOT EXISTS power double precision,
ADD COLUMN IF NOT EXISTS rate double precision,
ADD COLUMN IF NOT EXISTS temperature double precision,
ADD COLUMN IF NOT EXISTS air_type varchar(254),
ADD COLUMN IF NOT EXISTS air_brand varchar(254),
ADD COLUMN IF NOT EXISTS air_model varchar(254),
ADD COLUMN IF NOT EXISTS height double precision,
ADD COLUMN IF NOT EXISTS area double precision,
ADD COLUMN IF NOT EXISTS heat_loss double precision,
ADD COLUMN IF NOT EXISTS remark varchar(254);

View File

@@ -1,16 +1,57 @@
require('dotenv').config();
/**
 * Drop an inline `#` comment from an env-file value and trim whitespace.
 * Non-string input (e.g. an unset env var, which is `undefined`) yields ''.
 * Everything from the FIRST `#` onward is discarded.
 */
const stripInlineComment = (value) => {
  if (typeof value !== 'string') return '';
  const hashIndex = value.indexOf('#');
  const bare = hashIndex === -1 ? value : value.slice(0, hashIndex);
  return bare.trim();
};
/**
 * Parse a boolean env value ('true'/'false', case-insensitive, inline
 * comments allowed). Anything else — including an unset variable or an
 * empty value — falls back to `defaultValue`.
 */
const parseBooleanEnv = (value, defaultValue = true) => {
  switch (stripInlineComment(value).toLowerCase()) {
    case 'true':
      return true;
    case 'false':
      return false;
    default:
      return defaultValue;
  }
};
// Connection settings for the primary PostgreSQL instance. Every field can
// be overridden through environment variables; the literals are fallbacks.
// NOTE(review): the `parseInt(...) || fallback` pattern means a value of 0
// (or any non-numeric string) silently falls back to the default — confirm
// 0 is never a meaningful setting for these fields.
const primaryDbConfig = {
host: process.env.POSTGRES_HOST || '10.8.8.109',
port: parseInt(process.env.POSTGRES_PORT, 10) || 5433,
database: process.env.POSTGRES_DATABASE || 'log_platform',
user: process.env.POSTGRES_USER || 'log_admin',
password: process.env.POSTGRES_PASSWORD || 'YourActualStrongPasswordForPostgres!',
max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS, 10) || 6, // pool size cap
idleTimeoutMillis: parseInt(process.env.POSTGRES_IDLE_TIMEOUT_MS, 10) || 30000,
};
// Connection settings for the secondary "G5" instance (the dual-write
// target). Each field falls back to the primary's resolved value when its
// _G5 variable is unset, so a partially-specified G5 environment still
// produces a complete config.
const g5DbConfig = {
host: process.env.POSTGRES_HOST_G5 || primaryDbConfig.host,
port: parseInt(process.env.POSTGRES_PORT_G5, 10) || primaryDbConfig.port,
database: process.env.POSTGRES_DATABASE_G5 || primaryDbConfig.database,
user: process.env.POSTGRES_USER_G5 || primaryDbConfig.user,
password: process.env.POSTGRES_PASSWORD_G5 || primaryDbConfig.password,
max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS_G5, 10) || primaryDbConfig.max,
idleTimeoutMillis: parseInt(process.env.POSTGRES_IDLE_TIMEOUT_MS_G5, 10) || primaryDbConfig.idleTimeoutMillis,
};
// True when any G5-specific connection variable is set. The pool-tuning
// variables (POSTGRES_MAX_CONNECTIONS_G5 / POSTGRES_IDLE_TIMEOUT_MS_G5) are
// not consulted — presumably tuning alone should not switch on dual-write;
// TODO(review): confirm that exclusion is intentional.
const hasG5Overrides = Boolean(
process.env.POSTGRES_HOST_G5 ||
process.env.POSTGRES_PORT_G5 ||
process.env.POSTGRES_DATABASE_G5 ||
process.env.POSTGRES_USER_G5 ||
process.env.POSTGRES_PASSWORD_G5
);
// True when the G5 config resolves to the same host/port/database/user as
// the primary — used to avoid writing the same rows twice. The password is
// not compared (it does not identify a distinct target).
const isSameDbTarget =
primaryDbConfig.host === g5DbConfig.host &&
primaryDbConfig.port === g5DbConfig.port &&
primaryDbConfig.database === g5DbConfig.database &&
primaryDbConfig.user === g5DbConfig.user;
module.exports = {
port: process.env.PORT || 3000,
dbConfig: {
host: process.env.POSTGRES_HOST || '10.8.8.109',
port: parseInt(process.env.POSTGRES_PORT, 10) || 5433,
database: process.env.POSTGRES_DATABASE || 'log_platform',
user: process.env.POSTGRES_USER || 'log_admin',
password: process.env.POSTGRES_PASSWORD || 'YourActualStrongPasswordForPostgres!',
max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS, 10) || 6,
idleTimeoutMillis: parseInt(process.env.POSTGRES_IDLE_TIMEOUT_MS, 10) || 30000,
},
dbConfig: primaryDbConfig,
dbConfigG5: g5DbConfig,
enableDualWrite: hasG5Overrides && !isSameDbTarget,
cloudDbConfig: {
host: process.env.CLOUD_DB_HOST || 'blv-cloud-db.mysql.rds.aliyuncs.com',
port: parseInt(process.env.CLOUD_DB_PORT, 10) || 3307,
@@ -21,35 +62,39 @@ module.exports = {
enabledHotelIds: parseHotelIds(process.env.ENABLED_HOTEL_IDS),
apiBaseUrl: process.env.API_BASE_URL || 'http://www.boonlive-rcu.com:7000/api/values',
apiToggles: {
hotelList: process.env.ENABLE_API_HOTEL_LIST !== 'false',
hostList: process.env.ENABLE_API_HOST_LIST !== 'false',
roomTypeInfo: process.env.ENABLE_API_ROOM_TYPE_INFO !== 'false',
roomTypeModalInfo: process.env.ENABLE_API_ROOM_TYPE_MODAL_INFO !== 'false',
hotelList: parseBooleanEnv(process.env.ENABLE_API_HOTEL_LIST, true),
hostList: parseBooleanEnv(process.env.ENABLE_API_HOST_LIST, true),
roomTypeInfo: parseBooleanEnv(process.env.ENABLE_API_ROOM_TYPE_INFO, true),
roomTypeModalInfo: parseBooleanEnv(process.env.ENABLE_API_ROOM_TYPE_MODAL_INFO, true),
}
};
function parseHotelIds(envVar) {
if (!envVar) return [];
const parts = envVar.split(',');
const raw = stripInlineComment(envVar);
if (!raw) return [];
const parts = raw.split(',');
const ids = new Set();
parts.forEach(part => {
part = part.trim();
if (part.includes('-')) {
const [startStr, endStr] = part.split('-');
const start = parseInt(startStr.trim(), 10);
const end = parseInt(endStr.trim(), 10);
if (!isNaN(start) && !isNaN(end) && start <= end) {
if (!part) return;
const rangeMatch = part.match(/^(\d+)\s*-\s*(\d+)$/);
if (rangeMatch) {
const start = parseInt(rangeMatch[1], 10);
const end = parseInt(rangeMatch[2], 10);
if (start <= end) {
for (let i = start; i <= end; i++) {
ids.add(i);
}
}
} else {
const num = parseInt(part, 10);
if (!isNaN(num)) {
ids.add(num);
}
return;
}
if (/^\d+$/.test(part)) {
ids.add(parseInt(part, 10));
}
});

View File

@@ -1,8 +1,9 @@
const { Pool } = require('pg');
const { dbConfig } = require('../config');
const { dbConfig, dbConfigG5, enableDualWrite } = require('../config');
const logger = require('../utils/logger');
// The primary pool always exists; the G5 pool is only created when
// dual-write is enabled, otherwise it stays null.
const pool = new Pool(dbConfig);
const poolG5 = enableDualWrite ? new Pool(dbConfigG5) : null;
pool.on('error', (err, client) => {
logger.error({ err }, 'Unexpected error on idle client');
@@ -10,7 +11,22 @@ pool.on('error', (err, client) => {
});
pool.on('connect', () => {
logger.debug('New client connected to database');
logger.debug('New client connected to primary database');
});
module.exports = pool;
// Attach lifecycle handlers on the G5 pool only when it was created.
if (poolG5) {
poolG5.on('error', (err, client) => {
logger.error({ err }, 'Unexpected error on idle client (G5)');
// Fail fast: an unexpected error on an idle G5 client aborts the process.
process.exit(-1);
});
poolG5.on('connect', () => {
logger.debug('New client connected to G5 database');
});
}
module.exports = {
pool,
poolG5,
enableDualWrite
};

View File

@@ -1,14 +1,38 @@
const pool = require('./index');
const { pool, poolG5, enableDualWrite } = require('./index');
/** Run a one-off query on the primary pool. */
function query(text, params) {
  return pool.query(text, params);
}
/** Check out a dedicated client from the primary pool; caller must release(). */
function getClient() {
  return pool.connect();
}
const close = () => pool.end();
/**
 * Acquire one checked-out client per write target, primary first.
 * The G5 client is added only when dual-write is enabled and its pool
 * exists. Every returned entry is { name, client }; callers must release
 * each client when done.
 */
const getWriteClients = async () => {
  const writers = [{ name: 'primary', client: await pool.connect() }];
  if (enableDualWrite && poolG5) {
    writers.push({ name: 'g5', client: await poolG5.connect() });
  }
  return writers;
};
/** Shut down every pool this module owns (primary first, then G5 if any). */
const close = async () => {
  for (const target of [pool, poolG5]) {
    if (target) {
      await target.end();
    }
  }
};
module.exports = {
query,
getClient,
getWriteClients,
close,
pool
pool,
poolG5,
enableDualWrite
};

View File

@@ -1,4 +1,3 @@
const initDB = require('../db/init');
const { concurrentFetch, queuedFetch } = require('../utils/http');
const { saveHotelsTransaction, saveRoomsTransaction, saveRoomTypesTransaction, saveLoopsTransaction } = require('../services/dataService');
const { enabledHotelIds, apiToggles } = require('../config');
@@ -19,12 +18,34 @@ const stats = {
endTime: 0
};
/**
 * Restrict phase-2 data to the configured hotel scope.
 * Hotels are kept when their `hotelCode` appears in `enabledIds`; rooms and
 * room types are then kept when their `hotelID` belongs to a kept hotel.
 * With no scope configured, the inputs pass through untouched and
 * `filtered` is false. Comparisons are done on stringified values so
 * numeric and string ids match each other.
 */
const filterPhase2DataByEnabledHotels = (hotels, rooms, roomTypes, enabledIds) => {
  const hasScope = Array.isArray(enabledIds) && enabledIds.length > 0;
  if (!hasScope) {
    return { hotels, rooms, roomTypes, filtered: false };
  }
  const enabledSet = new Set(enabledIds.map((id) => String(id)));
  const scopedHotels = hotels.filter((hotel) => enabledSet.has(String(hotel.hotelCode)));
  const allowedHotelIds = new Set(scopedHotels.map((hotel) => String(hotel.hotelID)));
  const inScope = (item) => allowedHotelIds.has(String(item.hotelID));
  return {
    hotels: scopedHotels,
    rooms: rooms.filter(inScope),
    roomTypes: roomTypes.filter(inScope),
    filtered: true
  };
};
const main = async () => {
try {
logger.info('Starting Application...');
// Phase 1: Init
await initDB();
// Phase 1: Init runtime dependencies (skip schema creation by requirement)
logger.info('Skip database schema initialization and use existing tables only.');
await initCloudDb();
// Phase 2: Concurrent Data Fetch
@@ -35,28 +56,37 @@ const main = async () => {
// Helper to conditionally fetch or return empty array
const fetchIfEnabled = (enabled, fetchFn) => enabled ? fetchFn : Promise.resolve([]);
const [hotels, rooms, roomTypes] = await Promise.all([
const [rawHotels, rawRooms, rawRoomTypes] = await Promise.all([
fetchIfEnabled(apiToggles.hotelList, concurrentFetch(endpoints.getHotelList)),
fetchIfEnabled(apiToggles.hostList, concurrentFetch(endpoints.getHostList)),
fetchIfEnabled(apiToggles.roomTypeInfo, concurrentFetch(endpoints.getRoomTypeInfo))
]);
logger.info(`Fetched ${hotels.length} hotels, ${rooms.length} rooms, ${roomTypes.length} room types.`);
logger.info(`Fetched RAW ${rawHotels.length} hotels, ${rawRooms.length} rooms, ${rawRoomTypes.length} room types.`);
const scopedData = filterPhase2DataByEnabledHotels(rawHotels, rawRooms, rawRoomTypes, enabledHotelIds);
const hotels = scopedData.hotels;
const rooms = scopedData.rooms;
const roomTypes = scopedData.roomTypes;
if (scopedData.filtered) {
logger.info(`Filtered by ENABLED_HOTEL_IDS (${enabledHotelIds.join(', ')}): ${hotels.length} hotels, ${rooms.length} rooms, ${roomTypes.length} room types.`);
}
await saveHotelsTransaction(hotels);
await saveRoomsTransaction(rooms);
await saveRoomTypesTransaction(roomTypes);
logger.info('Phase 2 Completed: Data saved.');
await saveRoomsTransaction(rooms);
await saveRoomTypesTransaction(roomTypes);
logger.info('Phase 2 Completed: Data saved.');
// Fetch all cloud data at once for caching
if (apiToggles.roomTypeModalInfo) {
const { fetchAllCloudData } = require('../services/dataService');
await fetchAllCloudData();
// Fetch cloud data for enabled hotels only when scoped list exists.
if (apiToggles.roomTypeModalInfo) {
const { fetchAllCloudData } = require('../services/dataService');
await fetchAllCloudData(enabledHotelIds);
}
} catch (error) {
logger.error({ error }, 'Phase 2 failed. Exiting.');
throw error;
}
} catch (error) {
logger.error({ error }, 'Phase 2 failed. Exiting.');
throw error;
}
// Phase 3: Loop Address Fetching
if (apiToggles.roomTypeModalInfo) {
@@ -100,8 +130,11 @@ const main = async () => {
});
if (loops && Array.isArray(loops)) {
await saveLoopsTransaction(loops);
logger.info(`Saved ${loops.length} loops for Hotel ID ${hotelId}`);
const loopResult = await saveLoopsTransaction(loops);
const savedCount = loopResult && typeof loopResult.successCount === 'number'
? loopResult.successCount
: loops.length;
logger.info(`Saved ${savedCount} loops for Hotel ID ${hotelId}`);
} else {
logger.warn(`No loops returned for Hotel ID ${hotelId}`);
}

View File

@@ -1,5 +1,5 @@
const { v4: uuidv4 } = require('uuid');
const { getClient } = require('../db/utils');
const { getClient, getWriteClients, enableDualWrite } = require('../db/utils');
const { queryCloud } = require('../db/cloudDb');
const logger = require('../utils/logger');
@@ -10,12 +10,25 @@ let cloudDataCache = null;
const generateGuid = () => uuidv4().replace(/-/g, '');
// 一次性获取所有云端数据并缓存
const fetchAllCloudData = async () => {
const fetchAllCloudData = async (hotelCodes = []) => {
try {
logger.info('Fetching all cloud data...');
const normalizedHotelCodes = Array.isArray(hotelCodes)
? hotelCodes.map(code => Number(code)).filter(code => Number.isInteger(code))
: [];
logger.info(`Fetching cloud data... scope=${normalizedHotelCodes.length > 0 ? `hotels(${normalizedHotelCodes.join(',')})` : 'all hotels'}`);
const startTime = Date.now();
const cloudRows = await queryCloud('SELECT * FROM tbl_room_type_circuit_powers');
let cloudRows = [];
if (normalizedHotelCodes.length > 0) {
const placeholders = normalizedHotelCodes.map(() => '?').join(', ');
cloudRows = await queryCloud(
`SELECT * FROM tbl_room_type_circuit_powers WHERE hotel_rcu_code IN (${placeholders})`,
normalizedHotelCodes
);
} else {
cloudRows = await queryCloud('SELECT * FROM tbl_room_type_circuit_powers');
}
// 构建索引,方便快速查询
const cache = {};
@@ -71,10 +84,12 @@ const validateSchema = (data, requiredFields) => {
const saveEntitiesTransaction = async (tableName, data, deleteByField, deleteValueExtractor, insertQuery, insertParamsExtractor) => {
if (data.length === 0) return;
const client = await getClient();
const clients = await getWriteClients();
try {
await client.query('BEGIN');
for (const { client } of clients) {
await client.query('BEGIN');
}
// 1. Delete existing
// Optimization: Batch delete?
@@ -99,19 +114,32 @@ const saveEntitiesTransaction = async (tableName, data, deleteByField, deleteVal
for (const item of data) {
const deleteVal = deleteValueExtractor(item);
// Delete query: "DELETE FROM schema.table WHERE col = $1"
await client.query(`DELETE FROM temporary_project.${tableName} WHERE ${deleteByField} = $1`, [deleteVal]);
for (const { client } of clients) {
await client.query(`DELETE FROM temporary_project.${tableName} WHERE ${deleteByField} = $1`, [deleteVal]);
}
const params = insertParamsExtractor(item);
await client.query(insertQuery, params);
for (const { client } of clients) {
await client.query(insertQuery, params);
}
}
await client.query('COMMIT');
for (const { client } of clients) {
await client.query('COMMIT');
}
} catch (e) {
await client.query('ROLLBACK');
for (const { client } of clients) {
try {
await client.query('ROLLBACK');
} catch (rollbackError) {
logger.error({ rollbackError }, 'Rollback failed on one database');
}
}
throw e;
} finally {
client.release();
for (const { client } of clients) {
client.release();
}
}
};
@@ -163,93 +191,110 @@ const saveLoopsTransaction = async (data) => {
logger.info('saveLoopsTransaction: Schema validation passed');
const client = await getClient();
const writeClients = await getWriteClients();
let successCount = 0;
let errorCount = 0;
try {
await client.query('BEGIN');
logger.info('saveLoopsTransaction: Transaction started');
for (const { client: writeClient } of writeClients) {
await writeClient.query('BEGIN');
}
logger.info({ dualWrite: enableDualWrite }, 'saveLoopsTransaction: Transaction started');
for (let i = 0; i < data.length; i++) {
const item = data[i];
logger.info(`Processing loop item ${i + 1}/${data.length}: id=${item.id}, roomTypeID=${item.roomTypeID}, modalAddress=${item.modalAddress}`);
try {
// 1. Delete existing
const deleteResult = await client.query('DELETE FROM temporary_project.loops WHERE id = $1', [item.id]);
logger.info(`Deleted ${deleteResult.rowCount} existing loop(s) with id=${item.id}`);
// 2. Get room type info to find hotel_id
const roomTypeRes = await client.query(
'SELECT rt.hotel_id, h.hotel_id as hotel_code FROM temporary_project.room_type rt JOIN temporary_project.hotels h ON rt.hotel_id = h.id WHERE rt.id = $1',
[item.roomTypeID]
);
logger.info(`Found ${roomTypeRes.rows.length} room type(s) for room_type_id=${item.roomTypeID}`);
let cloudData = null;
if (roomTypeRes.rows.length > 0) {
const hotelCode = roomTypeRes.rows[0].hotel_code;
logger.info(`Hotel code for room type: ${hotelCode}`);
// 3. Get cloud data from cache
const cloudRow = getCloudData(hotelCode, item.roomTypeID, item.modalAddress);
if (cloudRow) {
cloudData = cloudRow;
logger.info(`Found cloud data for loop ${item.id} from cache: ${JSON.stringify(cloudData)}`);
} else {
logger.warn(`No cloud data found for loop ${item.id} (hotel: ${hotelCode}, room type: ${item.roomTypeID}, address: ${item.modalAddress})`);
}
// 1. Delete existing
for (const { client: writeClient, name } of writeClients) {
const deleteResult = await writeClient.query('DELETE FROM temporary_project.loops WHERE id = $1', [item.id]);
logger.info(`Deleted ${deleteResult.rowCount} existing loop(s) with id=${item.id} on ${name}`);
}
// 2. Get room type info to find hotel_id
const roomTypeRes = await client.query(
'SELECT rt.hotel_id, h.hotel_id as hotel_code FROM temporary_project.room_type rt JOIN temporary_project.hotels h ON rt.hotel_id = h.id WHERE rt.id = $1',
[item.roomTypeID]
);
logger.info(`Found ${roomTypeRes.rows.length} room type(s) for room_type_id=${item.roomTypeID}`);
let cloudData = null;
if (roomTypeRes.rows.length > 0) {
const hotelCode = roomTypeRes.rows[0].hotel_code;
logger.info(`Hotel code for room type: ${hotelCode}`);
// 3. Get cloud data from cache
const cloudRow = getCloudData(hotelCode, item.roomTypeID, item.modalAddress);
if (cloudRow) {
cloudData = cloudRow;
logger.info(`Found cloud data for loop ${item.id} from cache: ${JSON.stringify(cloudData)}`);
} else {
logger.warn(`No room type found for room_type_id=${item.roomTypeID}, skipping cloud data sync`);
logger.warn(`No cloud data found for loop ${item.id} (hotel: ${hotelCode}, room type: ${item.roomTypeID}, address: ${item.modalAddress})`);
}
// 4. Insert with cloud data if available
const insertResult = await client.query(
} else {
logger.warn(`No room type found for room_type_id=${item.roomTypeID}, skipping cloud data sync`);
}
// 4. Insert with cloud data if available
const guid = generateGuid();
const insertParams = [
guid,
item.id,
item.name,
item.roomTypeID,
item.modalAddress,
item.type,
cloudData?.type || null,
cloudData?.name || null,
cloudData?.power || null,
cloudData?.rate || null,
cloudData?.temperature || null,
cloudData?.air_type || null,
cloudData?.air_brand || null,
cloudData?.air_model || null,
cloudData?.height || null,
cloudData?.area || null,
cloudData?.heat_loss || null,
cloudData?.remark || null
];
for (const { client: writeClient, name } of writeClients) {
const insertResult = await writeClient.query(
`INSERT INTO temporary_project.loops (
guid, id, loop_name, room_type_id, loop_address, loop_type,
guid, id, loop_name, room_type_id, loop_address, loop_type,
type, name, power, rate, temperature, air_type, air_brand, air_model, height, area, heat_loss, remark
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)`,
[
generateGuid(),
item.id,
item.name,
item.roomTypeID,
item.modalAddress,
item.type,
cloudData?.type || null,
cloudData?.name || null,
cloudData?.power || null,
cloudData?.rate || null,
cloudData?.temperature || null,
cloudData?.air_type || null,
cloudData?.air_brand || null,
cloudData?.air_model || null,
cloudData?.height || null,
cloudData?.area || null,
cloudData?.heat_loss || null,
cloudData?.remark || null
]
insertParams
);
logger.info(`Inserted loop ${item.id}, rowCount=${insertResult.rowCount}`);
successCount++;
} catch (itemError) {
logger.error({ itemError, item }, `Error processing loop item ${item.id}`);
errorCount++;
logger.info(`Inserted loop ${item.id}, rowCount=${insertResult.rowCount} on ${name}`);
}
successCount++;
}
await client.query('COMMIT');
for (const { client: writeClient } of writeClients) {
await writeClient.query('COMMIT');
}
logger.info(`saveLoopsTransaction: Transaction committed. Success: ${successCount}, Errors: ${errorCount}`);
return { successCount, errorCount };
} catch (e) {
await client.query('ROLLBACK');
errorCount++;
for (const { client: writeClient } of writeClients) {
try {
await writeClient.query('ROLLBACK');
} catch (rollbackError) {
logger.error({ rollbackError }, 'saveLoopsTransaction: Rollback failed on one database');
}
}
logger.error({ e }, 'saveLoopsTransaction: Transaction rolled back due to error');
throw e;
} finally {
client.release();
for (const { client: writeClient } of writeClients) {
writeClient.release();
}
}
};