This commit is contained in:
2025-03-07 15:32:58 +08:00
commit 73d42bbccf
20 changed files with 3735 additions and 0 deletions

8
core/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Any2MIF Project
# All rights reserved.
"""
Any2MIF - 核心功能包
"""

144
core/batch_manager.py Normal file
View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Any2MIF Project
# All rights reserved.
"""
Any2MIF - 批量队列管理器
负责管理批量转换任务
"""
import os
import threading
import queue
from PyQt6.QtCore import QObject, pyqtSignal
class BatchManager(QObject):
"""批量队列管理器"""
# 自定义信号
progress_updated = pyqtSignal(int, int, str) # 进度更新信号 (当前, 总数, 文件名)
conversion_completed = pyqtSignal(int, int) # 转换完成信号 (成功数, 失败数)
def __init__(self, converter):
"""
初始化批量队列管理器
参数:
converter: MIF转换器实例
"""
super().__init__()
self.converter = converter
self.task_queue = queue.Queue()
self.is_running = False
self.worker_thread = None
self.success_count = 0
self.fail_count = 0
def start_batch(self, files, output_dir, params, processed_image=None):
"""
启动批量转换
参数:
files (list): 文件路径列表
output_dir (str): 输出目录
params (dict): 转换参数
processed_image (PIL.Image, optional): 处理后的图像,如果提供则用于所有图像文件
"""
# 如果已经在运行,则返回
if self.is_running:
return
# 重置计数器
self.success_count = 0
self.fail_count = 0
# 清空队列
while not self.task_queue.empty():
self.task_queue.get()
# 添加任务到队列
for file_path in files:
output_file = os.path.join(
output_dir,
f"{os.path.splitext(os.path.basename(file_path))[0]}.mif"
)
# 如果有处理后的图像,并且是图像文件,则标记为使用处理后的图像
use_processed = processed_image is not None and file_path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff'))
self.task_queue.put((file_path, output_file, params, use_processed, processed_image))
# 设置运行标志
self.is_running = True
# 创建并启动工作线程
self.worker_thread = threading.Thread(target=self._worker)
self.worker_thread.daemon = True
self.worker_thread.start()
def _worker(self):
"""工作线程函数"""
total_tasks = self.task_queue.qsize()
current_task = 0
try:
while not self.task_queue.empty():
# 获取任务
file_path, output_file, params, use_processed, processed_image = self.task_queue.get()
current_task += 1
# 发出进度更新信号
self.progress_updated.emit(current_task, total_tasks, os.path.basename(file_path))
try:
# 执行转换
if use_processed and processed_image:
# 使用处理后的图像直接转换
success = self.converter.convert_from_image(processed_image, output_file, params)
else:
# 使用文件路径转换
success = self.converter.convert(file_path, output_file, params)
# 更新计数器
if success:
self.success_count += 1
else:
self.fail_count += 1
except Exception as e:
print(f"转换错误: {str(e)}")
self.fail_count += 1
# 标记任务完成
self.task_queue.task_done()
finally:
# 设置运行标志
self.is_running = False
# 发出转换完成信号
self.conversion_completed.emit(self.success_count, self.fail_count)
def is_busy(self):
"""
检查是否正在处理任务
返回:
bool: 是否正在处理任务
"""
return self.is_running
def get_progress(self):
"""
获取当前进度
返回:
tuple: (当前任务数, 总任务数, 成功数, 失败数)
"""
total_tasks = self.task_queue.qsize() + self.success_count + self.fail_count
current_task = self.success_count + self.fail_count
return (current_task, total_tasks, self.success_count, self.fail_count)
def cancel(self):
"""取消所有任务"""
# 清空队列
while not self.task_queue.empty():
self.task_queue.get()
self.task_queue.task_done()

