什么是rpc(来自百度百科)
RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
Rpc要解决的问题
RPC要解决的两个问题:
- 解决分布式系统中,服务之间的调用问题。
- 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
实际情况下,RPC很少用到http协议来进行数据传输,毕竟我只是想传输一下数据而已,何必动用到一个文本传输的应用层协议呢,我为什么不直接使用二进制传输?比如直接用Java的Socket协议进行传输?
要实现一个RPC不算难,难的是实现一个高性能高可靠的RPC框架。
比如,既然是分布式了,那么一个服务可能有多个实例,你在调用时,要如何获取这些实例的地址呢?
这时候就需要一个服务注册中心,比如在Dubbo里头,就可以使用Zookeeper作为注册中心,在调用时,从Zookeeper获取服务的实例列表,再从中选择一个进行调用。
那么选哪个调用好呢?这时候就需要负载均衡了,于是你又得考虑如何实现复杂均衡,比如Dubbo就提供了好几种负载均衡策略。
这还没完,总不能每次调用时都去注册中心查询实例列表吧,这样效率多低呀,于是又有了缓存,有了缓存,就要考虑缓存的更新问题,blablabla……
你以为就这样结束了,没呢,还有这些:
- 客户端总不能每次调用完都干等着服务端返回数据吧,于是就要支持异步调用;
- 服务端的接口修改了,老的接口还有人在用,怎么办?总不能让他们都改了吧?这就需要版本控制了;
- 服务端总不能每次接到请求都马上启动一个线程去处理吧?于是就需要线程池;
- 服务端关闭时,还没处理完的请求怎么办?是直接结束呢,还是等全部请求处理完再关闭呢?
- ……
如此种种,都是一个优秀的RPC框架需要考虑的问题。
我们当前的目标是先实现个简单的rpc 调用,来理解我们分布式系统中服务调用的基本原理。
简单Rpc实现
实现的话,当然就分为客户端和服务端两步,参照我们之前介绍的 feignclient 我们来完成一个简单的rpcclient
整个流程的话,就是客户端调用远程服务端的某个方法,服务端最终返回数据给客户端。
<
客户端实现
客户端要发送什么数据给服务端呢?
我们在客户端调用的是服务端提供的接口,所以我们需要将客户端调用的信息传输过去,那么我们可以将要传输的信息分为两类
- 第一类是服务端可以根据这个信息找到相应的接口实现类和方法
- 第二类是调用此方法传输的参数信息
所以我们首先提供实体来保存这两类数据,由于是网络传输,还要保证我们的实体要序列化。
请求的话,我们就用RpcRequest 来保存
/**
* rpc request
*/
@Data
public class RpcRequest implements Serializable {
private static final long serialVersionUID = -8558250604858740384L;
private String className;
private String methodName;
private Class [] parameTypes;
private Object [] parameters;
}
响应的话,我们用RpcResponse 来保存
/**
* rpc response
*/
@Data
public class RpcResponse implements Serializable {
private static final long serialVersionUID = 9201838302447943808L;
private Object result;
}
那么我们如何将用户的请求封装到客户端,客户端再发给服务端呢?
借助于我们之前理解的 FeignClient和spring 容器的知识,我们可以利用注解和动态代理来实现请求封装和服务调用。
我们提供一个简单的注解RpcClient:
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClient {
}
上面注解的作用和FeignClient 作用一样,来标示这是一个远程调用的接口,以便spring能够扫描到,并注入容器中。此过程,我们需要提供代理类,以便动态代理能够调用,提供工厂类,生成代理类,提供注册类,将bean注入到spring 容器。spring 已经为我们提供了这方面的开发流程,详细的问题,请自行学习spring的相关文章。
首先提供代理类,代理类是服务调用的核心,就是通过执行代理类的 invoke方法,将用户信息以及相关类信息序列化,通过 Socket 发给服务端,并解析服务端响应的数据,最终返回给用户。
@Slf4j
@Component
public class RpcDynamicProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String requestJson = objectToJson(method,args);
Socket client = new Socket("127.0.0.1", 20006);
client.setSoTimeout(10000);
//获取Socket的输出流,用来发送数据到服务端
PrintStream out = new PrintStream(client.getOutputStream());
//获取Socket的输入流,用来接收从服务端发送过来的数据
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
//发送数据到服务端
out.println(requestJson);
RpcResponse response = new RpcResponse();
Gson gson =new Gson();
try{
//从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常
String responsJson = buf.readLine();
response = gson.fromJson(responsJson, RpcResponse.class);
}catch(SocketTimeoutException e){
log.info("Time out, No response");
}
if(client != null){
//如果构造函数建立起了连接,则关闭套接字,如果没有建立起连接,自然不用关闭
client.close(); //只关闭socket,其关联的输入输出流也会被关闭
}
return response.getResult();
}
public String objectToJson(Method method,Object [] args){
RpcRequest request = new RpcRequest();
String methodName = method.getName();
Class[] parameterTypes = method.getParameterTypes();
String className = method.getDeclaringClass().getName();
request.setMethodName(methodName);
request.setParameTypes(parameterTypes);
request.setParameters(args);
request.setClassName(getClassName(className));
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
Gson gson = gsonBuilder.create();
return gson.toJson(request);
}
private String getClassName(String beanClassName){
String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1);
className = className.substring(0,1).toLowerCase() + className.substring(1);
return className;
}
}
提供 工厂类,生成代理类的对象
public class RpcClientFactoryBean implements FactoryBean {
private Class classType;
@Autowired
private RpcDynamicProxy rpcDynamicProxy;
public RpcClientFactoryBean(Class classType) {
this.classType = classType;
}
@Override
public Object getObject() throws Exception {
ClassLoader classLoader = classType.getClassLoader();
Object object = Proxy.newProxyInstance(classLoader,new Class[]{classType},rpcDynamicProxy);
return object;
}
@Override
public Class getObjectType() {
return this.classType;
}
@Override
public boolean isSingleton() {
return false;
}
提供注册类,扫描相关RpcClient 注解的类,将bean注入spring 容器
public class RpcClientsRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
registerRpcClients(importingClassMetadata,registry);
}
/**
* 根据注解 注册 rpcClients
* @param metadata
* @param registry
*/
public void registerRpcClients(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
ClassPathScanningCandidateComponentProvider provider = getScanner();
//设置扫描器
provider.addIncludeFilter(new AnnotationTypeFilter(RpcClient.class));
//扫描此包下的所有带有@RpcClient的注解的类
Set beanDefinitionSet = provider.findCandidateComponents("com.magicdu.rpc.rpcclient");
for (BeanDefinition beanDefinition : beanDefinitionSet){
if (beanDefinition instanceof AnnotatedBeanDefinition){
//获得注解上的参数信息
AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
String beanClassAllName = beanDefinition.getBeanClassName();
Map paraMap = annotatedBeanDefinition.getMetadata()
.getAnnotationAttributes(RpcClient.class.getCanonicalName());
//将RpcClient的工厂类注册进去
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(RpcClientFactoryBean.class);
//设置RpcClinetFactoryBean工厂类中的构造函数的值
builder.addConstructorArgValue(beanClassAllName);
builder.getBeanDefinition().setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
//将其注册进容器中
registry.registerBeanDefinition(
beanClassAllName,
builder.getBeanDefinition());
}
}
}
//允许Spring扫描接口上的注解
protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false) {
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
}
};
}
}
然后将 注册类,引入 spring 配置,以便启动时扫描
@Configuration
@Import(RpcClientsRegistrar.class)
public class RpcConfig {
}
这样客户端我们就写完了。
服务端代码
服务端简单来说,就是开个端口,接受客户端发送的参数,找到对应的方法,调用完,返回数据就完了。
我们将所有的客户端调用的方法都用 @Service 注解标记,服务端启动时我们将所有Service注解的方法缓存起来,客户端调用时,通过反射,我们便能执行服务端的方法了。并且扫描完就启动相关Socket,我们提供一个 Socket ,等待客户端调用。
@Component
@Slf4j
public class RpcServiceConfig implements CommandLineRunner {
@Autowired
private ApplicationContext applicationContext;
public static Map rpcServiceMap = new HashMap();
@Override
public void run(String... args) throws Exception {
Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Service.class);
for (Object bean: beansWithAnnotation.values()){
Class clazz = bean.getClass();
Class[] interfaces = clazz.getInterfaces();
for (Class inter : interfaces){
rpcServiceMap.put(getClassName(inter.getName()),bean);
log.info("已经加载的服务:"+inter.getName());
}
}
startPort();
}
private String getClassName(String beanClassName){
String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1);
className = className.substring(0,1).toLowerCase() + className.substring(1);
return className;
}
public void startPort() throws IOException {
//服务端在20006端口监听客户端请求的TCP连接
ServerSocket server = new ServerSocket(20006);
Socket client = null;
boolean f = true;
while (f) {
//等待客户端的连接,如果没有获取连接
client = server.accept();
System.out.println("与客户端连接成功!");
//为每个客户端连接开启一个线程
new Thread(new ServerThread(client)).start();
}
server.close();
}
}
提供线程,供客户端调用,如果收到客户端请求,就去执行对应的方法。
@Slf4j
public class ServerThread implements Runnable {
private Socket client = null;
public ServerThread(Socket client) {
this.client = client;
}
@Override
public void run() {
try {
//获取Socket的输出流,用来向客户端发送数据
PrintStream out = new PrintStream(client.getOutputStream());
//获取Socket的输入流,用来接收从客户端发送过来的数据
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
boolean flag = true;
while (flag) {
//接收从客户端发送过来的数据
String str = buf.readLine();
if (str == null || "".equals(str)) {
flag = false;
} else {
out.println(CommonDeal.getInvokeMethodMes(str));
}
}
out.close();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行方法的实现如下:
@Slf4j
public class CommonDeal {
public static String getInvokeMethodMes(String str){
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
Gson gson = gsonBuilder.create();
RpcRequest request = gson.fromJson(str, RpcRequest.class);
return gson.toJson(invokeMethod(request));
}
private static RpcResponse invokeMethod(RpcRequest request) {
String className = request.getClassName();
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
Class[] parameTypes = request.getParameTypes();
Object o = RpcServiceConfig.rpcServiceMap.get(className);
RpcResponse response = new RpcResponse();
try {
Method method = o.getClass().getDeclaredMethod(methodName, parameTypes);
Object invokeMethod = method.invoke(o, parameters);
response.setResult(invokeMethod);
} catch (NoSuchMethodException e) {
log.info("没有找到" + methodName);
} catch (IllegalAccessException e) {
log.info("执行错误" + parameters);
} catch (InvocationTargetException e) {
log.info("执行错误" + parameters);
}
return response;
}
}
总结
其中还有很多的功能需要完善,例如一个完整RPC框架肯定还需要服务注册与发现,而且双方通信肯定也不能是直接开启一个线程一直在等着,肯定需要是异步的等等的各种功能。这个例子只是简单帮助我们了解相关Rpc的思路以及简单的实现。