feat: 添加实时监听器,初始化能耗数据并定期更新数据库

This commit is contained in:
2026-04-09 11:47:58 +08:00
parent aa9487f680
commit a2cc185279

View File

@@ -0,0 +1,434 @@
# Standard library
import json
import logging
import os
import select
import threading
import time

# Third-party
import psycopg2
from psycopg2.extras import RealDictCursor
from sqlalchemy import create_engine, text

# Project-local
from db_config import get_pg_conn_kwargs, get_db_url

# Configure the root logger once, after all imports, before any module code runs.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def start_listener():
    """Listen for realtime change notifications on the source (PORTAL) database
    and propagate power/energy updates into the target (TEST) database.

    Flow:
      1. Build a reusable SQLAlchemy engine for the target DB.
      2. Reset energy baselines (loops_power + rooms_power) at startup.
      3. Block on PostgreSQL LISTEN/NOTIFY; on each payload, recompute
         loop-level power/energy and roll it up to the room level.
      4. Every ~60s, run a background polling settlement for loops that are
         still drawing power but received no new notifications.

    Blocks forever; reconnects on connection loss, exits on KeyboardInterrupt.
    """
    # 1. Build the write engine for the target database (TEST); created once
    #    and shared by every nested helper below.
    test_kwargs = get_pg_conn_kwargs("TEST")
    target_engine = create_engine(
        f"postgresql+psycopg2://{test_kwargs['user']}:{test_kwargs['password']}"
        f"@{test_kwargs['host']}:{test_kwargs['port']}/{test_kwargs['dbname']}"
    )

    def initialize_energy_tracking():
        """Reset loop-level energy counters and room-level aggregate caches so
        energy accumulation starts from the moment this script starts."""
        reset_loops_sql = """
        -- [启动初始化] 清空回路的各项能耗字段
        -- 目的: 以启动脚本的时刻为起点,彻底重置基础能耗累加,更新初始计算时间起点 last_computed_time (避免抛出第一周期差值为 NULL 的报错或极值问题)
        UPDATE wh_test.loops_power
        SET total_energy_consumption = 0,
            last_change_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint,
            -- 不再填 NULL必须打上脚本成功启动的时间戳作为能耗运算的起点
            last_computed_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint
        """
        reset_rooms_sql = """
        -- [启动初始化] 清空房间表里的聚合缓存
        -- 目的: 将 rooms_power 内部的聚合型 jsonb 对象内的数据重新清空为0保持与 loops_power 基线同步一致
        UPDATE wh_test.rooms_power rp
        SET loops_power = (
                SELECT COALESCE(jsonb_object_agg(key, 0.0), '{}'::jsonb)
                FROM jsonb_object_keys(NULLIF(rp.dev_loops, '{}'::jsonb)) AS key
            ),
            loops_energy_consumption = (
                SELECT COALESCE(jsonb_object_agg(key, 0.0), '{}'::jsonb)
                FROM jsonb_object_keys(NULLIF(rp.dev_loops, '{}'::jsonb)) AS key
            ),
            ideal_energy_consumption = 0.0
        WHERE dev_loops IS NOT NULL AND dev_loops != '{}'::jsonb;
        """
        try:
            with target_engine.begin() as conn:
                conn.execute(text(reset_loops_sql))
                conn.execute(text(reset_rooms_sql))
                logging.info("Energy tracking fields initialized (cleared total_energy_consumption and reset times).")
        except Exception as e:
            logging.error(f"Failed to initialize energy tracking: {e}")

    # Reset power/energy baselines before the listener starts.
    initialize_energy_tracking()

    def update_target_db(payload: dict):
        """Apply one change notification.

        Expected payload keys: hotel_id, room_id, device_id, dev_loops,
        bright_g, ts_ms. Updates loops_power first (derives latest power and
        settles energy), then rooms_power (aggregates fresh values into JSON).
        """
        hotel_id = payload.get("hotel_id")
        room_id = payload.get("room_id")
        device_id = payload.get("device_id")
        dev_loops = payload.get("dev_loops")
        bright_g = payload.get("bright_g")
        ts_ms = payload.get("ts_ms")
        # Log a compact summary; only a sample of keys when dev_loops is a dict.
        if isinstance(dev_loops, dict):
            sample_keys = list(dev_loops.keys())[:5]
            logging.info(
                "Payload summary hotel_id=%s room_id=%s device_id=%s bright_g=%s dev_loops_keys(sample)=%s",
                hotel_id,
                room_id,
                device_id,
                bright_g,
                sample_keys,
            )
        else:
            logging.info(
                "Payload summary hotel_id=%s room_id=%s device_id=%s bright_g=%s dev_loops_type=%s",
                hotel_id,
                room_id,
                device_id,
                bright_g,
                type(dev_loops).__name__,
            )
        # Serialize dev_loops to a JSON string for binding; empty/missing -> '{}'.
        if dev_loops is None:
            dev_loops_str = '{}'
        elif isinstance(dev_loops, (dict, list)):
            dev_loops_str = json.dumps(dev_loops)
        else:
            dev_loops_str = str(dev_loops)
        # NOTE: '%%' below is required because the psycopg2 driver uses
        # pyformat parameters, so a literal '%' in SQL must be doubled.
        update_loops_sql = r"""
        -- ==========================================
        -- 实时动作一: 更新被触发房间下各底层回路数据 (入 loops_power 表)
        -- ==========================================
        WITH params AS (
            -- 使用 CTE 解析并规范外部传入的数据参数,用于下面的动态替换
            SELECT
                CAST(:hotel_id AS int) AS p_hotel_id,
                TRIM(CAST(:room_id AS varchar)) AS p_room_id,
                CAST(:bright_g AS int2) AS p_bright_g,
                CAST(:dev_loops AS jsonb) AS p_dev_loops
        )
        UPDATE wh_test.loops_power lp
        SET
            -- 【能耗累加速度】在更新新功率前,用「现存旧功率」*「距离上次计算经过的时间」得出本时段产生的累加能耗。
            -- 单位换算与运算: 毫秒差 -> 除以3600000(换算为经过的小时) -> 乘以当前旧功率瓦数(换算为Wh) -> 除以1000(降级千瓦并储存为度电 kWh)。
            total_energy_consumption = COALESCE(lp.total_energy_consumption, 0) +
                (COALESCE(lp.real_power, 0) * ( (((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint) - COALESCE(lp.last_computed_time, ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint)))::float8 / 3600000.0 / 1000.0 )),
            -- 更新结算基准锚点时间为当前
            last_computed_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint,
            last_change_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint,
            bright_g = COALESCE(p.p_bright_g, lp.bright_g),
            rate = COALESCE(
                NULLIF(regexp_replace(
                    COALESCE(
                        p.p_dev_loops->>TRIM(lp.loop_address),
                        p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''),
                        p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0')
                    ),
                    '\D', '', 'g'
                ), '')::int >> 8 & 255,
                lp.rate
            ),
            online_status = COALESCE(
                CASE WHEN COALESCE(
                        p.p_dev_loops->>TRIM(lp.loop_address),
                        p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''),
                        p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0')
                    ) IS NULL THEN NULL
                    WHEN (NULLIF(regexp_replace(
                        COALESCE(
                            p.p_dev_loops->>TRIM(lp.loop_address),
                            p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''),
                            p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0')
                        ),
                        '\D', '', 'g'
                    ), '')::int & 255) = 1 THEN 1
                    ELSE 0
                END,
                lp.online_status
            ),
            real_power = (
                COALESCE(lp.power, 0) *
                COALESCE(NULLIF(regexp_replace(
                    COALESCE(
                        p.p_dev_loops->>TRIM(lp.loop_address),
                        p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''),
                        p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0')
                    ),
                    '\D', '', 'g'
                ), '')::int >> 8 & 255, lp.rate, 0) *
                COALESCE(p.p_bright_g, lp.bright_g, 0) *
                COALESCE(
                    CASE WHEN COALESCE(
                        p.p_dev_loops->>TRIM(lp.loop_address),
                        p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''),
                        p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0')
                    ) IS NULL THEN NULL
                    WHEN (NULLIF(regexp_replace(
                        COALESCE(
                            p.p_dev_loops->>TRIM(lp.loop_address),
                            p.p_dev_loops->>REPLACE(TRIM(lp.loop_address), '-', ''),
                            p.p_dev_loops->>LTRIM(TRIM(lp.loop_address), '0')
                        ),
                        '\D', '', 'g'
                    ), '')::int & 255) = 1 THEN 1
                    ELSE 0
                    END,
                    lp.online_status,
                    0
                )
            ) / 10000.0
        FROM params p
        WHERE lp.hotel_id = p.p_hotel_id
          AND TRIM(lp.room_id) = p.p_room_id
          AND TRIM(lp.loop_address) LIKE '024%%';
        """
        update_rooms_sql = """
        -- ==========================================
        -- 实时动作二: 将离散的子回路数据聚合卷起到房间主表 (更新房间级 rooms_power)
        -- ==========================================
        -- 动作说明:上一步刚刚将各项最新的实际功率(real_power)和能耗打分算出,这里立刻用 jsonb_object_agg
        -- 在外层循环出所有存在设备(dev_loops)中的 keys将底层对应的数值打包为一个完整 JSON 同步更新给上游。
        UPDATE wh_test.rooms_power rp
        SET dev_loops = CAST(:dev_loops AS jsonb),
            ts_ms = COALESCE(CAST(:ts_ms AS int8), rp.ts_ms),
            -- 评判并标识RCU(房间网关)在线活跃度:
            -- 算法:若超过 300000ms即5分钟仍未有任何事件触发通信汇报视作离线 0否则在线 1
            rcu_online = CASE
                WHEN ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint - COALESCE(CAST(:ts_ms AS int8), rp.ts_ms)) > 300000 THEN 0
                ELSE 1
            END,
            -- 将底层(loops_power表)本房间的所有最新实时功率字典化,映射给外层
            loops_power = (
                SELECT COALESCE(jsonb_object_agg(
                    key,
                    COALESCE(lp.real_power, 0.0)
                ), '{}'::jsonb)
                FROM jsonb_object_keys(CAST(:dev_loops AS jsonb)) AS key
                LEFT JOIN (
                    SELECT loop_address, real_power
                    FROM wh_test.loops_power
                    WHERE hotel_id = CAST(:hotel_id AS int) AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar))
                ) lp
                ON (
                    TRIM(lp.loop_address) = key OR
                    REPLACE(TRIM(lp.loop_address), '-', '') = key OR
                    LTRIM(TRIM(lp.loop_address), '0') = key
                )
            ),
            loops_energy_consumption = (
                SELECT COALESCE(jsonb_object_agg(
                    key,
                    COALESCE(lp.total_energy_consumption, 0.0)
                ), '{}'::jsonb)
                FROM jsonb_object_keys(CAST(:dev_loops AS jsonb)) AS key
                LEFT JOIN (
                    SELECT loop_address, total_energy_consumption
                    FROM wh_test.loops_power
                    WHERE hotel_id = CAST(:hotel_id AS int) AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar))
                ) lp
                ON (
                    TRIM(lp.loop_address) = key OR
                    REPLACE(TRIM(lp.loop_address), '-', '') = key OR
                    LTRIM(TRIM(lp.loop_address), '0') = key
                )
            ),
            ideal_energy_consumption = (
                SELECT COALESCE(SUM(COALESCE(lp.total_energy_consumption, 0.0)), 0.0)
                FROM jsonb_object_keys(CAST(:dev_loops AS jsonb)) AS key
                LEFT JOIN (
                    SELECT loop_address, total_energy_consumption
                    FROM wh_test.loops_power
                    WHERE hotel_id = CAST(:hotel_id AS int) AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar))
                ) lp
                ON (
                    TRIM(lp.loop_address) = key OR
                    REPLACE(TRIM(lp.loop_address), '-', '') = key OR
                    LTRIM(TRIM(lp.loop_address), '0') = key
                )
            )
        WHERE hotel_id = CAST(:hotel_id AS int)
          AND TRIM(room_id) = TRIM(CAST(:room_id AS varchar))
          AND device_id = CAST(:device_id AS varchar);
        """
        try:
            with target_engine.begin() as conn:
                # loops_power must run first so rooms_power aggregates fresh values.
                loops_result = conn.execute(text(update_loops_sql), {
                    "bright_g": bright_g,
                    "dev_loops": dev_loops_str,
                    "hotel_id": hotel_id,
                    "room_id": room_id
                })
                rooms_result = conn.execute(text(update_rooms_sql), {
                    "dev_loops": dev_loops_str,
                    "hotel_id": hotel_id,
                    "room_id": room_id,
                    "device_id": device_id,
                    "ts_ms": ts_ms
                })
                logging.info(f"Updated hotel/room/device={hotel_id}/{room_id}/{device_id} | loops rows: {loops_result.rowcount}, rooms rows: {rooms_result.rowcount}")
                if loops_result.rowcount == 0:
                    logging.warning(
                        "No loops_power rows updated. Check room_id mapping and whether dev_loops keys match loop_address (024...) for hotel_id=%s room_id=%s",
                        hotel_id,
                        room_id,
                    )
        except Exception as e:
            logging.error(f"Error updating target database: {str(e)}")

    def run_polling_update():
        """Periodic background update for energy tracking without touching source trigger times."""
        update_loops_sql = r"""
        -- ==========================================
        -- 轮询任务一: 结算仍在耗电且【稳定无设备变动操作】的回路能耗 (入 loops_power 表)
        -- ==========================================
        -- 背景与核心: 如果某个电器(比如灯)被打开后长达一小时都没有进行“关灯调整”操作实时SQL就不会被触发能耗就停滞不动了。
        -- 动作: 这个由异步线程拉起的定时刷新任务仅仅过滤那些 "正在工作产生功率的设备"(real_power>0),计算本周期所消耗的累计能度(kWh)。
        -- 区别: 它仅累计能耗并刷新 last_computed_time 锚点,它不再做全套状态、心率重解操作。
        UPDATE wh_test.loops_power lp
        SET
            total_energy_consumption = COALESCE(lp.total_energy_consumption, 0) +
                (COALESCE(lp.real_power, 0) * ( (((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint) - COALESCE(lp.last_computed_time, ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint)))::float8 / 3600000.0 / 1000.0 )),
            last_computed_time = (EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint
        WHERE lp.real_power IS NOT NULL AND lp.real_power > 0;
        """
        update_rooms_sql = r"""
        -- ==========================================
        -- 轮询任务二: 将上面轮询强算出来的变化过的房间能耗,聚合成 JSON 刷出
        -- ==========================================
        -- 优化核心点: 为了防止上面循环几万个实际上完全没住人、离线的“僵尸房间”导致 DB 锁库,
        -- 引入临时视图 CTE (active_rooms),仅从真正有机器运行(real_power>0)的活跃库里连表做聚合。
        WITH active_rooms AS (
            SELECT DISTINCT hotel_id, TRIM(room_id) AS room_id
            FROM wh_test.loops_power
            WHERE real_power IS NOT NULL AND real_power > 0
        )
        UPDATE wh_test.rooms_power rp
        -- 继续检测僵尸网关5分钟失联状态同步下线
        SET rcu_online = CASE
                WHEN ((EXTRACT(epoch FROM clock_timestamp()) * 1000)::bigint - rp.ts_ms) > 300000 THEN 0
                ELSE 1
            END,
            loops_energy_consumption = (
                SELECT COALESCE(jsonb_object_agg(
                    key,
                    COALESCE(lp.total_energy_consumption, 0.0)
                ), '{}'::jsonb)
                FROM jsonb_object_keys(rp.dev_loops) AS key
                LEFT JOIN (
                    SELECT loop_address, total_energy_consumption
                    FROM wh_test.loops_power
                    WHERE hotel_id = rp.hotel_id AND TRIM(room_id) = TRIM(rp.room_id)
                ) lp
                ON (
                    TRIM(lp.loop_address) = key OR
                    REPLACE(TRIM(lp.loop_address), '-', '') = key OR
                    LTRIM(TRIM(lp.loop_address), '0') = key
                )
            ),
            ideal_energy_consumption = (
                SELECT COALESCE(SUM(COALESCE(lp.total_energy_consumption, 0.0)), 0.0)
                FROM jsonb_object_keys(rp.dev_loops) AS key
                LEFT JOIN (
                    SELECT loop_address, total_energy_consumption
                    FROM wh_test.loops_power
                    WHERE hotel_id = rp.hotel_id AND TRIM(room_id) = TRIM(rp.room_id)
                ) lp
                ON (
                    TRIM(lp.loop_address) = key OR
                    REPLACE(TRIM(lp.loop_address), '-', '') = key OR
                    LTRIM(TRIM(lp.loop_address), '0') = key
                )
            )
        FROM active_rooms ar
        WHERE rp.hotel_id = ar.hotel_id
          AND TRIM(rp.room_id) = ar.room_id
          AND rp.dev_loops IS NOT NULL
          AND rp.dev_loops != '{}'::jsonb;
        """
        try:
            with target_engine.begin() as conn:
                loops_result = conn.execute(text(update_loops_sql))
                rooms_result = conn.execute(text(update_rooms_sql))
                logging.info(f"Polling update complete. loops: {loops_result.rowcount}, rooms: {rooms_result.rowcount}")
        except Exception as e:
            logging.error(f"Error in polling update: {e}")

    # 3. Block waiting for NOTIFY events; reconnect automatically on failure.
    portal_kwargs = get_pg_conn_kwargs("PORTAL")
    # Enable TCP keepalive so firewall idle timeouts do not silently kill the session.
    portal_kwargs['keepalives'] = 1
    portal_kwargs['keepalives_idle'] = 60
    portal_kwargs['keepalives_interval'] = 10
    portal_kwargs['keepalives_count'] = 5
    channel = 'g5_realtime_channel'
    while True:
        try:
            logging.info(f"Connecting to Source DB for listening: {portal_kwargs['host']}:{portal_kwargs['port']}/{portal_kwargs['dbname']}")
            source_conn = psycopg2.connect(**portal_kwargs)
            # Autocommit is required for LISTEN/NOTIFY delivery on psycopg2.
            source_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            cur = source_conn.cursor()
            cur.execute(f"LISTEN {channel};")
            logging.info(f"Listening on channel '{channel}'...")
            last_poll_time = time.time()
            while True:
                # Wake up at least every 5 seconds even when no traffic arrives.
                if select.select([source_conn], [], [], 5) != ([], [], []):
                    source_conn.poll()
                    while source_conn.notifies:
                        notify = source_conn.notifies.pop(0)
                        try:
                            payload = json.loads(notify.payload)
                            update_target_db(payload)
                        except json.JSONDecodeError:
                            logging.error(f"Failed to parse payload: {notify.payload}")
                # Kick off the background polling settlement roughly every 60 seconds.
                if time.time() - last_poll_time >= 60:
                    threading.Thread(target=run_polling_update, daemon=True).start()
                    last_poll_time = time.time()
        except psycopg2.OperationalError as e:
            logging.error(f"PostgreSQL connection lost. Attempting to reconnect in 5 seconds... details: {e}")
            try:
                if 'source_conn' in locals() and source_conn is not None:
                    source_conn.close()
            except Exception:
                pass
            time.sleep(5)
        except KeyboardInterrupt:
            logging.info("Shutting down listener...")
            if 'source_conn' in locals() and source_conn is not None:
                source_conn.close()
            break
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            time.sleep(5)
# Script entry point: run the blocking LISTEN/NOTIFY loop until interrupted.
if __name__ == "__main__":
    start_listener()