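"""Scrape administrative penalty decision pages and export the parsed fields to Excel.

The script reads target URLs from url*.txt files, fetches each page with a
headless Chrome driver, locates the Word-export decision table on the page,
maps its rows to named columns (document number, penalized party, violation
facts, penalty basis, penalty decision, issuing authority, decision date, URL),
and appends the results to output_data<N>.xlsx in batches. URLs that fail to
parse are written to error_urls.txt.
"""
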
import glob
import os
import random
import re
import time

import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver

# Earlier paragraph-based implementation, kept for reference:
# def clean_text(html_content):
#     soup = BeautifulSoup(html_content, 'html.parser')
#     paragraphs = soup.find_all('p')
#     lines = []
#     for p in paragraphs:
#         line = ''.join([span.get_text(strip=True) for span in p.find_all('span', recursive=False)])
#         lines.append(line)
#     return '\n'.join(lines).strip()

def clean_text(html_content):
    """Return the visible text of an HTML fragment with all whitespace removed."""
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove scripts, styles, and the Word artifact <o:p> elements
    for script_or_style in soup(["script", "style", "o:p"]):
        script_or_style.decompose()

    # Extract all visible text
    text = soup.get_text(strip=True)

    # Remove every whitespace character; CJK text needs no spaces between characters
    cleaned_text = re.sub(r'\s+', '', text)

    return cleaned_text

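# Illustrative behaviour on a hypothetical cell fragment:
#     clean_text('<td><p><span>行政处罚 决定书</span></p></td>')  ->  '行政处罚决定书'
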
def process_table(table_rows, current_url, error_urls):
    """Map the rows of one decision table to named fields.

    The source tables come in several layouts (7, 8, 9, or 10 rows, depending
    on how the penalized party is described); each branch below handles one
    layout by row position. Returns the field dict, or None when the layout is
    unrecognised or a cell lookup fails.
    """
    results = {
        "行政处罚决定书文号": "",
        "被处罚当事人": "",
        "主要违法违规事实": "",
        "行政处罚依据": "",
        "行政处罚决定": "",
        "作出处罚决定的机关名称": "",
        "作出处罚决定的日期": "",
        "网址": current_url  # URL column
    }
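    # Keys, in order: decision document number, penalized party, main violation
    # facts, penalty basis, penalty decision, issuing authority, decision date, URL.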
    try:
        if len(table_rows) == 9:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[2]))
            org_name = clean_text(str(table_rows[2].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[3].find_all('td')[1]))
            results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            results["主要违法违规事实"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[8].find_all('td')[1]))

        elif len(table_rows) == 10:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[3]))
            person_org = clean_text(str(table_rows[2].find_all('td')[1]))
            org_name = clean_text(str(table_rows[3].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[4].find_all('td')[1]))
            results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"个人单位": "{person_org}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            results["主要违法违规事实"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[8].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[9].find_all('td')[1]))

        elif len(table_rows) == 8:
            # A three-cell row marks the organisation-plus-person layout
            flag = 0
            for row in table_rows:
                cells = row.find_all('td')
                if len(cells) == 3:
                    flag = 1
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            if flag:
                org_name = clean_text(str(table_rows[1].find_all('td')[2]))
                name = clean_text(str(table_rows[2].find_all('td')[1]))
                results["被处罚当事人"] = f'"单位名称": "{org_name}"\n"主要负责人姓名": "{name}"'
            else:
                part1 = clean_text(str(table_rows[1].find_all('td')[0])) + ":" + clean_text(
                    str(table_rows[1].find_all('td')[1]))
                part2 = clean_text(str(table_rows[2].find_all('td')[0])) + ":" + clean_text(
                    str(table_rows[2].find_all('td')[1]))
                results["被处罚当事人"] = part1 + "\n" + part2  # join the two parts with a newline
            results["主要违法违规事实"] = clean_text(str(table_rows[3].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[7].find_all('td')[1]))

        elif len(table_rows) == 7:
            flag = 0
            for row in table_rows:
                cells = row.find_all('td')
                if len(cells) == 3:
                    # Concatenate the second and third cells of the three-cell row
                    name = clean_text(str(cells[1])) + ":" + clean_text(str(cells[2]))
                    flag = 1
            if flag:
                results["被处罚当事人"] = name
            else:
                results["被处罚当事人"] = clean_text(str(table_rows[1].find_all_next('td')[1]))
            # find_all_next walks forward in document order starting inside each row
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all_next('td')[1]))
            results["主要违法违规事实"] = clean_text(str(table_rows[2].find_all_next('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[3].find_all_next('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[4].find_all_next('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[5].find_all_next('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[6].find_all_next('td')[1]))

        else:
            print(f"Unexpected number of rows in table at URL: {current_url}")
            error_urls.append(current_url)
            return None

    except Exception as e:
        print(f"Error processing table: {e}")
        error_urls.append(current_url)
        return None
    return results
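
# A minimal sketch of how the parser is driven (hypothetical markup; real pages
# are Word-export documents with the layouts handled above). Uniform two-cell
# rows exercise the 8-row fallback branch:
# rows = BeautifulSoup('<table>' + '<tr><td>栏目</td><td>内容</td></tr>' * 8 + '</table>',
#                      'html.parser').find_all('tr')
# process_table(rows, 'https://example.com/decision', [])  # -> dict of fields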


# def create_browser():  # for running inside a Docker environment
#     options = webdriver.ChromeOptions()
#     options.add_argument('--headless')  # headless mode
#     options.add_argument('--disable-blink-features=AutomationControlled')
#     options.add_argument(
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#     options.add_experimental_option('excludeSwitches', ['enable-automation'])
#     options.add_experimental_option('useAutomationExtension', False)
#     driver = webdriver.Remote(
#         command_executor='http://chrome:4444/wd/hub',
#         options=options
#     )
#     return driver

def fetch_data(urls):
    """Fetch each URL with headless Chrome and collect the parsed rows in a DataFrame."""
    options = webdriver.ChromeOptions()  # for running in a local environment
    options.add_argument('--headless')  # headless mode
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.navigator.chrome = {
                runtime: {}
            };
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        '''
    })
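    # Design note: Page.addScriptToEvaluateOnNewDocument runs the snippet above
    # before any page script executes, so sites probing navigator.webdriver,
    # plugin counts, or languages see ordinary browser-like values.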
    # driver = create_browser()

    all_data = pd.DataFrame()
    error_urls = []
    for url in urls:
        try:
            driver.get(url)
            print("Processing URL:", url)
            random_wait(1, 3)  # random delay between requests
            html = driver.page_source
            soup = BeautifulSoup(html, 'html.parser')

            # The decision tables are Word exports; try the common section/table classes
            selectors = [
                '.Section0 .MsoNormalTable, .Section0 .MsoTableGrid',
                '.Section1 .MsoNormalTable, .Section1 .MsoTableGrid',
                '.WordSection1 .MsoNormalTable, .WordSection1 .MsoTableGrid',
                '.Section0 table',
                '.Section1 table',
                '.WordSection1 table'
            ]
            table = None
            for selector in selectors:
                table = soup.select_one(selector)
                if table:
                    break

            if table:
                table_rows = table.find_all('tr')
                results = process_table(table_rows, url, error_urls)
                if results:  # process_table returns None on failure
                    df = pd.DataFrame([results])
                    all_data = pd.concat([all_data, df], ignore_index=True)
            else:
                print(f"No table found for URL: {url}")
                error_urls.append(url)

        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            error_urls.append(url)

    driver.quit()

    if error_urls:
        with open('error_urls.txt', 'w') as file:
            for error_url in error_urls:
                file.write(f"{error_url}\n")
        print("Error URLs have been saved to error_urls.txt")

    return all_data


def random_wait(min_time=1, max_time=3):
    """Sleep for a random interval so requests are not evenly spaced."""
    time.sleep(random.uniform(min_time, max_time))

def create_empty_excel(filename):
    # Include the 网址 column so the header matches the dict built by process_table
    columns = ["行政处罚决定书文号", "被处罚当事人", "主要违法违规事实", "行政处罚依据",
               "行政处罚决定", "作出处罚决定的机关名称", "作出处罚决定的日期", "网址"]
    df = pd.DataFrame(columns=columns)
    df.to_excel(filename, index=False)

def process_in_batches(url_files_pattern, output_file_prefix, batch_size=100, max_rows_per_file=10000):
    """Scrape all URLs from the matching files in batches, appending each batch
    to an Excel file and starting a new file past max_rows_per_file rows."""
    url_files = glob.glob(url_files_pattern)
    urls = []
    for url_file in url_files:
        with open(url_file, 'r') as file:
            urls.extend([line.strip() for line in file if line.strip()])

    total_urls = len(urls)
    num_batches = (total_urls // batch_size) + (1 if total_urls % batch_size != 0 else 0)

    file_index = 1
    output_filename = f'{output_file_prefix}{file_index}.xlsx'
    rows_in_file = 0

    if not os.path.exists(output_filename):
        create_empty_excel(output_filename)

    for batch_num in range(num_batches):
        start_index = batch_num * batch_size
        end_index = start_index + batch_size
        batch_urls = urls[start_index:end_index]
        print(f"Processing batch {batch_num + 1} of {num_batches}")

        batch_data = fetch_data(batch_urls)

        try:
            existing_data = pd.read_excel(output_filename, sheet_name='Sheet1')
            combined_data = pd.concat([existing_data, batch_data], ignore_index=True)
        except FileNotFoundError:
            combined_data = batch_data

        with pd.ExcelWriter(output_filename, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
            combined_data.to_excel(writer, index=False, sheet_name='Sheet1')
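        # Design note: with if_sheet_exists='overlay' the sheet is rewritten from
        # cell A1; because combined_data already contains the previously saved
        # rows, the net effect is an append without duplicating the header.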

        rows_in_file += batch_data.shape[0]

        # Roll over to a new output file once the current one reaches the row cap
        if rows_in_file >= max_rows_per_file:
            file_index += 1
            output_filename = f'{output_file_prefix}{file_index}.xlsx'
            rows_in_file = 0

            if not os.path.exists(output_filename):
                create_empty_excel(output_filename)

# Example usage
url_files_pattern = 'url*.txt'  # match every txt file whose name starts with 'url'
output_file_prefix = 'output_data'

# By default the Excel file is updated after every 100 URLs, and a new file is
# started once the current one holds more than 10,000 rows.
process_in_batches(url_files_pattern, output_file_prefix, batch_size=100)
print("Data has been appended to the existing Excel files.")
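
# Failed URLs saved by fetch_data can be retried in a later run, e.g. (untested
# suggestion; glob also matches a literal filename):
# process_in_batches('error_urls.txt', 'retry_output', batch_size=50)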