diff --git a/database_test_python/validate_flattened.py b/database_test_python/validate_flattened.py new file mode 100644 index 0000000..7dd3354 --- /dev/null +++ b/database_test_python/validate_flattened.py @@ -0,0 +1,98 @@ +import os +from getpass import getpass + +import pandas as pd +from sqlalchemy import create_engine, text + +from db_config import get_db_url + + +def main(): + source_schema = os.getenv("SOURCE_SCHEMA", "temporary_project") + target_schema = os.getenv("TARGET_SCHEMA", "wh_test") + target_table = os.getenv("TARGET_TABLE", "loops_type01_flattened") + + source_engine = create_engine(get_db_url("SOURCE")) + target_engine = create_engine(get_db_url("TARGET")) + + print("\n[1/3] 从目标库抽取前500条记录...") + with target_engine.connect() as target_conn: + target_sql = f''' + SELECT id AS loop_id, loop_name, room_type_id, hotel_id, room_id + FROM "{target_schema}"."{target_table}" + LIMIT 500 + ''' + df_target = pd.read_sql(text(target_sql), target_conn) + + if df_target.empty: + print("未在目标表中找到数据。请先运行 test_flattened.py 生成数据。") + return + + print(f"✅ 成功抽取 {len(df_target)} 条目标记录,开始按源表进行逐条交叉验证...") + + errors = [] + success_count = 0 + + with source_engine.connect() as source_conn: + for idx, row in df_target.iterrows(): + loop_id = row['loop_id'] + target_room_type_id = row['room_type_id'] + target_hotel_id = row['hotel_id'] + target_room_id = row['room_id'] + + # 步骤 1:验证这个 loop 的 room_type_id 是否一致 + val_sql_1 = f''' + SELECT l.room_type_id, h.hotel_id + FROM "{source_schema}"."loops" l + LEFT JOIN "{source_schema}"."room_type" rt ON l.room_type_id = rt.id + LEFT JOIN "{source_schema}"."hotels" h ON rt.hotel_id = h.id + WHERE l.id = :loop_id + LIMIT 1 + ''' + source_basic = source_conn.execute(text(val_sql_1), {"loop_id": loop_id}).mappings().first() + + if not source_basic: + errors.append(f"行 {idx}: loop_id {loop_id} 在源库 loops 中不存在") + continue + + if source_basic['room_type_id'] != target_room_type_id: + errors.append(f"行 {idx}: loop_id {loop_id} room_type_id 不匹配. 源:{source_basic['room_type_id']} 目标:{target_room_type_id}") + continue + + if pd.notna(target_hotel_id) and source_basic['hotel_id'] != target_hotel_id: + errors.append(f"行 {idx}: loop_id {loop_id} hotel_id 不匹配. 源推导:{source_basic['hotel_id']} 目标:{target_hotel_id}") + continue + + # 步骤 2:如果提取出了具体的 room_id,验证它确实属于对应的 hotel_id和room_type_id + if pd.notna(target_room_id): + val_sql_2 = f''' + SELECT 1 + FROM "{source_schema}"."rooms" r + LEFT JOIN "{source_schema}"."hotels" h ON r.hotel_id = h.id + WHERE h.hotel_id = :hotel_id AND r.room_type_id = :room_type_id AND r.room_id = :room_id + LIMIT 1 + ''' + room_check = source_conn.execute( + text(val_sql_2), + {"hotel_id": target_hotel_id, "room_type_id": target_room_type_id, "room_id": target_room_id} + ).first() + + if not room_check: + errors.append(f"行 {idx}: loops_id {loop_id} 对应的房间 {target_room_id} 未在 rooms (hotel={target_hotel_id}, r_type={target_room_type_id}) 中找到对应映射") + continue + + success_count += 1 + + print(f"\n[3/3] 验证完成。共检查 {len(df_target)} 条数据。") + print(f"✅ 验证通过条数: {success_count}") + + if errors: + print(f"❌ 发现 {len(errors)} 个异常。前 5 个异常展示如下:") + for e in errors[:5]: + print(" - " + e) + else: + print("🎉 完美!抽样的所有数据结构匹配逻辑都百分百正确。") + + +if __name__ == "__main__": + main()