Files
Learn_Energy_Special_Wen/database_test_project/test.py
Indiem87 f7df419e21 添加 ETL 脚本:导出并丰富 loops 数据(交互式密码输入)
- 提取 SOURCE 库中 loops 表中 type 为 '0'/'1' 的行,联表补全酒店与房型名称,创建目标 schema/table 并写入到 TARGET 库。
   - 使用 SQLAlchemy + pandas 实现;支持通过环境变量配置连接信息,包含用于创建目标表的 create_target_table 函数和用于构建连接 URL 的 build_url 函数。
   - 脚本会在目标库创建 schema(若不存在)并重建目标表。
   - 依赖: pandas, sqlalchemy, psycopg2。
2026-03-16 01:15:07 -04:00

140 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text
def build_url(prefix: str) -> str:
    """Build a SQLAlchemy Postgres URL from ``{prefix}_PG_*`` env vars.

    Falls back to an interactive prompt (getpass) when
    ``{prefix}_PG_PASSWORD`` is unset or empty. User and password are
    URL-encoded so credentials containing ``@ : / %`` don't corrupt the
    resulting URL.

    Args:
        prefix: Env-var prefix, e.g. "SOURCE" or "TARGET".

    Returns:
        A ``postgresql+psycopg2://user:password@host:port/db`` URL string.

    Raises:
        ValueError: if ``{prefix}_PG_DB`` is explicitly set to an empty string.
    """
    user = os.getenv(f"{prefix}_PG_USER", "log_admin")
    password = os.getenv(f"{prefix}_PG_PASSWORD")
    if not password:
        # No password in the environment: prompt interactively instead.
        password = getpass(f"{prefix} database password: ")
    host = os.getenv(f"{prefix}_PG_HOST", "blv-rd.tech")
    port = os.getenv(f"{prefix}_PG_PORT", "15433")
    # Source defaults to the log-platform DB; the target defaults to "test".
    default_db = "log_platform" if prefix == "SOURCE" else "test"
    database = os.getenv(f"{prefix}_PG_DB", default_db)
    if not database:
        # Env var present but empty — fail fast rather than build a bad URL.
        raise ValueError(f"{prefix}_PG_DB is required")
    # quote_plus guards against URL metacharacters in the credentials
    # (recommended by SQLAlchemy's "Database URLs" documentation).
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )
# Columns copied verbatim from the source "loops" table. Order matters:
# it drives both the SELECT list in main() and the target-table layout.
LOOPS_COLUMNS = (
    "id loop_name room_type_id loop_address loop_type type name power rate "
    "temperature air_type air_brand air_model height area heat_loss remark"
).split()
def create_target_table(target_conn, target_schema: str, target_table: str):
    """Create the target schema (if missing) and recreate the target table.

    Any pre-existing table of the same name is dropped first, so every
    run starts from an empty table. The layout mirrors the source
    "loops" columns plus the three enrichment columns added by the join.

    NOTE: schema/table identifiers are interpolated into the DDL; they
    come from env-var configuration, not untrusted user input.
    """
    # (column name, SQL type) in target-table column order.
    column_defs = (
        ("id", "int4 NOT NULL"),
        ("loop_name", "varchar(255)"),
        ("room_type_id", "int4 NOT NULL"),
        ("loop_address", "varchar(255)"),
        ("loop_type", "varchar(50)"),
        ("type", "varchar(254)"),
        ("name", "varchar(254)"),
        ("power", "float8"),
        ("rate", "float8"),
        ("temperature", "float8"),
        ("air_type", "varchar(254)"),
        ("air_brand", "varchar(254)"),
        ("air_model", "varchar(254)"),
        ("height", "float8"),
        ("area", "float8"),
        ("heat_loss", "float8"),
        ("remark", "varchar(254)"),
        ("hotel_name", "varchar(255)"),
        ("room_type_name", "varchar(255)"),
        ("room_name", "varchar(255)"),
    )
    columns_sql = ",\n    ".join(
        f'"{name}" {sql_type}' for name, sql_type in column_defs
    )
    ddl = (
        f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"',
        f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"',
        f'CREATE TABLE "{target_schema}"."{target_table}" (\n    {columns_sql}\n)',
    )
    for statement in ddl:
        target_conn.execute(text(statement))
def main():
    """Export enriched loops rows from the source DB into the target DB.

    Reads rows from ``<source_schema>.loops`` whose "type" is '0' or '1'
    (after trimming / NULL-coalescing), joins hotel and room-type names,
    recreates the target table, bulk-inserts the result, and prints
    summary statistics about the source data.
    """
    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
    target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched")

    # Default source DB is log_platform; default target DB is test (see build_url).
    source_engine = create_engine(build_url("SOURCE"))
    target_engine = create_engine(build_url("TARGET"))

    with source_engine.connect() as source_conn:
        # Sanity counts: total rows, exact '0'/'1' matches, and matches
        # after trim/coalesce — highlights dirty data in the "type" column.
        stats_sql = f'''
            SELECT
                count(*) AS total_rows,
                count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
                count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
            FROM "{source_schema}"."loops"
        '''
        count_stats = source_conn.execute(text(stats_sql)).mappings().first()

        # Top-20 distribution of distinct "type" values, for diagnostics.
        dist_sql = f'''
            SELECT
                "type",
                count(*) AS cnt
            FROM "{source_schema}"."loops"
            GROUP BY "type"
            ORDER BY cnt DESC, "type" NULLS FIRST
            LIMIT 20
        '''
        type_dist = source_conn.execute(text(dist_sql)).mappings().all()

        # Main extract: loops columns plus hotel/room-type names via joins.
        # room_name has no source yet, so it is emitted as NULL.
        loops_select = ",\n ".join(f'l."{c}"' for c in LOOPS_COLUMNS)
        data_sql = f'''
            SELECT
                {loops_select},
                h."hotel_name" AS hotel_name,
                rt."room_type_name" AS room_type_name,
                NULL::varchar(255) AS room_name
            FROM "{source_schema}"."loops" l
            LEFT JOIN "{source_schema}"."room_type" rt
                ON rt."id" = l."room_type_id"
            LEFT JOIN "{source_schema}"."hotels" h
                ON h."id" = rt."hotel_id"
            WHERE trim(coalesce(l."type", '')) IN ('0', '1')
        '''
        loops_df = pd.read_sql(text(data_sql), source_conn)

    # begin() gives a transaction: table rebuild and insert commit together.
    with target_engine.begin() as target_conn:
        create_target_table(target_conn, target_schema, target_table)
        loops_df.to_sql(
            target_table,
            target_conn,
            schema=target_schema,
            if_exists="append",
            index=False,
        )

    print(f"Inserted rows: {len(loops_df)}")
    print(f"Target table: {target_schema}.{target_table}")
    print(
        "Source counts: "
        f"total={count_stats['total_rows']}, "
        f"exact_type_0_1={count_stats['exact_match_rows']}, "
        f"normalized_type_0_1={count_stats['normalized_match_rows']}"
    )
    print("Top 20 type distribution:")
    for row in type_dist:
        print(f"type={row['type']!r}, count={row['cnt']}")
if __name__ == "__main__":
main()