zbparse/flask_app/general/OCR调用参考.py
2024-12-03 11:50:15 +08:00

595 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# # import re
# # import PyPDF2
# # import tempfile
# # from utils.ocr_engine import OcrEngine
# #
# #
# # # 假设您的OCR函数名为 `ocr_extract`,并接受图片文件路径作为参数
# # # 请将此函数替换为您实际的OCR实现
# #
# # def clean_page_content(text, common_header):
# # # 首先删除抬头公共部分
# # if common_header: # 确保有公共抬头才进行替换
# # for header_line in common_header.split('\n'):
# # if header_line.strip(): # 只处理非空行
# # # 替换首次出现的完整行
# # text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
# #
# # # 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
# # text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
# # text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
# # text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
# # text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
# # return text
# #
# #
# # def extract_common_header(pdf_path):
# # from PyPDF2 import PdfReader
# #
# # def get_headers(pdf_document, start_page, pages_to_read):
# # headers = []
# # for i in range(start_page, min(start_page + pages_to_read, len(pdf_document.pages))):
# # page = pdf_document.pages[i]
# # text = page.extract_text() or ""
# # if text:
# # # 只取每页的前三行,去除前后的空白字符
# # first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
# # headers.append(first_lines)
# # return headers
# #
# # def find_common_headers(headers):
# # if not headers:
# # return []
# #
# # # 使用 zip 对齐所有页的对应行
# # common_headers = []
# # for lines in zip(*headers):
# # # 检查所有行是否完全相同
# # if all(line == lines[0] for line in lines[1:]):
# # common_headers.append(lines[0])
# # return common_headers
# #
# # pdf_document = PdfReader(pdf_path)
# # total_pages = len(pdf_document.pages)
# #
# # # 定义两个提取策略
# # strategies = []
# # if total_pages >= 3:
# # # 策略1中间的3页
# # middle_page = total_pages // 2
# # start_page = max(0, middle_page - 1)
# # strategies.append((start_page, 3))
# # elif total_pages == 2:
# # # 策略12页
# # strategies.append((0, 2))
# # else:
# # # 策略11页
# # strategies.append((0, 1))
# #
# # # 策略2前三页
# # if total_pages >= 3:
# # strategies.append((0, 3))
# # elif total_pages == 2:
# # strategies.append((0, 2))
# # elif total_pages == 1:
# # strategies.append((0, 1))
# #
# # common_headers = []
# #
# # for idx, (start, count) in enumerate(strategies):
# # headers = get_headers(pdf_document, start, count)
# # if len(headers) < 2:
# # continue # 需要至少2页来比较
# #
# # current_common = find_common_headers(headers)
# # if current_common:
# # common_headers = current_common
# # break # 找到共同部分后退出
# # # 如果没有找到,继续下一个策略
# #
# # return '\n'.join(common_headers)
# #
# #
# # def extract_images_from_page(reader, page):
# # images = []
# # try:
# # for img in page.images:
# # xref = img['xref']
# # image = reader.extract_image(xref)
# # image_bytes = image['image']
# # image_ext = image['ext']
# # images.append({'data': image_bytes, 'ext': image_ext})
# # except Exception as e:
# # print(f"提取第{reader.pages.index(page) + 1}页图片时出错: {e}")
# # return images
# #
# #
# # def extract_text_by_page(file_path):
# # common_header = extract_common_header(file_path)
# # # print(f"公共抬头:{common_header}")
# # # print("--------------------正文开始-------------------")
# # result = ""
# # with open(file_path, 'rb') as file:
# # reader = PyPDF2.PdfReader(file)
# # num_pages = len(reader.pages)
# # # print(f"Total pages: {num_pages}")
# # for page_num in range(num_pages):
# # page = reader.pages[page_num]
# # text = page.extract_text() or ""
# #
# # # 提取图片并进行OCR
# # images = extract_images_from_page(reader, page)
# # ocr_text = ""
# # for image in images:
# # image_data = image['data']
# # image_ext = image['ext']
# # try:
# # with tempfile.NamedTemporaryFile(delete=True, suffix='.' + image_ext) as temp_image:
# # temp_image.write(image_data)
# # temp_image.flush()
# # # 调用OCR函数
# # ocr_result = OcrEngine.recognize_text_from_image(temp_image.name)
# # ocr_text += ocr_result + "\n"
# # except Exception as e:
# # print(f"处理第{page_num + 1}页图片时出错: {e}")
# #
# # # 清理文本
# # cleaned_text = clean_page_content(text, common_header)
# # # 合并OCR文本
# # if ocr_text.strip():
# # cleaned_text += "\n" + ocr_text
# # result += cleaned_text
# # return result
# #
# #
# # def extract_text_json_by_page(file_path):
# # common_header = extract_common_header(file_path)
# # # print(f"公共抬头:{common_header}")
# # # print("--------------------正文开始-------------------")
# # result = {}
# # with open(file_path, 'rb') as file:
# # reader = PyPDF2.PdfReader(file)
# # num_pages = len(reader.pages)
# # # print(f"Total pages: {num_pages}")
# # for page_num in range(num_pages):
# # page = reader.pages[page_num]
# # text = page.extract_text() or ""
# #
# # # 提取图片并进行OCR
# # images = extract_images_from_page(reader, page)
# # ocr_text = ""
# # for image in images:
# # image_data = image['data']
# # image_ext = image['ext']
# # try:
# # with tempfile.NamedTemporaryFile(delete=True, suffix='.' + image_ext) as temp_image:
# # temp_image.write(image_data)
# # temp_image.flush()
# # # 调用OCR函数
# # ocr_result = OcrEngine.recognize_text_from_image(temp_image.name)
# # ocr_text += ocr_result + "\n"
# # except Exception as e:
# # print(f"处理第{page_num + 1}页图片时出错: {e}")
# #
# # # 清理文本
# # cleaned_text = clean_page_content(text, common_header)
# # # 合并OCR文本
# # if ocr_text.strip():
# # cleaned_text += "\n" + ocr_text
# # result[str(page_num + 1)] = cleaned_text
# # return result
# #
# #
# # if __name__ == '__main__':
# # pdf_path = "C:/test/iDS-TCE900神捕电警抓拍单元产品介绍.pdf"
# # res = extract_text_json_by_page(pdf_path)
# # print(res)
# import json
# import os
# import re
# import shutil
# import uuid
#
# import PyPDF2
# import fitz # PyMuPDF
# import tempfile
# from utils.ocr_engine import OcrEngine
#
#
# # 假设您的OCR函数名为 `ocr_extract`,并接受图片文件路径作为参数
# def ocr_extract(image_path):
# # 示例调用您的OCR脚本并返回识别的文本
# # 例如:
# # return your_ocr_function(image_path)
# # 将图片保存到tmp目录
# # shutil.copy(image_path, f'tmp/{uuid.uuid4()}.png')
# # return "OCR提取的文本" # 替换为实际的OCR结果
# return OcrEngine.recognize_text_from_image(image_path)
#
#
# def clean_page_content(text, common_header):
# # 首先删除抬头公共部分
# if common_header: # 确保有公共抬头才进行替换
# for header_line in common_header.split('\n'):
# if header_line.strip(): # 只处理非空行
# # 替换首次出现的完整行
# text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
#
# # 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
# text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
# text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
# text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
# text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
# return text
#
#
# def extract_common_header(pdf_path):
# from PyPDF2 import PdfReader
#
# def get_headers(pdf_document, start_page, pages_to_read):
# headers = []
# for i in range(start_page, min(start_page + pages_to_read, len(pdf_document.pages))):
# page = pdf_document.pages[i]
# text = page.extract_text() or ""
# if text:
# # 只取每页的前三行,去除前后的空白字符
# first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
# headers.append(first_lines)
# return headers
#
# def find_common_headers(headers):
# if not headers:
# return []
#
# # 使用 zip 对齐所有页的对应行
# common_headers = []
# for lines in zip(*headers):
# # 检查所有行是否完全相同
# if all(line == lines[0] for line in lines[1:]):
# common_headers.append(lines[0])
# return common_headers
#
# pdf_document = PdfReader(pdf_path)
# total_pages = len(pdf_document.pages)
#
# # 定义两个提取策略
# strategies = []
# if total_pages >= 3:
# # 策略1中间的3页
# middle_page = total_pages // 2
# start_page = max(0, middle_page - 1)
# strategies.append((start_page, 3))
# elif total_pages == 2:
# # 策略12页
# strategies.append((0, 2))
# else:
# # 策略11页
# strategies.append((0, 1))
#
# # 策略2前三页
# if total_pages >= 3:
# strategies.append((0, 3))
# elif total_pages == 2:
# strategies.append((0, 2))
# elif total_pages == 1:
# strategies.append((0, 1))
#
# common_headers = []
#
# for idx, (start, count) in enumerate(strategies):
# headers = get_headers(pdf_document, start, count)
# if len(headers) < 2:
# continue # 需要至少2页来比较
#
# current_common = find_common_headers(headers)
# if current_common:
# common_headers = current_common
# break # 找到共同部分后退出
# # 如果没有找到,继续下一个策略
#
# return '\n'.join(common_headers)
#
#
# def extract_images_with_pymupdf(pdf_path):
# images = []
# try:
# doc = fitz.open(pdf_path)
# for page_num in range(len(doc)):
# page = doc.load_page(page_num)
# image_list = page.get_images(full=True)
# for img in image_list:
# xref = img[0]
# base_image = doc.extract_image(xref)
# image_bytes = base_image['image']
# image_ext = base_image.get('ext', 'png')
# # image_width = base_image.get['xres']
# # image_height = base_image.get['yres']
# # images.append({'data': image_bytes, 'ext': image_ext, 'page_num': page_num + 1, 'width': image_width, 'height': image_height}).
# images.append({'data': image_bytes, 'ext': image_ext, 'page_num': page_num + 1})
#
# # 确保输出目录存在
# if not os.path.exists("output_dir"):
# os.makedirs("output_dir")
# print(f"创建输出目录: output_dir")
# # 构建图片文件名,例如: page_1_img.png
# image_filename = f"page_{page_num}_img.{image_ext}"
# image_path = os.path.join("output_dir", image_filename)
# try:
# with open(image_path, 'wb') as img_file:
# img_file.write(image_bytes)
# print(f"保存图片: {image_path}")
# except Exception as e:
# print(f"保存图片 {image_filename} 时出错: {e}")
#
# except Exception as e:
# print(f"提取图片时出错: {e}")
# return images
#
#
# def extract_text_json_by_page(file_path):
# common_header = extract_common_header(file_path)
# # print(f"公共抬头:{common_header}")
# # print("--------------------正文开始-------------------")
# result = {}
# # 如果已经有保存的ocr结果,直接读取
# # TODO 待修改
# if os.path.exists(file_path + ".json"):
# with open(file_path + ".json", 'r', encoding='utf-8') as f:
# result = json.load(f)
# return result
# images = extract_images_with_pymupdf(file_path)
# # filtered_images = filter_images(images)
# ocr_text_dict = {}
# for img in images:
# try:
# with tempfile.NamedTemporaryFile(delete=False, suffix='.' + img['ext']) as temp_image:
# temp_image.write(img['data'])
# temp_image.flush()
# # 调用OCR函数
# ocr_result = ocr_extract(temp_image.name)
# if img['page_num'] in ocr_text_dict:
# ocr_text_dict[img['page_num']] += ocr_result + "\n"
# else:
# ocr_text_dict[img['page_num']] = ocr_result + "\n"
# except Exception as e:
# print(f"OCR处理失败: {e}")
# finally:
# try:
# os.remove(temp_image.name)
# except Exception as e:
# print(f"删除临时文件失败: {e}")
#
# with open(file_path, 'rb') as file:
# reader = PyPDF2.PdfReader(file)
# num_pages = len(reader.pages)
# # print(f"Total pages: {num_pages}")
# for page_num in range(num_pages):
# page = reader.pages[page_num]
# text = page.extract_text() or ""
# # 清理文本
# cleaned_text = clean_page_content(text, common_header)
# # 合并OCR文本
# if (page_num + 1) in ocr_text_dict and ocr_text_dict[page_num + 1].strip():
# cleaned_text += "\n" + ocr_text_dict[page_num + 1]
# result[str(page_num + 1)] = cleaned_text
# print("pdf预处理完成")
# return result
#
#
# def filter_images(images, min_width=200, min_height=200, text_threshold=5):
# """
# 过滤图像,保留可能包含文本的图像。
# :param images: 图像列表,每个图像包含 data, ext, page_num, width, height
# :param min_width: 最小宽度
# :param min_height: 最小高度
# :param text_threshold: 检测到的文本字符数阈值
# :return: 过滤后的图像列表
# """
# filtered = []
# for img in images:
# # 基于尺寸过滤
# if img['width'] < min_width or img['height'] < min_height:
# continue
# # 基于文本检测过滤
# try:
# # with tempfile.NamedTemporaryFile(delete=True, suffix='.' + img['ext']) as temp_image:
# # temp_image.write(img['data'])
# # temp_image.flush()
# # # 使用OCR快速检测文本
# # ocr_result = ocr_extract(temp_image.name)
# # if len(ocr_result.strip()) >= text_threshold:
# # img['ocr_text'] = ocr_result
# # filtered.append(img)
# filtered.append(img)
# except Exception as e:
# print(f"过滤图像时出错: {e}")
#
# return filtered
#
#
# if __name__ == '__main__':
# pdf_path = "C:/test/发言系统11.pdf"
# res = extract_text_json_by_page(pdf_path)
# print(res)
import os
import re
import json
import tempfile
import shutil
import uuid
from PyPDF2 import PdfReader
import fitz # PyMuPDF
from utils.ocr_engine import OcrEngine # 请确保OcrEngine已经正确导入
from utils.local_ocr import LocalOCR
# Module-level LocalOCR instance, created once when this module is imported;
# ocr_extract() calls its run() method for every image.
local_ocr = LocalOCR()
def ocr_extract(image_path):
    """Run OCR on the image stored at *image_path*.

    The work is delegated to the module-level ``local_ocr`` instance; an
    earlier variant called ``OcrEngine.recognize_text_from_image`` instead.

    :param image_path: path of the image file to recognise
    :return: whatever ``local_ocr.run`` returns for that image
    """
    recognized = local_ocr.run(image_path)
    return recognized
