一、项目背景与意义
基金净值历史数据是金融分析、投资决策和量化研究的重要基础。无论是进行基金业绩评估、风险分析,还是构建投资组合,获取准确、完整的净值历史数据都至关重要。然而,许多金融数据平台对数据访问设置了各种限制,传统的数据采集方法往往效率低下且不稳定。本文将介绍如何运用Python最新的异步爬虫技术,高效、稳定地采集基金净值历史数据。
二、技术选型与工具介绍
2.1 核心技术栈
aiohttp/asyncio: Python异步HTTP客户端库,实现高并发数据采集
Playwright: 微软推出的现代浏览器自动化工具,支持异步操作
Pandas: 数据处理与分析库,用于数据清洗和存储
SQLAlchemy: 数据库ORM框架,支持多种数据库
BeautifulSoup4: HTML解析库
Redis: 分布式缓存,用于请求去重和状态管理
2.2 环境准备
python
# requirements.txt
aiohttp==3.8.5
playwright==1.40.0
pandas==2.1.4
sqlalchemy==2.0.23
beautifulsoup4==4.12.2
redis==5.0.1
aioredis==2.0.1
lxml==4.9.3
httpx==0.25.2
pydantic==2.5.0
pydantic-settings==2.1.0
# 注意:不要安装 PyPI 上的 asyncio 包——asyncio 是 Python 标准库的一部分,
# PyPI 上的同名包是过时的快照,在 Python 3 下会遮蔽标准库导致报错。
# pydantic 2.x 中 BaseSettings 已拆分到 pydantic-settings 包。
三、完整爬虫系统架构设计
3.1 系统架构图
text
数据采集系统架构: 用户接口层 → 调度层 → 采集层 → 解析层 → 存储层 → 监控层
3.2 模块化设计
python
# 项目结构 fund_crawler/ ├── core/ │ ├── __init__.py │ ├── async_spider.py # 异步爬虫核心 │ ├── playwright_handler.py # 浏览器自动化处理 │ └── proxy_rotator.py # 代理IP管理 ├── parsers/ │ ├── fund_parser.py # 基金数据解析器 │ └── data_cleaner.py # 数据清洗器 ├── storage/ │ ├── database.py # 数据库操作 │ └── file_storage.py # 文件存储 ├── utils/ │ ├── logger.py # 日志配置 │ ├── rate_limiter.py # 速率限制 │ └── exception_handler.py # 异常处理 ├── config/ │ └── settings.py # 配置文件 └── main.py # 主程序
四、完整代码实现
4.1 配置模块
python
# config/settings.py
import os
from dataclasses import dataclass, fields
from typing import List, Optional

# FIX(review): the original used `from pydantic import BaseSettings`, which
# raises PydanticImportError under the pinned pydantic==2.5.0 (BaseSettings
# moved to the separate `pydantic-settings` package in pydantic v2), so the
# module failed at import time.  This stdlib-only rewrite keeps the same
# class name, field names/defaults and the module-level `settings` singleton,
# and still supports overriding any field via an environment variable of the
# same name.


@dataclass
class Settings:
    """Central runtime configuration.

    Every field may be overridden by an environment variable with the same
    (upper-case) name; int and bool fields are coerced from their string form.
    """

    # Crawler behaviour
    USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    REQUEST_TIMEOUT: int = 30           # per-request timeout, seconds
    MAX_CONCURRENT_REQUESTS: int = 100  # aiohttp connector limit
    RETRY_ATTEMPTS: int = 3

    # Target-site endpoints
    FUND_DATA_BASE_URL: str = "https://fund.eastmoney.com"
    FUND_DETAIL_URL: str = "https://fund.eastmoney.com/f10/jjjz_{code}.html"
    FUND_HISTORY_API: str = "https://api.fund.eastmoney.com/f10/lsjz"

    # Storage backends
    DATABASE_URL: str = "postgresql://user:password@localhost:5432/fund_data"
    REDIS_URL: str = "redis://localhost:6379/0"

    # Filesystem paths
    DATA_DIR: str = "./data/fund_history"
    LOG_DIR: str = "./logs"

    # Proxy support
    USE_PROXY: bool = False
    PROXY_POOL_URL: Optional[str] = None

    def __post_init__(self):
        """Apply environment-variable overrides with basic type coercion."""
        for f in fields(self):
            raw = os.environ.get(f.name)
            if raw is None:
                continue
            if f.type is bool:
                setattr(self, f.name, raw.strip().lower() in ("1", "true", "yes", "on"))
            elif f.type is int:
                setattr(self, f.name, int(raw))
            else:
                # str / Optional[str] fields are taken verbatim
                setattr(self, f.name, raw)


# Module-level singleton, imported elsewhere as
# `from config.settings import settings`.
# TODO(review): the pydantic version also loaded a `.env` file; add explicit
# .env parsing (or reinstate pydantic-settings) if that behaviour is required.
settings = Settings()

# Section 4.2: async crawler core class
python
# core/async_spider.py
# Async HTTP layer for pulling fund list / NAV history from eastmoney endpoints.
import asyncio
import aiohttp
import httpx
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import logging
from datetime import datetime
import json
from urllib.parse import urlencode
from config.settings import settings
from utils.logger import setup_logger
from utils.rate_limiter import RateLimiter
from core.proxy_rotator import ProxyRotator

logger = setup_logger(__name__)


@dataclass
class RequestConfig:
    """Per-request options passed to AsyncFundSpider.request().

    NOTE(review): `timeout` is carried here but request() never applies it;
    the session-level timeout set in init_session() is what actually governs.
    """
    method: str = "GET"
    headers: Optional[Dict] = None
    params: Optional[Dict] = None
    data: Optional[Dict] = None
    json: Optional[Dict] = None
    timeout: int = settings.REQUEST_TIMEOUT
    retry_count: int = settings.RETRY_ATTEMPTS


class AsyncFundSpider:
    """Async fund-data crawler built on one shared aiohttp session.

    Intended usage is as an async context manager::

        async with AsyncFundSpider() as spider:
            rows = await spider.fetch_fund_history("000001")
    """

    def __init__(self):
        # Session is created lazily in init_session(); None until then.
        self.session: Optional[aiohttp.ClientSession] = None
        # Global throttle: at most 50 requests per 60-second window.
        self.rate_limiter = RateLimiter(max_requests=50, time_window=60)
        # Proxy rotation is optional and driven by settings.USE_PROXY.
        self.proxy_rotator = ProxyRotator() if settings.USE_PROXY else None
        # Cookies harvested from the landing page; sent with every request.
        self.cookies = {}

    async def __aenter__(self):
        await self.init_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def init_session(self):
        """Create the shared aiohttp session and prime site cookies."""
        timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT)
        connector = aiohttp.TCPConnector(
            limit=settings.MAX_CONCURRENT_REQUESTS,
            force_close=True,
            enable_cleanup_closed=True
        )
        # Browser-like default headers; Referer matters to the fund endpoints.
        headers = {
            "User-Agent": settings.USER_AGENT,
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Referer": "https://fund.eastmoney.com/"
        }
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers=headers
        )
        # Prime cookies from the landing page
        await self._init_cookies()

    async def _init_cookies(self):
        """Fetch the landing page once to collect session cookies (best effort)."""
        try:
            async with self.session.get(settings.FUND_DATA_BASE_URL) as response:
                if response.status == 200:
                    # response.cookies is a SimpleCookie; values() yields Morsels
                    self.cookies = {c.key: c.value for c in response.cookies.values()}
        except Exception as e:
            logger.warning(f"初始化cookies失败: {e}")

    async def fetch_fund_list(self, page: int = 1, page_size: int = 100) -> List[Dict]:
        """Fetch one page of the ranked open-fund list; [] on failure."""
        url = f"{settings.FUND_DATA_BASE_URL}/DataFundList.aspx"
        # Query params mirror the site's own ranking request
        # (sc=zzf: sort column, st=desc: sort order, pi/pn: page index/size).
        params = {
            "op": "ph",
            "dt": "kf",
            "ft": "all",
            "rs": "",
            "gs": "0",
            "sc": "zzf",
            "st": "desc",
            "sd": datetime.now().strftime("%Y-%m-%d"),
            "ed": datetime.now().strftime("%Y-%m-%d"),
            "qdii": "",
            "tabSubtype": ",,,,,",
            "pi": page,
            "pn": page_size,
            "dx": "1"
        }
        response = await self.request(
            url=url,
            config=RequestConfig(params=params)
        )
        if response:
            return self._parse_fund_list(response)
        return []

    async def fetch_fund_history(self, fund_code: str, start_date: str = "2000-01-01",
                                 end_date: str = None) -> List[Dict]:
        """Fetch historical NAV records for one fund; [] when none found.

        Returns the raw `LSJZList` rows from the API payload (dicts with
        FSRQ/DWJZ/LJJZ/... keys), to be parsed by FundDataParser.
        """
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        # Build API query
        params = {
            "fundCode": fund_code,
            "pageIndex": 1,
            "pageSize": 10000,  # fetch everything in one page
            "startDate": start_date,
            "endDate": end_date,
            "_": int(datetime.now().timestamp() * 1000)  # cache-buster timestamp
        }
        # The history API rejects requests without a matching Referer/Host.
        headers = {
            "Referer": f"https://fundf10.eastmoney.com/jjjz_{fund_code}.html",
            "Host": "api.fund.eastmoney.com"
        }
        response = await self.request(
            url=settings.FUND_HISTORY_API,
            config=RequestConfig(
                params=params,
                headers=headers
            )
        )
        if response and "Data" in response and "LSJZList" in response["Data"]:
            return response["Data"]["LSJZList"]
        return []

    async def request(self, url: str, config: RequestConfig) -> Optional[Any]:
        """Rate-limited request with exponential-backoff retries.

        Returns parsed JSON, {"html": text} for HTML, raw bytes otherwise,
        or None when all attempts fail with a non-retryable status.
        Re-raises the last network error once retries are exhausted.
        """
        await self.rate_limiter.acquire()
        for attempt in range(config.retry_count):
            try:
                proxy = None
                if self.proxy_rotator:
                    proxy = await self.proxy_rotator.get_proxy()
                async with self.session.request(
                    method=config.method,
                    url=url,
                    headers=config.headers,
                    params=config.params,
                    data=config.data,
                    json=config.json,
                    proxy=proxy,
                    cookies=self.cookies
                ) as response:
                    if response.status == 200:
                        # Dispatch on Content-Type so callers get a useful shape
                        content_type = response.headers.get("Content-Type", "")
                        if "application/json" in content_type:
                            data = await response.json()
                        elif "text/html" in content_type:
                            text = await response.text()
                            data = {"html": text}
                        else:
                            data = await response.read()
                        logger.info(f"请求成功: {url}")
                        return data
                    elif response.status in [403, 429]:
                        # Rate-limited / blocked: back off exponentially and retry
                        logger.warning(f"请求被限制: {response.status}")
                        await asyncio.sleep(2 ** attempt)
                        continue
                    else:
                        logger.error(f"请求失败: {response.status}")
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error(f"请求异常 (尝试 {attempt + 1}/{config.retry_count}): {e}")
                if attempt < config.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)
                else:
                    # Out of retries: surface the network error to the caller
                    raise
        return None

    def _parse_fund_list(self, response: Dict) -> List[Dict]:
        """Convert the list endpoint's `datas` rows into dicts.

        NOTE(review): the positional indices below assume the endpoint's row
        layout (0=code, 1=name, 3=type, 4=nav, 5=acc nav, 6=daily %change,
        9=update date) — confirm against the live API before relying on them.
        """
        funds = []
        try:
            if "datas" in response:
                for fund_data in response["datas"]:
                    fund_info = {
                        "fund_code": fund_data[0],
                        "fund_name": fund_data[1],
                        "fund_type": fund_data[3],
                        "nav": float(fund_data[4]) if fund_data[4] else None,
                        "acc_nav": float(fund_data[5]) if fund_data[5] else None,
                        "daily_return": float(fund_data[6].rstrip("%")) if fund_data[6] else None,
                        "update_date": fund_data[9]
                    }
                    funds.append(fund_info)
        except Exception as e:
            logger.error(f"解析基金列表失败: {e}")
        return funds

    async def close(self):
        """Close the aiohttp session if it is still open."""
        if self.session and not self.session.closed:
            await self.session.close()

