C++多进程并发框架(二)

2014-11-24 11:08:43 · 作者: · 浏览: 1
接和Service 相连接,而是通过Broker 中间层完成了消息传递。关于Broker 模式可以参见:http://blog.chinaunix.net/uid-23093301-id-90459.html
进程间通信采用TPC,而不是多线程使用的共享内存方式。Service 一般是单线程架构的,通过启动多进程实现相对于多线程的并发。由于Broker模式天生石分布式的,所以有很好的Scalability。
消息时序图
C++多进程并发框架

如何注册服务和接口
来看一下Echo 服务的实现:

struct echo_service_t
{
public:
void echo(echo_t::in_t& in_msg_, rpc_callcack_t& cb_)
{
logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));
echo_t::out_t out;
out.value = in_msg_.value;
cb_(out);
}
};

int main(int argc, char* argv[])
{
int g_index = 1;
if (argc > 1)
{
g_index = atoi(argv[1]);
}
char buff[128];
snprintf(buff, sizeof(buff), "tcp://%s:%s", "127.0.0.1", "10241");

msg_bus_t msg_bus;
assert(0 == singleton_t::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");

echo_service_t f;

singleton_t::instance().create_service_group("echo");
singleton_t::instance().create_service("echo", g_index)
.bind_service(&f)
.reg(&echo_service_t::echo);

signal_helper_t::wait();

singleton_t::instance().close();
//usleep(1000);
cout <<"\noh end\n";
return 0;
}
●create_service_group 创建一个服务group,一个服务组可能有多个并行的实例
●create_service 以特定的id 创建一个服务实例
●reg 为该服务注册接口
●接口的定义规范为void echo(echo_t::in_t& in_msg_, rpc_callcack_t& cb_),第一个参数为输入的消息struct,第二个参数为回调函数的模板特例,模板参数为返回消息的struct 类型。接口无需知道发送消息等细节,只需将结果callback 即可。
●注册到Broker 后,所有Client都可获取该服务
消息定义的规范
我们约定每个接口(远程或本地都应满足)都包含一个输入消息和一个结果消息。来看一下echo 服务的消息定义:

struct echo_t
{
struct in_t: public msg_i
{
in_t():
msg_i("echo_t::in_t")
{}
virtual string encode()
{
return (init_encoder() << value).get_buff();
}
virtual void decode(const string& src_buff_)
{
init_decoder(src_buff_) >> value;
}

string value;
};
struct out_t: public msg_i
{
out_t():
msg_i("echo_t::out_t")
{}
virtual string encode()
{
return (init_encoder() << value).get_buff();
}
virtual void decode(const string& src_buff_)
{
init_decoder(src_buff_) >> value;
}

string value;
};
};
●每个接口必须包含in_t消息和out_t消息,并且他们定义在接口名(如echo _t)的内部
●所有消息都继承于msg_i, 其封装了二进制的序列化、反序列化等。构造时赋予类型名作为消息的名称。
●每个消息必须实现encode 和 decode 函数
这里需要指出的是,FFLIB 中不需要为每个消息定义对应的CMD。当接口如echo向Broker 注册时,reg接口通过C++ 模板的类型推断会自动将该msg name 注册给Broker, Broker为每个msg name 分配唯一的msg_id。Msg_bus 中自动维护了msg_name 和msg_id 的映射。Msg_i 的定义如下:

struct msg_i : public codec_i
{
msg_i(const char* msg_name_):
cmd(0),
uuid(0),
service_group_id(0),
service_id(0),
msg_id(0),
msg_name(msg_name_)
{}

void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)
{
service_group_id = group_id;
service_id = id_;
uuid = uuid_;
msg_id = msg_id_;
}

uint16_t cmd;
uint16_t get_group_id() const{ return service_group_id; }
uint16_t get_service_id() const{ return service_id; }
uint32_t get_uuid() const{ return uuid; }

uint16_t get_msg_id() const{ return msg_id; }
const string& get_name() const
{
if (msg_name.empty() == false)
{
return msg_name;
}