From 286b605e4f221a36c6d59eb5cb50afb0c9ed17ab Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Fri, 3 Apr 2026 18:47:31 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E6=95=B0=E6=8D=AE=E5=BA=93):=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E5=8F=8C=E6=95=B0=E6=8D=AE=E5=BA=93=E5=86=99=E5=85=A5?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E5=B9=B6=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加对G5数据库的支持,实现双数据库同时写入功能 重构数据库配置和连接管理,支持主库和G5库的独立配置 优化数据同步逻辑,添加缺失字段并保持现有字段不变 改进数据过滤和事务处理,增强错误处理和日志记录 --- temporary_project_management/.env | 10 +- .../sync_g5_loops_schema_with_primary.sql | 17 ++ .../src/config/index.js | 97 ++++++--- temporary_project_management/src/db/index.js | 22 +- temporary_project_management/src/db/utils.js | 30 ++- .../src/scripts/main.js | 69 ++++-- .../src/services/dataService.js | 203 +++++++++++------- 7 files changed, 317 insertions(+), 131 deletions(-) create mode 100644 temporary_project_management/docs/sync_g5_loops_schema_with_primary.sql diff --git a/temporary_project_management/.env b/temporary_project_management/.env index 52016c2..50446db 100644 --- a/temporary_project_management/.env +++ b/temporary_project_management/.env @@ -7,13 +7,19 @@ PORT=3000 # 数据库配置 POSTGRES_HOST=10.8.8.109 POSTGRES_PORT=5433 +POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres! + POSTGRES_DATABASE=log_platform POSTGRES_USER=log_admin -POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres! POSTGRES_MAX_CONNECTIONS=2 POSTGRES_IDLE_TIMEOUT_MS=30000 + +POSTGRES_HOST_G5=10.8.8.80 +POSTGRES_PORT_G5=5434 +POSTGRES_PASSWORD_G5=H3IkLUt8K!x + # 启用的酒店ID列表 -ENABLED_HOTEL_IDS=1000-2500 #1001,1003,1068,1085,1865,2000-2500 +ENABLED_HOTEL_IDS=1000-2500 # 接口启用配置 (true/false) ENABLE_API_HOTEL_LIST=true # 酒店列表 diff --git a/temporary_project_management/docs/sync_g5_loops_schema_with_primary.sql b/temporary_project_management/docs/sync_g5_loops_schema_with_primary.sql new file mode 100644 index 0000000..c808cd6 --- /dev/null +++ b/temporary_project_management/docs/sync_g5_loops_schema_with_primary.sql @@ -0,0 +1,17 @@ +-- Make G5 (10.8.8.80) loops table compatible with primary (10.8.8.109) +-- Scope: temporary_project.loops +-- Strategy: add missing columns only, keep existing columns unchanged + +ALTER TABLE temporary_project.loops + ADD COLUMN IF NOT EXISTS type varchar(254), + ADD COLUMN IF NOT EXISTS name varchar(254), + ADD COLUMN IF NOT EXISTS power double precision, + ADD COLUMN IF NOT EXISTS rate double precision, + ADD COLUMN IF NOT EXISTS temperature double precision, + ADD COLUMN IF NOT EXISTS air_type varchar(254), + ADD COLUMN IF NOT EXISTS air_brand varchar(254), + ADD COLUMN IF NOT EXISTS air_model varchar(254), + ADD COLUMN IF NOT EXISTS height double precision, + ADD COLUMN IF NOT EXISTS area double precision, + ADD COLUMN IF NOT EXISTS heat_loss double precision, + ADD COLUMN IF NOT EXISTS remark varchar(254); diff --git a/temporary_project_management/src/config/index.js b/temporary_project_management/src/config/index.js index a160648..e50880e 100644 --- a/temporary_project_management/src/config/index.js +++ b/temporary_project_management/src/config/index.js @@ -1,16 +1,57 @@ require('dotenv').config(); +const stripInlineComment = (value) => { + if (typeof value !== 'string') return ''; + return value.split('#')[0].trim(); +}; + +const parseBooleanEnv = (value, defaultValue = true) => { + const normalized = stripInlineComment(value).toLowerCase(); + if (!normalized) return defaultValue; + if (normalized === 'true') return true; + if (normalized === 'false') return false; + return defaultValue; +}; + +const primaryDbConfig = { + host: process.env.POSTGRES_HOST || '10.8.8.109', + port: parseInt(process.env.POSTGRES_PORT, 10) || 5433, + database: process.env.POSTGRES_DATABASE || 'log_platform', + user: process.env.POSTGRES_USER || 'log_admin', + password: process.env.POSTGRES_PASSWORD || 'YourActualStrongPasswordForPostgres!', + max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS, 10) || 6, + idleTimeoutMillis: parseInt(process.env.POSTGRES_IDLE_TIMEOUT_MS, 10) || 30000, +}; + +const g5DbConfig = { + host: process.env.POSTGRES_HOST_G5 || primaryDbConfig.host, + port: parseInt(process.env.POSTGRES_PORT_G5, 10) || primaryDbConfig.port, + database: process.env.POSTGRES_DATABASE_G5 || primaryDbConfig.database, + user: process.env.POSTGRES_USER_G5 || primaryDbConfig.user, + password: process.env.POSTGRES_PASSWORD_G5 || primaryDbConfig.password, + max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS_G5, 10) || primaryDbConfig.max, + idleTimeoutMillis: parseInt(process.env.POSTGRES_IDLE_TIMEOUT_MS_G5, 10) || primaryDbConfig.idleTimeoutMillis, +}; + +const hasG5Overrides = Boolean( + process.env.POSTGRES_HOST_G5 || + process.env.POSTGRES_PORT_G5 || + process.env.POSTGRES_DATABASE_G5 || + process.env.POSTGRES_USER_G5 || + process.env.POSTGRES_PASSWORD_G5 +); + +const isSameDbTarget = + primaryDbConfig.host === g5DbConfig.host && + primaryDbConfig.port === g5DbConfig.port && + primaryDbConfig.database === g5DbConfig.database && + primaryDbConfig.user === g5DbConfig.user; + module.exports = { port: process.env.PORT || 3000, - dbConfig: { - host: process.env.POSTGRES_HOST || '10.8.8.109', - port: parseInt(process.env.POSTGRES_PORT, 10) || 5433, - database: process.env.POSTGRES_DATABASE || 'log_platform', - user: process.env.POSTGRES_USER || 'log_admin', - password: process.env.POSTGRES_PASSWORD || 'YourActualStrongPasswordForPostgres!', - max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS, 10) || 6, - idleTimeoutMillis: parseInt(process.env.POSTGRES_IDLE_TIMEOUT_MS, 10) || 30000, - }, + dbConfig: primaryDbConfig, + dbConfigG5: g5DbConfig, + enableDualWrite: hasG5Overrides && !isSameDbTarget, cloudDbConfig: { host: process.env.CLOUD_DB_HOST || 'blv-cloud-db.mysql.rds.aliyuncs.com', port: parseInt(process.env.CLOUD_DB_PORT, 10) || 3307, @@ -21,35 +62,39 @@ module.exports = { enabledHotelIds: parseHotelIds(process.env.ENABLED_HOTEL_IDS), apiBaseUrl: process.env.API_BASE_URL || 'http://www.boonlive-rcu.com:7000/api/values', apiToggles: { - hotelList: process.env.ENABLE_API_HOTEL_LIST !== 'false', - hostList: process.env.ENABLE_API_HOST_LIST !== 'false', - roomTypeInfo: process.env.ENABLE_API_ROOM_TYPE_INFO !== 'false', - roomTypeModalInfo: process.env.ENABLE_API_ROOM_TYPE_MODAL_INFO !== 'false', + hotelList: parseBooleanEnv(process.env.ENABLE_API_HOTEL_LIST, true), + hostList: parseBooleanEnv(process.env.ENABLE_API_HOST_LIST, true), + roomTypeInfo: parseBooleanEnv(process.env.ENABLE_API_ROOM_TYPE_INFO, true), + roomTypeModalInfo: parseBooleanEnv(process.env.ENABLE_API_ROOM_TYPE_MODAL_INFO, true), } }; function parseHotelIds(envVar) { - if (!envVar) return []; - const parts = envVar.split(','); + const raw = stripInlineComment(envVar); + if (!raw) return []; + + const parts = raw.split(','); const ids = new Set(); parts.forEach(part => { part = part.trim(); - if (part.includes('-')) { - const [startStr, endStr] = part.split('-'); - const start = parseInt(startStr.trim(), 10); - const end = parseInt(endStr.trim(), 10); - - if (!isNaN(start) && !isNaN(end) && start <= end) { + if (!part) return; + + const rangeMatch = part.match(/^(\d+)\s*-\s*(\d+)$/); + if (rangeMatch) { + const start = parseInt(rangeMatch[1], 10); + const end = parseInt(rangeMatch[2], 10); + + if (start <= end) { for (let i = start; i <= end; i++) { ids.add(i); } } - } else { - const num = parseInt(part, 10); - if (!isNaN(num)) { - ids.add(num); - } + return; + } + + if (/^\d+$/.test(part)) { + ids.add(parseInt(part, 10)); } }); diff --git a/temporary_project_management/src/db/index.js b/temporary_project_management/src/db/index.js index c80bed0..fed620d 100644 --- a/temporary_project_management/src/db/index.js +++ b/temporary_project_management/src/db/index.js @@ -1,8 +1,9 @@ const { Pool } = require('pg'); -const { dbConfig } = require('../config'); +const { dbConfig, dbConfigG5, enableDualWrite } = require('../config'); const logger = require('../utils/logger'); const pool = new Pool(dbConfig); +const poolG5 = enableDualWrite ? new Pool(dbConfigG5) : null; pool.on('error', (err, client) => { logger.error({ err }, 'Unexpected error on idle client'); @@ -10,7 +11,22 @@ pool.on('error', (err, client) => { }); pool.on('connect', () => { - logger.debug('New client connected to database'); + logger.debug('New client connected to primary database'); }); -module.exports = pool; +if (poolG5) { + poolG5.on('error', (err, client) => { + logger.error({ err }, 'Unexpected error on idle client (G5)'); + process.exit(-1); + }); + + poolG5.on('connect', () => { + logger.debug('New client connected to G5 database'); + }); +} + +module.exports = { + pool, + poolG5, + enableDualWrite +}; diff --git a/temporary_project_management/src/db/utils.js b/temporary_project_management/src/db/utils.js index c6d0232..512021f 100644 --- a/temporary_project_management/src/db/utils.js +++ b/temporary_project_management/src/db/utils.js @@ -1,14 +1,38 @@ -const pool = require('./index'); +const { pool, poolG5, enableDualWrite } = require('./index'); const query = (text, params) => pool.query(text, params); const getClient = () => pool.connect(); -const close = () => pool.end(); +const getWriteClients = async () => { + const primaryClient = await pool.connect(); + + if (!enableDualWrite || !poolG5) { + return [ + { name: 'primary', client: primaryClient } + ]; + } + + const g5Client = await poolG5.connect(); + return [ + { name: 'primary', client: primaryClient }, + { name: 'g5', client: g5Client } + ]; +}; + +const close = async () => { + await pool.end(); + if (poolG5) { + await poolG5.end(); + } +}; module.exports = { query, getClient, + getWriteClients, close, - pool + pool, + poolG5, + enableDualWrite }; diff --git a/temporary_project_management/src/scripts/main.js b/temporary_project_management/src/scripts/main.js index a423f9c..26878fc 100644 --- a/temporary_project_management/src/scripts/main.js +++ b/temporary_project_management/src/scripts/main.js @@ -1,4 +1,3 @@ -const initDB = require('../db/init'); const { concurrentFetch, queuedFetch } = require('../utils/http'); const { saveHotelsTransaction, saveRoomsTransaction, saveRoomTypesTransaction, saveLoopsTransaction } = require('../services/dataService'); const { enabledHotelIds, apiToggles } = require('../config'); @@ -19,12 +18,34 @@ const stats = { endTime: 0 }; +const filterPhase2DataByEnabledHotels = (hotels, rooms, roomTypes, enabledIds) => { + if (!Array.isArray(enabledIds) || enabledIds.length === 0) { + return { hotels, rooms, roomTypes, filtered: false }; + } + + const enabledSet = new Set(enabledIds.map(id => String(id))); + const filteredHotels = hotels.filter(item => enabledSet.has(String(item.hotelCode))); + + const allowedHotelIds = new Set(filteredHotels.map(item => String(item.hotelID))); + + const filteredRooms = rooms.filter(item => allowedHotelIds.has(String(item.hotelID))); + + const filteredRoomTypes = roomTypes.filter(item => allowedHotelIds.has(String(item.hotelID))); + + return { + hotels: filteredHotels, + rooms: filteredRooms, + roomTypes: filteredRoomTypes, + filtered: true + }; +}; + const main = async () => { try { logger.info('Starting Application...'); - // Phase 1: Init - await initDB(); + // Phase 1: Init runtime dependencies (skip schema creation by requirement) + logger.info('Skip database schema initialization and use existing tables only.'); await initCloudDb(); // Phase 2: Concurrent Data Fetch @@ -35,28 +56,37 @@ const main = async () => { // Helper to conditionally fetch or return empty array const fetchIfEnabled = (enabled, fetchFn) => enabled ? fetchFn : Promise.resolve([]); - const [hotels, rooms, roomTypes] = await Promise.all([ + const [rawHotels, rawRooms, rawRoomTypes] = await Promise.all([ fetchIfEnabled(apiToggles.hotelList, concurrentFetch(endpoints.getHotelList)), fetchIfEnabled(apiToggles.hostList, concurrentFetch(endpoints.getHostList)), fetchIfEnabled(apiToggles.roomTypeInfo, concurrentFetch(endpoints.getRoomTypeInfo)) ]); - logger.info(`Fetched ${hotels.length} hotels, ${rooms.length} rooms, ${roomTypes.length} room types.`); + logger.info(`Fetched RAW ${rawHotels.length} hotels, ${rawRooms.length} rooms, ${rawRoomTypes.length} room types.`); + + const scopedData = filterPhase2DataByEnabledHotels(rawHotels, rawRooms, rawRoomTypes, enabledHotelIds); + const hotels = scopedData.hotels; + const rooms = scopedData.rooms; + const roomTypes = scopedData.roomTypes; + + if (scopedData.filtered) { + logger.info(`Filtered by ENABLED_HOTEL_IDS (${enabledHotelIds.join(', ')}): ${hotels.length} hotels, ${rooms.length} rooms, ${roomTypes.length} room types.`); + } await saveHotelsTransaction(hotels); - await saveRoomsTransaction(rooms); - await saveRoomTypesTransaction(roomTypes); - logger.info('Phase 2 Completed: Data saved.'); + await saveRoomsTransaction(rooms); + await saveRoomTypesTransaction(roomTypes); + logger.info('Phase 2 Completed: Data saved.'); - // Fetch all cloud data at once for caching - if (apiToggles.roomTypeModalInfo) { - const { fetchAllCloudData } = require('../services/dataService'); - await fetchAllCloudData(); + // Fetch cloud data for enabled hotels only when scoped list exists. + if (apiToggles.roomTypeModalInfo) { + const { fetchAllCloudData } = require('../services/dataService'); + await fetchAllCloudData(enabledHotelIds); + } + } catch (error) { + logger.error({ error }, 'Phase 2 failed. Exiting.'); + throw error; } - } catch (error) { - logger.error({ error }, 'Phase 2 failed. Exiting.'); - throw error; - } // Phase 3: Loop Address Fetching if (apiToggles.roomTypeModalInfo) { @@ -100,8 +130,11 @@ const main = async () => { }); if (loops && Array.isArray(loops)) { - await saveLoopsTransaction(loops); - logger.info(`Saved ${loops.length} loops for Hotel ID ${hotelId}`); + const loopResult = await saveLoopsTransaction(loops); + const savedCount = loopResult && typeof loopResult.successCount === 'number' + ? loopResult.successCount + : loops.length; + logger.info(`Saved ${savedCount} loops for Hotel ID ${hotelId}`); } else { logger.warn(`No loops returned for Hotel ID ${hotelId}`); } diff --git a/temporary_project_management/src/services/dataService.js b/temporary_project_management/src/services/dataService.js index 05fc558..0af9845 100644 --- a/temporary_project_management/src/services/dataService.js +++ b/temporary_project_management/src/services/dataService.js @@ -1,5 +1,5 @@ const { v4: uuidv4 } = require('uuid'); -const { getClient } = require('../db/utils'); +const { getClient, getWriteClients, enableDualWrite } = require('../db/utils'); const { queryCloud } = require('../db/cloudDb'); const logger = require('../utils/logger'); @@ -10,12 +10,25 @@ let cloudDataCache = null; const generateGuid = () => uuidv4().replace(/-/g, ''); // 一次性获取所有云端数据并缓存 -const fetchAllCloudData = async () => { +const fetchAllCloudData = async (hotelCodes = []) => { try { - logger.info('Fetching all cloud data...'); + const normalizedHotelCodes = Array.isArray(hotelCodes) + ? hotelCodes.map(code => Number(code)).filter(code => Number.isInteger(code)) + : []; + + logger.info(`Fetching cloud data... scope=${normalizedHotelCodes.length > 0 ? `hotels(${normalizedHotelCodes.join(',')})` : 'all hotels'}`); const startTime = Date.now(); - - const cloudRows = await queryCloud('SELECT * FROM tbl_room_type_circuit_powers'); + + let cloudRows = []; + if (normalizedHotelCodes.length > 0) { + const placeholders = normalizedHotelCodes.map(() => '?').join(', '); + cloudRows = await queryCloud( + `SELECT * FROM tbl_room_type_circuit_powers WHERE hotel_rcu_code IN (${placeholders})`, + normalizedHotelCodes + ); + } else { + cloudRows = await queryCloud('SELECT * FROM tbl_room_type_circuit_powers'); + } // 构建索引,方便快速查询 const cache = {}; @@ -71,10 +84,12 @@ const validateSchema = (data, requiredFields) => { const saveEntitiesTransaction = async (tableName, data, deleteByField, deleteValueExtractor, insertQuery, insertParamsExtractor) => { if (data.length === 0) return; - - const client = await getClient(); + + const clients = await getWriteClients(); try { - await client.query('BEGIN'); + for (const { client } of clients) { + await client.query('BEGIN'); + } // 1. Delete existing // Optimization: Batch delete? @@ -99,19 +114,32 @@ const saveEntitiesTransaction = async (tableName, data, deleteByField, deleteVal for (const item of data) { const deleteVal = deleteValueExtractor(item); - // Delete query: "DELETE FROM schema.table WHERE col = $1" - await client.query(`DELETE FROM temporary_project.${tableName} WHERE ${deleteByField} = $1`, [deleteVal]); - + for (const { client } of clients) { + await client.query(`DELETE FROM temporary_project.${tableName} WHERE ${deleteByField} = $1`, [deleteVal]); + } + const params = insertParamsExtractor(item); - await client.query(insertQuery, params); + for (const { client } of clients) { + await client.query(insertQuery, params); + } } - await client.query('COMMIT'); + for (const { client } of clients) { + await client.query('COMMIT'); + } } catch (e) { - await client.query('ROLLBACK'); + for (const { client } of clients) { + try { + await client.query('ROLLBACK'); + } catch (rollbackError) { + logger.error({ rollbackError }, 'Rollback failed on one database'); + } + } throw e; } finally { - client.release(); + for (const { client } of clients) { + client.release(); + } } }; @@ -163,93 +191,110 @@ const saveLoopsTransaction = async (data) => { logger.info('saveLoopsTransaction: Schema validation passed'); const client = await getClient(); + const writeClients = await getWriteClients(); let successCount = 0; let errorCount = 0; try { - await client.query('BEGIN'); - logger.info('saveLoopsTransaction: Transaction started'); + for (const { client: writeClient } of writeClients) { + await writeClient.query('BEGIN'); + } + logger.info({ dualWrite: enableDualWrite }, 'saveLoopsTransaction: Transaction started'); for (let i = 0; i < data.length; i++) { const item = data[i]; logger.info(`Processing loop item ${i + 1}/${data.length}: id=${item.id}, roomTypeID=${item.roomTypeID}, modalAddress=${item.modalAddress}`); - - try { - // 1. Delete existing - const deleteResult = await client.query('DELETE FROM temporary_project.loops WHERE id = $1', [item.id]); - logger.info(`Deleted ${deleteResult.rowCount} existing loop(s) with id=${item.id}`); - - // 2. Get room type info to find hotel_id - const roomTypeRes = await client.query( - 'SELECT rt.hotel_id, h.hotel_id as hotel_code FROM temporary_project.room_type rt JOIN temporary_project.hotels h ON rt.hotel_id = h.id WHERE rt.id = $1', - [item.roomTypeID] - ); - - logger.info(`Found ${roomTypeRes.rows.length} room type(s) for room_type_id=${item.roomTypeID}`); - - let cloudData = null; - if (roomTypeRes.rows.length > 0) { - const hotelCode = roomTypeRes.rows[0].hotel_code; - logger.info(`Hotel code for room type: ${hotelCode}`); - - // 3. Get cloud data from cache - const cloudRow = getCloudData(hotelCode, item.roomTypeID, item.modalAddress); - - if (cloudRow) { - cloudData = cloudRow; - logger.info(`Found cloud data for loop ${item.id} from cache: ${JSON.stringify(cloudData)}`); - } else { - logger.warn(`No cloud data found for loop ${item.id} (hotel: ${hotelCode}, room type: ${item.roomTypeID}, address: ${item.modalAddress})`); - } + + // 1. Delete existing + for (const { client: writeClient, name } of writeClients) { + const deleteResult = await writeClient.query('DELETE FROM temporary_project.loops WHERE id = $1', [item.id]); + logger.info(`Deleted ${deleteResult.rowCount} existing loop(s) with id=${item.id} on ${name}`); + } + + // 2. Get room type info to find hotel_id + const roomTypeRes = await client.query( + 'SELECT rt.hotel_id, h.hotel_id as hotel_code FROM temporary_project.room_type rt JOIN temporary_project.hotels h ON rt.hotel_id = h.id WHERE rt.id = $1', + [item.roomTypeID] + ); + + logger.info(`Found ${roomTypeRes.rows.length} room type(s) for room_type_id=${item.roomTypeID}`); + + let cloudData = null; + if (roomTypeRes.rows.length > 0) { + const hotelCode = roomTypeRes.rows[0].hotel_code; + logger.info(`Hotel code for room type: ${hotelCode}`); + + // 3. Get cloud data from cache + const cloudRow = getCloudData(hotelCode, item.roomTypeID, item.modalAddress); + + if (cloudRow) { + cloudData = cloudRow; + logger.info(`Found cloud data for loop ${item.id} from cache: ${JSON.stringify(cloudData)}`); } else { - logger.warn(`No room type found for room_type_id=${item.roomTypeID}, skipping cloud data sync`); + logger.warn(`No cloud data found for loop ${item.id} (hotel: ${hotelCode}, room type: ${item.roomTypeID}, address: ${item.modalAddress})`); } - - // 4. Insert with cloud data if available - const insertResult = await client.query( + } else { + logger.warn(`No room type found for room_type_id=${item.roomTypeID}, skipping cloud data sync`); + } + + // 4. Insert with cloud data if available + const guid = generateGuid(); + const insertParams = [ + guid, + item.id, + item.name, + item.roomTypeID, + item.modalAddress, + item.type, + cloudData?.type || null, + cloudData?.name || null, + cloudData?.power || null, + cloudData?.rate || null, + cloudData?.temperature || null, + cloudData?.air_type || null, + cloudData?.air_brand || null, + cloudData?.air_model || null, + cloudData?.height || null, + cloudData?.area || null, + cloudData?.heat_loss || null, + cloudData?.remark || null + ]; + + for (const { client: writeClient, name } of writeClients) { + const insertResult = await writeClient.query( `INSERT INTO temporary_project.loops ( - guid, id, loop_name, room_type_id, loop_address, loop_type, + guid, id, loop_name, room_type_id, loop_address, loop_type, type, name, power, rate, temperature, air_type, air_brand, air_model, height, area, heat_loss, remark ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)`, - [ - generateGuid(), - item.id, - item.name, - item.roomTypeID, - item.modalAddress, - item.type, - cloudData?.type || null, - cloudData?.name || null, - cloudData?.power || null, - cloudData?.rate || null, - cloudData?.temperature || null, - cloudData?.air_type || null, - cloudData?.air_brand || null, - cloudData?.air_model || null, - cloudData?.height || null, - cloudData?.area || null, - cloudData?.heat_loss || null, - cloudData?.remark || null - ] + insertParams ); - - logger.info(`Inserted loop ${item.id}, rowCount=${insertResult.rowCount}`); - successCount++; - - } catch (itemError) { - logger.error({ itemError, item }, `Error processing loop item ${item.id}`); - errorCount++; + logger.info(`Inserted loop ${item.id}, rowCount=${insertResult.rowCount} on ${name}`); } + + successCount++; } - await client.query('COMMIT'); + for (const { client: writeClient } of writeClients) { + await writeClient.query('COMMIT'); + } logger.info(`saveLoopsTransaction: Transaction committed. Success: ${successCount}, Errors: ${errorCount}`); + return { successCount, errorCount }; } catch (e) { - await client.query('ROLLBACK'); + errorCount++; + for (const { client: writeClient } of writeClients) { + try { + await writeClient.query('ROLLBACK'); + } catch (rollbackError) { + logger.error({ rollbackError }, 'saveLoopsTransaction: Rollback failed on one database'); + } + } logger.error({ e }, 'saveLoopsTransaction: Transaction rolled back due to error'); throw e; } finally { client.release(); + for (const { client: writeClient } of writeClients) { + writeClient.release(); + } } };