从零构建用户相似度挖掘系统:基于Graph Embedding与Milvus的实战指南
在数字营销和个性化推荐领域,寻找与种子用户相似的目标人群(Look-alike)一直是核心挑战。传统基于规则标签的方法不仅效率低下,而且难以捕捉用户间复杂的非线性关系。本文将带你用Python构建一个完整的Look-alike系统,涵盖用户关系图构建、EGES算法实现、Milvus向量数据库集成等关键环节。
1. 系统架构设计与技术选型
一个完整的Look-alike系统通常包含数据层、算法层和服务层三个核心模块。我们选择的技术栈组合兼顾了效果与工程落地性:
- 数据处理层:Pandas + NetworkX
- 图嵌入算法:EGES(增强型Graph Embedding)
- 向量检索:Milvus 2.0+
- 服务封装:FastAPI
技术对比分析:
| 方案类型 | 代表算法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 基于标签 | TGI扩散 | 实现简单 | 精度有限 | 冷启动阶段 |
| 基于协同过滤 | ItemCF | 无需特征工程 | 稀疏性问题 | 行为数据丰富场景 |
| 基于图网络 | EGES | 关系捕捉能力强 | 计算成本高 | 社交/行为关联强场景 |
| 基于深度学习 | GNN | 自动特征学习 | 训练复杂 | 超大规模数据 |
提示:EGES(Enhanced Graph Embedding with Side Information)是BGE算法的改进版,通过融合节点属性信息提升嵌入质量,特别适合用户行为稀疏的场景。
2. 用户关系图构建实战
用户关系图是Graph Embedding的基础,其构建质量直接影响最终效果。我们从模拟数据生成开始,演示完整的图构建流程。
2.1 模拟数据生成
import pandas as pd import numpy as np # 生成10000个模拟用户 users = [f"user_{i}" for i in range(10000)] # 用户属性:年龄、性别、城市 attrs = { "age": np.random.randint(18, 60, size=10000), "gender": np.random.choice(["M","F"], size=10000), "city": np.random.choice(["北京","上海","广州","深圳"], size=10000) } # 用户行为数据:物品交互序列 items = [f"item_{i}" for i in range(500)] behavior_data = [] for user in users: for _ in range(np.random.randint(5, 20)): behavior_data.append({ "user_id": user, "item_id": np.random.choice(items), "timestamp": pd.Timestamp.now() - pd.Timedelta(minutes=np.random.randint(0, 10080)) }) df_behavior = pd.DataFrame(behavior_data)2.2 图构建策略
用户关系边的定义需要结合业务场景,常见构建方式包括:
- 共现关系:共同交互过相同物品的用户建立边
- 时序关系:连续交互行为的用户形成边
- 属性相似: demographic特征相似的用户连接
import networkx as nx from collections import defaultdict # 基于物品共现构建用户关系图 G = nx.Graph() user_item_map = defaultdict(set) # 建立用户-物品二分图 for _, row in df_behavior.iterrows(): user_item_map[row['user_id']].add(row['item_id']) # 计算用户间Jaccard相似度作为边权重 users = list(user_item_map.keys()) for i in range(len(users)): for j in range(i+1, len(users)): set_i = user_item_map[users[i]] set_j = user_item_map[users[j]] intersection = len(set_i & set_j) union = len(set_i | set_j) if union > 0 and intersection >= 2: # 至少两个共同物品 weight = intersection / union G.add_edge(users[i], users[j], weight=weight)注意:实际业务中需要根据数据特点调整边权重计算方式,例如引入时间衰减因子。
3. EGES算法实现与优化
EGES算法核心思想是在DeepWalk基础上融合节点属性信息,其模型架构包含以下几个关键组件:
- 主嵌入层:学习节点拓扑结构特征
- 属性嵌入层:处理节点属性特征
- 注意力机制:动态融合多特征源
3.1 基础实现
import torch import torch.nn as nn import torch.nn.functional as F class EGES(nn.Module): def __init__(self, num_nodes, num_attrs, embed_dim=64): super(EGES, self).__init__() self.node_embed = nn.Embedding(num_nodes, embed_dim) self.attr_embeds = nn.ModuleList([ nn.Embedding(num_attrs[attr], embed_dim) for attr in num_attrs ]) self.attention = nn.Linear(embed_dim, 1) def forward(self, node_ids, attr_ids): # 主嵌入 h_node = self.node_embed(node_ids) # 属性嵌入 attr_embs = [] for i, embed in enumerate(self.attr_embeds): attr_embs.append(embed(attr_ids[:,i])) # 注意力权重 all_embs = torch.stack([h_node] + attr_embs, dim=1) attn_scores = F.softmax(self.attention(all_embs), dim=1) # 加权融合 final_embed = torch.sum(attn_scores * all_embs, dim=1) return final_embed3.2 训练技巧
- 负采样优化:采用动态负采样策略,对高频节点增加采样概率
- 多任务学习:同时优化链接预测和属性预测任务
- 渐进式训练:先训练主嵌入层,再联合训练属性嵌入
def train_epoch(model, data_loader, optimizer): model.train() total_loss = 0 for batch in data_loader: optimizer.zero_grad() # 正样本 pos_nodes, pos_attrs = batch['pos'] pos_emb = model(pos_nodes, pos_attrs) # 负样本 neg_nodes, neg_attrs = batch['neg'] neg_emb = model(neg_nodes, neg_attrs) # 损失计算 pos_scores = torch.sum(pos_emb[::2] * pos_emb[1::2], dim=1) neg_scores = torch.sum(pos_emb[::2] * neg_emb[1::2], dim=1) loss = -torch.mean(torch.log(torch.sigmoid(pos_scores - neg_scores))) loss.backward() optimizer.step() total_loss += loss.item() return total_loss / len(data_loader)4. Milvus向量数据库集成
Milvus作为专为向量检索优化的数据库,能够高效处理大规模相似度查询。我们重点介绍部署配置和性能优化技巧。
4.1 集群部署方案
对于千万级用户规模的系统,推荐采用分布式集群部署:
version: '3.5' services: milvus: image: milvusdb/milvus:v2.0.0 ports: - "19530:19530" environment: - ETCD_ENABLED=true - MINIO_ENABLED=true volumes: - ./volumes/milvus:/var/lib/milvus deploy: resources: limits: cpus: '4' memory: 8G etcd: image: quay.io/coreos/etcd:v3.5.0 ports: - "2379:2379" environment: - ETCD_AUTO_COMPACTION_RETENTION=1 volumes: - ./volumes/etcd:/etcd minio: image: minio/minio:RELEASE.2021-06-17T00-10-46Z ports: - "9000:9000" environment: - MINIO_ACCESS_KEY=minioadmin - MINIO_SECRET_KEY=minioadmin volumes: - ./volumes/minio:/data4.2 性能优化参数
通过调整以下参数可以显著提升查询性能:
from pymilvus import Collection, connections, FieldSchema, CollectionSchema, DataType # 连接配置 connections.connect( "default", host="localhost", port="19530", secure=False ) # 集合Schema定义 fields = [ FieldSchema(name="user_id", dtype=DataType.VARCHAR, is_primary=True, max_length=64), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=64) ] schema = CollectionSchema( fields=fields, description="User embedding collection", enable_dynamic_field=False ) # 创建集合 collection = Collection( name="user_embeddings", schema=schema, consistency_level="Strong" ) # 索引配置 index_params = { "index_type": "IVF_FLAT", "metric_type": "IP", # 内积相似度 "params": {"nlist": 1024} } collection.create_index( field_name="embedding", index_params=index_params ) # 加载优化 collection.load( replica_number=2, _refresh=False, _timeout=30 )提示:生产环境建议nlist设置为集群CPU核心数的4-8倍,查询时nprobe设置为nlist的1/16到1/8。
5. 效果评估与线上部署
完整的Look-alike系统需要建立科学的评估体系,我们设计了三层评估指标:
离线指标:
- 覆盖率(Coverage):扩展人群对种子人群的覆盖比例
- 相似度(Similarity):向量空间余弦距离分布
在线指标:
- CTR提升率
- 转化成本下降率
业务指标:
- ROI(投资回报率)
- 用户留存率变化
A/B测试结果示例:
| 策略 | 人群规模 | CTR | CVR | 单次转化成本 |
|---|---|---|---|---|
| 规则扩展 | 50万 | 1.2% | 0.8% | ¥25.6 |
| EGES+Milvus | 50万 | 2.7% | 1.5% | ¥18.3 |
| EGES+Milvus | 200万 | 2.1% | 1.2% | ¥20.1 |
在Docker化部署时,建议采用以下服务架构:
├── app/ │ ├── api/ # FastAPI接口 │ ├── models/ # 算法模型 │ ├── services/ # 业务逻辑 │ └── config.py ├── docker-compose.yml ├── Dockerfile └── requirements.txt接口关键实现示例:
from fastapi import FastAPI from pymilvus import Collection import numpy as np app = FastAPI() collection = Collection("user_embeddings") @app.post("/lookalike") async def find_similar_users( seed_users: list[str], top_k: int = 1000, expand_ratio: float = 10.0 ): # 获取种子用户向量 seed_embeddings = get_embeddings(seed_users) # 相似度搜索 search_params = { "metric_type": "IP", "params": {"nprobe": 128} } results = collection.search( data=seed_embeddings, anns_field="embedding", param=search_params, limit=int(len(seed_users)*expand_ratio), output_fields=["user_id"] ) # 结果聚合与去重 similar_users = set() for hits in results: for hit in hits: similar_users.add(hit.entity.get("user_id")) return { "similar_users": list(similar_users)[:top_k], "count": len(similar_users) }在实际项目中,我们遇到了几个典型问题及解决方案:
- 冷启动问题:新用户缺乏行为数据时,采用属性相似度作为补充
- 数据稀疏性:引入跨域行为数据(如浏览、搜索、购买)丰富关系图
- 在线性能:实现多级缓存策略,对高频查询结果缓存5-10分钟