# Section 4.3: Playwright for complex JS-rendered pages
python
# core/playwright_handler.py
from playwright.async_api import async_playwright
from typing import Optional, Dict, Any
import asyncio
import logging

logger = logging.getLogger(__name__)


class PlaywrightHandler:
    """Drives headless Chromium via Playwright for pages that need JS rendering.

    Use as an async context manager so browser resources are always released.
    """

    def __init__(self, headless: bool = True):
        self.headless = headless
        # FIX: keep the driver handle so close() can stop it; previously the
        # async_playwright() instance was a local in init_browser() and was
        # never stopped, leaking the Playwright driver process every lifetime.
        self.playwright = None
        self.browser = None
        self.context = None

    async def __aenter__(self):
        await self.init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def init_browser(self):
        """Start the driver, launch Chromium and create a stealth context."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.headless,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--no-sandbox"
            ]
        )
        # Fresh context with a desktop viewport and browser-like UA
        self.context = await self.browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        )
        # Hide the webdriver flag commonly checked by bot detection
        await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)

    async def scrape_fund_detail(self, fund_code: str) -> Dict[str, Any]:
        """Scrape one fund's detail page; returns {} on any failure.

        NOTE(review): the CSS selectors below are tied to the current page
        layout of fund.eastmoney.com and will need updating if it changes.
        """
        page = None
        try:
            page = await self.context.new_page()
            url = f"https://fund.eastmoney.com/{fund_code}.html"
            await page.goto(url, wait_until="networkidle")
            # Wait for the NAV data panel before scraping anything
            await page.wait_for_selector(".dataOfFund", timeout=10000)
            fund_info = {
                "fund_code": fund_code,
                "fund_name": await self._extract_text(page, ".fundDetail-tit"),
                "nav": await self._extract_number(page, ".dataItem1 .dataNums span"),
                "daily_change": await self._extract_text(page, ".dataItem1 .dataNums .ui-font-middle"),
                "establishment_date": await self._extract_text(page, ".infoOfFund table tr:nth-child(1) td:nth-child(2)"),
                "fund_size": await self._extract_text(page, ".infoOfFund table tr:nth-child(1) td:nth-child(3)"),
                "fund_manager": await self._extract_text(page, ".infoOfFund table tr:nth-child(2) td:nth-child(1)"),
                "fund_company": await self._extract_text(page, ".infoOfFund table tr:nth-child(2) td:nth-child(3)")
            }
            # Switch to the history tab and pull the NAV table rows
            await page.click("#historyNav")
            await page.wait_for_selector(".gmHistory", timeout=5000)
            table_data = await page.eval_on_selector_all(
                ".gmHistory table tbody tr",
                "rows => rows.map(row => Array.from(row.querySelectorAll('td')).map(cell => cell.textContent.trim()))"
            )
            fund_info["history_data"] = table_data
            return fund_info
        except Exception as e:
            logger.error(f"Playwright抓取失败: {e}")
            return {}
        finally:
            if page:
                await page.close()

    async def _extract_text(self, page, selector: str) -> str:
        """Return the text content of *selector*, or "" if it never appears."""
        try:
            element = await page.wait_for_selector(selector, timeout=5000)
            return await element.text_content() if element else ""
        except Exception:
            # FIX: narrowed from a bare except (which also swallowed
            # KeyboardInterrupt/SystemExit)
            return ""

    async def _extract_number(self, page, selector: str) -> Optional[float]:
        """Return the numeric value found at *selector*, or None."""
        try:
            text = await self._extract_text(page, selector)
            if text:
                # Strip everything except digits, sign and decimal point
                cleaned = ''.join(c for c in text if c.isdigit() or c in '.-')
                return float(cleaned) if cleaned else None
        except Exception:
            pass
        # FIX: explicit None both for empty text and on parse failure
        # (previously relied on an implicit fall-through return)
        return None

    async def close(self):
        """Release context, browser and the Playwright driver process."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            # FIX: stop the driver; it was never stopped before
            await self.playwright.stop()

# Section 4.4: data parsing and cleaning
python
# parsers/fund_parser.py
import pandas as pd
import numpy as np
from typing import List, Dict, Any
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class FundDataParser:
    """Parses raw NAV API rows into DataFrames and derives performance metrics."""

    @staticmethod
    def parse_history_data(raw_data: List[Dict]) -> pd.DataFrame:
        """Parse raw `LSJZList` rows into a date-sorted DataFrame.

        Each input dict is expected to carry the API's field names
        (FSRQ=date, DWJZ=unit NAV, LJJZ=accumulated NAV, JZZZL=daily %change,
        SGZT/SHZT=purchase/redemption status, FHFCZ=bonus).  Rows that fail
        to parse are logged and skipped.
        """
        records = []
        for item in raw_data:
            try:
                record = {
                    "date": pd.to_datetime(item["FSRQ"]),
                    "nav": float(item["DWJZ"]) if item["DWJZ"] else None,
                    "acc_nav": float(item["LJJZ"]) if item["LJJZ"] else None,
                    "daily_return": item["JZZZL"],
                    "purchase_status": item["SGZT"],
                    "redemption_status": item["SHZT"],
                    # FIX: convert bonus to float; it was stored as the raw
                    # string, so the numeric Float DB column / arithmetic broke
                    "bonus": float(item["FHFCZ"]) if item["FHFCZ"] else 0.0
                }
                # Normalize the percent-string daily change ("1.23%") to float
                if record["daily_return"]:
                    try:
                        record["daily_return"] = float(record["daily_return"].rstrip("%"))
                    except (ValueError, AttributeError):
                        # FIX: narrowed from a bare except
                        record["daily_return"] = None
                records.append(record)
            except Exception as e:
                logger.warning(f"解析数据项失败: {item}, 错误: {e}")
                continue

        df = pd.DataFrame(records)
        if not df.empty:
            # Chronological order, oldest first
            df = df.sort_values("date")
            # Cumulative return (%) chained from daily NAV changes
            if "nav" in df.columns:
                df["cumulative_return"] = df["nav"].pct_change().add(1).cumprod().sub(1) * 100
        return df

    @staticmethod
    def calculate_metrics(df: pd.DataFrame) -> Dict[str, Any]:
        """Compute summary performance metrics from a parsed NAV frame.

        Returns {} for an empty frame.  The input frame is NOT modified
        (FIX: the original added a ``returns`` column to the caller's
        DataFrame as a side effect).
        """
        if df.empty:
            return {}
        metrics = {}
        # Basic coverage statistics
        metrics["data_points"] = len(df)
        metrics["date_range"] = {
            "start": df["date"].min().strftime("%Y-%m-%d"),
            "end": df["date"].max().strftime("%Y-%m-%d")
        }
        if "nav" in df.columns:
            # Daily simple returns, kept local to avoid mutating df
            returns = df["nav"].pct_change()
            # Total and annualized return over the covered span
            days = (df["date"].max() - df["date"].min()).days
            if days > 0:
                total_return = (df["nav"].iloc[-1] / df["nav"].iloc[0] - 1) * 100
                metrics["total_return"] = total_return
                metrics["annualized_return"] = ((1 + total_return / 100) ** (365 / days) - 1) * 100
            if len(returns) > 1:
                # Annualized volatility assuming ~252 trading days
                daily_volatility = returns.std()
                metrics["annualized_volatility"] = daily_volatility * np.sqrt(252) * 100
                # Max drawdown from the running peak of cumulative growth
                cumulative = (1 + returns).cumprod()
                running_max = cumulative.expanding().max()
                drawdown = (cumulative - running_max) / running_max
                metrics["max_drawdown"] = drawdown.min() * 100
        return metrics

