This commit is contained in:
zy123 2024-10-24 16:55:39 +08:00
parent 01cd7977ec
commit 4344f9398e
3 changed files with 189 additions and 175 deletions

View File

@ -49,115 +49,118 @@ def pdf2docx(local_path_in):
print(f"format_change p2d:have downloaded file to: {downloaded_filepath}")
return downloaded_filepath
# def doc2docx(local_path_in):
# remote_url = 'http://47.98.59.178:8000/v3/3rd/files/transfer/d2d'
# receive_download_url = upload_file(local_path_in, remote_url)
# print(receive_download_url)
# filename, folder = get_filename_and_folder(local_path_in) # 输入输出在同一个文件夹
# local_filename = os.path.join(folder, filename) # 输出文件名
# downloaded_filepath, file_type = download_file(receive_download_url, local_filename)
# print(f"format_change d2d:have downloaded file to: {downloaded_filepath}")
# return downloaded_filepath
# def docx2pdf(local_path_in):
# remote_url = 'http://47.98.59.178:8000/v3/3rd/files/transfer/d2p'
# receive_download_url = upload_file(local_path_in, remote_url)
# filename, folder = get_filename_and_folder(local_path_in) # 输入输出在同一个文件夹
# local_filename = os.path.join(folder, filename) # 输出文件名
# downloaded_filepath,file_type = download_file(receive_download_url, local_filename)
# print(f"format_change d2p:have downloaded file to: {downloaded_filepath}")
# return downloaded_filepath
def docx2pdf(file_path):
"""
将本地的 .docx .doc 文件转换为 .pdf 文件
def doc2docx(local_path_in):
remote_url = 'http://47.98.59.178:8000/v3/3rd/files/transfer/d2d'
receive_download_url = upload_file(local_path_in, remote_url)
print(receive_download_url)
filename, folder = get_filename_and_folder(local_path_in) # 输入输出在同一个文件夹
local_filename = os.path.join(folder, filename) # 输出文件名
downloaded_filepath, file_type = download_file(receive_download_url, local_filename)
print(f"format_change d2d:have downloaded file to: {downloaded_filepath}")
return downloaded_filepath
def docx2pdf(local_path_in):
remote_url = 'http://47.98.59.178:8000/v3/3rd/files/transfer/d2p'
receive_download_url = upload_file(local_path_in, remote_url)
filename, folder = get_filename_and_folder(local_path_in) # 输入输出在同一个文件夹
local_filename = os.path.join(folder, filename) # 输出文件名
downloaded_filepath,file_type = download_file(receive_download_url, local_filename)
print(f"format_change d2p:have downloaded file to: {downloaded_filepath}")
return downloaded_filepath
参数
- file_path: str, 本地文件的路径支持 .docx .doc 格式
"""
# 检查文件是否存在
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件未找到: {file_path}")
# 获取文件名和扩展名
base_name = os.path.basename(file_path)
name, ext = os.path.splitext(base_name)
ext = ext.lower().lstrip('.')
if ext not in ['docx', 'doc']:
raise ValueError(f"doc2pdf 仅支持 .docx 和 .doc 文件,当前文件扩展名为: .{ext}")
# 定义转换接口
endpoint = 'http://120.26.236.97:5008/convert_to_pdf'
# 获取文件所在目录
output_dir = os.path.dirname(file_path)
# 准备上传的文件
with open(file_path, 'rb') as f:
files = {'file': (base_name, f)}
try:
print(f"正在将 {base_name} 转换为 .pdf 格式...")
response = requests.post(endpoint, files=files)
response.raise_for_status() # 检查请求是否成功
except requests.RequestException as e:
print(f"转换过程中发生错误: {e}")
return
# 准备保存转换后文件的路径
output_file_name = f"{name}.pdf"
output_path = os.path.join(output_dir, output_file_name)
# 保存转换后的文件
with open(output_path, 'wb') as out_file:
out_file.write(response.content)
print(f"文件已成功转换并保存至: {output_path}")
def doc2docx(file_path):
"""
将本地的 .doc 文件转换为 .docx 文件
参数
- file_path: str, 本地文件的路径支持 .doc 格式
"""
# 检查文件是否存在
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件未找到: {file_path}")
# 获取文件名和扩展名
base_name = os.path.basename(file_path)
name, ext = os.path.splitext(base_name)
ext = ext.lower().lstrip('.')
if ext != 'doc':
raise ValueError(f"doc2docx 仅支持 .doc 文件,当前文件扩展名为: .{ext}")
# 定义转换接口
endpoint = 'http://120.26.236.97:5008/convert_to_docx'
# 获取文件所在目录
output_dir = os.path.dirname(file_path)
# 准备上传的文件
with open(file_path, 'rb') as f:
files = {'file': (base_name, f)}
try:
print(f"正在将 {base_name} 转换为 .docx 格式...")
response = requests.post(endpoint, files=files)
response.raise_for_status() # 检查请求是否成功
except requests.RequestException as e:
print(f"转换过程中发生错误: {e}")
return
# 准备保存转换后文件的路径
output_file_name = f"{name}.docx"
output_path = os.path.join(output_dir, output_file_name)
# 保存转换后的文件
with open(output_path, 'wb') as out_file:
out_file.write(response.content)
print(f"文件已成功转换并保存至: {output_path}")
# def docx2pdf(file_path):
# """
# 将本地的 .docx 或 .doc 文件转换为 .pdf 文件。
#
# 参数:
# - file_path: str, 本地文件的路径,支持 .docx 和 .doc 格式。
# """
# # 检查文件是否存在
# if not os.path.isfile(file_path):
# raise FileNotFoundError(f"文件未找到: {file_path}")
#
# # 获取文件名和扩展名
# base_name = os.path.basename(file_path)
# name, ext = os.path.splitext(base_name)
# ext = ext.lower().lstrip('.')
#
# if ext not in ['docx', 'doc']:
# raise ValueError(f"doc2pdf 仅支持 .docx 和 .doc 文件,当前文件扩展名为: .{ext}")
#
# # 定义转换接口
# endpoint = 'http://120.26.236.97:5008/convert_to_pdf'
#
# # 获取文件所在目录
# output_dir = os.path.dirname(file_path)
#
# # 准备上传的文件
# with open(file_path, 'rb') as f:
# files = {'file': (base_name, f)}
# try:
# print(f"正在将 {base_name} 转换为 .pdf 格式...")
# response = requests.post(endpoint, files=files)
# response.raise_for_status() # 检查请求是否成功
# except requests.RequestException as e:
# print(f"转换过程中发生错误: {e}")
# return
#
# # 准备保存转换后文件的路径
# output_file_name = f"{name}.pdf"
# output_path = os.path.join(output_dir, output_file_name)
#
# # 保存转换后的文件
# with open(output_path, 'wb') as out_file:
# out_file.write(response.content)
#
# print(f"文件已成功转换并保存至: {output_path}")
# return output_path
#
#
# def doc2docx(file_path):
# """
# 将本地的 .doc 文件转换为 .docx 文件。
#
# 参数:
# - file_path: str, 本地文件的路径,支持 .doc 格式。
# """
# # 检查文件是否存在
# if not os.path.isfile(file_path):
# raise FileNotFoundError(f"文件未找到: {file_path}")
#
# # 获取文件名和扩展名
# base_name = os.path.basename(file_path)
# name, ext = os.path.splitext(base_name)
# ext = ext.lower().lstrip('.')
#
# if ext != 'doc':
# raise ValueError(f"doc2docx 仅支持 .doc 文件,当前文件扩展名为: .{ext}")
#
# # 定义转换接口
# endpoint = 'http://120.26.236.97:5008/convert_to_docx'
#
# # 获取文件所在目录
# output_dir = os.path.dirname(file_path)
#
# # 准备上传的文件
# with open(file_path, 'rb') as f:
# files = {'file': (base_name, f)}
# try:
# print(f"正在将 {base_name} 转换为 .docx 格式...")
# response = requests.post(endpoint, files=files)
# response.raise_for_status() # 检查请求是否成功
# except requests.RequestException as e:
# print(f"转换过程中发生错误: {e}")
# return
#
# # 准备保存转换后文件的路径
# output_file_name = f"{name}.docx"
# output_path = os.path.join(output_dir, output_file_name)
#
# # 保存转换后的文件
# with open(output_path, 'wb') as out_file:
# out_file.write(response.content)
#
# print(f"文件已成功转换并保存至: {output_path}")
# return output_path
@ -168,8 +171,9 @@ if __name__ == '__main__':
# local_path_in="C:\\Users\\Administrator\\Desktop\\fsdownload\\ztbfile.pdf"
# downloaded_file=doc2docx(local_path_in)
# downloaded_file=pdf2docx(local_path_in)
downloaded_file=docx2pdf(local_path_in)
print(downloaded_file)
for i in range(1):
downloaded_file=docx2pdf(local_path_in)
print(downloaded_file)

