123 lines
5.6 KiB
Python
123 lines
5.6 KiB
Python
|
import json
|
|||
|
import os
|
|||
|
import fitz
|
|||
|
import PyPDF2
|
|||
|
import tempfile
|
|||
|
from flask_app.general.clean_pdf import extract_common_header, clean_page_content
|
|||
|
from flask_app.general.llm.doubao import doubao_model
|
|||
|
from flask_app.old_version.table_ocr_old import CommonOcr
|
|||
|
|
|||
|
def extract_img_table_text(ocr_result_pages):
    """Ask the Doubao model to rebuild table text from OCR page data.

    The fixed instruction prompt is followed by the OCR pages serialized as
    pretty-printed JSON (non-ASCII characters kept as-is), and the combined
    prompt is sent to ``doubao_model``.

    Args:
        ocr_result_pages: JSON-serializable OCR page data for one image.

    Returns:
        Whatever ``doubao_model`` returns for the prompt (the caller in this
        file treats it as a string).
    """
    instructions = '''
任务:你负责解析以json形式输入的表格信息,根据提供的文件内容来恢复表格信息并输出,不要遗漏任何一个文字。

要求与指南:
1. 请运用文档表格理解能力,根据文件内容定位表格的每列和每行,列与列之间用丨分隔,若某列或者某行没有信息则用/填充。
2. 请不要遗漏任何一个文字,同时不要打乱行与行之间的顺序,也不要打乱列与列之间的顺序,严格按照文字的位置信息来恢复表格信息。
示例输出:
表格标题:
|序号|名称|数量|单位|单价(元)|总价(元)|技术参数|备注|
|形象展示区|
|1|公园主E题雕塑|1|套|/|/|根据江夏体育与文化元素定制|/|

..........
'''
    # Keep Chinese characters readable in the prompt instead of \uXXXX escapes.
    serialized_pages = json.dumps(ocr_result_pages, ensure_ascii=False, indent=4)
    full_prompt = instructions + "\n\n文件内容:\n" + serialized_pages
    return doubao_model(full_prompt)
|
|||
|
|
|||
|
def has_images(pdf_path):
    """Find the pages of a PDF that contain images.

    Args:
        pdf_path: Path of the PDF file to inspect.

    Returns:
        dict: Maps each 1-based page number that has at least one image to
        the list returned by ``page.get_images(full=True)`` for that page.
        The dict is empty (falsy) when no page contains an image.
    """
    pages_with_imgs = {}
    # The context manager closes the document even if a page fails to load.
    with fitz.open(pdf_path) as pdf_document:
        for page_num in range(pdf_document.page_count):
            images = pdf_document.load_page(page_num).get_images(full=True)
            if images:
                # Keys are 1-based to match human-facing page numbers.
                pages_with_imgs[page_num + 1] = images
    return pages_with_imgs
|
|||
|
|
|||
|
def table_ocr_extract(image_path):
    """Run the generic table OCR on one image file.

    Args:
        image_path: Path of the image to recognize; passed to ``CommonOcr``
            as ``img_path``.

    Returns:
        The result of ``CommonOcr.recognize()``. The caller in this file
        parses it with ``json.loads``, so it is presumably a JSON string
        describing the table structure -- confirm against ``CommonOcr``.
    """
    return CommonOcr(img_path=image_path).recognize()
|
|||
|
|
|||
|
def extract_images_from_page(pdf_path, image_list, page_num):
    """Extract the raw bytes of the given images from a PDF.

    Args:
        pdf_path: Path of the PDF file.
        image_list: Image entries whose first element is the image xref
            (the caller in this file passes one page's entries from
            ``has_images``).
        page_num: Page index the images belong to; the caller in this file
            passes a zero-based index, and ``page_num + 1`` is stored in
            each result.

    Returns:
        list[dict]: One dict per extracted image with keys ``'data'`` (image
        bytes), ``'ext'`` (file extension, ``'png'`` when the extractor
        reports none) and ``'page_num'``. Extraction is best-effort: on any
        error a message is printed and the images gathered so far are
        returned.
    """
    images = []
    try:
        # The context manager guarantees the document handle is released.
        with fitz.open(pdf_path) as doc:
            for img in image_list:
                base_image = doc.extract_image(img[0])
                images.append({
                    'data': base_image['image'],
                    'ext': base_image.get('ext', 'png'),
                    'page_num': page_num + 1,
                })
    except Exception as e:
        print(f"提取图片时出错: {e}")
    return images
|
|||
|
def _ocr_image_table_text(img):
    """Return the reconstructed table text for one extracted image, or ''.

    The image bytes are written to a temporary file, passed through
    ``table_ocr_extract`` and, when the OCR reports success with at least one
    page, through ``extract_img_table_text``. Processing is best-effort: any
    failure is printed and yields an empty string. The temporary file is
    always removed.

    Args:
        img: Dict with ``'data'`` (image bytes) and ``'ext'`` (extension
            without the dot).
    """
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.' + img['ext']) as temp_image:
            temp_path = temp_image.name
            temp_image.write(img['data'])
        # The file is closed (and therefore flushed) before the OCR reads it.
        ocr_result = json.loads(table_ocr_extract(temp_path))
        if ocr_result['code'] == 200 and len(ocr_result['result']['pages']) > 0:
            print("提取成功,图片数据已提取。")
            table_text = extract_img_table_text(ocr_result['result']['pages'])
            if table_text.strip():
                return table_text
        else:
            print("提取失败或没有页面数据。")
    except Exception as e:
        print(f"OCR处理失败: {e}")
    finally:
        # Only clean up a file that was actually created in this call.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                print(f"删除临时文件失败: {e}")
    return ""


def pdf_image2txt(file_path, img_pdf_list):
    """Extract a PDF's text, appending OCR'd table text for pages with images.

    For every page the plain text is extracted with PyPDF2 and cleaned with
    ``clean_page_content`` using the document's common header. For pages
    listed in ``img_pdf_list`` each image is OCR'd and any non-blank table
    text is appended to that page's text on a new line. The concatenated
    result is written to ``extract.txt`` next to the input file.

    Args:
        file_path: Path of the PDF file.
        img_pdf_list: Mapping from 1-based page number to that page's image
            entries (the shape produced by ``has_images``).

    Returns:
        str: Path of ``extract.txt`` in the input file's directory. The path
        is returned even if writing failed (the error is printed).
    """
    common_header = extract_common_header(file_path)
    page_texts = []
    with open(file_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        for page_num, page in enumerate(reader.pages):
            # extract_text() may yield None for pages without a text layer.
            text = page.extract_text() or ""
            cleaned_text = clean_page_content(text, common_header)
            page_key = page_num + 1
            if page_key in img_pdf_list:
                print(f"第 {page_key} 页含有图片,开始提取图片并OCR")
                images = extract_images_from_page(file_path, img_pdf_list[page_key], page_num)
                for img in images:
                    table_text = _ocr_image_table_text(img)
                    if table_text:
                        cleaned_text += "\n" + table_text
            page_texts.append(cleaned_text)
    result = "".join(page_texts)

    directory = os.path.dirname(os.path.abspath(file_path))
    output_path = os.path.join(directory, 'extract.txt')
    # Save the combined text to extract.txt.
    try:
        with open(output_path, 'w', encoding='utf-8') as output_file:
            output_file.write(result)
        print(f"提取内容已保存到: {output_path}")
    except IOError as e:
        print(f"写入文件时发生错误: {e}")
    # Return the path of the output file.
    return output_path
|