引言:酒店数据采集的重要性与挑战
在当今数字化旅游时代,酒店数据已成为旅游行业决策、市场竞争分析和用户服务优化的关键资源。携程作为中国领先的在线旅游平台,汇集了海量的酒店信息,包括价格、设施、评价、地理位置等宝贵数据。然而,由于携程网站采用了先进的反爬虫技术和动态加载机制,传统爬虫方法已难以应对。
本文将介绍一套基于最新Python爬虫技术的解决方案,使用Playwright进行智能浏览器自动化,结合数据管道处理和反反爬虫策略,实现高效、稳定的携程酒店信息采集系统。
技术栈概览
Playwright: 微软推出的现代化浏览器自动化工具,支持无头浏览器操作
Asyncio: Python异步IO框架,实现高并发数据采集
Pydantic: 数据验证与序列化库,确保数据质量
Pandas & SQLAlchemy: 数据存储与处理
代理IP池与用户代理轮换: 规避反爬虫检测
智能等待策略: 模拟人类浏览行为
环境配置与安装
python
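# requirements.txt
playwright>=1.40.0
# NOTE: asyncio is part of the Python 3 standard library; installing the
# obsolete PyPI "asyncio" backport can shadow it, so it is not listed as a dependency.
# asyncio>=3.4.3
pydantic>=2.5.0
pandas>=2.0.0
sqlalchemy>=2.0.0
aiohttp>=3.9.0
beautifulsoup4>=4.12.0
nest-asyncio>=1.5.0
fake-useragent>=1.4.0
python-dotenv>=1.0.0
asyncio-throttle>=1.0.2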
bash
# 安装依赖
pip install -r requirements.txt

# 安装Playwright浏览器
playwright install chromium
核心爬虫架构设计
python
# --- Standard library -------------------------------------------------------
import asyncio
import hashlib
import json
import logging
import random
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlencode

# --- Third-party ------------------------------------------------------------
import aiohttp
import nest_asyncio
import pandas as pd
from asyncio_throttle import Throttler
from dotenv import load_dotenv
from fake_useragent import UserAgent
from playwright.async_api import Browser, BrowserContext, Page, async_playwright
from pydantic import BaseModel, Field, validator
from sqlalchemy import (
    Column,
    DateTime,
    Float,
    Integer,
    String,
    Text,
    create_engine,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Apply nest_asyncio to work around event-loop issues.
nest_asyncio.apply()

# Load environment variables (python-dotenv).
load_dotenv()

# Configure logging: every record goes to a log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('ctrip_hotel_crawler.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Article section heading: 数据模型定义 (data model definitions)
python
class HotelInfo(BaseModel):
    """Validated record describing one hotel.

    ``hotel_id``, ``hotel_name``, ``city`` and ``address`` are required; every
    other field is optional and defaults to ``None`` (or an empty list).
    ``collected_at`` defaults to the time the instance is created.
    """

    hotel_id: str = Field(..., description="酒店唯一ID")
    hotel_name: str = Field(..., description="酒店名称")
    hotel_english_name: Optional[str] = Field(None, description="酒店英文名")
    city: str = Field(..., description="所在城市")
    district: Optional[str] = Field(None, description="行政区/区域")
    address: str = Field(..., description="详细地址")
    star_rating: Optional[float] = Field(None, description="星级评分")
    ctrip_rating: Optional[float] = Field(None, description="携程评分")
    review_count: Optional[int] = Field(None, description="评价数量")
    price_range: Optional[str] = Field(None, description="价格范围")
    min_price: Optional[float] = Field(None, description="最低价格")
    max_price: Optional[float] = Field(None, description="最高价格")
    facilities: List[str] = Field(default_factory=list, description="设施服务")
    hotel_type: Optional[str] = Field(None, description="酒店类型")
    brand: Optional[str] = Field(None, description="品牌")
    longitude: Optional[float] = Field(None, description="经度")
    latitude: Optional[float] = Field(None, description="纬度")
    check_in_time: Optional[str] = Field(None, description="入住时间")
    check_out_time: Optional[str] = Field(None, description="退房时间")
    phone: Optional[str] = Field(None, description="联系电话")
    description: Optional[str] = Field(None, description="酒店描述")
    images: List[str] = Field(default_factory=list, description="图片URL列表")
    collected_at: datetime = Field(default_factory=datetime.now, description="采集时间")

    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2 (which requirements.txt pins) and emits a
    # deprecation warning; ``model_config`` carries the same setting.
    model_config = {"arbitrary_types_allowed": True}


class HotelReview(BaseModel):
    """Validated record describing one guest review of a hotel.

    ``review_id``, ``hotel_id``, ``user_name``, ``rating``, ``review_content``
    and ``review_date`` are required. ``helpful_count`` defaults to 0 and
    ``collected_at`` to the time the instance is created.
    """

    review_id: str = Field(..., description="评价ID")
    hotel_id: str = Field(..., description="酒店ID")
    user_name: str = Field(..., description="用户名")
    user_level: Optional[str] = Field(None, description="用户等级")
    rating: float = Field(..., description="评分")
    review_title: Optional[str] = Field(None, description="评价标题")
    review_content: str = Field(..., description="评价内容")
    review_date: datetime = Field(..., description="评价日期")
    room_type: Optional[str] = Field(None, description="房型")
    travel_purpose: Optional[str] = Field(None, description="出行目的")
    helpful_count: int = Field(default=0, description="有用数")
    reply_content: Optional[str] = Field(None, description="商家回复")
    reply_date: Optional[datetime] = Field(None, description="回复时间")
    collected_at: datetime = Field(default_factory=datetime.now, description="采集时间")
数据库模型定义
python
# Declarative base class shared by the ORM models below.
Base = declarative_base()


class HotelInfoDB(Base):
    """ORM model for the ``ctrip_hotel_info`` table (one row per hotel).

    ``hotel_id`` is unique and indexed; ``hotel_name``, ``city`` and
    ``address`` are NOT NULL, so rows missing them cannot be inserted.
    """
    __tablename__ = 'ctrip_hotel_info'

    # Surrogate auto-increment primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    hotel_id = Column(String(100), unique=True, index=True, nullable=False)
    hotel_name = Column(String(200), nullable=False)
    hotel_english_name = Column(String(200))
    city = Column(String(100), nullable=False)
    district = Column(String(100))
    address = Column(String(500), nullable=False)
    star_rating = Column(Float)
    ctrip_rating = Column(Float)
    review_count = Column(Integer)
    price_range = Column(String(100))
    min_price = Column(Float)
    max_price = Column(Float)
    facilities = Column(Text)  # stored as a JSON string
    hotel_type = Column(String(100))
    brand = Column(String(100))
    longitude = Column(Float)
    latitude = Column(Float)
    check_in_time = Column(String(50))
    check_out_time = Column(String(50))
    phone = Column(String(50))
    description = Column(Text)
    images = Column(Text)  # stored as a JSON string
    collected_at = Column(DateTime, default=datetime.now)
    # Set on insert and refreshed by SQLAlchemy on every UPDATE.
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class HotelReviewDB(Base):
    """ORM model for the ``ctrip_hotel_reviews`` table (one row per review).

    ``review_id`` is unique and indexed; ``hotel_id`` is indexed but is not
    declared as a foreign key. ``user_name``, ``rating``, ``review_content``
    and ``review_date`` are NOT NULL.
    """
    __tablename__ = 'ctrip_hotel_reviews'

    # Surrogate auto-increment primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    review_id = Column(String(100), unique=True, index=True, nullable=False)
    hotel_id = Column(String(100), index=True, nullable=False)
    user_name = Column(String(100), nullable=False)
    user_level = Column(String(50))
    rating = Column(Float, nullable=False)
    review_title = Column(String(200))
    review_content = Column(Text, nullable=False)
    review_date = Column(DateTime, nullable=False)
    room_type = Column(String(100))
    travel_purpose = Column(String(100))
    helpful_count = Column(Integer, default=0)
    reply_content = Column(Text)
    reply_date = Column(DateTime)
    collected_at = Column(DateTime, default=datetime.now)
代理和用户代理管理
python
class ProxyManager:
    """Round-robin pool of proxy URLs that skips proxies marked as failed.

    Attributes:
        proxy_list: Proxies currently eligible for rotation.
        current_index: Running counter used to pick the next proxy.
        failed_proxies: Proxies excluded from rotation by ``mark_failed``.
    """

    def __init__(self, proxy_list=None):
        """Create the pool.

        Args:
            proxy_list: Optional iterable of proxy URLs. It is copied, so
                later ``add_proxy`` calls never mutate the caller's list.
        """
        self.proxy_list = list(proxy_list) if proxy_list else []
        self.current_index = 0
        self.failed_proxies = set()

    def get_proxy(self):
        """Return the next usable proxy, or ``None`` when none is available."""
        if not self.proxy_list:
            return None

        # Permanently prune proxies that have been marked as failed.
        self.proxy_list = [
            candidate for candidate in self.proxy_list
            if candidate not in self.failed_proxies
        ]
        if not self.proxy_list:
            return None

        proxy = self.proxy_list[self.current_index % len(self.proxy_list)]
        self.current_index += 1
        return proxy

    def mark_failed(self, proxy):
        """Exclude ``proxy`` from future rotation."""
        self.failed_proxies.add(proxy)
        logger.warning(f"代理 {proxy} 标记为失败,当前失败代理数: {len(self.failed_proxies)}")

    def add_proxy(self, proxy):
        """Add ``proxy`` to the pool if it is not already present.

        Explicitly re-adding a proxy that was previously marked as failed
        clears the failure mark; otherwise the next ``get_proxy`` call would
        silently prune it again.
        """
        self.failed_proxies.discard(proxy)
        if proxy not in self.proxy_list:
            self.proxy_list.append(proxy)
            logger.info(f"添加新代理: {proxy}")


class UserAgentManager:
    """Supplies User-Agent strings from fake_useragent or a built-in list."""

    def __init__(self):
        self.ua = UserAgent()
        # Hand-picked desktop browser User-Agent strings.
        self.custom_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        ]

    def get_random_ua(self):
        """Return a random User-Agent string.

        Roughly half of the calls use fake_useragent's ``random`` property;
        the rest pick from ``custom_agents``.
        """
        if random.random() > 0.5:
            return self.ua.random
        return random.choice(self.custom_agents)

# Article section heading: 核心爬虫类实现 (core crawler class implementation)
python
# Imported here so this block is self-contained: the crawler needs regular
# expressions and date arithmetic that the file header does not import.
import re
from datetime import timedelta


class CtripHotelCrawler:
    """Playwright-based crawler for Ctrip hotel listings, details and reviews.

    Typical use: ``await init_browser()``, then ``search_hotels_by_city`` and
    ``get_hotel_reviews``, then ``save_to_database`` / ``export_to_csv``, and
    finally ``await close()``. Results are plain dicts; counters are kept in
    ``self.stats``.
    """

    _DATE_FORMAT = '%Y-%m-%d'

    # Patterns compiled once instead of on every parsed element.
    _HOTEL_ID_RE = re.compile(r'/hotel/(\d+)')
    _INTEGER_RE = re.compile(r'(\d+)')
    _DECIMAL_RE = re.compile(r'(\d+\.?\d*)')
    _LONGITUDE_RE = re.compile(r'longitude["\']?\s*:\s*["\']?([-\d\.]+)')
    _LATITUDE_RE = re.compile(r'latitude["\']?\s*:\s*["\']?([-\d\.]+)')

    # The hotel table declares these text columns NOT NULL; a scraped record
    # that lacks one is stored with an empty string instead of aborting the
    # whole batch.
    _HOTEL_REQUIRED_DEFAULTS = {'hotel_name': '', 'city': '', 'address': ''}

    # Injected into every page before its own scripts run, to hide the most
    # common automation fingerprints.
    _STEALTH_SCRIPT = """
        Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
        Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
        Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh', 'en'] });
        window.chrome = { runtime: {} };
    """

    def __init__(
        self,
        headless: bool = True,
        proxy_manager: "Optional[ProxyManager]" = None,
        max_concurrency: int = 3,
        request_delay: float = 1.0
    ):
        """Configure the crawler and open the SQLite database.

        Args:
            headless: Run the browser without a visible window.
            proxy_manager: Proxy pool; an empty pool is created when omitted.
            max_concurrency: Rate limit handed to the throttler (per second).
            request_delay: Stored on the instance for callers.
        """
        self.headless = headless
        self.proxy_manager = proxy_manager or ProxyManager()
        self.ua_manager = UserAgentManager()
        self.max_concurrency = max_concurrency
        self.request_delay = request_delay
        self.throttler = Throttler(rate_limit=max_concurrency, period=1.0)
        self.playwright = None
        self.browser = None

        # Database connection; tables are created on first use.
        self.engine = create_engine('sqlite:///ctrip_hotels.db', echo=False)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

        # Run statistics.
        self.stats = {
            'hotels_collected': 0,
            'reviews_collected': 0,
            'pages_crawled': 0,
            'errors': 0
        }

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_stay_dates(checkin_date, checkout_date, today=None):
        """Fill in missing stay dates and return ``(checkin, checkout)``.

        Check-in defaults to ``today``; check-out defaults to the day after
        check-in. ``timedelta`` is used so month and year ends roll over
        correctly.
        """
        today = today or datetime.now()
        fmt = CtripHotelCrawler._DATE_FORMAT
        if not checkin_date:
            checkin_date = today.strftime(fmt)
        if not checkout_date:
            try:
                first_night = datetime.strptime(checkin_date, fmt)
            except ValueError:
                # Unrecognised check-in format: fall back to "tomorrow".
                first_night = today
            checkout_date = (first_night + timedelta(days=1)).strftime(fmt)
        return checkin_date, checkout_date

    @staticmethod
    def _first_number(pattern, text):
        """Return the first ``pattern`` match in ``text`` as a float, else None."""
        match = pattern.search(text or '')
        return float(match.group(1)) if match else None

    @staticmethod
    def _parse_review_date(date_text, fallback=None):
        """Parse ``YYYY-MM-DD`` / ``YYYY年MM月DD日`` text into a datetime.

        Returns ``fallback`` (or the current time when it is omitted) if the
        text is empty or cannot be parsed.
        """
        if date_text:
            normalized = (
                date_text.replace('年', '-').replace('月', '-').replace('日', '').strip()
            )
            try:
                return datetime.strptime(normalized, CtripHotelCrawler._DATE_FORMAT)
            except ValueError:
                pass
        return fallback if fallback is not None else datetime.now()

    @staticmethod
    def _project_row(data, columns, json_fields=()):
        """Build a DB row from ``data`` without modifying ``data``.

        Only keys listed in ``columns`` are kept, values for ``json_fields``
        are JSON-encoded (empty values are dropped), and ``None`` values are
        omitted.
        """
        row = {}
        for key in columns:
            value = data.get(key)
            if key in json_fields and not isinstance(value, str):
                value = json.dumps(value, ensure_ascii=False) if value else None
            if value is not None:
                row[key] = value
        return row

    @staticmethod
    def _csv_ready(record):
        """Return a copy of ``record`` with list/dict values JSON-encoded."""
        return {
            key: json.dumps(value, ensure_ascii=False)
            if isinstance(value, (list, dict)) else value
            for key, value in record.items()
        }

    @staticmethod
    async def _text_of(node, selector: str) -> str:
        """Return the text of the first ``selector`` match under ``node`` or ''."""
        found = await node.query_selector(selector)
        if not found:
            return ''
        return (await found.text_content()) or ''

    # ------------------------------------------------------------------
    # Browser lifecycle
    # ------------------------------------------------------------------
    @asynccontextmanager
    async def create_browser_context(self):
        """Yield a fresh browser context and close it on exit.

        Use as ``async with self.create_browser_context() as context``. Each
        context gets a random User-Agent and, when the pool has one, the next
        proxy.

        Raises:
            RuntimeError: If ``init_browser`` has not been called.
        """
        if self.browser is None:
            raise RuntimeError("init_browser() must be called before creating a context")

        proxy = self.proxy_manager.get_proxy()
        user_agent = self.ua_manager.get_random_ua()

        context_options = {
            'viewport': {'width': 1920, 'height': 1080},
            'user_agent': user_agent,
            'ignore_https_errors': True,
            'java_script_enabled': True,
            'bypass_csp': True,
        }
        if proxy:
            # Launch arguments cannot be changed on an already running
            # browser, so the proxy is applied per context.
            context_options['proxy'] = {'server': proxy}

        context = await self.browser.new_context(**context_options)
        await context.add_init_script(self._STEALTH_SCRIPT)

        try:
            yield context
        finally:
            await context.close()

    async def init_browser(self):
        """Start Playwright and launch Chromium."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.headless,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--no-sandbox',
                '--disable-dev-shm-usage'
            ]
        )
        logger.info("浏览器初始化完成")

    async def close(self):
        """Close the browser and stop Playwright; safe to call repeatedly."""
        if self.browser:
            await self.browser.close()
            self.browser = None
        if self.playwright:
            await self.playwright.stop()
            self.playwright = None
        logger.info("爬虫已关闭")

    async def random_delay(self, min_delay=0.5, max_delay=2.0):
        """Sleep for a random duration between ``min_delay`` and ``max_delay`` seconds."""
        delay = random.uniform(min_delay, max_delay)
        await asyncio.sleep(delay)

    # ------------------------------------------------------------------
    # Hotel search
    # ------------------------------------------------------------------
    async def search_hotels_by_city(
        self,
        city: str,
        checkin_date: Optional[str] = None,
        checkout_date: Optional[str] = None,
        max_pages: int = 10
    ) -> List[Dict[str, Any]]:
        """Search hotels in ``city`` and return one dict per hotel.

        Args:
            city: City identifier used in the search URL.
            checkin_date: ``YYYY-MM-DD``; defaults to today.
            checkout_date: ``YYYY-MM-DD``; defaults to the day after check-in.
            max_pages: Maximum number of result pages to walk.

        Returns:
            The hotels collected so far; errors are logged and counted in
            ``self.stats['errors']`` rather than raised.
        """
        checkin_date, checkout_date = self._resolve_stay_dates(checkin_date, checkout_date)

        base_url = "https://hotels.ctrip.com/hotels/list"
        params = {
            'city': city,
            'checkin': checkin_date,
            'checkout': checkout_date,
            'optionId': city,
            'optionType': 'City',
            'directSearch': '0'
        }
        search_url = f"{base_url}?{urlencode(params)}"
        logger.info(f"开始搜索城市: {city}, URL: {search_url}")

        hotels = []
        page_num = 1

        async with self.create_browser_context() as context:
            page = await context.new_page()
            try:
                await page.goto(search_url, wait_until='networkidle', timeout=60000)
                await self.random_delay(2, 4)

                # Wait for the list using the same selectors the parser uses.
                await page.wait_for_selector('.hotel_item, .list_item', timeout=30000)

                while page_num <= max_pages:
                    logger.info(f"正在爬取第 {page_num} 页")

                    page_hotels = await self.parse_hotel_list_page(page)
                    for hotel in page_hotels:
                        # The list page does not expose the city, but the
                        # database requires it.
                        hotel.setdefault('city', city)
                    hotels.extend(page_hotels)
                    self.stats['hotels_collected'] += len(page_hotels)
                    self.stats['pages_crawled'] += 1

                    next_button = await page.query_selector('a.next')
                    if not next_button or page_num >= max_pages:
                        break

                    await next_button.click()
                    await page.wait_for_load_state('networkidle')
                    await self.random_delay(3, 5)
                    page_num += 1
            except Exception as e:
                logger.error(f"搜索酒店时出错: {str(e)}")
                self.stats['errors'] += 1
            finally:
                await page.close()

        logger.info(f"城市 {city} 搜索完成,找到 {len(hotels)} 家酒店")
        return hotels

    async def parse_hotel_list_page(self, page: "Page") -> List[Dict[str, Any]]:
        """Parse every hotel card on the current list page.

        For each card the basic fields are extracted and, when a detail link
        is present, merged with the result of ``get_hotel_detail``.
        """
        hotels = []
        try:
            hotel_elements = await page.query_selector_all('.hotel_item, .list_item')
            for element in hotel_elements:
                try:
                    hotel_info = await self.extract_hotel_basic_info(element)
                    if not hotel_info:
                        continue
                    hotels.append(hotel_info)

                    detail_link = await element.query_selector('a[href*="/hotel/"]')
                    if not detail_link:
                        continue
                    detail_url = await detail_link.get_attribute('href')
                    if detail_url and not detail_url.startswith('http'):
                        detail_url = f"https://hotels.ctrip.com{detail_url}"

                    detailed_info = await self.get_hotel_detail(detail_url)
                    if detailed_info:
                        hotel_info.update(detailed_info)
                except Exception as e:
                    logger.warning(f"解析酒店元素时出错: {str(e)}")
                    continue
        except Exception as e:
            logger.error(f"解析酒店列表页时出错: {str(e)}")
        return hotels

    async def extract_hotel_basic_info(self, element) -> Optional[Dict[str, Any]]:
        """Extract name, id, price, rating and area from one hotel card.

        Returns a dict without ``None`` values, or ``None`` if extraction
        raised.
        """
        try:
            hotel_name = await self._text_of(element, '.hotel_name a, .name a')

            # The hotel id is taken from the detail link.
            link_elem = await element.query_selector('a[href*="/hotel/"]')
            href = await link_elem.get_attribute('href') if link_elem else ''
            id_match = self._HOTEL_ID_RE.search(href) if href else None
            hotel_id = id_match.group(1) if id_match else None

            price_text = await self._text_of(element, '.price, .base_price')
            price = self._first_number(self._INTEGER_RE, price_text.replace(',', ''))

            score_text = await self._text_of(element, '.score, .comment')
            score = self._first_number(self._DECIMAL_RE, score_text)

            location = await self._text_of(element, '.location, .area')

            info = {
                'hotel_id': hotel_id,
                'hotel_name': hotel_name.strip(),
                'min_price': price,
                'ctrip_rating': score,
                'district': location.strip(),
                'source_url': href
            }
            return {k: v for k, v in info.items() if v is not None}
        except Exception as e:
            logger.warning(f"提取酒店基本信息时出错: {str(e)}")
            return None

    async def get_hotel_detail(self, detail_url: str) -> Optional[Dict[str, Any]]:
        """Open a hotel detail page and extract the fields that are present.

        Returns a dict with any of ``address``, ``facilities``,
        ``description``, ``phone``, ``longitude``/``latitude`` and ``images``;
        ``None`` when ``detail_url`` is empty or the page could not be read.
        """
        if not detail_url:
            return None

        async with self.create_browser_context() as context:
            page = await context.new_page()
            try:
                await page.goto(detail_url, wait_until='networkidle', timeout=60000)
                await self.random_delay(2, 4)

                detail_info = {}

                # Best effort: the intro block may be missing on some layouts,
                # in which case whatever is on the page is still parsed.
                try:
                    await page.wait_for_selector('.hotel-intro', timeout=10000)
                except Exception as wait_error:
                    logger.debug(f"hotel intro not found on {detail_url}: {wait_error}")

                # 1-4. Simple text fields.
                text_fields = (
                    ('address', '.address, .detail-address'),
                    ('description', '.description, .hotel-desc'),
                    ('phone', '.phone, .tel'),
                )
                for key, selector in text_fields:
                    value = (await self._text_of(page, selector)).strip()
                    if value:
                        detail_info[key] = value

                facilities = []
                for elem in await page.query_selector_all('.facility-item, .service-item'):
                    facility_text = await elem.text_content()
                    if facility_text:
                        facilities.append(facility_text.strip())
                if facilities:
                    detail_info['facilities'] = facilities

                # 5. Coordinates. ``:contains()`` is not a valid selector, so
                # the patterns are searched in the page markup instead.
                html = await page.content()
                lon_match = self._LONGITUDE_RE.search(html)
                lat_match = self._LATITUDE_RE.search(html)
                if lon_match and lat_match:
                    try:
                        longitude = float(lon_match.group(1))
                        latitude = float(lat_match.group(1))
                    except ValueError:
                        # The pattern also matches strings such as "-" or "1.2.3".
                        pass
                    else:
                        detail_info['longitude'] = longitude
                        detail_info['latitude'] = latitude

                # 6. Images (first 10 only).
                images = []
                img_elems = await page.query_selector_all('.hotel-img img, .slide-img')
                for elem in img_elems[:10]:
                    img_url = await elem.get_attribute('src')
                    if img_url and img_url.startswith('http'):
                        images.append(img_url)
                if images:
                    detail_info['images'] = images

                return detail_info
            except Exception as e:
                logger.error(f"获取酒店详情时出错 {detail_url}: {str(e)}")
                self.stats['errors'] += 1
                return None
            finally:
                await page.close()

    # ------------------------------------------------------------------
    # Reviews
    # ------------------------------------------------------------------
    async def get_hotel_reviews(
        self,
        hotel_id: str,
        max_reviews: int = 100
    ) -> List[Dict[str, Any]]:
        """Collect at most ``max_reviews`` reviews for ``hotel_id``.

        Errors are logged and the reviews gathered so far are returned.
        """
        review_url = f"https://hotels.ctrip.com/hotel/dianping/{hotel_id}.html"
        reviews = []
        page_num = 1

        async with self.create_browser_context() as context:
            page = await context.new_page()
            try:
                await page.goto(review_url, wait_until='networkidle', timeout=60000)
                await self.random_delay(2, 4)

                while len(reviews) < max_reviews:
                    logger.info(f"正在爬取酒店 {hotel_id} 的第 {page_num} 页评价")

                    page_reviews = await self.parse_review_page(page, hotel_id)
                    # Never return more than the caller asked for.
                    page_reviews = page_reviews[:max_reviews - len(reviews)]
                    reviews.extend(page_reviews)
                    self.stats['reviews_collected'] += len(page_reviews)

                    next_button = await page.query_selector('.next_page, a.next')
                    if not next_button or len(reviews) >= max_reviews:
                        break

                    await next_button.click()
                    await page.wait_for_load_state('networkidle')
                    await self.random_delay(3, 5)
                    page_num += 1
            except Exception as e:
                logger.error(f"获取酒店评价时出错 {hotel_id}: {str(e)}")
                self.stats['errors'] += 1
            finally:
                await page.close()

        logger.info(f"酒店 {hotel_id} 评价爬取完成,共 {len(reviews)} 条评价")
        return reviews

    async def parse_review_page(self, page: "Page", hotel_id: str) -> List[Dict[str, Any]]:
        """Parse every review element on the current review page."""
        reviews = []
        try:
            review_elements = await page.query_selector_all('.comment_item, .review-item')
            for element in review_elements:
                try:
                    review_info = await self.extract_review_info(element, hotel_id)
                    if review_info:
                        reviews.append(review_info)
                except Exception as e:
                    logger.warning(f"解析评价元素时出错: {str(e)}")
                    continue
        except Exception as e:
            logger.error(f"解析评价页时出错: {str(e)}")
        return reviews

    async def extract_review_info(self, element, hotel_id: str) -> Optional[Dict[str, Any]]:
        """Extract one review as a dict without ``None`` values.

        The review id is an MD5 digest of the hotel id plus the first 50
        characters of the content; it is an identifier, not a security token.
        """
        try:
            content = await self._text_of(element, '.content, .review-text')
            review_id = hashlib.md5(f"{hotel_id}_{content[:50]}".encode()).hexdigest()

            user_elem = await element.query_selector('.user, .username')
            user_name = (await user_elem.text_content() if user_elem else '匿名用户') or ''

            rating_text = await self._text_of(element, '.score, .rating')
            rating = self._first_number(self._DECIMAL_RE, rating_text)

            date_text = await self._text_of(element, '.date, .time')
            review_date = self._parse_review_date(date_text)

            room_type = await self._text_of(element, '.room, .room-type')

            review_info = {
                'review_id': review_id,
                'hotel_id': hotel_id,
                'user_name': user_name.strip(),
                'rating': rating,
                'review_content': content.strip(),
                'review_date': review_date,
                'room_type': room_type.strip()
            }
            return {k: v for k, v in review_info.items() if v is not None}
        except Exception as e:
            logger.warning(f"提取评价信息时出错: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    async def save_to_database(self, hotels_data: List[Dict], reviews_data: List[Dict]):
        """Upsert hotels and insert new reviews in a single transaction.

        The input dicts are not modified. Keys that are not table columns
        (for example ``source_url``) are ignored, hotels without ``hotel_id``
        and reviews without ``review_id`` or ``rating`` are skipped. On any
        error the transaction is rolled back and the error is logged.
        """
        hotel_columns = [c for c in HotelInfoDB.__table__.columns.keys() if c != 'id']
        review_columns = [c for c in HotelReviewDB.__table__.columns.keys() if c != 'id']

        session = self.Session()
        try:
            for hotel_data in hotels_data:
                hotel_id = hotel_data.get('hotel_id')
                if not hotel_id:
                    continue

                row = self._project_row(
                    hotel_data, hotel_columns, json_fields=('facilities', 'images')
                )
                existing = session.query(HotelInfoDB).filter_by(hotel_id=hotel_id).first()
                if existing:
                    # Update only the fields this crawl actually produced.
                    for key, value in row.items():
                        setattr(existing, key, value)
                    existing.updated_at = datetime.now()
                else:
                    session.add(HotelInfoDB(**{**self._HOTEL_REQUIRED_DEFAULTS, **row}))

            for review_data in reviews_data:
                review_id = review_data.get('review_id')
                if not review_id:
                    continue
                if review_data.get('rating') is None:
                    # rating is NOT NULL; one such row would roll back the batch.
                    logger.warning(f"评价 {review_id} 缺少评分,已跳过")
                    continue

                existing_review = session.query(HotelReviewDB).filter_by(review_id=review_id).first()
                if not existing_review:
                    session.add(HotelReviewDB(**self._project_row(review_data, review_columns)))

            session.commit()
            logger.info(f"数据保存成功: {len(hotels_data)} 家酒店, {len(reviews_data)} 条评价")
        except Exception as e:
            session.rollback()
            logger.error(f"保存数据到数据库时出错: {str(e)}")
        finally:
            session.close()

    async def export_to_csv(self, hotels_data: List[Dict], reviews_data: List[Dict]):
        """Write hotels and reviews to timestamped UTF-8 (BOM) CSV files.

        List-valued fields are written as JSON strings. Empty inputs produce
        no file; errors are logged, not raised.
        """
        try:
            if hotels_data:
                hotels_df = pd.DataFrame([self._csv_ready(h) for h in hotels_data])
                hotels_df.to_csv(f'ctrip_hotels_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv',
                                 index=False, encoding='utf-8-sig')
                logger.info(f"酒店数据已导出到CSV,共 {len(hotels_data)} 条记录")

            if reviews_data:
                reviews_df = pd.DataFrame([self._csv_ready(r) for r in reviews_data])
                reviews_df.to_csv(f'ctrip_reviews_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv',
                                  index=False, encoding='utf-8-sig')
                logger.info(f"评价数据已导出到CSV,共 {len(reviews_data)} 条记录")
        except Exception as e:
            logger.error(f"导出数据到CSV时出错: {str(e)}")

    def print_stats(self):
        """Log the run counters held in ``self.stats``."""
        logger.info("=" * 50)
        logger.info("爬虫统计信息:")
        logger.info(f"采集酒店数量: {self.stats['hotels_collected']}")
        logger.info(f"采集评价数量: {self.stats['reviews_collected']}")
        logger.info(f"爬取页面数量: {self.stats['pages_crawled']}")
        logger.info(f"错误次数: {self.stats['errors']}")
        logger.info("=" * 50)

# Article section heading: 主程序入口 (main program entry point)
python
async def main():
    """Crawl a fixed list of cities, then persist and export the results.

    For every city the first three result pages are crawled and up to 20
    reviews are fetched for each of the first five hotels. Everything is then
    written to the database and to CSV, and the run statistics are logged.
    The crawler is always closed, even when the run is interrupted or fails.
    """
    # Optional proxy configuration.
    proxy_list = [
        # Add your proxy URLs here, for example:
        # 'http://proxy1:port',
        # 'http://proxy2:port',
    ]

    crawler = CtripHotelCrawler(
        headless=True,  # set to False to watch the browser
        proxy_manager=ProxyManager(proxy_list),
        max_concurrency=2,
        request_delay=2.0
    )

    try:
        await crawler.init_browser()

        cities = ['beijing', 'shanghai', 'guangzhou', 'shenzhen']
        all_hotels, all_reviews = [], []

        for city in cities:
            logger.info(f"开始采集 {city} 的酒店数据")

            # Three result pages per city.
            hotels = await crawler.search_hotels_by_city(city=city, max_pages=3)
            all_hotels.extend(hotels)

            # Reviews for the first five hotels of each city only.
            for hotel in hotels[:5]:
                hotel_id = hotel.get('hotel_id')
                if not hotel_id:
                    continue

                # At most 20 reviews per hotel.
                reviews = await crawler.get_hotel_reviews(hotel_id=hotel_id, max_reviews=20)
                all_reviews.extend(reviews)

                # Pause so requests are not sent too frequently.
                await asyncio.sleep(3)

            # Pause between cities.
            await asyncio.sleep(5)

        await crawler.save_to_database(all_hotels, all_reviews)
        await crawler.export_to_csv(all_hotels, all_reviews)
        crawler.print_stats()

    except KeyboardInterrupt:
        logger.info("用户中断爬虫")
    except Exception as e:
        logger.error(f"爬虫运行出错: {str(e)}")
    finally:
        await crawler.close()


if __name__ == "__main__":
    # Run the main program.
    asyncio.run(main())

# Article section heading: 高级功能扩展 (advanced feature extensions)
python
class AdvancedCtripCrawler(CtripHotelCrawler):
    """Extension point for additional crawler features.

    Every method below is an unimplemented placeholder that returns ``None``.
    """

    async def search_by_coordinates(self, lat: float, lon: float, radius_km: int = 5):
        """Search hotels around a coordinate (placeholder, not implemented)."""

    async def get_real_time_prices(self, hotel_id: str, date_range: List[str]):
        """Fetch real-time price data (placeholder, not implemented)."""

    async def analyze_competition(self, hotel_ids: List[str]):
        """Compare data across several hotels (placeholder, not implemented)."""

    async def monitor_price_changes(self, hotel_ids: List[str], interval_hours: int = 6):
        """Periodically check for price changes (placeholder, not implemented)."""


class DataAnalyzer:
    """Namespace for analysis helpers; all are unimplemented placeholders."""

    @staticmethod
    def analyze_price_distribution(hotels_data):
        """Analyse the price distribution (placeholder, not implemented)."""

    @staticmethod
    def analyze_review_sentiment(reviews_data):
        """Analyse review sentiment (placeholder, not implemented)."""

    @staticmethod
    def generate_hotel_report(hotel_id, hotels_data, reviews_data):
        """Generate a report for one hotel (placeholder, not implemented)."""


class DistributedCrawler:
    """Skeleton of a manager that would spread tasks across workers."""

    def __init__(self, num_workers: int = 4):
        # Number of workers the manager is configured for.
        self.num_workers = num_workers
        # Queue intended to hold pending tasks.
        self.task_queue = asyncio.Queue()

    async def distribute_tasks(self, tasks: List):
        """Distribute tasks to the workers (placeholder, not implemented)."""