import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time
import random


def clean_text(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    paragraphs = soup.find_all('p')
    lines = []
    for p in paragraphs:
        # If a span has nested children, e.g. "(一)", ignore the inner span tags
        # and keep only the text of direct-child spans
        line = ''.join([span.get_text(strip=True) for span in p.find_all('span', recursive=False)])
        lines.append(line)
    return '\n'.join(lines).strip()


def process_table(table_rows):
    results = {
        "行政处罚决定书文号": "",
        "被处罚当事人": "",
        "主要违法违规事实": "",
        "行政处罚依据": "",
        "行政处罚决定": "",
        "作出处罚决定的机关名称": "",
        "作出处罚决定的日期": ""
    }
    try:
        if len(table_rows) == 9:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[2]))
            org_name = clean_text(str(table_rows[2].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[3].find_all('td')[1]))
            results["被处罚当事人"] = (
                f'"个人姓名": "{person_name}"\n'
                f'"单位名称": "{org_name}"\n'
                f'"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            )
            results["主要违法违规事实"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[8].find_all('td')[1]))
        elif len(table_rows) == 10:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[3]))
            person_org = clean_text(str(table_rows[2].find_all('td')[1]))
            org_name = clean_text(str(table_rows[3].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[4].find_all('td')[1]))
            results["被处罚当事人"] = (
                f'"个人姓名": "{person_name}"\n'
                f'"个人单位": "{person_org}"\n'
                f'"单位名称": "{org_name}"\n'
                f'"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            )
            results["主要违法违规事实"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[8].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[9].find_all('td')[1]))
        else:
            # Fallback for tables with an unexpected number of rows:
            # match each row by the text of its header cell
            temp_dict = {}
            for row in table_rows:
                columns = row.find_all('td')
                if len(columns) >= 2:
                    header = columns[0].get_text(strip=True)
                    if "违法违规" in header:
                        header = "主要违法违规事实"
                    if "机关名称" in header:
                        header = "作出处罚决定的机关名称"
                    if "日期" in header:
                        header = "作出处罚决定的日期"
                    content_html = str(columns[1])
                    content = clean_text(content_html)
                    temp_dict[header] = content
            results = temp_dict
    except Exception as e:
        print(f"Error processing table: {e}")
    return results


def fetch_data(urls):
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  # Run Chrome in headless mode
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    # Mask common automation fingerprints before any page script runs
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            window.navigator.chrome = { runtime: {} };
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
        '''
    })
    all_data = pd.DataFrame()
    error_urls = []
    for url in urls:
        try:
            driver.get(url)
            print("Processing URL:", url)
            random_wait(1, 3)  # Random wait between requests
            html = driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            # Try several selectors, since the page layouts vary
            selectors = [
                '.Section0 .MsoNormalTable, .Section0 .MsoTableGrid',
                '.Section1 .MsoNormalTable, .Section1 .MsoTableGrid',
                '.WordSection1 .MsoNormalTable, .WordSection1 .MsoTableGrid',
                '.Section0 table',      # fall back to any table inside Section0
                '.Section1 table',      # fall back to any table inside Section1
                '.WordSection1 table'   # fall back to any table inside WordSection1
            ]
            table = None
            for selector in selectors:
                table = soup.select_one(selector)
                if table:
                    break
            if table:
                table_rows = table.find_all('tr')
                results = process_table(table_rows)
                df = pd.DataFrame([results])
                all_data = pd.concat([all_data, df], ignore_index=True)
            else:
                print(f"No table found for URL: {url}")
                error_urls.append(url)
        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            error_urls.append(url)
    driver.quit()
    if error_urls:
        with open('error_urls.txt', 'w') as file:
            for error_url in error_urls:
                file.write(f"{error_url}\n")
        print("Error URLs have been saved to error_urls.txt")
    return all_data


def random_wait(min_time=1, max_time=3):
    time.sleep(random.uniform(min_time, max_time))


def process_in_batches(urls, batch_size=100):
    total_urls = len(urls)
    num_batches = (total_urls // batch_size) + (1 if total_urls % batch_size != 0 else 0)
    for batch_num in range(num_batches):
        start_index = batch_num * batch_size
        end_index = start_index + batch_size
        batch_urls = urls[start_index:end_index]
        print(f"Processing batch {batch_num + 1} of {num_batches}")
        batch_data = fetch_data(batch_urls)
        try:
            existing_data = pd.read_excel('output_data2.xlsx', sheet_name='Sheet1')
            combined_data = pd.concat([existing_data, batch_data], ignore_index=True)
        except FileNotFoundError:
            combined_data = batch_data
        # combined_data already includes the previously saved rows, so rewrite the
        # whole sheet; mode='a' would raise when the file does not exist yet (first batch)
        with pd.ExcelWriter('output_data2.xlsx', engine='openpyxl', mode='w') as writer:
            combined_data.to_excel(writer, index=False, sheet_name='Sheet1')


# Read the list of URLs to scrape
with open('url2.txt', 'r') as file:
    urls = [line.strip() for line in file if line.strip()]

# Process the URLs in batches and write each batch to Excel
process_in_batches(urls, batch_size=50)
print("Data has been appended to the existing Excel file.")
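

# Optional follow-up: a minimal sketch of retrying failed URLs, assuming error_urls.txt
# was produced by the run above. Note that fetch_data() rewrites that file on every
# batch, so it only holds the failures from the most recent batch.
import os

if os.path.exists('error_urls.txt'):
    with open('error_urls.txt', 'r') as file:
        retry_urls = [line.strip() for line in file if line.strip()]
    if retry_urls:
        print(f"Retrying {len(retry_urls)} failed URLs...")
        process_in_batches(retry_urls, batch_size=50)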