C rabbitmq-c使用实例
/*
author: lwh
*/
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <mlcLog.h>
#include <iniparser.h>
void *recvMessage(void* args){
mlcDebug("call recvMessage()\n");
amqp_connection_state_t conn;
amqp_socket_t *socket = NULL;
amqp_rpc_reply_t arrt;
//创建连接
conn = amqp_new_connection();
//打开socket
socket = amqp_tcp_socket_new(conn);
if(!socket){
mlcErrx("new socket failed!\n");
return 0;
}
if(amqp_socket_open(socket,"10.247.58.172",5672) != AMQP_STATUS_OK){
mlcErrx("open socket failed!\n");
}
//登录rabbitMQ
arrt = amqp_login(conn,"/",0,524288,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
if(arrt.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION){
mlcErrx("login mq failed!\n");
return 0;
}
amqp_channel_open(conn, 10);
amqp_get_rpc_reply(conn);
//设置QOS
short prefetchCount=1000;
amqp_basic_qos(conn,10,0,prefetchCount,false);
mlcDebug("rcv data……!\n");
//接收消息,指定队列
amqp_basic_consume(conn, 10, amqp_cstring_bytes("queue.test"), amqp_empty_bytes, 0, false, 0, amqp_empty_table);
amqp_get_rpc_reply(conn);
amqp_frame_t frame;
while(true){
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
ret = amqp_consume_message(conn, &envelope, NULL, 0);
if (ret.reply_type == AMQP_RESPONSE_NORMAL ){
char *messageBody = new char[envelope.message.body.len];
memcpy(messageBody,envelope.message.body.bytes,envelope.message.body.len);
//printf("messageBody.len=%i messageBody.bytes=%s\n",envelope.message.body.len,messageBody);
mlcDebug("messageBody.bytes=");
int i = 0;
while(messageBody[i] != EOF){
//printf("%X ",messageBody[i]);
i++;
}
//printf("\n");
if(amqp_basic_ack(conn,10,envelope.delivery_tag,false)>0){
mlcWarnx("failing to send the ack to the broker\n");
}
free(messageBody);
amqp_destroy_envelope(&envelope);
}else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION){
mlcErrx("amqp_consume_message faild!\n");
}
}
}