int main(){
//openlog("mtrmbsd",LOG_NDELAY,0);
dictionary *ini;
ini = iniparser_load("cfg.ini");
if(ini == NULL){
return -1;
}
//iniparser_dump(ini,stderr);
char *tIp;
tIp = iniparser_getstring(ini,"mq:ip",NULL);
printf("ip = %s\n",tIp);
amqp_connection_state_t conn;
amqp_socket_t *socket = NULL;
amqp_rpc_reply_t arrt;
//创建连接
conn = amqp_new_connection();
//打开socket
socket = amqp_tcp_socket_new(conn);
if(!socket){
mlcErrx("new socket failed!\n");
return 0;
}
if(amqp_socket_open(socket,"10.247.58.172",5672) != AMQP_STATUS_OK){
mlcErrx("open socket failed!\n");
}
//登录rabbitMQ
arrt = amqp_login(conn,"/",0,1310172,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
if(arrt.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION){
mlcErrx("login mq failed!\n");
return 0;
}
//打开隧道
amqp_channel_open(conn, 1);
amqp_get_rpc_reply(conn);
//amqp_confirm_select_ok_t *acsot = amqp_confirm_select(conn , 1);
//printf("amqp_confirm_select_ok_t= %i",acsot->dummy);
//amqp_get_rpc_reply(conn);
/*创建线程*/
/*
pthread_t id;
int thread_result;
if ((thread_result = pthread_create(&id, NULL, recvMessage, NULL)) != 0){
mlcErrx("can't create thread:%s\n",strerror(thread_result));
return -1;
}*/
/*发送消息*/
FILE *fp = NULL;
if((fp =fopen("test.dat","r")) == NULL){
mlcErrx("read file error!");
}
fseek(fp, 0L, SEEK_END);
size_t size = ftell(fp);
rewind(fp);
size_t t_len =6;
char message[t_len];
//fputs(message,fp);
fclose(fp);
message[0] = 0xf0;
message = 0x00;
message = 0x00;
message = 0x00;
message = 0x09;
message = 0x24;
while(true){
amqp_bytes_t message_bytes;
message_bytes.len =t_len ;
//strcpy((char*)message_bytes.bytes,message);
message_bytes.bytes = message;
amqp_basic_publish(conn,1,amqp_cstring_bytes("mtrmlc.dataIn"),amqp_cstring_bytes("c093"),0,0,NULL,message_bytes);
sleep(10);
}
free(message);
//pthread_join(id, NULL);
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}