# reptile/main_extraction.py
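"""Scraper for administrative penalty decision notices (行政处罚决定书).

Summary inferred from the code below: a headless Chrome driven by Selenium opens
each URL read from url*.txt files, BeautifulSoup locates the Word-style penalty
table on the page, the table rows are mapped to fixed fields (document number,
punished party, facts, legal basis, decision, issuing authority, decision date,
source URL), and the results are appended batch by batch to output_data<N>.xlsx,
rolling over to a new workbook after roughly 10,000 rows. URLs that fail to
parse are written to error_urls.txt.
"""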
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time
import random
import os
import glob
import re

# def clean_text(html_content):
#     soup = BeautifulSoup(html_content, 'html.parser')
#     paragraphs = soup.find_all('p')
#     lines = []
#     for p in paragraphs:
#         line = ''.join([span.get_text(strip=True) for span in p.find_all('span', recursive=False)])
#         lines.append(line)
#     return '\n'.join(lines).strip()

def clean_text(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Remove script, style and Word "o:p" elements
    for script_or_style in soup(["script", "style", "o:p"]):
        script_or_style.decompose()
    # Extract all visible text
    text = soup.get_text(strip=True)
    # Clean up the text by removing all whitespace
    cleaned_text = re.sub(r'\s+', '', text)  # strip every whitespace character
    return cleaned_text
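
# process_table maps one penalty-notice table onto the result fields declared at
# the top of the function. The branches assume four fixed layouts, distinguished
# only by row count:
#   9 rows  - person + organisation + legal representative
#   10 rows - as above, plus the person's employer
#   8 rows  - organisation + responsible person, or two generic "label: value" rows
#   7 rows  - party taken from a 3-cell row, otherwise from the second row
# Any other layout is reported and the URL is appended to error_urls.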
def process_table(table_rows, current_url, error_urls):
    results = {
        "行政处罚决定书文号": "",
        "被处罚当事人": "",
        "主要违法违规事实": "",
        "行政处罚依据": "",
        "行政处罚决定": "",
        "作出处罚决定的机关名称": "",
        "作出处罚决定的日期": "",
        "网址": current_url  # URL of the source page (added column)
    }
    try:
        if len(table_rows) == 9:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[2]))
            org_name = clean_text(str(table_rows[2].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[3].find_all('td')[1]))
            results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            results["主要违法违规事实"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[8].find_all('td')[1]))
        elif len(table_rows) == 10:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[3]))
            person_org = clean_text(str(table_rows[2].find_all('td')[1]))
            org_name = clean_text(str(table_rows[3].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[4].find_all('td')[1]))
            results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"个人单位": "{person_org}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            results["主要违法违规事实"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[8].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[9].find_all('td')[1]))
        elif len(table_rows) == 8:
            flag = 0
            for row in table_rows:
                cells = row.find_all('td')
                if len(cells) == 3:
                    flag = 1
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            if flag:
                org_name = clean_text(str(table_rows[1].find_all('td')[2]))
                name = clean_text(str(table_rows[2].find_all('td')[1]))
                results["被处罚当事人"] = f'"单位名称": "{org_name}"\n"主要负责人姓名": "{name}"'
            else:
                part1 = clean_text(str(table_rows[1].find_all('td')[0])) + ":" + clean_text(
                    str(table_rows[1].find_all('td')[1]))
                part2 = clean_text(str(table_rows[2].find_all('td')[0])) + ":" + clean_text(
                    str(table_rows[2].find_all('td')[1]))
                results["被处罚当事人"] = part1 + "\n" + part2  # separated by a newline
            results["主要违法违规事实"] = clean_text(str(table_rows[3].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[7].find_all('td')[1]))
        elif len(table_rows) == 7:
            flag = 0
            for row in table_rows:
                cells = row.find_all('td')
                if len(cells) == 3:
                    # For a row with three td cells, join the second and third cell contents
                    name = clean_text(str(cells[1])) + ":" + clean_text(str(cells[2]))
                    flag = 1
            if flag:
                results["被处罚当事人"] = name
            else:
                results["被处罚当事人"] = clean_text(str(table_rows[1].find_all_next('td')[1]))
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all_next('td')[1]))
            results["主要违法违规事实"] = clean_text(str(table_rows[2].find_all_next('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[3].find_all_next('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[4].find_all_next('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[5].find_all_next('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[6].find_all_next('td')[1]))
        else:
            print(f"Unexpected number of rows in table at URL: {current_url}")
            error_urls.append(current_url)
            return None
    except Exception as e:
        print(f"Error processing table: {e}")
        error_urls.append(current_url)
        return None
    return results
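
# Minimal offline sketch (illustration only; 'sample_penalty_page.html' is a
# hypothetical locally saved copy of a penalty-notice page):
#
#     with open('sample_penalty_page.html', encoding='utf-8') as f:
#         soup = BeautifulSoup(f.read(), 'html.parser')
#     rows = soup.select_one('.WordSection1 table').find_all('tr')
#     errors = []
#     print(process_table(rows, 'file://sample_penalty_page.html', errors))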

# def create_browser():  # for use in a Docker environment (remote Selenium)
#     options = webdriver.ChromeOptions()
#     options.add_argument('--headless')  # headless mode
#     options.add_argument('--disable-blink-features=AutomationControlled')
#     options.add_argument(
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#     options.add_experimental_option('excludeSwitches', ['enable-automation'])
#     options.add_experimental_option('useAutomationExtension', False)
#     driver = webdriver.Remote(
#         command_executor='http://chrome:4444/wd/hub',
#         options=options
#     )
#     return driver
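
# fetch_data drives a local headless Chrome over the given URLs. The ChromeOptions
# flags and the CDP script injected on every new document (hiding
# navigator.webdriver, faking plugins and languages) are the script's
# anti-automation-detection measures. Each page is parsed with BeautifulSoup, the
# first CSS selector that matches a Word-export table wins, and failed URLs are
# collected and written to error_urls.txt.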
def fetch_data(urls):
    options = webdriver.ChromeOptions()  # for the local environment
    options.add_argument('--headless')  # headless mode
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.navigator.chrome = {
                runtime: {}
            };
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        '''
    })
    # driver = create_browser()
    all_data = pd.DataFrame()
    error_urls = []
    for url in urls:
        try:
            driver.get(url)
            print("Processing URL:", url)
            random_wait(1, 3)  # random wait between requests
            html = driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            selectors = [
                '.Section0 .MsoNormalTable, .Section0 .MsoTableGrid',
                '.Section1 .MsoNormalTable, .Section1 .MsoTableGrid',
                '.WordSection1 .MsoNormalTable, .WordSection1 .MsoTableGrid',
                '.Section0 table',
                '.Section1 table',
                '.WordSection1 table'
            ]
            table = None
            for selector in selectors:
                table = soup.select_one(selector)
                if table:
                    break
            if table:
                table_rows = table.find_all('tr')
                results = process_table(table_rows, url, error_urls)
                if results:  # skip pages whose table could not be parsed
                    df = pd.DataFrame([results])
                    all_data = pd.concat([all_data, df], ignore_index=True)
            else:
                print(f"No table found for URL: {url}")
                error_urls.append(url)
        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            error_urls.append(url)
    driver.quit()
    if error_urls:
        with open('error_urls.txt', 'w') as file:
            for error_url in error_urls:
                file.write(f"{error_url}\n")
        print("Error URLs have been saved to error_urls.txt")
    return all_data

def random_wait(min_time=1, max_time=3):
    time.sleep(random.uniform(min_time, max_time))

def create_empty_excel(filename):
    columns = ["行政处罚决定书文号", "被处罚当事人", "主要违法违规事实", "行政处罚依据", "行政处罚决定", "作出处罚决定的机关名称", "作出处罚决定的日期"]
    df = pd.DataFrame(columns=columns)
    df.to_excel(filename, index=False)
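
# process_in_batches reads URLs from every file matching url_files_pattern,
# scrapes them batch_size at a time, and re-writes the current workbook after
# each batch so partial progress is kept on disk. Once a workbook has
# accumulated max_rows_per_file rows, output rolls over to the next
# '<prefix><N>.xlsx' file.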
def process_in_batches(url_files_pattern, output_file_prefix, batch_size=100, max_rows_per_file=10000):
    url_files = glob.glob(url_files_pattern)
    urls = []
    for url_file in url_files:
        with open(url_file, 'r') as file:
            urls.extend([line.strip() for line in file if line.strip()])
    total_urls = len(urls)
    num_batches = (total_urls // batch_size) + (1 if total_urls % batch_size != 0 else 0)
    file_index = 1
    output_filename = f'{output_file_prefix}{file_index}.xlsx'
    rows_in_file = 0
    if not os.path.exists(output_filename):
        create_empty_excel(output_filename)
    for batch_num in range(num_batches):
        start_index = batch_num * batch_size
        end_index = start_index + batch_size
        batch_urls = urls[start_index:end_index]
        print(f"Processing batch {batch_num + 1} of {num_batches}")
        batch_data = fetch_data(batch_urls)
        try:
            existing_data = pd.read_excel(output_filename, sheet_name='Sheet1')
            combined_data = pd.concat([existing_data, batch_data], ignore_index=True)
        except FileNotFoundError:
            combined_data = batch_data
        with pd.ExcelWriter(output_filename, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
            combined_data.to_excel(writer, index=False, sheet_name='Sheet1')
        rows_in_file += batch_data.shape[0]
        if rows_in_file >= max_rows_per_file:
            file_index += 1
            output_filename = f'{output_file_prefix}{file_index}.xlsx'
            rows_in_file = 0
            if not os.path.exists(output_filename):
                create_empty_excel(output_filename)

# Example usage
url_files_pattern = 'url*.txt'  # match every .txt file whose name starts with 'url'
output_file_prefix = 'output_data'
# By default the Excel file is updated after every 100 URLs; once a file holds
# more than 10,000 rows, a new file is created to store the overflow.
process_in_batches(url_files_pattern, output_file_prefix, batch_size=100)
print("Data has been appended to the existing Excel files.")