Redis高级数据结构:超越String的Redis世界
引言
Redis不仅仅是"一个KV存储",它提供了丰富的数据结构,是现代应用架构中不可或缺的组件。深入理解Redis的数据结构,能够帮助我们设计出更高效、更优雅的解决方案。本文将全面解析Redis的五大基础类型和五种扩展类型。
一、String类型及其应用
1.1 String基础操作
package redis import ( "context" "fmt" "time" "github.com/redis/go-redis/v9" ) type StringOperations struct { client *redis.Client } func NewStringOperations(client *redis.Client) *StringOperations { return &StringOperations{client: client} } func (r *StringOperations) SetWithExpiry(ctx context.Context, key, value string, expiry time.Duration) error { return r.client.Set(ctx, key, value, expiry).Err() } func (r *StringOperations) Get(ctx context.Context, key string) (string, error) { return r.client.Get(ctx, key).Result() } func (r *StringOperations) MSet(ctx context.Context, values map[string]interface{}) error { return r.client.MSet(ctx, values).Err() } func (r *StringOperations) MGet(ctx context.Context, keys ...string) ([]interface{}, error) { return r.client.MGet(ctx, keys...).Result() } func (r *StringOperations) SetNX(ctx context.Context, key, value string, expiry time.Duration) (bool, error) { return r.client.SetNX(ctx, key, value, expiry).Result() }1.2 分布式锁实现
package redis import ( "context" "errors" "fmt" "time" "github.com/redis/go-redis/v9" ) var ( ErrLockNotAcquired = errors.New("lock not acquired") ErrLockNotHeld = errors.New("lock not held") ) type DistributedLock struct { client *redis.Client key string value string expiry time.Duration } func NewDistributedLock(client *redis.Client, key string, expiry time.Duration) *DistributedLock { return &DistributedLock{ client: client, key: key, value: fmt.Sprintf("%d", time.Now().UnixNano()), expiry: expiry, } } func (dl *DistributedLock) Acquire(ctx context.Context) (bool, error) { result, err := dl.client.SetNX(ctx, dl.key, dl.value, dl.expiry).Result() if err != nil { return false, err } return result, nil } func (dl *DistributedLock) Release(ctx context.Context) error { script := redis.NewScript(` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end `) result, err := script.Run(ctx, dl.client, []string{dl.key}, dl.value).Int64() if err != nil { return err } if result == 0 { return ErrLockNotHeld } return nil } func (dl *DistributedLock) Extend(ctx context.Context, expiry time.Duration) error { script := redis.NewScript(` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end `) result, err := script.Run(ctx, dl.client, []string{dl.key}, dl.value, expiry.Milliseconds()).Int64() if err != nil { return err } if result == 0 { return ErrLockNotHeld } return nil }二、Hash类型:对象存储
2.1 Hash基本操作
package redis import ( "context" "encoding/json" "time" "github.com/redis/go-redis/v9" ) type HashOperations struct { client *redis.Client } func NewHashOperations(client *redis.Client) *HashOperations { return &HashOperations{client: client} } func (r *HashOperations) HSet(ctx context.Context, key string, values map[string]interface{}) error { return r.client.HSet(ctx, key, values).Err() } func (r *HashOperations) HGet(ctx context.Context, key, field string) (string, error) { return r.client.HGet(ctx, key, field).Result() } func (r *HashOperations) HGetAll(ctx context.Context, key string) (map[string]string, error) { return r.client.HGetAll(ctx, key).Result() } func (r *HashOperations) HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error) { return r.client.HIncrBy(ctx, key, field, incr).Result() } func (r *HashOperations) HExists(ctx context.Context, key, field string) (bool, error) { return r.client.HExists(ctx, key, field).Result() }2.2 对象缓存实战
package redis import ( "context" "encoding/json" "fmt" "time" "github.com/redis/go-redis/v9" ) type UserCache struct { client *redis.Client prefix string } func NewUserCache(client *redis.Client) *UserCache { return &UserCache{ client: client, prefix: "user:", } } type User struct { ID int64 `json:"id"` Name string `json:"name"` Email string `json:"email"` Status string `json:"status"` CreatedAt int64 `json:"created_at"` } func (uc *UserCache) Get(ctx context.Context, userID int64) (*User, error) { key := fmt.Sprintf("%s%d", uc.prefix, userID) data, err := uc.client.HGetAll(ctx, key).Result() if err != nil { return nil, err } if len(data) == 0 { return nil, nil } user := &User{ ID: userID, Name: data["name"], Email: data["email"], Status: data["status"], CreatedAt: time.Now().Unix(), } return user, nil } func (uc *UserCache) Set(ctx context.Context, user *User, expiry time.Duration) error { key := fmt.Sprintf("%s%d", uc.prefix, user.ID) fields := map[string]interface{}{ "name": user.Name, "email": user.Email, "status": user.Status, } pipe := uc.client.Pipeline() pipe.HSet(ctx, key, fields) pipe.Expire(ctx, key, expiry) _, err := pipe.Exec(ctx) return err } func (uc *UserCache) Delete(ctx context.Context, userID int64) error { key := fmt.Sprintf("%s%d", uc.prefix, userID) return uc.client.Del(ctx, key).Err() } func (uc *UserCache) UpdateField(ctx context.Context, userID int64, field string, value interface{}) error { key := fmt.Sprintf("%s%d", uc.prefix, userID) return uc.client.HSet(ctx, key, field, value).Err() } func (uc *UserCache) GetField(ctx context.Context, userID int64, field string) (string, error) { key := fmt.Sprintf("%s%d", uc.prefix, userID) return uc.client.HGet(ctx, key, field).Result() } func (uc *UserCache) IncrementCounter(ctx context.Context, userID int64, field string) (int64, error) { key := fmt.Sprintf("%s%d", uc.prefix, userID) return uc.client.HIncrBy(ctx, key, field, 1).Result() }三、List类型:队列与栈
3.1 阻塞队列实现
package redis import ( "context" "encoding/json" "time" "github.com/redis/go-redis/v9" ) type BlockingQueue struct { client *redis.Client name string } func NewBlockingQueue(client *redis.Client, name string) *BlockingQueue { return &BlockingQueue{ client: client, name: name, } } func (bq *BlockingQueue) Enqueue(ctx context.Context, value interface{}) error { data, err := json.Marshal(value) if err != nil { return err } return bq.client.LPush(ctx, bq.name, data).Err() } func (bq *BlockingQueue) Dequeue(ctx context.Context, timeout time.Duration) ([]byte, error) { result, err := bq.client.BRPop(ctx, timeout, bq.name).Result() if err != nil { return nil, err } if len(result) < 2 { return nil, nil } return []byte(result[1]), nil } func (bq *BlockingQueue) DequeueWithContext(ctx context.Context) ([]byte, error) { result, err := bq.client.BRPop(ctx, 0, bq.name).Result() if err != nil { return nil, err } if len(result) < 2 { return nil, nil } return []byte(result[1]), nil } func (bq *BlockingQueue) Length(ctx context.Context) (int64, error) { return bq.client.LLen(ctx, bq.name).Result() }3.2 延迟队列实现
package redis import ( "context" "encoding/json" "fmt" "time" "github.com/redis/go-redis/v9" ) type DelayedQueue struct { client *redis.Client name string zsetName string } func NewDelayedQueue(client *redis.Client, name string) *DelayedQueue { return &DelayedQueue{ client: client, name: name, zsetName: name + ":delayed", } } type DelayedMessage struct { ID string Payload interface{} ExecuteAt time.Time } func (dq *DelayedQueue) Schedule(ctx context.Context, msg *DelayedMessage) error { data, err := json.Marshal(msg.Payload) if err != nil { return err } score := float64(msg.ExecuteAt.Unix()) pipe := dq.client.Pipeline() pipe.ZAdd(ctx, dq.zsetName, redis.Z{ Score: score, Member: fmt.Sprintf("%s:%s", msg.ID, string(data)), }) pipe.LPush(ctx, dq.name, msg.ID) _, err = pipe.Exec(ctx) return err } func (dq *DelayedQueue) Process(ctx context.Context, handler func(msg *DelayedMessage) error) error { now := float64(time.Now().Unix()) result, err := dq.client.ZPopMin(ctx, dq.zsetName, 1).Result() if err != nil { return err } if len(result) == 0 { return nil } msgID := result[0].Member.(string) payload, err := dq.client.LPop(ctx, dq.name).Result() if err != nil { return err } msg := &DelayedMessage{ ID: msgID, ExecuteAt: time.Unix(int64(result[0].Score), 0), } if err := json.Unmarshal([]byte(payload), &msg.Payload); err != nil { return err } return handler(msg) } func (dq *DelayedQueue) Cancel(ctx context.Context, msgID string) error { pattern := msgID + ":*" result, err := dq.client.ZRangeByScore(ctx, dq.zsetName, &redis.ZRangeBy{ Min: "-inf", Max: "+inf", }).Result() if err != nil { return err } for _, member := range result { if len(member) > len(msgID)+1 && member[:len(msgID)] == msgID { return dq.client.ZRem(ctx, dq.zsetName, member).Err() } } return nil }四、Set类型:无序去重
4.1 标签系统实现
package redis import ( "context" "fmt" "github.com/redis/go-redis/v9" ) type TagSystem struct { client *redis.Client } func NewTagSystem(client *redis.Client) *TagSystem { return &TagSystem{client: client} } func (ts *TagSystem) AddTags(ctx context.Context, entityType string, entityID string, tags ...string) error { key := fmt.Sprintf("entity:%s:%s:tags", entityType, entityID) return ts.client.SAdd(ctx, key, tags).Err() } func (ts *TagSystem) RemoveTags(ctx context.Context, entityType string, entityID string, tags ...string) error { key := fmt.Sprintf("entity:%s:%s:tags", entityType, entityID) return ts.client.SRem(ctx, key, tags).Err() } func (ts *TagSystem) GetTags(ctx context.Context, entityType string, entityID string) ([]string, error) { key := fmt.Sprintf("entity:%s:%s:tags", entityType, entityID) return ts.client.SMembers(ctx, key).Result() } func (ts *TagSystem) HasTag(ctx context.Context, entityType string, entityID string, tag string) (bool, error) { key := fmt.Sprintf("entity:%s:%s:tags", entityType, entityID) return ts.client.SIsMember(ctx, key, tag).Result() } func (ts *TagSystem) GetEntitiesByTag(ctx context.Context, entityType string, tag string) ([]string, error) { key := fmt.Sprintf("tag:%s:%s:%s", entityType, tag, "entities") return ts.client.SMembers(ctx, key).Result() } func (ts *TagSystem) AddEntityToTag(ctx context.Context, entityType string, entityID string, tag string) error { entityKey := fmt.Sprintf("entity:%s:%s:tags", entityType, entityID) tagKey := fmt.Sprintf("tag:%s:%s:%s", entityType, tag, "entities") pipe := ts.client.Pipeline() pipe.SAdd(ctx, entityKey, tag) pipe.SAdd(ctx, tagKey, entityID) _, err := pipe.Exec(ctx) return err } func (ts *TagSystem) GetIntersection(ctx context.Context, entityType string, tags ...string) ([]string, error) { if len(tags) == 0 { return nil, nil } keys := make([]string, len(tags)) for i, tag := range tags { keys[i] = fmt.Sprintf("tag:%s:%s:%s", entityType, tag, "entities") } return ts.client.SInter(ctx, keys...).Result() } func (ts *TagSystem) GetUnion(ctx context.Context, entityType string, tags ...string) ([]string, error) { if len(tags) == 0 { return nil, nil } keys := make([]string, len(tags)) for i, tag := range tags { keys[i] = fmt.Sprintf("tag:%s:%s:%s", entityType, tag, "entities") } return ts.client.SUnion(ctx, keys...).Result() }五、ZSet类型:有序集合
5.1 排行榜实现
package redis import ( "context" "fmt" "github.com/redis/go-redis/v9" ) type Leaderboard struct { client *redis.Client key string } func NewLeaderboard(client *redis.Client, name string) *Leaderboard { return &Leaderboard{ client: client, key: fmt.Sprintf("leaderboard:%s", name), } } func (l *Leaderboard) UpdateScore(ctx context.Context, member string, score float64) error { return l.client.ZAdd(ctx, l.key, redis.Z{ Score: score, Member: member, }).Err() } func (l *Leaderboard) IncrementScore(ctx context.Context, member string, increment float64) (float64, error) { return l.client.ZIncrBy(ctx, l.key, increment, member).Result() } func (l *Leaderboard) GetRank(ctx context.Context, member string) (int64, error) { rank, err := l.client.ZRevRank(ctx, l.key, member).Result() if err != nil { return -1, err } return rank + 1, nil } func (l *Leaderboard) GetScore(ctx context.Context, member string) (float64, error) { return l.client.ZScore(ctx, l.key, member).Result() } func (l *Leaderboard) GetTopN(ctx context.Context, n int64) ([]redis.Z, error) { return l.client.ZRevRangeWithScores(ctx, l.key, 0, n-1).Result() } func (l *Leaderboard) GetRange(ctx context.Context, start, stop int64) ([]redis.Z, error) { return l.client.ZRevRangeWithScores(ctx, l.key, start, stop).Result() } func (l *Leaderboard) GetRankedMembers(ctx context.Context, start, stop int64) ([]string, error) { return l.client.ZRevRange(ctx, l.key, start, stop).Result() } func (l *Leaderboard) RemoveMember(ctx context.Context, member string) error { return l.client.ZRem(ctx, l.key, member).Err() } func (l *Leaderboard) GetCount(ctx context.Context) (int64, error) { return l.client.ZCard(ctx, l.key).Result() } func (l *Leaderboard) GetMembersByScore(ctx context.Context, min, max float64) ([]redis.Z, error) { return l.client.ZRevRangeByScoreWithScores(ctx, l.key, &redis.ZRangeBy{ Min: fmt.Sprintf("%f", min), Max: fmt.Sprintf("%f", max), }).Result() }六、Geospatial:地理位置
package redis import ( "context" "fmt" "github.com/redis/go-redis/v9" ) type GeoOperations struct { client *redis.Client } func NewGeoOperations(client *redis.Client) *GeoOperations { return &GeoOperations{client: client} } func (g *GeoOperations) AddLocation(ctx context.Context, key string, longitude, latitude float64, member string) error { return g.client.GeoAdd(ctx, key, &redis.GeoLocation{ Name: member, Longitude: longitude, Latitude: latitude, }).Err() } func (g *GeoOperations) GetPosition(ctx context.Context, key string, member string) (*redis.GeoPos, error) { pos, err := g.client.GeoPos(ctx, key, member).Result() if err != nil { return nil, err } if len(pos) == 0 || pos[0] == nil { return nil, fmt.Errorf("member not found") } return pos[0], nil } func (g *GeoOperations) GetDistance(ctx context.Context, key, member1, member2, unit string) (float64, error) { return g.client.GeoDist(ctx, key, member1, member2, unit).Result() } func (g *GeoOperations) SearchNearby(ctx context.Context, key string, longitude, latitude, radius float64, unit string, count int) ([]redis.GeoLocation, error) { return g.client.GeoSearchLocation(ctx, key, &redis.GeoSearchLocationQuery{ GeoSearchQuery: redis.GeoSearchQuery{ Longitude: longitude, Latitude: latitude, Radius: radius, Unit: unit, WithCoord: true, WithDist: true, Count: count, Sort: "ASC", }, }).Result() }七、总结
Redis的丰富数据结构为开发者提供了强大的工具集:
- String:适用于简单的KV缓存、计数器、分布式锁
- Hash:适用于对象存储,适合字段级别的更新
- List:适用于消息队列、任务队列、栈
- Set:适用于标签系统、去重场景、交集运算
- ZSet:适用于排行榜、有序事件、延迟队列
- Geospatial:适用于附近的人、地理位置服务
合理选择数据结构,能够显著提升应用性能和代码可读性。