从零开始用Python进行小红书数据采集的6个核心技术
【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs
你是否曾想分析小红书平台的热门笔记却不知从何入手?面对复杂的API认证和频繁变化的接口参数感到困惑?想通过数据分析发现内容创作规律却被技术门槛阻挡?本文将带你掌握6个核心技术,从环境搭建到数据可视化,全方位提升你的小红书数据采集能力,让你轻松获取有价值的平台数据!
1. 环境配置与认证机制:构建稳定的数据采集基础
如何搭建一个既安全又高效的小红书数据采集环境?从工具选择到认证实现,这是每个数据采集者需要解决的首要问题。
开发环境准备
首先需要安装必要的Python库,我们推荐使用以下组合:
# 安装核心依赖库 # pip install requests httpx python-dotenv pydantic loguru # 导入所需库 import os import time import json import httpx from dotenv import load_dotenv from pydantic import BaseModel from loguru import logger from typing import Dict, Optional, List # 加载环境变量 load_dotenv() # 从.env文件加载敏感信息认证机制实现
小红书的认证机制与其他平台有所不同,我们需要实现基于Cookie的会话管理:
class XHSClient: def __init__(self, cookie: str = None): """ 初始化小红书客户端 :param cookie: 小红书网页版Cookie """ self.cookie = cookie or os.getenv("XHS_COOKIE") self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36", "Cookie": self.cookie, "Referer": "https://www.xiaohongshu.com/", "Content-Type": "application/json" } self.client = httpx.Client(headers=self.headers, timeout=10) self.is_authenticated = False def authenticate(self) -> bool: """验证Cookie有效性""" try: response = self.client.get("https://www.xiaohongshu.com/api/sns/web/v1/user/me") if response.status_code == 200: self.is_authenticated = True logger.success("Cookie认证成功") return True logger.error(f"认证失败: {response.status_code}") return False except Exception as e: logger.error(f"认证出错: {str(e)}") return False # 使用示例 client = XHSClient() if not client.authenticate(): logger.error("请更新有效的Cookie")💡 专家提示:获取小红书Cookie的方法很简单,在浏览器中登录小红书网页版,按F12打开开发者工具,在Application->Cookies中找到名为web_session的Cookie值,将其保存到.env文件中。建议定期更新Cookie以避免失效。
| 认证方式 | 实现难度 | 稳定性 | 适用场景 |
|---|---|---|---|
| Cookie认证 | 低 | 中(需定期更新) | 个人数据采集 |
| 账号密码登录 | 中 | 高 | 长期项目 |
| 第三方API | 高 | 高 | 商业应用 |
实战练习
尝试创建一个完整的认证模块,包含Cookie自动更新机制和登录状态监控,当检测到认证失效时能自动提醒用户更新Cookie。
2. 笔记数据采集:从发现页到详情页的完整流程
如何高效采集小红书的笔记数据?从热门推荐到关键词搜索,掌握这些技巧让你轻松获取海量内容数据。
发现页数据采集
小红书发现页是获取热门内容的重要渠道,我们可以通过以下方法采集:
def get_discovery_notes(self, page: int = 1, page_size: int = 20) -> List[Dict]: """ 获取发现页笔记列表 :param page: 页码 :param page_size: 每页数量 :return: 笔记列表 """ url = "https://www.xiaohongshu.com/api/sns/web/v1/feed" params = { "page": page, "page_size": page_size, "version": "v6" } try: response = self.client.get(url, params=params) data = response.json() if data.get("success"): return data.get("data", {}).get("items", []) logger.error(f"获取发现页数据失败: {data.get('msg')}") return [] except Exception as e: logger.error(f"请求发现页出错: {str(e)}") return []关键词搜索采集
针对特定主题的内容采集,可以使用关键词搜索功能:
def search_notes(self, keyword: str, page: int = 1, page_size: int = 20) -> List[Dict]: """ 搜索关键词相关笔记 :param keyword: 搜索关键词 :param page: 页码 :param page_size: 每页数量 :return: 笔记列表 """ url = "https://www.xiaohongshu.com/api/sns/web/v1/search/notes" params = { "keyword": keyword, "page": page, "page_size": page_size, "sort": "general" # general/hot/time } try: response = self.client.get(url, params=params) data = response.json() if data.get("success"): return data.get("data", {}).get("notes", []) logger.error(f"搜索失败: {data.get('msg')}") return [] except Exception as e: logger.error(f"搜索请求出错: {str(e)}") return []笔记详情采集
获取单篇笔记的详细信息:
def get_note_detail(self, note_id: str) -> Optional[Dict]: """ 获取笔记详细信息 :param note_id: 笔记ID :return: 笔记详情 """ url = f"https://www.xiaohongshu.com/api/sns/web/v1/note/{note_id}" try: response = self.client.get(url) data = response.json() if data.get("success"): return data.get("data") logger.error(f"获取笔记详情失败: {data.get('msg')}") return None except Exception as e: logger.error(f"获取笔记详情出错: {str(e)}") return None # 使用示例 notes = client.search_notes("美食探店", page=1, page_size=10) for note in notes: detail = client.get_note_detail(note["id"]) print(f"标题: {detail['title']}, 收藏数: {detail['collection_count']}") time.sleep(1) # 控制请求频率⚠️ 注意:小红书对请求频率有严格限制,建议每两次请求之间至少间隔1秒,避免短时间内发送大量请求导致IP被限制。对于大规模采集,建议使用代理IP池分散请求压力。
实战练习
实现一个笔记采集器,能够根据关键词搜索并自动翻页,采集至少100篇相关笔记的标题、作者、发布时间、点赞数、收藏数和评论数等信息。
3. 反爬策略与IP管理:突破采集限制的关键技术
面对小红书的反爬机制,如何确保数据采集的稳定性和持续性?掌握这些高级技巧让你的采集项目顺利进行。
请求频率控制
实现智能的请求间隔控制,避免触发反爬机制:
import random from time import sleep class SmartRateLimiter: def __init__(self, base_delay: float = 1.0, jitter_range: tuple = (0.5, 1.5)): """ 智能请求频率控制器 :param base_delay: 基础延迟时间(秒) :param jitter_range: 随机抖动范围 """ self.base_delay = base_delay self.jitter_range = jitter_range self.last_request_time = 0 def wait(self): """根据上次请求时间计算并等待合适的时间""" current_time = time.time() elapsed = current_time - self.last_request_time # 计算需要等待的时间 if elapsed < self.base_delay: wait_time = self.base_delay - elapsed + random.uniform(*self.jitter_range) sleep(wait_time) # 更新上次请求时间 self.last_request_time = time.time() # 在XHSClient中集成 class XHSClient: def __init__(self, cookie: str = None): # ... 其他初始化代码 ... self.rate_limiter = SmartRateLimiter(base_delay=1.2) def get(self, url, **kwargs): """带频率控制的GET请求""" self.rate_limiter.wait() return self.client.get(url, **kwargs)代理IP池实现
使用代理IP分散请求来源,降低单个IP被封禁的风险:
class ProxyPool: def __init__(self, proxy_file: str = "proxies.txt"): """ 代理IP池 :param proxy_file: 代理IP文件路径 """ self.proxies = self.load_proxies(proxy_file) self.current_index = 0 def load_proxies(self, proxy_file: str) -> List[str]: """从文件加载代理IP列表""" try: with open(proxy_file, "r") as f: return [line.strip() for line in f if line.strip()] except FileNotFoundError: logger.warning("代理文件不存在,将使用本地IP") return [] def get_next_proxy(self) -> Optional[str]: """获取下一个代理IP""" if not self.proxies: return None proxy = self.proxies[self.current_index] self.current_index = (self.current_index + 1) % len(self.proxies) return proxy # 在XHSClient中使用代理 class XHSClient: def __init__(self, cookie: str = None, use_proxy: bool = False): # ... 其他初始化代码 ... self.proxy_pool = ProxyPool() if use_proxy else None def get(self, url, **kwargs): self.rate_limiter.wait() # 如果启用代理,获取下一个代理 if self.proxy_pool: proxy = self.proxy_pool.get_next_proxy() if proxy: kwargs["proxies"] = {"https": proxy} return self.client.get(url, **kwargs)💡 专家提示:优质的代理IP是突破反爬限制的关键。建议选择高匿HTTPS代理,定期检测代理有效性,并建立代理评分机制,自动剔除连接速度慢或不稳定的代理。
| 反爬策略 | 实施难度 | 效果 | 成本 |
|---|---|---|---|
| 请求频率控制 | 低 | 中 | 低 |
| 代理IP池 | 中 | 高 | 中 |
| 浏览器自动化 | 高 | 高 | 高 |
| 分布式采集 | 高 | 最高 | 最高 |
实战练习
实现一个代理IP质量检测工具,能够定期测试代理池中IP的连接速度、匿名度和稳定性,并自动更新代理池列表。
4. 数据清洗与预处理:从原始数据到可用信息
采集到的原始数据往往包含噪声和无关信息,如何将其转化为干净、结构化的数据?这些数据清洗技巧将大大提升你的数据分析效率。
数据清洗基础
首先,我们需要定义数据模型来规范数据结构:
from pydantic import BaseModel, field_validator from datetime import datetime import re class Note(BaseModel): """笔记数据模型""" note_id: str title: str content: str user_id: str username: str post_time: datetime like_count: int collect_count: int comment_count: int share_count: int tags: List[str] image_count: int @field_validator('content') def clean_content(cls, v): """清理笔记内容""" # 移除HTML标签 v = re.sub(r'<[^>]*>', '', v) # 移除多余空白 v = re.sub(r'\s+', ' ', v).strip() return v @field_validator('post_time', mode='before') def parse_post_time(cls, v): """解析发布时间""" if isinstance(v, str): # 处理不同格式的时间字符串 for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']: try: return datetime.strptime(v, fmt) except ValueError: continue # 如果所有格式都失败,使用当前时间并记录警告 logger.warning(f"无法解析时间格式: {v}") return datetime.now() return v数据去重与标准化
实现数据去重和标准化处理:
import pandas as pd class NoteProcessor: def __init__(self): """笔记数据处理器""" self.notes_df = pd.DataFrame() def add_notes(self, raw_notes: List[Dict]): """添加原始笔记数据并处理""" # 转换为DataFrame new_notes = [] for note in raw_notes: try: # 提取需要的字段并创建Note对象 note_obj = Note( note_id=note.get("id"), title=note.get("title", ""), content=note.get("desc", ""), user_id=note.get("user", {}).get("user_id", ""), username=note.get("user", {}).get("nickname", ""), post_time=note.get("time"), like_count=note.get("liked_count", 0), collect_count=note.get("collected_count", 0), comment_count=note.get("comment_count", 0), share_count=note.get("share_count", 0), tags=[tag.get("name") for tag in note.get("tags", [])], image_count=len(note.get("images", [])) ) new_notes.append(note_obj.dict()) except Exception as e: logger.warning(f"处理笔记失败: {str(e)}, 笔记ID: {note.get('id')}") # 转换为DataFrame并去重 new_df = pd.DataFrame(new_notes) if not self.notes_df.empty: # 合并并去重 self.notes_df = pd.concat([self.notes_df, new_df]).drop_duplicates("note_id") else: self.notes_df = new_df def save_to_csv(self, filename: str = "cleaned_notes.csv"): """保存清洗后的数据到CSV文件""" self.notes_df.to_csv(filename, index=False, encoding="utf-8-sig") logger.success(f"已保存{len(self.notes_df)}条清洗后的笔记数据到{filename}") def get_statistics(self): """获取数据统计信息""" if self.notes_df.empty: return "暂无数据" stats = { "总笔记数": len(self.notes_df), "平均点赞数": self.notes_df["like_count"].mean(), "平均收藏数": self.notes_df["collect_count"].mean(), "平均评论数": self.notes_df["comment_count"].mean(), "发布时间范围": f"{self.notes_df['post_time'].min()}至{self.notes_df['post_time'].max()}", "热门标签": self.notes_df["tags"].explode().value_counts().head(10).to_dict() } return stats # 使用示例 processor = NoteProcessor() raw_notes = client.search_notes("旅行攻略", page=1, page_size=50) processor.add_notes(raw_notes) processor.save_to_csv() print(processor.get_statistics())⚠️ 注意:数据清洗是数据分析的关键步骤,直接影响后续分析结果的准确性。对于文本数据,要特别注意编码问题和特殊字符处理;对于数值数据,要检查异常值和缺失值;对于时间数据,要统一格式以便进行时间序列分析。
实战练习
创建一个完整的小红书笔记数据清洗流水线,能够处理至少1000条笔记数据,实现数据去重、缺失值处理、异常值检测和数据标准化,并生成数据质量报告。
5. 高级数据采集:评论、用户与关系网络
除了笔记内容,用户评论和用户关系网络也是重要的分析对象。掌握这些高级采集技巧,让你的数据分析更加全面。
评论数据采集
采集笔记评论数据的实现:
def get_note_comments(self, note_id: str, page: int = 1, page_size: int = 20) -> List[Dict]: """ 获取笔记评论 :param note_id: 笔记ID :param page: 页码 :param page_size: 每页数量 :return: 评论列表 """ url = f"https://www.xiaohongshu.com/api/sns/web/v1/comment/list/{note_id}" params = { "page": page, "page_size": page_size, "order": "hot" # hot/time } try: response = self.client.get(url, params=params) data = response.json() if data.get("success"): return data.get("data", {}).get("comments", []) logger.error(f"获取评论失败: {data.get('msg')}") return [] except Exception as e: logger.error(f"获取评论出错: {str(e)}") return []用户信息采集
采集用户公开信息的方法:
def get_user_profile(self, user_id: str) -> Optional[Dict]: """ 获取用户公开资料 :param user_id: 用户ID :return: 用户资料 """ url = f"https://www.xiaohongshu.com/api/sns/web/v1/user/{user_id}/profile" try: response = self.client.get(url) data = response.json() if data.get("success"): return data.get("data") logger.error(f"获取用户资料失败: {data.get('msg')}") return None except Exception as e: logger.error(f"获取用户资料出错: {str(e)}") return None用户关系网络采集
采集用户关注和粉丝关系:
def get_user_following(self, user_id: str, page: int = 1, page_size: int = 20) -> List[Dict]: """ 获取用户关注列表 :param user_id: 用户ID :param page: 页码 :param page_size: 每页数量 :return: 关注用户列表 """ url = f"https://www.xiaohongshu.com/api/sns/web/v1/user/{user_id}/following" params = { "page": page, "page_size": page_size } try: response = self.client.get(url, params=params) data = response.json() if data.get("success"): return data.get("data", {}).get("users", []) logger.error(f"获取关注列表失败: {data.get('msg')}") return [] except Exception as e: logger.error(f"获取关注列表出错: {str(e)}") return [] # 使用示例 note_id = "642d1e8f0000000001003a8b" comments = client.get_note_comments(note_id, page=1, page_size=50) # 采集评论用户信息 user_profiles = [] for comment in comments[:5]: # 只采集前5个评论用户 user_id = comment["user_id"] profile = client.get_user_profile(user_id) if profile: user_profiles.append(profile) # 获取每个用户的前5个关注 followings = client.get_user_following(user_id, page=1, page_size=5) profile["followings"] = followings💡 专家提示:用户关系网络采集可能会涉及大量请求,建议使用广度优先或深度优先策略进行控制。对于社交关系分析,可使用NetworkX库构建关系图,分析社区结构和关键节点。
实战练习
实现一个用户关系网络爬虫,以特定笔记的评论用户为起点,采集用户的关注关系,构建小型社交网络,并分析用户之间的连接关系。
6. 数据可视化与分析:从数据到洞察
采集和清洗数据后,如何将其转化为直观的图表和有价值的洞察?这些数据可视化技巧将帮助你更好地理解和展示数据。
基础数据可视化
使用Matplotlib和Seaborn创建基础图表:
import matplotlib.pyplot as plt import seaborn as sns import pandas as pd class NoteVisualizer: def __init__(self, df: pd.DataFrame): """ 笔记数据可视化工具 :param df: 处理后的笔记数据DataFrame """ self.df = df # 设置中文字体 plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"] plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题 def plot_interaction_distribution(self): """绘制互动数据分布""" plt.figure(figsize=(15, 5)) # 点赞数分布 plt.subplot(1, 3, 1) sns.histplot(data=self.df, x="like_count", bins=30) plt.title("点赞数分布") plt.xlabel("点赞数") plt.ylabel("笔记数量") # 收藏数分布 plt.subplot(1, 3, 2) sns.histplot(data=self.df, x="collect_count", bins=30) plt.title("收藏数分布") plt.xlabel("收藏数") plt.ylabel("笔记数量") # 评论数分布 plt.subplot(1, 3, 3) sns.histplot(data=self.df, x="comment_count", bins=30) plt.title("评论数分布") plt.xlabel("评论数") plt.ylabel("笔记数量") plt.tight_layout() plt.savefig("interaction_distribution.png") plt.show() def plot_tags_cloud(self): """生成标签云""" from wordcloud import WordCloud import jieba # 提取所有标签 tags = [] for tag_list in self.df["tags"]: tags.extend(tag_list) # 生成标签文本 tag_text = " ".join(tags) # 创建词云 wc = WordCloud( font_path="simhei.ttf", # 中文字体路径 background_color="white", width=1000, height=600, max_words=100 ).generate(tag_text) plt.figure(figsize=(12, 8)) plt.imshow(wc, interpolation="bilinear") plt.axis("off") plt.title("热门标签词云") plt.savefig("tags_wordcloud.png") plt.show() def plot_post_time_analysis(self): """发布时间分析""" # 提取小时和星期几 self.df["post_hour"] = self.df["post_time"].dt.hour self.df["post_weekday"] = self.df["post_time"].dt.weekday # 0=周一, 6=周日 plt.figure(figsize=(12, 6)) # 按小时发布分布 plt.subplot(1, 2, 1) hour_counts = self.df["post_hour"].value_counts().sort_index() sns.barplot(x=hour_counts.index, y=hour_counts.values) plt.title("一天中发布时间分布") plt.xlabel("小时") plt.ylabel("发布数量") # 按星期发布分布 plt.subplot(1, 2, 2) weekday_counts = self.df["post_weekday"].value_counts().sort_index() sns.barplot(x=weekday_counts.index, y=weekday_counts.values) plt.title("一周中发布时间分布") plt.xlabel("星期") plt.ylabel("发布数量") plt.xticks(range(7), ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]) plt.tight_layout() plt.savefig("post_time_analysis.png") plt.show() # 使用示例 df = pd.read_csv("cleaned_notes.csv") df["post_time"] = pd.to_datetime(df["post_time"]) # 转换为datetime类型 visualizer = NoteVisualizer(df) visualizer.plot_interaction_distribution() visualizer.plot_tags_cloud() visualizer.plot_post_time_analysis()高级交互式可视化
使用Plotly创建交互式可视化:
import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots def interactive_visualization(df: pd.DataFrame): """创建交互式可视化""" # 1. 互动指标相关性散点图 fig = px.scatter( df, x="like_count", y="collect_count", size="comment_count", color="image_count", hover_data=["title", "username"], title="点赞数与收藏数相关性" ) fig.write_html("scatter_interactive.html") # 2. 多指标箱线图 fig = px.box( df, y=["like_count", "collect_count", "comment_count"], title="互动指标分布比较" ) fig.write_html("boxplot_interactive.html") # 3. 发布时间热力图 # 准备数据 heatmap_data = df.groupby(["post_weekday", "post_hour"]).size().unstack() # 创建热力图 fig = px.imshow( heatmap_data, labels=dict(x="小时", y="星期", color="发布数量"), x=heatmap_data.columns, y=["周一", "周二", "周三", "周四", "周五", "周六", "周日"], title="发布时间热力图" ) fig.write_html("heatmap_interactive.html") # 使用示例 interactive_visualization(df)💡 专家提示:数据可视化不仅是展示结果的手段,也是发现数据规律的重要方法。建议先从基础图表开始探索数据分布和相关性,再使用交互式可视化深入分析感兴趣的部分。对于大规模数据,考虑使用降维技术(如PCA、t-SNE)进行可视化。
实战练习
基于采集的小红书笔记数据,创建一个完整的数据可视化报告,包含互动指标分析、发布时间模式、热门标签分析和用户行为特征等内容,并提出至少3个有价值的内容创作建议。
实战项目:小红书内容趋势分析系统
现在让我们将所学知识整合起来,构建一个完整的小红书内容趋势分析系统。
项目架构
小红书内容趋势分析系统 ├── 数据采集模块 │ ├── 笔记采集器 │ ├── 评论采集器 │ └── 用户信息采集器 ├── 数据处理模块 │ ├── 数据清洗 │ ├── 数据存储 │ └── 数据预处理 ├── 数据分析模块 │ ├── 趋势分析 │ ├── 情感分析 │ └── 用户画像 └── 可视化模块 ├── 静态报表 └── 交互式仪表盘核心代码实现
class XHSTrendAnalyzer: def __init__(self, keywords: List[str], db_path: str = "xhs_data.db"): """ 小红书趋势分析器 :param keywords: 要分析的关键词列表 :param db_path: 数据库路径 """ self.keywords = keywords self.client = XHSClient(use_proxy=True) self.processor = NoteProcessor() self.db_path = db_path self.init_database() def init_database(self): """初始化数据库""" import sqlite3 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 创建笔记表 cursor.execute(''' CREATE TABLE IF NOT EXISTS notes ( note_id TEXT PRIMARY KEY, title TEXT, content TEXT, user_id TEXT, username TEXT, post_time DATETIME, like_count INTEGER, collect_count INTEGER, comment_count INTEGER, share_count INTEGER, image_count INTEGER ) ''') # 创建标签表 cursor.execute(''' CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, note_id TEXT, tag TEXT, FOREIGN KEY(note_id) REFERENCES notes(note_id) ) ''') conn.commit() conn.close() def collect_data(self, days: int = 7, max_notes_per_keyword: int = 200): """ 采集数据 :param days: 采集多少天内的数据 :param max_notes_per_keyword: 每个关键词最大采集笔记数 """ end_date = datetime.now() start_date = end_date - timedelta(days=days) for keyword in self.keywords: logger.info(f"开始采集关键词: {keyword}") page = 1 collected = 0 while collected < max_notes_per_keyword: notes = self.client.search_notes(keyword, page=page, page_size=20) if not notes: break # 过滤时间范围内的笔记 filtered_notes = [] for note in notes: note_time = datetime.fromtimestamp(note.get("time", 0)) if start_date <= note_time <= end_date: filtered_notes.append(note) if not filtered_notes: break # 处理并保存笔记 self.processor.add_notes(filtered_notes) self.save_to_database(filtered_notes) collected += len(filtered_notes) page += 1 logger.info(f"关键词 {keyword} 已采集 {collected}/{max_notes_per_keyword} 条笔记") logger.success(f"数据采集完成,共采集 {len(self.processor.notes_df)} 条笔记") def save_to_database(self, notes: List[Dict]): """保存笔记到数据库""" import sqlite3 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() for note in notes: try: # 解析笔记数据 note_id = note.get("id") title = note.get("title", "") content = note.get("desc", "") user_id = note.get("user", {}).get("user_id", "") username = note.get("user", {}).get("nickname", "") post_time = datetime.fromtimestamp(note.get("time", 0)).strftime("%Y-%m-%d %H:%M:%S") like_count = note.get("liked_count", 0) collect_count = note.get("collected_count", 0) comment_count = note.get("comment_count", 0) share_count = note.get("share_count", 0) image_count = len(note.get("images", [])) # 插入笔记 cursor.execute(''' INSERT OR IGNORE INTO notes (note_id, title, content, user_id, username, post_time, like_count, collect_count, comment_count, share_count, image_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (note_id, title, content, user_id, username, post_time, like_count, collect_count, comment_count, share_count, image_count)) # 插入标签 tags = [tag.get("name") for tag in note.get("tags", [])] for tag in tags: cursor.execute(''' INSERT OR IGNORE INTO tags (note_id, tag) VALUES (?, ?) ''', (note_id, tag)) except Exception as e: logger.warning(f"保存笔记失败: {str(e)}, 笔记ID: {note.get('id')}") conn.commit() conn.close() def generate_trend_report(self, output_dir: str = "report"): """生成趋势分析报告""" # 创建报告目录 os.makedirs(output_dir, exist_ok=True) # 从数据库加载数据 import sqlite3 conn = sqlite3.connect(self.db_path) self.notes_df = pd.read_sql("SELECT * FROM notes", conn) self.tags_df = pd.read_sql("SELECT * FROM tags", conn) conn.close() # 转换时间格式 self.notes_df["post_time"] = pd.to_datetime(self.notes_df["post_time"]) # 生成可视化图表 visualizer = NoteVisualizer(self.notes_df) visualizer.plot_interaction_distribution() visualizer.plot_tags_cloud() visualizer.plot_post_time_analysis() # 生成交互式可视化 interactive_visualization(self.notes_df) # 生成趋势分析 self.analyze_trends(output_dir) logger.success(f"趋势分析报告已生成至 {output_dir} 目录") def analyze_trends(self, output_dir: str): """分析内容趋势""" # 1. 每日发布量趋势 self.notes_df["post_date"] = self.notes_df["post_time"].dt.date daily_counts = self.notes_df.groupby("post_date").size() plt.figure(figsize=(12, 6)) daily_counts.plot(kind="line") plt.title("每日笔记发布量趋势") plt.xlabel("日期") plt.ylabel("笔记数量") plt.savefig(f"{output_dir}/daily_trend.png") plt.close() # 2. 热门标签趋势 tag_counts = self.tags_df["tag"].value_counts().head(10) plt.figure(figsize=(12, 6)) tag_counts.plot(kind="bar") plt.title("热门标签分布") plt.xlabel("标签") plt.ylabel("出现次数") plt.savefig(f"{output_dir}/hot_tags.png") plt.close() # 使用示例 analyzer = XHSTrendAnalyzer(keywords=["美食探店", "旅行攻略", "数码评测"]) analyzer.collect_data(days=14, max_notes_per_keyword=300) analyzer.generate_trend_report()项目扩展方向
- 增加定时采集功能,实现自动化趋势监控
- 集成情感分析模块,分析笔记和评论的情感倾向
- 添加竞品分析功能,比较不同关键词的表现
- 开发Web界面,提供更友好的交互体验
- 实现内容推荐功能,基于历史数据预测热门内容
数据伦理与合规性讨论
在进行数据采集和分析时,我们必须重视数据伦理和合规性问题:
- 数据采集边界:只采集公开可访问的数据,不尝试突破平台限制或访问未授权内容
- 用户隐私保护:匿名化处理采集到的用户数据,不泄露个人敏感信息
- 请求频率控制:尊重平台规则,不进行过度采集影响平台正常运行
- 数据使用范围:确保数据仅用于合法目的,不用于商业营销或恶意竞争
- 版权尊重:不盗用或滥用采集到的内容,引用时注明来源
随着数据隐私法规的不断完善,作为数据从业者,我们有责任遵守相关法律法规,坚持伦理原则,确保数据采集和使用的合法性和道德性。
总结
通过本文介绍的6个核心技术,你已经掌握了小红书数据采集的完整流程,从环境搭建、数据采集、反爬策略、数据清洗到数据分析和可视化。这些技能不仅适用于小红书,也可以迁移到其他社交媒体平台的数据采集项目中。
数据采集是数据分析的基础,而高质量的数据是获取有价值洞察的前提。希望本文介绍的技术和方法能够帮助你更好地进行数据采集工作,从海量数据中挖掘出有价值的信息。
记住,技术本身是中性的,关键在于如何使用它。始终保持对数据伦理的敬畏,遵守平台规则和法律法规,让数据采集技术成为创造价值的工具,而非滥用的手段。
官方文档:docs/
【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考