feat: 实现 ETL 流程,创建目标表结构并处理数据抽取与转换

This commit is contained in:
2026-04-09 11:47:12 +08:00
parent a4fab79970
commit bd8613afcd

View File

@@ -0,0 +1,299 @@
import os
import json
import pandas as pd
from sqlalchemy import create_engine, text
from db_config import get_db_url
def get_80portal_engine():
    """Return a SQLAlchemy engine for the 80portal source database.

    The connection URL is resolved by ``get_db_url`` from the "PORTAL"
    entry in settings.json (port 15434, database log_platform).
    """
    portal_url = get_db_url("PORTAL")
    return create_engine(portal_url)
def get_test_engine():
    """Return a SQLAlchemy engine for the test database (source 2 and target).

    The connection URL is resolved by ``get_db_url`` from the "TEST"
    entry in settings.json (port 15433, database test).
    """
    test_url = get_db_url("TEST")
    return create_engine(test_url)
def create_target_tables(engine):
    """
    Create (or recreate) the target tables ``loops_power`` and ``rooms_power``
    under the ``wh_test`` schema of the test database.

    Both tables are dropped first, so any previous contents are discarded
    on every run. All statements run in a single transaction
    (``engine.begin()``), so the rebuild is all-or-nothing.

    :param engine: SQLAlchemy engine pointing at the test database.
    """
    schema = "wh_test"
    with engine.begin() as conn:
        # Ensure the schema exists, then rebuild loops_power from scratch.
        conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema}"'))
        conn.execute(text(f'DROP TABLE IF EXISTS "{schema}"."loops_power"'))
        # Per-loop power table: one row per loop, with derived metrics
        # (rate / online_status / real_power) filled in by transform_data.
        create_loops_power_sql = f'''
        CREATE TABLE "{schema}"."loops_power" (
        "id" int4 NOT NULL,
        "loop_name" varchar(255),
        "room_type_id" int4 NOT NULL,
        "loop_address" varchar(255),
        "loop_type" varchar(50),
        "type" varchar(254),
        "name" varchar(254),
        "power" float8,
        "rate" float8,
        "bright_g" int2,
        "online_status" int2,
        "real_power" float8,
        "total_energy_consumption" float8,
        "last_change_time" int8,
        "last_computed_time" int8,
        "temperature" float8,
        "air_type" varchar(254),
        "air_brand" varchar(254),
        "air_model" varchar(254),
        "height" float8,
        "area" float8,
        "heat_loss" float8,
        "remark" varchar(254),
        "hotel_id" int4,
        "room_id" varchar(255)
        )
        '''
        conn.execute(text(create_loops_power_sql))
        # Rebuild rooms_power: one row per room snapshot, with array-typed
        # sensor channels and jsonb columns for nested device payloads.
        conn.execute(text(f'DROP TABLE IF EXISTS "{schema}"."rooms_power"'))
        create_rooms_power_sql = f'''
        CREATE TABLE "{schema}"."rooms_power" (
        "hotel_id" int4,
        "room_id" varchar(255),
        "device_id" varchar(255),
        "ts_ms" int8,
        "sys_lock_status" int2,
        "online_status" int2,
        "launcher_version" varchar(255),
        "app_version" varchar(255),
        "config_version" varchar(255),
        "register_ts_ms" int8,
        "upgrade_ts_ms" int8,
        "config_ts_ms" int8,
        "ip" varchar(255),
        "pms_status" int2,
        "power_state" int2,
        "cardless_state" int2,
        "service_mask" int8,
        "insert_card" int2,
        "bright_g" int2,
        "agreement_ver" varchar(255),
        "air_address" text[],
        "air_state" int2[],
        "air_model" int2[],
        "air_speed" int2[],
        "air_set_temp" int2[],
        "air_now_temp" int2[],
        "air_solenoid_valve" int2[],
        "elec_address" text[],
        "elec_voltage" float8[],
        "elec_ampere" float8[],
        "elec_power" float8[],
        "elec_phase" float8[],
        "elec_energy" float8[],
        "elec_sum_energy" float8[],
        "dev_loops" jsonb,
        "loops_power" jsonb,
        "loops_energy_consumption" jsonb,
        "ideal_energy_consumption" float8,
        "carbon_state" int2,
        "energy_carbon_sum" float8,
        "energy_nocard_sum" float8,
        "external_device" jsonb,
        "faulty_device_count" jsonb,
        "rcu_online" int2
        )
        '''
        conn.execute(text(create_rooms_power_sql))
def extract_data(engine_80portal, engine_test):
    """Pull the two source tables needed by the ETL.

    :param engine_80portal: engine for the 80portal database (room snapshots).
    :param engine_test: engine for the test database (flattened loop rows).
    :return: tuple ``(moment_df, loops_df)`` of raw DataFrames.
    """
    print("正在从 80portal/room_status/room_status_moment_g5 抽取数据...")
    moment_df = pd.read_sql(
        text('SELECT * FROM "room_status"."room_status_moment_g5"'),
        engine_80portal,
    )
    print("正在从 test/wh_test/loops_type01_flattened 抽取数据...")
    # '%%' is the escaped SQL LIKE wildcard; only addresses starting with '024'.
    loops_df = pd.read_sql(
        text("SELECT * FROM wh_test.loops_type01_flattened WHERE loop_address LIKE '024%%'"),
        engine_test,
    )
    return moment_df, loops_df
def transform_data(dfs_moment, dfs_loops):
    """Join the two source frames and derive the target-table columns.

    Produces:
      * ``df_loops_power`` — ``dfs_loops`` enriched with ``rate`` (high byte
        of the loop's dev_loops value), ``online_status`` (low byte == 1)
        and ``real_power`` = power * rate * bright_g * online_status / 10000.
      * ``df_rooms_power`` — ``dfs_moment`` with jsonb columns serialized to
        JSON text and ``rcu_online`` derived from the snapshot age.

    NOTE: the ``hotel_id``/``room_id`` columns of ``dfs_loops`` are
    normalized to stripped strings IN PLACE (pre-existing behavior).

    :param dfs_moment: room snapshot rows from 80portal.
    :param dfs_loops: flattened loop rows from the test database.
    :return: ``(df_loops_power, df_rooms_power)``
    """
    import time

    print("正在处理数据 (提取位运算解析 rate / online_status 及求值 real_power)...")

    # ---- rooms_power frame -------------------------------------------------
    df_rooms_power = dfs_moment.copy()

    def _zeroed_loops_json(x):
        # Mirror dev_loops' keys into a '{addr: 0.0}' JSON string; '{}' fallback.
        if isinstance(x, str):
            try:
                x = json.loads(x)
            except Exception:
                return '{}'
        if isinstance(x, dict):
            return json.dumps({k: 0.0 for k in x})
        return '{}'

    df_rooms_power['loops_power'] = df_rooms_power['dev_loops'].apply(_zeroed_loops_json)
    df_rooms_power['loops_energy_consumption'] = df_rooms_power['loops_power']
    df_rooms_power['ideal_energy_consumption'] = 0.0

    # A room counts as offline when its last snapshot is older than 5 minutes.
    current_time_ms = int(time.time() * 1000)
    df_rooms_power['rcu_online'] = df_rooms_power['ts_ms'].apply(
        lambda ts: 0 if pd.notnull(ts) and (current_time_ms - float(ts) > 300000) else 1
    )

    def _to_json_text(x):
        # Serialize dict/list payloads for jsonb columns; NULLs become '{}'.
        if isinstance(x, (dict, list)):
            return json.dumps(x, ensure_ascii=False)
        return x if pd.notnull(x) else '{}'

    for col in ('dev_loops', 'external_device', 'faulty_device_count'):
        if col in df_rooms_power.columns:
            df_rooms_power[col] = df_rooms_power[col].apply(_to_json_text)

    # ---- loops_power frame -------------------------------------------------
    # Attach bright_g and the still-dict-shaped dev_loops to each loop row.
    room_info_df = dfs_moment[['hotel_id', 'room_id', 'bright_g', 'dev_loops']].drop_duplicates(['hotel_id', 'room_id'])
    # Force join keys to stripped strings so int-vs-str and hidden whitespace
    # cannot break the merge.
    dfs_loops['hotel_id'] = dfs_loops['hotel_id'].astype(str).str.strip()
    dfs_loops['room_id'] = dfs_loops['room_id'].astype(str).str.strip()
    room_info_df['hotel_id'] = room_info_df['hotel_id'].astype(str).str.strip()
    room_info_df['room_id'] = room_info_df['room_id'].astype(str).str.strip()
    df_loops_power = pd.merge(dfs_loops, room_info_df, on=['hotel_id', 'room_id'], how='left')

    unmatched_rows = df_loops_power[df_loops_power['dev_loops'].isna()].shape[0]
    if unmatched_rows > 0:
        print(f"\n[WARNING] {unmatched_rows} 行 loops_power 数据未在 80portal 找到对应的房态数据(可能是历史脏数据或确实没有该房数据)。\n")

    def _derive_metrics(row):
        # Unpack the loop's 16-bit status word and compute real_power.
        rate = row.get('rate')
        online_status = None
        real_power = None
        dev_loops = row.get('dev_loops')
        loop_addr = str(row.get('loop_address')).strip()
        if isinstance(dev_loops, str):
            try:
                dev_loops = json.loads(dev_loops)
            except Exception:
                pass
        if isinstance(dev_loops, dict) and loop_addr in dev_loops:
            try:
                val = int(dev_loops[loop_addr])
                rate = float((val >> 8) & 0xFF)                # high byte: rate
                online_status = 1 if (val & 0xFF) == 1 else 0  # low byte: online flag
            except (ValueError, TypeError):
                pass
        bright_g = row.get('bright_g')
        power = row.get('power')
        # real_power = power * rate * bright_g * online_status / 10000
        if pd.notna(power) and pd.notna(rate) and pd.notna(bright_g) and pd.notna(online_status):
            real_power = float((power * rate * bright_g * online_status) / 10000.0)
        return pd.Series([rate, online_status, real_power], index=['rate', 'online_status', 'real_power'])

    df_loops_power[['rate', 'online_status', 'real_power']] = df_loops_power.apply(_derive_metrics, axis=1)

    # ---- diagnostics -------------------------------------------------------
    print("\n[Debug] 一些存在 dev_loops 的数据的属性情况:")
    debug_df = df_loops_power[df_loops_power['dev_loops'].notna()]
    print(debug_df[['loop_address', 'power', 'rate', 'bright_g', 'online_status']].head(10))
    if not debug_df.empty:
        sample = debug_df.iloc[0]
        print("\n[Sample dev_loops]:", sample['dev_loops'])
        print("[Sample loop_address]:", repr(sample['loop_address']))
    matched_df = df_loops_power[df_loops_power['real_power'].notna()]
    print(f"\n--- [Check] 找到 {len(matched_df)} 条成功匹配并计算出真实功率的数据。以下为前 10 条示例: ---")
    display_cols = ['hotel_id', 'room_id', 'loop_address', 'dev_loops', 'power', 'bright_g', 'rate', 'online_status', 'real_power']
    actual_cols = [c for c in display_cols if c in df_loops_power.columns]
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    if not matched_df.empty:
        print(matched_df[actual_cols].head(10))
    else:
        print("警告:匹配出的数据中没有任何一条能正确计算出 real_power !")
    print("-" * 50 + "\n")

    # Drop the helper column and restore hotel_id to the INT type the target
    # table expects (it was stringified above for the merge).
    df_loops_power = df_loops_power.drop(columns=['dev_loops'])
    df_loops_power['hotel_id'] = pd.to_numeric(df_loops_power['hotel_id'], errors='coerce').fillna(0).astype(int)
    return df_loops_power, df_rooms_power
