"""ETL job: enrich `loops` rows with hotel / room-type info and aggregated room ids.

Extracts loops whose ``type`` normalizes to '0' or '1' from the source schema,
joins them to ``room_type``, ``hotels`` and ``rooms``, aggregates the matching
``room_id`` values into a PostgreSQL varchar array per loop, and loads the
result into a freshly (re)created target table.

NOTE(review): credentials must come from the environment / db_config only —
never commit passwords in comments or code.
"""

import os
import re

import pandas as pd
from sqlalchemy import create_engine, text

from db_config import get_db_url


# Base columns selected (and grouped by) from the source `loops` table.
LOOPS_COLUMNS = [
    "id",
    "loop_name",
    "room_type_id",
    "loop_address",
    "loop_type",
    "type",
    "name",
    "power",
    "rate",
    "temperature",
    "air_type",
    "air_brand",
    "air_model",
    "height",
    "area",
    "heat_loss",
    "remark",
]

# Only plain SQL identifiers are accepted for env-provided schema/table names.
_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def _validate_identifier(name: str) -> str:
    """Return *name* unchanged if it is a safe SQL identifier.

    Schema and table names arrive from environment variables and are
    interpolated into SQL strings, so reject anything that is not a plain
    identifier to prevent SQL injection through the environment.

    Raises:
        ValueError: if *name* is not a valid unquoted-style identifier.
    """
    if not _IDENT_RE.match(name):
        raise ValueError(f"Invalid SQL identifier: {name!r}")
    return name


def create_target_table(target_conn, target_schema: str, target_table: str):
    """Create (or recreate) the target table in the target database.

    The table holds the base loop columns plus the owning ``hotel_name``,
    ``room_type_name`` and the aggregated ``room_id`` array.

    Args:
        target_conn: an open SQLAlchemy connection to the target database.
        target_schema: schema to create the table in (created if missing).
        target_table: name of the table; dropped first if it already exists.
    """
    target_conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"'))
    target_conn.execute(text(f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"'))

    create_sql = f'''
    CREATE TABLE "{target_schema}"."{target_table}" (
        "id" int4 NOT NULL,
        "loop_name" varchar(255),
        "room_type_id" int4 NOT NULL,
        "loop_address" varchar(255),
        "loop_type" varchar(50),
        "type" varchar(254),
        "name" varchar(254),
        "power" float8,
        "rate" float8,
        "temperature" float8,
        "air_type" varchar(254),
        "air_brand" varchar(254),
        "air_model" varchar(254),
        "height" float8,
        "area" float8,
        "heat_loss" float8,
        "remark" varchar(254),
        "hotel_id" int4,
        "hotel_name" varchar(255),
        "room_type_name" varchar(255),
        "room_id" varchar(255)[]
    )
    '''
    target_conn.execute(text(create_sql))


def main():
    """Run the extract-transform-load (ETL) pipeline.

    Joins the `loops` table with `room_type`, `hotels` and `rooms`, and uses
    ``array_agg`` to collapse the multiple room ids belonging to one loop into
    a single array column, stored as one enriched row per loop.
    """
    # Schema/table names are configurable via environment variables; validate
    # them because they are interpolated into SQL as identifiers.
    source_schema = _validate_identifier(os.getenv("SOURCE_SCHEMA", "temporary_project"))
    target_schema = _validate_identifier(os.getenv("TARGET_SCHEMA", "wh_test"))
    target_table = _validate_identifier(os.getenv("TARGET_TABLE", "loops_type01_enriched"))

    # Default source DB: log_platform; default target DB: test.
    # Engines are built from the shared db_config helper (no inline secrets).
    source_engine = create_engine(get_db_url("SOURCE"))
    target_engine = create_engine(get_db_url("TARGET"))

    with source_engine.connect() as source_conn:
        # Sanity counts: total rows, rows matching type exactly, and rows
        # matching after trimming/NULL-normalizing — used for the report below.
        stats_sql = f'''
        SELECT
            count(*) AS total_rows,
            count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
            count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
        FROM "{source_schema}"."loops"
        '''
        stats = source_conn.execute(text(stats_sql)).mappings().first()

        # Top-20 distribution of raw `type` values, NULLs first among ties.
        dist_sql = f'''
        SELECT
            "type",
            count(*) AS cnt
        FROM "{source_schema}"."loops"
        GROUP BY "type"
        ORDER BY cnt DESC, "type" NULLS FIRST
        LIMIT 20
        '''
        type_dist = source_conn.execute(text(dist_sql)).mappings().all()

        loops_select = ",\n            ".join([f'l."{c}"' for c in LOOPS_COLUMNS])
        group_by_columns = ",\n            ".join([f'l."{c}"' for c in LOOPS_COLUMNS])

        # Core SQL:
        # Join loops with type '0'/'1' to hotel and room information, and merge
        # the room ids served by each loop into one PostgreSQL varchar array.
        data_sql = f'''
        SELECT
            {loops_select},
            h."hotel_id" AS hotel_id,
            h."hotel_name" AS hotel_name,
            rt."room_type_name" AS room_type_name,
            COALESCE(array_agg(r."room_id") FILTER (WHERE r."room_id" IS NOT NULL), ARRAY[]::varchar[]) AS room_id
        FROM "{source_schema}"."loops" l
        LEFT JOIN "{source_schema}"."room_type" rt
            ON rt."id" = l."room_type_id"
        LEFT JOIN "{source_schema}"."hotels" h
            ON h."id" = rt."hotel_id"
        LEFT JOIN "{source_schema}"."rooms" r
            ON r."hotel_id" = rt."hotel_id" AND r."room_type_id" = l."room_type_id"
        WHERE trim(coalesce(l."type", '')) IN ('0', '1')
        GROUP BY
            {group_by_columns},
            h."hotel_id",
            h."hotel_name",
            rt."room_type_name"
        '''
        df = pd.read_sql(text(data_sql), source_conn)

    # `begin()` wraps create + load in one transaction: either the new table
    # appears fully populated or not at all.
    with target_engine.begin() as target_conn:
        create_target_table(target_conn, target_schema, target_table)
        df.to_sql(target_table, target_conn, schema=target_schema, if_exists="append", index=False)

    print(f"Inserted rows: {len(df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={stats['total_rows']}, "
        f"exact_type_0_1={stats['exact_match_rows']}, "
        f"normalized_type_0_1={stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")


if __name__ == "__main__":
    main()