151 lines
4.5 KiB
Python
151 lines
4.5 KiB
Python
import psycopg2
|
||
from typing import List, Dict, Any, Optional, Union
|
||
|
||
|
||
def batch_query_model_func_id(
|
||
target_ids: List[Union[int, str]],
|
||
host: str,
|
||
port: int,
|
||
dbname: str,
|
||
user: str,
|
||
password: str,
|
||
parse_func: Optional[callable] = None
|
||
) -> []:
|
||
"""
|
||
批量从 PostgreSQL 数据库查询数据并解析
|
||
|
||
参数:
|
||
target_ids: 要查询的ID列表
|
||
endpoint: 数据库端点
|
||
port: 数据库端口
|
||
dbname: 数据库名称
|
||
user: 用户名
|
||
password: 密码
|
||
parse_func: 可选的解析函数,用于处理每行结果
|
||
|
||
返回:
|
||
字典,键为查询ID,值为查询结果或解析后的结果
|
||
"""
|
||
conn = None
|
||
result_dict = []
|
||
|
||
if not target_ids:
|
||
print("警告: 提供的ID列表为空")
|
||
return result_dict
|
||
|
||
try:
|
||
# 连接数据库
|
||
conn = psycopg2.connect(
|
||
host=host,
|
||
port=port,
|
||
dbname=dbname,
|
||
user=user,
|
||
password=password,
|
||
)
|
||
|
||
with conn.cursor() as cursor:
|
||
# 使用IN子句批量查询
|
||
placeholders = ','.join(['%s'] * len(target_ids))
|
||
query = f"select func_id,model_func_id from ai_model_list WHERE func_id IN ({placeholders});"
|
||
|
||
cursor.execute(query, tuple(target_ids))
|
||
|
||
# 获取所有结果
|
||
rows = cursor.fetchall()
|
||
|
||
# 处理结果
|
||
for row in rows:
|
||
func_id, model_func_id = row
|
||
row_data={
|
||
"func_id":func_id,
|
||
"model_func_id":model_func_id
|
||
}
|
||
result_dict.append(row_data)
|
||
print(f"record_idrecord_id {model_func_id}")
|
||
# if parse_func:
|
||
# # 如果有解析函数,使用它处理结果
|
||
# try:
|
||
# parsed_result = parse_func(stylefile)
|
||
# result_dict[record_id] = parsed_result
|
||
# except Exception as e:
|
||
# print(f"解析ID {record_id} 的结果时出错: {e}")
|
||
# result_dict[record_id] = {"error": str(e), "raw": stylefile}
|
||
# else:
|
||
# # 没有解析函数,直接存储原始结果
|
||
# result_dict[record_id] = stylefile
|
||
# if result_dict
|
||
# # 处理未找到的ID
|
||
# found_ids = {row[0] for row in rows}
|
||
# missing_ids = set(target_ids) - found_ids
|
||
# for missing_id in missing_ids:
|
||
# result_dict[missing_id] = None
|
||
# print(f"警告: 未找到ID {missing_id} 的记录")
|
||
|
||
except psycopg2.Error as e:
|
||
print(f"数据库查询失败: {e}")
|
||
raise
|
||
finally:
|
||
if conn:
|
||
conn.close()
|
||
return result_dict
|
||
|
||
|
||
# 示例解析函数
|
||
def example_parse_stylefile(stylefile_path: str) -> Dict[str, str]:
|
||
"""
|
||
示例解析函数,解析MinIO对象路径
|
||
|
||
假设路径格式为: "bucket_name/path/to/file"
|
||
返回包含bucket和key的字典
|
||
"""
|
||
if not stylefile_path:
|
||
return {"error": "空路径"}
|
||
|
||
try:
|
||
parts = stylefile_path.split('/', 1)
|
||
if len(parts) < 2:
|
||
return {"error": "无效路径格式", "raw": stylefile_path}
|
||
|
||
bucket, key = parts
|
||
return {
|
||
"bucket": bucket,
|
||
"key": key,
|
||
"full_path": stylefile_path
|
||
}
|
||
except Exception as e:
|
||
return {"error": str(e), "raw": stylefile_path}
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
|
||
# 数据库连接信息
|
||
db_config = {
|
||
"endpoint": "8.137.54.85",
|
||
"port": 5060,
|
||
"dbname": "smart_dev_123",
|
||
"user": "postgres",
|
||
"password": "root"
|
||
}
|
||
|
||
# 要查询的ID列表
|
||
target_ids = [1, 2, 3, 4, 5]
|
||
|
||
try:
|
||
# 不使用解析函数的基本查询
|
||
results = batch_query_model_func_id(target_ids, **db_config)
|
||
print("基本查询结果:")
|
||
for id_, path in results.items():
|
||
print(f"ID {id_}: {path}")
|
||
|
||
# # 使用解析函数的查询
|
||
# results_parsed = batch_query_and_parse(
|
||
# target_ids,
|
||
# **db_config,
|
||
# parse_func=example_parse_stylefile
|
||
# )
|
||
print("\n解析后的结果:")
|
||
# for id_, data in results_parsed.items():
|
||
# print(f"ID {id_}: {data}")
|
||
|
||
except Exception as e:
|
||
print(f"发生错误: {e}") |