1.服务端
1.服务注册
@Configuration
public class NettyServerStart {
@Autowired
private ServiceRegistrationScan registrationScan;
@Autowired
private NettyServer nettyServer;
@PostConstruct
public void start() {
//服务注册
registrationScan.scan();
//服务器启动
nettyServer.start();
}
}
@PostConstruct注解的作用是在依赖注入完成后,执行被注解的方法。
深入ServiceRegistrationScan
@Component
public class ServiceRegistrationScan implements ApplicationContextAware {
@Autowired
private ServerZKit serverZKit;
@Autowired
private RpcServerConfiguration rpcServerConfiguration;
//把spring容器注入进来因为会用到它去拿我们需要暴露服务的类
private static ApplicationContext context;
public void scan() {
//创建一个根节点
serverZKit.createRootNode();
//拿到所有实现这个接口的类
Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(MoXiuRpcServer.class);
if (!beansWithAnnotation.isEmpty()) {
//拿到当前服务器的IP地址
String realIp = IpUtil.getRealIp();
for (Object value : beansWithAnnotation.values()) {
MoXiuRpcServer annotation = value.getClass().getAnnotation(MoXiuRpcServer.class);
//拿到类似于这样的 com.moxiu.xxx
Class aClass = annotation.interfaceName();
String s = aClass.getName();
//以这个接口创建一个结点
serverZKit.createPersistentNode(s);
String childName = s + "/" + realIp +":"+ rpcServerConfiguration.getRpcPort();
//创建本机IP地址加端口
serverZKit.createNode(childName);
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
public static <T> Class<T> getBean(String beanName) {
return (Class<T>) context.getBean(beanName);
}
}
以上代码为注入spring容器拿到所有实现了@MoXiuRpcServer注解的实现类,把该类注册到ZK中。
这个接口是什么呢?
@Component
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MoXiuRpcServer {
Class interfaceName() default void.class; //实现类需要暴露的接口名字
}
怎么使用呢
@MoXiuRpcServer(interfaceName = UserService.class)
public class UserServiceImpl implements UserService {
@Override
public String getUser() {
return null;
}
}
因为客户端需要根据接口生成代理对象,所以这里需要你指定他到底是那个接口的实现类。
2.编写Netty服务器
Netty服务器采用主从Reactor模式及主Reactor只负责建立连接,从Reactor会负责到客户端数据的读写等业务处理。
@Component
public class NettyServer implements ServerStart {
@Autowired
private RpcServerConfiguration rpcServerConfiguration;
@Override
public void start() {
//主Reactor 只负责连接建立所以只要1个线程就行
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boosGroup"));
//从Reactor 负责连接数据读写 传0默认为当前cpu核数*2个线程
EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("workerGroup"));
//业务线程池 负责业务处理可以不要
EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(8, new DefaultThreadFactory("businessGroup"));
ServerBootstrap bootstrap = new ServerBootstrap();
MxRequestHandle mxRequestHandle = new MxRequestHandle();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
//编码
pipeline.addLast("OneEncoding", new LengthFieldPrepender(4));
pipeline.addLast("MxResponseEncode", new MxRpcResponseEncode());
//解码
pipeline.addLast("OneDecode", new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
pipeline.addLast("MxRequestDecode", new MxRpcRequestDecode());
pipeline.addLast(eventExecutors, "MxRpcRequest", mxRequestHandle);
}
});
try {
ChannelFuture sync = bootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
eventExecutors.shutdownGracefully();
}
}
}
这里详解各个编码器和解码器
- OneEncoding 为二次编码器,new LengthFieldPrepender(4)为发送的每一条数据前面添加一个固定4个字节表示为数据长度,解决粘包,黏包问题,
- MxResponseEncode为一次编码器,把MxResponse转化为ByteBuf交给一次编码器进行封装。
- OneDecode为一次解码器,把封装为自动长度4个字节的数据后面的数据读出来交给MxRequestDecode二次解码器。
- MxRequestDecode为二次解码器,把ByteBuf对象转化为MxRequest对象然后交给MxRpcRequest业务处理器处理。
这里给出一个流程图
数据写出就不画了很懒
MxRquest对象 客户端请求数据 服务端拿到数据后可以根据该信息去查找对应的方法然后调用封装给客户端
@Data
@Builder
public class MxRequest {
//请求id
private String requestId;
//请求的类
private String className;
//请求的方法名
private String methodName;
//参数类型
private Class<?>[] parameterTypes;
//参数
private Object[] parameters;
}
MxReponse对象 响应对象
@Data
@Builder
public class MxResponse {
private String requestId;
private Object result;
}
MxRpcResponseEncode 把MxResponse转化为ByteBuf
public class MxRpcResponseEncode extends MessageToByteEncoder<MxResponse> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MxResponse mxResponse, ByteBuf byteBuf) throws Exception {
byteBuf.writeBytes(ProtostuffUtils.serialize(mxResponse));
}
}
MxRpcRequestDecode 把客户端发过来的ByteBuf转化为MxRequest对象给业务处理器去处理
public class MxRpcRequestDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
MxRequest deserialize = ProtostuffUtils.deserialize(bytes, MxRequest.class);
list.add(deserialize);
}
}
序列化采用的是Protostuff速度快也不用去用Proto生成的对象,不好用我就用封装的了。
业务处理handler
@ChannelHandler.Sharable
public class MxRequestHandle extends SimpleChannelInboundHandler<MxRequest> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MxRequest mxRequest) throws Exception {
String requestId = mxRequest.getRequestId();
String className = mxRequest.getClassName();
Object[] parameters = mxRequest.getParameters();
Class<?>[] parameterTypes = mxRequest.getParameterTypes();
String methodName = mxRequest.getMethodName();
//通过类名找到这个类
Class bean = ServiceRegistrationScan.getBean(className);
//拿到调用的方法
Method method = bean.getMethod(methodName, parameterTypes);
//调用方法
Object invoke = method.invoke(bean, parameters);
//封装返回值
MxResponse build = MxResponse.builder().requestId(requestId).result(invoke).build();
//写出
channelHandlerContext.channel().writeAndFlush(build);
}
}
3.测试服务注册功能
可见该类已经注册到ZK中。
后续可带来客户端调试,大家也发现了这和dubbo的很像大概一致,后续带来dubbo源码解析。
© 版权声明
文章版权归作者所有,未经允许请勿转载,侵权请联系 admin@trc20.tw 删除。
THE END