目录
背景
过程
结果
背景
我有一个对公众号改写的服务已经上线,上线后发现获取文章详情经常被微信风控,一天甚至会出现两次无法获取微信公众号文章的情况,于是我就想办法优化,openclaw装了一个多月了,终于该他上场实战了
过程
直接对话告诉openclaw,获取微信公众号“https://mp.weixin.qq.com/s/xxx”文本内容,欻欻,几秒就搞定了,于是我发现这家伙这次靠谱,于是我就接着告诉他,“将这个功能做成一个api吧,就是发给你文章链接,你返回公众号文本内容”,全部是json格式
请求入参示例:
{"link":"https://mp.weixin.qq.com/s/qP7DRV86j0Z1U4_kHXj8wA"}响应格式:
{ "code": 200, "data": { "title": "标题", "author": "作者", "content": "正文内容" }, "message": "成功" }结果
大约1分钟就生成好了,实测也正常可用,还告诉了我怎么使用,怎么部署服务
使用:
curl -X POST "http://<服务器IP>:8080/api/fetch" \ -H "Content-Type: application/json" \ -d '{"link": "https://mp.weixin.qq.com/s/qP7DRV86j0Z1U4_kHXj8wA", "format": "content"}'是一个python服务,源代码
#!/usr/bin/env python3 """ 微信公众号文章获取 API 服务 提供HTTP接口获取微信公众号文章内容 用法: python3 wechat_article_api.py # 启动服务 (默认端口 8080) python3 wechat_article_api.py --port 8888 # 指定端口 API 端点: POST /api/fetch 请求体: {"url": "https://mp.weixin.qq.com/s/xxxxx", "format": "text"} 返回: {"success": true, "data": {...}} GET /api/fetch?url=https://mp.weixin.qq.com/s/xxxxx&format=text """ import sys import json import urllib.request import urllib.error import urllib.parse import re import argparse from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs class WeChatArticleExtractor: """微信公众号文章提取器""" @staticmethod def fetch_url(url, headers=None): """抓取URL内容""" if headers is None: headers = { 'User-Agent': 'Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9', } try: req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=30) as response: return response.read().decode('utf-8', errors='ignore'), None except Exception as e: return None, str(e) @staticmethod def extract_article_content(html): """提取单篇文章内容""" result = { 'title': '', 'author': '', 'publish_time': '', 'content': '', 'read_count': '', 'like_count': '', 'cover_image': '' } # 提取标题 patterns = [ r'var msg_title = [\'"](.+?)[\'"];', r'<h1[^>]*class="rich_media_title[^"]*"[^>]*>(.*?)</h1>', r'<h2[^>]*class="rich_media_title[^"]*"[^>]*>(.*?)</h2>', ] for pattern in patterns: match = re.search(pattern, html, re.DOTALL) if match: title = match.group(1) title = title.replace('\\x26', '&').replace('\\x0a', '').replace('\\x20', ' ').replace('\\x2c', ',') result['title'] = re.sub(r'<[^>]+>', '', title).strip() break # 清理标题中的JavaScript代码 if result['title'] and "'.html" in result['title']: result['title'] = result['title'].split("'.html")[0] # 提取作者 author_patterns = [ r'var nickname = [\'"](.+?)[\'"];', r'<a[^>]*id="js_name"[^>]*>(.*?)</a>', r'<span[^>]*class="profile_nickname"[^>]*>(.*?)</span>', r'<a[^>]*class="profile_nickname"[^>]*>(.*?)</a>', ] for pattern in author_patterns: match = re.search(pattern, html, re.DOTALL) if match: result['author'] = re.sub(r'<[^>]+>', '', match.group(1)).strip() break # 提取发布时间 time_patterns = [ r'<em[^>]*id="publish_time"[^>]*>(.*?)</em>', r'var publish_time = [\'"](.+?)[\'"];', r'<span[^>]*class="publish_time"[^>]*>(.*?)</span>', ] for pattern in time_patterns: match = re.search(pattern, html, re.DOTALL) if match: result['publish_time'] = re.sub(r'<[^>]+>', '', match.group(1)).strip() break # 提取封面图 cover_patterns = [ r'var msg_cdn_url = [\'"](https?://[^\'"]+)[\'"];', r'<img[^>]*data-src="(https?://mmbiz\.qpic\.cn[^"]+)"[^>]*class="rich_media_thumb', ] for pattern in cover_patterns: match = re.search(pattern, html, re.DOTALL) if match: result['cover_image'] = match.group(1) break # 提取正文 content_match = re.search(r'<div[^>]*id="js_content"[^>]*>(.*?)</div>\s*(?:<script|</div>|<!--)', html, re.DOTALL) if content_match: content_html = content_match.group(1) # 处理图片 content_html = re.sub(r'<img[^>]*data-src="([^"]+)"[^>]*>', r'\n[图片: \1]\n', content_html) # 保留段落和换行 content_html = content_html.replace('</p>', '\n\n').replace('</div>', '\n').replace('</section>', '\n').replace('<br>', '\n').replace('<br/>', '\n') content = re.sub(r'<[^>]+>', '', content_html) content = content.replace(' ', ' ').replace('"', '"').replace('&', '&').replace('<', '<').replace('>', '>') content = re.sub(r'\n\s*\n', '\n\n', content) content = re.sub(r'\n{3,}', '\n\n', content) result['content'] = content.strip() return result @classmethod def fetch_article(cls, url): """获取文章完整信息""" html, error = cls.fetch_url(url) if html is None: return None, error article = cls.extract_article_content(html) article['url'] = url if not article['title']: return None, "无法提取文章标题,文章可能需要登录或已被删除" return article, None class APIHandler(BaseHTTPRequestHandler): """API请求处理器""" def log_message(self, format, *args): """自定义日志输出""" print(f"[{self.log_date_time_string()}] {args[0]}") def _send_json_response(self, status_code, data): """发送JSON响应""" self.send_response(status_code) self.send_header('Content-Type', 'application/json; charset=utf-8') self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')) def _send_success(self, data, message="成功"): """发送成功响应 - 标准格式""" self._send_json_response(200, { 'code': 200, 'data': data, 'message': message }) def _send_error(self, status_code, message): """发送错误响应 - 标准格式""" self._send_json_response(status_code, { 'code': status_code, 'data': None, 'message': message }) def do_OPTIONS(self): """处理CORS预检请求""" self.send_response(200) self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.end_headers() def do_GET(self): """处理GET请求""" parsed_path = urlparse(self.path) path = parsed_path.path query_params = parse_qs(parsed_path.query) # 健康检查 if path == '/health': self._send_json_response(200, {'status': 'ok', 'service': 'wechat-article-api'}) return # 获取文章 - 支持 link 或 url 参数 if path == '/api/fetch': url = query_params.get('link', [''])[0] or query_params.get('url', [''])[0] format_type = query_params.get('format', ['text'])[0] # 默认返回纯文本 self._handle_fetch(url, format_type) return # 404 self._send_error(404, 'Not Found') def do_POST(self): """处理POST请求""" parsed_path = urlparse(self.path) path = parsed_path.path # 读取请求体 content_length = int(self.headers.get('Content-Length', 0)) if content_length > 0: body = self.rfile.read(content_length).decode('utf-8') try: data = json.loads(body) if body else {} except json.JSONDecodeError: self._send_error(400, 'Invalid JSON') return else: data = {} # 获取文章 - 支持 link 或 url 参数 if path == '/api/fetch': url = data.get('link') or data.get('url', '') format_type = data.get('format', 'text') # 默认返回纯文本 self._handle_fetch(url, format_type) return # 404 self._send_error(404, 'Not Found') def _handle_fetch(self, url, format_type): """处理获取文章请求""" # 验证URL if not url: self._send_error(400, 'Missing required parameter: url') return if not url.startswith('https://mp.weixin.qq.com/'): self._send_error(400, 'Invalid URL. Only mp.weixin.qq.com URLs are supported') return print(f"Fetching article: {url}") # 获取文章 article, error = WeChatArticleExtractor.fetch_article(url) if article is None: self._send_error(500, f'Failed to fetch article: {error}') return # 根据格式返回 - 统一使用标准格式 {code, data, message} # data 直接返回内容,不包裹在对象中 if format_type == 'content': # 只返回正文内容字符串 self._send_success(article['content'], "成功") elif format_type == 'text': # 纯文本格式 text_output = self._format_as_text(article) self._send_success(text_output, "成功") elif format_type == 'markdown': # Markdown格式 md_output = self._format_as_markdown(article) self._send_success(md_output, "成功") else: # 完整JSON格式 - 返回文章对象 self._send_success(article, "成功") def _format_as_text(self, article): """格式化为纯文本""" lines = [ '=' * 60, f"标题:{article['title']}", f"作者:{article['author']}", f"发布时间:{article['publish_time']}", f"原文链接:{article['url']}", '=' * 60, '', article['content'], '', '=' * 60, ] return '\n'.join(lines) def _format_as_markdown(self, article): """格式化为Markdown""" lines = [ f"# {article['title']}", '', f"**作者:** {article['author']}", '', f"**发布时间:** {article['publish_time']}", '', f"**原文链接:** [{article['url']}]({article['url']})", '', '---', '', article['content'], ] return '\n'.join(lines) def run_server(port=8080): """启动API服务器""" server_address = ('0.0.0.0', port) httpd = HTTPServer(server_address, APIHandler) print(f"=" * 60) print(f"微信公众号文章获取 API 服务") print(f"=" * 60) print(f"服务地址: http://0.0.0.0:{port}") print(f"") print(f"API 端点:") print(f" GET /health - 健康检查") print(f" GET /api/fetch?url=<URL>&format=<format>") print(f" POST /api/fetch - 获取文章内容") print(f"") print(f"参数:") print(f" url: 微信公众号文章链接 (必填)") print(f" format: 输出格式 (可选: json, text, markdown, 默认: json)") print(f"") print(f"示例:") print(f' curl "http://localhost:{port}/api/fetch?url=https://mp.weixin.qq.com/s/xxxxx&format=text"') print(f"=" * 60) print(f"按 Ctrl+C 停止服务") print(f"") try: httpd.serve_forever() except KeyboardInterrupt: print("\n服务已停止") httpd.server_close() def main(): parser = argparse.ArgumentParser( description='微信公众号文章获取 API 服务', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 启动服务 (默认端口 8080) python3 wechat_article_api.py # 指定端口 python3 wechat_article_api.py --port 8888 # 测试 API curl "http://localhost:8080/api/fetch?url=https://mp.weixin.qq.com/s/xxxxx&format=text" """ ) parser.add_argument('--port', type=int, default=8080, help='服务端口 (默认: 8080)') args = parser.parse_args() run_server(args.port) if __name__ == '__main__': main()部署教程:
好的,我来帮你整理部署文档和修改接口。 部署步骤 1. 复制API文件到新服务器 需要复制这个文件: Copy ~/.openclaw/workspace/skills/wechat-account-articles/api/wechat_article_api.py
2. 安装依赖 Copy pip3 install urllib3
3. 启动服务 Copy python3 wechat_article_api.py --port 8080
现在我要去对接到我的系统上了
代码已经长传到gitee和github
gitee地址
https://gitee.com/hhxdn/wechat-article-detail-apigithub地址
https://github.com/wang1688/wechat-article-detail-api夸克网盘地址:
https://pan.quark.cn/s/f730add345b9