大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01服务提供者收发消息基础实现 -> opt-01自定义网络传输协议的实现 -> opt-02自定义编解码实现 -> opt-03服务提供者调用真实方法实现 -> opt-04完善服务消费者发送消息基础功能 -> opt-05注册中心基础功能实现 -> opt-06服务提供者整合注册中心 -> opt-07服务消费者整合注册中心 -> opt-08完善服务消费者接收响应结果 -> opt-09服务消费者,服务提供者整合SpringBoot -> opt-10动态代理屏蔽RPC服务调用底层细节 -> opt-10SPI机制基础功能实现 -> opt-11SPI机制扩展随机负载均衡策略 -> opt-12SPI机制扩展轮询负载均衡策略 -> opt-13SPI机制扩展JDK序列化 -> opt-14SPI机制扩展JSON序列化 -> opt-15SPI机制扩展protustuff序列化 -> opt-16自定义注解 -> opt-01 服务提供者收发消息基础实现 -> opt-01 自定义网络传输协议的实现 -> opt-02 自定义编解码实现 -> opt-03 服务提供者调用真实方法实现 -> opt-04 完善服务消费者发送消息基础功能 -> opt-05 注册中心基础功能实现 -> opt-06 服务提供者整合注册中心 -> opt-07 服务消费者整合注册中心 -> opt-08 完善服务消费者接收响应结果 -> opt-09 服务消费者,服务提供者整合SpringBoot -> opt-10 动态代理屏蔽RPC服务调用底层细节 -> opt-10 SPI机制基础功能实现 -> opt-11 SPI机制扩展随机负载均衡策略 -> opt-12 SPI机制扩展轮询负载均衡策略 -> opt-13 SPI机制扩展JDK序列化 -> opt-14 SPI机制扩展JSON序列化 -> opt-15 SPI机制扩展protustuff序列化 -> opt-16自定义注解 -> opt-01 服务提供者收发消息基础实现 -> opt-01 自定义网络传输协议的实现 -> opt-02 自定义编解码实现 -> opt-03 服务提供者调用真实方法实现 -> opt-04 完善服务消费者发送消息基础功能 -> opt-05 注册中心基础功能实现 -> opt-06 服务提供者整合注册中心 -> opt-07 服务消费者整合注册中心 -> opt-08 完善服务消费者接收响应结果 -> opt-09 服务消费者,服务提供者整合SpringBoot -> opt-10 动态代理屏蔽RPC服务调用底层细节 -> opt-10 SPI机制基础功能实现 -> opt-11 SPI机制扩展随机负载均衡策略 -> opt-12 SPI机制扩展轮询负载均衡策略 -> opt-13 SPI机制扩展JDK序列化 -> opt-14 SPI机制扩展JSON序列化 -> opt-15 SPI机制扩展protustuff序列化 -> opt-16
前言
在上一章中我们设计了自己的网络传输协议,并且实现了。但是现在的服务端只能接收String类型的协议。也就是Netty自带的。如果我们此时直接使用是会有问题的。所以我们还需要实现自己的编解码器
设计
服务消费者通过自定义网络传输协议发送请求,首先会通过编码器的处理,然后经过网络传输到达服务提供者,服务提供者通过解码器处理之后,拿到我们能够处理的数据。之后返回响应数据
实现
创建子项目 xpc-rpc-codec,pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>xpc-rpc</artifactId><groupId>com.xpc</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>xpc-rpc-codec</artifactId><dependencies><dependency><groupId>com.xpc</groupId><artifactId>xpc-rpc-protocol</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.xpc</groupId><artifactId>xpc-rpc-serialization-jdk</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.xpc</groupId><artifactId>xpc-rpc-common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project><?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>xpc-rpc</artifactId> <groupId>com.xpc</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>xpc-rpc-codec</artifactId> <dependencies> <dependency> <groupId>com.xpc</groupId> <artifactId>xpc-rpc-protocol</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.xpc</groupId> <artifactId>xpc-rpc-serialization-jdk</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.xpc</groupId> <artifactId>xpc-rpc-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project><?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>xpc-rpc</artifactId> <groupId>com.xpc</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>xpc-rpc-codec</artifactId> <dependencies> <dependency> <groupId>com.xpc</groupId> <artifactId>xpc-rpc-protocol</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.xpc</groupId> <artifactId>xpc-rpc-serialization-jdk</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.xpc</groupId> <artifactId>xpc-rpc-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
创建一个通用的接口 com.xpc.rpc.codec.RpcCoder
package com.xpc.rpc.codec;import com.xpc.rpc.common.serialization.Serialization;/*** 通用接口*/public interface RpcCoder {/*** 获取具体的序列化接口* 后续会使用SPI机制进行扩展,现在默认给一个JDK的序列化方式*/Serialization getJdkSerialization();}package com.xpc.rpc.codec; import com.xpc.rpc.common.serialization.Serialization; /** * 通用接口 */ public interface RpcCoder { /** * 获取具体的序列化接口 * 后续会使用SPI机制进行扩展,现在默认给一个JDK的序列化方式 */ Serialization getJdkSerialization(); }package com.xpc.rpc.codec; import com.xpc.rpc.common.serialization.Serialization; /** * 通用接口 */ public interface RpcCoder { /** * 获取具体的序列化接口 * 后续会使用SPI机制进行扩展,现在默认给一个JDK的序列化方式 */ Serialization getJdkSerialization(); }
创建解码器:com.xpc.rpc.codec.RpcDecoder
package com.xpc.rpc.codec;import com.xpc.rpc.common.enums.RpcMsgType;import com.xpc.rpc.common.serialization.Serialization;import com.xpc.rpc.protocol.ProtocolMessage;import com.xpc.rpc.protocol.header.RpcHeader;import com.xpc.rpc.protocol.request.RpcRequest;import com.xpc.rpc.protocol.response.RpcResponse;import com.xpc.rpc.serialization.jdk.JdkSerialization;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;/*** 解码器*/public class RpcDecoder extends ByteToMessageDecoder implements RpcCoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if(in.readableBytes() < 16) {return;}//拿到请求消息类型int msgType = in.readInt();//拿到请求体长度int bodyLen = in.readInt();//拿到请求idlong requestId = in.readLong();if(in.readableBytes() < bodyLen) {//表示读取的数据还不够in.resetReaderIndex();return;}//获取请求体byte[] data = new byte[bodyLen];in.readBytes(data);RpcHeader rpcHeader = new RpcHeader();rpcHeader.setRequestId(requestId);rpcHeader.setBodyLen(bodyLen);rpcHeader.setMsgType(msgType);ProtocolMessage protocolMessage = new ProtocolMessage();protocolMessage.setRpcHeader(rpcHeader);//获取序列化接口,后续会使用SPI机制扩展Serialization jdkSerialization = getJdkSerialization();RpcMsgType rpcMsgType = RpcMsgType.findByType(msgType);switch (rpcMsgType){case REQUEST:RpcRequest request = jdkSerialization.deserialize(data, RpcRequest.class);protocolMessage.setT(request);break;case RESPONSE:RpcResponse response = jdkSerialization.deserialize(data, RpcResponse.class);protocolMessage.setT(response);break;}//更新读索引in.markReaderIndex();//最后通过管道写出去out.add(protocolMessage);}@Overridepublic Serialization getJdkSerialization() {return new JdkSerialization();}}package com.xpc.rpc.codec; import com.xpc.rpc.common.enums.RpcMsgType; import com.xpc.rpc.common.serialization.Serialization; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.header.RpcHeader; import com.xpc.rpc.protocol.request.RpcRequest; import com.xpc.rpc.protocol.response.RpcResponse; import com.xpc.rpc.serialization.jdk.JdkSerialization; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 解码器 */ public class RpcDecoder extends ByteToMessageDecoder implements RpcCoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if(in.readableBytes() < 16) { return; } //拿到请求消息类型 int msgType = in.readInt(); //拿到请求体长度 int bodyLen = in.readInt(); //拿到请求id long requestId = in.readLong(); if(in.readableBytes() < bodyLen) { //表示读取的数据还不够 in.resetReaderIndex(); return; } //获取请求体 byte[] data = new byte[bodyLen]; in.readBytes(data); RpcHeader rpcHeader = new RpcHeader(); rpcHeader.setRequestId(requestId); rpcHeader.setBodyLen(bodyLen); rpcHeader.setMsgType(msgType); ProtocolMessage protocolMessage = new ProtocolMessage(); protocolMessage.setRpcHeader(rpcHeader); //获取序列化接口,后续会使用SPI机制扩展 Serialization jdkSerialization = getJdkSerialization(); RpcMsgType rpcMsgType = RpcMsgType.findByType(msgType); switch (rpcMsgType){ case REQUEST: RpcRequest request = jdkSerialization.deserialize(data, RpcRequest.class); protocolMessage.setT(request); break; case RESPONSE: RpcResponse response = jdkSerialization.deserialize(data, RpcResponse.class); protocolMessage.setT(response); break; } //更新读索引 in.markReaderIndex(); //最后通过管道写出去 out.add(protocolMessage); } @Override public Serialization getJdkSerialization() { return new JdkSerialization(); } }package com.xpc.rpc.codec; import com.xpc.rpc.common.enums.RpcMsgType; import com.xpc.rpc.common.serialization.Serialization; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.header.RpcHeader; import com.xpc.rpc.protocol.request.RpcRequest; import com.xpc.rpc.protocol.response.RpcResponse; import com.xpc.rpc.serialization.jdk.JdkSerialization; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 解码器 */ public class RpcDecoder extends ByteToMessageDecoder implements RpcCoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if(in.readableBytes() < 16) { return; } //拿到请求消息类型 int msgType = in.readInt(); //拿到请求体长度 int bodyLen = in.readInt(); //拿到请求id long requestId = in.readLong(); if(in.readableBytes() < bodyLen) { //表示读取的数据还不够 in.resetReaderIndex(); return; } //获取请求体 byte[] data = new byte[bodyLen]; in.readBytes(data); RpcHeader rpcHeader = new RpcHeader(); rpcHeader.setRequestId(requestId); rpcHeader.setBodyLen(bodyLen); rpcHeader.setMsgType(msgType); ProtocolMessage protocolMessage = new ProtocolMessage(); protocolMessage.setRpcHeader(rpcHeader); //获取序列化接口,后续会使用SPI机制扩展 Serialization jdkSerialization = getJdkSerialization(); RpcMsgType rpcMsgType = RpcMsgType.findByType(msgType); switch (rpcMsgType){ case REQUEST: RpcRequest request = jdkSerialization.deserialize(data, RpcRequest.class); protocolMessage.setT(request); break; case RESPONSE: RpcResponse response = jdkSerialization.deserialize(data, RpcResponse.class); protocolMessage.setT(response); break; } //更新读索引 in.markReaderIndex(); //最后通过管道写出去 out.add(protocolMessage); } @Override public Serialization getJdkSerialization() { return new JdkSerialization(); } }
对于该类中第一个判断为什么是16,这是解释一下,一个完整的请求包括请求头和请求体,我们请求头包括 消息类型,请求体长度,请求唯一ID,int是4字节,long占8字节,所以请求头一共占16字节,那还有请求体呢? 其实一个请求的请求体为空也是有可能的,也就是 bodyLen = 0的时候,所以一个完整的请求大小至少要等于 16字节
/*** 请求类型,是普通请求,或者是心跳消息等*/private int msgType;/*** 消息体的长度*/private int bodyLen;/*** 请求的唯一ID*/private Long requestId;/** * 请求类型,是普通请求,或者是心跳消息等 */ private int msgType; /** * 消息体的长度 */ private int bodyLen; /** * 请求的唯一ID */ private Long requestId;/** * 请求类型,是普通请求,或者是心跳消息等 */ private int msgType; /** * 消息体的长度 */ private int bodyLen; /** * 请求的唯一ID */ private Long requestId;
创建编码器:com.xpc.rpc.codec.RpcEncoder
package com.xpc.rpc.codec;import com.xpc.rpc.common.serialization.Serialization;import com.xpc.rpc.protocol.ProtocolMessage;import com.xpc.rpc.serialization.jdk.JdkSerialization;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;public class RpcEncoder extends MessageToByteEncoder<ProtocolMessage> implements RpcCoder {@Overridepublic Serialization getJdkSerialization() {return new JdkSerialization();}@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage protocolMessage, ByteBuf out) throws Exception {out.writeInt(protocolMessage.getRpcHeader().getMsgType());byte[] bytes = getJdkSerialization().serialize(protocolMessage.getT());out.writeInt(bytes.length);out.writeLong(protocolMessage.getRpcHeader().getRequestId());out.writeBytes(bytes);}}package com.xpc.rpc.codec; import com.xpc.rpc.common.serialization.Serialization; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.serialization.jdk.JdkSerialization; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class RpcEncoder extends MessageToByteEncoder<ProtocolMessage> implements RpcCoder { @Override public Serialization getJdkSerialization() { return new JdkSerialization(); } @Override protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage protocolMessage, ByteBuf out) throws Exception { out.writeInt(protocolMessage.getRpcHeader().getMsgType()); byte[] bytes = getJdkSerialization().serialize(protocolMessage.getT()); out.writeInt(bytes.length); out.writeLong(protocolMessage.getRpcHeader().getRequestId()); out.writeBytes(bytes); } }package com.xpc.rpc.codec; import com.xpc.rpc.common.serialization.Serialization; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.serialization.jdk.JdkSerialization; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class RpcEncoder extends MessageToByteEncoder<ProtocolMessage> implements RpcCoder { @Override public Serialization getJdkSerialization() { return new JdkSerialization(); } @Override protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage protocolMessage, ByteBuf out) throws Exception { out.writeInt(protocolMessage.getRpcHeader().getMsgType()); byte[] bytes = getJdkSerialization().serialize(protocolMessage.getT()); out.writeInt(bytes.length); out.writeLong(protocolMessage.getRpcHeader().getRequestId()); out.writeBytes(bytes); } }
然后将服务端和客户端的编解码器都替换成我们自己实现的编解码器
服务端:com.xpc.rpc.provider.server.base.BaseServer,修改startNettyServer()方法
@Overridepublic void startNettyServer() {//其它代码省略bootstrap.group(bossGroup,workerGroup).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();//Netty自带的编解码器//pipeline.addLast(new StringDecoder());//pipeline.addLast(new StringEncoder());//替换成我们自己的编解码器pipeline.addLast(new RpcDecoder());pipeline.addLast(new RpcEncoder());//我们自己实现的处理器pipeline.addLast(new RpcProviderHandler(handlerMap));}});try {ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();LOGGER.info("Netty 服务端启动成功............");channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {LOGGER.error("Netty 服务端启动失败:{}",e);}finally {shutDown();}}@Override public void startNettyServer() { //其它代码省略 bootstrap.group(bossGroup,workerGroup) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //Netty自带的编解码器 //pipeline.addLast(new StringDecoder()); //pipeline.addLast(new StringEncoder()); //替换成我们自己的编解码器 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); //我们自己实现的处理器 pipeline.addLast(new RpcProviderHandler(handlerMap)); } }); try { ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync(); LOGGER.info("Netty 服务端启动成功............"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.error("Netty 服务端启动失败:{}",e); }finally { shutDown(); } }@Override public void startNettyServer() { //其它代码省略 bootstrap.group(bossGroup,workerGroup) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //Netty自带的编解码器 //pipeline.addLast(new StringDecoder()); //pipeline.addLast(new StringEncoder()); //替换成我们自己的编解码器 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); //我们自己实现的处理器 pipeline.addLast(new RpcProviderHandler(handlerMap)); } }); try { ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync(); LOGGER.info("Netty 服务端启动成功............"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.error("Netty 服务端启动失败:{}",e); }finally { shutDown(); } }
修改com.xpc.rpc.provide.handler.RpcProviderHandler
package com.xpc.rpc.provider.handler;import com.xpc.rpc.common.enums.RpcMsgType;import com.xpc.rpc.protocol.ProtocolMessage;import com.xpc.rpc.protocol.header.RpcHeader;import com.xpc.rpc.protocol.request.RpcRequest;import com.xpc.rpc.protocol.response.RpcResponse;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;public class RpcProviderHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcRequest>> {private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);private Map<String,Object> handlerMap;public RpcProviderHandler(Map<String,Object> handlerMap) {this.handlerMap = handlerMap;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage<RpcRequest> msg) throws Exception {LOGGER.info("className: {}",msg.getT().getClassName());LOGGER.info("method:{}",msg.getT().getMethodName());LOGGER.info("ParameterTypes:{}",msg.getT().getParameterTypes());LOGGER.info("Parameters:{}",msg.getT().getParameters());//写回响应数据ProtocolMessage<RpcResponse> protocolMessage = new ProtocolMessage<RpcResponse>();RpcHeader rpcHeader = msg.getRpcHeader();rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType());protocolMessage.setRpcHeader(rpcHeader);RpcResponse response = new RpcResponse();response.setCode(200);response.setData("success");protocolMessage.setT(response);ctx.writeAndFlush(protocolMessage);}}package com.xpc.rpc.provider.handler; import com.xpc.rpc.common.enums.RpcMsgType; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.header.RpcHeader; import com.xpc.rpc.protocol.request.RpcRequest; import com.xpc.rpc.protocol.response.RpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class RpcProviderHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcRequest>> { private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class); private Map<String,Object> handlerMap; public RpcProviderHandler(Map<String,Object> handlerMap) { this.handlerMap = handlerMap; } @Override protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage<RpcRequest> msg) throws Exception { LOGGER.info("className: {}",msg.getT().getClassName()); LOGGER.info("method:{}",msg.getT().getMethodName()); LOGGER.info("ParameterTypes:{}",msg.getT().getParameterTypes()); LOGGER.info("Parameters:{}",msg.getT().getParameters()); //写回响应数据 ProtocolMessage<RpcResponse> protocolMessage = new ProtocolMessage<RpcResponse>(); RpcHeader rpcHeader = msg.getRpcHeader(); rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType()); protocolMessage.setRpcHeader(rpcHeader); RpcResponse response = new RpcResponse(); response.setCode(200); response.setData("success"); protocolMessage.setT(response); ctx.writeAndFlush(protocolMessage); } }package com.xpc.rpc.provider.handler; import com.xpc.rpc.common.enums.RpcMsgType; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.header.RpcHeader; import com.xpc.rpc.protocol.request.RpcRequest; import com.xpc.rpc.protocol.response.RpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class RpcProviderHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcRequest>> { private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class); private Map<String,Object> handlerMap; public RpcProviderHandler(Map<String,Object> handlerMap) { this.handlerMap = handlerMap; } @Override protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage<RpcRequest> msg) throws Exception { LOGGER.info("className: {}",msg.getT().getClassName()); LOGGER.info("method:{}",msg.getT().getMethodName()); LOGGER.info("ParameterTypes:{}",msg.getT().getParameterTypes()); LOGGER.info("Parameters:{}",msg.getT().getParameters()); //写回响应数据 ProtocolMessage<RpcResponse> protocolMessage = new ProtocolMessage<RpcResponse>(); RpcHeader rpcHeader = msg.getRpcHeader(); rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType()); protocolMessage.setRpcHeader(rpcHeader); RpcResponse response = new RpcResponse(); response.setCode(200); response.setData("success"); protocolMessage.setT(response); ctx.writeAndFlush(protocolMessage); } }
修改 com.xpc.rpc.consumer.RpcConsumer 的构造方法方法
public RpcConsumer() {eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();//Netty自带的编解码器//pipeline.addLast(new StringDecoder());//pipeline.addLast(new StringEncoder());//替换成我们自己的编解码器pipeline.addLast(new RpcDecoder());pipeline.addLast(new RpcEncoder());pipeline.addLast(new RpcConsumerHandler());}});try {ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync();channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {if(future.isSuccess()) {LOGGER.info("客户端连接成功........");}else {LOGGER.info("客户端连接失败........");}}});channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}public RpcConsumer() { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //Netty自带的编解码器 //pipeline.addLast(new StringDecoder()); //pipeline.addLast(new StringEncoder()); //替换成我们自己的编解码器 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); pipeline.addLast(new RpcConsumerHandler()); } }); try { ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()) { LOGGER.info("客户端连接成功........"); }else { LOGGER.info("客户端连接失败........"); } } }); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } }public RpcConsumer() { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //Netty自带的编解码器 //pipeline.addLast(new StringDecoder()); //pipeline.addLast(new StringEncoder()); //替换成我们自己的编解码器 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); pipeline.addLast(new RpcConsumerHandler()); } }); try { ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()) { LOGGER.info("客户端连接成功........"); }else { LOGGER.info("客户端连接失败........"); } } }); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } }
修改com.xpc.rpc.consumer.handler.RpcConsumerHandler
package com.xpc.rpc.consumer.handler;import com.xpc.rpc.common.enums.RpcMsgType;import com.xpc.rpc.protocol.ProtocolMessage;import com.xpc.rpc.protocol.header.RpcHeader;import com.xpc.rpc.protocol.request.RpcRequest;import com.xpc.rpc.protocol.response.RpcResponse;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> {private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//写一条消息到服务端ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<RpcRequest>();RpcHeader rpcHeader = new RpcHeader();rpcHeader.setMsgType(RpcMsgType.REQUEST.getType());rpcHeader.setRequestId(1L);RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName("test");rpcRequest.setMethodName("hello");rpcRequest.setParameterTypes(new Class[]{String.class});rpcRequest.setParameters(new Object[]{"coco"});protocolMessage.setRpcHeader(rpcHeader);protocolMessage.setT(rpcRequest);ctx.writeAndFlush(protocolMessage);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception {LOGGER.info("code: {}",protocolMessage.getT().getCode());LOGGER.info("data: {}",protocolMessage.getT().getData());}}package com.xpc.rpc.consumer.handler; import com.xpc.rpc.common.enums.RpcMsgType; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.header.RpcHeader; import com.xpc.rpc.protocol.request.RpcRequest; import com.xpc.rpc.protocol.response.RpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> { private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //写一条消息到服务端 ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<RpcRequest>(); RpcHeader rpcHeader = new RpcHeader(); rpcHeader.setMsgType(RpcMsgType.REQUEST.getType()); rpcHeader.setRequestId(1L); RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName("test"); rpcRequest.setMethodName("hello"); rpcRequest.setParameterTypes(new Class[]{String.class}); rpcRequest.setParameters(new Object[]{"coco"}); protocolMessage.setRpcHeader(rpcHeader); protocolMessage.setT(rpcRequest); ctx.writeAndFlush(protocolMessage); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception { LOGGER.info("code: {}",protocolMessage.getT().getCode()); LOGGER.info("data: {}",protocolMessage.getT().getData()); } }package com.xpc.rpc.consumer.handler; import com.xpc.rpc.common.enums.RpcMsgType; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.header.RpcHeader; import com.xpc.rpc.protocol.request.RpcRequest; import com.xpc.rpc.protocol.response.RpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> { private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //写一条消息到服务端 ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<RpcRequest>(); RpcHeader rpcHeader = new RpcHeader(); rpcHeader.setMsgType(RpcMsgType.REQUEST.getType()); rpcHeader.setRequestId(1L); RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName("test"); rpcRequest.setMethodName("hello"); rpcRequest.setParameterTypes(new Class[]{String.class}); rpcRequest.setParameters(new Object[]{"coco"}); protocolMessage.setRpcHeader(rpcHeader); protocolMessage.setT(rpcRequest); ctx.writeAndFlush(protocolMessage); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception { LOGGER.info("code: {}",protocolMessage.getT().getCode()); LOGGER.info("data: {}",protocolMessage.getT().getData()); } }
测试
先启动服务提供者:com.xpc.test.netty.ProviderTest
然后启动服务消费者:com.xpc.test.netty.ConsumerTest
服务端日志:
客户端日志: