diff --git a/database_test_python/realtime_g5_listener.py b/database_test_python/realtime_g5_listener.py index 98e5c20..47bc220 100644 --- a/database_test_python/realtime_g5_listener.py +++ b/database_test_python/realtime_g5_listener.py @@ -19,18 +19,6 @@ def start_listener(): target_engine = create_engine(f"postgresql+psycopg2://{test_kwargs['user']}:{test_kwargs['password']}@{test_kwargs['host']}:{test_kwargs['port']}/{test_kwargs['dbname']}") def initialize_energy_tracking(): - # alter_table_sql = """ - # DO $$ - # BEGIN - # BEGIN - # ALTER TABLE wh_test.loops_power RENAME COLUMN last_power_calc_time TO last_change_time; - # EXCEPTION - # WHEN undefined_column THEN -- column doesn't exist, already renamed - # WHEN others THEN - # RAISE NOTICE 'Skipping rename'; - # END; - # END $$; - # """ reset_loops_sql = """ -- [启动初始化] 清空回路的各项能耗字段 -- 目的: 以启动脚本的时刻为起点,彻底重置基础能耗累加,更新初始计算时间起点 last_computed_time (避免抛出第一周期差值为 NULL 的报错或极值问题) @@ -57,10 +45,6 @@ def start_listener(): """ try: with target_engine.begin() as conn: - # try: - # conn.execute(text(alter_table_sql)) - # except Exception as alter_err: - # pass # ignore if doesn't exist conn.execute(text(reset_loops_sql)) conn.execute(text(reset_rooms_sql)) logging.info("Energy tracking fields initialized (cleared total_energy_consumption and reset times).") @@ -79,6 +63,18 @@ def start_listener(): dev_loops = payload.get("dev_loops") bright_g = payload.get("bright_g") ts_ms = payload.get("ts_ms") + + def format_pg_array(val): + if isinstance(val, list): + return "{" + ",".join(str(x) for x in val) + "}" + return val + + elec_voltage = format_pg_array(payload.get("elec_voltage")) + elec_ampere = format_pg_array(payload.get("elec_ampere")) + elec_power = format_pg_array(payload.get("elec_power")) + elec_energy = format_pg_array(payload.get("elec_energy")) + elec_phase = format_pg_array(payload.get("elec_phase")) + elec_sum_energy = format_pg_array(payload.get("elec_sum_energy")) if isinstance(dev_loops, dict): sample_keys = list(dev_loops.keys())[:5] @@ -207,6 +203,12 @@ def start_listener(): UPDATE wh_test.rooms_power rp SET dev_loops = CAST(:dev_loops AS jsonb), ts_ms = COALESCE(CAST(:ts_ms AS int8), rp.ts_ms), + elec_voltage = COALESCE(CAST(:elec_voltage AS float8[]), rp.elec_voltage), + elec_ampere = COALESCE(CAST(:elec_ampere AS float8[]), rp.elec_ampere), + elec_power = COALESCE(CAST(:elec_power AS float8[]), rp.elec_power), + elec_energy = COALESCE(CAST(:elec_energy AS float8[]), rp.elec_energy), + elec_phase = COALESCE(CAST(:elec_phase AS float8[]), rp.elec_phase), + elec_sum_energy = COALESCE(CAST(:elec_sum_energy AS float8[]), rp.elec_sum_energy), -- 评判并标识RCU(房间网关)在线活跃度: -- 算法:若超过 300000ms(即5分钟)仍未有任何事件触发通信汇报,视作离线 0;否则在线 1 rcu_online = CASE @@ -282,7 +284,13 @@ def start_listener(): "hotel_id": hotel_id, "room_id": room_id, "device_id": device_id, - "ts_ms": ts_ms + "ts_ms": ts_ms, + "elec_voltage": elec_voltage, + "elec_ampere": elec_ampere, + "elec_power": elec_power, + "elec_energy": elec_energy, + "elec_phase": elec_phase, + "elec_sum_energy": elec_sum_energy }) logging.info(f"Updated hotel/room/device={hotel_id}/{room_id}/{device_id} | loops rows: {res2.rowcount}, rooms rows: {res1.rowcount}") if res2.rowcount == 0: @@ -301,7 +309,7 @@ def start_listener(): -- 轮询任务一: 结算仍在耗电且【稳定无设备变动操作】的回路能耗 (入 loops_power 表) -- ========================================== -- 背景与核心: 如果某个电器(比如灯)被打开后长达一小时都没有进行“关灯调整”操作,实时SQL就不会被触发,能耗就停滞不动了。 - -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(kWh)。 + -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(Wh)。 -- 区别: 它仅累计能耗并刷新 last_computed_time 锚点,它不再做全套状态、心率重解操作。 UPDATE wh_test.loops_power lp SET