/**
 * @file
 * @brief Implementation of MQTTAsyncPublish, a thin wrapper around the Eclipse
 *        Paho asynchronous MQTT C client (MQTTAsync_* API).
 *
 * The wrapper creates a client, connects to a broker, subscribes to every
 * topic stored in the static MQTTAsyncPublish::topicVecs set once the
 * connection succeeds, and publishes / receives messages.
 *
 * The ILog logging calls of the original implementation were disabled
 * (commented out); the boost::format temporaries that only fed those calls
 * have been removed, so diagnostics are emitted with printf only.
 */
#include "MQTTAsyncPublish.h"

// Retained from the original include list (boost::format fed the disabled
// ILog calls).
#include <boost/format.hpp>

#include <cstdio>
#include <set>
#include <string>

using namespace boost;

namespace {

/// Keep-alive interval, in seconds, used for every connect attempt.
constexpr int kKeepAliveSeconds = 20;

/// QoS used by onConnect() when it subscribes to the registered topics.
constexpr int kResubscribeQos = 2;

// SECURITY NOTE(review): broker credentials are hard-coded here because the
// public init() signature offers no way to pass them in. They should come from
// configuration instead of the source code.
// NOTE(review): only init(ADDRESS, CLIENTID, std::string) sends them; the
// reconnect attempts in connlost()/onConnectFailure() do not, exactly as in
// the original code. Confirm whether the broker requires authentication.
const char* const kBrokerUser = "admin";
const char* const kBrokerPassword = "123456";

/**
 * @brief Build the connect options shared by every connect attempt.
 *
 * @param client     Client handle; stored as the callback context.
 * @param onSuccess  Callback invoked when the connection is established.
 * @param onFailure  Callback invoked when the connection attempt fails.
 * @return Options with keep-alive, clean session and callbacks filled in.
 */
MQTTAsync_connectOptions makeConnectOptions(MQTTAsync client,
                                            MQTTAsync_onSuccess* onSuccess,
                                            MQTTAsync_onFailure* onFailure)
{
    MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
    opts.keepAliveInterval = kKeepAliveSeconds;
    opts.cleansession = 1;
    opts.onSuccess = onSuccess;
    opts.onFailure = onFailure;
    opts.context = client;
    return opts;
}

/**
 * @brief Create the client handle and install the connection callbacks.
 *
 * @return true when both MQTTAsync_create and MQTTAsync_setCallbacks succeed.
 */
bool createClient(MQTTAsync* handle, const char* address, const char* clientId,
                  void* context,
                  MQTTAsync_connectionLost* onLost,
                  MQTTAsync_messageArrived* onMessage)
{
    int rc = MQTTAsync_create(handle, address, clientId,
                              MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (rc != MQTTASYNC_SUCCESS) {
        std::printf("Failed to create client, return code %d\n", rc);
        return false;
    }

    rc = MQTTAsync_setCallbacks(*handle, context, onLost, onMessage, nullptr);
    if (rc != MQTTASYNC_SUCCESS) {
        std::printf("Failed to set callbacks, return code %d\n", rc);
        return false;
    }
    return true;
}

/**
 * @brief Start an asynchronous connect and report a synchronous failure.
 *
 * @return true when the connect request was accepted by the library.
 */
bool startConnect(MQTTAsync client, const MQTTAsync_connectOptions& opts)
{
    const int rc = MQTTAsync_connect(client, &opts);
    if (rc != MQTTASYNC_SUCCESS) {
        std::printf("Failed to start connect, return code %d\n", rc);
        return false;
    }
    return true;
}

} // namespace

/// Topics that onConnect() subscribes to; shared by all instances.
std::set<std::string> MQTTAsyncPublish::topicVecs;

MQTTAsyncPublish::MQTTAsyncPublish()
{
}

// NOTE(review): the client handle created in init() is never released with
// MQTTAsync_destroy. Whether `client` is initialised before init() runs is not
// visible from this file, so no cleanup is attempted here.
MQTTAsyncPublish::~MQTTAsyncPublish()
{
}

/**
 * @brief Create the client, register a set of topics and start connecting.
 *
 * The topics are added to topicVecs BEFORE the connect request is issued:
 * onConnect() iterates that set from a library callback (Paho documents its
 * callbacks as running on a library-owned thread), so inserting afterwards
 * could miss topics or race with the iteration.
 *
 * @param ADDRESS   Broker URI passed to MQTTAsync_create.
 * @param CLIENTID  Client identifier passed to MQTTAsync_create.
 * @param topicVec  Topics to subscribe to once connected.
 * @return true when the connect request was started; false otherwise, in
 *         which case the topics added by this call are removed again.
 */
bool MQTTAsyncPublish::init(const char* ADDRESS, const char* CLIENTID,
                            std::set<std::string> topicVec)
{
    if (!createClient(&client, ADDRESS, CLIENTID, this, connlost, msgarrvd)) {
        return false;
    }

    // Remember which topics this call actually added so a failure can be
    // rolled back without touching topics registered earlier.
    std::set<std::string> added;
    for (const auto& topic : topicVec) {
        if (topicVecs.insert(topic).second) {
            added.insert(topic);
        }
    }

    const MQTTAsync_connectOptions conn_opts =
        makeConnectOptions(client, onConnect, onConnectFailure);
    if (!startConnect(client, conn_opts)) {
        for (const auto& topic : added) {
            topicVecs.erase(topic);
        }
        return false;
    }
    return true;
}

/**
 * @brief Create the client, register one topic and start connecting with the
 *        built-in credentials.
 *
 * @param ADDRESS   Broker URI passed to MQTTAsync_create.
 * @param CLIENTID  Client identifier passed to MQTTAsync_create.
 * @param topic     Topic to subscribe to once connected.
 * @return true when the connect request was started; false otherwise, in
 *         which case the topic is removed again if this call added it.
 */
bool MQTTAsyncPublish::init(const char* ADDRESS, const char* CLIENTID,
                            std::string topic)
{
    if (!createClient(&client, ADDRESS, CLIENTID, this, connlost, msgarrvd)) {
        return false;
    }

    // Register before connecting; see the other init() overload for why.
    const bool added = topicVecs.insert(topic).second;

    MQTTAsync_connectOptions conn_opts =
        makeConnectOptions(client, onConnect, onConnectFailure);
    conn_opts.username = kBrokerUser;
    conn_opts.password = kBrokerPassword;

    if (!startConnect(client, conn_opts)) {
        if (added) {
            topicVecs.erase(topic);
        }
        return false;
    }
    return true;
}

/**
 * @brief Register an additional topic and set the QoS used by pushMsg().
 *
 * @param topic  Topic name; a null pointer is rejected.
 * @param QOS    QoS stored in the instance for later publishes.
 * @return true on success, false when @p topic is null (state unchanged).
 */
bool MQTTAsyncPublish::setTopic(const char* topic, int QOS)
{
    if (topic == nullptr) {
        // Constructing a std::string from a null pointer is undefined.
        return false;
    }
    topicVecs.insert(topic);
    this->QOS = QOS;
    return true;
}

/**
 * @brief Success callback for MQTTAsync_sendMessage; reports the token.
 */
void MQTTAsyncPublish::onSend(void* context, MQTTAsync_successData* response)
{
    (void)context;
    std::printf("Message with token value %d delivery confirmed\n", response->token);
}

/**
 * @brief Connection-lost callback: report the cause and start reconnecting.
 *
 * @param context  The MQTTAsyncPublish instance registered in init().
 * @param cause    Optional reason string; may be null.
 *
 * Sets `finished` to 1 on the instance when the reconnect cannot be started.
 */
void MQTTAsyncPublish::connlost(void* context, char* cause)
{
    MQTTAsyncPublish* self = static_cast<MQTTAsyncPublish*>(context);

    std::printf("\nConnection lost\n");
    if (cause) {
        std::printf(" cause: %s\n", cause);
    }
    std::printf("Reconnecting\n");

    const MQTTAsync_connectOptions conn_opts =
        makeConnectOptions(self->client, onConnect, onConnectFailure);
    if (!startConnect(self->client, conn_opts)) {
        self->finished = 1;
    }
}

/**
 * @brief Publish @p msg on the topic whose name is the decimal text of
 *        @p topic, using the QoS stored in the instance.
 *
 * @param topic  Numeric topic; converted with std::to_string.
 * @param msg    Payload bytes.
 * @return true when the send request was accepted by the library.
 */
bool MQTTAsyncPublish::pushMsg(int topic, std::string msg)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = onSend;
    opts.context = client;

    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    // The payload field is a non-const void*; the buffer is only read here.
    pubmsg.payload = const_cast<char*>(msg.c_str());
    pubmsg.payloadlen = static_cast<int>(msg.length());
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    deliveredtoken = 0;

