feat: 添加数据验证脚本,验证目标表与源表数据一致性

This commit is contained in:
2026-04-09 11:48:07 +08:00
parent a2cc185279
commit 8ad417365e

View File

@@ -0,0 +1,98 @@
import os
from getpass import getpass
import pandas as pd
from sqlalchemy import create_engine, text
from db_config import get_db_url
def main():
source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project")
target_schema = os.getenv("TARGET_SCHEMA", "wh_test")
target_table = os.getenv("TARGET_TABLE", "loops_type01_flattened")
source_engine = create_engine(get_db_url("SOURCE"))
target_engine = create_engine(get_db_url("TARGET"))
print("\n[1/3] 从目标库抽取前500条记录...")
with target_engine.connect() as target_conn:
target_sql = f'''
SELECT id AS loop_id, loop_name, room_type_id, hotel_id, room_id
FROM "{target_schema}"."{target_table}"
LIMIT 500
'''
df_target = pd.read_sql(text(target_sql), target_conn)
if df_target.empty:
print("未在目标表中找到数据。请先运行 test_flattened.py 生成数据。")
return
print(f"✅ 成功抽取 {len(df_target)} 条目标记录,开始按源表进行逐条交叉验证...")
errors = []
success_count = 0
with source_engine.connect() as source_conn:
for idx, row in df_target.iterrows():
loop_id = row['loop_id']
target_room_type_id = row['room_type_id']
target_hotel_id = row['hotel_id']
target_room_id = row['room_id']
# 步骤 1验证这个 loop 的 room_type_id 是否一致
val_sql_1 = f'''
SELECT l.room_type_id, h.hotel_id
FROM "{source_schema}"."loops" l
LEFT JOIN "{source_schema}"."room_type" rt ON l.room_type_id = rt.id
LEFT JOIN "{source_schema}"."hotels" h ON rt.hotel_id = h.id
WHERE l.id = :loop_id
LIMIT 1
'''
source_basic = source_conn.execute(text(val_sql_1), {"loop_id": loop_id}).mappings().first()
if not source_basic:
errors.append(f"{idx}: loop_id {loop_id} 在源库 loops 中不存在")
continue
if source_basic['room_type_id'] != target_room_type_id:
errors.append(f"{idx}: loop_id {loop_id} room_type_id 不匹配. 源:{source_basic['room_type_id']} 目标:{target_room_type_id}")
continue
if pd.notna(target_hotel_id) and source_basic['hotel_id'] != target_hotel_id:
errors.append(f"{idx}: loop_id {loop_id} hotel_id 不匹配. 源推导:{source_basic['hotel_id']} 目标:{target_hotel_id}")
continue
# 步骤 2如果提取出了具体的 room_id验证它确实属于对应的 hotel_id和room_type_id
if pd.notna(target_room_id):
val_sql_2 = f'''
SELECT 1
FROM "{source_schema}"."rooms" r
LEFT JOIN "{source_schema}"."hotels" h ON r.hotel_id = h.id
WHERE h.hotel_id = :hotel_id AND r.room_type_id = :room_type_id AND r.room_id = :room_id
LIMIT 1
'''
room_check = source_conn.execute(
text(val_sql_2),
{"hotel_id": target_hotel_id, "room_type_id": target_room_type_id, "room_id": target_room_id}
).first()
if not room_check:
errors.append(f"{idx}: loops_id {loop_id} 对应的房间 {target_room_id} 未在 rooms (hotel={target_hotel_id}, r_type={target_room_type_id}) 中找到对应映射")
continue
success_count += 1
print(f"\n[3/3] 验证完成。共检查 {len(df_target)} 条数据。")
print(f"✅ 验证通过条数: {success_count}")
if errors:
print(f"❌ 发现 {len(errors)} 个异常。前 5 个异常展示如下:")
for e in errors[:5]:
print(" - " + e)
else:
print("🎉 完美!抽样的所有数据结构匹配逻辑都百分百正确。")
if __name__ == "__main__":
main()