ZeroMQ指南-第1章-基础-分而治之(一)

2014-11-24 07:57:51 · 作者: · 浏览: 0
分而治之
作为最终示例(你肯定对生动的代码开始生厌并希望回头去钻研关于比较性、抽象性准则的语言学探讨),让我们来做一个小型超级计算。然后喝个咖啡。我们的超级计算程序是个非常典型的并行处理模型。我们有:
一个通风机(ventilator)来产生可以并行处理的任务
一组工人(worker)来处理任务
一个水槽(sink)来回收工人处理的结果
事实上,工人运行于超快的机子,没准是GPU(图形处理单元)来做困难运算。这是通风机代码,生成100个任务,每个任务都是一条消息告诉工人休眠(sleep)几毫秒。
taskvent: Parallel task ventilator in C
[cpp]
//
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers…\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
} www.2cto.com
printf ("Total expected cost: %d msec\n", total_msec);
sleep (1); // Give 0MQ time to deliver
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
图 5 - 并行管道
这是工人程序。接收消息,休眠指定的时间,然后表明自己完成任务:
taskwork: Parallel task worker in C
[cpp]
//
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to receive messages on
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
// Simple progress indicator for the viewer
fflush (stdout);
printf ("%s.", string);
// Do the work
s_sleep (atoi (string));
free (string);
// Send results to sink
s_send (sender, "");
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
这是水槽程序。它收集这100个任务,然后计算整个处理消耗的时间,让我们能够证实如果有多个工人时他们真的是并行运转的:
tasksink: Parallel task sink in C
[cpp]
//
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
#include "zhelpers.h"
int main (void)
{
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver