This commit is contained in:
zhangsan 2024-07-26 23:44:36 +08:00
commit f6db74c2a6
12 changed files with 7053 additions and 0 deletions

19
.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
# 忽略IDE配置文件
.idea/
# 忽略Python编译文件
*.pyc
__pycache__/
# 忽略虚拟环境文件夹
venv/
# 忽略操作系统生成的文件
.DS_Store
Thumbs.db
# 忽略日志文件
*.log
# 忽略临时文件
*.tmp

BIN
Exported_Data.xlsx Normal file

Binary file not shown.

1
data.html Normal file

File diff suppressed because one or more lines are too long

72
extract_table.py Normal file
View File

@ -0,0 +1,72 @@
from selenium import webdriver
from bs4 import BeautifulSoup
import pandas as pd
import time
def clean_text(html_content):
# 使用BeautifulSoup来解析内容逐个元素提取文本避免添加不必要的空格
soup = BeautifulSoup(html_content, 'html.parser')
text = "" # 初始化一个空字符串用于拼接文本
for element in soup.stripped_strings: # 遍历所有文本节点,去除首尾空白
if element == "一、" or element == "二、": # 如果是列表标记,加入换行符
text += "\n" + element
else:
text += element # 直接拼接文本,不添加额外空格
return text.strip() # 返回处理后的文本
def fetch_data(urls):
# 设置Chrome选项以在后台运行
options = webdriver.ChromeOptions()
options.add_argument('headless')
# 初始化WebDriver
driver = webdriver.Chrome(options=options)
# 初始化一个空的DataFrame以存储最终数据
all_data = pd.DataFrame()
for url in urls:
# 访问页面
driver.get(url)
time.sleep(3) # 等待JavaScript执行
# 获取页面源码
html = driver.page_source
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html, 'html.parser')
table = soup.find('table', class_='MsoNormalTable')
# 如果页面上有表格
if table:
rows = table.find_all('tr')
temp_dict = {}
for row in rows:
columns = row.find_all('td')
if len(columns) >= 2: # 确保每行至少有两个列Header和Content
header = columns[0].get_text(strip=True)
content_html = str(columns[1]) # 获取原始HTML内容
content = clean_text(content_html) # 清洗并去除不必要的空格
temp_dict[header] = content
# 将字典转换为DataFrame并添加到总的DataFrame中
df = pd.DataFrame([temp_dict])
all_data = pd.concat([all_data, df], ignore_index=True)
# 关闭浏览器
driver.quit()
return all_data
# 定义要处理的URL列表
urls = [
"https://www.cbirc.gov.cn/cn/view/pages/ItemDetail.html?docId=1171824&itemId=4115&generaltype=9",
# 可以添加更多的URL
]
# 调用函数并获取数据
result_data = fetch_data(urls)
# 保存到Excel文件
result_data.to_excel('output_data.xlsx', index=False)

47
extract_table2.py Normal file
View File

@ -0,0 +1,47 @@
import pandas as pd
from bs4 import BeautifulSoup
# 从文件中读取HTML内容
with open('D:/folder/study/reptile-project/data.html', 'r', encoding='utf-8') as file:
html_content = file.read()
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 初始化结果字典
results = {
"行政处罚决定书文号": "",
"被处罚当事人": "",
"主要违法违规事实": "",
"行政处罚依据": "",
"行政处罚决定": "",
"作出处罚决定的机关名称": "",
"作出处罚决定的日期": ""
}
# 获取所有的tr元素
table_rows = soup.find_all('tr')
# 提取信息
if len(table_rows) >= 9:
results["行政处罚决定书文号"] = table_rows[0].find_all('td')[1].find('p').get_text(strip=True)
# 个人姓名、单位名称、单位法定代表人姓名
person_name = table_rows[1].find_all('td')[2].find('p').get_text(strip=True)
org_name = table_rows[2].find_all('td')[2].find('p').get_text(strip=True)
legal_rep_name = table_rows[3].find_all('td')[1].find('p').get_text(strip=True)
# 格式化被处罚当事人信息
results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
results["主要违法违规事实"] = table_rows[4].find_all('td')[1].find('p').get_text(strip=True)
results["行政处罚依据"] = table_rows[5].find_all('td')[1].find('p').get_text(strip=True)
results["行政处罚决定"] = table_rows[6].find_all('td')[1].find('p').get_text(strip=True)
results["作出处罚决定的机关名称"] = table_rows[7].find_all('td')[1].find('p').get_text(strip=True)
results["作出处罚决定的日期"] = table_rows[8].find_all('td')[1].find('p').get_text(strip=True)
# 创建DataFrame
df = pd.DataFrame([results])
# 保存DataFrame到Excel文件
df.to_excel('output_data.xlsx', index=False, engine='openpyxl')

