视频看了几百小时还迷糊?关注我,几分钟让你秒懂!(发点评论可以给博主加热度哦)
一、为什么用 Netty?真实业务场景解析
在物联网(IoT)、金融交易、游戏服务器、即时通讯等场景中,高并发、低延迟、稳定可靠的 TCP 长连接是刚需。
🌰 场景:智能设备上报数据
- 10万台设备通过 TCP 连接服务器
- 每秒上报位置、电量、状态等信息
- 服务器需实时处理并下发指令(如“重启”、“升级”)
传统 Java BIO(Blocking IO)方案:
ServerSocket server = new ServerSocket(8080); while (true) { Socket socket = server.accept(); // 阻塞 new Thread(() -> handle(socket)).start(); // 每连接开一线程 }❌ 问题:
- 线程数爆炸(10万连接 = 10万线程)
- 上下文切换开销大
- 内存耗尽、系统崩溃
Netty 方案(Reactor 模型):
✅ 优势:
- 单线程可处理数万连接(基于 NIO + 事件驱动)
- 内存池、零拷贝优化性能
- 自带心跳、粘包/拆包、重连等机制
二、Spring Boot + Netty 快速搭建 TCP 服务(正确姿势)
💡 注意:Netty 是独立于 Spring 的网络框架,但可无缝集成到 Spring Boot 项目中。
第一步:添加依赖(pom.xml)
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.100.Final</version> </dependency> <!-- 可选:用于优雅关闭 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>第二步:定义消息协议(解决粘包/拆包)
我们采用长度字段 + JSON的自定义协议:
| 4字节长度 | JSON字符串 | |-----------|------------| | 00 00 00 1C | {"cmd":"login","data":{"id":"D1001"}} |✅ 为什么不用纯文本?
因为 TCP 是流式协议,不保证消息边界,必须定义协议!
第三步:编写消息编解码器
1. 消息实体类
public class TcpMessage { private String cmd; // 指令:login, heartbeat, data private Object data; // 负载数据 // getter/setter/toString 略 }2. 编码器(Java对象 → 字节流)
public class MessageEncoder extends MessageToByteEncoder<TcpMessage> { @Override protected void encode(ChannelHandlerContext ctx, TcpMessage msg, ByteBuf out) { try { String json = new ObjectMapper().writeValueAsString(msg); byte[] bytes = json.getBytes(StandardCharsets.UTF_8); out.writeInt(bytes.length); // 写入4字节长度 out.writeBytes(bytes); // 写入JSON内容 } catch (Exception e) { throw new RuntimeException("编码失败", e); } } }3. 解码器(字节流 → Java对象)
public class MessageDecoder extends LengthFieldBasedFrameDecoder { public MessageDecoder() { super( 1024 * 1024, // maxFrameLength 最大帧长度 0, // lengthFieldOffset 长度字段偏移 4, // lengthFieldLength 长度字段占4字节 0, // lengthAdjustment 4 // initialBytesToStrip 跳过长度字段 ); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) return null; try { byte[] bytes = new byte[frame.readableBytes()]; frame.readBytes(bytes); String json = new String(bytes, StandardCharsets.UTF_8); return new ObjectMapper().readValue(json, TcpMessage.class); } finally { frame.release(); } } }第四步:实现业务处理器
@Component @ChannelHandler.Sharable // 标记为可共享,避免每次新建 public class TcpServerHandler extends SimpleChannelInboundHandler<TcpMessage> { private static final Logger log = LoggerFactory.getLogger(TcpServerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) { log.info("设备上线: {}", ctx.channel().remoteAddress()); // 可在此记录连接、分配ID等 } @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("设备离线: {}", ctx.channel().remoteAddress()); // 清理资源、标记离线 } @Override protected void channelRead0(ChannelHandlerContext ctx, TcpMessage msg) { log.info("收到消息: {}", msg); // 根据指令处理 switch (msg.getCmd()) { case "login": handleLogin(ctx, msg); break; case "heartbeat": // 心跳直接回复 ctx.writeAndFlush(new TcpMessage("ack", "ok")); break; case "data": handleData(ctx, msg); break; default: ctx.writeAndFlush(new TcpMessage("error", "unknown command")); } } private void handleLogin(ChannelHandlerContext ctx, TcpMessage msg) { // 模拟登录验证 String deviceId = ((Map<String, String>) msg.getData()).get("id"); log.info("设备 {} 登录成功", deviceId); ctx.writeAndFlush(new TcpMessage("login_ack", "success")); } private void handleData(ChannelHandlerContext ctx, TcpMessage msg) { // 处理业务数据 ctx.writeAndFlush(new TcpMessage("data_ack", "received")); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("连接异常: {}", ctx.channel().remoteAddress(), cause); ctx.close(); // 出错关闭连接 } }第五步:启动 Netty 服务(集成到 Spring Boot)
@Component public class NettyTcpServer { private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class); private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private Channel serverChannel; @Autowired private TcpServerHandler tcpServerHandler; @PostConstruct public void start() throws InterruptedException { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new MessageDecoder()) .addLast(new MessageEncoder()) .addLast(tcpServerHandler); // 业务处理器 } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); serverChannel = bootstrap.bind(8080).sync().channel(); log.info("Netty TCP 服务已启动,监听端口: 8080"); } @PreDestroy public void stop() { if (serverChannel != null) { serverChannel.close(); } if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("Netty TCP 服务已关闭"); } }✅ 关键点:
- 使用
@PostConstruct启动,@PreDestroy优雅关闭@ChannelHandler.Sharable避免重复创建 Handler 实例
三、客户端测试代码(模拟设备)
public class TcpClientTest { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new MessageDecoder()) .addLast(new MessageEncoder()) .addLast(new SimpleChannelInboundHandler<TcpMessage>() { @Override protected void channelRead0(ChannelHandlerContext ctx, TcpMessage msg) { System.out.println("收到服务端回复: " + msg); } }); } }); Channel channel = bootstrap.connect("localhost", 8080).sync().channel(); // 发送登录 TcpMessage loginMsg = new TcpMessage(); loginMsg.setCmd("login"); loginMsg.setData(Map.of("id", "D1001")); channel.writeAndFlush(loginMsg); // 发送心跳 TcpMessage hb = new TcpMessage(); hb.setCmd("heartbeat"); hb.setData("ping"); channel.writeAndFlush(hb); // 保持连接 Thread.sleep(5000); channel.closeFuture().sync(); } finally { group.shutdownGracefully(); } } }四、常见反例 & 避坑指南(新手必看!)
❌ 反例1:未处理粘包/拆包 → 消息解析错乱!
// 错误:直接按行读取(只适用于 \n 分隔的文本协议) ch.pipeline().addLast(new LineBasedFrameDecoder(1024));✅ 正确:使用LengthFieldBasedFrameDecoder或自定义协议头
❌ 反例2:Handler 不加@Sharable→ 内存泄漏!
// 每次连接都 new 一个 Handler,持有上下文引用无法释放 .childHandler(new ChannelInitializer<...>() { protected void initChannel(...) { ch.pipeline().addLast(new TcpServerHandler()); // ❌ } });✅ 正确:注入 Spring Bean 并标记@Sharable
❌ 反例3:未设置 SO_KEEPALIVE → 僵尸连接堆积
// 默认不开启 TCP 心跳,断网后连接仍“假在线” .childOption(ChannelOption.SO_KEEPALIVE, false); // ❌✅ 正确:
.childOption(ChannelOption.SO_KEEPALIVE, true); // 或应用层实现心跳(更可靠)❌ 反例4:异常未关闭连接 → 资源泄露
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 什么都不做!连接会一直挂着 }✅ 正确:捕获异常后ctx.close()
五、生产环境增强建议
| 功能 | 实现方式 |
|---|---|
| 心跳检测 | 客户端定时发heartbeat,服务端记录最后活跃时间,超时踢出 |
| 连接管理 | 用ConcurrentHashMap<String, Channel>存储设备ID与Channel映射 |
| SSL加密 | 添加SslHandler到 pipeline |
| 流量控制 | 使用ChannelOption.WRITE_BUFFER_WATER_MARK |
| 监控指标 | 暴露连接数、吞吐量等指标到 Prometheus |
六、总结
Netty 强大但复杂,协议设计 > 代码实现。
本文带你:
- ✅ 从零搭建 Spring Boot + Netty TCP 服务
- ✅ 解决粘包/拆包、连接管理、异常处理等核心问题
- ✅ 避开内存泄漏、僵尸连接等致命陷阱
记住:高并发不是堆机器,而是靠合理架构。Netty 是你构建高性能网络服务的基石!
视频看了几百小时还迷糊?关注我,几分钟让你秒懂!(发点评论可以给博主加热度哦)