2025-11-13 10:29:27 +08:00

859 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from smbclient import (
register_session,
listdir,
scandir,
stat,
makedirs, # 递归创建目录
open_file
)
from datetime import datetime
import numpy as np
import cv2
import pandas as pd
import io
class SMBScanner:
    """Convenience wrapper around ``smbclient`` for one SMB server.

    Most public methods call :meth:`connect` first and report failures by
    printing a message and returning a falsy value (``False``, ``None`` or
    ``[]``) rather than raising.
    """

    def __init__(self, ip, username, password, domain=''):
        """Store the connection settings; no network access happens here.

        Args:
            ip: Host name or IP address of the SMB server.
            username: Account used to authenticate.
            password: Password for ``username``.
            domain: Kept on the instance; :meth:`connect` does not pass it
                to ``register_session``.
        """
        self.ip = ip
        self.username = username
        self.password = password
        self.domain = domain

    def connect(self):
        """Register an SMB session with the server.

        Returns:
            bool: True if ``register_session`` succeeded, False otherwise
            (the error is printed, not raised).
        """
        try:
            register_session(
                self.ip,
                username=self.username,
                password=self.password,
            )
        except Exception as e:
            print(f"连接失败: {e}")
            return False
        print(f"成功连接到 {self.ip}")
        return True

    def directory_exists(self, full_path):
        """Check whether ``full_path`` can be stat'ed on the share.

        Note that the check is a plain ``stat`` call, so it also succeeds
        for a path that names a file.

        Args:
            full_path: Full UNC path.

        Returns:
            bool: True if ``stat`` succeeded, False on any error or when the
            connection could not be established.
        """
        if not self.connect():
            return False
        try:
            stat(full_path)
        except Exception as e:
            print(f"未知错误: {e}")
            return False
        return True

    def read_excel(self, smb_path, sheet_name=0):
        """Read an Excel file from the share into a DataFrame.

        Args:
            smb_path: Full UNC path of the workbook.
            sheet_name: Forwarded to ``pandas.read_excel``.

        Returns:
            The object returned by ``pandas.read_excel``, or None when the
            connection or the read fails.
        """
        if not self.connect():
            # Same failure value as a read error, so that callers such as
            # process_all_rows() only ever have to handle None.
            return None
        try:
            with open_file(smb_path, mode='rb') as smb_file:
                file_content = smb_file.read()
            return pd.read_excel(io.BytesIO(file_content), sheet_name=sheet_name)
        except Exception as e:
            print(f"读取Excel失败: {e}")
            return None

    def process_all_rows(self, df):
        """Print every row of ``df`` and collect a summary record per row.

        Args:
            df: DataFrame to walk, or None.

        Returns:
            list[dict] | None: One dict per row with the keys
            ``row_number`` (1-based), ``original_index``, ``data`` and
            ``summary``; None when ``df`` is None or empty.
        """
        if df is None or df.empty:
            print("没有数据可处理")
            return None
        print("开始处理每行数据:")
        print("=" * 60)
        total_rows = len(df)
        results = []
        for row_number, (index, row) in enumerate(df.iterrows(), 1):
            print(f"\n处理第 {row_number} 行:")
            print("-" * 40)
            # Dump the raw cell values of this row.
            for col_name in df.columns:
                value = row[col_name]
                print(f" {col_name}: {value}")
            # Placeholder processing step; adapt to the real requirement.
            results.append({
                'row_number': row_number,
                'original_index': index,
                'data': row.to_dict(),
                'summary': f"处理了 {len(df.columns)} 个字段",
            })
            # Progress line every 10 rows and on the last row.
            if row_number % 10 == 0 or row_number == total_rows:
                print(f"\n 进度: {row_number}/{len(df)} ({row_number/len(df)*100:.1f}%)")
        print("\n" + "=" * 60)
        print(f"处理完成!共处理 {len(results)} 行数据")
        return results

    def get_smb_images(self, full_path):
        """Recursively collect image file paths below ``full_path``.

        Unlike most methods this one does not call :meth:`connect`.

        Args:
            full_path: Full UNC path of the directory to scan.

        Returns:
            list: Paths whose extension is .jpg/.jpeg/.png/.bmp/.tiff
            (case-insensitive). A directory that cannot be scanned is
            reported with a printed message and contributes what was found
            before the error.
        """
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')
        image_files = []
        try:
            for entry in scandir(full_path):
                if entry.is_file():
                    ext = os.path.splitext(entry.name)[1]
                    if ext.lower() in image_extensions:
                        image_files.append(entry.path)
                elif entry.is_dir():
                    image_files.extend(self.get_smb_images(entry.path))
        except Exception as e:
            print(f"错误: {e}")
        return image_files

    def build_full_path(self, share_path, file_path):
        r"""Build the UNC path ``\\<ip>\<share_path>\<file_path>``.

        Backslashes around ``share_path`` and leading backslashes of
        ``file_path`` are removed first so that no doubled separators are
        produced.
        """
        share_path = share_path.strip('\\')
        file_path = file_path.lstrip('\\')
        return f"\\\\{self.ip}\\{share_path}\\{file_path}"

    def read_txt_by_line(self, full_path):
        """Read a UTF-8 text file and return its lines with whitespace stripped.

        The whole file is fetched in one read and then split in memory.
        Undecodable bytes are dropped and a leading UTF-8 BOM is removed.

        Args:
            full_path: Full UNC path of the text file.

        Returns:
            list[str] | None: The stripped lines, or None on connection or
            read failure.
        """
        if not self.connect():
            return None
        print(f"读取 TXT 文件: {full_path}")
        try:
            with open_file(full_path, mode='rb') as file_obj:
                content_bytes = file_obj.read()
            # 'utf-8-sig' decodes exactly like 'utf-8' but drops a leading
            # BOM, which would otherwise stick to the first line.
            text_content = content_bytes.decode('utf-8-sig', errors='ignore')
            # Iterating a StringIO yields '\n'-terminated lines, the same
            # split the readline() loop would produce.
            lines = [line.strip() for line in io.StringIO(text_content)]
            line_number = len(lines)
            print(f"总共读取 {line_number}")
            return lines
        except Exception as e:
            print(f"读取文件时出错: {e}")
            return None

    def read_img_file(self, full_path):
        """Read a file from the share and decode it as an OpenCV colour image.

        Args:
            full_path: Full UNC path of the image file.

        Returns:
            The array returned by ``cv2.imdecode`` or None when the
            connection fails, the file is empty, decoding fails, or any
            error occurs while reading.
        """
        if not self.connect():
            return None
        print(f"读取文件: {full_path}")
        try:
            # Collect 8 KB chunks and join once; repeated ``bytes +=`` would
            # copy the accumulated content on every iteration.
            chunks = []
            with open_file(full_path, mode='rb') as file_obj:
                while True:
                    chunk = file_obj.read(8192)
                    if not chunk:
                        break
                    chunks.append(chunk)
            content = b"".join(chunks)
            print(f"成功读取 {len(content)} 字节")
            if not content:
                print("文件为空")
                return None
            image_array = np.frombuffer(content, np.uint8)
            image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
            if image is None:
                print("图像解码失败 - 可能不是有效的图像文件")
                return None
            print(f"图像解码成功: {image.shape}")
            return image
        except Exception as e:
            print(f"读取文件失败: {e}")
            return None

    def writeFile(self, share_path, file_path, data, chunk_size=8192):
        """Write ``data`` to a file on the share, creating parent directories.

        Args:
            share_path: Share name.
            file_path: Path of the target file inside the share.
            data: ``bytes``/``bytearray``/``str`` (str is encoded as UTF-8),
                an iterable of ``bytes``/``str`` chunks, or any other object
                accepted by ``bytes(data)``.
            chunk_size: Number of bytes per write for bytes-like data.

        Returns:
            bool: True on success, False on connection or write failure.
        """
        # UNC paths use backslashes; ntpath splits them on every platform,
        # whereas os.path.dirname only does so on Windows.
        import ntpath

        if not self.connect():
            return False
        full_path = self.build_full_path(share_path, file_path)
        try:
            try:
                makedirs(ntpath.dirname(full_path), exist_ok=True)
            except Exception:
                # Best effort: if the directory really is missing, the
                # open_file call below fails and reports the error.
                pass
            if isinstance(data, str):
                # Encode once instead of iterating character by character.
                data = data.encode('utf-8')
            elif isinstance(data, bytearray):
                data = bytes(data)
            with open_file(full_path, mode='wb') as file_obj:
                if isinstance(data, bytes):
                    total_size = len(data)
                    written = 0
                    for offset in range(0, total_size, chunk_size):
                        chunk = data[offset:offset + chunk_size]
                        file_obj.write(chunk)
                        written += len(chunk)
                        print(f"写入进度: {written}/{total_size} 字节 ({written/total_size*100:.1f}%)")
                elif hasattr(data, '__iter__'):
                    total_written = 0
                    for chunk in data:
                        if isinstance(chunk, str):
                            chunk = chunk.encode('utf-8')
                        file_obj.write(chunk)
                        total_written += len(chunk)
                        print(f"已写入: {total_written} 字节")
                else:
                    file_obj.write(bytes(data))
            print(f"文件写入完成: {full_path}")
            return True
        except Exception as e:
            print(f"写入文件失败: {e}")
            return False

    def writeImageToFile(self, share_path, file_path, image, image_format='.jpg', quality=95):
        """Encode an OpenCV image and write it to the share.

        The target file name is ``file_path`` with ``image_format`` appended.

        Args:
            share_path: Share name.
            file_path: Target path inside the share, without the format
                suffix that this method appends.
            image: Image array accepted by ``cv2.imencode``.
            image_format: Extension passed to ``cv2.imencode``.
            quality: JPEG quality, used for ``.jpg`` and ``.jpeg`` only.

        Returns:
            bool: True on success, False on connection, encoding or write
            failure.
        """
        if not self.connect():
            return False
        try:
            if image_format.lower() in ('.jpg', '.jpeg'):
                encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
                success, encoded_image = cv2.imencode(image_format, image, encode_params)
            else:
                success, encoded_image = cv2.imencode(image_format, image)
            if not success:
                print("图像编码失败")
                return False
            return self.writeFile(share_path, f"{file_path}{image_format}", encoded_image.tobytes())
        except Exception as e:
            print(f"写入图像失败: {e}")
            return False

    def _ensure_remote_directory(self, share_name, remote_dir):
        """Create ``remote_dir`` (and parents) on the share if it is non-empty.

        Raises:
            Exception: Whatever ``makedirs`` raised, after printing it.
        """
        if not remote_dir:
            return
        try:
            full_remote_path = self.build_full_path(share_name, remote_dir)
            makedirs(full_remote_path, exist_ok=True)
            print(f"确保远程目录存在: {remote_dir}")
        except Exception as e:
            print(f"创建远程目录失败: {e}")
            raise

    def _remote_exists(self, full_remote_path):
        """Return True if ``stat`` succeeds and False if the path is missing.

        The errno is checked rather than the exception class because the
        SMB client may report a missing path as an ``OSError`` subclass that
        is not ``FileNotFoundError`` (NOTE(review): believed to be
        smbprotocol's ``SMBOSError`` - confirm against the installed
        version). Any other error is re-raised.
        """
        import errno

        try:
            stat(full_remote_path)
        except OSError as e:
            if e.errno == errno.ENOENT:
                return False
            raise
        return True

    def upload_directory(self, local_dir, share_name, remote_dir="", overwrite=True):
        """Upload a local directory tree to the share.

        Args:
            local_dir: Local directory to upload.
            share_name: Target share name.
            remote_dir: Target directory inside the share ("" for the root).
            overwrite: When False, files that already exist remotely are
                skipped.

        Returns:
            bool: True only if every directory and file was handled without
            error.
        """
        if not self.connect():
            return False
        print(f"开始上传目录: {local_dir} -> {share_name}/{remote_dir}")
        if not os.path.exists(local_dir):
            print(f"本地目录不存在: {local_dir}")
            return False
        try:
            self._ensure_remote_directory(share_name, remote_dir)
            success = self._upload_directory_recursive(local_dir, share_name, remote_dir, overwrite)
            if success:
                print("目录上传完成")
            else:
                print("目录上传过程中出现错误")
            return success
        except Exception as e:
            print(f"上传目录失败: {e}")
            return False

    def _upload_directory_recursive(self, local_path, share_name, remote_path, overwrite):
        """Upload the contents of ``local_path``; keep going after failures.

        Returns:
            bool: False if any file or subdirectory failed, True otherwise.
        """
        try:
            success = True
            for item_name in os.listdir(local_path):
                local_item_path = os.path.join(local_path, item_name)
                remote_item_path = f"{remote_path}/{item_name}" if remote_path else item_name
                if os.path.isdir(local_item_path):
                    print(f"上传子目录: {item_name}")
                    self._ensure_remote_directory(share_name, remote_item_path)
                    item_ok = self._upload_directory_recursive(
                        local_item_path, share_name, remote_item_path, overwrite
                    )
                else:
                    item_ok = self._upload_single_file(
                        local_item_path, share_name, remote_item_path, overwrite
                    )
                if not item_ok:
                    success = False
            return success
        except Exception as e:
            print(f"上传目录内容失败 {local_path}: {e}")
            return False

    def _upload_single_file(self, local_file_path, share_name, remote_file_path, overwrite):
        """Upload one file; used by the recursive directory upload.

        Returns:
            bool: True when the file was uploaded or skipped because it
            already exists and ``overwrite`` is False; False on error.
        """
        try:
            full_remote_path = self.build_full_path(share_name, remote_file_path)
            if not overwrite and self._remote_exists(full_remote_path):
                print(f"文件已存在,跳过: {remote_file_path}")
                return True
            print(f"上传文件: {os.path.basename(local_file_path)}")
            with open(local_file_path, 'rb') as local_file:
                local_content = local_file.read()
            with open_file(full_remote_path, mode='wb') as remote_file:
                remote_file.write(local_content)
            file_size = len(local_content)
            print(f"文件上传成功: {remote_file_path} ({file_size} 字节)")
            return True
        except Exception as e:
            print(f"上传文件失败 {local_file_path}: {e}")
            return False

    def upload_file(self, local_file_path, share_name, remote_file_path, overwrite=True):
        """Upload a single local file to the share.

        Args:
            local_file_path: File to read locally.
            share_name: Target share name.
            remote_file_path: Target path inside the share.
            overwrite: When False, an existing remote file is left alone.

        Returns:
            bool: True when uploaded or skipped as existing; False when the
            connection fails, the local file is empty, or an error occurs.
        """
        if not self.connect():
            return False
        print(f"上传文件: {local_file_path} -> {share_name}/{remote_file_path}")
        try:
            full_remote_path = self.build_full_path(share_name, remote_file_path)
            if not overwrite and self._remote_exists(full_remote_path):
                print(f"文件已存在,跳过: {remote_file_path}")
                return True
            with open(local_file_path, 'rb') as local_file:
                content = local_file.read()
            print(f"成功读取 {len(content)} 字节")
            if len(content) == 0:
                # Empty files are rejected here (the directory upload does
                # upload them).
                print("文件为空")
                return False
            with open_file(full_remote_path, mode='wb') as remote_file:
                remote_file.write(content)
            print(f"文件上传成功")
            return True
        except Exception as e:
            print(f"上传文件失败: {e}")
            return False

    def find_folders_by_name(self, share_path, folder_name, start_dir="", max_depth=10):
        """Find folders named ``folder_name``; see :meth:`find_items_by_name`."""
        return self.find_items_by_name(
            share_path=share_path,
            target_name=folder_name,
            item_type="folder",
            start_dir=start_dir,
            max_depth=max_depth,
        )

    def find_files_by_name(self, share_path, file_name, start_dir="", max_depth=10):
        """Find files named ``file_name``; see :meth:`find_items_by_name`."""
        return self.find_items_by_name(
            share_path=share_path,
            target_name=file_name,
            item_type="file",
            start_dir=start_dir,
            max_depth=max_depth,
        )

    def find_items_by_name(self, share_path, target_name, item_type="both", start_dir="", max_depth=10):
        """Recursively search the share for folders and/or files by name.

        Args:
            share_path: Share name.
            target_name: Name to look for; ``*`` and ``?`` wildcards are
                supported, matching is case-insensitive.
            item_type: "folder", "file" or "both".
            start_dir: Directory inside the share where the search starts.
            max_depth: Maximum directory depth to descend.

        Returns:
            list: Full paths of all matches ([] when the connection fails).
        """
        if not self.connect():
            return []
        found_paths = []
        start_path = self.build_full_path(share_path, start_dir)
        try:
            self._search_recursive(
                share_path=share_path,
                current_path=start_path,
                target_name=target_name,
                item_type=item_type,
                found_paths=found_paths,
                current_depth=0,
                max_depth=max_depth,
            )
        except Exception as e:
            print(f"搜索过程中出错: {e}")
        return found_paths

    def _search_recursive(self, share_path, current_path, target_name, item_type, found_paths, current_depth, max_depth):
        """Depth-limited search that appends matches to ``found_paths``.

        Errors on a single entry or directory are printed and skipped so
        the rest of the tree is still searched.
        """
        if current_depth > max_depth:
            return
        try:
            for entry in scandir(current_path):
                try:
                    if entry.is_dir():
                        if self._is_match(entry.name, target_name) and item_type in ["both", "folder"]:
                            found_paths.append(entry.path)
                            print(f"找到目标文件夹: {entry.path}")
                        # Always descend, a match can also sit deeper.
                        self._search_recursive(
                            share_path=share_path,
                            current_path=entry.path,
                            target_name=target_name,
                            item_type=item_type,
                            found_paths=found_paths,
                            current_depth=current_depth + 1,
                            max_depth=max_depth,
                        )
                    elif entry.is_file():
                        if self._is_match(entry.name, target_name) and item_type in ["both", "file"]:
                            found_paths.append(entry.path)
                            print(f"找到目标文件: {entry.path}")
                except Exception as e:
                    print(f"处理条目 {entry.path} 时出错: {e}")
        except Exception as e:
            print(f"搜索目录 {current_path} 时出错: {e}")

    def _is_match(self, name, pattern):
        """Case-insensitively match ``name`` against ``pattern``.

        Args:
            name: Actual entry name.
            pattern: Expected name; if it contains ``*`` or ``?`` it is
                treated as an ``fnmatch`` pattern, otherwise compared
                literally.

        Returns:
            bool: Whether the name matches.
        """
        if '*' not in pattern and '?' not in pattern:
            return name.lower() == pattern.lower()
        import fnmatch
        return fnmatch.fnmatch(name.lower(), pattern.lower())

    def list_directory(self, share_path, dir, recursive=False, max_depth=3):
        """List the contents of a directory on the share.

        Args:
            share_path: Share name.
            dir: Directory inside the share.
            recursive: Whether to descend into subdirectories.
            max_depth: Maximum depth when ``recursive`` is True.

        Returns:
            list[dict]: One dict per entry as built by
            :meth:`_walk_directory`; [] when the connection fails.
        """
        if not self.connect():
            return []
        # Bound before the try block so the final return can never hit an
        # unassigned name.
        result = []
        try:
            full_path = self.build_full_path(share_path, dir)
            print(f"开始遍历: {full_path}")
            self._walk_directory(full_path, recursive, max_depth, 0, result)
        except Exception as e:
            print(f"遍历失败: {e}")
        return result

    def _walk_directory(self, path, recursive, max_depth, current_depth, result):
        """Append an info dict for every entry below ``path`` to ``result``.

        Each dict has the keys ``name``, ``path``, ``depth``, ``indent``,
        ``is_dir``, ``size`` and ``modified_time``; files additionally get
        ``formatted_size``.

        Returns:
            list: The same ``result`` list that was passed in.
        """
        if current_depth > max_depth:
            return result
        # Computed up front so the per-entry error message can always use it.
        indent = " " * current_depth
        try:
            for entry in scandir(path):
                try:
                    file_stat = stat(entry.path)
                    is_directory = entry.is_dir()
                    item = {
                        'name': entry.name,
                        'path': entry.path,
                        'depth': current_depth,
                        'indent': indent,
                        'is_dir': is_directory,
                        'size': 0 if is_directory else file_stat.st_size,
                        'modified_time': datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                    }
                    if is_directory:
                        result.append(item)
                        if recursive and current_depth < max_depth:
                            # The recursive call appends to the shared list
                            # itself; its return value must not be added
                            # again.
                            self._walk_directory(
                                entry.path,
                                recursive,
                                max_depth,
                                current_depth + 1,
                                result,
                            )
                    else:
                        item['formatted_size'] = self._format_size(file_stat.st_size)
                        result.append(item)
                except Exception as e:
                    print(f"{indent} 无法访问: {entry.name} - {e}")
        except Exception as e:
            print(f"无法读取目录 {path}: {e}")
        return result

    def _format_size(self, size_bytes):
        """Format a byte count with one decimal and a B/KB/MB/GB/TB unit."""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        i = 0
        while size_bytes >= 1024 and i < len(size_names) - 1:
            size_bytes /= 1024.0
            i += 1
        return f"{size_bytes:.1f} {size_names[i]}"

    def get_file_info(self, share_path, file_path):
        """Return size and timestamp information for a path on the share.

        Args:
            share_path: Share name.
            file_path: Path inside the share.

        Returns:
            dict | None: Keys ``name``, ``path``, ``size``,
            ``size_formatted``, ``create_time``, ``modify_time``,
            ``access_time`` (``datetime`` objects built with
            ``datetime.fromtimestamp``) and ``is_dir``; None on failure.
        """
        # Function-scope imports: the module-level name ``stat`` is the
        # smbclient function, so only S_ISDIR is pulled from the stdlib
        # module; ntpath handles backslash separators on every platform.
        import ntpath
        from stat import S_ISDIR

        if not self.connect():
            return None
        try:
            full_path = self.build_full_path(share_path, file_path)
            file_stat = stat(full_path)
            return {
                'name': ntpath.basename(file_path),
                'path': full_path,
                'size': file_stat.st_size,
                'size_formatted': self._format_size(file_stat.st_size),
                'create_time': datetime.fromtimestamp(file_stat.st_ctime),
                'modify_time': datetime.fromtimestamp(file_stat.st_mtime),
                'access_time': datetime.fromtimestamp(file_stat.st_atime),
                'is_dir': S_ISDIR(file_stat.st_mode),
            }
        except Exception as e:
            print(f"获取文件信息失败: {e}")
            return None

    def display_image(self, image, window_name="Image"):
        """Show an image in an OpenCV window until a key is pressed.

        Images larger than 1920x1080 are scaled down to fit; smaller images
        are shown unchanged.

        Args:
            image: OpenCV image; None is reported and ignored.
            window_name: Title of the window.
        """
        if image is None:
            # read_img_file() returns None on failure; report it instead of
            # failing on ``image.shape``.
            print("图像为空,无法显示")
            return
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        # Assumed screen size; adjust to the actual display if needed.
        screen_width = 1920
        screen_height = 1080
        img_height, img_width = image.shape[:2]
        scale = min(screen_width / img_width, screen_height / img_height, 1.0)
        if scale < 1.0:
            new_width = int(img_width * scale)
            new_height = int(img_height * scale)
            image = cv2.resize(image, (new_width, new_height))
            print(f"图像已缩放: {img_width}x{img_height} -> {new_width}x{new_height}")
        cv2.imshow(window_name, image)
        print("图像显示中... 按任意键关闭窗口")
        cv2.waitKey(0)
        cv2.destroyAllWindows()
        print("窗口已关闭")
