zbparse/flask_app/general/OCR调用参考.py

595 lines
22 KiB
Python
Raw Normal View History

2024-12-03 11:50:15 +08:00
# # import re
# # import PyPDF2
# # import tempfile
# # from utils.ocr_engine import OcrEngine
# #
# #
# # # 假设您的OCR函数名为 `ocr_extract`,并接受图片文件路径作为参数
# # # 请将此函数替换为您实际的OCR实现
# #
# # def clean_page_content(text, common_header):
# # # 首先删除抬头公共部分
# # if common_header: # 确保有公共抬头才进行替换
# # for header_line in common_header.split('\n'):
# # if header_line.strip(): # 只处理非空行
# # # 替换首次出现的完整行
# # text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
# #
# # # 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
# # text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
# # text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
# # text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
# # text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
# # return text
# #
# #
# # def extract_common_header(pdf_path):
# # from PyPDF2 import PdfReader
# #
# # def get_headers(pdf_document, start_page, pages_to_read):
# # headers = []
# # for i in range(start_page, min(start_page + pages_to_read, len(pdf_document.pages))):
# # page = pdf_document.pages[i]
# # text = page.extract_text() or ""
# # if text:
# # # 只取每页的前三行,去除前后的空白字符
# # first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
# # headers.append(first_lines)
# # return headers
# #
# # def find_common_headers(headers):
# # if not headers:
# # return []
# #
# # # 使用 zip 对齐所有页的对应行
# # common_headers = []
# # for lines in zip(*headers):
# # # 检查所有行是否完全相同
# # if all(line == lines[0] for line in lines[1:]):
# # common_headers.append(lines[0])
# # return common_headers
# #
# # pdf_document = PdfReader(pdf_path)
# # total_pages = len(pdf_document.pages)
# #
# # # 定义两个提取策略
# # strategies = []
# # if total_pages >= 3:
# # # 策略1中间的3页
# # middle_page = total_pages // 2
# # start_page = max(0, middle_page - 1)
# # strategies.append((start_page, 3))
# # elif total_pages == 2:
# # # 策略12页
# # strategies.append((0, 2))
# # else:
# # # 策略11页
# # strategies.append((0, 1))
# #
# # # 策略2前三页
# # if total_pages >= 3:
# # strategies.append((0, 3))
# # elif total_pages == 2:
# # strategies.append((0, 2))
# # elif total_pages == 1:
# # strategies.append((0, 1))
# #
# # common_headers = []
# #
# # for idx, (start, count) in enumerate(strategies):
# # headers = get_headers(pdf_document, start, count)
# # if len(headers) < 2:
# # continue # 需要至少2页来比较
# #
# # current_common = find_common_headers(headers)
# # if current_common:
# # common_headers = current_common
# # break # 找到共同部分后退出
# # # 如果没有找到,继续下一个策略
# #
# # return '\n'.join(common_headers)
# #
# #
# # def extract_images_from_page(reader, page):
# # images = []
# # try:
# # for img in page.images:
# # xref = img['xref']
# # image = reader.extract_image(xref)
# # image_bytes = image['image']
# # image_ext = image['ext']
# # images.append({'data': image_bytes, 'ext': image_ext})
# # except Exception as e:
# # print(f"提取第{reader.pages.index(page) + 1}页图片时出错: {e}")
# # return images
# #
# #
# # def extract_text_by_page(file_path):
# # common_header = extract_common_header(file_path)
# # # print(f"公共抬头:{common_header}")
# # # print("--------------------正文开始-------------------")
# # result = ""
# # with open(file_path, 'rb') as file:
# # reader = PyPDF2.PdfReader(file)
# # num_pages = len(reader.pages)
# # # print(f"Total pages: {num_pages}")
# # for page_num in range(num_pages):
# # page = reader.pages[page_num]
# # text = page.extract_text() or ""
# #
# # # 提取图片并进行OCR
# # images = extract_images_from_page(reader, page)
# # ocr_text = ""
# # for image in images:
# # image_data = image['data']
# # image_ext = image['ext']
# # try:
# # with tempfile.NamedTemporaryFile(delete=True, suffix='.' + image_ext) as temp_image:
# # temp_image.write(image_data)
# # temp_image.flush()
# # # 调用OCR函数
# # ocr_result = OcrEngine.recognize_text_from_image(temp_image.name)
# # ocr_text += ocr_result + "\n"
# # except Exception as e:
# # print(f"处理第{page_num + 1}页图片时出错: {e}")
# #
# # # 清理文本
# # cleaned_text = clean_page_content(text, common_header)
# # # 合并OCR文本
# # if ocr_text.strip():
# # cleaned_text += "\n" + ocr_text
# # result += cleaned_text
# # return result
# #
# #
# # def extract_text_json_by_page(file_path):
# # common_header = extract_common_header(file_path)
# # # print(f"公共抬头:{common_header}")
# # # print("--------------------正文开始-------------------")
# # result = {}
# # with open(file_path, 'rb') as file:
# # reader = PyPDF2.PdfReader(file)
# # num_pages = len(reader.pages)
# # # print(f"Total pages: {num_pages}")
# # for page_num in range(num_pages):
# # page = reader.pages[page_num]
# # text = page.extract_text() or ""
# #
# # # 提取图片并进行OCR
# # images = extract_images_from_page(reader, page)
# # ocr_text = ""
# # for image in images:
# # image_data = image['data']
# # image_ext = image['ext']
# # try:
# # with tempfile.NamedTemporaryFile(delete=True, suffix='.' + image_ext) as temp_image:
# # temp_image.write(image_data)
# # temp_image.flush()
# # # 调用OCR函数
# # ocr_result = OcrEngine.recognize_text_from_image(temp_image.name)
# # ocr_text += ocr_result + "\n"
# # except Exception as e:
# # print(f"处理第{page_num + 1}页图片时出错: {e}")
# #
# # # 清理文本
# # cleaned_text = clean_page_content(text, common_header)
# # # 合并OCR文本
# # if ocr_text.strip():
# # cleaned_text += "\n" + ocr_text
# # result[str(page_num + 1)] = cleaned_text
# # return result
# #
# #
# # if __name__ == '__main__':
# # pdf_path = "C:/test/iDS-TCE900神捕电警抓拍单元产品介绍.pdf"
# # res = extract_text_json_by_page(pdf_path)
# # print(res)
# import json
# import os
# import re
# import shutil
# import uuid
#
# import PyPDF2
# import fitz # PyMuPDF
# import tempfile
# from utils.ocr_engine import OcrEngine
#
#
# # 假设您的OCR函数名为 `ocr_extract`,并接受图片文件路径作为参数
# def ocr_extract(image_path):
# # 示例调用您的OCR脚本并返回识别的文本
# # 例如:
# # return your_ocr_function(image_path)
# # 将图片保存到tmp目录
# # shutil.copy(image_path, f'tmp/{uuid.uuid4()}.png')
# # return "OCR提取的文本" # 替换为实际的OCR结果
# return OcrEngine.recognize_text_from_image(image_path)
#
#
# def clean_page_content(text, common_header):
# # 首先删除抬头公共部分
# if common_header: # 确保有公共抬头才进行替换
# for header_line in common_header.split('\n'):
# if header_line.strip(): # 只处理非空行
# # 替换首次出现的完整行
# text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
#
# # 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
# text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
# text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
# text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
# text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
# return text
#
#
# def extract_common_header(pdf_path):
# from PyPDF2 import PdfReader
#
# def get_headers(pdf_document, start_page, pages_to_read):
# headers = []
# for i in range(start_page, min(start_page + pages_to_read, len(pdf_document.pages))):
# page = pdf_document.pages[i]
# text = page.extract_text() or ""
# if text:
# # 只取每页的前三行,去除前后的空白字符
# first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
# headers.append(first_lines)
# return headers
#
# def find_common_headers(headers):
# if not headers:
# return []
#
# # 使用 zip 对齐所有页的对应行
# common_headers = []
# for lines in zip(*headers):
# # 检查所有行是否完全相同
# if all(line == lines[0] for line in lines[1:]):
# common_headers.append(lines[0])
# return common_headers
#
# pdf_document = PdfReader(pdf_path)
# total_pages = len(pdf_document.pages)
#
# # 定义两个提取策略
# strategies = []
# if total_pages >= 3:
# # 策略1中间的3页
# middle_page = total_pages // 2
# start_page = max(0, middle_page - 1)
# strategies.append((start_page, 3))
# elif total_pages == 2:
# # 策略12页
# strategies.append((0, 2))
# else:
# # 策略11页
# strategies.append((0, 1))
#
# # 策略2前三页
# if total_pages >= 3:
# strategies.append((0, 3))
# elif total_pages == 2:
# strategies.append((0, 2))
# elif total_pages == 1:
# strategies.append((0, 1))
#
# common_headers = []
#
# for idx, (start, count) in enumerate(strategies):
# headers = get_headers(pdf_document, start, count)
# if len(headers) < 2:
# continue # 需要至少2页来比较
#
# current_common = find_common_headers(headers)
# if current_common:
# common_headers = current_common
# break # 找到共同部分后退出
# # 如果没有找到,继续下一个策略
#
# return '\n'.join(common_headers)
#
#
# def extract_images_with_pymupdf(pdf_path):
# images = []
# try:
# doc = fitz.open(pdf_path)
# for page_num in range(len(doc)):
# page = doc.load_page(page_num)
# image_list = page.get_images(full=True)
# for img in image_list:
# xref = img[0]
# base_image = doc.extract_image(xref)
# image_bytes = base_image['image']
# image_ext = base_image.get('ext', 'png')
# # image_width = base_image.get['xres']
# # image_height = base_image.get['yres']
# # images.append({'data': image_bytes, 'ext': image_ext, 'page_num': page_num + 1, 'width': image_width, 'height': image_height}).
# images.append({'data': image_bytes, 'ext': image_ext, 'page_num': page_num + 1})
#
# # 确保输出目录存在
# if not os.path.exists("output_dir"):
# os.makedirs("output_dir")
# print(f"创建输出目录: output_dir")
# # 构建图片文件名,例如: page_1_img.png
# image_filename = f"page_{page_num}_img.{image_ext}"
# image_path = os.path.join("output_dir", image_filename)
# try:
# with open(image_path, 'wb') as img_file:
# img_file.write(image_bytes)
# print(f"保存图片: {image_path}")
# except Exception as e:
# print(f"保存图片 {image_filename} 时出错: {e}")
#
# except Exception as e:
# print(f"提取图片时出错: {e}")
# return images
#
#
# def extract_text_json_by_page(file_path):
# common_header = extract_common_header(file_path)
# # print(f"公共抬头:{common_header}")
# # print("--------------------正文开始-------------------")
# result = {}
# # 如果已经有保存的ocr结果,直接读取
# # TODO 待修改
# if os.path.exists(file_path + ".json"):
# with open(file_path + ".json", 'r', encoding='utf-8') as f:
# result = json.load(f)
# return result
# images = extract_images_with_pymupdf(file_path)
# # filtered_images = filter_images(images)
# ocr_text_dict = {}
# for img in images:
# try:
# with tempfile.NamedTemporaryFile(delete=False, suffix='.' + img['ext']) as temp_image:
# temp_image.write(img['data'])
# temp_image.flush()
# # 调用OCR函数
# ocr_result = ocr_extract(temp_image.name)
# if img['page_num'] in ocr_text_dict:
# ocr_text_dict[img['page_num']] += ocr_result + "\n"
# else:
# ocr_text_dict[img['page_num']] = ocr_result + "\n"
# except Exception as e:
# print(f"OCR处理失败: {e}")
# finally:
# try:
# os.remove(temp_image.name)
# except Exception as e:
# print(f"删除临时文件失败: {e}")
#
# with open(file_path, 'rb') as file:
# reader = PyPDF2.PdfReader(file)
# num_pages = len(reader.pages)
# # print(f"Total pages: {num_pages}")
# for page_num in range(num_pages):
# page = reader.pages[page_num]
# text = page.extract_text() or ""
# # 清理文本
# cleaned_text = clean_page_content(text, common_header)
# # 合并OCR文本
# if (page_num + 1) in ocr_text_dict and ocr_text_dict[page_num + 1].strip():
# cleaned_text += "\n" + ocr_text_dict[page_num + 1]
# result[str(page_num + 1)] = cleaned_text
# print("pdf预处理完成")
# return result
#
#
# def filter_images(images, min_width=200, min_height=200, text_threshold=5):
# """
# 过滤图像,保留可能包含文本的图像。
# :param images: 图像列表,每个图像包含 data, ext, page_num, width, height
# :param min_width: 最小宽度
# :param min_height: 最小高度
# :param text_threshold: 检测到的文本字符数阈值
# :return: 过滤后的图像列表
# """
# filtered = []
# for img in images:
# # 基于尺寸过滤
# if img['width'] < min_width or img['height'] < min_height:
# continue
# # 基于文本检测过滤
# try:
# # with tempfile.NamedTemporaryFile(delete=True, suffix='.' + img['ext']) as temp_image:
# # temp_image.write(img['data'])
# # temp_image.flush()
# # # 使用OCR快速检测文本
# # ocr_result = ocr_extract(temp_image.name)
# # if len(ocr_result.strip()) >= text_threshold:
# # img['ocr_text'] = ocr_result
# # filtered.append(img)
# filtered.append(img)
# except Exception as e:
# print(f"过滤图像时出错: {e}")
#
# return filtered
#
#
# if __name__ == '__main__':
# pdf_path = "C:/test/发言系统11.pdf"
# res = extract_text_json_by_page(pdf_path)
# print(res)
import os
import re
import json
import tempfile
import shutil
import uuid
from PyPDF2 import PdfReader
import fitz # PyMuPDF
from utils.ocr_engine import OcrEngine # 请确保OcrEngine已经正确导入
from utils.local_ocr import LocalOCR
# Module-level LocalOCR instance, created once at import time; ocr_extract() calls its run() method.
local_ocr = LocalOCR()
def ocr_extract(image_path):
    """Recognize the text contained in an image file with the local OCR engine.

    :param image_path: path of the image file handed to ``local_ocr.run``
    :return: whatever ``local_ocr.run`` returns for that image
    """
    # Alternative backend kept for reference:
    # return OcrEngine.recognize_text_from_image(image_path)
    recognized = local_ocr.run(image_path)
    return recognized
