"""Flatten ``loops`` rows of type '0'/'1' across rooms and load them into a target table.

Reconstructed from the patch that adds
``database_test_python/Loops_Power_Valid_2.py``.
"""

import os

import pandas as pd
from sqlalchemy import create_engine, text

from db_config import get_db_url


# Columns copied verbatim from the source "loops" table, in output order.
LOOPS_COLUMNS = [
    "id",
    "loop_name",
    "room_type_id",
    "loop_address",
    "loop_type",
    "type",
    "name",
    "power",
    "rate",
    "temperature",
    "air_type",
    "air_brand",
    "air_model",
    "height",
    "area",
    "heat_loss",
    "remark",
]


def _quote_ident(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled so that a schema/table name taken from
    the environment cannot terminate the identifier early.

    Raises:
        ValueError: if *name* is empty (a zero-length quoted identifier is
            not a usable schema/table name).
    """
    if not name:
        raise ValueError("SQL identifier must be a non-empty string")
    return '"' + name.replace('"', '""') + '"'


def create_target_table(target_conn, target_schema: str, target_table: str) -> None:
    """Drop and recreate the target table on ``target_conn``.

    Unlike the table built by Valid_1, this one stores the flattened
    (one-to-many) result: a loop that maps to several rooms yields one row
    per room.

    Args:
        target_conn: SQLAlchemy connection to the target database; the
            statements run in whatever transaction the caller has open.
        target_schema: Schema to create (if missing) and place the table in.
        target_table: Name of the table to drop and recreate.

    Raises:
        ValueError: if ``target_schema`` or ``target_table`` is empty.
    """
    schema_ident = _quote_ident(target_schema)
    qualified_table = f"{schema_ident}.{_quote_ident(target_table)}"

    target_conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_ident}"))
    target_conn.execute(text(f"DROP TABLE IF EXISTS {qualified_table}"))

    # room_type_id is already part of LOOPS_COLUMNS, so it is not added a
    # second time; only hotel_id and room_id are extra columns.
    create_sql = f'''
        CREATE TABLE {qualified_table} (
            "id" int4 NOT NULL,
            "loop_name" varchar(255),
            "room_type_id" int4 NOT NULL,
            "loop_address" varchar(255),
            "loop_type" varchar(50),
            "type" varchar(254),
            "name" varchar(254),
            "power" float8,
            "rate" float8,
            "temperature" float8,
            "air_type" varchar(254),
            "air_brand" varchar(254),
            "air_model" varchar(254),
            "height" float8,
            "area" float8,
            "heat_loss" float8,
            "remark" varchar(254),
            "hotel_id" int4,
            "room_id" varchar(255)
        )
    '''
    target_conn.execute(text(create_sql))


def main() -> None:
    """Run the extract / transform / load job.

    Joins ``loops`` to ``room_type``, ``hotels`` and ``rooms`` in the source
    schema. A loop whose room type matches several rooms is emitted as several
    rows (flattened), one per ``room_id``. Only loops whose trimmed ``type`` is
    '0' or '1' are kept. The result replaces the target table, and a few
    source-side statistics are printed afterwards.

    Configuration is read from the environment: ``SOURCE_SCHEMA``,
    ``TARGET_SCHEMA`` and ``TARGET_TABLE`` (defaults below).
    """
    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
    target_table = os.getenv("TARGET_TABLE", "loops_type01_flattened")

    source_schema_ident = _quote_ident(source_schema)
    loops_table = f'{source_schema_ident}."loops"'

    # Engines are built from the shared settings in db_config.
    # Per the original note: default source database is log_platform,
    # default target database is test (defined in db_config; not visible here).
    source_engine = create_engine(get_db_url("SOURCE"))
    target_engine = create_engine(get_db_url("TARGET"))

    try:
        with source_engine.connect() as source_conn:
            # How many rows match exactly vs. after trimming / NULL-coalescing.
            stats_sql = f'''
                SELECT
                    count(*) AS total_rows,
                    count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
                    count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
                FROM {loops_table}
            '''
            stats = source_conn.execute(text(stats_sql)).mappings().first()

            dist_sql = f'''
                SELECT
                    "type",
                    count(*) AS cnt
                FROM {loops_table}
                GROUP BY "type"
                ORDER BY cnt DESC, "type" NULLS FIRST
                LIMIT 20
            '''
            type_dist = source_conn.execute(text(dist_sql)).mappings().all()

            loops_select = ",\n                    ".join(
                f"l.{_quote_ident(column)}" for column in LOOPS_COLUMNS
            )

            # Core query: plain LEFT JOINs expand the rows naturally. A loop
            # of type '0' or '1' whose room type maps to several room_ids is
            # split into one output row per room (flattened storage).
            data_sql = f'''
                SELECT
                    {loops_select},
                    h."hotel_id" AS hotel_id,
                    r."room_id" AS room_id
                FROM {loops_table} l
                LEFT JOIN {source_schema_ident}."room_type" rt
                    ON rt."id" = l."room_type_id"
                LEFT JOIN {source_schema_ident}."hotels" h
                    ON h."id" = rt."hotel_id"
                LEFT JOIN {source_schema_ident}."rooms" r
                    ON r."hotel_id" = rt."hotel_id" AND r."room_type_id" = l."room_type_id"
                WHERE trim(coalesce(l."type", '')) IN ('0', '1')
            '''
            df = pd.read_sql(text(data_sql), source_conn)

        # begin() commits on success and rolls back if anything raises, so the
        # drop / create / insert sequence is issued inside one transaction.
        with target_engine.begin() as target_conn:
            create_target_table(target_conn, target_schema, target_table)
            df.to_sql(
                target_table,
                target_conn,
                schema=target_schema,
                if_exists="append",
                index=False,
            )
    finally:
        # Release pooled connections even when the job fails part-way.
        source_engine.dispose()
        target_engine.dispose()

    print(f"Inserted rows: {len(df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={stats['total_rows']}, "
        f"exact_type_0_1={stats['exact_match_rows']}, "
        f"normalized_type_0_1={stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")


if __name__ == "__main__":
    main()