Go语言Kafka实战:高性能消息队列开发指南
1. Kafka概述
Apache Kafka是分布式流处理平台,具有高吞吐量、低延迟、可持久化、可横向扩展等优点,广泛用于日志收集、实时数据管道、流处理等场景。
2. 生产者实现
package kafka

import (
	"context"
	"fmt"
	"hash/crc32"
	"time"

	"github.com/IBM/sarama"
)

// Producer is a synchronous Kafka producer bound to a single topic.
type Producer struct {
	producer sarama.SyncProducer
	topic    string
}

// NewProducer builds a SyncProducer connected to brokers that publishes to
// topic. The config requires acknowledgement from all in-sync replicas,
// retries up to 5 times, and compresses payloads with Snappy.
func NewProducer(brokers []string, topic string) (*Producer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	// SyncProducer requires Return.Successes = true; sarama rejects the
	// config otherwise.
	config.Producer.Return.Successes = true
	config.Producer.Compression = sarama.CompressionSnappy
	config.Net.DialTimeout = 10 * time.Second
	config.Net.ReadTimeout = 10 * time.Second
	config.Net.WriteTimeout = 10 * time.Second

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, err
	}
	return &Producer{
		producer: producer,
		topic:    topic,
	}, nil
}

// SendMessage publishes a keyed string message and blocks until the broker
// acknowledges it, logging the assigned partition and offset.
func (p *Producer) SendMessage(key, value string) error {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(value),
	}
	partition, offset, err := p.producer.SendMessage(msg)
	if err != nil {
		return err
	}
	fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	return nil
}

// SendMessageWithHeaders publishes a keyed binary message carrying the given
// record headers.
func (p *Producer) SendMessageWithHeaders(key string, value []byte, headers map[string]string) error {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	for k, v := range headers {
		msg.Headers = append(msg.Headers, sarama.RecordHeader{
			Key:   []byte(k),
			Value: []byte(v),
		})
	}
	_, _, err := p.producer.SendMessage(msg)
	return err
}

// Close shuts down the underlying producer and releases its connections.
func (p *Producer) Close() error {
	return p.producer.Close()
}

3. 消费者实现
type Consumer struct { consumer sarama.ConsumerGroup topic string handler *ConsumerGroupHandler } type ConsumerGroupHandler struct { ready chan bool } func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { close(h.ready) return nil } func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for { select { case message, ok := <-claim.Messages(): if !ok { return nil } fmt.Printf("Message received: key=%s, value=%s, partition=%d, offset=%d\n", string(message.Key), string(message.Value), message.Partition, message.Offset) session.MarkMessage(message, "") case <-session.Context().Done(): return nil } } } func NewConsumer(brokers []string, groupID, topic string) (*Consumer, error) { config := sarama.NewConfig() config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} config.Consumer.Offsets.Initial = sarama.OffsetNewest consumer, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { return nil, err } handler := &ConsumerGroupHandler{ ready: make(chan bool), } return &Consumer{ consumer: consumer, topic: topic, handler: handler, }, nil } func (c *Consumer) Start(ctx context.Context) error { for { if err := c.consumer.Consume(ctx, []string{c.topic}, c.handler); err != nil { return err } } } func (c *Consumer) Close() error { return c.consumer.Close() }4. 异步生产者
type AsyncProducer struct { producer sarama.AsyncProducer topic string } func NewAsyncProducer(brokers []string, topic string) (*AsyncProducer, error) { config := sarama.NewConfig() config.Producer.Compression = sarama.CompressionSnappy config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Flush.Frequency = 500 * time.Millisecond producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, err } ap := &AsyncProducer{ producer: producer, topic: topic, } go ap.handleErrors() go ap.handleSuccesses() return ap, nil } func (a *AsyncProducer) handleErrors() { for err := range a.producer.Errors() { fmt.Printf("Producer error: %v\n", err) } } func (a *AsyncProducer) handleSuccesses() { for success := range a.producer.Successes() { fmt.Printf("Message sent to partition %d at offset %d\n", success.Partition, success.Offset) } } func (a *AsyncProducer) SendAsync(key, value string) { a.producer.Input() <- &sarama.ProducerMessage{ Topic: a.topic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(value), } } func (a *AsyncProducer) Close() error { return a.producer.Close() }5. 消息可靠性保障
type ReliableProducer struct { producer *Producer retries int } func NewReliableProducer(brokers []string, topic string, retries int) (*ReliableProducer, error) { producer, err := NewProducer(brokers, topic) if err != nil { return nil, err } return &ReliableProducer{ producer: producer, retries: retries, }, nil } func (p *ReliableProducer) SendWithRetry(ctx context.Context, key, value string) error { var lastErr error for i := 0; i < p.retries; i++ { if err := p.producer.SendMessage(key, value); err != nil { lastErr = err time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) continue } return nil } return lastErr }6. 消费者组管理
type ConsumerGroupManager struct { brokers []string groups map[string]*Consumer } func NewConsumerGroupManager(brokers []string) *ConsumerGroupManager { return &ConsumerGroupManager{ brokers: brokers, groups: make(map[string]*Consumer), } } func (m *ConsumerGroupManager) RegisterConsumer(groupID, topic string) (*Consumer, error) { consumer, err := NewConsumer(m.brokers, groupID, topic) if err != nil { return nil, err } m.groups[groupID] = consumer return consumer, nil } func (m *ConsumerGroupManager) UnregisterConsumer(groupID string) error { consumer, ok := m.groups[groupID] if !ok { return nil } delete(m.groups, groupID) return consumer.Close() }7. 分区策略
type CustomPartitioner struct{} func (p *CustomPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) { key, err := message.Key.Encode() if err != nil || key == nil { return sarama.NewRandomPartitioner().Partition(message, numPartitions) } hash := crc32.ChecksumIEEE(key) return int32(hash % uint32(numPartitions)), nil } func (p *CustomPartitioner) RequiresConsistency() bool { return true }8. 消息拦截器
type LoggingInterceptor struct{} func (i *LoggingInterceptor) OnSend(msg *sarama.ProducerMessage) { fmt.Printf("Sending message to topic %s, partition %d\n", msg.Topic, msg.Partition) } func (i *LoggingInterceptor) OnDeliver(msg *sarama.ProducerMessage, err error) { if err != nil { fmt.Printf("Failed to deliver message: %v\n", err) } else { fmt.Printf("Message delivered to topic %s, partition %d\n", msg.Topic, msg.Partition) } }9. 总结
本文详细介绍了Go语言中使用Sarama库操作Kafka的方法,包括生产者、消费者、异步处理、可靠性保障等核心功能,为构建高性能消息队列系统提供了完整的解决方案。