From a2cc185279aedde2bf6fec39038194029b808933 Mon Sep 17 00:00:00 2001 From: Indiem87 Date: Thu, 9 Apr 2026 11:47:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E7=9B=91=E5=90=AC=E5=99=A8=EF=BC=8C=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E8=83=BD=E8=80=97=E6=95=B0=E6=8D=AE=E5=B9=B6=E5=AE=9A=E6=9C=9F?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- database_test_python/realtime_g5_listener.py | 434 +++++++++++++++++++ 1 file changed, 434 insertions(+) create mode 100644 database_test_python/realtime_g5_listener.py diff --git a/database_test_python/realtime_g5_listener.py b/database_test_python/realtime_g5_listener.py new file mode 100644 index 0000000..98e5c20 --- /dev/null +++ b/database_test_python/realtime_g5_listener.py @@ -0,0 +1,434 @@ +import os +import json +import logging +import select +import psycopg2 +from psycopg2.extras import RealDictCursor +from sqlalchemy import create_engine, text + +from db_config import get_pg_conn_kwargs, get_db_url + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +import time +import threading + +def start_listener(): + # 1. 建立目标库写连接 (test) - 引擎可以复用 + test_kwargs = get_pg_conn_kwargs("TEST") + target_engine = create_engine(f"postgresql+psycopg2://{test_kwargs['user']}:{test_kwargs['password']}@{test_kwargs['host']}:{test_kwargs['port']}/{test_kwargs['dbname']}") + + def initialize_energy_tracking(): + # alter_table_sql = """ + # DO $$ + # BEGIN + # BEGIN + # ALTER TABLE wh_test.loops_power RENAME COLUMN last_power_calc_time TO last_change_time; + # EXCEPTION + # WHEN undefined_column THEN -- column doesn't exist, already renamed + # WHEN others THEN + # RAISE NOTICE 'Skipping rename'; + # END; + # END $$; + # """ + reset_loops_sql = """ + -- [启动初始化] 清空回路的各项能耗字段 + -- 目的: 以启动脚本的时刻为起点,彻底重置基础能耗累加,更新初始计算时间起点 last_computed_time (避免抛出第一周期差值为 NULL 的报错或极值问题) + UPDATE wh_test.loops_power + SET total_energy_consumption = 0, + last_change_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint, + -- 不再填 NULL,必须打上脚本成功启动的时间戳,作为能耗运算的起点 + last_computed_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint + """ + reset_rooms_sql = """ + -- [启动初始化] 清空房间表里的聚合缓存 + -- 目的: 将 rooms_power 内部的聚合型 jsonb 对象内的数据重新清空为0,保持与 loops_power 基线同步一致 + UPDATE wh_test.rooms_power rp + SET loops_power = ( + SELECT COALESCE(jsonb_object_agg(key, 0.0), '{}'::jsonb) + FROM jsonb_object_keys(NULLIF(rp.dev_loops, '{}'::jsonb)) AS key + ), + loops_energy_consumption = ( + SELECT COALESCE(jsonb_object_agg(key, 0.0), '{}'::jsonb) + FROM jsonb_object_keys(NULLIF(rp.dev_loops, '{}'::jsonb)) AS key + ), + ideal_energy_consumption = 0.0 + WHERE dev_loops IS NOT NULL AND dev_loops != '{}'::jsonb; + """ + try: + with target_engine.begin() as conn: + # try: + # conn.execute(text(alter_table_sql)) + # except Exception as alter_err: + # pass # ignore if doesn't exist + conn.execute(text(reset_loops_sql)) + conn.execute(text(reset_rooms_sql)) + logging.info("Energy tracking fields initialized (cleared total_energy_consumption and reset times).") + except Exception as e: + logging.error(f"Failed to initialize energy tracking: {e}") + + # 启动前初始化功耗数据和时间 + initialize_energy_tracking() + + def update_target_db(payload: dict): + """收到变更后的执行函数""" + # payload 含有: hotel_id, room_id, device_id, dev_loops, bright_g + hotel_id = payload.get("hotel_id") + room_id = payload.get("room_id") + device_id = payload.get("device_id") + dev_loops = payload.get("dev_loops") + bright_g = payload.get("bright_g") + ts_ms = payload.get("ts_ms") + + if isinstance(dev_loops, dict): + sample_keys = list(dev_loops.keys())[:5] + logging.info( + "Payload summary hotel_id=%s room_id=%s device_id=%s bright_g=%s dev_loops_keys(sample)=%s", + hotel_id, + room_id, + device_id, + bright_g, + sample_keys, + ) + else: + logging.info( + "Payload summary hotel_id=%s room_id=%s device_id=%s bright_g=%s dev_loops_type=%s", + hotel_id, + room_id, + device_id, + bright_g, + type(dev_loops).__name__, + ) + + # 将 dev_loops 转为 json 字符串以便入库,如果它是空则转为 '{}' + if dev_loops is None: + dev_loops_str = '{}' + elif isinstance(dev_loops, (dict, list)): + dev_loops_str = json.dumps(dev_loops) + else: + dev_loops_str = str(dev_loops) + + # 执行更新 (先更新 loops_power 得出最新功耗,再更新 rooms_power 组装 json) + update_loops_sql = r""" + -- ========================================== + -- 实时动作一: 更新被触发房间下各底层回路数据 (入 loops_power 表) + -- ========================================== + WITH params AS ( + -- 使用 CTE 解析并规范外部传入的数据参数,用于下面的动态替换 + SELECT + CAST(:hotel_id AS int) AS p_hotel_id, + TRIM(CAST(:room_id AS varchar)) AS p_room_id, + CAST(:bright_g AS int2) AS p_bright_g, + CAST(:dev_loops AS jsonb) AS p_dev_loops + ) + UPDATE wh_test.loops_power lp + SET + -- 【能耗累加速度】在更新新功率前,用「现存旧功率」*「距离上次计算经过的时间」得出本时段产生的累加能耗。 + -- 单位换算与运算: 毫秒差 -> 除以3600000(换算为经过的小时) -> 乘以当前旧功率瓦数(换算为Wh) -> 除以1000(降级千瓦并储存为度电 kWh)。 + total_energy_consumption = COALESCE(lp.total_energy_consumption, 0) + + (COALESCE(lp.real_power, 0) * ( (((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint) - COALESCE(lp.last_computed_time, ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint)))::float8 / 3600000.0 / 1000.0 )), + + -- 更新结算基准锚点时间为当前 + last_computed_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint, + last_change_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint, + bright_g = COALESCE(p.p_bright_g, lp.bright_g), + rate = COALESCE( + NULLIF(regexp_replace( + COALESCE( + p.p_dev_loops->>TRIM(lp.loop_address), + p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''), + p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0') + ), + '\D', '', 'g' + ), '')::int >> 8 & 255, + lp.rate + ), + online_status = COALESCE( + CASE WHEN COALESCE( + p.p_dev_loops->>TRIM(lp.loop_address), + p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''), + p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0') + ) IS NULL THEN NULL + WHEN (NULLIF(regexp_replace( + COALESCE( + p.p_dev_loops->>TRIM(lp.loop_address), + p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''), + p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0') + ), + '\D', '', 'g' + ), '')::int & 255) = 1 THEN 1 + ELSE 0 + END, + lp.online_status + ), + real_power = ( + COALESCE(lp.power, 0) * + COALESCE(NULLIF(regexp_replace( + COALESCE( + p.p_dev_loops->>TRIM(lp.loop_address), + p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''), + p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0') + ), + '\D', '', 'g' + ), '')::int >> 8 & 255, lp.rate, 0) * + COALESCE(p.p_bright_g, lp.bright_g, 0) * + COALESCE( + CASE WHEN COALESCE( + p.p_dev_loops->>TRIM(lp.loop_address), + p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''), + p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0') + ) IS NULL THEN NULL + WHEN (NULLIF(regexp_replace( + COALESCE( + p.p_dev_loops->>TRIM(lp.loop_address), + p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''), + p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0') + ), + '\D', '', 'g' + ), '')::int & 255) = 1 THEN 1 + ELSE 0 + END, + lp.online_status, + 0 + ) + ) / 10000.0 + FROM params p + WHERE lp.hotel_id = p.p_hotel_id + AND TRIM(lp.room_id) = p.p_room_id + AND TRIM(lp.loop_address) LIKE '024%%'; + """ + + update_rooms_sql = """ + -- ========================================== + -- 实时动作二: 将离散的子回路数据聚合卷起到房间主表 (更新房间级 rooms_power) + -- ========================================== + -- 动作说明:上一步刚刚将各项最新的实际功率(real_power)和能耗打分算出,这里立刻用 jsonb_object_agg + -- 在外层循环出所有存在设备(dev_loops)中的 keys,将底层对应的数值打包为一个完整 JSON 同步更新给上游。 + UPDATE wh_test.rooms_power rp + SET dev_loops = CAST(:dev_loops AS jsonb), + ts_ms = COALESCE(CAST(:ts_ms AS int8), rp.ts_ms), + -- 评判并标识RCU(房间网关)在线活跃度: + -- 算法:若超过 300000ms(即5分钟)仍未有任何事件触发通信汇报,视作离线 0;否则在线 1 + rcu_online = CASE + WHEN ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint - COALESCE(CAST(:ts_ms AS int8), rp.ts_ms)) > 300000 THEN 0 + ELSE 1 + END, + + -- 将底层(loops_power表)本房间的所有最新实时功率字典化,映射给外层 + loops_power = ( + SELECT COALESCE(jsonb_object_agg( + key, + COALESCE(lp.real_power, 0.0) + ), '{}'::jsonb) + FROM jsonb_object_keys(CAST(:dev_loops AS jsonb)) AS key + LEFT JOIN ( + SELECT loop_address, real_power + FROM wh_test.loops_power + WHERE hotel_id = CAST(:hotel_id AS int) AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar)) + ) lp + ON ( + TRIM(lp.loop_address) = key OR + REPLACE(TRIM(lp.loop_address), '-', '') = key OR + LTRIM(TRIM(lp.loop_address), '0') = key + ) + ), + loops_energy_consumption = ( + SELECT COALESCE(jsonb_object_agg( + key, + COALESCE(lp.total_energy_consumption, 0.0) + ), '{}'::jsonb) + FROM jsonb_object_keys(CAST(:dev_loops AS jsonb)) AS key + LEFT JOIN ( + SELECT loop_address, total_energy_consumption + FROM wh_test.loops_power + WHERE hotel_id = CAST(:hotel_id AS int) AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar)) + ) lp + ON ( + TRIM(lp.loop_address) = key OR + REPLACE(TRIM(lp.loop_address), '-', '') = key OR + LTRIM(TRIM(lp.loop_address), '0') = key + ) + ), + ideal_energy_consumption = ( + SELECT COALESCE(SUM(COALESCE(lp.total_energy_consumption, 0.0)), 0.0) + FROM jsonb_object_keys(CAST(:dev_loops AS jsonb)) AS key + LEFT JOIN ( + SELECT loop_address, total_energy_consumption + FROM wh_test.loops_power + WHERE hotel_id = CAST(:hotel_id AS int) AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar)) + ) lp + ON ( + TRIM(lp.loop_address) = key OR + REPLACE(TRIM(lp.loop_address), '-', '') = key OR + LTRIM(TRIM(lp.loop_address), '0') = key + ) + ) + WHERE hotel_id = CAST(:hotel_id AS int) + AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar)) + AND device_id = CAST(:device_id AS varchar); + """ + + try: + with target_engine.begin() as conn: + # 务必先执行 loops_power 计算产生最新功耗,再执行 rooms_power 组装抽取 + res2 = conn.execute(text(update_loops_sql), { + "bright_g": bright_g, + "dev_loops": dev_loops_str, + "hotel_id": hotel_id, + "room_id": room_id + }) + res1 = conn.execute(text(update_rooms_sql), { + "dev_loops": dev_loops_str, + "hotel_id": hotel_id, + "room_id": room_id, + "device_id": device_id, + "ts_ms": ts_ms + }) + logging.info(f"Updated hotel/room/device={hotel_id}/{room_id}/{device_id} | loops rows: {res2.rowcount}, rooms rows: {res1.rowcount}") + if res2.rowcount == 0: + logging.warning( + "No loops_power rows updated. Check room_id mapping and whether dev_loops keys match loop_address (024...) for hotel_id=%s room_id=%s", + hotel_id, + room_id, + ) + except Exception as e: + logging.error(f"Error updating target database: {str(e)}") + + def run_polling_update(): + """Periodic background update for energy tracking without touching source trigger times.""" + update_loops_sql = r""" + -- ========================================== + -- 轮询任务一: 结算仍在耗电且【稳定无设备变动操作】的回路能耗 (入 loops_power 表) + -- ========================================== + -- 背景与核心: 如果某个电器(比如灯)被打开后长达一小时都没有进行“关灯调整”操作,实时SQL就不会被触发,能耗就停滞不动了。 + -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(kWh)。 + -- 区别: 它仅累计能耗并刷新 last_computed_time 锚点,它不再做全套状态、心率重解操作。 + UPDATE wh_test.loops_power lp + SET + total_energy_consumption = COALESCE(lp.total_energy_consumption, 0) + + (COALESCE(lp.real_power, 0) * ( (((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint) - COALESCE(lp.last_computed_time, ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint)))::float8 / 3600000.0 / 1000.0 )), + last_computed_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint + WHERE lp.real_power IS NOT NULL AND lp.real_power > 0; + """ + + update_rooms_sql = r""" + -- ========================================== + -- 轮询任务二: 将上面轮询强算出来的变化过的房间能耗,聚合成 JSON 刷出 + -- ========================================== + -- 优化核心点: 为了防止上面循环几万个实际上完全没住人、离线的“僵尸房间”导致 DB 锁库, + -- 引入临时视图 CTE (active_rooms),仅从真正有机器运行(real_power>0)的活跃库里连表做聚合。 + WITH active_rooms AS ( + SELECT DISTINCT hotel_id, TRIM(room_id) AS room_id + FROM wh_test.loops_power + WHERE real_power IS NOT NULL AND real_power > 0 + ) + UPDATE wh_test.rooms_power rp + -- 继续检测僵尸网关,5分钟失联状态同步下线 + SET rcu_online = CASE + WHEN ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint - rp.ts_ms) > 300000 THEN 0 + ELSE 1 + END, + loops_energy_consumption = ( + SELECT COALESCE(jsonb_object_agg( + key, + COALESCE(lp.total_energy_consumption, 0.0) + ), '{}'::jsonb) + FROM jsonb_object_keys(rp.dev_loops) AS key + LEFT JOIN ( + SELECT loop_address, total_energy_consumption + FROM wh_test.loops_power + WHERE hotel_id = rp.hotel_id AND TRIM(room_id) = TRIM(rp.room_id) + ) lp + ON ( + TRIM(lp.loop_address) = key OR + REPLACE(TRIM(lp.loop_address), '-', '') = key OR + LTRIM(TRIM(lp.loop_address), '0') = key + ) + ), + ideal_energy_consumption = ( + SELECT COALESCE(SUM(COALESCE(lp.total_energy_consumption, 0.0)), 0.0) + FROM jsonb_object_keys(rp.dev_loops) AS key + LEFT JOIN ( + SELECT loop_address, total_energy_consumption + FROM wh_test.loops_power + WHERE hotel_id = rp.hotel_id AND TRIM(room_id) = TRIM(rp.room_id) + ) lp + ON ( + TRIM(lp.loop_address) = key OR + REPLACE(TRIM(lp.loop_address), '-', '') = key OR + LTRIM(TRIM(lp.loop_address), '0') = key + ) + ) + FROM active_rooms ar + WHERE rp.hotel_id = ar.hotel_id + AND TRIM(rp.room_id) = ar.room_id + AND rp.dev_loops IS NOT NULL + AND rp.dev_loops != '{}'::jsonb; + """ + try: + with target_engine.begin() as conn: + res2 = conn.execute(text(update_loops_sql)) + res1 = conn.execute(text(update_rooms_sql)) + logging.info(f"Polling update complete. loops: {res2.rowcount}, rooms: {res1.rowcount}") + except Exception as e: + logging.error(f"Error in polling update: {e}") + + # 3. 阻塞等待通知并在断线时自动重连 + portal_kwargs = get_pg_conn_kwargs("PORTAL") + + # 开启 TCP 持久保活机制 (TCP Keepalive) 应对防火墙超时问题 + portal_kwargs['keepalives'] = 1 + portal_kwargs['keepalives_idle'] = 60 + portal_kwargs['keepalives_interval'] = 10 + portal_kwargs['keepalives_count'] = 5 + + channel = 'g5_realtime_channel' + + while True: + try: + logging.info(f"Connecting to Source DB for listening: {portal_kwargs['host']}:{portal_kwargs['port']}/{portal_kwargs['dbname']}") + source_conn = psycopg2.connect(**portal_kwargs) + source_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = source_conn.cursor() + cur.execute(f"LISTEN {channel};") + logging.info(f"Listening on channel '{channel}'...") + + last_poll_time = time.time() + while True: + # 5秒超时来心跳一下 + if select.select([source_conn], [], [], 5) == ([], [], []): + pass + else: + source_conn.poll() + while source_conn.notifies: + notify = source_conn.notifies.pop(0) + try: + payload = json.loads(notify.payload) + update_target_db(payload) + except json.JSONDecodeError: + logging.error(f"Failed to parse payload: {notify.payload}") + + # Check for polling update (e.g. every 60 seconds) + if time.time() - last_poll_time >= 60: + threading.Thread(target=run_polling_update, daemon=True).start() + last_poll_time = time.time() + + except psycopg2.OperationalError as e: + logging.error(f"PostgreSQL connection lost. Attempting to reconnect in 5 seconds... details: {e}") + try: + if 'source_conn' in locals() and source_conn is not None: + source_conn.close() + except Exception: + pass + time.sleep(5) + except KeyboardInterrupt: + logging.info("Shutting down listener...") + if 'source_conn' in locals() and source_conn is not None: + source_conn.close() + break + except Exception as e: + logging.error(f"Unexpected error: {e}") + time.sleep(5) + +if __name__ == "__main__": + start_listener()