ai_project_v1/middleware/query_postgress.py

151 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import psycopg2
from typing import List, Dict, Any, Optional, Union
def batch_query_model_func_id(
target_ids: List[Union[int, str]],
host: str,
port: int,
dbname: str,
user: str,
password: str,
parse_func: Optional[callable] = None
) -> []:
"""
批量从 PostgreSQL 数据库查询数据并解析
参数:
target_ids: 要查询的ID列表
endpoint: 数据库端点
port: 数据库端口
dbname: 数据库名称
user: 用户名
password: 密码
parse_func: 可选的解析函数,用于处理每行结果
返回:
字典键为查询ID值为查询结果或解析后的结果
"""
conn = None
result_dict = []
if not target_ids:
print("警告: 提供的ID列表为空")
return result_dict
try:
# 连接数据库
conn = psycopg2.connect(
host=host,
port=port,
dbname=dbname,
user=user,
password=password,
)
with conn.cursor() as cursor:
# 使用IN子句批量查询
placeholders = ','.join(['%s'] * len(target_ids))
query = f"select func_id,model_func_id from ai_model_list WHERE func_id IN ({placeholders});"
cursor.execute(query, tuple(target_ids))
# 获取所有结果
rows = cursor.fetchall()
# 处理结果
for row in rows:
func_id, model_func_id = row
row_data={
"func_id":func_id,
"model_func_id":model_func_id
}
result_dict.append(row_data)
print(f"record_idrecord_id {model_func_id}")
# if parse_func:
# # 如果有解析函数,使用它处理结果
# try:
# parsed_result = parse_func(stylefile)
# result_dict[record_id] = parsed_result
# except Exception as e:
# print(f"解析ID {record_id} 的结果时出错: {e}")
# result_dict[record_id] = {"error": str(e), "raw": stylefile}
# else:
# # 没有解析函数,直接存储原始结果
# result_dict[record_id] = stylefile
# if result_dict
# # 处理未找到的ID
# found_ids = {row[0] for row in rows}
# missing_ids = set(target_ids) - found_ids
# for missing_id in missing_ids:
# result_dict[missing_id] = None
# print(f"警告: 未找到ID {missing_id} 的记录")
except psycopg2.Error as e:
print(f"数据库查询失败: {e}")
raise
finally:
if conn:
conn.close()
return result_dict
# 示例解析函数
def example_parse_stylefile(stylefile_path: str) -> Dict[str, str]:
"""
示例解析函数解析MinIO对象路径
假设路径格式为: "bucket_name/path/to/file"
返回包含bucket和key的字典
"""
if not stylefile_path:
return {"error": "空路径"}
try:
parts = stylefile_path.split('/', 1)
if len(parts) < 2:
return {"error": "无效路径格式", "raw": stylefile_path}
bucket, key = parts
return {
"bucket": bucket,
"key": key,
"full_path": stylefile_path
}
except Exception as e:
return {"error": str(e), "raw": stylefile_path}
# 使用示例
if __name__ == "__main__":
# 数据库连接信息
db_config = {
"endpoint": "8.137.54.85",
"port": 5060,
"dbname": "smart_dev_123",
"user": "postgres",
"password": "root"
}
# 要查询的ID列表
target_ids = [1, 2, 3, 4, 5]
try:
# 不使用解析函数的基本查询
results = batch_query_model_func_id(target_ids, **db_config)
print("基本查询结果:")
for id_, path in results.items():
print(f"ID {id_}: {path}")
# # 使用解析函数的查询
# results_parsed = batch_query_and_parse(
# target_ids,
# **db_config,
# parse_func=example_parse_stylefile
# )
print("\n解析后的结果:")
# for id_, data in results_parsed.items():
# print(f"ID {id_}: {data}")
except Exception as e:
print(f"发生错误: {e}")