登录
    Technology changes quickly but people's minds change slowly.

自己动手实现简单的微服务rpc调用

技术宅 破玉 1149次浏览 0个评论

什么是rpc(来自百度百科)

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

参考资料:rpc原理
参考代码:rpc代码

Rpc要解决的问题

RPC要解决的两个问题:

  1. 解决分布式系统中,服务之间的调用问题。
  2. 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。

实际情况下,RPC很少用到http协议来进行数据传输,毕竟我只是想传输一下数据而已,何必动用到一个文本传输的应用层协议呢,我为什么不直接使用二进制传输?比如直接用Java的Socket协议进行传输?

要实现一个RPC不算难,难的是实现一个高性能高可靠的RPC框架。

比如,既然是分布式了,那么一个服务可能有多个实例,你在调用时,要如何获取这些实例的地址呢?

这时候就需要一个服务注册中心,比如在Dubbo里头,就可以使用Zookeeper作为注册中心,在调用时,从Zookeeper获取服务的实例列表,再从中选择一个进行调用。

那么选哪个调用好呢?这时候就需要负载均衡了,于是你又得考虑如何实现复杂均衡,比如Dubbo就提供了好几种负载均衡策略。

这还没完,总不能每次调用时都去注册中心查询实例列表吧,这样效率多低呀,于是又有了缓存,有了缓存,就要考虑缓存的更新问题,blablabla……

你以为就这样结束了,没呢,还有这些:

  • 客户端总不能每次调用完都干等着服务端返回数据吧,于是就要支持异步调用;
  • 服务端的接口修改了,老的接口还有人在用,怎么办?总不能让他们都改了吧?这就需要版本控制了;
  • 服务端总不能每次接到请求都马上启动一个线程去处理吧?于是就需要线程池;
  • 服务端关闭时,还没处理完的请求怎么办?是直接结束呢,还是等全部请求处理完再关闭呢?
  • ……

如此种种,都是一个优秀的RPC框架需要考虑的问题。

我们当前的目标是先实现个简单的rpc 调用,来理解我们分布式系统中服务调用的基本原理。

简单Rpc实现

实现的话,当然就分为客户端和服务端两步,参照我们之前介绍的 feignclient 我们来完成一个简单的rpcclient

整个流程的话,就是客户端调用远程服务端的某个方法,服务端最终返回数据给客户端。
<

客户端实现

客户端要发送什么数据给服务端呢?

我们在客户端调用的是服务端提供的接口,所以我们需要将客户端调用的信息传输过去,那么我们可以将要传输的信息分为两类

  • 第一类是服务端可以根据这个信息找到相应的接口实现类和方法
  • 第二类是调用此方法传输的参数信息

所以我们首先提供实体来保存这两类数据,由于是网络传输,还要保证我们的实体要序列化。

请求的话,我们就用RpcRequest 来保存

/**
 * rpc  request
 */
@Data
public class RpcRequest implements Serializable {

    private static final long serialVersionUID = -8558250604858740384L;
    private String className;
    private String methodName;
    private Class [] parameTypes;
    private Object [] parameters;
}

响应的话,我们用RpcResponse 来保存

/**
 * rpc response
 */
@Data
public class RpcResponse implements Serializable {
    private static final long serialVersionUID = 9201838302447943808L;
    private Object result;
}

那么我们如何将用户的请求封装到客户端,客户端再发给服务端呢?

借助于我们之前理解的 FeignClient和spring 容器的知识,我们可以利用注解和动态代理来实现请求封装和服务调用。

我们提供一个简单的注解RpcClient:


@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClient {

}

上面注解的作用和FeignClient 作用一样,来标示这是一个远程调用的接口,以便spring能够扫描到,并注入容器中。此过程,我们需要提供代理类,以便动态代理能够调用,提供工厂类,生成代理类,提供注册类,将bean注入到spring 容器。spring 已经为我们提供了这方面的开发流程,详细的问题,请自行学习spring的相关文章。

首先提供代理类,代理类是服务调用的核心,就是通过执行代理类的 invoke方法,将用户信息以及相关类信息序列化,通过 Socket 发给服务端,并解析服务端响应的数据,最终返回给用户。

