From 6d31731d53f76cef075a445e499d12d1093b9dad Mon Sep 17 00:00:00 2001 From: Indiem87 Date: Thu, 9 Apr 2026 12:17:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=20README=EF=BC=8C?= =?UTF-8?q?=E8=AF=A6=E7=BB=86=E6=8F=8F=E8=BF=B0=20ETL=20=E4=B8=8E=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E8=83=BD=E8=80=97=E5=90=8C=E6=AD=A5=E7=BB=84=E4=BB=B6?= =?UTF-8?q?=E5=8F=8A=E5=85=B6=E6=A0=B8=E5=BF=83=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index dc1a8e9..50db648 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,33 @@ # Learn_Energy_Special_Wen -节能专题学习库(文) \ No newline at end of file +节能专题学习库(文) + +## 最近更新:ETL与实时能耗同步组件 (2026-04-09) + +当前阶段构建并完成了从源库(15434 / log_platform)到目标库(15433 / test)的回路能耗数据抽取、转换、加载(ETL)及实时同步的基础架构。包含以下核心文件: + +### 1. 数据库配置与连接组件 (`db_config.py`) +- **技术细节**:统一管理基于 SQLAlchemy 和 `psycopg2` 的 PostgreSQL 数据库连接。 + - 提供了按 `prefix` (`PORTAL`, `TEST`, `SOURCE`, `TARGET`) 获取连接 URL 的工厂方法 `get_db_url`。 + - **安全性**:实现了 `get_db_password`,内置一次交互式 `getpass()` 收集及本地字典 `_password_cache` 缓存极致,支持环境变量回退机制。避免在批量执行时反复输入密码。 + +### 2. 回路与房间关联数据打平与校验 (`Loops_Power_Valid_1.py`, `Loops_Power_Valid_2.py`, `validate_flattened.py`) +- **`Loops_Power_Valid_1.py`**:将 `loops` 表的数据(`type IN ('0', '1')`)与 `hotels`、`room_type` 关联,并利用 PostgreSQL 特有的 `array_agg(...) FILTER ...` 分析函数,将同一回路所属关联的多个房间(`room_id`)聚合为一条数组格式的数据 (`loops_type01_enriched`)。 +- **`Loops_Power_Valid_2.py`**:一对多的基础平铺(Flattened)脚本。通过标准的 `LEFT JOIN` 将一个涉及多个房间的合并回路平铺出多行(`loops_type01_flattened`),建立清晰的设备-房间一对一拓扑。 +- **`validate_flattened.py`**:数据一致性验证工具。对抽样的目标库平铺数据(前500条)通过源库 `loop_id`, `hotel_id`, `room_type_id`, `room_id` 进行自动化交叉检验。 + +### 3. 全局静态计算流水线 (`Loops_Power_Calculation.py`) +- **技术细节**:执行存量能耗数据全量清洗构建业务逻辑的核心 Pipeline。 + - 并发抽取目标库的架构数据 (`loops_type01_flattened`) 和源库中的客观瞬间状态数据 (`room_status_moment_g5`)。 + - 使用 Pandas 进行了内存级数据对齐聚合(`merge`, `astype(str).str.strip()` 防脏数据)。处理位运算掩码解构 `bright_g` 的值,获取具体的真实开关功率规律,得出 `real_power`,然后清洗、回退,最后写回目标表 `wh_test.loops_power` 与 `wh_test.rooms_power`。 + +### 4. 数据完整性统计工具 (`check_matching_rate.py`) +- **技术细节**:双库联调监控工具。对比 15433 test 库提取的 `loops_power` 设备总量与 `room_status_moment_g5` 中房间总量的覆盖率关系,利用 Pandas 强制数字/字符串清洗,计算并给出宏观的匹配率数据以及找不到设备的烂数据明细,提供数据排查凭点。 + +### 5. 数据库实时事件流触发层 (`g5_notify_trigger.sql` 与 `realtime_g5_listener.py`) +- **`g5_notify_trigger.sql`**: + - 在源库端创建了拦截更新动作的 PostgreSQL 本地 Trigger (`trg_g5_update`) 监听特定字段变化,采用 `IS DISTINCT FROM` 语法避免无效触发。 + - 将变化集利用 `json_build_object` 封装,并下发给 PostgreSQL 的原生异步通知信道 `pg_notify('g5_realtime_channel', payload)`。 +- **`realtime_g5_listener.py`**: + - 使用 Python 进程 `psycopg2` 持久阻塞监听 `LISTEN g5_realtime_channel` 管道获取流式数据。 + - **设计亮点**:启动前带有 `initialize_energy_tracking` 函数,通过基于 `clock_timestamp()` 及 `jsonb_object_keys` 对数据库的存量能耗记录值归零对齐打锚点(重置 `last_computed_time`)。同时含有 TCP Keepalive 设置防止中间代理意外杀掉长链接。 \ No newline at end of file