From bd8613afcdf13bf2d71cbbefd16c892abd88e0d0 Mon Sep 17 00:00:00 2001 From: Indiem87 Date: Thu, 9 Apr 2026 11:47:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=20ETL=20=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=EF=BC=8C=E5=88=9B=E5=BB=BA=E7=9B=AE=E6=A0=87=E8=A1=A8?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E5=B9=B6=E5=A4=84=E7=90=86=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=8A=BD=E5=8F=96=E4=B8=8E=E8=BD=AC=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Loops_Power_Calculation.py | 299 ++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 database_test_python/Loops_Power_Calculation.py diff --git a/database_test_python/Loops_Power_Calculation.py b/database_test_python/Loops_Power_Calculation.py new file mode 100644 index 0000000..fc8f949 --- /dev/null +++ b/database_test_python/Loops_Power_Calculation.py @@ -0,0 +1,299 @@ +import os +import json + +import pandas as pd +from sqlalchemy import create_engine, text + +from db_config import get_db_url + + +def get_80portal_engine(): + """ + 配置并获取 80portal (源库1) 的数据库连接 (基于 settings.json 中的 80portal 连接配置) + 15434 / log_platform + """ + return create_engine(get_db_url("PORTAL")) + + +def get_test_engine(): + """ + 配置并获取 test (源库2 及 目标库) 的数据库连接 (基于 settings.json 中的 test 连接配置) + 15433 / test + """ + return create_engine(get_db_url("TEST")) + + +def create_target_tables(engine): + """ + 在 test 库的 wh_test schema 下创建目标表结构: loops_power 和 rooms_power + """ + schema = "wh_test" + with engine.begin() as conn: + # 创建 loops_power 表 + conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema}"')) + conn.execute(text(f'DROP TABLE IF EXISTS "{schema}"."loops_power"')) + create_loops_power_sql = f''' + CREATE TABLE "{schema}"."loops_power" ( + "id" int4 NOT NULL, + "loop_name" varchar(255), + "room_type_id" int4 NOT NULL, + "loop_address" varchar(255), + "loop_type" varchar(50), + "type" varchar(254), + "name" varchar(254), + "power" float8, + "rate" float8, + "bright_g" int2, + "online_status" int2, + "real_power" float8, + "total_energy_consumption" float8, + "last_change_time" int8, + "last_computed_time" int8, + "temperature" float8, + "air_type" varchar(254), + "air_brand" varchar(254), + "air_model" varchar(254), + "height" float8, + "area" float8, + "heat_loss" float8, + "remark" varchar(254), + "hotel_id" int4, + "room_id" varchar(255) + ) + ''' + conn.execute(text(create_loops_power_sql)) + + # 创建 rooms_power 表 + conn.execute(text(f'DROP TABLE IF EXISTS "{schema}"."rooms_power"')) + create_rooms_power_sql = f''' + CREATE TABLE "{schema}"."rooms_power" ( + "hotel_id" int4, + "room_id" varchar(255), + "device_id" varchar(255), + "ts_ms" int8, + "sys_lock_status" int2, + "online_status" int2, + "launcher_version" varchar(255), + "app_version" varchar(255), + "config_version" varchar(255), + "register_ts_ms" int8, + "upgrade_ts_ms" int8, + "config_ts_ms" int8, + "ip" varchar(255), + "pms_status" int2, + "power_state" int2, + "cardless_state" int2, + "service_mask" int8, + "insert_card" int2, + "bright_g" int2, + "agreement_ver" varchar(255), + "air_address" text[], + "air_state" int2[], + "air_model" int2[], + "air_speed" int2[], + "air_set_temp" int2[], + "air_now_temp" int2[], + "air_solenoid_valve" int2[], + "elec_address" text[], + "elec_voltage" float8[], + "elec_ampere" float8[], + "elec_power" float8[], + "elec_phase" float8[], + "elec_energy" float8[], + "elec_sum_energy" float8[], + "dev_loops" jsonb, + "loops_power" jsonb, + "loops_energy_consumption" jsonb, + "ideal_energy_consumption" float8, + "carbon_state" int2, + "energy_carbon_sum" float8, + "energy_nocard_sum" float8, + "external_device" jsonb, + "faulty_device_count" jsonb, + "rcu_online" int2 + ) + ''' + conn.execute(text(create_rooms_power_sql)) + + +def extract_data(engine_80portal, engine_test): + """ + 从两个源库抽取相应的数据 + """ + print("正在从 80portal/room_status/room_status_moment_g5 抽取数据...") + sql_moment = 'SELECT * FROM "room_status"."room_status_moment_g5"' + dfs_moment = pd.read_sql(text(sql_moment), engine_80portal) + + print("正在从 test/wh_test/loops_type01_flattened 抽取数据...") + sql_loops = "SELECT * FROM wh_test.loops_type01_flattened WHERE loop_address LIKE '024%%'" + dfs_loops = pd.read_sql(text(sql_loops), engine_test) + + return dfs_moment, dfs_loops + + +def transform_data(dfs_moment, dfs_loops): + """ + 将两个源表的数据进行关联或融合计算,产出目标表所需的 DataFrame + """ + print("正在处理数据 (提取位运算解析 rate / online_status 及求值 real_power)...") + + # 建立 df_rooms_power 供落库并序列化 JSON + df_rooms_power = dfs_moment.copy() + + def init_loops_power(x): + import json + if isinstance(x, dict): + return json.dumps({k: 0.0 for k in x.keys()}) + elif isinstance(x, str): + try: + d = json.loads(x) + if isinstance(d, dict): + return json.dumps({k: 0.0 for k in d.keys()}) + except Exception: + pass + return '{}' + + df_rooms_power['loops_power'] = df_rooms_power['dev_loops'].apply(init_loops_power) + df_rooms_power['loops_energy_consumption'] = df_rooms_power['loops_power'] + df_rooms_power['ideal_energy_consumption'] = 0.0 + + import time + current_time_ms = int(time.time() * 1000) + df_rooms_power['rcu_online'] = df_rooms_power['ts_ms'].apply( + lambda ts: 0 if pd.notnull(ts) and (current_time_ms - float(ts) > 300000) else 1 + ) + + if 'dev_loops' in df_rooms_power.columns: + df_rooms_power['dev_loops'] = df_rooms_power['dev_loops'].apply( + lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (dict, list)) else (x if pd.notnull(x) else '{}') + ) + if 'external_device' in df_rooms_power.columns: + df_rooms_power['external_device'] = df_rooms_power['external_device'].apply( + lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (dict, list)) else (x if pd.notnull(x) else '{}') + ) + if 'faulty_device_count' in df_rooms_power.columns: + df_rooms_power['faulty_device_count'] = df_rooms_power['faulty_device_count'].apply( + lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (dict, list)) else (x if pd.notnull(x) else '{}') + ) + + # 将 bright_g 和原始字典态的 dev_loops 关联到 loops 数据以便计算 + room_info_df = dfs_moment[['hotel_id', 'room_id', 'bright_g', 'dev_loops']].drop_duplicates(['hotel_id', 'room_id']) + + # 强制将融合键转换为字符串并去除头尾空格,防止类型不同(int与str)且防止隐藏空格导致匹配失败 + dfs_loops['hotel_id'] = dfs_loops['hotel_id'].astype(str).str.strip() + dfs_loops['room_id'] = dfs_loops['room_id'].astype(str).str.strip() + room_info_df['hotel_id'] = room_info_df['hotel_id'].astype(str).str.strip() + room_info_df['room_id'] = room_info_df['room_id'].astype(str).str.strip() + + df_loops_power = pd.merge(dfs_loops, room_info_df, on=['hotel_id', 'room_id'], how='left') + + unmatched_rows = df_loops_power[df_loops_power['dev_loops'].isna()].shape[0] + if unmatched_rows > 0: + print(f"\n[WARNING] {unmatched_rows} 行 loops_power 数据未在 80portal 找到对应的房态数据(可能是历史脏数据或确实没有该房数据)。\n") + + + def calculate_derive_metrics(row): + rate = row.get('rate') + online_status = None + real_power = None + + dev_loops = row.get('dev_loops') + loop_addr = str(row.get('loop_address')).strip() + + if isinstance(dev_loops, str): + try: + import json + dev_loops = json.loads(dev_loops) + except Exception: + pass + + # 执行位运算解析 + if isinstance(dev_loops, dict) and loop_addr in dev_loops: + try: + val = int(dev_loops[loop_addr]) + # 从高8位提取 rate + rate = float((val >> 8) & 0xFF) + # 从低8位提取在线状态 + online_status = 1 if (val & 0xFF) == 1 else 0 + except (ValueError, TypeError): + pass + + bright_g = row.get('bright_g') + power = row.get('power') + + # 依照公式: power * rate * bright_g * online_status / 10000 计算 real_power + if pd.notna(power) and pd.notna(rate) and pd.notna(bright_g) and pd.notna(online_status): + real_power = float((power * rate * bright_g * online_status) / 10000.0) + + return pd.Series([rate, online_status, real_power], index=['rate', 'online_status', 'real_power']) + + # 回填运算结果 + df_loops_power[['rate', 'online_status', 'real_power']] = df_loops_power.apply(calculate_derive_metrics, axis=1) + + print("\n[Debug] 一些存在 dev_loops 的数据的属性情况:") + debug_df = df_loops_power[df_loops_power['dev_loops'].notna()] + print(debug_df[['loop_address', 'power', 'rate', 'bright_g', 'online_status']].head(10)) + if not debug_df.empty: + sample = debug_df.iloc[0] + print("\n[Sample dev_loops]:", sample['dev_loops']) + print("[Sample loop_address]:", repr(sample['loop_address'])) + + # 显示计算结果中成功匹配且算出了 real_power 的前十条 + matched_df = df_loops_power[df_loops_power['real_power'].notna()] + print(f"\n--- [Check] 找到 {len(matched_df)} 条成功匹配并计算出真实功率的数据。以下为前 10 条示例: ---") + + display_cols = ['hotel_id', 'room_id', 'loop_address', 'dev_loops', 'power', 'bright_g', 'rate', 'online_status', 'real_power'] + actual_cols = [c for c in display_cols if c in df_loops_power.columns] + + pd.set_option('display.max_columns', None) + pd.set_option('display.width', 1000) + + if not matched_df.empty: + print(matched_df[actual_cols].head(10)) + else: + print("警告:匹配出的数据中没有任何一条能正确计算出 real_power !") + print("-" * 50 + "\n") + + # 用完后清除暂留的辅助列 + df_loops_power = df_loops_power.drop(columns=['dev_loops']) + + # 将关联使用的 hotel_id 转回后续目标表需要的 INT 数值类型以防报错 + df_loops_power['hotel_id'] = pd.to_numeric(df_loops_power['hotel_id'], errors='coerce').fillna(0).astype(int) + + return df_loops_power, df_rooms_power + + +def load_data(engine_test, df_loops_power, df_rooms_power): + """ + 将计算结果写回 test 库 + """ + schema = "wh_test" + print(f"正在写入目标表 test/{schema}/loops_power... 行数: {{len(df_loops_power)}}") + df_loops_power.to_sql("loops_power", engine_test, schema=schema, if_exists="append", index=False) + + print(f"正在写入目标表 test/{schema}/rooms_power... 行数: {{len(df_rooms_power)}}") + df_rooms_power.to_sql("rooms_power", engine_test, schema=schema, if_exists="append", index=False) + + +def main(): + # 1. 建立数据库连接 + engine_80portal = get_80portal_engine() + engine_test = get_test_engine() + + # 2. 初始化目标表结构 + create_target_tables(engine_test) + + # 3. 提取数据 + dfs_moment, dfs_loops = extract_data(engine_80portal, engine_test) + + # 4. 转换计算 + df_loops_power, df_rooms_power = transform_data(dfs_moment, dfs_loops) + + # 5. 加载写入 + load_data(engine_test, df_loops_power, df_rooms_power) + + print("ETL Pipeline 框架搭建完成") + + +if __name__ == "__main__": + main()