feat: 更新 README,详细描述 ETL 与实时能耗同步组件及其核心文件

This commit is contained in:
2026-04-09 12:17:44 +08:00
parent 9872bee487
commit 6d31731d53

View File

@@ -1,3 +1,33 @@
# Learn_Energy_Special_Wen
节能专题学习库(文)
节能专题学习库(文)
## 最近更新ETL与实时能耗同步组件 (2026-04-09)
当前阶段构建并完成了从源库15434 / log_platform到目标库15433 / test的回路能耗数据抽取、转换、加载ETL及实时同步的基础架构。包含以下核心文件
### 1. 数据库配置与连接组件 (`db_config.py`)
- **技术细节**:统一管理基于 SQLAlchemy 和 `psycopg2` 的 PostgreSQL 数据库连接。
- 提供了按 `prefix` (`PORTAL`, `TEST`, `SOURCE`, `TARGET`) 获取连接 URL 的工厂方法 `get_db_url`
- **安全性**:实现了 `get_db_password`,内置一次交互式 `getpass()` 收集及本地字典 `_password_cache` 缓存极致,支持环境变量回退机制。避免在批量执行时反复输入密码。
### 2. 回路与房间关联数据打平与校验 (`Loops_Power_Valid_1.py`, `Loops_Power_Valid_2.py`, `validate_flattened.py`)
- **`Loops_Power_Valid_1.py`**:将 `loops` 表的数据(`type IN ('0', '1')`)与 `hotels``room_type` 关联,并利用 PostgreSQL 特有的 `array_agg(...) FILTER ...` 分析函数,将同一回路所属关联的多个房间(`room_id`)聚合为一条数组格式的数据 (`loops_type01_enriched`)。
- **`Loops_Power_Valid_2.py`**一对多的基础平铺Flattened脚本。通过标准的 `LEFT JOIN` 将一个涉及多个房间的合并回路平铺出多行(`loops_type01_flattened`),建立清晰的设备-房间一对一拓扑。
- **`validate_flattened.py`**数据一致性验证工具。对抽样的目标库平铺数据前500条通过源库 `loop_id`, `hotel_id`, `room_type_id`, `room_id` 进行自动化交叉检验。
### 3. 全局静态计算流水线 (`Loops_Power_Calculation.py`)
- **技术细节**:执行存量能耗数据全量清洗构建业务逻辑的核心 Pipeline。
- 并发抽取目标库的架构数据 (`loops_type01_flattened`) 和源库中的客观瞬间状态数据 (`room_status_moment_g5`)。
- 使用 Pandas 进行了内存级数据对齐聚合(`merge`, `astype(str).str.strip()` 防脏数据)。处理位运算掩码解构 `bright_g` 的值,获取具体的真实开关功率规律,得出 `real_power`,然后清洗、回退,最后写回目标表 `wh_test.loops_power``wh_test.rooms_power`
### 4. 数据完整性统计工具 (`check_matching_rate.py`)
- **技术细节**:双库联调监控工具。对比 15433 test 库提取的 `loops_power` 设备总量与 `room_status_moment_g5` 中房间总量的覆盖率关系,利用 Pandas 强制数字/字符串清洗,计算并给出宏观的匹配率数据以及找不到设备的烂数据明细,提供数据排查凭点。
### 5. 数据库实时事件流触发层 (`g5_notify_trigger.sql` 与 `realtime_g5_listener.py`)
- **`g5_notify_trigger.sql`**
- 在源库端创建了拦截更新动作的 PostgreSQL 本地 Trigger (`trg_g5_update`) 监听特定字段变化,采用 `IS DISTINCT FROM` 语法避免无效触发。
- 将变化集利用 `json_build_object` 封装,并下发给 PostgreSQL 的原生异步通知信道 `pg_notify('g5_realtime_channel', payload)`
- **`realtime_g5_listener.py`**
- 使用 Python 进程 `psycopg2` 持久阻塞监听 `LISTEN g5_realtime_channel` 管道获取流式数据。
- **设计亮点**:启动前带有 `initialize_energy_tracking` 函数,通过基于 `clock_timestamp()``jsonb_object_keys` 对数据库的存量能耗记录值归零对齐打锚点(重置 `last_computed_time`)。同时含有 TCP Keepalive 设置防止中间代理意外杀掉长链接。