def get_conf(zip_url, user_name, pwd):
    """Split a share URL into server, share name and target directory.

    Both backslash and forward-slash separators are accepted, with any
    number of leading separators (``\\\\host\\share\\dir``,
    ``//host/share/dir`` or ``host/share/dir``). Empty path segments are
    ignored.

    Args:
        zip_url: Share location; the first segment is the server, the
            second the share name, the rest the directory.
        user_name: Stored under the ``username`` key.
        pwd: Stored under the ``password`` key.

    Returns:
        dict: Keys ``ip``, ``username``, ``password``, ``domain`` (always
        ``''``), ``share`` and ``dir`` (segments joined with ``/``, ``''``
        when there are none).
        On a URL with fewer than two segments the tuple ``("", "fail")`` is
        returned instead; callers must check for it before indexing.
    """
    normalized = zip_url.replace('\\', '/')
    # Dropping empty segments handles leading "//" as well as doubled or
    # trailing separators.
    parts = [segment for segment in normalized.split('/') if segment]
    if len(parts) < 2:
        print(f"传入的共享目录格式错误: {normalized}")
        return "", "fail"
    sub_dir = '/'.join(parts[2:])
    config = {
        'ip': parts[0],
        'username': user_name,
        'password': pwd,
        'domain': '',  # left empty for workgroup setups
        'share': parts[1],
        'dir': sub_dir,
    }
    return config
