"""Copy filtered ``loops`` rows from a source PostgreSQL database into a target table.

Connection settings come from ``SOURCE_PG_*`` / ``TARGET_PG_*`` environment
variables (see :func:`build_url`).
"""

import os
from getpass import getpass
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text


def build_url(prefix: str) -> str:
    """Build a ``postgresql+psycopg2`` connection URL from environment variables.

    Reads ``{prefix}_PG_USER``, ``{prefix}_PG_PASSWORD``, ``{prefix}_PG_HOST``,
    ``{prefix}_PG_PORT`` and ``{prefix}_PG_DB``. When the password variable is
    unset or empty, the password is prompted for interactively via ``getpass``.

    Args:
        prefix: Environment-variable prefix. ``"SOURCE"`` defaults the
            database name to ``log_platform``; any other prefix defaults
            it to ``test``.

    Returns:
        The connection URL, with user name and password percent-encoded.

    Raises:
        ValueError: If ``{prefix}_PG_DB`` is set to an empty string.
    """
    user = os.getenv(f"{prefix}_PG_USER", "log_admin")
    password = os.getenv(f"{prefix}_PG_PASSWORD")
    if not password:
        password = getpass(f"{prefix} database password: ")
    host = os.getenv(f"{prefix}_PG_HOST", "blv-rd.tech")
    port = os.getenv(f"{prefix}_PG_PORT", "15433")
    default_db = "log_platform" if prefix == "SOURCE" else "test"
    database = os.getenv(f"{prefix}_PG_DB", default_db)
    if not database:
        raise ValueError(f"{prefix}_PG_DB is required")

    # Percent-encode the credentials so reserved characters (@ : / % # ...)
    # are not mistaken for URL delimiters. quote() with safe="" is used rather
    # than quote_plus() so a space becomes %20 instead of "+", which decodes
    # correctly whether or not the URL parser treats "+" as a space.
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return (
        f"postgresql+psycopg2://{safe_user}:{safe_password}"
        f"@{host}:{port}/{database}"
    )


# Columns copied from the source "loops" table; the order matches the leading
# columns of the table created by create_target_table().
LOOPS_COLUMNS = [
    "id",
    "loop_name",
    "room_type_id",
    "loop_address",
    "loop_type",
    "type",
    "name",
    "power",
    "rate",
    "temperature",
    "air_type",
    "air_brand",
    "air_model",
    "height",
    "area",
    "heat_loss",
    "remark",
]


def _quote_ident(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def create_target_table(target_conn, target_schema: str, target_table: str) -> None:
    """Drop and recreate the enriched target table.

    Creates the schema if it does not exist, drops any existing table with the
    same name, then creates the table with the ``LOOPS_COLUMNS`` columns plus
    ``hotel_name``, ``room_type_name`` and ``room_name``.

    Args:
        target_conn: Connection object exposing ``execute()``; the statements
            run on it, so transaction handling is left to the caller.
        target_schema: Schema that will contain the table.
        target_table: Name of the table to (re)create.
    """
    schema = _quote_ident(target_schema)
    qualified = f"{schema}.{_quote_ident(target_table)}"

    target_conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
    target_conn.execute(text(f"DROP TABLE IF EXISTS {qualified}"))

    create_sql = f'''
    CREATE TABLE {qualified} (
        "id" int4 NOT NULL,
        "loop_name" varchar(255),
        "room_type_id" int4 NOT NULL,
        "loop_address" varchar(255),
        "loop_type" varchar(50),
        "type" varchar(254),
        "name" varchar(254),
        "power" float8,
        "rate" float8,
        "temperature" float8,
        "air_type" varchar(254),
        "air_brand" varchar(254),
        "air_model" varchar(254),
        "height" float8,
        "area" float8,
        "heat_loss" float8,
        "remark" varchar(254),
        "hotel_name" varchar(255),
        "room_type_name" varchar(255),
        "room_name" varchar(255)
    )
    '''
    target_conn.execute(text(create_sql))


def main():
    """Copy ``loops`` rows whose trimmed ``type`` is '0' or '1' into the target table.

    Schema and table names are read from the ``SOURCE_SCHEMA``,
    ``TARGET_SCHEMA`` and ``TARGET_TABLE`` environment variables. The function:

    1. collects row counts and the top-20 ``type`` distribution from the
       source ``loops`` table,
    2. reads the matching rows, left-joined with ``room_type`` and ``hotels``
       (``room_name`` is always selected as NULL),
    3. recreates the target table and appends the rows inside a single
       ``target_engine.begin()`` transaction,
    4. prints a summary to stdout.
    """
    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
    target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched")

    # Default source database: log_platform; default target database: test
    source_engine = create_engine(build_url("SOURCE"))
    target_engine = create_engine(build_url("TARGET"))

    try:
        with source_engine.connect() as source_conn:
            # Compare exact vs. whitespace-normalized matches so the summary
            # shows how many rows only match after trimming.
            stats_sql = f'''
            SELECT
                count(*) AS total_rows,
                count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
                count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
            FROM "{source_schema}"."loops"
            '''
            stats = source_conn.execute(text(stats_sql)).mappings().first()

            dist_sql = f'''
            SELECT
                "type",
                count(*) AS cnt
            FROM "{source_schema}"."loops"
            GROUP BY "type"
            ORDER BY cnt DESC, "type" NULLS FIRST
            LIMIT 20
            '''
            type_dist = source_conn.execute(text(dist_sql)).mappings().all()

            loops_select = ",\n                ".join(
                [f'l."{c}"' for c in LOOPS_COLUMNS]
            )

            data_sql = f'''
            SELECT
                {loops_select},
                h."hotel_name" AS hotel_name,
                rt."room_type_name" AS room_type_name,
                NULL::varchar(255) AS room_name
            FROM "{source_schema}"."loops" l
            LEFT JOIN "{source_schema}"."room_type" rt
                ON rt."id" = l."room_type_id"
            LEFT JOIN "{source_schema}"."hotels" h
                ON h."id" = rt."hotel_id"
            WHERE trim(coalesce(l."type", '')) IN ('0', '1')
            '''
            df = pd.read_sql(text(data_sql), source_conn)

        # begin() commits on success and rolls back on error, so the table
        # rebuild and the insert succeed or fail together.
        with target_engine.begin() as target_conn:
            create_target_table(target_conn, target_schema, target_table)
            df.to_sql(
                target_table,
                target_conn,
                schema=target_schema,
                if_exists="append",
                index=False,
            )
    finally:
        # Release pooled connections even when a step above fails.
        target_engine.dispose()
        source_engine.dispose()

    print(f"Inserted rows: {len(df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={stats['total_rows']}, "
        f"exact_type_0_1={stats['exact_match_rows']}, "
        f"normalized_type_0_1={stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")


if __name__ == "__main__":
    main()