龙空技术网

Java手写分布式系统远程调用RPC框架

Java耕耘 865

前言:

现时兄弟们对“javaaccept”大体比较关心,咱们都想要分析一些“javaaccept”的相关内容。那么小编同时在网络上搜集了一些有关“javaaccept””的相关内容,希望咱们能喜欢,看官们一起来学习一下吧!

一、RPC简介

最近看hadoop底层通信,都是通过RPC实现的。

RPC(Remote Procedure Call Protocol)远程调用: 远程过程调用是一种常用的分布式网络通信协议,它允许运行于 一台计算机的程序调用另一台计算机的子程序,同时将网络的通信细节隐藏起来, 使得用户无须额外地为这个交互作用编程。分布式系统之间的通信大都通过RPC实现

二、RPC请求过程

client发起服务调用请求client stub代理程序将调用的方法,参数按照一定格式封装,通过服务方的地址,发起网络请求消息通过网络发送到服务端,server stub接收到消息,进行解包,反射调用本地对应的服务本地服务执行将结果返回给server stub,然后server stub会将结果消息打包返回到客户端client stub接收消息解码,得到最终结果.

三、RPC框架架构

要写一个RPC框架,需要哪些组成部分?

序列化方式。序列化主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储。远程代理对象,一般使用jdk动态代理或者cglib代理服务暴露 设置注册中心Zookeeper网络通信,基于事件驱动的Reactor模式

四、RPC框架示例

服务提供者,运行在服务器端,提供服务接口定义与服务实现类服务发布者,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用服务消费者,运行在客户端,通过远程代理对象调用远程服务

服务端代码

服务接口:

//计算学生年龄和的接口public interface CalculateService { String cal(Student sta, Student stb);}public class CalculateServiceImpl implements CalculateService { @Override public String cal(Student sta, Student stb) { return "学生年龄之和:" + (sta.getAge() + stb.getAge()); }}

服务发布

public class PublishUtilI { //服务接口集合 private static List<Object> serviceList; private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10)); public static void publish(int port,Object... services) throws IOException { serviceList= Arrays.asList(services); ServerSocket server = new ServerSocket(port); Socket client; while (true) { //阻塞等待请求 client = server.accept(); //使用线程池处理请求 executor.submit(new ServerHandler(client, serviceList)); } }}

反射调用服务

读取客户端发送的服务名判断服务是否发布如果发布,反射调用服务端对应服务返回结果给客户端

public class ServerHandler implements Runnable { private Socket client = null; private List<Object> serviceList = null; public ServerHandler(Socket client, List<Object> service) { this.client = client; this.serviceList = service; } @Override public void run() { try ( ObjectInputStream input = new ObjectInputStream(client.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream()) ) { // 读取客户端要访问那个service Class serviceClass = (Class) input.readObject(); // 找到该服务类 Object obj = findService(serviceClass); if (obj == null) { output.writeObject(serviceClass.getName() + "服务未发现"); } else { //利用反射调用该方法,返回结果 String methodName = input.readUTF(); //读取UTF编码的String字符串 //读取参数类型 Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); //读取参数 Object[] arguments = (Object[]) input.readObject(); Method method = obj.getClass().getMethod(methodName, parameterTypes); //反射执行方法 Object result = method.invoke(obj, arguments); output.writeObject(result); } } catch (Exception e) { e.printStackTrace(); } } private Object findService(Class serviceClass) { for (Object obj : serviceList) { boolean isFather = serviceClass.isAssignableFrom(obj.getClass()); if (isFather) { return obj; } } return null; }}

客户端代码

public class Client { public static void main(String[] args) { CallProxyHandler handler = new CallProxyHandler("127.0.0.1", 1111); CalculateService calculateService = handler.getService(CalculateService.class); Student sta = new Student(1); Student stb = new Student(2); String result = calculateService.cal(sta, stb); System.out.println(result); }}

创建代理类远程调用服务端发布的服务

 public class CallProxyHandler implements InvocationHandler { private String ip; private int port; public CallProxyHandler(String ip, int port) { this.ip = ip; this.port = port; } /** * 获取代理对象 * @param clazz * @param <T> * @return */ @SuppressWarnings("all") public <T> T getService(Class<T> clazz) { return (T) Proxy.newProxyInstance(CallProxyHandler.class.getClassLoader(), new Class<?>[] {clazz}, this); }  /** * 将需要调用服务的方法名,参数类型,参数按照一定格式封装发送至服务端 * 读取服务端返回的结果 * @param proxy * @param method * @param args * @return * @throws Throwable */ @SuppressWarnings("all") @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try ( Socket socket = new Socket(ip, port); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()) ) { output.writeObject(proxy.getClass().getInterfaces()[0]); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); output.flush(); Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } }}

至此,一个简单的RPC服务调用框架完成。但是存在很多问题:

使用java自带的序列化,效率不高,可以使用Hadoop Avro与protobuf使用BIO方式进行网络传输,高并发情况无法应对,使用Netty框架进行网络通信缺少注册中心,服务注册可以使用Zookeeper进行管理。

关注我:私信回复“555”获取往期Java高级架构资料、源码、笔记、视频Dubbo、Redis、Netty、zookeeper、Spring cloud、分布式、高并发等架构技术往期架构视频

标签: #javaaccept