def clean_page_content(text, common_header):
    """Strip the shared page header and page-number artifacts from one page of text.

    :param text: raw text of a single page
    :param common_header: newline-joined header lines shared by the pages
        (may be empty or None, in which case no header is removed)
    :return: the cleaned page text
    """
    # Remove the common header, one header line at a time, from the start of the page.
    if common_header:
        for header_line in common_header.split('\n'):
            header_line = header_line.strip()
            if not header_line:
                continue
            # Header lines are compared in whitespace-stripped form, so the raw
            # page text may still carry leading whitespace before the line and
            # trailing blanks after it. Tolerate both; otherwise the first
            # leftover blank would stop every following header line from
            # matching at the start of the text.
            pattern = r'^\s*' + re.escape(header_line) + r'[ \t\r]*\n?'
            text = re.sub(pattern, '', text, count=1)

    # Remove page numbers.
    text = re.sub(r'^\s*\d+\s*(?=\D)', '', text)  # leading page number, only when followed by a non-digit
    text = re.sub(r'\s+\d+\s*$', '', text)  # trailing page number
    text = re.sub(r'\s*\/\s*\d+\s*', '', text)  # "/129"-style total-page suffix
    text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text)  # "—2—" or "-2-" style page number
    return text
def extract_common_header(pdf_path):
    """Detect the header lines shared by the pages of a PDF.

    Up to three consecutive pages are sampled, first around the middle of the
    document and then, if that yields nothing, from the beginning. For each
    sampled page the first three text lines are taken; a line is part of the
    common header when it is identical at the same position on every sampled
    page.

    :param pdf_path: path of the PDF, passed to ``PdfReader``
    :return: the common header lines joined with newlines, or an empty string
        when fewer than two pages have text or no line is shared
    """

    def get_headers(pdf_document, start_page, pages_to_read):
        """Return the first three stripped lines of every sampled page that has text."""
        headers = []
        end_page = min(start_page + pages_to_read, len(pdf_document.pages))
        for i in range(start_page, end_page):
            text = (pdf_document.pages[i].extract_text() or "").strip()
            # Skip pages without real text (empty or whitespace only): they
            # would contribute a single empty line and prevent any match.
            if not text:
                continue
            headers.append([line.strip() for line in text.split('\n')[:3]])
        return headers

    def find_common_headers(headers):
        """Return the lines that are identical at the same position on all pages."""
        if not headers:
            return []
        # zip aligns line 1 of every page, line 2 of every page, and so on.
        return [
            lines[0]
            for lines in zip(*headers)
            if all(line == lines[0] for line in lines[1:])
        ]

    pdf_document = PdfReader(pdf_path)
    total_pages = len(pdf_document.pages)

    # At least two pages are needed for a comparison.
    if total_pages < 2:
        return ''

    # Strategy 1: a window around the middle of the document.
    # Strategy 2: the first pages. Added only when it differs from strategy 1,
    # so short documents are not scanned twice with the same window.
    window = min(total_pages, 3)
    middle_start = max(0, total_pages // 2 - 1) if total_pages >= 3 else 0
    strategies = [(middle_start, window)]
    if middle_start != 0:
        strategies.append((0, window))

    for start, count in strategies:
        headers = get_headers(pdf_document, start, count)
        if len(headers) < 2:
            continue  # not enough pages with text to compare
        current_common = find_common_headers(headers)
        if current_common:
            # Stop at the first strategy that finds shared lines.
            return '\n'.join(current_common)
    return ''
def extract_images_from_page(pdf_path, page_num):
    """Extract the embedded images of one PDF page with PyMuPDF.

    :param pdf_path: path of the PDF file
    :param page_num: zero-based page index
    :return: list of dicts with keys ``'data'`` (image bytes), ``'ext'``
        (file extension, ``'png'`` when the extractor reports none) and
        ``'page_num'`` (one-based page number). The list is empty when the
        page index is out of range, and holds whatever was collected so far
        when extraction fails.
    """
    images = []
    try:
        # Open the document as a context manager so it is closed on every
        # path (normal return, out-of-range return, exception). This function
        # is called once per page, so an unclosed document would leak a
        # handle on each call.
        with fitz.open(pdf_path) as doc:
            if page_num < 0 or page_num >= len(doc):
                print(f"页码 {page_num + 1} 超出范围")
                return images
            page = doc.load_page(page_num)
            for img in page.get_images(full=True):
                xref = img[0]
                base_image = doc.extract_image(xref)
                images.append({
                    'data': base_image['image'],
                    'ext': base_image.get('ext', 'png'),
                    'page_num': page_num + 1,
                })
    except Exception as e:
        # Best effort: report the problem and return what was collected.
        print(f"提取图片时出错: {e}")
    return images
def _ocr_image_to_text(img):
    """Write one extracted image to a temporary file, OCR it and return the text.

    :param img: dict holding the raw image bytes under ``'data'`` and the file
        extension under ``'ext'``
    :return: the OCR text, or an empty string when OCR fails or yields only
        whitespace
    """
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.' + img['ext']) as temp_image:
            temp_path = temp_image.name
            temp_image.write(img['data'])
        # The file is closed (and therefore flushed) before the OCR engine
        # opens it by name.
        ocr_result = ocr_extract(temp_path)
        return ocr_result if ocr_result.strip() else ""
    except Exception as e:
        print(f"OCR处理失败: {e}")
        return ""
    finally:
        # Only try to delete a file that was actually created for this image.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except Exception as e:
                print(f"删除临时文件失败: {e}")


def extract_text_json_by_page(file_path, text_threshold=10):
    """Extract the text of every PDF page, falling back to OCR for pages with little text.

    The result is cached next to the PDF as ``<file_path>.json``; when that
    file already exists it is loaded and returned without touching the PDF.

    :param file_path: path of the PDF file
    :param text_threshold: minimum number of characters (after cleaning and
        stripping) a page must have to skip OCR of its embedded images
    :return: dict mapping the one-based page number (as a string) to the page text
    """
    # Reuse a previously saved result before doing any PDF work.
    cache_path = file_path + ".json"
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    common_header = extract_common_header(file_path)
    result = {}
    completed = False
    try:
        reader = PdfReader(file_path)
        for page_num, page in enumerate(reader.pages):
            text = page.extract_text() or ""
            cleaned_text = clean_page_content(text, common_header)
            # Pages with too little extractable text are assumed to be
            # image-based: OCR their embedded images and append the text.
            if len(cleaned_text.strip()) < text_threshold:
                print(f"{page_num + 1} 页文本量低开始提取图片并OCR")
                for img in extract_images_from_page(file_path, page_num):
                    ocr_text = _ocr_image_to_text(img)
                    if ocr_text:
                        cleaned_text += "\n" + ocr_text
            result[str(page_num + 1)] = cleaned_text
        completed = True
    except Exception as e:
        print(f"处理PDF时出错: {e}")

    # Persist only a complete result. Caching a partial or empty result after
    # a failure would make every later call return it instead of retrying.
    if completed:
        try:
            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"保存结果到JSON文件时出错: {e}")
    print("PDF预处理完成")
    return result
if __name__ == '__main__':
    # Manual run: process one sample PDF and pretty-print the per-page text.
    sample_pdf = "C:/test/所投主要产品检测报告.pdf"
    pages = extract_text_json_by_page(sample_pdf)
    print(json.dumps(pages, ensure_ascii=False, indent=4))