def get_scanner(zip_url, user_name, pwd):
    """Create an SMBScanner for the server named in ``zip_url``.

    The URL is parsed with ``get_conf``; the resulting ``ip``, ``username``,
    ``password`` and ``domain`` values are handed to the scanner.
    """
    conf = get_conf(zip_url, user_name, pwd)
    return SMBScanner(
        conf['ip'],
        conf['username'],
        conf['password'],
        conf['domain'],
    )
def get_pile_dict(dir, user_name, pwd):
    """Build a ``filename -> pile number`` mapping from ``fileindex.txt``.

    The share named in ``dir`` is searched for files called
    ``fileindex.txt``; the first hit is read and every line is split on
    ``->``. Lines with at least four fields contribute
    ``fields[3] -> fields[1]``.

    Args:
        dir: Share URL understood by ``get_conf`` (the name shadows the
            builtin but is kept for keyword-argument callers).
        user_name: SMB user name.
        pwd: SMB password.

    Returns:
        dict: The mapping; empty when no index file is found or it cannot
        be read.
    """
    config = get_conf(dir, user_name, pwd)
    scanner = get_scanner(dir, user_name=user_name, pwd=pwd)
    # NOTE(review): the search covers the whole share; config['dir'] is not
    # passed as start_dir - confirm whether that is intended.
    found_paths = scanner.find_files_by_name(
        share_path=config['share'],
        file_name='fileindex.txt'
    )
    print(f"\n找到 {len(found_paths)}'fileindex.txt' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")
    if not found_paths:
        # Nothing to parse; indexing found_paths[0] would raise IndexError.
        return {}
    lines = scanner.read_txt_by_line(full_path=found_paths[0])
    if lines is None:
        # read_txt_by_line reports failures by returning None.
        return {}
    pile_dict = {}
    for line in lines:
        parts = line.strip().split("->")
        if len(parts) >= 4:
            pile_dict[parts[3]] = parts[1]  # filename -> pile number
    return pile_dict