# Section 4.5: data storage module
python
# storage/database.py
from sqlalchemy import create_engine, Column, String, Float, DateTime, Text
# FIX: declarative_base has lived in sqlalchemy.orm since 1.4; the old
# sqlalchemy.ext.declarative import path is deprecated under the pinned 2.0.23.
from sqlalchemy.orm import declarative_base, sessionmaker
import pandas as pd
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

Base = declarative_base()


class FundHistory(Base):
    """One NAV observation per fund per day (PK: '<code>_<yyyymmdd>')."""
    __tablename__ = "fund_history"

    id = Column(String(50), primary_key=True)
    fund_code = Column(String(10), index=True)
    date = Column(DateTime, index=True)
    nav = Column(Float)                  # unit NAV
    acc_nav = Column(Float)              # accumulated NAV
    daily_return = Column(Float)         # daily change, percent
    purchase_status = Column(String(10))
    redemption_status = Column(String(10))
    bonus = Column(Float)
    created_at = Column(DateTime, default=pd.Timestamp.now)
    updated_at = Column(DateTime, default=pd.Timestamp.now, onupdate=pd.Timestamp.now)


class FundInfo(Base):
    """Static descriptive attributes of a fund."""
    __tablename__ = "fund_info"

    fund_code = Column(String(10), primary_key=True)
    fund_name = Column(String(100))
    fund_type = Column(String(50))
    establishment_date = Column(DateTime)
    fund_size = Column(Float)
    fund_manager = Column(String(100))
    fund_company = Column(String(100))
    created_at = Column(DateTime, default=pd.Timestamp.now)


class DatabaseManager:
    """Thin session/CRUD wrapper around the SQLAlchemy engine."""

    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)

    def init_database(self):
        """Create all mapped tables if they do not yet exist."""
        Base.metadata.create_all(self.engine)

    def save_fund_history(self, fund_code: str, history_data: pd.DataFrame):
        """Upsert every row of *history_data* for *fund_code*.

        NOTE(review): merge() issues one SELECT per row; for large frames a
        dialect-level bulk upsert (INSERT .. ON CONFLICT) would be far faster.
        """
        session = self.Session()
        try:
            for _, row in history_data.iterrows():
                record_id = f"{fund_code}_{row['date'].strftime('%Y%m%d')}"
                record = FundHistory(
                    id=record_id,
                    fund_code=fund_code,
                    date=row["date"],
                    nav=row.get("nav"),
                    acc_nav=row.get("acc_nav"),
                    daily_return=row.get("daily_return"),
                    purchase_status=row.get("purchase_status"),
                    redemption_status=row.get("redemption_status"),
                    bonus=row.get("bonus", 0.0)
                )
                # merge() = upsert keyed on the primary key
                session.merge(record)
            session.commit()
            logger.info(f"已保存基金{fund_code}的{len(history_data)}条历史数据")
        except Exception as e:
            session.rollback()
            logger.error(f"保存数据失败: {e}")
            raise
        finally:
            session.close()

    def save_fund_info(self, fund_info: Dict[str, Any]):
        """Upsert one fund's descriptive record."""
        session = self.Session()
        try:
            record = FundInfo(
                fund_code=fund_info["fund_code"],
                fund_name=fund_info["fund_name"],
                fund_type=fund_info.get("fund_type"),
                establishment_date=fund_info.get("establishment_date"),
                fund_size=fund_info.get("fund_size"),
                fund_manager=fund_info.get("fund_manager"),
                fund_company=fund_info.get("fund_company")
            )
            session.merge(record)
            session.commit()
            logger.info(f"已保存基金{fund_info['fund_code']}的基本信息")
        except Exception as e:
            session.rollback()
            logger.error(f"保存基金信息失败: {e}")
            raise
        finally:
            session.close()

# Section 4.6: main program and task scheduling
python
# main.py
import asyncio
import logging
from typing import List, Dict
import pandas as pd
from datetime import datetime, timedelta
from config.settings import settings
from core.async_spider import AsyncFundSpider
from core.playwright_handler import PlaywrightHandler
from parsers.fund_parser import FundDataParser
from storage.database import DatabaseManager
from utils.logger import setup_logger

logger = setup_logger(__name__)


class FundDataCrawler:
    """Orchestrates discovery, download, parsing and persistence of fund data."""

    def __init__(self):
        self.spider = None          # live AsyncFundSpider while run() is active
        self.playwright = None
        self.db_manager = DatabaseManager(settings.DATABASE_URL)
        self.parser = FundDataParser()

    async def run(self, fund_codes: List[str] = None):
        """Crawl the given fund codes, or discover all codes when none given."""
        async with AsyncFundSpider() as spider:
            self.spider = spider
            # FIX: code discovery uses self.spider, so it must run *after* the
            # spider session exists.  Previously get_all_fund_codes() was
            # called before the `async with` block, while self.spider was
            # still None, and crashed with AttributeError.
            if not fund_codes:
                fund_codes = await self.get_all_fund_codes()
            logger.info(f"开始采集{len(fund_codes)}个基金的数据")
            tasks = []
            for fund_code in fund_codes:
                tasks.append(asyncio.create_task(self.process_fund(fund_code)))
                # Process in batches of 10 with a pause to stay polite
                if len(tasks) >= 10:
                    await asyncio.gather(*tasks)
                    tasks = []
                    await asyncio.sleep(1)
            # Flush any remaining tasks
            if tasks:
                await asyncio.gather(*tasks)
        logger.info("数据采集完成")

    async def get_all_fund_codes(self) -> List[str]:
        """Page through the fund-list endpoint and return unique fund codes."""
        logger.info("开始获取基金列表...")
        all_funds = []
        page = 1
        while True:
            try:
                funds = await self.spider.fetch_fund_list(page=page, page_size=100)
                if not funds:
                    break
                all_funds.extend([f["fund_code"] for f in funds])
                logger.info(f"已获取第{page}页,共{len(funds)}个基金")
                page += 1
                await asyncio.sleep(0.5)  # throttle pagination
            except Exception as e:
                logger.error(f"获取基金列表失败: {e}")
                break
        return list(set(all_funds))  # de-duplicate

    async def process_fund(self, fund_code: str):
        """Download, parse, persist and export one fund's 5-year history."""
        try:
            logger.info(f"开始处理基金: {fund_code}")
            end_date = datetime.now().strftime("%Y-%m-%d")
            start_date = (datetime.now() - timedelta(days=365 * 5)).strftime("%Y-%m-%d")
            raw_data = await self.spider.fetch_fund_history(
                fund_code=fund_code,
                start_date=start_date,
                end_date=end_date
            )
            if not raw_data:
                logger.warning(f"基金{fund_code}无历史数据")
                return
            history_df = self.parser.parse_history_data(raw_data)
            if history_df.empty:
                logger.warning(f"基金{fund_code}数据解析为空")
                return
            metrics = self.parser.calculate_metrics(history_df)
            logger.info(f"基金{fund_code}指标计算完成: {metrics}")
            self.db_manager.save_fund_history(fund_code, history_df)
            # The detail page needs JS rendering, hence Playwright
            async with PlaywrightHandler(headless=True) as playwright:
                fund_detail = await playwright.scrape_fund_detail(fund_code)
                if fund_detail:
                    self.db_manager.save_fund_info(fund_detail)
            self.save_to_csv(fund_code, history_df)
            logger.info(f"基金{fund_code}处理完成,共{len(history_df)}条记录")
        except Exception as e:
            logger.error(f"处理基金{fund_code}失败: {e}")

    def save_to_csv(self, fund_code: str, df: pd.DataFrame):
        """Dump the history frame to a dated CSV under settings.DATA_DIR."""
        import os
        os.makedirs(settings.DATA_DIR, exist_ok=True)
        filename = f"{settings.DATA_DIR}/fund_{fund_code}_{datetime.now().strftime('%Y%m%d')}.csv"
        # utf-8-sig so Excel opens the Chinese headers correctly
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        # FIX: the log line previously printed a broken placeholder instead of
        # the actual path
        logger.info(f"数据已保存到CSV文件: {filename}")

    async def batch_update(self, days: int = 7):
        """Re-fetch the last *days* days for every fund already in the DB."""
        logger.info(f"开始更新最近{days}天的数据")
        session = self.db_manager.Session()
        try:
            from storage.database import FundInfo
            funds = session.query(FundInfo.fund_code).all()
            fund_codes = [f[0] for f in funds]
            logger.info(f"需要更新{len(fund_codes)}个基金的数据")
            update_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
            async with AsyncFundSpider() as spider:
                for fund_code in fund_codes:
                    try:
                        raw_data = await spider.fetch_fund_history(
                            fund_code=fund_code,
                            start_date=update_date
                        )
                        if raw_data:
                            history_df = self.parser.parse_history_data(raw_data)
                            self.db_manager.save_fund_history(fund_code, history_df)
                            logger.info(f"已更新基金{fund_code}的近期数据")
                        await asyncio.sleep(0.1)  # throttle per-fund requests
                    except Exception as e:
                        logger.error(f"更新基金{fund_code}失败: {e}")
                        continue
        finally:
            session.close()
        logger.info("批量更新完成")


