import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time
import random
import os
import glob
import re


# def clean_text(html_content):
#     soup = BeautifulSoup(html_content, 'html.parser')
#     paragraphs = soup.find_all('p')
#     lines = []
#     for p in paragraphs:
#         line = ''.join([span.get_text(strip=True) for span in p.find_all('span', recursive=False)])
#         lines.append(line)
#     return '\n'.join(lines).strip()


def clean_text(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Remove script, style, and Word "o:p" elements
    for script_or_style in soup(["script", "style", "o:p"]):
        script_or_style.decompose()
    # Extract all visible text
    text = soup.get_text(strip=True)
    # Clean up the text by stripping all whitespace
    cleaned_text = re.sub(r'\s+', '', text)
    return cleaned_text


def process_table(table_rows, current_url, error_urls):
    results = {
        "行政处罚决定书文号": "",
        "被处罚当事人": "",
        "主要违法违规事实": "",
        "行政处罚依据": "",
        "行政处罚决定": "",
        "作出处罚决定的机关名称": "",
        "作出处罚决定的日期": "",
        "网址": current_url  # extra column recording the source URL
    }
    try:
        if len(table_rows) == 9:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[2]))
            org_name = clean_text(str(table_rows[2].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[3].find_all('td')[1]))
            results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            results["主要违法违规事实"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[8].find_all('td')[1]))
        elif len(table_rows) == 10:
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            person_name = clean_text(str(table_rows[1].find_all('td')[3]))
            person_org = clean_text(str(table_rows[2].find_all('td')[1]))
            org_name = clean_text(str(table_rows[3].find_all('td')[2]))
            legal_rep_name = clean_text(str(table_rows[4].find_all('td')[1]))
            results["被处罚当事人"] = f'"个人姓名": "{person_name}"\n"个人单位": "{person_org}"\n"单位名称": "{org_name}"\n"单位法定代表人(主要负责人)姓名": "{legal_rep_name}"'
            results["主要违法违规事实"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[7].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[8].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[9].find_all('td')[1]))
        elif len(table_rows) == 8:
            # Check whether any row has three <td> cells (unit + responsible person layout)
            flag = 0
            for row in table_rows:
                cells = row.find_all('td')
                if len(cells) == 3:
                    flag = 1
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all('td')[1]))
            if flag:
                org_name = clean_text(str(table_rows[1].find_all('td')[2]))
                name = clean_text(str(table_rows[2].find_all('td')[1]))
                results["被处罚当事人"] = f'"单位名称": "{org_name}"\n"主要负责人姓名": "{name}"'
            else:
                part1 = clean_text(str(table_rows[1].find_all('td')[0])) + ":" + clean_text(str(table_rows[1].find_all('td')[1]))
                part2 = clean_text(str(table_rows[2].find_all('td')[0])) + ":" + clean_text(str(table_rows[2].find_all('td')[1]))
                results["被处罚当事人"] = part1 + "\n" + part2  # parts separated by a newline
            results["主要违法违规事实"] = clean_text(str(table_rows[3].find_all('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[4].find_all('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[5].find_all('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[6].find_all('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[7].find_all('td')[1]))
        elif len(table_rows) == 7:
            flag = 0
            for row in table_rows:
                cells = row.find_all('td')
                if len(cells) == 3:
                    # For a row with three <td> cells, join the second and third cell contents
                    name = clean_text(str(cells[1])) + ":" + clean_text(str(cells[2]))
                    flag = 1
            if flag:
                results["被处罚当事人"] = name
            else:
                results["被处罚当事人"] = clean_text(str(table_rows[1].find_all_next('td')[1]))
            results["行政处罚决定书文号"] = clean_text(str(table_rows[0].find_all_next('td')[1]))
            results["主要违法违规事实"] = clean_text(str(table_rows[2].find_all_next('td')[1]))
            results["行政处罚依据"] = clean_text(str(table_rows[3].find_all_next('td')[1]))
            results["行政处罚决定"] = clean_text(str(table_rows[4].find_all_next('td')[1]))
            results["作出处罚决定的机关名称"] = clean_text(str(table_rows[5].find_all_next('td')[1]))
            results["作出处罚决定的日期"] = clean_text(str(table_rows[6].find_all_next('td')[1]))
        else:
            print(f"Unexpected number of rows in table at URL: {current_url}")
            error_urls.append(current_url)
            return None
    except Exception as e:
        print(f"Error processing table: {e}")
        error_urls.append(current_url)
        return None
    return results


# def create_browser():  # for running inside a Docker environment
#     options = webdriver.ChromeOptions()
#     options.add_argument('--headless')  # headless mode
#     options.add_argument('--disable-blink-features=AutomationControlled')
#     options.add_argument(
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#     options.add_experimental_option('excludeSwitches', ['enable-automation'])
#     options.add_experimental_option('useAutomationExtension', False)
#     driver = webdriver.Remote(
#         command_executor='http://chrome:4444/wd/hub',
#         options=options
#     )
#     return driver


def fetch_data(urls):
    options = webdriver.ChromeOptions()  # for a local environment
    options.add_argument('--headless')  # headless mode
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    # Mask common Selenium fingerprints before any page script runs
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            window.navigator.chrome = { runtime: {} };
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
        '''
    })
    # driver = create_browser()
    all_data = pd.DataFrame()
    error_urls = []

    for url in urls:
        try:
            driver.get(url)
            print("Processing URL:", url)
            random_wait(1, 3)  # random wait between requests
            html = driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            selectors = [
                '.Section0 .MsoNormalTable, .Section0 .MsoTableGrid',
                '.Section1 .MsoNormalTable, .Section1 .MsoTableGrid',
                '.WordSection1 .MsoNormalTable, .WordSection1 .MsoTableGrid',
                '.Section0 table',
                '.Section1 table',
                '.WordSection1 table'
            ]
            table = None
            for selector in selectors:
                table = soup.select_one(selector)
                if table:
                    break
            if table:
                table_rows = table.find_all('tr')
                results = process_table(table_rows, url, error_urls)
                if results:  # skip tables that could not be parsed
                    df = pd.DataFrame([results])
                    all_data = pd.concat([all_data, df], ignore_index=True)
            else:
                print(f"No table found for URL: {url}")
                error_urls.append(url)
        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            error_urls.append(url)

    driver.quit()

    if error_urls:
        with open('error_urls.txt', 'w') as file:
            for error_url in error_urls:
                file.write(f"{error_url}\n")
        print("Error URLs have been saved to error_urls.txt")

    return all_data


def random_wait(min_time=1, max_time=3):
    time.sleep(random.uniform(min_time, max_time))


def create_empty_excel(filename):
    # Column order matches the keys produced by process_table, including the URL column
    columns = ["行政处罚决定书文号", "被处罚当事人", "主要违法违规事实", "行政处罚依据",
               "行政处罚决定", "作出处罚决定的机关名称", "作出处罚决定的日期", "网址"]
    df = pd.DataFrame(columns=columns)
    df.to_excel(filename, index=False)


def process_in_batches(url_files_pattern, output_file_prefix, batch_size=100, max_rows_per_file=10000):
    url_files = glob.glob(url_files_pattern)
    urls = []
    for url_file in url_files:
        with open(url_file, 'r') as file:
            urls.extend([line.strip() for line in file if line.strip()])

    total_urls = len(urls)
    num_batches = (total_urls // batch_size) + (1 if total_urls % batch_size != 0 else 0)

    file_index = 1
    output_filename = f'{output_file_prefix}{file_index}.xlsx'
    rows_in_file = 0
    if not os.path.exists(output_filename):
        create_empty_excel(output_filename)

    for batch_num in range(num_batches):
        start_index = batch_num * batch_size
        end_index = start_index + batch_size
        batch_urls = urls[start_index:end_index]
        print(f"Processing batch {batch_num + 1} of {num_batches}")
        batch_data = fetch_data(batch_urls)

        try:
            existing_data = pd.read_excel(output_filename, sheet_name='Sheet1')
            combined_data = pd.concat([existing_data, batch_data], ignore_index=True)
        except FileNotFoundError:
            combined_data = batch_data

        with pd.ExcelWriter(output_filename, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
            combined_data.to_excel(writer, index=False, sheet_name='Sheet1')

        rows_in_file += batch_data.shape[0]
        if rows_in_file >= max_rows_per_file:
            # Start a new output file once the current one holds enough rows
            file_index += 1
            output_filename = f'{output_file_prefix}{file_index}.xlsx'
            rows_in_file = 0
            if not os.path.exists(output_filename):
                create_empty_excel(output_filename)


# Example usage
url_files_pattern = 'url*.txt'  # match every .txt file whose name starts with 'url'
output_file_prefix = 'output_data'
# By default the Excel output is updated after every 100 URLs; once a file holds more than
# 10,000 rows, a new output file is started.
process_in_batches(url_files_pattern, output_file_prefix, batch_size=100)
print("Data has been appended to the existing Excel files.")
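
# Optional follow-up: a minimal sketch (not part of the run above) for retrying the URLs that
# ended up in error_urls.txt. It only uses fetch_data() defined in this script; the output
# filename 'retry_output.xlsx' is a hypothetical choice. Uncomment to use after a full run.
# if os.path.exists('error_urls.txt'):
#     with open('error_urls.txt', 'r') as f:
#         retry_urls = [line.strip() for line in f if line.strip()]
#     if retry_urls:
#         retry_data = fetch_data(retry_urls)
#         if not retry_data.empty:
#             retry_data.to_excel('retry_output.xlsx', index=False)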