    const std::string topicName = std::to_string(topic);
    const int rc = MQTTAsync_sendMessage(client, topicName.c_str(), &pubmsg, &opts);
    if (rc != MQTTASYNC_SUCCESS) {
        std::printf("Failed to start sendMessage, return code %d\n", rc);
        return false;
    }
    return true;
}

/**
 * @brief Subscribe to a single topic.
 *
 * @param strTopic  Topic filter.
 * @param QOS       Requested QoS for the subscription.
 * @return true when the subscribe request was accepted by the library.
 */
bool MQTTAsyncPublish::subscribe(const std::string& strTopic, int QOS)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;

    deliveredtoken = 0;

    const int rc = MQTTAsync_subscribe(client, strTopic.c_str(), QOS, &opts);
    if (rc != MQTTASYNC_SUCCESS) {
        std::printf("Failed to start subscribe, return code %d\n", rc);
        return false;
    }
    return true;
}

/**
 * @brief Unsubscribe from a single topic (no completion callbacks are set).
 *
 * @param strTopic  Topic filter.
 * @return true when the unsubscribe request was accepted by the library.
 */
bool MQTTAsyncPublish::unsubscribe(const std::string& strTopic)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.context = client;

    const int rc = MQTTAsync_unsubscribe(client, strTopic.c_str(), &opts);
    if (rc != MQTTASYNC_SUCCESS) {
        // The original message said "subscribe" here, which was misleading.
        std::printf("Failed to start unsubscribe, return code %d\n", rc);
        return false;
    }
    return true;
}

/**
 * @brief Message-arrived callback: copy the payload, print it, forward it to
 *        the user callback (if set) and release the library-owned buffers.
 *
 * @param context    The MQTTAsyncPublish instance registered in init().
 * @param topicName  Topic of the message; freed with MQTTAsync_free here.
 * @param topicLen   Unused.
 * @param message    The message; freed with MQTTAsync_freeMessage here.
 * @return Always 1.
 */
int MQTTAsyncPublish::msgarrvd(void* context, char* topicName, int topicLen,
                               MQTTAsync_message* message)
{
    (void)topicLen;
    MQTTAsyncPublish* self = static_cast<MQTTAsyncPublish*>(context);

    // The payload is a length-delimited byte buffer, not a C string.
    std::string payload;
    if (message->payload != nullptr && message->payloadlen > 0) {
        payload.assign(static_cast<const char*>(message->payload),
                       static_cast<std::string::size_type>(message->payloadlen));
    }

    std::printf("Message arrived\n");
    std::printf(" topic: %s\n", topicName);
    std::printf(" message: %s\n", payload.c_str());

    if (self->_msgArrvdFun) {
        self->_msgArrvdFun(topicName, payload);
    }

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}

/**
 * @brief Disconnect-success callback; sets disc_finished to 1.
 */
void MQTTAsyncPublish::onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;
    std::printf("Successful disconnection\n");
    disc_finished = 1;
}

/**
 * @brief Subscribe-success callback; only reports the event.
 */
void MQTTAsyncPublish::onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;
    std::printf("Subscribe succeeded\n");
}

/**
 * @brief Subscribe-failure callback; reports the return code (0 when no
 *        failure data is supplied).
 */
void MQTTAsyncPublish::onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    std::printf("Subscribe failed, rc %d\n", response ? response->code : 0);
}

/**
 * @brief Connect-failure callback: report the failure and retry immediately.
 *
 * @param context   The client handle (set as context in the connect options).
 * @param response  Failure details; may be null, and its message may be null.
 *
 * NOTE(review): the retry is issued straight away with no delay or attempt
 * limit, as in the original code; consider a back-off if the broker can be
 * unreachable for long periods.
 */
void MQTTAsyncPublish::onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    const int code = response ? response->code : 0;
    // Passing a null pointer to %s is undefined, so guard the message too.
    const char* message = (response && response->message) ? response->message : "";
    std::printf("Connect failed, rc code = %d,msg = %s\n", code, message);

    MQTTAsync client = static_cast<MQTTAsync>(context);
    const MQTTAsync_connectOptions conn_opts =
        makeConnectOptions(client, onConnect, onConnectFailure);

    const int rc = MQTTAsync_connect(client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS) {
        // Previously this failure was formatted but never reported.
        std::printf("Failed to Reconnecting, return code %d\n", rc);
    }
}

/**
 * @brief Connect-success callback: subscribe to every topic in topicVecs.
 *
 * @param context   The client handle (set as context in the connect options).
 * @param response  Unused.
 */
void MQTTAsyncPublish::onConnect(void* context, MQTTAsync_successData* response)
{
    (void)response;
    std::printf("Successful connection\n");

    MQTTAsync client = static_cast<MQTTAsync>(context);

    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;

    for (const auto& topic : topicVecs) {
        const int rc = MQTTAsync_subscribe(client, topic.c_str(), kResubscribeQos, &opts);
        if (rc != MQTTASYNC_SUCCESS) {
            std::printf("Failed to start subscribe topic %s, return code %d\n",
                        topic.c_str(), rc);
        }
    }
}