RPC 通信框架
Created 2021.02.26 by William Yu; Last modified: 2022.08.09-V1.2.3
Contact: windmillyucong@163.com
Copyleft! 2022 William Yu. Some rights reserved.
RPC 通信框架
远程过程调用(Remote Procedure Call, RPC)
References
- https://ketao1989.github.io/2016/12/10/rpc-theory-in-action/
Basic Concepts
LPC & IPC 进程间通信
- Windows 编程里面称为LPC
-
Linux 编程里面 IPC ,即进程间通信
- Linux 环境下 IPC 实现的方式
- 管道
- 共享内存
- 信号量
- socket 套接字
- 消息队列(已逐渐淘汰)
- 信号
- 通常使用共享内存完成进程间通信,通过信号量完成进程间的同步协调
管道
”|” 操作符
- shell 中的 “|” 操作符
- Linux 每个命令都是一个进程
-
各个进程的标准输出 STDOUT,标准输入 STDIN
-
Linux 管道分类
-
匿名管道
-
只有父子进程间的通信
-
创建方法:pipe()
1 2
#include <uinstd.h> int pipe(int filedes[2]);
- 参数:
- filedes返回两个文件描述符
- filedes[0] 为读而打开
- filedes[1] 为写而打开
- filedes返回两个文件描述符
- 参数:
-
-
命名管道
-
可在单台机器内的任何一组进程之间通信
-
创建方法: mkfifo()
1 2 3
#include <sys/types.h> #include <sys/stat.h> int mkfifo(const char * pathname, mode_t mode);
- 返回
- 成功返回0,失败返回 -1
- 返回
-
-
信号量 Semaphore
- 主要用于同步协作
- 对它的操作都是原子进行
- 一般提供两种方法:
- P 即wait()
- V 即notify()
- P(sv):如果sv的值大于零,就给它减1;如果它的值为零,就挂起该进程的执行s
- V(sv):如果有其他进程因等待sv而被挂起,就让它恢复运行,如果没有进程因等待sv而挂起,就给它加1
- linux对外提供的API接口方法如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct sem {
short sempid; /* pid of last operaton */
ushort semval; /* current value */
ushort semncnt; /* num procs awaiting increase in semval */
ushort semzcnt; /* num procs awaiting semval = 0 */
};
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>
//首先获取一个信号量,只有该方法可以才能直接使用key,其他方法必须先semget然后才能使用信号量
int semget(key_t key, int nsems, int flag);
//对信号量进行操作,直接控制信号量信息,比如删除信号量
int semctl(int semid, int semnum, int cmd, union semun arg);
//改变信号量的值,P,V操作都是通过该方法
int semop(int sem_id, struct sembuf *sem_opa, size_t num_sem_ops);
### 共享内存
- 同一台机器的硬件设备一般对于同一个系统来说,都是共享的
- 但是,正如进程和线程最大的区别就是一些资源是否隔离。不同的进程,其内存资源使用是隔离独立的,每个进程有自己的一套内存地址映射逻辑,也即是系统是无法直接从不同进程的相同虚拟内存地址找到共同的物理内存地址的,这样,就无法像线程一样,简单把数据对象设置为
static然后线程间就可以共享获取了。 - 因此,Linux对外提供了共享内存的方法来完成进程间通信。
- 对外提供的API如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
//创建共享内存空间,大小为size
int shmget(key_t key, size_t size, int shmflg);
//所有需要使用共享内存通信的进程,映射到自身的内存地址空间中
void *shmat(int shmid, void *addr, int flag);
//从当前进程地址空间中分离该共享内存
int shmdt(const void *shmaddr);
//控制共享内存的,比如删除该共享内存空间等
int shmctl(int shm_id, int command, struct shmid_ds *buf);
socket 套接字
- Socket一般情况下用在不同的两台机器的不同进程之间通信的
- 当Socket创建时的类型为
AF_LOCAL或AF_UNIX时,则是本地进程通信了(当然你也可以直接使用网络套接字)
Web Service 技术

- 技术体系,服务于架构体系
- web 服务技术产生的架构背景是 SOA(面向服务架构)和微服务
RPC
此时才出现RPC,本文重点记录 RPC 相关的内容,主角登场
RPC
RPC(Remote Procedure Call Protocol)远程过程调用协议
Basic Concepts
- 用于分布式系统服务群中开发应用
- 远程过程调用
- 客户端基于某种传输协议通过网络向服务端请求服务处理
- RPC 远程过程调用
- LPC & IPC 本地过程调用
- LPC Windows 编程
- IPC Linux 编程,即进程间通信
通用架构

