#include "MQTTClientWrapper.h"
#include "OtarcikCan.h"
#include "CANDevice.h"

// Hooks the MQTT client's state-change signal to StateChanged() and creates
// the single-shot timer whose timeout drives MQTTSendTimerExpired().
CMQTTClientWrapper::CMQTTClientWrapper()
{
    // Clear the raw pointers up front so every later null check reads a
    // defined value (mCANDevicesList is tested in MQTTSendTimerExpired()).
    mProgramPtr = nullptr;
    mCANDevicesList = nullptr;

    connect(&mMQTTClient,&QMqttClient::stateChanged,this,&CMQTTClientWrapper::StateChanged);

    mMQTTRefreshTimer = new QTimer;
    mMQTTRefreshTimer->setSingleShot(true);
    connect(mMQTTRefreshTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTSendTimerExpired);
}

// Releases the refresh timer allocated in the constructor.
CMQTTClientWrapper::~CMQTTClientWrapper()
{
    delete mMQTTRefreshTimer;
}

// Stores a copy of the broker parameters and sets the client identifier.
// Params: broker settings to copy; must not be null.
// Returns RET_OK on success, RET_GENERAL_ERROR when Params is null.
int CMQTTClientWrapper::SetMQTTParams(CCloudParams *Params)
{
    if(Params == nullptr)
    {
        return RET_GENERAL_ERROR;
    }

    mMQTTParams = *Params;
    mMQTTClientID = "Otarcik_Station_Ville_Emard";

    return RET_OK;
}

// Applies the stored broker parameters to the client and starts the
// connection. Always returns RET_OK; the outcome of the connection attempt is
// reported later through StateChanged().
int CMQTTClientWrapper::ConnectToBroker()
{
    //Setup the client before connecting.
    mMQTTClient.setAutoKeepAlive(true);
    mMQTTClient.setClientId(mMQTTClientID);
    mMQTTClient.setHostname(mMQTTParams.mMQTTBrokerHostName);
    mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort);
    mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword);
    mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName);

    mMQTTClient.connectToHost();

    return RET_OK;
}

// Asks the client to disconnect from the broker. Always returns RET_OK.
int CMQTTClientWrapper::DisconnectFromBroker()
{
    mMQTTClient.disconnectFromHost();
    return RET_OK;
}

// Currently a no-op that always returns RET_OK: the direct-publish
// implementation below is disabled, and publishing is done from
// MQTTSendTimerExpired() instead.
int CMQTTClientWrapper::NewMQTTMessages(QList MessagesList)
{
    Q_UNUSED(MessagesList);

    // Disabled implementation, kept for reference:
    //
    // if(mMQTTClient.state() != QMqttClient::Connected)
    // {
    //     return RET_GENERAL_ERROR;
    // }
    // if(mMQTTRefreshTimer->remainingTime() > 0)
    // {
    //     // qDebug("MQTT timer value: %d",mMQTTRefreshTimer->remainingTime());
    //     return RET_OK;
    // }
    // if(MessagesList.isEmpty())
    // {
    //     return RET_GENERAL_ERROR;
    // }
    // for(int i = 0; i < MessagesList.size(); i++)
    // {
    //     qint32 res = mMQTTClient.publish(MessagesList.at(i).mMessageTopic,MessagesList.at(i).mMessagePayload.toLocal8Bit(),0,true);
    //     qDebug("%s : %s",qPrintable(MessagesList.at(i).mMessageTopic), qPrintable(MessagesList.at(i).mMessagePayload));
    // }
    // mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
    // qDebug("Sent %d MQTT messages",MessagesList.size());

    return RET_OK;
}

// Slot for QMqttClient::stateChanged. Forwards the connection status to the
// program object (when one is attached) and starts or stops the refresh timer
// to follow the connection state.
void CMQTTClientWrapper::StateChanged()
{
    switch(mMQTTClient.state())
    {
        case QMqttClient::Disconnected:
        {
            qDebug("MQTT client Disconnected");
            // mProgramPtr is null after construction; only notify when set.
            if(mProgramPtr != nullptr)
            {
                mProgramPtr->SetMQTTConnectionSatusRequest(false);
            }
            mMQTTRefreshTimer->stop();
            break;
        }
        case QMqttClient::Connected:
        {
            if(mProgramPtr != nullptr)
            {
                mProgramPtr->SetMQTTConnectionSatusRequest(true);
            }
            mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
            qDebug("MQTT client Connected");
            break;
        }
        case QMqttClient::Connecting:
        {
            qDebug("MQTT client Connecting...");
            break;
        }
    }
}

// Publishes a fixed retained test payload on the "Otarcik/Test" topic.
// Always returns RET_OK.
int CMQTTClientWrapper::SendMQTTTestMessage()
{
    mMQTTClient.publish(QString("Otarcik/Test"),QString("Test Payload").toLocal8Bit(),0,true);
    qDebug("Sending MQTT test message");

    return RET_OK;
}

// Stores the pointer to the CAN devices list used by MQTTSendTimerExpired().
// Only the pointer is kept; passing null disables publishing.
// Always returns RET_OK.
int CMQTTClientWrapper::SetCANDevicesList(QList *List)
{
    mCANDevicesList = List;
    return RET_OK;
}

// Slot for the refresh timer. While connected, publishes every message exposed
// by every CAN device (QoS 0, retained) and then re-arms the single-shot timer
// with the configured transmit timeout.
void CMQTTClientWrapper::MQTTSendTimerExpired()
{
    if(mMQTTClient.state() != QMqttClient::Connected)
    {
        // Not re-armed here: StateChanged() restarts the timer on connection.
        return;
    }

    if(mCANDevicesList == nullptr)
    {
        // Keep the timer running so publishing begins as soon as a devices
        // list is provided, instead of staying idle until the next reconnect.
        mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
        return;
    }

    //Send the CANbus devices messages
    for(int j = 0; j < mCANDevicesList->size(); j++)
    {
        CCANDevice *Device = mCANDevicesList->at(j);
        if(Device == nullptr)
        {
            continue;
        }

        auto *MessagesList = Device->GetMQTTMessagesList();
        if(MessagesList == nullptr)
        {
            continue;
        }

        for(int i = 0; i < MessagesList->size(); i++)
        {
            const auto &Message = MessagesList->at(i);

            const qint32 res = mMQTTClient.publish(Message.mMessageTopic,Message.mMessagePayload.toLocal8Bit(),0,true);
            qDebug("%s : %s",qPrintable(Message.mMessageTopic), qPrintable(Message.mMessagePayload));

            // NOTE(review): a negative id is taken to mean the publish was
            // rejected - confirm against the Qt MQTT version in use.
            if(res < 0)
            {
                qWarning("MQTT publish failed for topic %s",qPrintable(Message.mMessageTopic));
            }

            // Disabled log-dispatcher output, kept for reference:
            // QString LogMsg = QString("Envoi d'un message MQTT. Topic: %1. Payload: %2").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload);
            // CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,false);
        }
        qDebug("Sent %d MQTT messages",MessagesList->size());
    }

    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
}