import json
import os
import fitz
import PyPDF2
import tempfile
from flask_app.general.读取文件.clean_pdf import extract_common_header, clean_page_content
from flask_app.general.llm.doubao import doubao_model
from flask_app.old_version.table_ocr_old import CommonOcr


def extract_img_table_text(ocr_result_pages):
    """Ask the Doubao model to rebuild a table from JSON-form OCR output.

    The OCR pages are serialized as pretty-printed JSON (non-ASCII characters
    kept as-is) and appended to a fixed instruction prompt that tells the
    model how to lay out rows and columns.

    Args:
        ocr_result_pages: JSON-serializable OCR page data for one image.

    Returns:
        Whatever ``doubao_model`` returns for the assembled prompt.
    """
    instructions = '''
任务:你负责解析以json形式输入的表格信息,根据提供的文件内容来恢复表格信息并输出,不要遗漏任何一个文字。

要求与指南:
1. 请运用文档表格理解能力,根据文件内容定位表格的每列和每行,列与列之间用丨分隔,若某列或者某行没有信息则用/填充。
2. 请不要遗漏任何一个文字,同时不要打乱行与行之间的顺序,也不要打乱列与列之间的顺序,严格按照文字的位置信息来恢复表格信息。
示例输出:
表格标题:
|序号|名称|数量|单位|单价(元)|总价(元)|技术参数|备注|
|形象展示区|
|1|公园主E题雕塑|1|套|/|/|根据江夏体育与文化元素定制|/|

..........
'''
    # Serialize the OCR payload separately so the prompt assembly reads top-down.
    serialized_pages = json.dumps(ocr_result_pages, ensure_ascii=False, indent=4)
    full_prompt = instructions + f"\n\n文件内容:\n{serialized_pages}"
    return doubao_model(full_prompt)


def has_images(pdf_path):
    """Find the pages of a PDF that contain embedded images.

    Args:
        pdf_path: Path of the PDF file to inspect.

    Returns:
        dict: Maps each 1-based page number that has at least one image to the
        list returned by ``page.get_images(full=True)`` for that page. The
        dict is empty (and therefore falsy) when no page contains an image.
    """
    pages_with_imgs = {}
    # Open the document as a context manager so its file handle is released
    # even if loading a page raises.
    with fitz.open(pdf_path) as pdf_document:
        for page_num in range(pdf_document.page_count):
            images = pdf_document.load_page(page_num).get_images(full=True)
            if images:
                # Keys are 1-based to match human-readable page numbering.
                pages_with_imgs[page_num + 1] = images
    return pages_with_imgs


def table_ocr_extract(image_path):
    """Run the general table-recognition OCR on one image file.

    Args:
        image_path: Path of the image to recognize; handed to ``CommonOcr``
            as ``img_path`` at construction time.

    Returns:
        The value produced by ``CommonOcr.recognize()``.
    """
    return CommonOcr(img_path=image_path).recognize()


def extract_images_from_page(pdf_path, image_list, page_num):
    """Extract the raw bytes of every listed image on one PDF page.

    Extraction is best-effort: any error is printed and the images collected
    up to that point (possibly none) are returned instead of raising.

    Args:
        pdf_path: Path of the PDF file.
        image_list: Image entries for the page; the first element of each
            entry is used as the image xref.
        page_num: 0-based page index; stored 1-based in each result.

    Returns:
        list[dict]: One dict per extracted image with keys ``'data'`` (image
        bytes), ``'ext'`` (file extension, ``'png'`` when the document does
        not report one) and ``'page_num'`` (1-based page number).
    """
    images = []
    try:
        # The context manager closes the document on both the success and
        # the error path, so the file handle is never left open.
        with fitz.open(pdf_path) as doc:
            for img in image_list:
                xref = img[0]
                base_image = doc.extract_image(xref)
                images.append({
                    'data': base_image['image'],
                    'ext': base_image.get('ext', 'png'),
                    'page_num': page_num + 1,
                })
    except Exception as e:
        # Deliberate catch-all: a broken image must not abort the whole run.
        print(f"提取图片时出错: {e}")
    return images


def _ocr_image_table_text(img):
    """OCR one extracted image and return the table text rebuilt from it.

    The image bytes are written to a temporary file, recognized with
    ``table_ocr_extract`` and, when recognition succeeds with at least one
    page, turned into text by ``extract_img_table_text``.

    Args:
        img: Dict with ``'data'`` (image bytes) and ``'ext'`` (file extension).

    Returns:
        str: The rebuilt table text, or ``""`` when OCR fails, reports no
        pages, or the rebuilt text is only whitespace.
    """
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.' + img['ext']) as temp_image:
            temp_path = temp_image.name
            temp_image.write(img['data'])
        # The temp file is closed (and thus fully flushed) before OCR reads it.
        ocr_result = json.loads(table_ocr_extract(temp_path))

        # Only a 200 response that actually carries pages counts as success.
        if ocr_result['code'] == 200 and len(ocr_result['result']['pages']) > 0:
            print("提取成功,图片数据已提取。")
            table_text = extract_img_table_text(ocr_result['result']['pages'])
            if table_text.strip():
                return table_text
        else:
            print("提取失败或没有页面数据。")
    except Exception as e:
        # Best-effort: one bad image must not stop the rest of the document.
        print(f"OCR处理失败: {e}")
    finally:
        # Only remove a file this call created; temp_path stays None when
        # creating the temporary file itself failed.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                print(f"删除临时文件失败: {e}")
    return ""


def pdf_image2txt(file_path, img_pdf_list):
    """Extract a PDF's text, appending OCR'd table text for pages with images.

    Each page's text is read with PyPDF2 and cleaned with
    ``clean_page_content`` using the document's common header. For pages
    listed in ``img_pdf_list`` every image is OCR'd and any recovered table
    text is appended to that page's text on a new line. The concatenated
    result is written to ``extract.txt`` next to the PDF.

    Args:
        file_path: Path of the PDF file.
        img_pdf_list: Mapping of 1-based page number to that page's image
            list (the shape returned by ``has_images``). ``None`` or an empty
            mapping means no page is OCR'd.

    Returns:
        str: Path of ``extract.txt``. The path is returned even when writing
        the file fails; the failure is only printed.
    """
    img_pdf_list = img_pdf_list or {}
    common_header = extract_common_header(file_path)
    page_texts = []

    with open(file_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        for page_no, page in enumerate(reader.pages, start=1):
            text = page.extract_text() or ""
            cleaned_text = clean_page_content(text, common_header)

            if page_no in img_pdf_list:
                print(f"第 {page_no} 页含有图片,开始提取图片并OCR")
                # extract_images_from_page expects the 0-based page index.
                images = extract_images_from_page(file_path, img_pdf_list[page_no], page_no - 1)
                for img in images:
                    table_text = _ocr_image_table_text(img)
                    if table_text:
                        cleaned_text += "\n" + table_text

            page_texts.append(cleaned_text)

    # Join once at the end instead of growing one string page by page.
    result = "".join(page_texts)

    directory = os.path.dirname(os.path.abspath(file_path))
    output_path = os.path.join(directory, 'extract.txt')
    try:
        with open(output_path, 'w', encoding='utf-8') as output_file:
            output_file.write(result)
        print(f"提取内容已保存到: {output_path}")
    except IOError as e:
        print(f"写入文件时发生错误: {e}")
    return output_path