View File

@ -1,8 +1,7 @@
from PyPDF2 import PdfReader, PdfWriter
import re # 导入正则表达式库
import os # 用于文件和文件夹操作
from flask_app.general.format_change import docx2pdf # 确保此模块可用
from flask_app.general.merge_pdfs import merge_pdfs # 确保此模块可用
# from flask_app.general.format_change import docx2pdf # 确保此模块可用
def clean_page_content(text, common_header):
"""
@ -14,13 +13,14 @@ def clean_page_content(text, common_header):
# 替换首次出现的完整行
text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
# 删除页码,支持多种格式
text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
# 删除页码,合并多个正则表达式为一个
text = re.sub(
r'(^\s*\d+\s*(?=\D))|(\s+\d+\s*$)|(\s*/\s*\d+\s*)|(\s*[—-]\s*\d+\s*[—-]\s*)',
'',
text,
flags=re.MULTILINE
)
return text
def extract_common_header(pdf_path):
"""
从PDF的前几页提取公共抬头
@ -39,34 +39,31 @@ def extract_common_header(pdf_path):
if text:
# 只取每页的前三行
first_lines = text.strip().split('\n')[:3]
headers.append(first_lines)
headers.append(set(line.strip() for line in first_lines if line.strip()))
if len(headers) < 2:
return "" # 如果没有足够的页来比较,返回空字符串
# 寻找每一行中的公共部分,按顺序保留
common_headers = []
for lines in zip(*headers):
first_words = lines[0].split()
common_line = [word for word in first_words if all(word in line.split() for line in lines[1:])]
if common_line:
common_headers.append(' '.join(common_line))
common_headers = set.intersection(*headers)
return '\n'.join(common_headers)
def is_pdf_or_doc(filename):
"""
判断文件是否为PDF或Word文档
"""
return filename.lower().endswith(('.pdf', '.doc', '.docx'))
def convert_to_pdf(file_path):
"""
如果是Word文档则转换为PDF
"""
if file_path.lower().endswith(('.doc', '.docx')):
return docx2pdf(file_path)
return file_path
# def convert_to_pdf(file_path):
# """
# 如果是Word文档则转换为PDF。
# """
# if file_path.lower().endswith(('.doc', '.docx')):
# return docx2pdf(file_path)
# return file_path
def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix):
"""
@ -76,8 +73,8 @@ def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_fo
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
if start_page < 0 or end_page >= len(pdf_document.pages) or start_page > end_page:
print(f"无效的页面范围: {start_page}{end_page}")
if start_page is None or end_page is None or start_page > end_page:
print(f"无效的页面范围: {start_page}{end_page} 文件: {pdf_path}")
return ""
output_doc = PdfWriter()
@ -88,24 +85,32 @@ def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_fo
print(f"{output_suffix} 已截取并保存页面从 {start_page}{end_page}{output_pdf_path}")
return output_pdf_path
except Exception as e:
print(f"Error in save_extracted_pages: {e}")
print(f"Error in save_extracted_pages for file {pdf_path}: {e}")
return "" # 返回空字符串
def extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header):
def find_page_indices(pdf_document, begin_pattern, end_pattern, begin_page, common_header):
"""
通用函数根据模式提取起始页和结束页
查找起始页和结束页的索引
"""
start_page = None
end_page = None
total_pages = len(pdf_document.pages)
for i, page in enumerate(pdf_document.pages):
if i <= begin_page:
continue
text = page.extract_text() or ""
cleaned_text = clean_page_content(text, common_header)
if start_page is None and re.search(begin_pattern, cleaned_text) and i > begin_page:
if start_page is None and re.search(begin_pattern, cleaned_text):
start_page = i
if start_page is not None and re.search(end_pattern, cleaned_text) and i > start_page:
continue
if start_page is not None and re.search(end_pattern, cleaned_text):
end_page = i
break
return start_page, end_page
return start_page, end_page if end_page else total_pages - 1
def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
"""
@ -114,37 +119,45 @@ def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_patter
try:
common_header = extract_common_header(pdf_path)
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages) - 1 # 获取总页数
# 提取起始页和结束页
start_page, end_page = extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header)
start_page, end_page = find_page_indices(pdf_document, begin_pattern, end_pattern, begin_page, common_header)
if output_suffix == "format":
if output_suffix == "format" and start_page is None:
print(f"{output_suffix}: 未找到起始页,尝试二次提取! 文件: {pdf_path}")
# 二次提取逻辑,保留原有功能
start_page, end_page = find_page_indices(
pdf_document,
re.compile(r'^(?:响应|投标).*?格式.*', re.MULTILINE),
end_pattern,
begin_page,
common_header
)
if start_page is None:
print(f"{output_suffix}: 未找到起始页,提取失败!")
print(f"{output_suffix}: 未找到起始页,提取失败! 文件: {pdf_path}")
return ""
if end_page is None:
# 如果未匹配到结束页,默认截取到文件末尾
end_page = total_pages
print(f"{output_suffix}: 未找到结束页,默认截取到文件末尾。")
return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
if start_page is None or end_page is None:
print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
if start_page is None:
print(f"未找到起始页,提取失败! 文件: {pdf_path}")
return ""
if end_page is None:
end_page = len(pdf_document.pages) - 1
print(f"{output_suffix}: 未找到结束页,默认截取到文件末尾。 文件: {pdf_path}")
return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
except Exception as e:
print(f"Error processing {pdf_path}: {e}")
return ""
def process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
"""
处理单个文件转换为PDF如果需要并提取页面
"""
pdf_path = convert_to_pdf(file_path)
result = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
return result or ""
# pdf_path = convert_to_pdf(file_path)
pdf_path=file_path
return extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) or ""
def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
"""
@ -159,20 +172,15 @@ def process_input(input_path, output_folder, begin_pattern, begin_page, end_patt
file_path = os.path.join(input_path, file_name)
if is_pdf_or_doc(file_path):
result = process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
if isinstance(result, tuple):
generated_files.extend([f if f else "" for f in result]) # 保留空字符串
else:
generated_files.append(result) # 直接添加result可能是空字符串
generated_files.append(result)
elif os.path.isfile(input_path) and is_pdf_or_doc(input_path):
result = process_files(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
if isinstance(result, tuple):
generated_files.extend([f if f else "" for f in result]) # 保留空字符串
else:
generated_files.append(result) # 直接添加result可能是空字符串
generated_files.append(result)
else:
print("提供的路径既不是文件夹也不是PDF文件。")
print("提供的路径既不是文件夹也不是PDF/Word文件。")
return [f for f in generated_files if f] # 过滤空字符串
return generated_files
def truncate_pdf_main(input_path, output_folder, selection):
"""
@ -180,27 +188,30 @@ def truncate_pdf_main(input_path, output_folder, selection):
"""
try:
if selection == 1: # 投标文件格式
begin_page = 5
begin_page = 10
begin_pattern = re.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:响应|投标).*?格式.*'
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:响应|投标).*?格式.*',
re.MULTILINE
)
end_pattern = re.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+',
re.MULTILINE
)
local_output_suffix = "format"
else:
print("无效的选择:请选择1")
return None
return []
# 调用相应的处理函数
return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, local_output_suffix) or ""
return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, local_output_suffix)
except Exception as e:
print(f"Error in truncate_pdf_main: {e}")
return "" # 返回空字符串
return []
if __name__ == "__main__":
# 定义输入和输出路径
input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles"
input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\包头市公安支队机动车查验监管系统招标文201907.pdf"
output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\新建文件夹"
selection = 1 # 1 - 投标文件格式
# 执行截取

View File

@ -431,8 +431,7 @@ def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header):
else:
print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
return ""
return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix,
)
return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
except Exception as e:
print(f"Error in extract_pages_twice: {e}")
return ""