Netty群聊系统实例

实例要求

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)

目的:进一步理解Netty非阻塞网络编程机制

服务端

编写GroupChatServer类

image.png
GroupChatServerHandler

CleanShot 2020-11-13 at 10.51.07

private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

public class GroupChatServer {
    private int port;//监听端口


    public GroupChatServer(int port) {
        this.port = port;
    }


    //编写run方法,处理客户端端请求
    public void run() throws InterruptedException {
        //创建两个线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {


                        @Override

                        protected void initChannel(SocketChannel ch) throws Exception {

                            //获取到pipelineå
                            ChannelPipeline pipeline = ch.pipeline();

                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());

                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己到业务处理handler
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });


            System.out.println("Netty 服务器启动");
            ChannelFuture channelFuture = bootstrap.bind(port).sync();

            //监听关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new GroupChatServer(7000).run();
    }
}

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {



    //定义一个channel组,管理所以到channel
    //GlobalEventExecutor.INSTANCE 是全局的事件执行器,单例
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");



    //handlerAdded表示连接建立,一旦连接,第一个被执行该方法
    //将当前channel加入到channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户端加入聊天端信息推送给其他在线端客户端。该方法会将channelGroup中所有端channel遍历,并发送信息。因此不需要自己遍历
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new Date()) + "\n");
        channelGroup.add(channel);
    }


    //断开连接,将xx客户离开信息推送哥当前在线的客户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
        System.out.println("当前channelGroup大小:" + channelGroup.size());
    }

    //表示channel处于活动的状态 提示xxx上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了~"+sdf.format(new Date()) + "\n");
    }


    //表示channel处于不活动的状态 提示xxx下线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 离线了~"+sdf.format(new Date()) + "\n");
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //获取当前channel
        Channel channel = ctx.channel();
        //遍历channelGroup,根据不同的情况,推送不同的消息 (排除自身)
        channelGroup.forEach(ch -> {
            if (channel != ch) {//不是当前channel,转发消息
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息:" + msg + sdf.format(new Date()) + "\n");
            } else {//自己发送的信息,在自己上如何显示
                ch.writeAndFlush("[自己]" + " 发送了消息:" + msg + "\n");
            }
        });
    }
}

客户端

GroupChatClient

cdn.jsdelivr.net/gh/kylincw/…

image-20201113150943626

public class GroupChatClient {



    //属性
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {


                        @Override

                        protected void initChannel(SocketChannel ch) throws Exception {

                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();

                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());

                            pipeline.addLast("encoder", new StringEncoder());
                            //自定义handler
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("--------" + channel.localAddress() + "--------");
            //客户端需要输入信息,创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                //通过channel发送大服务器端
                channel.writeAndFlush(msg+"\r\n");
            }
        } finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        new GroupChatClient("127.0.0.1",7000).run();
    }
}
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {





    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());//取掉两端空格
    }
}

测试

image.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY6EM7Jj' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片