async def main():
    """Entry point: create the schema, then run a full crawl."""
    crawler = FundDataCrawler()
    crawler.db_manager.init_database()
    # await crawler.run(["000001", "110011"])   # crawl specific funds
    await crawler.run()                         # or crawl everything
    # await crawler.batch_update(days=30)       # or incremental update


if __name__ == "__main__":
    # Windows needs the proactor loop for subprocess support (Playwright)
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    except AttributeError:
        # FIX: narrowed from a bare except; the policy class simply does not
        # exist on non-Windows platforms.
        pass
    asyncio.run(main())

# Section 4.7: utility classes
python
# utils/rate_limiter.py import asyncio import time from collections import deque from typing import Deque class RateLimiter: """速率限制器""" def __init__(self, max_requests: int, time_window: int): self.max_requests = max_requests self.time_window = time_window self.requests: Deque[float] = deque() self.lock = asyncio.Lock() async def acquire(self): """获取请求许可""" async with self.lock: now = time.time() # 移除过期的请求记录 while self.requests and self.requests[0] <= now - self.time_window: self.requests.popleft() # 检查是否超过限制 if len(self.requests) >= self.max_requests: sleep_time = self.requests[0] + self.time_window - now if sleep_time > 0: await asyncio.sleep(sleep_time) # 重新计算 return await self.acquire() # 添加当前请求 self.requests.append(now)
五、高级功能与优化
5.1 分布式爬虫支持
python
# core/distributed_spider.py
import asyncio
import pickle
import time  # FIX: distribute_tasks() calls time.time() but `time` was never imported
from typing import List, Optional
import aioredis
from config.settings import settings


class DistributedSpider:
    """Coordinates crawl tasks across workers through a shared Redis queue."""

    def __init__(self):
        self.redis = None
        self.queue_key = "fund:crawler:queue"
        self.progress_key = "fund:crawler:progress"

    async def connect_redis(self):
        """Open the Redis connection (call before any queue operation)."""
        self.redis = await aioredis.from_url(settings.REDIS_URL)

    async def distribute_tasks(self, fund_codes: List[str]):
        """Push one pending task per fund code onto the work queue.

        NOTE(review): tasks are serialized with pickle, which is only safe
        because producers and consumers are trusted processes sharing this
        codebase — never unpickle data from an untrusted queue.
        """
        for code in fund_codes:
            task = {
                "fund_code": code,
                "status": "pending",
                "created_at": time.time()
            }
            await self.redis.lpush(self.queue_key, pickle.dumps(task))

    async def get_next_task(self) -> Optional[dict]:
        """Pop the next task from the queue, or None when it is empty."""
        task_data = await self.redis.rpop(self.queue_key)
        if task_data:
            return pickle.loads(task_data)
        return None

    async def update_progress(self, fund_code: str, status: str):
        """Record per-fund status in a Redis hash for monitoring."""
        await self.redis.hset(self.progress_key, fund_code, status)

