diff --git a/database_test_project/test.py b/database_test_project/test.py
deleted file mode 100644
index c89ae82..0000000
--- a/database_test_project/test.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import os
-from getpass import getpass
-
-import pandas as pd
-from sqlalchemy import create_engine, text
-
-
-
-def build_url(prefix: str) -> str:
-    user = os.getenv(f"{prefix}_PG_USER", "log_admin")
-    password = os.getenv(f"{prefix}_PG_PASSWORD")
-    if not password:
-        password = getpass(f"{prefix} database password: ")
-    host = os.getenv(f"{prefix}_PG_HOST", "blv-rd.tech")
-    port = os.getenv(f"{prefix}_PG_PORT", "15433")
-    default_db = "log_platform" if prefix == "SOURCE" else "test"
-    database = os.getenv(f"{prefix}_PG_DB", default_db)
-    if not database:
-        raise ValueError(f"{prefix}_PG_DB is required")
-    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
-
-
-LOOPS_COLUMNS = [
-    "id",
-    "loop_name",
-    "room_type_id",
-    "loop_address",
-    "loop_type",
-    "type",
-    "name",
-    "power",
-    "rate",
-    "temperature",
-    "air_type",
-    "air_brand",
-    "air_model",
-    "height",
-    "area",
-    "heat_loss",
-    "remark",
-]
-
-
-def create_target_table(target_conn, target_schema: str, target_table: str):
-    target_conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"'))
-    target_conn.execute(text(f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"'))
-
-    create_sql = f'''
-    CREATE TABLE "{target_schema}"."{target_table}" (
-        "id" int4 NOT NULL,
-        "loop_name" varchar(255),
-        "room_type_id" int4 NOT NULL,
-        "loop_address" varchar(255),
-        "loop_type" varchar(50),
-        "type" varchar(254),
-        "name" varchar(254),
-        "power" float8,
-        "rate" float8,
-        "temperature" float8,
-        "air_type" varchar(254),
-        "air_brand" varchar(254),
-        "air_model" varchar(254),
-        "height" float8,
-        "area" float8,
-        "heat_loss" float8,
-        "remark" varchar(254),
-        "hotel_name" varchar(255),
-        "room_type_name" varchar(255),
-        "room_name" varchar(255)
-    )
-    '''
-    target_conn.execute(text(create_sql))
-
-
-def main():
-    source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
-    target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
-    target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched")
-
-    # 默认源库:log_platform;默认目标库:test
-    source_engine = create_engine(build_url("SOURCE"))
-    target_engine = create_engine(build_url("TARGET"))
-
-    with source_engine.connect() as source_conn:
-        stats_sql = f'''
-        SELECT
-            count(*) AS total_rows,
-            count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
-            count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
-        FROM "{source_schema}"."loops"
-        '''
-        stats = source_conn.execute(text(stats_sql)).mappings().first()
-
-        dist_sql = f'''
-        SELECT
-            "type",
-            count(*) AS cnt
-        FROM "{source_schema}"."loops"
-        GROUP BY "type"
-        ORDER BY cnt DESC, "type" NULLS FIRST
-        LIMIT 20
-        '''
-        type_dist = source_conn.execute(text(dist_sql)).mappings().all()
-
-        loops_select = ",\n            ".join([f'l."{c}"' for c in LOOPS_COLUMNS])
-
-        data_sql = f'''
-        SELECT
-            {loops_select},
-            h."hotel_name" AS hotel_name,
-            rt."room_type_name" AS room_type_name,
-            NULL::varchar(255) AS room_name
-        FROM "{source_schema}"."loops" l
-        LEFT JOIN "{source_schema}"."room_type" rt
-            ON rt."id" = l."room_type_id"
-        LEFT JOIN "{source_schema}"."hotels" h
-            ON h."id" = rt."hotel_id"
-        WHERE trim(coalesce(l."type", '')) IN ('0', '1')
-        '''
-        df = pd.read_sql(text(data_sql), source_conn)
-
-    with target_engine.begin() as target_conn:
-        create_target_table(target_conn, target_schema, target_table)
-        df.to_sql(target_table, target_conn, schema=target_schema, if_exists="append", index=False)
-
-    print(f"Inserted rows: {len(df)}")
-    print(f"Target table: {target_schema}.{target_table}")
-    print(
-        "Source counts: "
-        f"total={stats['total_rows']}, "
-        f"exact_type_0_1={stats['exact_match_rows']}, "
-        f"normalized_type_0_1={stats['normalized_match_rows']}"
-    )
-    print("Top 20 type distribution:")
-    for row in type_dist:
-        print(f"type={row['type']!r}, count={row['cnt']}")
-
-
-if __name__ == "__main__":
-    main()
\ No newline at end of file
diff --git a/database_test_python/check_matching_rate.py b/database_test_python/check_matching_rate.py
new file mode 100644
index 0000000..9d3a302
--- /dev/null
+++ b/database_test_python/check_matching_rate.py
@@ -0,0 +1,61 @@
+import os
+import pandas as pd
+from sqlalchemy import create_engine
+import urllib.parse
+from db_config import get_db_url
+
+def main():
+    print("正在连接数据库...")
+    url_80portal = get_db_url("PORTAL")
+    url_test = get_db_url("TARGET")
+
+    engine_80portal = create_engine(url_80portal)
+    engine_test = create_engine(url_test)
+
+    print("\n[1/3] 从 15433 test 库提取 loops_power 的数据...")
+    df_loops = pd.read_sql('SELECT hotel_id, room_id FROM "wh_test"."loops_power"', engine_test)
+
+    print("[2/3] 从 15434 log_platform 库提取 room_status_moment_g5 的数据...")
+    df_g5 = pd.read_sql('SELECT hotel_id, room_id FROM "room_status"."room_status_moment_g5"', engine_80portal)
+
+    print("\n[3/3] 正在对齐类型并进行交集运算...")
+    # 彻底数据清洗,防止由于 PostgreSQL 类型或字符串空格导致匹配不上
+    df_loops['hotel_id'] = pd.to_numeric(df_loops['hotel_id'], errors='coerce').astype('Int64')
+    df_loops['room_id'] = df_loops['room_id'].astype(str).str.strip().str.replace('.0', '', regex=False)
+
+    df_g5['hotel_id'] = pd.to_numeric(df_g5['hotel_id'], errors='coerce').astype('Int64')
+    df_g5['room_id'] = df_g5['room_id'].astype(str).str.strip().str.replace('.0', '', regex=False)
+
+    # 针对 g5 表去重建立参考系
+    df_g5_unique = df_g5.drop_duplicates(subset=['hotel_id', 'room_id']).copy()
+    df_g5_unique['g5_exists'] = True
+
+    # 分析覆盖情况
+    total_loops = len(df_loops)
+
+    merged = pd.merge(df_loops, df_g5_unique, on=['hotel_id', 'room_id'], how='left')
+    matched_loops = merged['g5_exists'].sum()
+
+    match_rate = (matched_loops / total_loops * 100) if total_loops > 0 else 0
+
+    print("=======================================")
+    print("匹配结果报告:")
+    print("=======================================")
+    print(f"-> 提取的 loops_power 总记录: {total_loops} 条")
+    print(f"-> 在 g5 表中成功映射找到的记录: {int(matched_loops)} 条")
+    print(f"-> 整体匹配覆盖率: {match_rate:.2f}%")
+
+    # 挑出匹配上的数据
+    mapped = merged[merged['g5_exists'] == True]
+    if len(mapped) > 0:
+        print(f"\n[√] 成功匹配配对示例(前10种):")
+        print(mapped[['hotel_id', 'room_id']].drop_duplicates().head(10).to_string(index=False))
+
+    # 挑出无法在 G5 找到的烂数据
+    unmapped = merged[merged['g5_exists'] != True]
+    if len(unmapped) > 0:
+        print(f"\n[!] 发现 {len(unmapped)} 条扁平化回路未在 g5 表中找到对应的 hotel_id/room_id。以下为缺失配对示例(前10种):")
+        print(unmapped[['hotel_id', 'room_id']].drop_duplicates().head(10).to_string(index=False))
+
+if __name__ == "__main__":
+    main()