0
main.py Normal file
View File

188
main_extraction.py Normal file
View File

@ -0,0 +1,188 @@
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time
import random
def clean_text(html_content):
soup = BeautifulSoup(html_content, 'html.parser')
paragraphs = soup.find_all('p')
lines = []
for p in paragraphs:
# 如果 span 有子元素,比如 <span>(一)</span>,就忽略 span 标签
line = ''.join([span.get_text(strip=True) for span in p.find_all('span', recursive=False)])
lines.append(line)
return '\n'.join(lines).strip()
def process_table(table_rows):
results = {
"行政处罚决定书文号": "",
"被处罚当事人": "",
"主要违法违规事实": "",
"行政处罚依据": "",
"行政处罚决定": "",
"作出处罚决定的机关名称": "",
"作出处罚决定的日期": ""
}
try:
if len(table_rows) == 9:
results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
person_name = clean_text(str(table_rows[1].find_all('td')[2]))
org_name = clean_text(str(table_rows[2].find_all('td')[2]))
legal_rep_name = clean_text(str(table_rows[3].find_all('td')[1]))
results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
results["主要违法违规事实"] = clean_text(str(table_rows[4].find_all('td')[1]))
results["行政处罚依据"] = clean_text(str(table_rows[5].find_all('td')[1]))
results["行政处罚决定"] = clean_text(str(table_rows[6].find_all('td')[1]))
results["作出处罚决定的机关名称"] = clean_text(str(table_rows[7].find_all('td')[1]))
results["作出处罚决定的日期"] = clean_text(str(table_rows[8].find_all('td')[1]))
elif len(table_rows) == 10:
results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
person_name = clean_text(str(table_rows[1].find_all('td')[3]))
person_org = clean_text(str(table_rows[2].find_all('td')[1]))
org_name = clean_text(str(table_rows[3].find_all('td')[2]))
legal_rep_name = clean_text(str(table_rows[4].find_all('td')[1]))
results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"个人单位": "{person_org}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
results["主要违法违规事实"] = clean_text(str(table_rows[5].find_all('td')[1]))
results["行政处罚依据"] = clean_text(str(table_rows[6].find_all('td')[1]))
results["行政处罚决定"] = clean_text(str(table_rows[7].find_all('td')[1]))
results["作出处罚决定的机关名称"] = clean_text(str(table_rows[8].find_all('td')[1]))
results["作出处罚决定的日期"] = clean_text(str(table_rows[9].find_all('td')[1]))
else:
temp_dict = {}
for row in table_rows:
columns = row.find_all('td')
if len(columns) >= 2:
header = columns[0].get_text(strip=True)
if "违法违规" in header:
header = "主要违法违规事实"
if "机关名称" in header:
header = "作出处罚决定的机关名称"
if "日期" in header:
header = "作出处罚决定的日期"
content_html = str(columns[1])
content = clean_text(content_html)
temp_dict[header] = content
results = temp_dict
except Exception as e:
print(f"Error processing table: {e}")
return results
def fetch_data(urls):
options = webdriver.ChromeOptions()
options.add_argument('--headless') # 使用无头模式
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
driver = webdriver.Chrome(options=options)
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
window.navigator.chrome = {
runtime: {}
};
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
'''
})
all_data = pd.DataFrame()
error_urls = []
for url in urls:
try:
driver.get(url)
print("Processing URL:", url)
random_wait(1, 3) # 随机等待时间
html = driver.page_source
soup = BeautifulSoup(html, 'html.parser')
# 尝试不同的选择器
selectors = [
'.Section0 .MsoNormalTable, .Section0 .MsoTableGrid',
'.Section1 .MsoNormalTable, .Section1 .MsoTableGrid',
'.WordSection1 .MsoNormalTable, .WordSection1 .MsoTableGrid',
'.Section0 table', # 直接查找Section0内的table
'.Section1 table', # 直接查找Section1内的table
'.WordSection1 table' # 直接查找WordSection1内的table
]
table = None
for selector in selectors:
table = soup.select_one(selector)
if table:
break
if table:
table_rows = table.find_all('tr')
results = process_table(table_rows)
df = pd.DataFrame([results])
all_data = pd.concat([all_data, df], ignore_index=True)
else:
print(f"No table found for URL: {url}")
error_urls.append(url)
except Exception as e:
print(f"Error processing URL {url}: {e}")
error_urls.append(url)
driver.quit()
if error_urls:
with open('error_urls.txt', 'w') as file:
for error_url in error_urls:
file.write(f"{error_url}\n")
print(f"Error URLs have been saved to error_urls.txt")
return all_data
def random_wait(min_time=1, max_time=3):
time.sleep(random.uniform(min_time, max_time))
def process_in_batches(urls, batch_size=100):
total_urls = len(urls)
num_batches = (total_urls // batch_size) + (1 if total_urls % batch_size != 0 else 0)
for batch_num in range(num_batches):
start_index = batch_num * batch_size
end_index = start_index + batch_size
batch_urls = urls[start_index:end_index]
print(f"Processing batch {batch_num + 1} of {num_batches}")
batch_data = fetch_data(batch_urls)
try:
existing_data = pd.read_excel('output_data.xlsx', sheet_name='Sheet1')
combined_data = pd.concat([existing_data, batch_data], ignore_index=True)
except FileNotFoundError:
combined_data = batch_data
with pd.ExcelWriter('output_data.xlsx', engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
combined_data.to_excel(writer, index=False, sheet_name='Sheet1')
# 读取URL列表
with open('urls.txt', 'r') as file:
urls = [line.strip() for line in file if line.strip()]
# 分批处理URL并写入Excel
process_in_batches(urls, batch_size=50)
print("Data has been appended to the existing Excel file.")

BIN
output_data.xlsx Normal file

Binary file not shown.

133
scrape.py Normal file
View File

@ -0,0 +1,133 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
from datetime import datetime
import random
# Function to generate a random waiting time
def random_wait(min_time=1, max_time=5):
time.sleep(random.uniform(min_time, max_time))
# Function to create a new browser session with options to avoid detection
def create_browser():
options = webdriver.ChromeOptions()
options.add_argument("--disable-blink-features=AutomationControlled")
# options.add_argument("--headless") # Uncomment this line to use headless mode
options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
driver = webdriver.Chrome(options=options)
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
window.navigator.chrome = {
runtime: {}
};
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
'''
})
return driver
# Initialize WebDriver
driver = create_browser()
# Base URL information
base_page_url = "https://www.cbirc.gov.cn/cn/view/pages/ItemList.html?itemPId=923&itemId=4115&itemUrl=ItemListRightList.html&itemName=%E7%9B%91%E7%AE%A1%E5%88%86%E5%B1%80%E6%9C%AC%E7%BA%A7&itemsubPId=931&itemsubPName=%E8%A1%8C%E6%94%BF%E5%A4%84%E7%BD%9A"
base_url = 'https://www.cbirc.gov.cn/cn/view/pages/'
# Set to store unique URLs
unique_urls = set()
# Function to check date
def is_date_valid(date_text):
given_date = datetime.strptime(date_text, "%Y-%m-%d")
return given_date >= datetime(2023, 6, 1)
# Visit the initial page
driver.get(base_page_url)
cur_page = 0
# Keep processing until a date before June 1, 2023, is found
while True:
cur_page += 1
print("Visiting new page:" + str(cur_page))
# Wait for JavaScript to load
random_wait()
# Get the page source after JS execution
html = driver.page_source
# Parse the HTML using BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# Find all <div> elements that match class conditions
div_elements = soup.find_all('div', class_="panel-row ng-scope")
# Variable to determine if loop should continue
should_continue = False
# Iterate through the div elements to find links and dates
for div in div_elements:
date_span = div.find('span', class_='date ng-binding')
if date_span:
date_text = date_span.text.strip()
if is_date_valid(date_text):
should_continue = True
link = div.find('a', href=True, attrs={'ng-bind-html': 'x.docSubtitle|trustHtml'})
if link and "处罚信息公开表" in link.text:
href = link['href']
full_url = base_url + href
if "//cn/view/pages/" not in full_url:
unique_urls.add(full_url)
else:
# Since this date is invalid and dates are sorted in descending order, no need to continue
should_continue = False
break
# Check if loop should continue
if not should_continue:
break
# Try to find and click the next page button
try:
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, "//a[@ng-click='pager.next()']"))
)
ActionChains(driver).move_to_element(next_button).click().perform()
random_wait() # Wait for the next page to load
except Exception as e:
print("No more pages or error occurred:", e)
break
# Close the browser
driver.quit()
# Print all unique URLs and count them
cnt = 0
# Open a file to write
with open('urls.txt', 'w') as file:
for url in unique_urls:
cnt += 1
file.write(url + '\n') # Write each URL followed by a newline
print("URLs have been saved to urls1.txt")
print("Total URLs found:", cnt)

1
testurl.txt Normal file
View File

@ -0,0 +1 @@
https://www.cbirc.gov.cn/cn/view/pages/ItemDetail.html?docId=1116560&itemId=4115&generaltype=9

5184
urls.txt Normal file

File diff suppressed because it is too large Load Diff

1408
urls1.txt Normal file

File diff suppressed because it is too large Load Diff