256
core/converter.py Normal file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Any2MIF Project
# All rights reserved.
"""
Any2MIF - MIF格式转换器
负责将任意文件转换为MIF格式
"""
import os
import math
from datetime import datetime
class MifConverter:
"""MIF格式转换器"""
def __init__(self):
"""初始化转换器"""
pass
def convert_from_image(self, image, output_file, params):
"""
将PIL图像对象直接转换为MIF格式
参数:
image (PIL.Image): PIL图像对象
output_file (str): 输出文件路径
params (dict): 转换参数
width (int): 数据宽度 (8/16/32)
auto_depth (bool): 是否自动计算深度
depth (int): 深度 (如果auto_depth为False)
radix (str): 数据格式 (HEX/BIN/DEC)
offset (int): 地址偏移量
header (bool): 是否添加文件头注释
addr_format (str): 地址格式 (standard/compact)
data_per_line (int): 每行数据数量
返回:
bool: 转换是否成功
"""
try:
# 将图像转换为字节数据
if image.mode != 'L':
image = image.convert('L') # 转换为灰度图像
# 将图像转换为字节数组
data = bytearray()
for y in range(image.height):
for x in range(image.width):
pixel = image.getpixel((x, y))
data.append(pixel)
# 计算深度 - 使用图像的当前尺寸(可能是标准化后的尺寸)
if params['auto_depth']:
# 如果启用了自动计算深度,根据图像的当前尺寸和数据宽度计算深度
bytes_per_word = params['width'] // 8
# 使用图像的实际尺寸计算深度
depth = math.ceil((image.width * image.height) / bytes_per_word)
else:
depth = params['depth']
# 打开输出文件
with open(output_file, 'w', encoding='utf-8') as f:
# 根据参数决定是否写入文件头
if params.get('header', True):
self._write_header(f, "内存中图像", params, depth)
# 写入数据
self._write_data(f, data, params, depth)
return True
except Exception as e:
print(f"转换错误: {str(e)}")
return False
def convert(self, input_file, output_file, params):
"""
将输入文件转换为MIF格式
参数:
input_file (str): 输入文件路径
output_file (str): 输出文件路径
params (dict): 转换参数
width (int): 数据宽度 (8/16/32)
auto_depth (bool): 是否自动计算深度
depth (int): 深度 (如果auto_depth为False)
radix (str): 数据格式 (HEX/BIN/DEC)
offset (int): 地址偏移量
header (bool): 是否添加文件头注释
addr_format (str): 地址格式 (standard/compact)
data_per_line (int): 每行数据数量
返回:
bool: 转换是否成功
"""
try:
# 读取输入文件
with open(input_file, 'rb') as f:
data = f.read()
# 计算深度
if params['auto_depth']:
bytes_per_word = params['width'] // 8
depth = math.ceil(len(data) / bytes_per_word)
else:
depth = params['depth']
# 打开输出文件
with open(output_file, 'w', encoding='utf-8') as f:
# 根据参数决定是否写入文件头
if params.get('header', True):
self._write_header(f, input_file, params, depth)
# 写入数据
self._write_data(f, data, params, depth)
return True
except Exception as e:
print(f"转换错误: {str(e)}")
return False
def _write_header(self, file, input_file, params, depth):
"""
写入MIF文件头
参数:
file: 输出文件对象
input_file: 输入文件路径
params: 转换参数
depth: 深度
"""
file.write("WIDTH=%d;\n" % params['width'])
file.write("DEPTH=%d;\n" % depth)
file.write("\n")
file.write("ADDRESS_RADIX=%s;\n" % params.get('addr_radix', 'HEX'))
file.write("DATA_RADIX=%s;\n" % params['radix'])
file.write("\n")
file.write("CONTENT BEGIN\n")
def _write_data(self, file, data, params, depth):
"""写入MIF数据部分"""
bytes_per_word = params['width'] // 8
words_per_line = params['data_per_line']
# 处理每个地址
for addr in range(depth):
# 计算实际地址(加上偏移量)
actual_addr = addr + params['offset']
# 计算数据索引
start_idx = addr * bytes_per_word
end_idx = start_idx + bytes_per_word
# 如果超出数据范围填充0
if start_idx >= len(data):
word_data = 0
else:
# 读取数据
word_bytes = data[start_idx:min(end_idx, len(data))]
# 如果数据不足填充0
if len(word_bytes) < bytes_per_word:
word_bytes = word_bytes + b'\x00' * (bytes_per_word - len(word_bytes))
# 转换为整数
word_data = int.from_bytes(word_bytes, byteorder='little')
# 格式化地址
addr_radix = params.get('addr_radix', 'HEX')
if addr_radix == 'HEX':
addr_format = "@%X" if params['addr_format'] == 'standard' else "%X:"
addr_str = addr_format % actual_addr
elif addr_radix == 'BIN':
# 计算二进制位数(根据深度和偏移量)
max_addr = depth + params['offset'] - 1
bin_digits = len(bin(max_addr)[2:])
if params['addr_format'] == 'standard':
addr_str = "@" + bin(actual_addr)[2:].zfill(bin_digits)
else: # compact
addr_str = bin(actual_addr)[2:].zfill(bin_digits) + ":"
else: # DEC
if params['addr_format'] == 'standard':
addr_str = "@%d" % actual_addr
else: # compact
addr_str = "%d:" % actual_addr
# 格式化数据
if params['radix'] == 'HEX':
# 计算十六进制位数每4位二进制对应1位十六进制
hex_digits = math.ceil(params['width'] / 4)
data_str = format(word_data, 'X').zfill(hex_digits)
elif params['radix'] == 'BIN':
data_str = bin(word_data)[2:].zfill(params['width'])
else: # DEC
# 对于十进制,根据位宽计算最大可能的位数
max_dec_digits = len(str(2**params['width'] - 1))
data_str = str(word_data).zfill(max_dec_digits)
# 写入一行
if (addr + 1) % words_per_line == 0 or addr == depth - 1:
file.write(f"{addr_str} {data_str};\n")
else:
file.write(f"{addr_str} {data_str},\n")
# 写入文件尾
file.write("END;\n")
def estimate_output_size(self, input_file, params):
"""
估计输出文件大小
参数:
input_file (str): 输入文件路径
params (dict): 转换参数
返回:
int: 估计的输出文件大小(字节)
"""
try:
# 获取输入文件大小
input_size = os.path.getsize(input_file)
# 计算深度
bytes_per_word = params['width'] // 8
if params['auto_depth']:
depth = math.ceil(input_size / bytes_per_word)
else:
depth = params['depth']
# 估计每行平均长度
if params['radix'] == 'HEX':
avg_data_len = params['width'] // 4
elif params['radix'] == 'BIN':
avg_data_len = params['width']
else: # DEC
avg_data_len = len(str(2 ** params['width'] - 1))
# 估计地址长度
addr_radix = params.get('addr_radix', 'HEX')
if addr_radix == 'HEX':
addr_len = len(hex(depth + params['offset'])[2:]) + 2
elif addr_radix == 'BIN':
addr_len = len(bin(depth + params['offset'])[2:]) + 2
else: # DEC
addr_len = len(str(depth + params['offset'])) + 2
# 估计每行长度
line_len = addr_len + avg_data_len + 3 # 包括空格、分隔符和换行符
# 估计总大小(不包含文件头)
total_size = depth * line_len
return total_size
except Exception:
# 如果估计失败,返回一个保守的估计值
return input_size * 5

239
core/image_processor.py Normal file
View File

@@ -0,0 +1,239 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Any2MIF Project
# All rights reserved.
"""
Any2MIF - 图像处理器
负责图像的灰度化、二值化等操作
"""
import os
import numpy as np
from PIL import Image, ImageFilter
class ImageProcessor:
"""图像处理器"""
def __init__(self):
"""初始化图像处理器"""
# 支持的图像格式
self.supported_formats = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff']
def is_image_file(self, file_path):
"""
检查文件是否为图像文件
参数:
file_path (str): 文件路径
返回:
bool: 是否为图像文件
"""
_, ext = os.path.splitext(file_path.lower())
return ext in self.supported_formats
def grayscale(self, image, method='weighted'):
"""
将图像转换为灰度图
参数:
image (PIL.Image): 输入图像
method (str): 灰度化方法
'average': 平均值法
'weighted': 加权平均法
'max': 最大值法
返回:
PIL.Image: 灰度图像
"""
# 确保图像是RGB模式
if image.mode != 'RGB':
image = image.convert('RGB')
# 转换为numpy数组
img_array = np.array(image)
# 根据方法进行灰度化
if method == 'average':
# 平均值法: (R + G + B) / 3
gray_array = np.mean(img_array, axis=2).astype(np.uint8)
elif method == 'weighted':
# 加权平均法: 0.299 * R + 0.587 * G + 0.114 * B
gray_array = (0.299 * img_array[:, :, 0] +
0.587 * img_array[:, :, 1] +
0.114 * img_array[:, :, 2]).astype(np.uint8)
elif method == 'max':
# 最大值法: max(R, G, B)
gray_array = np.max(img_array, axis=2).astype(np.uint8)
else:
# 默认使用加权平均法
gray_array = (0.299 * img_array[:, :, 0] +
0.587 * img_array[:, :, 1] +
0.114 * img_array[:, :, 2]).astype(np.uint8)
# 转换回PIL图像
return Image.fromarray(gray_array, mode='L')
def threshold(self, image, threshold=128, method='fixed'):
"""
对图像进行二值化处理
参数:
image (PIL.Image): 输入图像
threshold (int): 阈值 (0-255)
method (str): 二值化方法
'fixed': 固定阈值
'adaptive': 自适应阈值
'otsu': Otsu阈值
返回:
PIL.Image: 二值化图像
"""
# 确保图像是灰度模式
if image.mode != 'L':
image = image.convert('L')
# 转换为numpy数组
img_array = np.array(image)
# 根据方法进行二值化
if method == 'fixed':
# 固定阈值
binary_array = (img_array > threshold).astype(np.uint8) * 255
elif method == 'adaptive':
# 自适应阈值 (简化版本)
# 使用局部区域的平均值作为阈值
kernel_size = 15
# 创建一个平均滤波器
kernel = np.ones((kernel_size, kernel_size)) / (kernel_size * kernel_size)
# 计算局部平均值
local_mean = np.zeros_like(img_array, dtype=np.float32)
# 简单的局部平均实现
padded = np.pad(img_array, kernel_size // 2, mode='reflect')
for i in range(img_array.shape[0]):
for j in range(img_array.shape[1]):
local_mean[i, j] = np.mean(padded[i:i+kernel_size, j:j+kernel_size])
# 二值化
binary_array = (img_array > local_mean - threshold // 2).astype(np.uint8) * 255
elif method == 'otsu':
# Otsu阈值
# 计算直方图
hist, bins = np.histogram(img_array.flatten(), 256, [0, 256])
# 计算累积和
cum_sum = hist.cumsum()
cum_sum_sq = (hist * np.arange(256)).cumsum()
# 初始化
max_var = 0
otsu_threshold = 0
# 遍历所有可能的阈值
for t in range(1, 256):
# 前景和背景的权重
w_bg = cum_sum[t]
w_fg = cum_sum[-1] - w_bg
# 如果前景或背景为空,跳过
if w_bg == 0 or w_fg == 0:
continue
# 前景和背景的均值
mean_bg = cum_sum_sq[t] / w_bg
mean_fg = (cum_sum_sq[-1] - cum_sum_sq[t]) / w_fg
# 计算类间方差
var_between = w_bg * w_fg * (mean_bg - mean_fg) ** 2
# 更新最大方差和阈值
if var_between > max_var:
max_var = var_between
otsu_threshold = t
# 使用Otsu阈值进行二值化
binary_array = (img_array > otsu_threshold).astype(np.uint8) * 255
else:
# 默认使用固定阈值
binary_array = (img_array > threshold).astype(np.uint8) * 255
# 转换回PIL图像
return Image.fromarray(binary_array, mode='L')
def denoise(self, image, method='median', strength=3):
"""
对图像进行降噪处理
参数:
image (PIL.Image): 输入图像
method (str): 降噪方法
'median': 中值滤波
'gaussian': 高斯滤波
'bilateral': 双边滤波 (简化版本)
strength (int): 降噪强度 (1-10)
返回:
PIL.Image: 降噪后的图像
"""
# 根据方法进行降噪
if method == 'median':
# 中值滤波
kernel_size = strength * 2 - 1 # 确保是奇数
return image.filter(ImageFilter.MedianFilter(size=kernel_size))
elif method == 'gaussian':
# 高斯滤波
radius = strength / 2
return image.filter(ImageFilter.GaussianBlur(radius=radius))
elif method == 'bilateral':
# 双边滤波 (简化版本)
# 由于PIL没有内置的双边滤波我们使用高斯滤波代替
# 在实际应用中可以使用OpenCV的双边滤波
radius = strength / 2
return image.filter(ImageFilter.GaussianBlur(radius=radius))
else:
# 默认使用中值滤波
kernel_size = strength * 2 - 1 # 确保是奇数
return image.filter(ImageFilter.MedianFilter(size=kernel_size))
def resize(self, image, width, height, keep_aspect=True):
"""
调整图像大小
参数:
image (PIL.Image): 输入图像
width (int): 目标宽度
height (int): 目标高度
keep_aspect (bool): 是否保持纵横比
返回:
PIL.Image: 调整大小后的图像
"""
if keep_aspect:
# 计算缩放比例
width_ratio = width / image.width
height_ratio = height / image.height
ratio = min(width_ratio, height_ratio)
# 计算新尺寸
new_width = int(image.width * ratio)
new_height = int(image.height * ratio)
# 调整图像大小
resized_image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 创建新图像
new_image = Image.new(image.mode, (width, height), (0, 0, 0))
# 计算粘贴位置
paste_x = (width - new_width) // 2
paste_y = (height - new_height) // 2
# 粘贴调整大小后的图像
new_image.paste(resized_image, (paste_x, paste_y))
return new_image
else:
# 直接调整到指定大小
return image.resize((width, height), Image.Resampling.LANCZOS)