feat: 添加 ETL 脚本,提取并处理 loops 数据,创建目标表并插入聚合结果

This commit is contained in:
2026-04-09 11:47:27 +08:00
parent bd8613afcd
commit 9ec2847bad

View File

@@ -0,0 +1,160 @@
import os
import pandas as pd
from sqlalchemy import create_engine, text
from db_config import get_db_url
# Columns taken verbatim from the source "loops" table. main() expands this
# list into both the SELECT list and the GROUP BY clause of the extraction
# query, so the order here is the column order of the extracted rows.
LOOPS_COLUMNS = [
    # identity and addressing
    "id", "loop_name", "room_type_id", "loop_address", "loop_type",
    # classification
    "type", "name",
    # electrical / thermal ratings
    "power", "rate", "temperature",
    # air-conditioning details
    "air_type", "air_brand", "air_model",
    # room geometry and derived load
    "height", "area", "heat_loss",
    # free-form note
    "remark",
]
def _quote_ident(identifier: str) -> str:
    """Return *identifier* as a double-quoted PostgreSQL identifier.

    Embedded double quotes are doubled, which is how PostgreSQL escapes them
    inside a quoted identifier, so the name cannot terminate the quoting early.
    """
    return '"' + identifier.replace('"', '""') + '"'


def create_target_table(target_conn, target_schema: str, target_table: str) -> None:
    """Drop and recreate the enriched loops table in the target database.

    The table holds the base loop columns plus the owning ``hotel_id``,
    ``hotel_name`` and ``room_type_name``, and a ``room_id`` array column that
    stores the aggregated room ids for the loop.

    Args:
        target_conn: Open SQLAlchemy connection to the target database; the
            statements are executed on it and the caller owns the transaction.
        target_schema: Schema that will contain the table; created if missing.
        target_table: Name of the table to (re)create.

    Note:
        Any existing table with the same name is dropped first, so previously
        loaded rows are discarded.
    """
    schema_sql = _quote_ident(target_schema)
    table_sql = f"{schema_sql}.{_quote_ident(target_table)}"

    target_conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_sql}"))
    target_conn.execute(text(f"DROP TABLE IF EXISTS {table_sql}"))

    create_sql = f'''
        CREATE TABLE {table_sql} (
            "id" int4 NOT NULL,
            "loop_name" varchar(255),
            "room_type_id" int4 NOT NULL,
            "loop_address" varchar(255),
            "loop_type" varchar(50),
            "type" varchar(254),
            "name" varchar(254),
            "power" float8,
            "rate" float8,
            "temperature" float8,
            "air_type" varchar(254),
            "air_brand" varchar(254),
            "air_model" varchar(254),
            "height" float8,
            "area" float8,
            "heat_loss" float8,
            "remark" varchar(254),
            "hotel_id" int4,
            "hotel_name" varchar(255),
            "room_type_name" varchar(255),
            "room_id" varchar(255)[]
        )
    '''
    target_conn.execute(text(create_sql))
def main():
    """Run the loops ETL: extract from the source DB, aggregate, load the target.

    The ``loops`` table is joined with ``room_type``, ``hotels`` and ``rooms``
    in the source schema, and ``array_agg`` collapses the room ids matched to
    each loop into one array so every loop is stored as a single enriched row.
    Only loops whose trimmed ``type`` is ``'0'`` or ``'1'`` are loaded.

    Configuration comes from environment variables, each with a default:
    ``SOURCE_SCHEMA`` (``temporary_project``), ``TARGET_SCHEMA`` (``wh_test``)
    and ``TARGET_TABLE`` (``loops_type01_enriched``).

    After loading, the inserted row count, the source row counts and the 20
    most frequent ``type`` values are printed for a quick sanity check.
    """
    # Schema and table names are configurable through the environment.
    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
    target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched")

    # Double any embedded double quote so the schema name always stays a
    # single quoted identifier inside the SQL built below.
    source_ident = '"' + source_schema.replace('"', '""') + '"'

    # Connection URLs come from the project's db_config module. The original
    # note says the source database defaults to log_platform and the target
    # to test -- confirm against db_config.
    source_engine = create_engine(get_db_url("SOURCE"))
    target_engine = create_engine(get_db_url("TARGET"))
    try:
        with source_engine.connect() as source_conn:
            # Row counts: everything, exact type matches, and matches after
            # trimming whitespace / treating NULL as ''. Comparing the last
            # two shows how many rows only match once normalised.
            stats_sql = f'''
                SELECT
                    count(*) AS total_rows,
                    count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
                    count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
                FROM {source_ident}."loops"
            '''
            stats = source_conn.execute(text(stats_sql)).mappings().first()

            dist_sql = f'''
                SELECT
                    "type",
                    count(*) AS cnt
                FROM {source_ident}."loops"
                GROUP BY "type"
                ORDER BY cnt DESC, "type" NULLS FIRST
                LIMIT 20
            '''
            type_dist = source_conn.execute(text(dist_sql)).mappings().all()

            # The same qualified column list serves as both the SELECT list
            # and the GROUP BY keys, so it is built once.
            loop_columns_sql = ",\n    ".join(f'l."{c}"' for c in LOOPS_COLUMNS)

            # Core query: attach hotel and room-type information to every
            # loop of type '0' or '1', and merge the ids of all rooms matched
            # to the loop into one PostgreSQL string array (an empty array
            # when no room matches).
            data_sql = f'''
                SELECT
                    {loop_columns_sql},
                    h."hotel_id" AS hotel_id,
                    h."hotel_name" AS hotel_name,
                    rt."room_type_name" AS room_type_name,
                    COALESCE(array_agg(r."room_id") FILTER (WHERE r."room_id" IS NOT NULL), ARRAY[]::varchar[]) AS room_id
                FROM {source_ident}."loops" l
                LEFT JOIN {source_ident}."room_type" rt
                    ON rt."id" = l."room_type_id"
                LEFT JOIN {source_ident}."hotels" h
                    ON h."id" = rt."hotel_id"
                LEFT JOIN {source_ident}."rooms" r
                    ON r."hotel_id" = rt."hotel_id" AND r."room_type_id" = l."room_type_id"
                WHERE trim(coalesce(l."type", '')) IN ('0', '1')
                GROUP BY
                    {loop_columns_sql},
                    h."hotel_id",
                    h."hotel_name",
                    rt."room_type_name"
            '''
            df = pd.read_sql(text(data_sql), source_conn)

        # Recreate the table and insert inside one transaction, so a failed
        # insert rolls back together with the DDL.
        with target_engine.begin() as target_conn:
            create_target_table(target_conn, target_schema, target_table)
            df.to_sql(
                target_table,
                target_conn,
                schema=target_schema,
                if_exists="append",
                index=False,
            )
    finally:
        # Release pooled connections even when extraction or loading fails.
        source_engine.dispose()
        target_engine.dispose()

    print(f"Inserted rows: {len(df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={stats['total_rows']}, "
        f"exact_type_0_1={stats['exact_match_rows']}, "
        f"normalized_type_0_1={stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")
# Run the ETL only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
# NOTE(security): two trailing comments that looked like plaintext database
# passwords were removed from this spot. Credentials must not be committed to
# source control; keep them in the deployment's secret configuration and
# rotate any value that was ever stored here.