- client
- server
- Message Protocol:在上文我们已经说到,一次完整的client-server的交互肯定是携带某种两端都能识别的,共同约定的消息格式。RPC的消息管理层专门对网络传输所承载的消息信息进行编码和解码操作。目前流行的技术趋势是不同的RPC实现,为了加强自身框架的效率都有一套(或者几套)私有的消息格式。
- Selector/Processor:存在于RPC服务端,用于服务器端某一个RPC接口的实现的特性(它并不知道自己是一个将要被RPC提供给第三方系统调用的服务)。所以在RPC框架中应该有一种“负责执行RPC接口实现”的角色。包括:管理RPC接口的注册、判断客户端的请求权限、控制接口实现类的执行在内的各种工作。
- IDL:实际上IDL(接口定义语言)并不是RPC实现中所必须的。但是需要跨语言的RPC框架一定会有IDL部分的存在。这是因为要找到一个各种语言能够理解的消息结构、接口定义的描述形式。如果您的RPC实现没有考虑跨语言性,那么IDL部分就不需要包括,例如JAVA RMI因为就是为了在JAVA语言间进行使用,所以JAVA RMI就没有相应的IDL。
rpc框架性能分析
- 使用的网络IO模型
- 基于的网络协议
- 应用层 HTTP协议
- 或者 传输层 TCP协议
- 消息封装格式
- Schema和序列化
- 序列化和反序列化,对象到二进制数据的转换
现有 RPC 框架
- 国内
- Dubbo 来自阿里巴巴 http://dubbo.I/O/
- Motan 新浪微博自用 https://github.com/weibocom/motan
- Dubbox 当当基于 dubbo 的 https://github.com/dangdangdotcom/dubbox
- rpcx 基于 Golang 的 https://github.com/smallnest/rpcx
- 国外
- Thrift from facebook https://thrift.apache.org
- Avro from hadoop https://avro.apache.org
- Finagle by twitter https://twitter.github.I/O/finagle
- gRPC by Google http://www.grpc.I/O (Google inside use Stuppy)
- Hessian from cuacho http://hessian.caucho.com
- Coral Service inside amazon (not open sourced)
- 上述列出来的都是现在互联网企业常用的解决方案,暂时不考虑传统的 SOAP,XML-RPC 等。这些是有网络资料的,实际上很多公司内部都会针对自己的业务场景,以及和公司内的平台相融合(比如监控平台等),自研一套框架,但是殊途同归,都逃不掉刚刚上面所列举的 RPC 的要考虑的各个部分。
结构
-
客户端 client
-
服务 server
-
服务管理 service_manager
-
代理 agent
-
服务代理 service_manager_agent_client
-
客户代理 service_manager_agent_server
-
数据包结构 rpc_data
rpc_data 数据包
- 两种数据包
- RPCRequest
- RPCReply
RPCRequest 请求数据包
1
2
3
4
5
6
7
8
9
10
11
class RPCRequest : public proto::RPCRequestBase{
public:
...
enum State { WAIT_SEND, WAIT_ACK, WAIT_RESULT, FINISHED }; // 4种状态
std::string topic_;
State state_ = WAIT_SEND;
uint64_t timestamp_;
private:
static uint8_t node_id_;
}
RPCReply 回复数据包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class RPCReply : public proto::RPCReplyBase {
public:
RPCReply() {}
RPCReply(const RPCRequest &request,
proto::ReplyAttribute attribute = proto::RESULT);
RPCReply(uint32_t req_id, uint8_t reply_node_id,
proto::ReplyAttribute attribute = proto::RESULT);
~RPCReply() {}
// clone
std::shared_ptr<RPCReply> Clone() const;
void Clone(const RPCReply &obj);
std::string topic_;
uint64_t timestamp_;
uint8_t reply_node_id_;
};
Server (virtual)
-
服务端
- ServiceManger类的一部分,提供服务端功能
-
方法
- 注册
- 发送回复
NodeServer
-
方法
- 注册服务
- 发送回复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class NodeServer : public Server {
public:
/**
* @brief Default constructor.
*/
NodeServer(PortDataType port_data_type, uint8_t node_id);
/**
* @brief Destructor.
*/
~NodeServer() override;
void RegisterService(const std::string &topic,
req_callback_t request_callback) override;
void SendReply(uint8_t node_id, const RPCReply &reply) const;
void SendReply(const std::string &topic, const RPCReply &reply) const override;
private:
// PIMPL
class Impl;
std::shared_ptr<Impl> impl_;
};
RpcService
-
功能
- 注册服务
- 发送回复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class RpcServer : public Server {
public:
/**
* @brief Default constructor.
*/
explicit RpcServer(std::string name = "RpcServer",RPCBackEnd backend = FASTRTPS_PUB_SUB);
/**
* @brief Destructor.
*/
~RpcServer() override;
void RegisterService(const std::string &topic,
req_callback_t request_callback) override;
void SendReply(const std::string &topic,
const RPCReply &reply) const override;
private:
// PIMPL
class Impl;
std::shared_ptr<Impl> impl_;
};
Client (virtual)
-
客户端
- ServiceManger类的一部分,提供服务端功能
- 方法
- 发送请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Client {
public:
/**
* @brief Default constructor.
*/
explicit Client(std::string name = "");
/**
* @brief Destructor.
*/
virtual ~Client();
virtual void Send(const std::string &topic, const RPCRequest &request,
rep_callback_t reply_callback) = 0;
const std::string name() const;
private:
// PIMPL
class Impl;
std::shared_ptr<Impl> impl_;
};
NodeClient
-
功能
- 发送请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class NodeClient : public Client {
public:
/**
* @brief Default constructor.
*/
NodeClient(PortDataType port_data_type, uint8_t node_id);
/**
* @brief Destructor.
*/
~NodeClient() override;
void Send(const std::string &topic, const RPCRequest &request,
rep_callback_t reply_callback) override ;
private:
// PIMPL
class Impl;
std::shared_ptr<Impl> impl_;
};
RpcClient
-
功能
- 发送请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class RpcClient : public Client {
public:
/**
* @brief Default constructor.
*/
explicit RpcClient(std::string name = "RpcClient",
RPCBackEnd backend = FASTRTPS_PUB_SUB);
/**
* @brief Destructor.
*/
~RpcClient() override;
void Send(const std::string &topic, const RPCRequest &request,
rep_callback_t reply_callback) override;
private:
// PIMPL
class Impl;
std::shared_ptr<Impl> impl_;
};
ServiceManger
-
提供4个功能
- 开启代理(service_manager_agent_server)
- 注册服务
- 发送回复
- 发送请求
- 注册服务,和发送回复 -> 调用服务端
- 发送请求 -> 调用客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ServiceManager {
public:
/**
* @brief Default constructor.
*/
ServiceManager(PortDataType port_data_type, uint8_t node_id);
/**
* @brief Destructor.
*/
virtual ~ServiceManager();
void StartAgent(int port);
void RegisterService(const std::string &topic,
req_callback_t request_callback);
void SendRequest(const std::string &topic, const RPCRequest &request,
rep_callback_t reply_callback);
void SendReply(const RPCReply &reply);
private:
// PIMPL
class Impl;
std::shared_ptr<Impl> impl_;
};
service_manager_agent_server
-
服务管理器的远程代理服务端,部署在机器上的服务管理器,管理机器内程序的topic-list
-
由机器调用,debug_node会使用一个
1 2 3 4
实现一个 service_manager_ service_manager_->StartAgent(agent_port); 启动代理 service_manager_ 里面实现一个 service_manager_agent_server_
-
提供port即可,IP为机器自身IP
service_manager_agent_client
-
服务管理器的远程代理客户端
-
由笔记本调用
1
service_manager_ = std::make_shared<rpc::ServiceManagerAgentClient>(server_addr, port);
-
需要提供远程机器的IP和端口port
Process
local rpc cs Process
-
server_manager_ 启动
-
Server 服务端
- server_manager_ 注册 [topic, 回调函数]
1
2
3
server_manager_->RegisterService(
topic, [server_manager_](const RPCRequest &request) -> void
...
-
注册函数处理请求,返回reply
-
收到request,回调启动
-
回调里面构造reply,然后发送回复sendreply
-
Client 客户端
-
构造msg
-
server_manager_ 发送请求
1 2 3 4 5 6 7 8 9 10
server_manager_->SendRequest( topic, request_, [](const RPCReply &reply) -> void { static int recv_count = 0; LOG(ERROR) << "Recv sever reply:" << std::endl << "id:" << reply.req_id() << std::endl << "attribute:" << (int)reply.attribute() << std::endl; recv_count = recv_count + 1; LOG(ERROR) << "Recv count:" << recv_count; });
-
回调里面处理回复reply
-
remote rpc cs Process
- manage_agent 代理启动
Q
- topic 和 node_id 的关系?
- node_id 之间通过topic 会话,两个node之间可能有多个topic