rabbitmq-c初探 (一)

2014-11-24 00:04:26 · 作者: · 浏览: 16

RabbitMQ着实是个好东西,当然也支持用C语言开发client。不过例子和文档少得可怜,只能到项目源码里查看example来理解,这里简单整理了一些,以免走弯路。需要注意的主要是版本对应问题,这点不如Maven方便,只能自己把类库和例子的版本对好。接下来我们简单看看需要的东西。

环境:Ubuntu 13.04

rabbitmq-server 默认的3.0.2-1

librabbitmq-dev 默认的0.0.1.hg216-1

项目构造用的qmake(这样简单不少)

1 consumer

1.1 consumer.pro的内容
SOURCES=utils.cpp amqp_consumer.cpp platform_utils.cpp

HEADERS=utils.h

VPATH+=/usr/include

CONFIG+=release

TARGET=consumer

LIBS += -lrabbitmq

1.2 amqp_consumer.cpp代码
这里的代码来自于rabbitmq-c-v0.3.0 具体查看 https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_consumer.c。(对于几个特殊的宏引用作了调整)

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <amqp.h>

#include <amqp_framing.h>

#include <assert.h>

#include "utils.h"

#define SUMMARY_EVERY_US 1000000


/*
 * Consume frames from `conn` in an endless loop.
 *
 * For every Basic.Deliver method frame, the following header frame and the
 * body frames are read; each body fragment is passed to amqp_dump(). Once
 * the current time passes the next summary deadline (spaced
 * SUMMARY_EVERY_US apart), a throughput line is printed to stdout.
 *
 * Returns when amqp_simple_wait_frame() yields a negative result. Calls
 * abort() if the frame after a delivery is not a header frame, or if a
 * non-body frame arrives while the body is still incomplete.
 */
static void run(amqp_connection_state_t conn)
{
  uint64_t start_time = now_microseconds();
  int received = 0;
  int previous_received = 0;
  uint64_t previous_report_time = start_time;
  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
  amqp_frame_t frame;
  int result;
  size_t body_received;
  size_t body_target;
  uint64_t now;

  while (1) {
    now = now_microseconds();
    if (now > next_summary_time) {
      int countOverInterval = received - previous_received;
      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

      /*
       * Divide in 64-bit first and narrow afterwards: casting the raw
       * microsecond delta to int before dividing overflows after about
       * 2^31 us (roughly 35 minutes).
       */
      printf("%d ms: Received %d - %d since last report (%d Hz)\n",
             (int)((now - start_time) / 1000), received, countOverInterval, (int) intervalRate);

      previous_received = received;
      previous_report_time = now;
      next_summary_time += SUMMARY_EVERY_US;
    }

    amqp_maybe_release_buffers(conn);

    /* Wait for the next frame; anything but a Basic.Deliver method is skipped. */
    result = amqp_simple_wait_frame(conn, &frame);
    if (result < 0) {
      return;
    }
    if (frame.frame_type != AMQP_FRAME_METHOD) {
      continue;
    }
    if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
      continue;
    }

    /* The frame following a delivery must be the content header. */
    result = amqp_simple_wait_frame(conn, &frame);
    if (result < 0) {
      return;
    }
    if (frame.frame_type != AMQP_FRAME_HEADER) {
      fprintf(stderr, "Expected header!");
      abort();
    }

    body_target = frame.payload.properties.body_size;
    body_received = 0;

    /* Collect body frames until the size announced by the header is reached. */
    while (body_received < body_target) {
      result = amqp_simple_wait_frame(conn, &frame);
      if (result < 0) {
        return;
      }
      if (frame.frame_type != AMQP_FRAME_BODY) {
        fprintf(stderr, "Expected body!");
        abort();
      }

      body_received += frame.payload.body_fragment.len;
      assert(body_received <= body_target);

      amqp_dump(frame.payload.body_fragment.bytes, frame.payload.body_fragment.len);
    }

    received++;
  }
}


int main(int argc, char const * const *argv) {

char const *hostname;

int port;

char const *exchange;

char const *bindingkey;

int sockfd;

amqp_connection_state_t conn;

amqp_bytes_t queuename;


if (argc < 3) {

fprintf(stderr, "Usage: amqp_consumer host port\n");

return 1;

}

hostname = argv[1];

port = atoi(argv[2]);

exchange = "amq.direct"; /* argv[3]; */

bindingkey = "test queue"; /* argv[4]; */

conn = amqp_new_connection();

die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

amqp_set_sockfd(conn, sockfd);

die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

"Logging in");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening