feat: 添加 ETL 脚本,执行数据抽取、转换与加载,支持跨房间回路数据处理
This commit is contained in:
140
database_test_python/Loops_Power_Valid_2.py
Normal file
140
database_test_python/Loops_Power_Valid_2.py
Normal file
@@ -0,0 +1,140 @@
|
||||
import os
|
||||
|
||||
import pandas as pd
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
from db_config import get_db_url
|
||||
|
||||
|
||||
# Columns pulled verbatim from the source "loops" table. The order here
# defines both the SELECT projection in main() and the column layout of
# the target table created by create_target_table().
LOOPS_COLUMNS = [
    "id", "loop_name", "room_type_id", "loop_address", "loop_type",
    "type", "name", "power", "rate", "temperature",
    "air_type", "air_brand", "air_model", "height", "area",
    "heat_loss", "remark",
]
||||
def create_target_table(target_conn, target_schema: str, target_table: str):
    """
    (Re)create the target table in the destination database.

    Unlike Valid_1, this table stores loop rows flattened across rooms
    (one-to-many): a single loop that corresponds to multiple rooms is
    stored as multiple rows.

    Args:
        target_conn: open SQLAlchemy connection to the target database
            (presumably inside a transaction started by the caller —
            see main(); TODO confirm).
        target_schema: schema to create the table in (created if missing).
        target_table: table name; any existing table is dropped first.
    """
    # Ensure the schema exists, then drop any previous run's table so the
    # load below starts from a clean slate.
    target_conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"'))
    target_conn.execute(text(f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"'))

    # room_type_id is already part of LOOPS_COLUMNS, so it is not repeated
    # here; only hotel_id and room_id are appended to the loop columns.
    # NOTE(review): schema/table names are interpolated into the DDL —
    # they come from env vars (see main()), assumed trusted; confirm.
    create_sql = f'''
        CREATE TABLE "{target_schema}"."{target_table}" (
            "id" int4 NOT NULL,
            "loop_name" varchar(255),
            "room_type_id" int4 NOT NULL,
            "loop_address" varchar(255),
            "loop_type" varchar(50),
            "type" varchar(254),
            "name" varchar(254),
            "power" float8,
            "rate" float8,
            "temperature" float8,
            "air_type" varchar(254),
            "air_brand" varchar(254),
            "air_model" varchar(254),
            "height" float8,
            "area" float8,
            "heat_loss" float8,
            "remark" varchar(254),
            "hotel_id" int4,
            "room_id" varchar(255)
        )
    '''
    target_conn.execute(text(create_sql))
||||
def main():
    """
    Entry point: run the extract-transform-load (ETL) pipeline.

    Joins the source ``loops`` table with ``room_type``, ``hotels`` and
    ``rooms``; when one loop (circuit) maps to multiple room_ids, the
    LEFT JOINs flatten it into one output row per room, and all rows are
    saved to the target table.
    """
    # All three settings are overridable via environment variables.
    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
    target_table = os.getenv("TARGET_TABLE", "loops_type01_flattened")

    # Build source/target SQLAlchemy engines from the shared db_config
    # settings. Per the original comment: default source DB is
    # log_platform, default target DB is test — TODO confirm in db_config.
    source_engine = create_engine(get_db_url("SOURCE"))
    target_engine = create_engine(get_db_url("TARGET"))

    with source_engine.connect() as source_conn:
        # Diagnostic counts: compare exact vs. trim/NULL-normalized
        # matches of type IN ('0','1') to surface dirty values in the
        # source column (whitespace, NULLs).
        stats_sql = f'''
            SELECT
                count(*) AS total_rows,
                count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
                count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
            FROM "{source_schema}"."loops"
        '''
        stats = source_conn.execute(text(stats_sql)).mappings().first()

        # Top-20 distribution of raw "type" values, printed at the end
        # for visibility into what the source actually contains.
        dist_sql = f'''
            SELECT
                "type",
                count(*) AS cnt
            FROM "{source_schema}"."loops"
            GROUP BY "type"
            ORDER BY cnt DESC, "type" NULLS FIRST
            LIMIT 20
        '''
        type_dist = source_conn.execute(text(dist_sql)).mappings().all()

        # Projection list for the loops table, each column qualified with
        # the alias "l" (one entry per LOOPS_COLUMNS element).
        loops_select = ",\n                ".join([f'l."{c}"' for c in LOOPS_COLUMNS])

        # Core SQL: standard LEFT JOIN fan-out. Keep loops whose
        # (trimmed, NULL-coalesced) type is '0' or '1'; a loop whose room
        # type maps to several rooms splits into one row per room
        # (flattened storage). LEFT JOINs also keep loops with no
        # matching room_type/hotel/room (hotel_id/room_id come back NULL).
        # NOTE(review): hotels is joined on h."id" but projects
        # h."hotel_id" — this assumes hotels has both an "id" PK and a
        # separate "hotel_id" column; confirm against the source schema.
        data_sql = f'''
            SELECT
                {loops_select},
                h."hotel_id" AS hotel_id,
                r."room_id" AS room_id
            FROM "{source_schema}"."loops" l
            LEFT JOIN "{source_schema}"."room_type" rt
                ON rt."id" = l."room_type_id"
            LEFT JOIN "{source_schema}"."hotels" h
                ON h."id" = rt."hotel_id"
            LEFT JOIN "{source_schema}"."rooms" r
                ON r."hotel_id" = rt."hotel_id" AND r."room_type_id" = l."room_type_id"
            WHERE trim(coalesce(l."type", '')) IN ('0', '1')
        '''
        df = pd.read_sql(text(data_sql), source_conn)

    # begin() wraps the DDL and the insert in a single transaction: the
    # table is either recreated and fully loaded, or left untouched.
    with target_engine.begin() as target_conn:
        create_target_table(target_conn, target_schema, target_table)
        df.to_sql(target_table, target_conn, schema=target_schema, if_exists="append", index=False)

    # Summary output: row count, destination, and the source diagnostics
    # gathered above.
    print(f"Inserted rows: {len(df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={stats['total_rows']}, "
        f"exact_type_0_1={stats['exact_match_rows']}, "
        f"normalized_type_0_1={stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")
||||
# Script entry point: run the ETL only when executed directly, not on import.
if __name__ == "__main__":
    main()
Reference in New Issue
Block a user