def clean_page_content(text, common_header):
    """Strip the shared page header and page-number markers from a page's text.

    Page-number markers are only removed where a page number can plausibly
    sit: at the very start of the page text, at the very end, or on a line
    that contains nothing but a dashed marker.  Number-like content inside
    the body (dates such as ``2024/12/03``, phone numbers such as
    ``010-8888-1234``) is left untouched.

    :param text: raw text of one page
    :param common_header: newline-separated header lines shared by the
        document's pages; an empty value disables header removal
    :return: the cleaned page text
    """
    if common_header:
        for header_line in common_header.split('\n'):
            header_line = header_line.strip()
            if header_line:
                # Anchored at the start of the text, so header lines are
                # peeled off one by one in their original order.
                text = re.sub(r'^' + re.escape(header_line) + r'\n?', '', text, count=1)

    # Leading page number, only when followed by a non-digit character.
    text = re.sub(r'^\s*\d+\s*(?=\D)', '', text)
    # The "/129" left at the start after stripping "89" from "89/129".
    text = re.sub(r'^\s*/\s*\d+\s*', '', text)
    # One trailing marker: "89/129", "/129", or a bare number.
    text = re.sub(r'(?:(?:\s+\d+)?\s*/\s*\d+|\s+\d+)\s*$', '', text)

    # Dashed markers such as "—2—" or "- 2 -": at the page start, at the page
    # end, or alone on a line.  Never inside running text.
    dashed = r'[—-]\s*\d+\s*[—-]'
    text = re.sub(r'^\s*' + dashed + r'\s*', '', text)
    text = re.sub(r'\s*' + dashed + r'\s*$', '', text)
    own_line = re.compile(r'\s*' + dashed + r'\s*')
    return '\n'.join(line for line in text.split('\n') if not own_line.fullmatch(line))
def extract_common_header(pdf_path):
    """Find the header lines that repeat across a sample of the PDF's pages.

    Up to the first three stripped lines of each sampled page are compared
    position by position.  Two page windows are tried in order: three pages
    around the middle of the document, then the first three pages (shorter
    documents use all of their pages for both attempts).  The first window
    that yields at least one shared line wins.

    :param pdf_path: path of the PDF file, passed to ``PdfReader``
    :return: the shared header lines joined with newlines, or an empty
        string when no window produces a shared line
    """

    def get_headers(pdf_document, start_page, pages_to_read):
        # Collect the leading lines of every page in the window that has text.
        stop_page = min(start_page + pages_to_read, len(pdf_document.pages))
        collected = []
        for index in range(start_page, stop_page):
            page_text = pdf_document.pages[index].extract_text() or ""
            if not page_text:
                continue
            leading = page_text.strip().split('\n')[:3]
            collected.append([line.strip() for line in leading])
        return collected

    def find_common_headers(headers):
        # Keep the lines that are identical at the same position on all pages.
        if not headers:
            return []
        return [
            rows[0]
            for rows in zip(*headers)
            if all(row == rows[0] for row in rows[1:])
        ]

    pdf_document = PdfReader(pdf_path)
    total_pages = len(pdf_document.pages)

    # (start_page, page_count) windows, tried in this order.
    if total_pages >= 3:
        windows = [(max(0, total_pages // 2 - 1), 3), (0, 3)]
    elif total_pages == 2:
        windows = [(0, 2), (0, 2)]
    elif total_pages == 1:
        windows = [(0, 1), (0, 1)]
    else:
        windows = [(0, 1)]

    for start, count in windows:
        headers = get_headers(pdf_document, start, count)
        if len(headers) < 2:
            # At least two pages with text are needed for a comparison.
            continue
        shared = find_common_headers(headers)
        if shared:
            return '\n'.join(shared)
    return '\n'.join([])
def extract_images_from_page(pdf_path, page_num):
    """Extract every embedded image of one PDF page using PyMuPDF.

    Errors are reported with ``print`` and never raised; whatever was
    collected before the error is returned.  The document is opened for the
    duration of the call and always closed again, so repeated calls do not
    accumulate open handles.

    :param pdf_path: path of the PDF file
    :param page_num: zero-based page index
    :return: list of dicts with keys ``'data'`` (image bytes), ``'ext'``
        (file extension, ``'png'`` when the extractor gives none) and
        ``'page_num'`` (one-based page number)
    """
    images = []
    try:
        doc = fitz.open(pdf_path)
        try:
            if page_num < 0 or page_num >= len(doc):
                print(f"页码 {page_num + 1} 超出范围")
                return images
            page = doc.load_page(page_num)
            for img in page.get_images(full=True):
                xref = img[0]  # first field of each entry is the image xref
                base_image = doc.extract_image(xref)
                images.append({
                    'data': base_image['image'],
                    'ext': base_image.get('ext', 'png'),
                    'page_num': page_num + 1,
                })
        finally:
            # Release the document even on early return or extraction error.
            doc.close()
    except Exception as e:
        print(f"提取图片时出错: {e}")
    return images
def _ocr_image_bytes(image_data, image_ext):
    """OCR raw image bytes via a temporary file that is always removed."""
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.' + image_ext) as temp_image:
            temp_path = temp_image.name
            temp_image.write(image_data)
        # The file is closed (and therefore fully flushed) before OCR reads it.
        return ocr_extract(temp_path)
    finally:
        # temp_path stays None when the temp file could not be created, so a
        # creation failure is not misreported as a deletion failure.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                print(f"删除临时文件失败: {e}")


def extract_text_json_by_page(file_path, text_threshold=10):
    """Extract the text of every PDF page, falling back to OCR for sparse pages.

    A page whose cleaned text has fewer than ``text_threshold`` characters
    has its embedded images extracted and OCR'd; recognised text is appended
    to the page text.  Results are cached next to the PDF in
    ``file_path + ".json"`` and reused on later calls.

    The cache is consulted before the PDF is touched, an unreadable cache
    falls back to a fresh extraction, and the cache is only written when all
    pages were processed, so a failed run is never served as the final result.

    :param file_path: path of the PDF file
    :param text_threshold: minimum number of characters below which a page
        is sent through OCR
    :return: dict mapping the one-based page number (as a string) to the
        page text; may be partial if processing failed midway
    """
    cache_path = file_path + ".json"
    if os.path.exists(cache_path):
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError: treat a broken cache as absent.
            print(f"读取OCR缓存失败,将重新处理: {e}")

    common_header = extract_common_header(file_path)
    result = {}
    completed = False
    try:
        reader = PdfReader(file_path)
        num_pages = len(reader.pages)
        for page_num in range(num_pages):
            page = reader.pages[page_num]
            text = page.extract_text() or ""
            cleaned_text = clean_page_content(text, common_header)
            # Too little extractable text: the page is probably a scan.
            if len(cleaned_text.strip()) < text_threshold:
                print(f"{page_num + 1} 页文本量低开始提取图片并OCR")
                for img in extract_images_from_page(file_path, page_num):
                    try:
                        ocr_result = _ocr_image_bytes(img['data'], img['ext'])
                        if ocr_result.strip():
                            cleaned_text += "\n" + ocr_result
                    except Exception as e:
                        # Best effort: one bad image must not abort the page.
                        print(f"OCR处理失败: {e}")
            result[str(page_num + 1)] = cleaned_text
        completed = True
    except Exception as e:
        print(f"处理PDF时出错: {e}")

    # Persist only complete results; a partial cache would shadow the PDF forever.
    if completed:
        try:
            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"保存结果到JSON文件时出错: {e}")
    print("PDF预处理完成")
    return result
if __name__ == '__main__':
    # Manual smoke run: extract a sample report and pretty-print the per-page
    # text as JSON, keeping non-ASCII characters readable.
    pdf_path = "C:/test/所投主要产品检测报告.pdf"
    print(json.dumps(extract_text_json_by_page(pdf_path), ensure_ascii=False, indent=4))