# Section 5.2: data quality monitoring
python
# utils/data_monitor.py
import pandas as pd
from datetime import datetime, timedelta


class DataQualityMonitor:
    """Runs sanity checks over a parsed fund-history DataFrame."""

    @staticmethod
    def check_data_quality(df: pd.DataFrame) -> dict:
        """Build a quality report for *df*.

        The report covers record count, per-column null counts, date
        continuity (gaps over 5 days), NAV plausibility (must be in (0, 100])
        and extreme single-day moves (>10%).
        """
        report = {
            "total_records": len(df),
            "missing_values": {},
            "date_consistency": True,
            "value_consistency": True,
        }

        # Per-column null counts, keeping only columns that actually have nulls
        null_counts = df.isnull().sum()
        report["missing_values"] = {
            col: cnt for col, cnt in null_counts.items() if cnt > 0
        }

        # Date continuity: flag any gap of more than 5 calendar days
        if "date" in df.columns:
            ordered = pd.to_datetime(df["date"]).sort_values()
            gaps = ordered.diff().dt.days
            if (gaps.iloc[1:] > 5).any():
                report["date_consistency"] = False
                report["max_date_gap"] = int(gaps.max())

        # NAV plausibility and extreme single-day returns
        if "nav" in df.columns:
            nav = df["nav"]
            if (nav <= 0).any() or (nav > 100).any():
                report["value_consistency"] = False
            if nav.pct_change().abs().gt(0.1).any():
                report["has_extreme_returns"] = True

        return report

# Section 6: deployment and operation
6.1 Docker部署配置
dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Build-time system utilities
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first so this layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright-managed Chromium together with its OS dependencies.
# FIX: replaces the previous manual google-chrome-stable install, which used
# the deprecated apt-key mechanism and duplicated the browser that Playwright
# downloads anyway; --with-deps pulls in the shared libraries Chromium needs
# on a slim image.
RUN playwright install --with-deps chromium

# Copy the application code
COPY . .

# Run the crawler
CMD ["python", "main.py"]
6.2 定时任务配置
python
# scheduler.py
import asyncio
import time  # FIX: time.sleep() is called below but `time` was never imported (NameError)
from datetime import datetime

import schedule

from main import FundDataCrawler


async def scheduled_task():
    """Daily incremental-update job: refresh the last day's NAV data."""
    crawler = FundDataCrawler()
    await crawler.batch_update(days=1)
    print(f"[{datetime.now()}] 每日更新完成")


def run_scheduler():
    """Run the blocking schedule loop (stop with Ctrl-C)."""
    # Full daily update at 02:00
    schedule.every().day.at("02:00").do(
        lambda: asyncio.run(scheduled_task())
    )
    # Hourly heartbeat so the log shows the scheduler is alive
    schedule.every().hour.do(
        lambda: print(f"[{datetime.now()}] 调度器运行中...")
    )
    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == "__main__":
    run_scheduler()

# Section 7: compliance notes and best practices
7.1 合规性注意事项
遵守robots.txt: 检查目标网站的robots.txt文件
控制请求频率: 避免对服务器造成过大压力
尊重版权: 仅用于个人学习研究,商业用途需获得授权
数据使用限制: 遵守网站的数据使用条款
7.2 性能优化建议
使用连接池: 复用HTTP连接减少开销
实施缓存策略: 缓存已获取的数据
错误重试机制: 实现指数退避重试
分布式采集: 对于大量数据考虑分布式部署
7.3 反爬虫应对策略
轮换User-Agent: 定期更换请求头
使用代理IP池: 避免IP被封禁
模拟人类行为: 添加随机延迟和滚动操作
处理验证码: 集成验证码识别服务
八、总结
本文详细介绍了如何使用Python最新的异步爬虫技术采集基金净值历史数据。通过结合aiohttp、Playwright等现代工具,我们构建了一个高效、稳定的数据采集系统。该系统具有以下特点:
高性能: 基于异步IO,支持高并发数据采集
鲁棒性: 完善的错误处理和重试机制
可扩展性: 模块化设计,便于功能扩展
数据质量: 包含数据清洗和质量检查
易部署: 支持容器化部署和定时任务