From 311e8027e013d062f955cd631a4331ad3d97cc47 Mon Sep 17 00:00:00 2001 From: MartyJ28 <1578128844@qq.com> Date: Sat, 23 Nov 2024 14:48:05 +0800 Subject: [PATCH] =?UTF-8?q?Demo=E5=BF=83=E8=B7=B3=E7=9B=91=E6=B5=8B?= =?UTF-8?q?=E5=92=8C=E7=9F=AD=E7=BA=BF=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../demo/Service/nettyClientHeart.java | 137 ++++++++++++++++++ .../demo/Service/nettyServerHeart.java | 100 +++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 src/main/java/com/example/demo/Service/nettyClientHeart.java create mode 100644 src/main/java/com/example/demo/Service/nettyServerHeart.java diff --git a/src/main/java/com/example/demo/Service/nettyClientHeart.java b/src/main/java/com/example/demo/Service/nettyClientHeart.java new file mode 100644 index 0000000..1effe3d --- /dev/null +++ b/src/main/java/com/example/demo/Service/nettyClientHeart.java @@ -0,0 +1,137 @@ +package com.example.demo.Service; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class nettyClientHeart { + + private final String host = "127.0.0.1"; // 目标服务器地址 + private final int port = 8888; // 目标服务器端口 + private int failCount; + private Bootstrap bootstrap; + + public void start() throws InterruptedException { + // 1. 创建 EventLoopGroup 用于管理客户端的 NIO 线程 + EventLoopGroup group = new NioEventLoopGroup(); + + + // 2. Bootstrap 用于创建客户端 + bootstrap = new Bootstrap(); + bootstrap.group(group) // 指定线程组 + .channel(NioSocketChannel.class) // 指定使用 NIO 传输的Channel + //.option(ChannelOption.SO_KEEPALIVE, true) // 保持长连接 + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + // 3. 配置客户端的 ChannelHandler + //ch.pipeline().addLast(new NettyMsgEncoder()); + ch.pipeline().addLast(new nettyClientHandler2(nettyClientHeart.this)); + } + }); + + // 4. 连接服务器 + connect(); + + +// } finally { +// // 7. 关闭线程组 +// group.shutdownGracefully(); +// } + } + + public void connect() throws InterruptedException { + ChannelFuture channelFuture = bootstrap.connect(host, port); + channelFuture.addListener((ChannelFutureListener)future->{ + if(!future.isSuccess()){ + final EventLoop loop = channelFuture.channel().eventLoop(); + loop.schedule(()->{ + log.info("我与服务器失去了连接,正在进行重连..."); + try{ + connect(); + failCount++; + }catch ( Exception e){ + e.printStackTrace(); + } + },3L, TimeUnit.SECONDS); + }else{ + System.out.println("服务端连接成功!"); + } + }); + + // 5. 向服务器发送消息 + //future.channel().writeAndFlush("Hello, Netty Server!"); + + // 6. 等待连接关闭 + channelFuture.channel().closeFuture().sync(); + } + + public static void main(String[] args) throws InterruptedException { + new nettyClientHeart().start(); + } + + +} + +class nettyClientHandler2 extends ChannelInboundHandlerAdapter { + private nettyClientHeart nettyClient; + + public nettyClientHandler2(nettyClientHeart nettyClient){ + this.nettyClient = nettyClient; + } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf byteBuf = (ByteBuf)msg; + System.out.println("Received message from server: " + byteBuf.toString(StandardCharsets.UTF_8)); + } + + @Override + //当 Channel 已经连接/绑定并且已经就绪时调用 + public void channelActive(ChannelHandlerContext ctx) throws Exception { + for(int i = 0;i < 1;i++) { + String bb = (i+1) +""; + byte[] cc = new byte[0]; + try{ + int mills = new Random().nextInt(5); + Thread.sleep(mills*1000); + bb += "休息" + mills + "秒"; + cc = bb.getBytes(StandardCharsets.UTF_8); + ByteBuf ccBuf = Unpooled.buffer(cc.length); + ccBuf.writeBytes(cc); + ctx.writeAndFlush(ccBuf); + }catch (Exception e){ + e.printStackTrace(); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 处理异常 + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + try { + ctx.pipeline().remove(this); + ctx.channel().close(); + nettyClient.connect(); + }catch (Exception e){ + e.printStackTrace(); + } + } +} + diff --git a/src/main/java/com/example/demo/Service/nettyServerHeart.java b/src/main/java/com/example/demo/Service/nettyServerHeart.java new file mode 100644 index 0000000..2537f2c --- /dev/null +++ b/src/main/java/com/example/demo/Service/nettyServerHeart.java @@ -0,0 +1,100 @@ +package com.example.demo.Service; + + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +@Slf4j +public class nettyServerHeart { + private final int port; + + public nettyServerHeart(int port) { + this.port = port; + } + + public void start() throws InterruptedException { + EventLoopGroup bossGroup = new NioEventLoopGroup(1);//线程 + EventLoopGroup workerGroup = new NioEventLoopGroup();//8 + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG,128) + .childOption(ChannelOption.SO_KEEPALIVE,true) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + log.info("客户端channel初始化"); + p.addLast(new IdleStateHandler(4,0,0, TimeUnit.SECONDS)); + p.addLast(new nettyServerHandler2()); + + } + }); + + + ChannelFuture f = b.bind(port).sync();//成功之后在执行 + log.info("服务启动成功!"); + f.addListener((ChannelFutureListener)future -> log.info("监听端口:{}",future.isSuccess())); + f.channel().closeFuture().sync(); + + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + public static void main(String[] args) throws InterruptedException { + new nettyServerHeart(8888).start(); + } +} + + +@Slf4j +class nettyServerHandler2 extends ChannelInboundHandlerAdapter { + int i = 0; + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + i++; + ByteBuf in = (ByteBuf)msg; + + System.out.println("Received message: " +in.toString(io.netty.util.CharsetUtil.UTF_8)); + System.out.println(); + //ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello from Netty Server",CharsetUtil.UTF_8)); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if(evt instanceof IdleStateEvent){ + IdleStateEvent idleStateEvent = (IdleStateEvent)evt; + switch (idleStateEvent.state()){ + case ALL_IDLE: + log.error("读写都空闲"); + break; + case READER_IDLE: + log.error("读空闲"); + break; + case WRITER_IDLE: + log.error("写空闲"); + break; + default: + System.out.println("default!"); + } + } + } +}