news 2026/5/10 7:20:19

Go语言Kafka实战:高性能消息队列开发指南

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Go语言Kafka实战:高性能消息队列开发指南

Go语言Kafka实战:高性能消息队列开发指南

1. Kafka概述

Apache Kafka是分布式流处理平台,具有高吞吐量、低延迟、可持久化、可横向扩展等优点,广泛用于日志收集、实时数据管道、流处理等场景。

2. 生产者实现

package kafka import ( "context" "time" "github.com/IBM/sarama" ) type Producer struct { producer sarama.SyncProducer topic string } func NewProducer(brokers []string, topic string) (*Producer, error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true config.Producer.Compression = sarama.CompressionSnappy config.Net.DialTimeout = 10 * time.Second config.Net.ReadTimeout = 10 * time.Second config.Net.WriteTimeout = 10 * time.Second producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { return nil, err } return &Producer{ producer: producer, topic: topic, }, nil } func (p *Producer) SendMessage(key, value string) error { msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(value), } partition, offset, err := p.producer.SendMessage(msg) if err != nil { return err } fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset) return nil } func (p *Producer) SendMessageWithHeaders(key string, value []byte, headers map[string]string) error { msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(value), } for k, v := range headers { msg.Headers = append(msg.Headers, sarama.RecordHeader{ Key: []byte(k), Value: []byte(v), }) } _, _, err := p.producer.SendMessage(msg) return err } func (p *Producer) Close() error { return p.producer.Close() }

3. 消费者实现

type Consumer struct { consumer sarama.ConsumerGroup topic string handler *ConsumerGroupHandler } type ConsumerGroupHandler struct { ready chan bool } func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { close(h.ready) return nil } func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for { select { case message, ok := <-claim.Messages(): if !ok { return nil } fmt.Printf("Message received: key=%s, value=%s, partition=%d, offset=%d\n", string(message.Key), string(message.Value), message.Partition, message.Offset) session.MarkMessage(message, "") case <-session.Context().Done(): return nil } } } func NewConsumer(brokers []string, groupID, topic string) (*Consumer, error) { config := sarama.NewConfig() config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} config.Consumer.Offsets.Initial = sarama.OffsetNewest consumer, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { return nil, err } handler := &ConsumerGroupHandler{ ready: make(chan bool), } return &Consumer{ consumer: consumer, topic: topic, handler: handler, }, nil } func (c *Consumer) Start(ctx context.Context) error { for { if err := c.consumer.Consume(ctx, []string{c.topic}, c.handler); err != nil { return err } } } func (c *Consumer) Close() error { return c.consumer.Close() }

4. 异步生产者

type AsyncProducer struct { producer sarama.AsyncProducer topic string } func NewAsyncProducer(brokers []string, topic string) (*AsyncProducer, error) { config := sarama.NewConfig() config.Producer.Compression = sarama.CompressionSnappy config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Flush.Frequency = 500 * time.Millisecond producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, err } ap := &AsyncProducer{ producer: producer, topic: topic, } go ap.handleErrors() go ap.handleSuccesses() return ap, nil } func (a *AsyncProducer) handleErrors() { for err := range a.producer.Errors() { fmt.Printf("Producer error: %v\n", err) } } func (a *AsyncProducer) handleSuccesses() { for success := range a.producer.Successes() { fmt.Printf("Message sent to partition %d at offset %d\n", success.Partition, success.Offset) } } func (a *AsyncProducer) SendAsync(key, value string) { a.producer.Input() <- &sarama.ProducerMessage{ Topic: a.topic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(value), } } func (a *AsyncProducer) Close() error { return a.producer.Close() }

5. 消息可靠性保障

type ReliableProducer struct { producer *Producer retries int } func NewReliableProducer(brokers []string, topic string, retries int) (*ReliableProducer, error) { producer, err := NewProducer(brokers, topic) if err != nil { return nil, err } return &ReliableProducer{ producer: producer, retries: retries, }, nil } func (p *ReliableProducer) SendWithRetry(ctx context.Context, key, value string) error { var lastErr error for i := 0; i < p.retries; i++ { if err := p.producer.SendMessage(key, value); err != nil { lastErr = err time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) continue } return nil } return lastErr }

6. 消费者组管理

