news 2026/4/16 15:31:42

Python Web Scraping in Practice: Efficiently Collecting Industry Data Reports with Playwright and Async Techniques


张小明

Front-End Developer


1. Overview: The Importance and Challenges of Collecting Industry Data Reports

In today's data-driven business environment, industry data reports play a critical role in market analysis, competitive research, and strategic planning. Yet this valuable data is scattered across websites, platforms, and PDF documents, and collecting it by hand is slow and laborious. This article shows how to use modern Python scraping techniques to build an efficient, stable collection system for industry data reports.

This article focuses on:

  1. Modern web data collection with Playwright

  2. Improving collection efficiency with asynchronous programming

  3. Intelligent parsing of PDF report content

  4. Data cleaning and storage

  5. Anti-scraping strategies and ethical considerations

2. Technology Stack: Why These Tools?

2.1 Playwright vs Selenium vs Requests

Playwright is a modern browser automation tool developed by Microsoft. Compared with the traditional Selenium and Requests, it offers:

  • Supports all modern browser engines (Chromium, Firefox, WebKit)

  • Auto-waits for elements to load, reducing explicit waits

  • More powerful selectors and event handling

  • Built-in screenshots and video recording

  • Better TypeScript/Python support

2.2 Asynchronous Programming (asyncio)

Asynchronous I/O lets you handle many page requests concurrently, which dramatically improves collection throughput, especially when a large number of pages must be processed.
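
The effect is easy to demonstrate with a self-contained sketch, where `asyncio.sleep` stands in for the I/O wait of a real HTTP request:

```python
import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    # asyncio.sleep simulates the network wait of a real request
    await asyncio.sleep(delay)
    return f"{name} done"

async def crawl_all() -> list:
    # Three 0.2s "requests" run concurrently,
    # so the whole batch takes ~0.2s instead of 0.6s
    return await asyncio.gather(*(fetch(f"page{i}", 0.2) for i in range(3)))

start = time.perf_counter()
results = asyncio.run(crawl_all())
elapsed = time.perf_counter() - start
print(results, f"elapsed={elapsed:.2f}s")
```

The same `asyncio.gather` pattern drives the per-site tasks in the full crawler below.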

2.3 Other Key Technologies

  • Pandas: data processing and analysis

  • PyPDF2 / pdfplumber: PDF content extraction

  • MongoDB / PostgreSQL: data storage

  • FastAPI: building a data API service

3. Complete Crawler System Architecture

```python
"""
Industry data report collection system - complete implementation.
Supports: web crawling, PDF parsing, data cleaning, persistent storage.
"""
import asyncio
import hashlib
import io
import json
import logging
import re
from collections import Counter
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import pandas as pd

# PDF handling
import pdfplumber
from PyPDF2 import PdfReader

# Database
from sqlalchemy import JSON, Column, DateTime, String, Text, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Async crawler core
import aiohttp
from playwright.async_api import Page, async_playwright

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('industry_data_crawler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Data models
Base = declarative_base()


class IndustryReport(Base):
    """ORM model for an industry report."""
    __tablename__ = 'industry_reports'

    id = Column(String(64), primary_key=True)
    title = Column(String(500), nullable=False)
    source_url = Column(String(1000), nullable=False)
    publish_date = Column(DateTime)
    industry_category = Column(String(200))
    data_source = Column(String(200))
    file_url = Column(String(1000))
    file_type = Column(String(50))  # pdf, doc, html, ...
    content_summary = Column(Text)
    full_content = Column(Text)
    keywords = Column(JSON)
    # 'metadata' is reserved by SQLAlchemy's declarative system, so the
    # attribute is named report_metadata (the column name stays 'metadata')
    report_metadata = Column('metadata', JSON)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f"<IndustryReport(title='{self.title[:50]}...', source='{self.data_source}')>"


@dataclass
class ReportData:
    """In-memory report record."""
    title: str
    source_url: str
    publish_date: Optional[datetime]
    industry_category: str
    data_source: str
    file_url: Optional[str]
    file_type: str
    content_summary: str
    full_content: str
    keywords: List[str]
    report_metadata: Dict[str, Any]

    def generate_id(self) -> str:
        """Generate a deterministic unique ID."""
        content = f"{self.source_url}{self.title}{self.publish_date}"
        return hashlib.sha256(content.encode()).hexdigest()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary (dates serialized to ISO strings)."""
        data = asdict(self)
        data['id'] = self.generate_id()
        data['publish_date'] = self.publish_date.isoformat() if self.publish_date else None
        return data


class IndustryDataCrawler:
    """Main industry-report crawler."""

    def __init__(self, config_path: str = "config.json"):
        """
        Initialize the crawler.

        Args:
            config_path: path to the configuration file
        """
        self.config = self._load_config(config_path)
        self.db_engine = None
        self.db_session = None
        self._init_database()
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration, merging user settings over the defaults."""
        default_config = {
            "database": {
                "url": "sqlite:///industry_reports.db",
                "echo": False
            },
            "crawler": {
                "max_concurrent": 5,
                "request_timeout": 30,
                "retry_times": 3,
                "delay_range": [1, 3]
            },
            "target_sites": {
                "艾瑞咨询": "https://www.iresearch.com.cn",
                "易观分析": "https://www.analysys.cn",
                "艾媒网": "https://www.iimedia.cn",
                "199IT": "http://www.199it.com",
                "数据局": "https://www.shujuju.cn"
            },
            "proxies": [],  # proxy configuration
            # Match keywords stay in Chinese: the target pages are Chinese
            "keywords": ["行业报告", "白皮书", "市场分析", "趋势报告"]
        }
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
            # Merge the user config over the defaults
            for key in default_config:
                if key in user_config:
                    if isinstance(default_config[key], dict) and isinstance(user_config[key], dict):
                        default_config[key].update(user_config[key])
                    else:
                        default_config[key] = user_config[key]
        except FileNotFoundError:
            logger.warning(f"Config file {config_path} not found, using defaults")
        return default_config

    def _init_database(self):
        """Initialize the database connection and create tables."""
        db_url = self.config["database"]["url"]
        self.db_engine = create_engine(db_url, echo=self.config["database"]["echo"])
        Base.metadata.create_all(self.db_engine)
        Session = sessionmaker(bind=self.db_engine)
        self.db_session = Session()
        logger.info(f"Database initialized: {db_url}")

    async def crawl_site(self, site_name: str, site_url: str):
        """
        Crawl the reports of a single site.

        Args:
            site_name: display name of the site
            site_url: base URL of the site
        """
        logger.info(f"Crawling site: {site_name} ({site_url})")
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.user_agents[0]
            )
            page = await context.new_page()
            try:
                await page.goto(site_url, timeout=60000)
                # Dispatch to a site-specific strategy
                if "iresearch" in site_url:
                    reports = await self._crawl_iresearch(page, site_url)
                elif "analysys" in site_url:
                    reports = await self._crawl_analysys(page, site_url)
                elif "iimedia" in site_url:
                    reports = await self._crawl_iimedia(page, site_url)
                else:
                    reports = await self._crawl_general_site(page, site_url)
                # Process each collected report
                for report_info in reports:
                    try:
                        report_data = await self._process_report(report_info)
                        if report_data:
                            self._save_to_database(report_data)
                            logger.info(f"Saved report: {report_data.title}")
                    except Exception as e:
                        logger.error(f"Failed to process report {report_info.get('url', 'unknown')}: {e}")
                logger.info(f"Finished {site_name}: {len(reports)} reports found")
            except Exception as e:
                logger.error(f"Failed to crawl {site_name}: {e}")
            finally:
                await browser.close()

    async def _crawl_iresearch(self, page: Page, base_url: str) -> List[Dict]:
        """Crawl iResearch (艾瑞咨询)."""
        reports = []
        try:
            # Report links on the landing page
            report_links = await page.query_selector_all(
                "a[href*='report'], a:has-text('报告'), a:has-text('白皮书')"
            )
            for link in report_links[:20]:  # cap the count
                href = await link.get_attribute('href')
                title = await link.text_content() or await link.get_attribute('title') or ""
                if href and any(kw in title for kw in self.config["keywords"]):
                    full_url = urljoin(base_url, href)
                    reports.append({
                        'url': full_url,
                        'title': title.strip(),
                        'source': '艾瑞咨询'
                    })
            # Also try the dedicated report listing page
            await page.goto(f"{base_url}/report.shtml", timeout=30000)
            await page.wait_for_selector(".report-list, .list-content", timeout=10000)
            items = await page.query_selector_all(".report-item, .list-item")
            for item in items[:30]:
                try:
                    link = await item.query_selector("a")
                    if link:
                        href = await link.get_attribute('href')
                        title = await link.text_content() or ""
                        if href and any(kw in title for kw in self.config["keywords"]):
                            full_url = urljoin(base_url, href)
                            # Publish date, if present
                            date_elem = await item.query_selector(".date, .time")
                            date_str = await date_elem.text_content() if date_elem else ""
                            reports.append({
                                'url': full_url,
                                'title': title.strip(),
                                'date': self._parse_date(date_str.strip()),
                                'source': '艾瑞咨询'
                            })
                except Exception as e:
                    logger.debug(f"Failed to extract report item: {e}")
                    continue
        except Exception as e:
            logger.error(f"Failed to crawl iResearch: {e}")
        return reports

    async def _crawl_analysys(self, page: Page, base_url: str) -> List[Dict]:
        """Crawl Analysys (易观分析)."""
        reports = []
        try:
            await page.goto(f"{base_url}/report", timeout=30000)
            await page.wait_for_load_state('networkidle')
            report_elements = await page.query_selector_all(".report-card, .article-item")
            for elem in report_elements[:25]:
                try:
                    title_elem = await elem.query_selector("h3, h4, .title")
                    link_elem = await elem.query_selector("a")
                    if not link_elem:
                        continue
                    href = await link_elem.get_attribute('href')
                    title = await title_elem.text_content() if title_elem else await link_elem.text_content()
                    if href and title:
                        full_url = urljoin(base_url, href)
                        # Date and summary, if present
                        date_elem = await elem.query_selector(".date, .time, .publish-date")
                        date_str = await date_elem.text_content() if date_elem else ""
                        summary_elem = await elem.query_selector(".summary, .description, .abstract")
                        summary = await summary_elem.text_content() if summary_elem else ""
                        reports.append({
                            'url': full_url,
                            'title': title.strip(),
                            'date': self._parse_date(date_str.strip()),
                            'summary': summary.strip(),
                            'source': '易观分析'
                        })
                except Exception as e:
                    logger.debug(f"Failed to extract report element: {e}")
                    continue
        except Exception as e:
            logger.error(f"Failed to crawl Analysys: {e}")
        return reports

    async def _crawl_iimedia(self, page: Page, base_url: str) -> List[Dict]:
        """Crawl iiMedia (艾媒网)."""
        reports = []
        try:
            await page.goto(f"{base_url}/report", timeout=30000)
            await page.wait_for_selector("a[href*='report']:visible", timeout=10000)
            links = await page.query_selector_all(
                "a[href*='report']:visible, a:has-text('报告'):visible, a:has-text('Research'):visible"
            )
            seen_urls = set()
            for link in links:
                try:
                    href = await link.get_attribute('href')
                    if not href or href in seen_urls:
                        continue
                    title = await link.text_content() or await link.get_attribute('title') or ""
                    # Skip links that don't look like reports
                    if not any(kw in title for kw in self.config["keywords"]):
                        continue
                    full_url = urljoin(base_url, href)
                    seen_urls.add(href)
                    reports.append({
                        'url': full_url,
                        'title': title.strip()[:200],
                        'source': '艾媒网'
                    })
                except Exception:
                    continue
        except Exception as e:
            logger.error(f"Failed to crawl iiMedia: {e}")
        return reports

    async def _crawl_general_site(self, page: Page, base_url: str) -> List[Dict]:
        """Generic crawling strategy for unknown sites."""
        reports = []
        unique_reports: List[Dict] = []  # initialized here so the final return is always safe
        try:
            search_selectors = [
                "a[href*='report']",
                "a[href*='white-paper']",
                "a[href*='research']",
                "a:has-text('报告')",
                "a:has-text('白皮书')",
                "a:has-text('研究')"
            ]
            for selector in search_selectors:
                try:
                    links = await page.query_selector_all(selector)
                    for link in links:
                        try:
                            href = await link.get_attribute('href')
                            title = await link.text_content() or ""
                            if href and title:
                                full_url = urljoin(base_url, href)
                                if not any(r['url'] == full_url for r in reports):
                                    reports.append({
                                        'url': full_url,
                                        'title': title.strip()[:300],
                                        'source': urlparse(base_url).netloc
                                    })
                        except Exception:
                            continue
                except Exception:
                    continue
            # Deduplicate by URL
            seen_urls = set()
            for report in reports:
                if report['url'] not in seen_urls:
                    seen_urls.add(report['url'])
                    unique_reports.append(report)
        except Exception as e:
            logger.error(f"Generic crawl failed: {e}")
        return unique_reports[:50]  # cap the count

    async def _process_report(self, report_info: Dict) -> Optional[ReportData]:
        """
        Process a single report and extract its details.

        Args:
            report_info: basic report info

        Returns:
            a ReportData instance, or None on failure
        """
        try:
            url = report_info['url']
            # PDF files get a dedicated pipeline
            if url.lower().endswith('.pdf'):
                return await self._process_pdf_report(url, report_info)
            return await self._process_html_report(url, report_info)
        except Exception as e:
            logger.error(f"Failed to process report {report_info.get('url', 'unknown')}: {e}")
            return None

    async def _process_pdf_report(self, pdf_url: str, report_info: Dict) -> Optional[ReportData]:
        """Process a PDF report."""
        try:
            logger.info(f"Processing PDF report: {pdf_url}")
            pdf_content = await self._download_file(pdf_url)
            if not pdf_content:
                return None
            pdf_info = await self._parse_pdf_content(pdf_content, pdf_url)
            return ReportData(
                title=report_info.get('title') or pdf_info.get('title', 'Unknown title'),
                source_url=pdf_url,
                publish_date=report_info.get('date') or pdf_info.get('date'),
                industry_category=self._categorize_industry(report_info.get('title', '')),
                data_source=report_info.get('source', 'Unknown source'),
                file_url=pdf_url,
                file_type='pdf',
                content_summary=pdf_info.get('summary', '')[:500],
                full_content=pdf_info.get('full_text', '')[:10000],  # cap length
                keywords=self._extract_keywords(pdf_info.get('full_text', '')),
                report_metadata={
                    'page_count': pdf_info.get('page_count', 0),
                    'file_size': len(pdf_content),
                    'extracted_time': datetime.now().isoformat()
                }
            )
        except Exception as e:
            logger.error(f"Failed to process PDF report {pdf_url}: {e}")
            return None

    async def _process_html_report(self, url: str, report_info: Dict) -> Optional[ReportData]:
        """Process an HTML report page."""
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                context = await browser.new_context(user_agent=self.user_agents[1])
                page = await context.new_page()
                await page.goto(url, timeout=60000)
                await page.wait_for_load_state('networkidle')
                title = await self._extract_page_title(page)
                content = await self._extract_main_content(page)
                date = await self._extract_publish_date(page)
                # Look for a PDF download link
                pdf_links = await page.query_selector_all("a[href$='.pdf']")
                pdf_url = None
                if pdf_links:
                    pdf_href = await pdf_links[0].get_attribute('href')
                    pdf_url = urljoin(url, pdf_href) if pdf_href else None
                await browser.close()
            return ReportData(
                title=title or report_info.get('title', 'Unknown title'),
                source_url=url,
                publish_date=date or report_info.get('date'),
                industry_category=self._categorize_industry(title or ''),
                data_source=report_info.get('source', 'Unknown source'),
                file_url=pdf_url,
                file_type='html' if not pdf_url else 'pdf',
                content_summary=self._generate_summary(content)[:500],
                full_content=content[:15000],  # cap length
                keywords=self._extract_keywords(content),
                report_metadata={
                    'has_pdf': bool(pdf_url),
                    'word_count': len(content),
                    'extracted_time': datetime.now().isoformat()
                }
            )
        except Exception as e:
            logger.error(f"Failed to process HTML report {url}: {e}")
            return None

    async def _download_file(self, url: str) -> Optional[bytes]:
        """Download a file and return its raw bytes."""
        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
                    logger.warning(f"Download failed {url}: HTTP {response.status}")
                    return None
        except Exception as e:
            logger.error(f"Download failed {url}: {e}")
            return None

    async def _parse_pdf_content(self, pdf_content: bytes, pdf_url: str) -> Dict:
        """Parse PDF content into text, title, and summary."""
        pdf_info = {
            'title': '',
            'date': None,
            'summary': '',
            'full_text': '',
            'page_count': 0
        }
        try:
            # Primary extraction with pdfplumber
            with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
                pdf_info['page_count'] = len(pdf.pages)
                full_text = ""
                for page in pdf.pages[:50]:  # first 50 pages only
                    text = page.extract_text()
                    if text:
                        full_text += text + "\n"
                pdf_info['full_text'] = full_text
                # Guess the title from the first line of page 1
                if pdf.pages:
                    first_page_text = pdf.pages[0].extract_text() or ""
                    lines = first_page_text.split('\n')
                    if lines:
                        pdf_info['title'] = lines[0][:200]
                if full_text:
                    pdf_info['summary'] = self._generate_summary(full_text[:5000])
        except Exception as e:
            logger.error(f"pdfplumber failed {pdf_url}: {e}")
            # Fall back to PyPDF2
            try:
                reader = PdfReader(io.BytesIO(pdf_content))
                pdf_info['page_count'] = len(reader.pages)
                full_text = ""
                for page in reader.pages[:10]:
                    text = page.extract_text()
                    if text:
                        full_text += text
                pdf_info['full_text'] = full_text
            except Exception as e2:
                logger.error(f"PyPDF2 also failed {pdf_url}: {e2}")
        return pdf_info

    async def _extract_page_title(self, page: Page) -> str:
        """Extract the page title."""
        try:
            # Try several common selectors
            selectors = ["h1", ".title", ".article-title", ".report-title", "title"]
            for selector in selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        title = await element.text_content()
                        if title and len(title) > 10:
                            return title.strip()
                except Exception:
                    continue
            # Fall back to the document title
            return (await page.title())[:200]
        except Exception:
            return ""

    async def _extract_main_content(self, page: Page) -> str:
        """Extract the main page content."""
        try:
            # Try common content containers
            content_selectors = [
                ".article-content", ".report-content", ".content",
                ".main-content", "article", ".details", ".body"
            ]
            for selector in content_selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        text = await element.text_content()
                        if text and len(text) > 200:
                            return self._clean_text(text)
                except Exception:
                    continue
            # Fall back to the whole body
            body = await page.query_selector("body")
            if body:
                text = await body.text_content()
                return self._clean_text(text)[:10000]
            return ""
        except Exception as e:
            logger.error(f"Content extraction failed: {e}")
            return ""

    async def _extract_publish_date(self, page: Page) -> Optional[datetime]:
        """Extract the publish date."""
        try:
            date_selectors = [
                ".publish-date", ".date", ".time",
                "[itemprop='datePublished']",
                "meta[property='article:published_time']",
                "meta[name='publish_date']"
            ]
            for selector in date_selectors:
                try:
                    element = await page.query_selector(selector)
                    if not element:
                        continue
                    if selector.startswith("meta"):
                        date_str = await element.get_attribute('content')
                        if date_str:
                            return self._parse_date(date_str)
                    else:
                        date_text = await element.text_content()
                        if date_text:
                            return self._parse_date(date_text.strip())
                except Exception:
                    continue
            return None
        except Exception:
            return None

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a date string into a datetime."""
        if not date_str:
            return None
        # Common date formats (including Chinese 年/月/日 separators)
        date_patterns = [
            r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[日]?',
            r'(\d{4})\.(\d{1,2})\.(\d{1,2})',
            r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})',
        ]
        for pattern in date_patterns:
            match = re.search(pattern, date_str)
            if match:
                try:
                    groups = match.groups()
                    if len(groups) == 3:
                        if len(groups[0]) == 4:   # YYYY-MM-DD
                            year, month, day = groups
                        else:                     # DD-MM-YYYY
                            day, month, year = groups
                        return datetime(int(year), int(month), int(day))
                except Exception:
                    continue
        return None

    def _categorize_industry(self, text: str) -> str:
        """Classify the industry by keyword matching (keywords stay in Chinese
        because the crawled content is Chinese)."""
        industry_keywords = {
            "互联网": ["互联网", "电商", "社交", "游戏", "在线教育", "短视频", "直播"],
            "金融": ["金融", "银行", "保险", "证券", "支付", "区块链", "数字货币"],
            "科技": ["科技", "人工智能", "AI", "大数据", "云计算", "5G", "物联网"],
            "消费": ["消费", "零售", "餐饮", "食品", "饮料", "美妆", "服装"],
            "医疗": ["医疗", "医药", "健康", "医院", "疫苗", "生物"],
            "汽车": ["汽车", "新能源", "自动驾驶", "电动车", "造车"],
            "房地产": ["房地产", "房产", "楼市", "房价", "住宅"],
            "教育": ["教育", "培训", "学校", "在线教育", "K12"],
            "旅游": ["旅游", "酒店", "航空", "景区", "出行"]
        }
        text_lower = text.lower()
        for industry, keywords in industry_keywords.items():
            for keyword in keywords:
                if keyword.lower() in text_lower:
                    return industry
        return "其他"

    def _extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Extract keywords (naive frequency count; consider jieba in real projects)."""
        if not text:
            return []
        # Runs of 2-6 CJK characters
        words = re.findall(r'[\u4e00-\u9fa5]{2,6}', text)
        # Filter stop words
        stop_words = {"的", "了", "在", "是", "和", "与", "及", "或", "等", "有", "这个", "这些"}
        filtered_words = [w for w in words if w not in stop_words]
        word_counts = Counter(filtered_words)
        return [word for word, count in word_counts.most_common(top_n)]

    def _generate_summary(self, text: str, max_length: int = 500) -> str:
        """Generate a summary (naive leading-sentence method; consider a real
        summarization algorithm in production)."""
        if not text:
            return ""
        sentences = re.split(r'[。!?!?]', text)
        # Take leading sentences up to max_length
        summary = ""
        for sentence in sentences:
            if len(summary) + len(sentence) < max_length:
                summary += sentence + "。"
            else:
                break
        return summary.strip() or text[:max_length]

    def _clean_text(self, text: str) -> str:
        """Clean extracted text."""
        if not text:
            return ""
        # Collapse whitespace
        text = re.sub(r'\s+', ' ', text)
        # Strip unwanted special characters
        text = re.sub(r'[^\w\u4e00-\u9fa5\s.,!?;:,。!?;:、()()【】\[\]《》<>]', '', text)
        return text.strip()

    def _save_to_database(self, report_data: ReportData):
        """Insert or update a report row."""
        try:
            # Does the row already exist?
            existing = self.db_session.query(IndustryReport).filter_by(
                id=report_data.generate_id()
            ).first()
            if existing:
                # Update the existing row
                existing.title = report_data.title
                existing.content_summary = report_data.content_summary
                existing.full_content = report_data.full_content
                existing.updated_at = datetime.now()
                logger.info(f"Updated report: {report_data.title}")
            else:
                # Insert a new row; keep publish_date as a datetime, since
                # to_dict() serializes it to a string that DateTime columns reject
                report_dict = report_data.to_dict()
                report_dict['publish_date'] = report_data.publish_date
                db_report = IndustryReport(**report_dict)
                self.db_session.add(db_report)
                logger.info(f"Added report: {report_data.title}")
            self.db_session.commit()
        except Exception as e:
            logger.error(f"Database save failed: {e}")
            self.db_session.rollback()

    async def run(self):
        """Run the crawler across all configured sites."""
        logger.info("Starting industry report collection")
        start_time = datetime.now()
        sites = self.config["target_sites"]
        # One async task per site
        tasks = [
            asyncio.create_task(self.crawl_site(site_name, site_url))
            for site_name, site_url in sites.items()
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
        # Report the results
        total_reports = self.db_session.query(IndustryReport).count()
        duration = (datetime.now() - start_time).total_seconds()
        logger.info("Collection finished!")
        logger.info(f"Elapsed: {duration:.2f}s")
        logger.info(f"Reports in database: {total_reports}")
        self._generate_statistics_report()

    def _generate_statistics_report(self):
        """Generate an Excel export and a JSON statistics summary."""
        try:
            reports = self.db_session.query(IndustryReport).all()
            if not reports:
                logger.warning("No report data found")
                return
            data = [{
                'title': r.title,
                'source': r.data_source,
                'industry': r.industry_category,
                'publish_date': r.publish_date.strftime('%Y-%m-%d') if r.publish_date else 'unknown',
                'file_type': r.file_type,
                'keywords': ', '.join(r.keywords) if r.keywords else ''
            } for r in reports]
            df = pd.DataFrame(data)
            # Save to Excel
            excel_path = f"industry_reports_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
            df.to_excel(excel_path, index=False)
            # Build and save the statistics
            stats = {
                'total_reports': len(df),
                'by_source': df['source'].value_counts().to_dict(),
                'by_industry': df['industry'].value_counts().to_dict(),
                'by_file_type': df['file_type'].value_counts().to_dict()
            }
            stats_path = f"crawler_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(stats_path, 'w', encoding='utf-8') as f:
                json.dump(stats, f, ensure_ascii=False, indent=2)
            logger.info(f"Excel report saved: {excel_path}")
            logger.info(f"Statistics saved: {stats_path}")
            # Print a short summary
            print("\n" + "=" * 50)
            print("Industry report collection summary")
            print("=" * 50)
            print(f"Total reports: {stats['total_reports']}")
            print("\nBy source:")
            for source, count in stats['by_source'].items():
                print(f"  {source}: {count}")
            print("\nBy industry:")
            for industry, count in stats['by_industry'].items():
                print(f"  {industry}: {count}")
        except Exception as e:
            logger.error(f"Failed to generate statistics report: {e}")

    def export_to_csv(self, output_path: str = "industry_reports.csv"):
        """Export all reports to CSV."""
        try:
            reports = self.db_session.query(IndustryReport).all()
            data = [{
                'id': r.id,
                'title': r.title,
                'source': r.data_source,
                'url': r.source_url,
                'publish_date': r.publish_date.isoformat() if r.publish_date else '',
                'industry': r.industry_category,
                'file_type': r.file_type,
                'summary': r.content_summary,
                'keywords': json.dumps(r.keywords, ensure_ascii=False) if r.keywords else '',
                'created_at': r.created_at.isoformat()
            } for r in reports]
            df = pd.DataFrame(data)
            df.to_csv(output_path, index=False, encoding='utf-8-sig')
            logger.info(f"Data exported to: {output_path}")
            return df
        except Exception as e:
            logger.error(f"CSV export failed: {e}")
            return None


# Usage example
async def main():
    """Entry point: crawl, export, clean up."""
    crawler = IndustryDataCrawler()
    await crawler.run()
    crawler.export_to_csv()
    if crawler.db_session:
        crawler.db_session.close()
    print("\nCollection task finished!")


if __name__ == "__main__":
    asyncio.run(main())
```