def main():
    """Demo entry point: upload a local predictions folder to the share.

    The commented-out snippets below are usage examples for the other
    SMBScanner features.
    """
    # NOTE(review): server address and credentials are hard-coded here;
    # consider loading them from configuration or the environment.
    config = dict(
        ip='192.168.110.114',
        username='administrator',
        password='abc@1234',
        domain='',  # left empty for workgroup setups
        share='share_File',
        dir='西南计算机',
    )
    scanner = SMBScanner(
        config['ip'],
        config['username'],
        config['password'],
        config['domain'],
    )

    # Example: walk the shared directory.
    # scanner.list_directory(
    #     share_path=config['share'],
    #     dir=config['dir'],
    #     recursive=True,  # descend into subdirectories
    #     max_depth=9      # maximum depth
    # )

    # Example: read and display an image.
    # full_path = scanner.build_full_path(
    #     share_path=config['share'],
    #     file_path=f"{config['dir']}/AA县/报送数据/图像类/CD45500155A/Images/20250508131651/01/20250508-131712-644.jpg"
    # )
    # image = scanner.read_img_file(full_path=full_path)
    # scanner.display_image(image)

    # Example: write the image back to the share.
    # scanner.writeImageToFile(
    #     share_path=config['share'],
    #     file_path=f"{config['dir']}/AA县/报送数据/图像类_识别/CD45500155A/Images/20250508131651/01/20250508-131712-644.jpg",
    #     image=image
    # )

    # Example: find folders named '报送数据'.
    # found_paths = scanner.find_folders_by_name(
    #     share_path=config['share'],
    #     folder_name='报送数据'
    # )
    # print(f"\n找到 {len(found_paths)} 个 '报送数据' 文件夹:")
    # for i, path in enumerate(found_paths, 1):
    #     print(f"{i}. {path}")

    # Example: collect every image below a directory.
    # full_path = scanner.build_full_path(share_path=config['share'], file_path='西南计算机\\AA县\\报送数据')
    # imgPaths = scanner.get_smb_images(full_path)
    # for i, path in enumerate(imgPaths, 1):
    #     print(f"{i}. {path}")

    # Example: read an Excel workbook.
    # full_path = scanner.build_full_path(share_path=config['share'], file_path='西南计算机\\AA县\\24年年报.xlsx')
    # df = scanner.read_excel(full_path)
    # scanner.process_all_rows(df)

    # Example: read a text file.
    # found_paths = scanner.find_files_by_name(
    #     share_path=config['share'],
    #     file_name='fileindex.txt'
    # )
    # print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:")
    # for i, path in enumerate(found_paths, 1):
    #     print(f"{i}. {path}")
    # lines = scanner.read_txt_by_line(full_path=found_paths[0])
    # for i, line in enumerate(lines, 1):
    #     print(f"{i}. {line}")

    output_dir = "D:/devForBdzlWork/ai-train_platform/predictions"
    scanner.upload_directory(
        output_dir,
        config['share'],
        remote_dir="西南计算机/AA县/报送数据_识别",
    )


if __name__ == "__main__":
    main()