zhangsan 2024-08-22 19:21:03 +08:00
parent b6f95d5498
commit 09989897c2
7 changed files with 174 additions and 169 deletions


@ -1,2 +1,11 @@
The scrape.py script is responsible for crawling the URLs
main_extraction.py is responsible for crawling the page content
How to run?
cd to the project root directory and run
pip install -r requirements.txt
to install the required dependencies.
Then run scrape.py first and main_extraction.py second; together they form the complete pipeline (see the run sketch below).
The scrape.py script crawls the URLs of the penalty information disclosure tables and saves them to txt files.
main_extraction.py reads the URLs from the txt files and crawls the page content; URLs that fail to process are saved to error_urls.txt, and successfully processed ones are appended to output_data*.xlsx.
标题网址提取.py crawls the penalty decision documents and penalty information disclosure tables and saves them to an Excel file.
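A minimal end-to-end run implied by the steps above (a sketch only: the project directory name and the `python` command are assumptions; the output file names come from the scripts in this commit):

cd reptile-project                 # assumed project root directory
pip install -r requirements.txt    # install dependencies
python scrape.py                   # collects URLs into url1.txt, url2.txt, ...
python main_extraction.py          # reads url*.txt, writes output_data*.xlsx and error_urls.txt
python 标题网址提取.py              # optional: saves titles/URLs/dates to extracted_data.xlsx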


@ -1,72 +0,0 @@
from selenium import webdriver
from bs4 import BeautifulSoup
import pandas as pd
import time

def clean_text(html_content):
    # Parse the content with BeautifulSoup and extract text element by element, to avoid inserting unnecessary spaces
    soup = BeautifulSoup(html_content, 'html.parser')
    text = ""  # Initialize an empty string for concatenating the text
    for element in soup.stripped_strings:  # Iterate over all text nodes, stripping leading/trailing whitespace
        if element == "一、" or element == "二、":  # If it is a list marker, prepend a newline
            text += "\n" + element
        else:
            text += element  # Concatenate the text directly, without adding extra spaces
    return text.strip()  # Return the processed text

def fetch_data(urls):
    # Configure Chrome options to run in the background
    options = webdriver.ChromeOptions()
    options.add_argument('headless')
    # Initialize the WebDriver
    driver = webdriver.Chrome(options=options)
    # Initialize an empty DataFrame to store the final data
    all_data = pd.DataFrame()
    for url in urls:
        # Visit the page
        driver.get(url)
        time.sleep(3)  # Wait for JavaScript to execute
        # Get the page source
        html = driver.page_source
        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')
        table = soup.find('table', class_='MsoNormalTable')
        # If the page contains a table
        if table:
            rows = table.find_all('tr')
            temp_dict = {}
            for row in rows:
                columns = row.find_all('td')
                if len(columns) >= 2:  # Make sure each row has at least two cells (header and content)
                    header = columns[0].get_text(strip=True)
                    content_html = str(columns[1])  # Get the raw HTML content
                    content = clean_text(content_html)  # Clean it and strip unnecessary spaces
                    temp_dict[header] = content
            # Convert the dict to a DataFrame and append it to the overall DataFrame
            df = pd.DataFrame([temp_dict])
            all_data = pd.concat([all_data, df], ignore_index=True)
    # Close the browser
    driver.quit()
    return all_data

# Define the list of URLs to process
urls = [
    "https://www.cbirc.gov.cn/cn/view/pages/ItemDetail.html?docId=1171824&itemId=4115&generaltype=9",
    # More URLs can be added here
]
# Call the function and collect the data
result_data = fetch_data(urls)
# Save to an Excel file
result_data.to_excel('output_data.xlsx', index=False)


@ -1,47 +0,0 @@
import pandas as pd
from bs4 import BeautifulSoup

# Read the HTML content from a file
with open('D:/folder/study/reptile-project/data.html', 'r', encoding='utf-8') as file:
    html_content = file.read()

# Parse the HTML with BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')

# Initialize the results dict
results = {
    "行政处罚决定书文号": "",
    "被处罚当事人": "",
    "主要违法违规事实": "",
    "行政处罚依据": "",
    "行政处罚决定": "",
    "作出处罚决定的机关名称": "",
    "作出处罚决定的日期": ""
}

# Get all tr elements
table_rows = soup.find_all('tr')

# Extract the information
if len(table_rows) >= 9:
    results["行政处罚决定书文号"] = table_rows[0].find_all('td')[1].find('p').get_text(strip=True)
    # Individual name, organization name, and the organization's legal representative name
    person_name = table_rows[1].find_all('td')[2].find('p').get_text(strip=True)
    org_name = table_rows[2].find_all('td')[2].find('p').get_text(strip=True)
    legal_rep_name = table_rows[3].find_all('td')[1].find('p').get_text(strip=True)
    # Format the information about the penalized party
    results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
    results["主要违法违规事实"] = table_rows[4].find_all('td')[1].find('p').get_text(strip=True)
    results["行政处罚依据"] = table_rows[5].find_all('td')[1].find('p').get_text(strip=True)
    results["行政处罚决定"] = table_rows[6].find_all('td')[1].find('p').get_text(strip=True)
    results["作出处罚决定的机关名称"] = table_rows[7].find_all('td')[1].find('p').get_text(strip=True)
    results["作出处罚决定的日期"] = table_rows[8].find_all('td')[1].find('p').get_text(strip=True)

# Create the DataFrame
df = pd.DataFrame([results])
# Save the DataFrame to an Excel file
df.to_excel('output_data.xlsx', index=False, engine='openpyxl')


@ -102,7 +102,6 @@ def process_table(table_rows,current_url, error_urls):
        if len(cells) == 3:
            # For rows with three td cells, join the contents of the second and third td
            name = clean_text(str(cells[1])) + ":" + clean_text(str(cells[2]))
            print(name)
            flag = 1
        if flag:
            results["被处罚当事人"] = name
@ -126,21 +125,7 @@ def process_table(table_rows,current_url, error_urls):
        return None
    return results

def create_browser():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  # run in headless mode
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Remote(
        command_executor='http://chrome:4444/wd/hub',
        options=options
    )
    return driver

def fetch_data(urls):
# def create_browser():  # for use inside a Docker environment
#     options = webdriver.ChromeOptions()
#     options.add_argument('--headless')  # run in headless mode
#     options.add_argument('--disable-blink-features=AutomationControlled')
@ -148,24 +133,38 @@ def fetch_data(urls):
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#     options.add_experimental_option('excludeSwitches', ['enable-automation'])
#     options.add_experimental_option('useAutomationExtension', False)
#     driver = webdriver.Chrome(options=options)
#     driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
#         'source': '''
#             Object.defineProperty(navigator, 'webdriver', {
#                 get: () => undefined
#             });
#             window.navigator.chrome = {
#                 runtime: {}
#             };
#             Object.defineProperty(navigator, 'languages', {
#                 get: () => ['en-US', 'en']
#             });
#             Object.defineProperty(navigator, 'plugins', {
#                 get: () => [1, 2, 3, 4, 5]
#             });
#         '''
#     })
    driver = create_browser()
#     driver = webdriver.Remote(
#         command_executor='http://chrome:4444/wd/hub',
#         options=options
#     )
#     return driver

def fetch_data(urls):
    options = webdriver.ChromeOptions()  # for the local environment
    options.add_argument('--headless')  # run in headless mode
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.navigator.chrome = {
                runtime: {}
            };
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        '''
    })
    # driver = create_browser()
    all_data = pd.DataFrame()
    error_urls = []
@ -270,5 +269,5 @@ def process_in_batches(url_files_pattern, output_file_prefix, batch_size=100, ma
url_files_pattern = 'url*.txt'  # match every txt file whose name starts with 'url'
output_file_prefix = 'output_data'
process_in_batches(url_files_pattern, output_file_prefix, batch_size=100)
process_in_batches(url_files_pattern, output_file_prefix, batch_size=100)  # By default the Excel output is updated after every 100 URLs; once a file holds more than 10,000 rows, a new file is started.
print("Data has been appended to the existing Excel files.")


@ -1,5 +1,3 @@
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
@ -17,7 +15,7 @@ def random_wait(min_time=1, max_time=5):
# Function to create a new browser session with options to avoid detection
def create_browser():
def create_browser():  # for the local environment
    options = webdriver.ChromeOptions()
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--headless")  # Enable headless mode
@ -44,7 +42,7 @@ def create_browser():
    })
    return driver
# def create_browser():
# def create_browser():  # for use inside a Docker environment
#     options = webdriver.ChromeOptions()
#     options.add_argument("--disable-blink-features=AutomationControlled")
#     options.add_argument("--headless")  # Enable headless mode
@ -164,13 +162,10 @@ def fetch_unique_urls(base_page_url, base_url, date_limit, output_path_prefix):
# Example usage
base_page_url = "https://www.cbirc.gov.cn/cn/view/pages/ItemList.html?itemPId=923&itemId=4113&itemUrl=ItemListRightList.html&itemName=%E6%80%BB%E5%B1%80%E6%9C%BA%E5%85%B3&itemsubPId=931&itemsubPName=%E8%A1%8C%E6%94%BF%E5%A4%84%E7%BD%9A"
# base_page_url is the front page of the listing you want to extract; the script pages through it automatically.
# Each txt file stores 20,000 URLs; the files are named url1, url2, url3, ...
base_page_url = "https://www.cbirc.gov.cn/cn/view/pages/ItemList.html?itemPId=923&itemId=4115&itemUrl=ItemListRightList.html&itemName=%E7%9B%91%E7%AE%A1%E5%88%86%E5%B1%80%E6%9C%AC%E7%BA%A7&itemsubPId=931&itemsubPName=%E8%A1%8C%E6%94%BF%E5%A4%84%E7%BD%9A"
base_url = 'https://www.cbirc.gov.cn/cn/view/pages/'
date_limit = datetime(2006, 4, 26)
date_limit = datetime(2024, 8, 20)
output_path_prefix = 'url'
# Read the parameters from environment variables
# base_page_url = os.getenv('BASE_PAGE_URL')
# base_url = os.getenv('BASE_URL')
# date_limit = datetime.strptime(os.getenv('DATE_LIMIT'), "%Y-%m-%d")
# output_path_prefix = os.getenv('OUTPUT_PATH_PREFIX')
fetch_unique_urls(base_page_url, base_url, date_limit, output_path_prefix)

标题网址提取.py Normal file

@ -0,0 +1,121 @@
import os
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from bs4 import BeautifulSoup
import time
from datetime import datetime
import random

def create_empty_excel(filename):
    """Create an empty Excel file with the specified columns."""
    columns = ["标题", "网址", "日期"]
    df = pd.DataFrame(columns=columns)
    df.to_excel(filename, index=False)

def save_data_to_excel(data, filename):
    """Append the collected data to the Excel file, creating the file first if it does not exist."""
    new_data_df = pd.DataFrame(data, columns=['标题', '网址', '日期'])
    if not os.path.exists(filename):
        create_empty_excel(filename)
    # Read the existing Excel file into a DataFrame
    existing_df = pd.read_excel(filename)
    # Use concat rather than append to merge the DataFrames
    updated_df = pd.concat([existing_df, new_data_df], ignore_index=True)
    # Write the updated DataFrame back to the Excel file, overwriting the original
    updated_df.to_excel(filename, index=False)

def random_wait(min_time=1, max_time=3):
    time.sleep(random.uniform(min_time, max_time))

def create_browser():
    options = webdriver.ChromeOptions()
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--headless")
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    driver = webdriver.Chrome(options=options)
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.navigator.chrome = {
                runtime: {}
            };
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        '''
    })
    return driver

def fetch_and_save_data(base_page_url, base_url, date_limit, output_filename='extracted_data.xlsx', batch_size=100):
    driver = create_browser()
    collected_data = []
    driver.get(base_page_url)
    should_continue = True
    cur_page = 0
    batch_number = 1
    while should_continue:
        cur_page += 1
        print(f"Visiting new page: {cur_page}")
        random_wait()
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        div_elements = soup.find_all('div', class_="panel-row ng-scope")
        for div in div_elements:
            date_span = div.find('span', class_='date ng-binding')
            if date_span:
                date_text = date_span.text.strip()
                date = datetime.strptime(date_text, "%Y-%m-%d")
                if date >= date_limit:
                    link = div.find('a', href=True)
                    if link and ("处罚决定" in link.text or "监罚" in link.text):
                        title = link.get('title', '')
                        href = link['href']
                        full_url = base_url + href
                        collected_data.append([title, full_url, date_text])
                        if len(collected_data) >= batch_size:
                            save_data_to_excel(collected_data, output_filename)
                            collected_data = []  # Reset the collected data list after saving
                            print(f"Batch {batch_number} saved. Continuing to next batch.")
                            batch_number += 1
                else:
                    should_continue = False
                    break
        if should_continue:
            try:
                next_button = WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, "//a[@ng-click='pager.next()']"))
                )
                ActionChains(driver).click(next_button).perform()
            except Exception as e:
                print(f"Failed to navigate to next page: {e}")
                break
    if collected_data:  # Save any remaining data after finishing all pages
        save_data_to_excel(collected_data, output_filename)
        print(f"Final batch saved.")
    driver.quit()
    print("Data has been saved or appended to extracted_data.xlsx")

# Example usage
base_page_url = "https://www.cbirc.gov.cn/cn/view/pages/ItemList.html?itemPId=923&itemId=4115&itemUrl=ItemListRightList.html&itemName=%E7%9B%91%E7%AE%A1%E5%88%86%E5%B1%80%E6%9C%AC%E7%BA%A7&itemsubPId=931&itemsubPName=%E8%A1%8C%E6%94%BF%E5%A4%84%E7%BD%9A"
base_url = 'https://www.cbirc.gov.cn/cn/view/pages/'
date_limit = datetime(2024, 8, 20)
fetch_and_save_data(base_page_url, base_url, date_limit)  # By default output goes to extracted_data.xlsx in the current directory, with batch_size=100