From 9872bee4871be6959bb3d9ee1b0d9a69578fa3d1 Mon Sep 17 00:00:00 2001 From: Indiem87 Date: Thu, 9 Apr 2026 12:00:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E8=83=BD=E8=80=97?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=88=9D=E5=A7=8B=E5=8C=96=E5=92=8C=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E9=80=BB=E8=BE=91=EF=BC=8C=E6=B7=BB=E5=8A=A0=E7=94=B5?= =?UTF-8?q?=E5=8E=8B=E3=80=81=E7=94=B5=E6=B5=81=E7=AD=89=E6=96=B0=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- database_test_python/realtime_g5_listener.py | 44 ++++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/database_test_python/realtime_g5_listener.py b/database_test_python/realtime_g5_listener.py index 98e5c20..47bc220 100644 --- a/database_test_python/realtime_g5_listener.py +++ b/database_test_python/realtime_g5_listener.py @@ -19,18 +19,6 @@ def start_listener(): target_engine = create_engine(f"postgresql+psycopg2://{test_kwargs['user']}:{test_kwargs['password']}@{test_kwargs['host']}:{test_kwargs['port']}/{test_kwargs['dbname']}") def initialize_energy_tracking(): - # alter_table_sql = """ - # DO $$ - # BEGIN - # BEGIN - # ALTER TABLE wh_test.loops_power RENAME COLUMN last_power_calc_time TO last_change_time; - # EXCEPTION - # WHEN undefined_column THEN -- column doesn't exist, already renamed - # WHEN others THEN - # RAISE NOTICE 'Skipping rename'; - # END; - # END $$; - # """ reset_loops_sql = """ -- [启动初始化] 清空回路的各项能耗字段 -- 目的: 以启动脚本的时刻为起点,彻底重置基础能耗累加,更新初始计算时间起点 last_computed_time (避免抛出第一周期差值为 NULL 的报错或极值问题) @@ -57,10 +45,6 @@ def start_listener(): """ try: with target_engine.begin() as conn: - # try: - # conn.execute(text(alter_table_sql)) - # except Exception as alter_err: - # pass # ignore if doesn't exist conn.execute(text(reset_loops_sql)) conn.execute(text(reset_rooms_sql)) logging.info("Energy tracking fields initialized (cleared total_energy_consumption and reset times).") @@ -79,6 +63,18 @@ def start_listener(): dev_loops = payload.get("dev_loops") bright_g = payload.get("bright_g") ts_ms = payload.get("ts_ms") + + def format_pg_array(val): + if isinstance(val, list): + return "{" + ",".join(str(x) for x in val) + "}" + return val + + elec_voltage = format_pg_array(payload.get("elec_voltage")) + elec_ampere = format_pg_array(payload.get("elec_ampere")) + elec_power = format_pg_array(payload.get("elec_power")) + elec_energy = format_pg_array(payload.get("elec_energy")) + elec_phase = format_pg_array(payload.get("elec_phase")) + elec_sum_energy = format_pg_array(payload.get("elec_sum_energy")) if isinstance(dev_loops, dict): sample_keys = list(dev_loops.keys())[:5] @@ -207,6 +203,12 @@ def start_listener(): UPDATE wh_test.rooms_power rp SET dev_loops = CAST(:dev_loops AS jsonb), ts_ms = COALESCE(CAST(:ts_ms AS int8), rp.ts_ms), + elec_voltage = COALESCE(CAST(:elec_voltage AS float8[]), rp.elec_voltage), + elec_ampere = COALESCE(CAST(:elec_ampere AS float8[]), rp.elec_ampere), + elec_power = COALESCE(CAST(:elec_power AS float8[]), rp.elec_power), + elec_energy = COALESCE(CAST(:elec_energy AS float8[]), rp.elec_energy), + elec_phase = COALESCE(CAST(:elec_phase AS float8[]), rp.elec_phase), + elec_sum_energy = COALESCE(CAST(:elec_sum_energy AS float8[]), rp.elec_sum_energy), -- 评判并标识RCU(房间网关)在线活跃度: -- 算法:若超过 300000ms(即5分钟)仍未有任何事件触发通信汇报,视作离线 0;否则在线 1 rcu_online = CASE @@ -282,7 +284,13 @@ def start_listener(): "hotel_id": hotel_id, "room_id": room_id, "device_id": device_id, - "ts_ms": ts_ms + "ts_ms": ts_ms, + "elec_voltage": elec_voltage, + "elec_ampere": elec_ampere, + "elec_power": elec_power, + "elec_energy": elec_energy, + "elec_phase": elec_phase, + "elec_sum_energy": elec_sum_energy }) logging.info(f"Updated hotel/room/device={hotel_id}/{room_id}/{device_id} | loops rows: {res2.rowcount}, rooms rows: {res1.rowcount}") if res2.rowcount == 0: @@ -301,7 +309,7 @@ def start_listener(): -- 轮询任务一: 结算仍在耗电且【稳定无设备变动操作】的回路能耗 (入 loops_power 表) -- ========================================== -- 背景与核心: 如果某个电器(比如灯)被打开后长达一小时都没有进行“关灯调整”操作,实时SQL就不会被触发,能耗就停滞不动了。 - -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(kWh)。 + -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(Wh)。 -- 区别: 它仅累计能耗并刷新 last_computed_time 锚点,它不再做全套状态、心率重解操作。 UPDATE wh_test.loops_power lp SET