From dcab8864952ffe2bc70e54d93db8b03d3265a1a1 Mon Sep 17 00:00:00 2001 From: Indiem87 Date: Mon, 16 Mar 2026 00:47:19 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BB=8E=20SOURCE=20?= =?UTF-8?q?=E5=BA=93=E6=8F=90=E5=8F=96=20loops=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E3=80=81=E8=BF=87=E6=BB=A4=20type=200/1=20=E5=B9=B6=E5=86=99?= =?UTF-8?q?=E5=85=A5=20TARGET=20=E7=9A=84=20ETL=20=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=EF=BC=88=E5=90=AB=E7=BB=9F=E8=AE=A1=E4=B8=8E=E7=9B=AE=E6=A0=87?= =?UTF-8?q?=E8=A1=A8=E5=88=9B=E5=BB=BA=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- database_test_project/test.py | 139 ++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 database_test_project/test.py diff --git a/database_test_project/test.py b/database_test_project/test.py new file mode 100644 index 0000000..48b9b6c --- /dev/null +++ b/database_test_project/test.py @@ -0,0 +1,139 @@ +import os +from getpass import getpass + +import pandas as pd +from sqlalchemy import create_engine, text + + +def build_url(prefix: str) -> str: + user = os.getenv(f"{prefix}_PG_USER", "log_admin") + password = os.getenv(f"{prefix}_PG_PASSWORD") + if not password: + password = getpass(f"{prefix} database password: ") + host = os.getenv(f"{prefix}_PG_HOST", "blv-rd.tech") + port = os.getenv(f"{prefix}_PG_PORT", "15433") + default_db = "log_platform" if prefix == "SOURCE" else "test" + database = os.getenv(f"{prefix}_PG_DB", default_db) + if not database: + raise ValueError(f"{prefix}_PG_DB is required") + return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" + + +LOOPS_COLUMNS = [ + "id", + "loop_name", + "room_type_id", + "loop_address", + "loop_type", + "type", + "name", + "power", + "rate", + "temperature", + "air_type", + "air_brand", + "air_model", + "height", + "area", + "heat_loss", + "remark", +] + + +def create_target_table(target_conn, target_schema: str, target_table: str): + target_conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"')) + target_conn.execute(text(f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"')) + + create_sql = f''' + CREATE TABLE "{target_schema}"."{target_table}" ( + "id" int4 NOT NULL, + "loop_name" varchar(255), + "room_type_id" int4 NOT NULL, + "loop_address" varchar(255), + "loop_type" varchar(50), + "type" varchar(254), + "name" varchar(254), + "power" float8, + "rate" float8, + "temperature" float8, + "air_type" varchar(254), + "air_brand" varchar(254), + "air_model" varchar(254), + "height" float8, + "area" float8, + "heat_loss" float8, + "remark" varchar(254), + "hotel_name" varchar(255), + "room_type_name" varchar(255), + "room_name" varchar(255) + ) + ''' + target_conn.execute(text(create_sql)) + + +def main(): + source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project") + target_schema = os.getenv("TARGET_SCHEMA", "wh_test") + target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched") + + # 默认源库:log_platform;默认目标库:test + source_engine = create_engine(build_url("SOURCE")) + target_engine = create_engine(build_url("TARGET")) + + with source_engine.connect() as source_conn: + stats_sql = f''' + SELECT + count(*) AS total_rows, + count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows, + count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows + FROM "{source_schema}"."loops" + ''' + stats = source_conn.execute(text(stats_sql)).mappings().first() + + dist_sql = f''' + SELECT + "type", + count(*) AS cnt + FROM "{source_schema}"."loops" + GROUP BY "type" + ORDER BY cnt DESC, "type" NULLS FIRST + LIMIT 20 + ''' + type_dist = source_conn.execute(text(dist_sql)).mappings().all() + + loops_select = ",\n ".join([f'l."{c}"' for c in LOOPS_COLUMNS]) + + data_sql = f''' + SELECT + {loops_select}, + h."hotel_name" AS hotel_name, + rt."room_type_name" AS room_type_name, + NULL::varchar(255) AS room_name + FROM "{source_schema}"."loops" l + LEFT JOIN "{source_schema}"."room_type" rt + ON rt."id" = l."room_type_id" + LEFT JOIN "{source_schema}"."hotels" h + ON h."id" = rt."hotel_id" + WHERE trim(coalesce(l."type", '')) IN ('0', '1') + ''' + df = pd.read_sql(text(data_sql), source_conn) + + with target_engine.begin() as target_conn: + create_target_table(target_conn, target_schema, target_table) + df.to_sql(target_table, target_conn, schema=target_schema, if_exists="append", index=False) + + print(f"Inserted rows: {len(df)}") + print(f"Target table: {target_schema}.{target_table}") + print( + "Source counts: " + f"total={stats['total_rows']}, " + f"exact_type_0_1={stats['exact_match_rows']}, " + f"normalized_type_0_1={stats['normalized_match_rows']}" + ) + print("Top 20 type distribution:") + for row in type_dist: + print(f"type={row['type']!r}, count={row['cnt']}") + + +if __name__ == "__main__": + main() \ No newline at end of file