feat: 添加数据匹配分析脚本,计算 loops_power 与 g5 表的匹配覆盖率
This commit is contained in:
@@ -1,140 +0,0 @@
|
|||||||
import os
|
|
||||||
from getpass import getpass
|
|
||||||
|
|
||||||
import pandas as pd
|
|
||||||
from sqlalchemy import create_engine, text
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def build_url(prefix: str) -> str:
|
|
||||||
user = os.getenv(f"{prefix}_PG_USER", "log_admin")
|
|
||||||
password = os.getenv(f"{prefix}_PG_PASSWORD")
|
|
||||||
if not password:
|
|
||||||
password = getpass(f"{prefix} database password: ")
|
|
||||||
host = os.getenv(f"{prefix}_PG_HOST", "blv-rd.tech")
|
|
||||||
port = os.getenv(f"{prefix}_PG_PORT", "15433")
|
|
||||||
default_db = "log_platform" if prefix == "SOURCE" else "test"
|
|
||||||
database = os.getenv(f"{prefix}_PG_DB", default_db)
|
|
||||||
if not database:
|
|
||||||
raise ValueError(f"{prefix}_PG_DB is required")
|
|
||||||
return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
|
|
||||||
|
|
||||||
|
|
||||||
LOOPS_COLUMNS = [
|
|
||||||
"id",
|
|
||||||
"loop_name",
|
|
||||||
"room_type_id",
|
|
||||||
"loop_address",
|
|
||||||
"loop_type",
|
|
||||||
"type",
|
|
||||||
"name",
|
|
||||||
"power",
|
|
||||||
"rate",
|
|
||||||
"temperature",
|
|
||||||
"air_type",
|
|
||||||
"air_brand",
|
|
||||||
"air_model",
|
|
||||||
"height",
|
|
||||||
"area",
|
|
||||||
"heat_loss",
|
|
||||||
"remark",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def create_target_table(target_conn, target_schema: str, target_table: str):
|
|
||||||
target_conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{target_schema}"'))
|
|
||||||
target_conn.execute(text(f'DROP TABLE IF EXISTS "{target_schema}"."{target_table}"'))
|
|
||||||
|
|
||||||
create_sql = f'''
|
|
||||||
CREATE TABLE "{target_schema}"."{target_table}" (
|
|
||||||
"id" int4 NOT NULL,
|
|
||||||
"loop_name" varchar(255),
|
|
||||||
"room_type_id" int4 NOT NULL,
|
|
||||||
"loop_address" varchar(255),
|
|
||||||
"loop_type" varchar(50),
|
|
||||||
"type" varchar(254),
|
|
||||||
"name" varchar(254),
|
|
||||||
"power" float8,
|
|
||||||
"rate" float8,
|
|
||||||
"temperature" float8,
|
|
||||||
"air_type" varchar(254),
|
|
||||||
"air_brand" varchar(254),
|
|
||||||
"air_model" varchar(254),
|
|
||||||
"height" float8,
|
|
||||||
"area" float8,
|
|
||||||
"heat_loss" float8,
|
|
||||||
"remark" varchar(254),
|
|
||||||
"hotel_name" varchar(255),
|
|
||||||
"room_type_name" varchar(255),
|
|
||||||
"room_name" varchar(255)
|
|
||||||
)
|
|
||||||
'''
|
|
||||||
target_conn.execute(text(create_sql))
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
|
|
||||||
target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
|
|
||||||
target_table = os.getenv("TARGET_TABLE", "loops_type01_enriched")
|
|
||||||
|
|
||||||
# 默认源库:log_platform;默认目标库:test
|
|
||||||
source_engine = create_engine(build_url("SOURCE"))
|
|
||||||
target_engine = create_engine(build_url("TARGET"))
|
|
||||||
|
|
||||||
with source_engine.connect() as source_conn:
|
|
||||||
stats_sql = f'''
|
|
||||||
SELECT
|
|
||||||
count(*) AS total_rows,
|
|
||||||
count(*) FILTER (WHERE "type" IN ('0', '1')) AS exact_match_rows,
|
|
||||||
count(*) FILTER (WHERE trim(coalesce("type", '')) IN ('0', '1')) AS normalized_match_rows
|
|
||||||
FROM "{source_schema}"."loops"
|
|
||||||
'''
|
|
||||||
stats = source_conn.execute(text(stats_sql)).mappings().first()
|
|
||||||
|
|
||||||
dist_sql = f'''
|
|
||||||
SELECT
|
|
||||||
"type",
|
|
||||||
count(*) AS cnt
|
|
||||||
FROM "{source_schema}"."loops"
|
|
||||||
GROUP BY "type"
|
|
||||||
ORDER BY cnt DESC, "type" NULLS FIRST
|
|
||||||
LIMIT 20
|
|
||||||
'''
|
|
||||||
type_dist = source_conn.execute(text(dist_sql)).mappings().all()
|
|
||||||
|
|
||||||
loops_select = ",\n ".join([f'l."{c}"' for c in LOOPS_COLUMNS])
|
|
||||||
|
|
||||||
data_sql = f'''
|
|
||||||
SELECT
|
|
||||||
{loops_select},
|
|
||||||
h."hotel_name" AS hotel_name,
|
|
||||||
rt."room_type_name" AS room_type_name,
|
|
||||||
NULL::varchar(255) AS room_name
|
|
||||||
FROM "{source_schema}"."loops" l
|
|
||||||
LEFT JOIN "{source_schema}"."room_type" rt
|
|
||||||
ON rt."id" = l."room_type_id"
|
|
||||||
LEFT JOIN "{source_schema}"."hotels" h
|
|
||||||
ON h."id" = rt."hotel_id"
|
|
||||||
WHERE trim(coalesce(l."type", '')) IN ('0', '1')
|
|
||||||
'''
|
|
||||||
df = pd.read_sql(text(data_sql), source_conn)
|
|
||||||
|
|
||||||
with target_engine.begin() as target_conn:
|
|
||||||
create_target_table(target_conn, target_schema, target_table)
|
|
||||||
df.to_sql(target_table, target_conn, schema=target_schema, if_exists="append", index=False)
|
|
||||||
|
|
||||||
print(f"Inserted rows: {len(df)}")
|
|
||||||
print(f"Target table: {target_schema}.{target_table}")
|
|
||||||
print(
|
|
||||||
"Source counts: "
|
|
||||||
f"total={stats['total_rows']}, "
|
|
||||||
f"exact_type_0_1={stats['exact_match_rows']}, "
|
|
||||||
f"normalized_type_0_1={stats['normalized_match_rows']}"
|
|
||||||
)
|
|
||||||
print("Top 20 type distribution:")
|
|
||||||
for row in type_dist:
|
|
||||||
print(f"type={row['type']!r}, count={row['cnt']}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
61
database_test_python/check_matching_rate.py
Normal file
61
database_test_python/check_matching_rate.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import os
|
||||||
|
import pandas as pd
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
import urllib.parse
|
||||||
|
from db_config import get_db_url
|
||||||
|
|
||||||
|
def main():
|
||||||
|
print("正在连接数据库...")
|
||||||
|
url_80portal = get_db_url("PORTAL")
|
||||||
|
url_test = get_db_url("TARGET")
|
||||||
|
|
||||||
|
engine_80portal = create_engine(url_80portal)
|
||||||
|
engine_test = create_engine(url_test)
|
||||||
|
|
||||||
|
print("\n[1/3] 从 15433 test 库提取 loops_power 的数据...")
|
||||||
|
df_loops = pd.read_sql('SELECT hotel_id, room_id FROM "wh_test"."loops_power"', engine_test)
|
||||||
|
|
||||||
|
print("[2/3] 从 15434 log_platform 库提取 room_status_moment_g5 的数据...")
|
||||||
|
df_g5 = pd.read_sql('SELECT hotel_id, room_id FROM "room_status"."room_status_moment_g5"', engine_80portal)
|
||||||
|
|
||||||
|
print("\n[3/3] 正在对齐类型并进行交集运算...")
|
||||||
|
# 彻底数据清洗,防止由于 PostgreSQL 类型或字符串空格导致匹配不上
|
||||||
|
df_loops['hotel_id'] = pd.to_numeric(df_loops['hotel_id'], errors='coerce').astype('Int64')
|
||||||
|
df_loops['room_id'] = df_loops['room_id'].astype(str).str.strip().str.replace('.0', '', regex=False)
|
||||||
|
|
||||||
|
df_g5['hotel_id'] = pd.to_numeric(df_g5['hotel_id'], errors='coerce').astype('Int64')
|
||||||
|
df_g5['room_id'] = df_g5['room_id'].astype(str).str.strip().str.replace('.0', '', regex=False)
|
||||||
|
|
||||||
|
# 针对 g5 表去重建立参考系
|
||||||
|
df_g5_unique = df_g5.drop_duplicates(subset=['hotel_id', 'room_id']).copy()
|
||||||
|
df_g5_unique['g5_exists'] = True
|
||||||
|
|
||||||
|
# 分析覆盖情况
|
||||||
|
total_loops = len(df_loops)
|
||||||
|
|
||||||
|
merged = pd.merge(df_loops, df_g5_unique, on=['hotel_id', 'room_id'], how='left')
|
||||||
|
matched_loops = merged['g5_exists'].sum()
|
||||||
|
|
||||||
|
match_rate = (matched_loops / total_loops * 100) if total_loops > 0 else 0
|
||||||
|
|
||||||
|
print("=======================================")
|
||||||
|
print("匹配结果报告:")
|
||||||
|
print("=======================================")
|
||||||
|
print(f"-> 提取的 loops_power 总记录: {total_loops} 条")
|
||||||
|
print(f"-> 在 g5 表中成功映射找到的记录: {int(matched_loops)} 条")
|
||||||
|
print(f"-> 整体匹配覆盖率: {match_rate:.2f}%")
|
||||||
|
|
||||||
|
# 挑出匹配上的数据
|
||||||
|
mapped = merged[merged['g5_exists'] == True]
|
||||||
|
if len(mapped) > 0:
|
||||||
|
print(f"\n[√] 成功匹配配对示例(前10种):")
|
||||||
|
print(mapped[['hotel_id', 'room_id']].drop_duplicates().head(10).to_string(index=False))
|
||||||
|
|
||||||
|
# 挑出无法在 G5 找到的烂数据
|
||||||
|
unmapped = merged[merged['g5_exists'] != True]
|
||||||
|
if len(unmapped) > 0:
|
||||||
|
print(f"\n[!] 发现 {len(unmapped)} 条扁平化回路未在 g5 表中找到对应的 hotel_id/room_id。以下为缺失配对示例(前10种):")
|
||||||
|
print(unmapped[['hotel_id', 'room_id']].drop_duplicates().head(10).to_string(index=False))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user