feat: 优化能耗数据初始化和更新逻辑,添加电压、电流等新字段支持

This commit is contained in:
2026-04-09 12:00:14 +08:00
parent d3ece5f0ca
commit 9872bee487

View File

@@ -19,18 +19,6 @@ def start_listener():
target_engine = create_engine(f"postgresql+psycopg2://{test_kwargs['user']}:{test_kwargs['password']}@{test_kwargs['host']}:{test_kwargs['port']}/{test_kwargs['dbname']}") target_engine = create_engine(f"postgresql+psycopg2://{test_kwargs['user']}:{test_kwargs['password']}@{test_kwargs['host']}:{test_kwargs['port']}/{test_kwargs['dbname']}")
def initialize_energy_tracking(): def initialize_energy_tracking():
# alter_table_sql = """
# DO $$
# BEGIN
# BEGIN
# ALTER TABLE wh_test.loops_power RENAME COLUMN last_power_calc_time TO last_change_time;
# EXCEPTION
# WHEN undefined_column THEN -- column doesn't exist, already renamed
# WHEN others THEN
# RAISE NOTICE 'Skipping rename';
# END;
# END $$;
# """
reset_loops_sql = """ reset_loops_sql = """
-- [启动初始化] 清空回路的各项能耗字段 -- [启动初始化] 清空回路的各项能耗字段
-- 目的: 以启动脚本的时刻为起点,彻底重置基础能耗累加,更新初始计算时间起点 last_computed_time (避免抛出第一周期差值为 NULL 的报错或极值问题) -- 目的: 以启动脚本的时刻为起点,彻底重置基础能耗累加,更新初始计算时间起点 last_computed_time (避免抛出第一周期差值为 NULL 的报错或极值问题)
@@ -57,10 +45,6 @@ def start_listener():
""" """
try: try:
with target_engine.begin() as conn: with target_engine.begin() as conn:
# try:
# conn.execute(text(alter_table_sql))
# except Exception as alter_err:
# pass # ignore if doesn't exist
conn.execute(text(reset_loops_sql)) conn.execute(text(reset_loops_sql))
conn.execute(text(reset_rooms_sql)) conn.execute(text(reset_rooms_sql))
logging.info("Energy tracking fields initialized (cleared total_energy_consumption and reset times).") logging.info("Energy tracking fields initialized (cleared total_energy_consumption and reset times).")
@@ -80,6 +64,18 @@ def start_listener():
bright_g = payload.get("bright_g") bright_g = payload.get("bright_g")
ts_ms = payload.get("ts_ms") ts_ms = payload.get("ts_ms")
def format_pg_array(val):
if isinstance(val, list):
return "{" + ",".join(str(x) for x in val) + "}"
return val
elec_voltage = format_pg_array(payload.get("elec_voltage"))
elec_ampere = format_pg_array(payload.get("elec_ampere"))
elec_power = format_pg_array(payload.get("elec_power"))
elec_energy = format_pg_array(payload.get("elec_energy"))
elec_phase = format_pg_array(payload.get("elec_phase"))
elec_sum_energy = format_pg_array(payload.get("elec_sum_energy"))
if isinstance(dev_loops, dict): if isinstance(dev_loops, dict):
sample_keys = list(dev_loops.keys())[:5] sample_keys = list(dev_loops.keys())[:5]
logging.info( logging.info(
@@ -207,6 +203,12 @@ def start_listener():
UPDATE wh_test.rooms_power rp UPDATE wh_test.rooms_power rp
SET dev_loops = CAST(:dev_loops AS jsonb), SET dev_loops = CAST(:dev_loops AS jsonb),
ts_ms = COALESCE(CAST(:ts_ms AS int8), rp.ts_ms), ts_ms = COALESCE(CAST(:ts_ms AS int8), rp.ts_ms),
elec_voltage = COALESCE(CAST(:elec_voltage AS float8[]), rp.elec_voltage),
elec_ampere = COALESCE(CAST(:elec_ampere AS float8[]), rp.elec_ampere),
elec_power = COALESCE(CAST(:elec_power AS float8[]), rp.elec_power),
elec_energy = COALESCE(CAST(:elec_energy AS float8[]), rp.elec_energy),
elec_phase = COALESCE(CAST(:elec_phase AS float8[]), rp.elec_phase),
elec_sum_energy = COALESCE(CAST(:elec_sum_energy AS float8[]), rp.elec_sum_energy),
-- 评判并标识RCU(房间网关)在线活跃度: -- 评判并标识RCU(房间网关)在线活跃度:
-- 算法:若超过 300000ms即5分钟仍未有任何事件触发通信汇报视作离线 0否则在线 1 -- 算法:若超过 300000ms即5分钟仍未有任何事件触发通信汇报视作离线 0否则在线 1
rcu_online = CASE rcu_online = CASE
@@ -282,7 +284,13 @@ def start_listener():
"hotel_id": hotel_id, "hotel_id": hotel_id,
"room_id": room_id, "room_id": room_id,
"device_id": device_id, "device_id": device_id,
"ts_ms": ts_ms "ts_ms": ts_ms,
"elec_voltage": elec_voltage,
"elec_ampere": elec_ampere,
"elec_power": elec_power,
"elec_energy": elec_energy,
"elec_phase": elec_phase,
"elec_sum_energy": elec_sum_energy
}) })
logging.info(f"Updated hotel/room/device={hotel_id}/{room_id}/{device_id} | loops rows: {res2.rowcount}, rooms rows: {res1.rowcount}") logging.info(f"Updated hotel/room/device={hotel_id}/{room_id}/{device_id} | loops rows: {res2.rowcount}, rooms rows: {res1.rowcount}")
if res2.rowcount == 0: if res2.rowcount == 0:
@@ -301,7 +309,7 @@ def start_listener():
-- 轮询任务一: 结算仍在耗电且【稳定无设备变动操作】的回路能耗 (入 loops_power 表) -- 轮询任务一: 结算仍在耗电且【稳定无设备变动操作】的回路能耗 (入 loops_power 表)
-- ========================================== -- ==========================================
-- 背景与核心: 如果某个电器(比如灯)被打开后长达一小时都没有进行“关灯调整”操作实时SQL就不会被触发能耗就停滞不动了。 -- 背景与核心: 如果某个电器(比如灯)被打开后长达一小时都没有进行“关灯调整”操作实时SQL就不会被触发能耗就停滞不动了。
-- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(kWh)。 -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(Wh)。
-- 区别: 它仅累计能耗并刷新 last_computed_time 锚点,它不再做全套状态、心率重解操作。 -- 区别: 它仅累计能耗并刷新 last_computed_time 锚点,它不再做全套状态、心率重解操作。
UPDATE wh_test.loops_power lp UPDATE wh_test.loops_power lp
SET SET