@Slf4j
@Component
public class RpcDynamicProxy implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String requestJson = objectToJson(method,args);
        Socket client = new Socket("127.0.0.1", 20006);
        client.setSoTimeout(10000);
        //获取Socket的输出流,用来发送数据到服务端
        PrintStream out = new PrintStream(client.getOutputStream());
        //获取Socket的输入流,用来接收从服务端发送过来的数据
        BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
        //发送数据到服务端
        out.println(requestJson);
        RpcResponse response = new RpcResponse();
        Gson gson =new Gson();
        try{
            //从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常
            String responsJson = buf.readLine();
            response = gson.fromJson(responsJson, RpcResponse.class);
        }catch(SocketTimeoutException e){
            log.info("Time out, No response");
        }
        if(client != null){
            //如果构造函数建立起了连接,则关闭套接字,如果没有建立起连接,自然不用关闭
            client.close(); //只关闭socket,其关联的输入输出流也会被关闭
        }
        return response.getResult();
    }



    public String objectToJson(Method method,Object [] args){
        RpcRequest request = new RpcRequest();
        String methodName = method.getName();
        Class[] parameterTypes = method.getParameterTypes();
        String className = method.getDeclaringClass().getName();
        request.setMethodName(methodName);
        request.setParameTypes(parameterTypes);
        request.setParameters(args);
        request.setClassName(getClassName(className));
        GsonBuilder gsonBuilder = new GsonBuilder();
        gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
        Gson gson = gsonBuilder.create();
        return gson.toJson(request);
    }

    private String getClassName(String beanClassName){
        String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1);
        className = className.substring(0,1).toLowerCase() + className.substring(1);
        return className;
    }
}

提供 工厂类,生成代理类的对象

public class RpcClientFactoryBean implements FactoryBean {

    private Class classType;
    @Autowired
    private RpcDynamicProxy rpcDynamicProxy;

    public RpcClientFactoryBean(Class classType) {
        this.classType = classType;
    }

    @Override
    public Object getObject() throws Exception {
        ClassLoader classLoader = classType.getClassLoader();
        Object object = Proxy.newProxyInstance(classLoader,new Class[]{classType},rpcDynamicProxy);
        return object;
    }

    @Override
    public Class getObjectType() {
        return this.classType;
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

提供注册类,扫描相关RpcClient 注解的类,将bean注入spring 容器

public class RpcClientsRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        registerRpcClients(importingClassMetadata,registry);
    }

    /**
     * 根据注解 注册 rpcClients
     * @param metadata
     * @param registry
     */
    public void registerRpcClients(AnnotationMetadata metadata,
                                     BeanDefinitionRegistry registry) {
        ClassPathScanningCandidateComponentProvider provider = getScanner();
        //设置扫描器
        provider.addIncludeFilter(new AnnotationTypeFilter(RpcClient.class));
        //扫描此包下的所有带有@RpcClient的注解的类
        Set beanDefinitionSet = provider.findCandidateComponents("com.magicdu.rpc.rpcclient");
        for (BeanDefinition beanDefinition : beanDefinitionSet){
            if (beanDefinition instanceof AnnotatedBeanDefinition){
                //获得注解上的参数信息
                AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
                String beanClassAllName = beanDefinition.getBeanClassName();
                Map paraMap = annotatedBeanDefinition.getMetadata()
                        .getAnnotationAttributes(RpcClient.class.getCanonicalName());
                //将RpcClient的工厂类注册进去
                BeanDefinitionBuilder builder = BeanDefinitionBuilder
                        .genericBeanDefinition(RpcClientFactoryBean.class);
                //设置RpcClinetFactoryBean工厂类中的构造函数的值
                builder.addConstructorArgValue(beanClassAllName);
                builder.getBeanDefinition().setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
                //将其注册进容器中
                registry.registerBeanDefinition(
                        beanClassAllName,
                        builder.getBeanDefinition());
            }
        }
    }
    //允许Spring扫描接口上的注解
    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false) {
            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                return beanDefinition.getMetadata().isInterface() &amp;&amp; beanDefinition.getMetadata().isIndependent();
            }
        };
    }
}

然后将 注册类,引入 spring 配置,以便启动时扫描

@Configuration
@Import(RpcClientsRegistrar.class)
public class RpcConfig {
}

这样客户端我们就写完了。

服务端代码

服务端简单来说,就是开个端口,接受客户端发送的参数,找到对应的方法,调用完,返回数据就完了。

我们将所有的客户端调用的方法都用 @Service 注解标记,服务端启动时我们将所有Service注解的方法缓存起来,客户端调用时,通过反射,我们便能执行服务端的方法了。并且扫描完就启动相关Socket,我们提供一个 Socket ,等待客户端调用。

@Component
@Slf4j
public class RpcServiceConfig implements CommandLineRunner {
    @Autowired
    private ApplicationContext applicationContext;
    public static Map rpcServiceMap = new HashMap();

    @Override
    public void run(String... args) throws Exception {
        Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Service.class);
        for (Object bean: beansWithAnnotation.values()){
            Class clazz = bean.getClass();
            Class[] interfaces = clazz.getInterfaces();
            for (Class inter : interfaces){
                rpcServiceMap.put(getClassName(inter.getName()),bean);
                log.info("已经加载的服务:"+inter.getName());
            }
        }
        startPort();
    }
    private String getClassName(String beanClassName){
        String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1);
        className = className.substring(0,1).toLowerCase() + className.substring(1);
        return className;
    }

    public void  startPort() throws IOException {
                //服务端在20006端口监听客户端请求的TCP连接
        ServerSocket server = new ServerSocket(20006);
        Socket client = null;
        boolean f = true;
        while (f) {
            //等待客户端的连接,如果没有获取连接
            client = server.accept();
           System.out.println("与客户端连接成功!");
           //为每个客户端连接开启一个线程
            new Thread(new ServerThread(client)).start();
        }
        server.close();
    }
}

提供线程,供客户端调用,如果收到客户端请求,就去执行对应的方法。

@Slf4j
public class ServerThread implements Runnable {

    private Socket client = null;

    public ServerThread(Socket client) {
        this.client = client;
    }

    @Override
    public void run() {
        try {
            //获取Socket的输出流,用来向客户端发送数据
            PrintStream out = new PrintStream(client.getOutputStream());
            //获取Socket的输入流,用来接收从客户端发送过来的数据
            BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
            boolean flag = true;
            while (flag) {
                //接收从客户端发送过来的数据
                String str = buf.readLine();
                if (str == null || "".equals(str)) {
                    flag = false;
                } else {
                    out.println(CommonDeal.getInvokeMethodMes(str));
                }
            }
            out.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

执行方法的实现如下:

@Slf4j
public class CommonDeal {

    public static String getInvokeMethodMes(String str){
        GsonBuilder gsonBuilder = new GsonBuilder();
        gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
        Gson gson = gsonBuilder.create();
        RpcRequest request = gson.fromJson(str, RpcRequest.class);
        return gson.toJson(invokeMethod(request));
    }

    private static RpcResponse invokeMethod(RpcRequest request) {
        String className = request.getClassName();
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        Class[] parameTypes = request.getParameTypes();
        Object o = RpcServiceConfig.rpcServiceMap.get(className);
        RpcResponse response = new RpcResponse();
        try {
            Method method = o.getClass().getDeclaredMethod(methodName, parameTypes);
            Object invokeMethod = method.invoke(o, parameters);
            response.setResult(invokeMethod);
        } catch (NoSuchMethodException e) {
            log.info("没有找到" + methodName);
        } catch (IllegalAccessException e) {
            log.info("执行错误" + parameters);
        } catch (InvocationTargetException e) {
            log.info("执行错误" + parameters);
        }
        return response;
    }
}

总结

其中还有很多的功能需要完善,例如一个完整RPC框架肯定还需要服务注册与发现,而且双方通信肯定也不能是直接开启一个线程一直在等着,肯定需要是异步的等等的各种功能。这个例子只是简单帮助我们了解相关Rpc的思路以及简单的实现。


华裳绕指柔, 版权所有丨如未注明 , 均为原创|转载请注明自己动手实现简单的微服务rpc调用
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址