def load_data(engine_test, df_loops_power, df_rooms_power):
    """Append the transformed frames into test.wh_test.{loops_power, rooms_power}.

    :param engine_test: engine for the target (test) database.
    :param df_loops_power: frame produced by transform_data for loops_power.
    :param df_rooms_power: frame produced by transform_data for rooms_power.
    """
    schema = "wh_test"
    # Bug fix: the original used doubled braces ('{{len(...)}}') inside an
    # f-string, which printed the literal text '{len(df_loops_power)}'
    # instead of the actual row count.
    print(f"正在写入目标表 test/{schema}/loops_power... 行数: {len(df_loops_power)}")
    df_loops_power.to_sql("loops_power", engine_test, schema=schema, if_exists="append", index=False)
    print(f"正在写入目标表 test/{schema}/rooms_power... 行数: {len(df_rooms_power)}")
    df_rooms_power.to_sql("rooms_power", engine_test, schema=schema, if_exists="append", index=False)
def main():
    """Run the full ETL pipeline: connect, create targets, extract, transform, load."""
    # Engines for the two databases involved.
    src_engine = get_80portal_engine()
    dst_engine = get_test_engine()
    # Recreate the target tables in the test database.
    create_target_tables(dst_engine)
    # Extract -> Transform -> Load.
    moment_df, loops_df = extract_data(src_engine, dst_engine)
    loops_out, rooms_out = transform_data(moment_df, loops_df)
    load_data(dst_engine, loops_out, rooms_out)
    print("ETL Pipeline 框架搭建完成")


if __name__ == "__main__":
    main()