什么是rpc(来自百度百科)
RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
Rpc要解决的问题
RPC要解决的两个问题:
- 解决分布式系统中,服务之间的调用问题。
- 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
实际情况下,RPC很少用到http协议来进行数据传输,毕竟我只是想传输一下数据而已,何必动用到一个文本传输的应用层协议呢,我为什么不直接使用二进制传输?比如直接用Java的Socket协议进行传输?
要实现一个RPC不算难,难的是实现一个高性能高可靠的RPC框架。
比如,既然是分布式了,那么一个服务可能有多个实例,你在调用时,要如何获取这些实例的地址呢?
这时候就需要一个服务注册中心,比如在Dubbo里头,就可以使用Zookeeper作为注册中心,在调用时,从Zookeeper获取服务的实例列表,再从中选择一个进行调用。
那么选哪个调用好呢?这时候就需要负载均衡了,于是你又得考虑如何实现复杂均衡,比如Dubbo就提供了好几种负载均衡策略。
这还没完,总不能每次调用时都去注册中心查询实例列表吧,这样效率多低呀,于是又有了缓存,有了缓存,就要考虑缓存的更新问题,blablabla……
你以为就这样结束了,没呢,还有这些:
- 客户端总不能每次调用完都干等着服务端返回数据吧,于是就要支持异步调用;
- 服务端的接口修改了,老的接口还有人在用,怎么办?总不能让他们都改了吧?这就需要版本控制了;
- 服务端总不能每次接到请求都马上启动一个线程去处理吧?于是就需要线程池;
- 服务端关闭时,还没处理完的请求怎么办?是直接结束呢,还是等全部请求处理完再关闭呢?
- ……
如此种种,都是一个优秀的RPC框架需要考虑的问题。
我们当前的目标是先实现个简单的rpc 调用,来理解我们分布式系统中服务调用的基本原理。
简单Rpc实现
实现的话,当然就分为客户端和服务端两步,参照我们之前介绍的 feignclient 我们来完成一个简单的rpcclient
整个流程的话,就是客户端调用远程服务端的某个方法,服务端最终返回数据给客户端。
<
客户端实现
客户端要发送什么数据给服务端呢?
我们在客户端调用的是服务端提供的接口,所以我们需要将客户端调用的信息传输过去,那么我们可以将要传输的信息分为两类
- 第一类是服务端可以根据这个信息找到相应的接口实现类和方法
- 第二类是调用此方法传输的参数信息
所以我们首先提供实体来保存这两类数据,由于是网络传输,还要保证我们的实体要序列化。
请求的话,我们就用RpcRequest 来保存
/** * rpc request */ @Data public class RpcRequest implements Serializable { private static final long serialVersionUID = -8558250604858740384L; private String className; private String methodName; private Class [] parameTypes; private Object [] parameters; }
响应的话,我们用RpcResponse 来保存
/** * rpc response */ @Data public class RpcResponse implements Serializable { private static final long serialVersionUID = 9201838302447943808L; private Object result; }
那么我们如何将用户的请求封装到客户端,客户端再发给服务端呢?
借助于我们之前理解的 FeignClient和spring 容器的知识,我们可以利用注解和动态代理来实现请求封装和服务调用。
我们提供一个简单的注解RpcClient:
@Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface RpcClient { }
上面注解的作用和FeignClient 作用一样,来标示这是一个远程调用的接口,以便spring能够扫描到,并注入容器中。此过程,我们需要提供代理类,以便动态代理能够调用,提供工厂类,生成代理类,提供注册类,将bean注入到spring 容器。spring 已经为我们提供了这方面的开发流程,详细的问题,请自行学习spring的相关文章。
首先提供代理类,代理类是服务调用的核心,就是通过执行代理类的 invoke方法,将用户信息以及相关类信息序列化,通过 Socket 发给服务端,并解析服务端响应的数据,最终返回给用户。
@Slf4j @Component public class RpcDynamicProxy implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String requestJson = objectToJson(method,args); Socket client = new Socket("127.0.0.1", 20006); client.setSoTimeout(10000); //获取Socket的输出流,用来发送数据到服务端 PrintStream out = new PrintStream(client.getOutputStream()); //获取Socket的输入流,用来接收从服务端发送过来的数据 BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream())); //发送数据到服务端 out.println(requestJson); RpcResponse response = new RpcResponse(); Gson gson =new Gson(); try{ //从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常 String responsJson = buf.readLine(); response = gson.fromJson(responsJson, RpcResponse.class); }catch(SocketTimeoutException e){ log.info("Time out, No response"); } if(client != null){ //如果构造函数建立起了连接,则关闭套接字,如果没有建立起连接,自然不用关闭 client.close(); //只关闭socket,其关联的输入输出流也会被关闭 } return response.getResult(); } public String objectToJson(Method method,Object [] args){ RpcRequest request = new RpcRequest(); String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); String className = method.getDeclaringClass().getName(); request.setMethodName(methodName); request.setParameTypes(parameterTypes); request.setParameters(args); request.setClassName(getClassName(className)); GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory()); Gson gson = gsonBuilder.create(); return gson.toJson(request); } private String getClassName(String beanClassName){ String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1); className = className.substring(0,1).toLowerCase() + className.substring(1); return className; } }
提供 工厂类,生成代理类的对象
public class RpcClientFactoryBean implements FactoryBean { private Class classType; @Autowired private RpcDynamicProxy rpcDynamicProxy; public RpcClientFactoryBean(Class classType) { this.classType = classType; } @Override public Object getObject() throws Exception { ClassLoader classLoader = classType.getClassLoader(); Object object = Proxy.newProxyInstance(classLoader,new Class[]{classType},rpcDynamicProxy); return object; } @Override public Class getObjectType() { return this.classType; } @Override public boolean isSingleton() { return false; }
提供注册类,扫描相关RpcClient 注解的类,将bean注入spring 容器
public class RpcClientsRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { registerRpcClients(importingClassMetadata,registry); } /** * 根据注解 注册 rpcClients * @param metadata * @param registry */ public void registerRpcClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { ClassPathScanningCandidateComponentProvider provider = getScanner(); //设置扫描器 provider.addIncludeFilter(new AnnotationTypeFilter(RpcClient.class)); //扫描此包下的所有带有@RpcClient的注解的类 Set beanDefinitionSet = provider.findCandidateComponents("com.magicdu.rpc.rpcclient"); for (BeanDefinition beanDefinition : beanDefinitionSet){ if (beanDefinition instanceof AnnotatedBeanDefinition){ //获得注解上的参数信息 AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition; String beanClassAllName = beanDefinition.getBeanClassName(); Map paraMap = annotatedBeanDefinition.getMetadata() .getAnnotationAttributes(RpcClient.class.getCanonicalName()); //将RpcClient的工厂类注册进去 BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(RpcClientFactoryBean.class); //设置RpcClinetFactoryBean工厂类中的构造函数的值 builder.addConstructorArgValue(beanClassAllName); builder.getBeanDefinition().setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); //将其注册进容器中 registry.registerBeanDefinition( beanClassAllName, builder.getBeanDefinition()); } } } //允许Spring扫描接口上的注解 protected ClassPathScanningCandidateComponentProvider getScanner() { return new ClassPathScanningCandidateComponentProvider(false) { @Override protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent(); } }; } }
然后将 注册类,引入 spring 配置,以便启动时扫描
@Configuration @Import(RpcClientsRegistrar.class) public class RpcConfig { }
这样客户端我们就写完了。
服务端代码
服务端简单来说,就是开个端口,接受客户端发送的参数,找到对应的方法,调用完,返回数据就完了。
我们将所有的客户端调用的方法都用 @Service 注解标记,服务端启动时我们将所有Service注解的方法缓存起来,客户端调用时,通过反射,我们便能执行服务端的方法了。并且扫描完就启动相关Socket,我们提供一个 Socket ,等待客户端调用。
@Component @Slf4j public class RpcServiceConfig implements CommandLineRunner { @Autowired private ApplicationContext applicationContext; public static Map rpcServiceMap = new HashMap(); @Override public void run(String... args) throws Exception { Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Service.class); for (Object bean: beansWithAnnotation.values()){ Class clazz = bean.getClass(); Class[] interfaces = clazz.getInterfaces(); for (Class inter : interfaces){ rpcServiceMap.put(getClassName(inter.getName()),bean); log.info("已经加载的服务:"+inter.getName()); } } startPort(); } private String getClassName(String beanClassName){ String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1); className = className.substring(0,1).toLowerCase() + className.substring(1); return className; } public void startPort() throws IOException { //服务端在20006端口监听客户端请求的TCP连接 ServerSocket server = new ServerSocket(20006); Socket client = null; boolean f = true; while (f) { //等待客户端的连接,如果没有获取连接 client = server.accept(); System.out.println("与客户端连接成功!"); //为每个客户端连接开启一个线程 new Thread(new ServerThread(client)).start(); } server.close(); } }
提供线程,供客户端调用,如果收到客户端请求,就去执行对应的方法。
@Slf4j public class ServerThread implements Runnable { private Socket client = null; public ServerThread(Socket client) { this.client = client; } @Override public void run() { try { //获取Socket的输出流,用来向客户端发送数据 PrintStream out = new PrintStream(client.getOutputStream()); //获取Socket的输入流,用来接收从客户端发送过来的数据 BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream())); boolean flag = true; while (flag) { //接收从客户端发送过来的数据 String str = buf.readLine(); if (str == null || "".equals(str)) { flag = false; } else { out.println(CommonDeal.getInvokeMethodMes(str)); } } out.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } }
执行方法的实现如下:
@Slf4j public class CommonDeal { public static String getInvokeMethodMes(String str){ GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory()); Gson gson = gsonBuilder.create(); RpcRequest request = gson.fromJson(str, RpcRequest.class); return gson.toJson(invokeMethod(request)); } private static RpcResponse invokeMethod(RpcRequest request) { String className = request.getClassName(); String methodName = request.getMethodName(); Object[] parameters = request.getParameters(); Class[] parameTypes = request.getParameTypes(); Object o = RpcServiceConfig.rpcServiceMap.get(className); RpcResponse response = new RpcResponse(); try { Method method = o.getClass().getDeclaredMethod(methodName, parameTypes); Object invokeMethod = method.invoke(o, parameters); response.setResult(invokeMethod); } catch (NoSuchMethodException e) { log.info("没有找到" + methodName); } catch (IllegalAccessException e) { log.info("执行错误" + parameters); } catch (InvocationTargetException e) { log.info("执行错误" + parameters); } return response; } }
总结
其中还有很多的功能需要完善,例如一个完整RPC框架肯定还需要服务注册与发现,而且双方通信肯定也不能是直接开启一个线程一直在等着,肯定需要是异步的等等的各种功能。这个例子只是简单帮助我们了解相关Rpc的思路以及简单的实现。