功能概述
这是一个将Word文档(docx)转换为Markdown格式的工具,具有以下特点:
- 支持批量转换目录下的所有docx文件
- 自动提取文档中的图片
- 将图片上传至MinIO存储
- 自动替换Markdown文件中的图片链接
代码
import os
import pypandoc
from comlogger import logger
class DocxConverter:
def __init__(self, minio_util):
self.minio_util = minio_util
def process_docx_in_dir(self, docx_dir, output_dir, progress_callback=None):
"""处理目录下的所有docx文件"""
logger.info(f"开始处理docx文件: {docx_dir}")
# 检查docx_dir 是否存在
if not os.path.exists(docx_dir):
logger.error(f"docx文件目录不存在: {docx_dir}")
return
# 检查output_dir 是否存在
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 遍历docx_dir 下的所有docx文件
for root, dirs, files in os.walk(docx_dir):
for file in files:
if file.lower().endswith('.docx'):
docx_file = os.path.join(root, file)
print(f"开始处理docx文件: {docx_file}")
self.convert_doc_to_markdown(docx_file, output_dir)
if progress_callback:
progress_callback()
def convert_doc_to_markdown(self, doc_file, output_dir):
"""转换doc/docx to markdown, 使用pandoc 命令行"""
output_file = os.path.join(output_dir, os.path.basename(doc_file).replace(".docx", ".md"))
# 使用pandoc 命令行
# 检查文件是否存在
# 确保目录存在
media_dir = os.path.join(output_dir, os.path.basename(doc_file).replace(".docx", "_media"))
if not os.path.exists(media_dir):
os.makedirs(media_dir, exist_ok=True)
# 调用 pypandoc 进行转换,并指定图片存储目录
pypandoc.convert_file(
doc_file, 'md', format='docx', outputfile=output_file,
extra_args=[f'--extract-media={media_dir}']
)
logger.info(f"Markdown 文件已保存: {output_file}")
logger.info(f"图片已提取到目录: {media_dir}")
print(f"Markdown 文件已保存: {output_file}")
print(f"图片已提取到目录: {media_dir}")
# 替换markdown文件中的图片
if os.path.exists(output_file):
self.replace_media_in_markdown(output_file, media_dir)
return output_file
def replace_media_in_markdown(self, markdown_file, media_dir):
"""替换markdown文件中的图片"""
# 使用with语句逐行读取和写入,避免一次性加载整个文件到内存
temp_file = markdown_file + '.tmp'
try:
with open(markdown_file, 'r', encoding='utf-8') as input_file, \
open(temp_file, 'w', encoding='utf-8') as output_file:
content = ''
# 每次读取一个块
chunk_size = 8192 # 8KB chunks
while True:
chunk = input_file.read(chunk_size)
if not chunk:
break
content += chunk
# 当内容达到一定大小时进行处理
if len(content) >= 1024 * 1024: # 1MB
processed_content = self._process_media_content(content, media_dir)
output_file.write(processed_content)
content = ''
# 处理剩余内容
if content:
processed_content = self._process_media_content(content, media_dir)
output_file.write(processed_content)
# 替换原文件
os.replace(temp_file, markdown_file)
except Exception as e:
logger.error(f"处理文件时发生错误: {e}")
if os.path.exists(temp_file):
os.remove(temp_file)
raise
def _process_media_content(self, content, media_dir):
"""处理内容中的媒体文件引用"""
for root, _, files in os.walk(media_dir):
for file in files:
if file.endswith(('.png', '.jpg', '.jpeg', '.gif')):
file_path = os.path.join(root, file)
# 上传到minio
logger.info(f"上传图片: {file_path}")
markdown_file_name = os.path.basename(file_path)
markdown_file_name = markdown_file_name.replace('.md', '').replace('.MD', '').replace(' ', '-')
miniourl = self.minio_util.upload_to_minio(file_path, markdown_file_name)
# 使用url替换图片路径
content = content.replace(file_path, miniourl)
return content