"""Copy type-0/1 loop rows into an enriched table in another PostgreSQL database.

Rows are read from ``<SOURCE_SCHEMA>.loops`` (left-joined to ``room_type`` and
``hotels`` for display names) and written to a freshly re-created
``<TARGET_SCHEMA>.<TARGET_TABLE>``. Connection settings come from
``SOURCE_PG_*`` / ``TARGET_PG_*`` environment variables.
"""
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text


def build_url(prefix: str) -> str:
    """Build a ``postgresql+psycopg2`` connection URL from environment variables.

    Reads ``{prefix}_PG_USER``, ``{prefix}_PG_PASSWORD``, ``{prefix}_PG_HOST``,
    ``{prefix}_PG_PORT`` and ``{prefix}_PG_DB``. When the password variable is
    unset or empty, the password is requested interactively via ``getpass``.

    Args:
        prefix: Environment-variable prefix. ``"SOURCE"`` selects the
            ``log_platform`` default database; any other prefix defaults to
            ``test``.

    Returns:
        The connection URL, with the user name and password percent-encoded.
        Supply both values raw (not pre-encoded).

    Raises:
        ValueError: If ``{prefix}_PG_DB`` is set to an empty string.
    """
    user = os.getenv(f"{prefix}_PG_USER", "log_admin")
    password = os.getenv(f"{prefix}_PG_PASSWORD")
    if not password:
        password = getpass(f"{prefix} database password: ")
    host = os.getenv(f"{prefix}_PG_HOST", "blv-rd.tech")
    port = os.getenv(f"{prefix}_PG_PORT", "15433")
    default_db = "log_platform" if prefix == "SOURCE" else "test"
    database = os.getenv(f"{prefix}_PG_DB", default_db)
    if not database:
        raise ValueError(f"{prefix}_PG_DB is required")
    # Credentials may contain URL-reserved characters ('@', ':', '/', '%', ...)
    # that would otherwise be parsed as part of the URL structure.
    safe_user = quote_plus(user)
    safe_password = quote_plus(password)
    return f"postgresql+psycopg2://{safe_user}:{safe_password}@{host}:{port}/{database}"


# Columns copied verbatim from the source "loops" table, in target-table order.
LOOPS_COLUMNS = [
    "id",
    "loop_name",
    "room_type_id",
    "loop_address",
    "loop_type",
    "type",
    "name",
    "power",
    "rate",
    "temperature",
    "air_type",
    "air_brand",
    "air_model",
    "height",
    "area",
    "heat_loss",
    "remark",
]


def create_target_table(target_conn, target_schema: str, target_table: str):
    """Re-create the target table from scratch on ``target_conn``.

    Creates ``target_schema`` if it does not exist, drops any existing
    ``target_table`` in it (its data is discarded), then creates the table
    with the ``LOOPS_COLUMNS`` columns plus ``hotel_name``, ``room_type_name``
    and ``room_name``.

    Args:
        target_conn: SQLAlchemy connection the DDL statements are executed on.
        target_schema: Schema name; interpolated into the SQL as a
            double-quoted identifier.
        target_table: Table name; interpolated into the SQL as a
            double-quoted identifier.
    """
    target_conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"'))
    target_conn.execute(text(f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"'))

    create_sql = f'''
    CREATE TABLE "{target_schema}"."{target_table}" (
      "id" int4 NOT NULL,
      "loop_name" varchar(255),
      "room_type_id" int4 NOT NULL,
      "loop_address" varchar(255),
      "loop_type" varchar(50),
      "type" varchar(254),
      "name" varchar(254),
      "power" float8,
      "rate" float8,
      "temperature" float8,
      "air_type" varchar(254),
      "air_brand" varchar(254),
      "air_model" varchar(254),
      "height" float8,
      "area" float8,
      "heat_loss" float8,
      "remark" varchar(254),
      "hotel_name" varchar(255),
      "room_type_name" varchar(255),
      "room_name" varchar(255)
    )
    '''
    target_conn.execute(text(create_sql))


def main():
    """Copy type-0/1 loops from the source database into the target table.

    Schema and table names come from ``SOURCE_SCHEMA``, ``TARGET_SCHEMA`` and
    ``TARGET_TABLE`` (with defaults). Collects row-count statistics and the
    top-20 ``type`` distribution from the source, loads the matching rows into
    a DataFrame, rewrites the target table inside one transaction, and prints
    a summary.
    """
    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
    target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched")

    # Default source database: log_platform; default target database: test
    source_engine = create_engine(build_url("SOURCE"))
    target_engine = create_engine(build_url("TARGET"))

    try:
        with source_engine.connect() as source_conn:
            # Compare exact matches against whitespace/NULL-normalized matches
            # so stray padding in "type" is visible in the summary.
            stats_sql = f'''
            SELECT
              count(*) AS total_rows,
              count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
              count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
            FROM "{source_schema}"."loops"
            '''
            stats = source_conn.execute(text(stats_sql)).mappings().first()

            dist_sql = f'''
            SELECT "type", count(*) AS cnt
            FROM "{source_schema}"."loops"
            GROUP BY "type"
            ORDER BY cnt DESC, "type" NULLS FIRST
            LIMIT 20
            '''
            type_dist = source_conn.execute(text(dist_sql)).mappings().all()

            loops_select = ",\n              ".join(f'l."{c}"' for c in LOOPS_COLUMNS)
            # room_name is emitted as a typed NULL placeholder column.
            data_sql = f'''
            SELECT
              {loops_select},
              h."hotel_name" AS hotel_name,
              rt."room_type_name" AS room_type_name,
              NULL::varchar(255) AS room_name
            FROM "{source_schema}"."loops" l
            LEFT JOIN "{source_schema}"."room_type" rt ON rt."id" = l."room_type_id"
            LEFT JOIN "{source_schema}"."hotels" h ON h."id" = rt."hotel_id"
            WHERE trim(coalesce(l."type", '')) IN ('0', '1')
            '''
            df = pd.read_sql(text(data_sql), source_conn)

        # begin() wraps the drop/create/insert in a single transaction.
        with target_engine.begin() as target_conn:
            create_target_table(target_conn, target_schema, target_table)
            df.to_sql(
                target_table,
                target_conn,
                schema=target_schema,
                if_exists="append",
                index=False,
            )
    finally:
        # Release pooled connections even when a step above fails.
        source_engine.dispose()
        target_engine.dispose()

    print(f"Inserted rows: {len(df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={stats['total_rows']}, "
        f"exact_type_0_1={stats['exact_match_rows']}, "
        f"normalized_type_0_1={stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")


if __name__ == "__main__":
    main()