背景:毕设“仿真”之痛
做网络工程毕设,最怕老师一句“你的数据从哪来?”
过去三年,我帮十几位学弟学妹调过代码,最常见的是三种“翻车现场”:
- 用Packet Tracer拖几个路由器,跑个RIP/OSPF,拓扑漂亮却全是“假包”,答辩时被问“时延为什么固定 10 ms”直接社死。
- 把2003年的NS2代码改个参数,声称“改进TCP拥塞算法”,结果内核版本早就不支持TCL接口,现场演示直接段错误。
- 用Wireshark手工点“开始捕获”,30秒停止,导出CSV,样本只有1 200行,连t检验都过不了。
一句话:脱离真实流量,再花哨的算法都像在真空里比划拳。
于是我把目标拆成三条硬指标:
- 必须“活的”数据——直接蹭校园网,不插网线也能抓到几千人同时在线。
- 必须“轻量”——笔记本就能跑,不给实验室添预算。
- 必须“完整”——从抓包到页面一条链,老师看得见、摸得着、点得动。
技术选型:为什么不是tcpdump + Grafana
抓包框架对比
| 方案 | 脚本化难度 | 字段解析 | 依赖 | 备注 |
|---|---|---|---|---|
| tcpdump + 管道 | 低 | 自己算偏移 | 无 | 写正则写到哭 |
| Wireshark Lua API | 中 | 全协议树 | 装Wireshark | 内存爆炸,GUI甩不掉 |
| PyShark | 低 | 全协议树 | 装tshark | 调一次进程fork一次,CPU起飞 |
| Scapy | 低 | 一行一个字段 | 纯PyPI | 自带decode,回调常驻内存 |
结论:Scapy = Pythonic + 零依赖 + 协议层随意拆,毕设答辩机没root也能pip install。
Web框架权衡
- Flask:生态老、插件多、老师认识,轻量到单文件就能跑。
- FastAPI:异步+自动文档漂亮,但毕设场景QPS<20,异步收益几乎为零;再加uvloop、asyncpg,服务器还要开防火墙端口,多出来的复杂度老师并不加分。
最终拍板:Flask + Jinja2 + Chart.js,一页HTML交差,本地127.0.0.1演示,省掉CORS、Nginx、HTTPS一堆麻烦。
核心实现:三条线程跑通“抓-算-展”
系统架构极简,分三层:
- 捕获层:Scapy异步嗅探,把原始包丢进
queue.Queue,解耦生产/消费。 - 聚合层:Pandas滚动窗口,每30秒汇总一次五元组(sip,sport,dip,dport,proto),算字节量、包量、流持续时间。
- 服务层:Flask提供
/api/flows与/api/alert,前端轮询绘图;异常流(如单IP>5000次SYN)实时告警。
关键代码片段
1. 抓包回调(Scapy)
from scapy.all import sniff, IP, TCP import queue, threading pkt_queue = queue.Queue(maxsize=10000) def handle(pkt): # 只处理IPv4+TCP,减轻CPU if IP in pkt and TCP in pkt: ip_layer = pkt[IP] tcp_layer = pkt[TCP] record = { 'sip': ip_layer.src, 'dip': ip_layer.dst, 'sport': tcp_layer.sport, 'dport': tcp_layer.dport, 'proto': 'TCP', 'size': len(pkt), 'time': pkt.time } # 非阻塞丢包,防止内存堆积 try: pkt_queue.put_nowait(record) except queue.Full: pass def capture(iface='eth0'): sniff(iface=iface, prn=handle, store=0) threading.Thread(target=capture, daemon=True).start()2. 聚合与异常检测
import pandas as pd from collections import defaultdict flow_map = defaultdict(lambda: {'bytes':0, 'pkts':0, 'start':None, 'end':0}) def aggregate(): while True: try: row = pkt_queue.get(timeout=1) except queue.Empty: continue key = (row['sip'],row['sport'],row['dip'],row['dport'],row['proto']) fl = flow_map[key] fl['bytes'] += row['size'] fl['pkts'] += 1 fl['end'] = row['time'] if fl['start'] is None: fl['start'] = row['time'] # 后台线程定时落库 def snapshot(): while True: time.sleep(30) df = pd.DataFrame.from_records( [{'key':k, **v} for k,v in flow_map.items()] ) df.to_csv('/tmp/flows.csv', index=False) # 简单告警:SYN包过多 syn_cnt = df[df['pkts']>5000].shape[0] if syn_cnt: print(f'[ALERT] {syn_cnt} suspicious flows detected!') flow_map.clear() threading.Thread(target=aggregate, daemon=True).start() threading.Thread(target=snapshot, daemon=True).start()3. Flask路由
from flask import Flask, jsonify app = Flask(__name__) @app.route('/api/flows') def flows(): df = pd.read_csv('/tmp/flows.csv') return jsonify(df.to_dict(orient='records')) @app.route('/api/alert') def alert(): # 这里读redis/文件都行,演示用全局变量 return jsonify({'syn_flood': syn_cnt}) if __name__ == '__main__': app.run(debug=False, threaded=True)性能与安全:别让笔记本成“烤机”
内存泄漏
Scapy默认会缓存解析模板,长时间跑会吃光RAM。在脚本头部加:conf.debug_dissector = 0
并定期pkt_queue.queue.clear(),可把内存压在300 M以内(千兆镜像口,峰值6 000 pkt/s测试)。数据脱敏
校园网抓包极易踩GDPR/个人信息红线。落盘前统一匿名化:- IP末段置零
ip_mask = '.'.join(ip.split('.')[:3])+'.0' - TCP payload直接截断为0,只保留首部长度,防止抓到HTTP账号。
- IP末段置零
接口幂等
/api/flows每次读文件,天然幂等;如果改写成POST /drop下发防火墙策略,一定带UUID去重,防止前端重复点击把整栋宿舍楼踢下线。
生产环境踩坑速查表
- 权限:非root用户想跑混杂模式,加
setcap cap_net_raw,cap_net_admin+eip /usr/bin/python3.9,千万别给777。 - 内核丢包:
netstat -s | drop看packet drops。高负载时ethtool -G eth0 rx 4096把环形缓冲区调到最大。 - 时区:Scapy的
pkt.time是UTC,写数据库记得datetime.utcfromtimestamp,否则前端画出来凌晨三点高峰,老师以为你抓鬼。 - 大页内存:如果服务器>8 G流量,开
enable_hugepage=1,缺省4 k页容易TLB打满。 - NVME落盘:CSV只是演示,真实场景用ClickHouse或InfluxDB,列式压缩能把1 T原始pcap压到30 G指标数据。
可扩展方向:把“毕设”写成“论文”
- DDoS检测:把SYN泛洪指标换成Higgs-CNN,用Zeek导出bidirectional-flow,当数据集喂给PyTorch,AUC>0.98就能水一篇中文核心。
- Prometheus + Grafana:把
snapshot()改成推送到Pushgateway,顺手写个alerts.yml,值班老师能在微信收到“宿舍205 SYN Flood”机器人提醒。 - 分布式探针:树莓派+PoE,塞在弱电井,用ZeroTier打洞回传,毕业礼物留给实验室,下届学弟直接继承真实数据。
- 协议逆向:把未知UDP载荷取前128字节做k-means聚类,说不定能挖出某款手游的私有协议,再发一篇CTF Write-up。
结尾:动手才算学完
整套代码不到400行,却能跑通“抓包-特征-告警-展示”全链路;把笔记本带到教室,插根网线就能现场演示,老师提问再也不用“回头补数据”。
如果你也在为毕设头秃,不妨先pip install scapy flask,把本文脚本跑起来,再逐步往里面塞自己的算法。
网络世界不缺理论,缺的是能把比特变成图表、把异常变成警报的人。
下一步,你想给系统加上什么?DDoS检测、DNS隧道识别,还是直接对接Prometheus?——别光想,commit一把再说。