from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
from datetime import datetime
import random


# Sleep for a random interval to mimic human browsing
def random_wait(min_time=1, max_time=5):
    time.sleep(random.uniform(min_time, max_time))


# Create a new browser session with options to avoid bot detection
# (suitable for a local environment)
def create_browser():
    options = webdriver.ChromeOptions()
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--headless")  # Enable headless mode
    options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    # Mask common WebDriver fingerprints before any page script runs
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            window.navigator.chrome = { runtime: {} };
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
        '''
    })
    return driver


# def create_browser():  # variant for running inside a Docker environment
#     options = webdriver.ChromeOptions()
#     options.add_argument("--disable-blink-features=AutomationControlled")
#     options.add_argument("--headless")  # Enable headless mode
#     options.add_argument(
#         "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
#     options.add_experimental_option('excludeSwitches', ['enable-automation'])
#     options.add_experimental_option('useAutomationExtension', False)
#     driver = webdriver.Remote(
#         command_executor='http://chrome:4444/wd/hub',
#         options=options
#     )
#     return driver


def fetch_unique_urls(base_page_url, base_url, date_limit, output_path_prefix):
    # Initialize the WebDriver
    driver = create_browser()

    # Set to store unique URLs
    unique_urls = set()
    total_urls_saved = 0       # Total count of URLs saved
    urls_in_current_file = 0   # Count of URLs in the current file

    # Return True if the entry's date is on or after date_limit
    def is_date_valid(date_text):
        given_date = datetime.strptime(date_text, "%Y-%m-%d")
        return given_date >= date_limit

    # Append the collected URLs to the current output file
    def save_urls_to_file(urls, file_index):
        nonlocal total_urls_saved
        nonlocal urls_in_current_file
        with open(f"{output_path_prefix}{file_index}.txt", 'a') as file:
            for url in urls:
                file.write(url + '\n')
                total_urls_saved += 1
                urls_in_current_file += 1
        print(f"URLs have been saved to {output_path_prefix}{file_index}.txt")

    # Visit the initial page
    driver.get(base_page_url)
    cur_page = 0
    file_index = 1

    # Keep paging until an entry older than date_limit is found
    while True:
        cur_page += 1
        print("Visiting new page: " + str(cur_page))

        # Wait for JavaScript to load
        random_wait()

        # Get the page source after JS execution
        html = driver.page_source

        # Parse the HTML using BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')

        # Find all <div> elements that match the class conditions
        div_elements = soup.find_all('div', class_="panel-row ng-scope")

        # Flag that decides whether to fetch another page
        should_continue = False

        # Iterate through the div elements to find links and dates
        for div in div_elements:
            date_span = div.find('span', class_='date ng-binding')
            if date_span:
                date_text = date_span.text.strip()
                if is_date_valid(date_text):
                    should_continue = True
                    link = div.find('a', href=True, attrs={'ng-bind-html': 'x.docSubtitle|trustHtml'})
                    # Keep only penalty information disclosure forms ("处罚信息公开表")
                    if link and "处罚信息公开表" in link.text:
                        href = link['href']
                        full_url = base_url + href
                        # Skip malformed joins that duplicate the "/cn/view/pages/" path
                        if "//cn/view/pages/" not in full_url:
                            unique_urls.add(full_url)
                else:
                    # Dates are sorted in descending order, so once one entry is
                    # older than date_limit there is nothing left to collect
                    should_continue = False
                    break

        # Flush to disk once 2000 URLs have accumulated, then reset the set
        if len(unique_urls) >= 2000:
            save_urls_to_file(unique_urls, file_index)
            unique_urls.clear()

            # If the current file has reached 20000 URLs, start a new file
            if urls_in_current_file >= 20000:
                file_index += 1
                urls_in_current_file = 0

        # Stop if an out-of-range date was encountered
        if not should_continue:
            break

        # Try to find and click the next-page button
        try:
            next_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, "//a[@ng-click='pager.next()']"))
            )
            ActionChains(driver).move_to_element(next_button).click().perform()
            random_wait()  # Wait for the next page to load
        except Exception as e:
            print("No more pages or error occurred:", e)
            break

    # Save any remaining URLs
    if unique_urls:
        save_urls_to_file(unique_urls, file_index)

    # Close the browser
    driver.quit()
    print("Total URLs saved:", total_urls_saved)


# Example usage
# base_page_url is the listing page to start from; the script pages through results automatically.
# Each txt file holds 20000 URLs; files are named url1.txt, url2.txt, url3.txt, ...
base_page_url = "https://www.cbirc.gov.cn/cn/view/pages/ItemList.html?itemPId=923&itemId=4115&itemUrl=ItemListRightList.html&itemName=%E7%9B%91%E7%AE%A1%E5%88%86%E5%B1%80%E6%9C%AC%E7%BA%A7&itemsubPId=931&itemsubPName=%E8%A1%8C%E6%94%BF%E5%A4%84%E7%BD%9A"
base_url = 'https://www.cbirc.gov.cn/cn/view/pages/'
date_limit = datetime(2024, 8, 20)
output_path_prefix = 'url'
fetch_unique_urls(base_page_url, base_url, date_limit, output_path_prefix)
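
# A minimal sketch of how the saved url<N>.txt files might be read back for a
# later download stage. This helper is not part of the original script; the
# name load_saved_urls and the max_files bound are illustrative assumptions.
import os

def load_saved_urls(output_path_prefix='url', max_files=1000):
    # Collect URLs from url1.txt, url2.txt, ... stopping at the first missing file
    urls = []
    for i in range(1, max_files + 1):
        path = f"{output_path_prefix}{i}.txt"
        if not os.path.exists(path):
            break
        with open(path) as f:
            urls.extend(line.strip() for line in f if line.strip())
    return urls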