138 lines
5.4 KiB
Python
138 lines
5.4 KiB
Python
import os
|
|
import time
|
|
from typing import Any, Dict
|
|
|
|
from alibabacloud_credentials.client import Client as CredClient
|
|
from alibabacloud_docmind_api20220711.client import Client as DocMindClient20220711
|
|
from alibabacloud_docmind_api20220711 import models as docmind_models
|
|
from alibabacloud_tea_openapi import models as open_api_models
|
|
from alibabacloud_tea_util.client import Client as UtilClient
|
|
from alibabacloud_tea_util import models as util_models
|
|
|
|
|
|
class DocMindClient:
    """
    Small wrapper around the Alibaba Cloud DocMind (2022-07-11) SDK client.

    It exposes the three calls this file needs: submit a local file to the
    document parser, query the status of a job, and fetch a job's result.
    Every method prints a short progress line, and on failure prints the
    error message before re-raising the original exception unchanged.
    """

    def __init__(self, endpoint: str = 'docmind-api.cn-hangzhou.aliyuncs.com'):
        """
        Builds the underlying SDK client.

        ``CredClient`` is constructed without arguments, so where the access
        key comes from is decided by ``alibabacloud_credentials`` itself.

        :param endpoint: Host name of the DocMind API endpoint.
        """
        credential = CredClient().get_credential()

        config = open_api_models.Config(
            access_key_id=credential.access_key_id,
            access_key_secret=credential.access_key_secret,
        )
        config.endpoint = endpoint

        self.client = DocMindClient20220711(config)

    @staticmethod
    def _error_message(error: Exception) -> str:
        """
        Returns a printable message for ``error``.

        SDK exceptions are expected to carry a ``message`` attribute, but
        ordinary Python exceptions (``FileNotFoundError``, ``ValueError``...)
        do not, so fall back to ``str(error)`` instead of touching
        ``error.message`` directly.
        """
        message = getattr(error, "message", None)
        if isinstance(message, str) and message:
            return message
        return str(error)

    def submit_job(self, file_path: str, file_name: str) -> str:
        """
        Submits a document parsing job.

        :param file_path: Path to the local file to be uploaded.
        :param file_name: Name of the file, including the extension.
        :return: The ID of the submitted job.
        :raises Exception: Whatever opening the file or the SDK call raised;
            the original exception is re-raised after its message is printed.
        """
        try:
            # The stream must stay open until the SDK call has returned.
            with open(file_path, "rb") as file_stream:
                request = docmind_models.SubmitDocParserJobAdvanceRequest(
                    file_url_object=file_stream,
                    file_name=file_name,
                )
                runtime = util_models.RuntimeOptions()
                response = self.client.submit_doc_parser_job_advance(request, runtime)
            job_id = response.body.data.id
        except Exception as error:
            print(f"Failed to submit job: {self._error_message(error)}")
            raise

        print(f"Job submitted successfully. Job ID: {job_id}")
        return job_id

    def query_status(self, job_id: str) -> Any:
        """
        Queries the status of a submitted job.

        :param job_id: The ID of the job to query.
        :return: ``response.body.data`` from the SDK. It is read as an object
            with a ``status`` attribute (not as a plain dict).
        :raises Exception: Whatever the SDK call raised, re-raised unchanged
            after its message is printed.
        """
        try:
            request = docmind_models.QueryDocParserStatusRequest(id=job_id)
            response = self.client.query_doc_parser_status(request)
            status_info = response.body.data
            print(f"Job Status: {status_info.status}")
        except Exception as error:
            print(f"Failed to query job status: {self._error_message(error)}")
            raise

        return status_info

    def get_result(self, job_id: str, layout_step_size: int = 10, layout_num: int = 0) -> Dict[str, Any]:
        """
        Retrieves the result of a completed job.

        :param job_id: The ID of the completed job.
        :param layout_step_size: Passed through as ``layout_step_size``;
            presumably the number of layouts returned by this call.
        :param layout_num: Passed through as ``layout_num``.
            NOTE(review): the DocMind API reference appears to describe this
            as the index of the first layout to return rather than a count;
            confirm against the API documentation.
        :return: ``response.body.data`` from the SDK, i.e. the parsing results.
        :raises Exception: Whatever the SDK call raised, re-raised unchanged
            after its message is printed.
        """
        try:
            request = docmind_models.GetDocParserResultRequest(
                id=job_id,
                layout_step_size=layout_step_size,
                layout_num=layout_num,
            )
            response = self.client.get_doc_parser_result(request)
            result = response.body.data
        except Exception as error:
            print(f"Failed to get job result: {self._error_message(error)}")
            raise

        print(f"Result retrieved for Job ID: {job_id}")
        return result
|
|
|
|
def _wait_for_job(docmind_client, job_id, poll_interval, max_wait):
    """Polls the job until it reaches a terminal status; returns that status lower-cased."""
    # 'fail' is accepted next to 'failed' so a failure reported under either
    # spelling ends the loop instead of polling forever.
    terminal_statuses = ('success', 'fail', 'failed')
    deadline = time.monotonic() + max_wait
    while True:
        status_info = docmind_client.query_status(job_id)
        status = (status_info.status or '').lower()
        if status in terminal_statuses:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {job_id} did not finish within {max_wait} seconds (last status: {status_info.status})"
            )
        print(f"Job is still processing. Waiting for {poll_interval} seconds before retrying...")
        time.sleep(poll_interval)


def _fetch_all_layouts(docmind_client, job_id, layout_step_size):
    """Fetches the job result window by window and returns every layout in order."""
    layouts = []
    offset = 0
    while True:
        page = docmind_client.get_result(
            job_id, layout_step_size=layout_step_size, layout_num=offset
        ) or {}
        batch = page.get('layouts') or []
        layouts.extend(batch)
        # A short (or empty) window means the end of the document was reached.
        if len(batch) < layout_step_size:
            return layouts
        offset += layout_step_size


def alipdf2markdown(file_path, output_path, poll_interval=10, max_wait=float("inf"), layout_step_size=10):
    """
    Parses a local document with DocMind and writes its markdown to a file.

    The file is submitted, the job is polled until it finishes, and the
    ``markdownContent`` of every returned layout is joined with newlines and
    written to ``output_path`` as UTF-8.

    The result is fetched in windows of ``layout_step_size`` layouts until a
    short window is returned. This assumes ``layout_num`` is the index of the
    first layout of a window, as the DocMind API reference appears to describe;
    confirm against the API documentation.

    Failures while fetching the result or writing the output are printed, not
    raised; errors from submitting or polling propagate to the caller.

    :param file_path: Path to the local file to parse.
    :param output_path: Path of the text file that receives the markdown.
    :param poll_interval: Seconds to sleep between two status queries.
    :param max_wait: Maximum number of seconds to wait for the job to finish;
        the default waits indefinitely.
    :param layout_step_size: Number of layouts requested per result call.
    :return: None.
    :raises ValueError: If ``layout_step_size`` is smaller than 1.
    :raises TimeoutError: If the job has not finished after ``max_wait`` seconds.
    """
    if layout_step_size < 1:
        raise ValueError("layout_step_size must be at least 1")

    file_name = os.path.basename(file_path)
    docmind_client = DocMindClient()

    # Step 1: submit the file for parsing.
    job_id = docmind_client.submit_job(file_path, file_name)

    # Step 2: poll until the job reaches a terminal status.
    status = _wait_for_job(docmind_client, job_id, poll_interval, max_wait)
    if status != 'success':
        print("Job failed. Please check the error logs for more details.")
        return

    print("Job completed successfully.")

    # Step 3: retrieve the complete parsing result.
    try:
        layouts = _fetch_all_layouts(docmind_client, job_id, layout_step_size)
    except Exception as e:
        print(f"获取结果失败: {e}")
        return

    # Step 4: join the markdown of every layout and write it out. This stays
    # best-effort, as before: a problem here is reported but not raised.
    try:
        # A layout whose 'markdownContent' is missing or None contributes an
        # empty line instead of breaking the join.
        markdown_contents = [layout.get('markdownContent') or '' for layout in layouts]
        concatenated_markdown = '\n'.join(markdown_contents)

        with open(output_path, 'w', encoding='utf-8') as extract_file:
            extract_file.write(concatenated_markdown)
    except Exception as e:
        print(f"处理并写入 Markdown 内容失败: {e}")
|
|
|
|
if __name__ == "__main__":
    # Usage: python <this file> [INPUT_FILE OUTPUT_FILE]
    # Without arguments the script falls back to the original sample paths.
    import sys

    file_path = r'C:\Users\Administrator\Desktop\货物标\output1\招标文件正文_procurement.pdf'
    output_path = r'C:\Users\Administrator\Desktop\货物标\extract_files\义务教育学校多媒体.txt'

    cli_args = sys.argv[1:]
    if len(cli_args) == 2:
        file_path, output_path = cli_args
    elif cli_args:
        # Exactly zero or two arguments are accepted; anything else is a usage error.
        sys.exit(f"usage: {sys.argv[0]} [INPUT_FILE OUTPUT_FILE]")

    alipdf2markdown(file_path, output_path)
|