手写一个简单的RPC框架
学习RPC框架,由繁化简,了解其本质原理
文章目录
- 项目简介
- 什么是RPC?
- 项目模块
- 项目代码
- common模块
- client模块
- server模块
- framework模块
- 测试
项目简介
什么是RPC?
RPC(Remote Procedure Call)即远程过程调用,不同于本地调用,RPC是指调用远端机器的函数或方法,且不需要关心底层的调用细节,如网络协议和传输协议等,对于调用者来说,和调用本地方法没有什么区别。
项目模块
- common模块:定义了用户接口和实体类User
- client模块:调用RPC框架的代理类,获取结果
- server模块:
- 实现common的接口,把实现类注册到注册中心中
- 调用RpcServer开启socket
- 根据RpcRequest类的信息,获取到注册中心的实现类
- 执行方法,返回结果,通过socket返回
- Rpc framework
- 注册中心
- RpcRequest,装载类的信息
- RpcServer:创建socket,接受客户端的请求
项目代码
common模块
实体类和定义的接口
package com.rpc.common; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * @Author: Yangmiao * @Date: 2023/2/8 10:37 * @Desc: 网络中传输的信息 */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class User implements Serializable { private Integer id; private String name; private Integer age; private int sex; }
package com.rpc.common; /** * @Author: Yangmiao * @Date: 2023/2/8 10:38 * @Desc: */ public interface IUserService { User getById(Integer id); User getUsername(String userName); }
client模块
package com.rpc.client; import com.rpc.common.IUserService; import com.rpc.framework.proxy.RpcProxy; /** * @Author: Yangmiao * @Date: 2023/2/8 11:39 * @Desc: */ public class Client { public static void main(String[] args) { RpcProxy rpcProxy = new RpcProxy(); IUserService productService = (IUserService) rpcProxy.remoteCall("localhost", 10000, IUserService.class); System.out.println("productService = " + productService.getById(10)); } }
server模块
package com.rpc.server; import com.rpc.common.IUserService; import com.rpc.framework.Registry; import com.rpc.framework.RpcServer; /** * @Author: Yangmiao * @Date: 2023/2/8 11:37 * @Desc: * https://www.cnblogs.com/fantongxue/p/16004920.html */ public class Server { /** * 把接口和实现类注册到RPC的注册中心,然后通过RPC的RPCServer开启一个serversocket,监听某一个端口。 * @param args */ public static void main(String[] args) { Registry.put(IUserService.class.getName(), UserServiceImpl.class); new RpcServer().provide(10000); } }
package com.rpc.server; import com.rpc.common.User; import com.rpc.common.IUserService; /** * @Author: Yangmiao * @Date: 2023/2/8 11:35 * @Desc: */ public class UserServiceImpl implements IUserService { @Override public User getById(Integer id) { return User.builder() .id(id) .name("yangmiao") .age(100) .sex(1) .build(); } @Override public User getUsername(String userName) { return User.builder() .name(userName) .build(); } }
framework模块
package com.rpc.framework; import java.util.HashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @Author: Yangmiao * @Date: 2023/2/8 10:39 * @Desc: 注册中心 */ public class Registry { private final static HashMap map = new HashMap(); private final static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); /** * 读缓存 * @param key * @return */ public static Class get(String key){ // 读锁 Lock writeLock = readWriteLock.writeLock(); // 写锁 Lock readLock = readWriteLock.readLock(); Class v = null; readLock.lock(); try { v = map.get(key); }finally { readLock.unlock(); } if (v != null){ return v; } // 缓存中不存在 writeLock.lock(); try { v = map.get(key); if (v==null){ // 1.查询数据库 // 2.写入缓存 map.put(key,v); } }finally { writeLock.unlock(); } return v; } /** * 写缓存 * @param key * @param value * @return */ public static Class put(String key, Class value){ Lock writeLock = readWriteLock.writeLock(); writeLock.lock(); try { return map.put(key,value); }finally { writeLock.unlock(); } } public static boolean containsKey(String key){ return map.containsKey(key); } }
package com.rpc.framework; import lombok.Data; import java.io.Serializable; /** * @Author: Yangmiao * @Date: 2023/2/8 10:41 * @Desc: */ @Data public class RpcRequest implements Serializable { private String className; private String methodName; private Class[] types; private Object[] params; }
package com.rpc.framework; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Author: Yangmiao * @Date: 2023/2/8 10:42 * @Desc: */ public class RpcServer { /** * 创建线程池 */ private ExecutorService executors = Executors.newFixedThreadPool(5); public void provide(int port){ try { ServerSocket serverSocket = new ServerSocket(port); while (true){ Socket socket = serverSocket.accept(); executors.execute(new ProcessHandler(socket)); } } catch (IOException e) { e.printStackTrace(); } } }
package com.rpc.framework; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; /** * @Author: Yangmiao * @Date: 2023/2/8 10:50 * @Desc: 处理服务端逻辑 */ public class ProcessHandler implements Runnable { private Socket socket; public ProcessHandler(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream objectInputStream = null; ObjectOutputStream objectOutputStream = null; try { objectInputStream = new ObjectInputStream(socket.getInputStream()); RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Class clazz = null; // 判断是否存在于注册中心中 if (Registry.containsKey(rpcRequest.getClassName())){ clazz = Registry.get(rpcRequest.getClassName()); } Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getTypes()); Object result = method.invoke(clazz.newInstance(), rpcRequest.getParams()); // 返回结果 objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(result); objectOutputStream.flush(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); }finally { try { if (objectInputStream !=null){ objectInputStream.close(); } if (objectOutputStream !=null){ objectOutputStream.close(); } }catch (IOException e){ e.printStackTrace(); } } } }
代理
package com.rpc.framework.proxy; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; /** * @Author: Yangmiao * @Date: 2023/2/8 11:18 * @Desc: 创建动态代理 */ public class RpcProxy { public T remoteCall(String host,int port,Class clazz){ return (T) Proxy.newProxyInstance( clazz.getClassLoader(), (Class[]) new Class[]{clazz}, new RemoteInvocationHandler(host,port,clazz) ); } }
package com.rpc.framework.proxy; import com.rpc.framework.RpcRequest; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket; /** * @Author: Yangmiao * @Date: 2023/2/8 11:22 * @Desc: 代理类执行的逻辑 */ public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; private Class clazz; public RemoteInvocationHandler(String host,int port,Class clazz){ this.host = host; this.port = port; this.clazz = clazz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(clazz.getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setTypes(method.getParameterTypes()); rpcRequest.setParams(args); ObjectOutputStream objectOutputStream = null; ObjectInputStream objectInputStream = null; try { Socket socket = new Socket(host,port); // 发送消息 objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(rpcRequest); objectOutputStream.flush(); // 接受结果 objectInputStream = new ObjectInputStream(socket.getInputStream()); Object readObject = objectInputStream.readObject(); return readObject; }catch (Exception e){ e.printStackTrace(); }finally { try { if (objectInputStream !=null){ objectInputStream.close(); } if (objectOutputStream!=null){ objectOutputStream.close(); } }catch (IOException e){ e.printStackTrace(); } } return null; } }
测试
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。
还没有评论,来说两句吧...