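"""PDF preprocessing utilities.

Extracts text page by page with PyPDF2, detects and strips a header that is
common to every page, removes page-number artifacts, and falls back to OCR
(via PyMuPDF image extraction) for pages with little or no extractable text.
Results are cached next to the source file as <pdf_path>.json.
"""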
import os
import re
import json
import tempfile

from PyPDF2 import PdfReader
import fitz  # PyMuPDF
from utils.ocr_engine import OcrEngine  # make sure OcrEngine is imported correctly
from utils.local_ocr import LocalOCR

local_ocr = LocalOCR()


def ocr_extract(image_path):
    # Call your OCR engine to recognize the text in the image.
    # return OcrEngine.recognize_text_from_image(image_path)
    # Use the local OCR engine instead.
    return local_ocr.run(image_path)


def clean_page_content(text, common_header):
    # Strip the shared page header first.
    if common_header:  # only substitute when a common header was found
        for header_line in common_header.split('\n'):
            if header_line.strip():  # skip empty lines
                # Remove the first occurrence of the full header line.
                text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)

    # Strip page numbers, e.g. 89/129 or —2—.
    text = re.sub(r'^\s*\d+\s*(?=\D)', '', text)  # leading page number, only when followed by a non-digit
    text = re.sub(r'\s+\d+\s*$', '', text)  # trailing page number
    text = re.sub(r'\s*\/\s*\d+\s*', '', text)  # page numbers like /129
    text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text)  # page numbers like '—2—' or '-2-'
    return text


def extract_common_header(pdf_path):
    def get_headers(pdf_document, start_page, pages_to_read):
        headers = []
        for i in range(start_page, min(start_page + pages_to_read, len(pdf_document.pages))):
            page = pdf_document.pages[i]
            text = page.extract_text() or ""
            if text:
                # Keep only the first three lines of each page, trimmed of surrounding whitespace.
                first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
                headers.append(first_lines)
        return headers

    def find_common_headers(headers):
        if not headers:
            return []
        # Align the corresponding lines of all sampled pages with zip.
        common_headers = []
        for lines in zip(*headers):
            # Keep a line only if it is identical on every sampled page.
            if all(line == lines[0] for line in lines[1:]):
                common_headers.append(lines[0])
        return common_headers

    pdf_document = PdfReader(pdf_path)
    total_pages = len(pdf_document.pages)

    # Two sampling strategies: the middle three pages first, then the first three pages.
    strategies = []
    if total_pages >= 3:
        middle_page = total_pages // 2
        start_page = max(0, middle_page - 1)
        strategies.append((start_page, 3))
    elif total_pages == 2:
        strategies.append((0, 2))
    else:
        strategies.append((0, 1))

    if total_pages >= 3:
        strategies.append((0, 3))
    elif total_pages == 2:
        strategies.append((0, 2))
    elif total_pages == 1:
        strategies.append((0, 1))

    common_headers = []

    for start, count in strategies:
        headers = get_headers(pdf_document, start, count)
        if len(headers) < 2:
            continue  # need at least two pages to compare

        current_common = find_common_headers(headers)
        if current_common:
            common_headers = current_common
            break  # stop at the first strategy that yields a common header

    return '\n'.join(common_headers)


def extract_images_from_page(pdf_path, page_num):
    images = []
    try:
        doc = fitz.open(pdf_path)
        if page_num < 0 or page_num >= len(doc):
            print(f"Page {page_num + 1} is out of range")
            return images
        page = doc.load_page(page_num)
        image_list = page.get_images(full=True)
        for img in image_list:
            xref = img[0]
            base_image = doc.extract_image(xref)
            image_bytes = base_image['image']
            image_ext = base_image.get('ext', 'png')
            images.append({'data': image_bytes, 'ext': image_ext, 'page_num': page_num + 1})
    except Exception as e:
        print(f"Error extracting images: {e}")
    return images


def extract_text_json_by_page(file_path, text_threshold=10):
    """
    Extract the text of every PDF page; if a page yields fewer than
    text_threshold characters, extract its images and run OCR on them.
    :param file_path: path to the PDF file
    :param text_threshold: minimum number of characters before falling back to OCR
    :return: dict mapping page number (as a string) to that page's text
    """
    common_header = extract_common_header(file_path)
    result = {}
    # If a cached result already exists, load and return it directly.
    if os.path.exists(file_path + ".json"):
        with open(file_path + ".json", 'r', encoding='utf-8') as f:
            result = json.load(f)
        return result
    try:
        reader = PdfReader(file_path)
        num_pages = len(reader.pages)
        for page_num in range(num_pages):
            page = reader.pages[page_num]
            text = page.extract_text() or ""
            cleaned_text = clean_page_content(text, common_header)
            # Fall back to OCR when the page yields too little text.
            if len(cleaned_text.strip()) < text_threshold:
                print(f"Page {page_num + 1} has little text; extracting images for OCR")
                images = extract_images_from_page(file_path, page_num)
                for img in images:
                    try:
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.' + img['ext']) as temp_image:
                            temp_image.write(img['data'])
                            temp_image.flush()
                        # Run OCR after the handle is closed so the engine can reopen the file (relevant on Windows).
                        ocr_result = ocr_extract(temp_image.name)
                        if ocr_result.strip():
                            cleaned_text += "\n" + ocr_result
                    except Exception as e:
                        print(f"OCR failed: {e}")
                    finally:
                        try:
                            os.remove(temp_image.name)
                        except Exception as e:
                            print(f"Failed to delete temporary file: {e}")
            result[str(page_num + 1)] = cleaned_text
    except Exception as e:
        print(f"Error while processing the PDF: {e}")

    # Cache the result next to the PDF as <file>.json.
    try:
        with open(file_path + ".json", 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=4)
    except Exception as e:
        print(f"Error saving the result to JSON: {e}")

    print("PDF preprocessing finished")
    return result


if __name__ == '__main__':
    pdf_path = "C:/test/所投主要产品检测报告.pdf"
    res = extract_text_json_by_page(pdf_path)
    print(json.dumps(res, ensure_ascii=False, indent=4))
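

# Minimal sketch: exercise clean_page_content on made-up sample data, which is
# handy when tuning the header and page-number regexes. The header text, page
# text, and this helper are illustrative assumptions, not part of the pipeline;
# it is not called anywhere, so run it manually (e.g. from a REPL) if needed.
def _demo_clean_page_content():
    sample_header = "ACME Corp Bid Document\nChapter 3 Technical Specification"
    sample_page = (
        "ACME Corp Bid Document\n"
        "Chapter 3 Technical Specification\n"
        "The camera shall support 1080p capture.\n"
        "-12-"
    )
    # Expected output: both header lines and the trailing '-12-' page number are removed,
    # leaving only "The camera shall support 1080p capture."
    print(clean_page_content(sample_page, sample_header))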