type ConsumerGroupManager struct { brokers []string groups map[string]*Consumer } func NewConsumerGroupManager(brokers []string) *ConsumerGroupManager { return &ConsumerGroupManager{ brokers: brokers, groups: make(map[string]*Consumer), } } func (m *ConsumerGroupManager) RegisterConsumer(groupID, topic string) (*Consumer, error) { consumer, err := NewConsumer(m.brokers, groupID, topic) if err != nil { return nil, err } m.groups[groupID] = consumer return consumer, nil } func (m *ConsumerGroupManager) UnregisterConsumer(groupID string) error { consumer, ok := m.groups[groupID] if !ok { return nil } delete(m.groups, groupID) return consumer.Close() }

7. 分区策略

type CustomPartitioner struct{} func (p *CustomPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) { key, err := message.Key.Encode() if err != nil || key == nil { return sarama.NewRandomPartitioner().Partition(message, numPartitions) } hash := crc32.ChecksumIEEE(key) return int32(hash % uint32(numPartitions)), nil } func (p *CustomPartitioner) RequiresConsistency() bool { return true }

8. 消息拦截器

type LoggingInterceptor struct{} func (i *LoggingInterceptor) OnSend(msg *sarama.ProducerMessage) { fmt.Printf("Sending message to topic %s, partition %d\n", msg.Topic, msg.Partition) } func (i *LoggingInterceptor) OnDeliver(msg *sarama.ProducerMessage, err error) { if err != nil { fmt.Printf("Failed to deliver message: %v\n", err) } else { fmt.Printf("Message delivered to topic %s, partition %d\n", msg.Topic, msg.Partition) } }

9. 总结

本文详细介绍了Go语言中使用Sarama库操作Kafka的方法,包括生产者、消费者、异步处理、可靠性保障等核心功能,为构建高性能消息队列系统提供了完整的解决方案。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/10 7:17:13

CANN/metadef自动映射函数

AutoMappingFn 【免费下载链接】metadef Ascend Metadata Definition 项目地址: https://gitcode.com/cann/metadef 函数功能 自动映射回调函数。 函数原型 Status AutoMappingFn(const google::protobuf::Message *op_src, ge::Operator &op)参数说明 参数 输入…

作者头像 李华
网站建设 2026/5/10 7:15:53

技术招聘内卷化:如何在疯狂的市场中识别真人才?

在软件测试这个领域深耕多年&#xff0c;我亲眼见证了招聘市场从“求贤若渴”到“泥沙俱下”的剧烈转变。当“内卷”成为常态&#xff0c;简历上的技术栈越来越华丽&#xff0c;面试回答越来越像标准答案&#xff0c;我们这些身处其中的测试人&#xff0c;反而陷入了一种集体性…

作者头像 李华
网站建设 2026/5/10 7:12:23

ARM架构HFGRTR_EL2寄存器与虚拟化陷阱机制详解

1. ARM架构中的异常级别与虚拟化基础在深入探讨HFGRTR_EL2寄存器之前&#xff0c;我们需要先理解ARMv8/v9架构中的异常级别&#xff08;Exception Levels&#xff09;概念。异常级别是ARM处理器实现特权隔离和安全控制的核心机制&#xff0c;类似于x86架构中的Ring 0-Ring 3&am…

作者头像 李华
网站建设 2026/5/10 7:10:38

MockGPS虚拟定位深度解析:Android位置模拟终极方案

MockGPS虚拟定位深度解析&#xff1a;Android位置模拟终极方案 【免费下载链接】MockGPS Android application to fake GPS 项目地址: https://gitcode.com/gh_mirrors/mo/MockGPS 在移动应用开发测试、隐私保护和地理定位功能验证等场景中&#xff0c;精准的位置模拟需…

作者头像 李华
网站建设 2026/5/10 7:10:32

开源统一身份认证平台Casdoor:架构解析与生产实践指南

1. 项目概述&#xff1a;一个开源的统一身份认证与单点登录平台如果你正在为多个内部系统、SaaS应用或者自研产品搭建一套统一的用户登录体系&#xff0c;并且对市面上商业化的身份即服务&#xff08;IDaaS&#xff09;方案的成本、定制化程度或者数据主权有所顾虑&#xff0c;…

作者头像 李华
网站建设 2026/5/10 7:09:33

AI驱动单元测试生成:Cursor编辑器三阶段工作流实战指南

1. 项目概述&#xff1a;AI驱动的单元测试生成工作流如果你和我一样&#xff0c;长期被单元测试的编写工作所困扰——既知道它的重要性&#xff0c;又常常因为时间紧张或觉得枯燥而将其延后&#xff0c;那么这个名为“AI Unit Test Builder for Cursor”的项目&#xff0c;绝对…

作者头像 李华