#include "MQTTClientWrapper.h"
#include "OtarcikCan.h"
#include "CANDevice.h"
#include "defines.h"
#include "ComputerBoardInterface.h"

namespace
{
// Builds the "<queued>/<capacity> messages (<percent>%)" text stored in
// mCircularBufferStatusText.
// QueuedCount: number of messages currently held in the FIFO.
QString FormatQueueFillStatus(qlonglong QueuedCount)
{
    // Multiply before dividing: (count / capacity) * 100 in integer arithmetic
    // is 0 for every fill level below 100%.
    const qlonglong FillPercent = (QueuedCount * 100) / (MQTT_CLIENT_MSG_QUEUE_SIZE);

    return QString("%2/%1 messages (%3%)")
            .arg(MQTT_CLIENT_MSG_QUEUE_SIZE)
            .arg(QueuedCount)
            .arg(FillPercent);
}
} // namespace

// Wires the QMqttClient signals to this wrapper and creates the three timers:
//  - refresh timer   (single shot): paces the periodic publication of the CAN devices messages,
//  - reconnect timer (repeating)  : retries the broker connection while disconnected,
//  - flush timer     (repeating)  : drains the FIFO when MQTT_USE_TIMER_TO_FLUSH_QUEUE is defined.
// The wrapper starts in "drop" mode; nothing is sent or queued until StartMQTTClient() is called.
CMQTTClientWrapper::CMQTTClientWrapper()
{
    connect(&mMQTTClient,&QMqttClient::stateChanged,this,&CMQTTClientWrapper::StateChanged);
    connect(&mMQTTClient,&QMqttClient::errorChanged,this,&CMQTTClientWrapper::MQTTClientError);
    connect(&mMQTTClient,&QMqttClient::messageSent,this,&CMQTTClientWrapper::MQTTMessageSent);

    mMQTTRefreshTimer = new QTimer;
    mMQTTRefreshTimer->setSingleShot(true);
    connect(mMQTTRefreshTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTSendTimerExpired);

    mMQTTReconnectTimer = new QTimer;
    mMQTTReconnectTimer->setSingleShot(false);
    connect(mMQTTReconnectTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTReconnectTimerExpired);

    mMQTTQueueFlushTimer = new QTimer;
    mMQTTQueueFlushTimer->setSingleShot(false);
    mMQTTQueueFlushTimer->setInterval(MQTT_CLIENT_MSG_QUEUE_FLUSH_TIMEOUT);
    connect(mMQTTQueueFlushTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTQueueFlushTimerExipred);
    mMQTTQueueFlushTimer->stop();

    mProgramPtr = nullptr;
    mCPUInterface = nullptr;
    mCANDevicesList = nullptr; // MQTTSendTimerExpired() tests this pointer, so it must never be left uninitialized.
    mMQTTPresenceCANBit = 0;
    mMessagesQueueMode = MQTT_DROP_MSG_MODE;
    mDisconnectionIsVoluntary = false;
    mIsClientConnecting = false;

    mCircularBufferStatusText = mBufferingModeText = mUploadTimeText = "";
}

// Releases everything this object allocated: the three timers (they are
// created without a QObject parent) and any message still waiting in the FIFO.
CMQTTClientWrapper::~CMQTTClientWrapper()
{
    delete mMQTTRefreshTimer;
    delete mMQTTReconnectTimer;
    delete mMQTTQueueFlushTimer;

    while(mMQTTMessagesQueue.isEmpty() == false)
    {
        delete mMQTTMessagesQueue.takeFirst();
    }
}

// Stores a copy of the cloud/broker parameters used by ConnectToBroker()
// and by the refresh timer.
// Params: must point to a valid CCloudParams (it is dereferenced unconditionally).
// Returns RET_OK.
int CMQTTClientWrapper::SetMQTTParams(CCloudParams *Params)
{
    mMQTTParams = *Params;
//    mMQTTClientID = "Otarcik_Station_Ville_Emard";
    return RET_OK;
}

// Registers the computer board interface queried while sending/queuing messages.
// Returns RET_OK.
int CMQTTClientWrapper::SetCPUInterface(CComputerBoardInterface *CPUIF)
{
    mCPUInterface = CPUIF;
    return RET_OK;
}

// Sets the index of the bit reported by GetMQTTServerPresenceCANMask()
// while the client is not connected.
// Returns RET_OK.
int CMQTTClientWrapper::SetMQTTServerPresenceCANBit(quint64 CANBit)
{
    mMQTTPresenceCANBit = CANBit;
    return RET_OK;
}

// Starts the periodic send timer, switches to queue mode (messages are
// buffered until the broker connection is established) and initiates the
// connection.
// Returns RET_OK.
int CMQTTClientWrapper::StartMQTTClient()
{
    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
    mMessagesQueueMode = MQTT_QUEUE_MSG_MODE;
    ConnectToBroker();

    mCircularBufferStatusText = FormatQueueFillStatus(0);
    mBufferingModeText = "Buffering";
    UpdateGUIBufferingStatus();

    return RET_OK;
}

// Configures the QMqttClient from the stored parameters and requests a
// connection. Also clears the "voluntary disconnection" flag.
// Returns RET_OK (the outcome of the connection is reported through StateChanged()).
int CMQTTClientWrapper::ConnectToBroker()
{
    //Setup the client before connecting.
    mMQTTClient.setAutoKeepAlive(true);
    mMQTTClient.setClientId(mMQTTParams.mMQTTClientID);
    mMQTTClient.setHostname(mMQTTParams.mMQTTBrokerHostName);
    mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort);
    mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword);
    mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName);

    mDisconnectionIsVoluntary = false;
    mMQTTClient.connectToHost();

    return RET_OK;
}

// Voluntarily disconnects from the broker and stops queuing messages.
// Returns RET_OK.
int CMQTTClientWrapper::DisconnectFromBroker()
{
    // The flag and the mode are set BEFORE asking the client to disconnect:
    // if stateChanged is emitted from within disconnectFromHost(), StateChanged()
    // must already see the disconnection as voluntary, otherwise it would
    // switch to buffering mode.
    mDisconnectionIsVoluntary = true;
    mMessagesQueueMode = MQTT_DROP_MSG_MODE; //It's a voluntary disconnection... don't queue the CAN messages.

    mMQTTClient.disconnectFromHost();

    return RET_OK;
}

// Legacy entry point; the implementation is disabled and the list is ignored.
// NOTE(review): the QList element type is reconstructed from the commented-out
// body below (elements expose mMessageTopic / mMessagePayload) - confirm against the header.
// Returns RET_OK.
int CMQTTClientWrapper::NewMQTTMessages(QList<CMQTTMessage> MessagesList)
{
    Q_UNUSED(MessagesList);

//    if(mMQTTClient.state() != QMqttClient::Connected)
//    {
//        return RET_GENERAL_ERROR;
//    }
//    if(mMQTTRefreshTimer->remainingTime() > 0)
//    {
////        qDebug("MQTT timer value: %d",mMQTTRefreshTimer->remainingTime());
//        return RET_OK;
//    }
//    if(MessagesList.isEmpty())
//    {
//        return RET_GENERAL_ERROR;
//    }
//    for(int i = 0; i < MessagesList.size(); i++)
//    {
//        qint32 res = mMQTTClient.publish(MessagesList.at(i).mMessageTopic,MessagesList.at(i).mMessagePayload.toLocal8Bit(),0,true);
//        qDebug("%s : %s",qPrintable(MessagesList.at(i).mMessageTopic), qPrintable(MessagesList.at(i).mMessagePayload));
//    }
//    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
//    qDebug("Sent %d MQTT messages",MessagesList.size());

    return RET_OK;
}

// Returns 0 while the client is connected; otherwise a mask with only the
// bit number mMQTTPresenceCANBit set.
// NOTE(review): the shift is only defined for bit indexes below 64 - the
// index is not validated here.
quint64 CMQTTClientWrapper::GetMQTTServerPresenceCANMask()
{
    if(mMQTTClient.state() == QMqttClient::Connected)
    {
        return 0;
    }

    quint64 Mask = 1;
    Mask <<= mMQTTPresenceCANBit;
    return Mask;
}

// Slot connected to QMqttClient::stateChanged. Drives the reconnect timer,
// the connection status reported to the program object, and the message
// handling mode (queue vs. transmit).
void CMQTTClientWrapper::StateChanged()
{
    switch(mMQTTClient.state())
    {
        case QMqttClient::Disconnected:
        {
            if(mIsClientConnecting)
            {
                //Connection attempt failed, just restart the timer...
                mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT déconnecté pendant une reconnexion. ","CMQTTClientWrapper",true,1);
            }
            else
            {
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT déconnecté.","CMQTTClientWrapper",true,1);
                if(mProgramPtr != nullptr) //The program pointer is null until it is assigned from outside this file.
                {
                    mProgramPtr->SetMQTTConnectionSatusRequest(false);
                }
                // NOTE(review): the reconnect timer is started even when the
                // disconnection was voluntary - confirm this is intended.
                mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
                mMQTTQueueFlushTimer->stop();
#endif
                if(mDisconnectionIsVoluntary == false)
                {
                    mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //We're disconnected, queue all the messages.
                    CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Passage en mode buffering des messages MQTT","CMQTTClientWrapper",true,1);
                    mBufferingModeText = "Buffering";
                    UpdateGUIBufferingStatus();
                }
            }
            break;
        }
        case QMqttClient::Connected:
        {
            if(mProgramPtr != nullptr)
            {
                mProgramPtr->SetMQTTConnectionSatusRequest(true);
            }
            mMQTTReconnectTimer->stop();
            mIsClientConnecting = false;
            CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT connecté.","CMQTTClientWrapper",true,1);

            if(mMQTTMessagesQueue.isEmpty() == false)
            {
                mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //Stay in (or enter) queue mode until we empty the buffer
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO non vide, passage au mode de vidage de la FIFO","CMQTTClientWrapper",true,2);
                mBufferingModeText = "Buffering";
                mCircularBufferStatusText = FormatQueueFillStatus(mMQTTMessagesQueue.size());
                UpdateGUIBufferingStatus();
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
                mMQTTQueueFlushTimer->start();
#else
                SendNextQueueMessage();
#endif
            }
            else
            {
                mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
                mBufferingModeText = "Non-buffering";
                UpdateGUIBufferingStatus();
            }
            break;
        }
        case QMqttClient::Connecting:
        {
            mIsClientConnecting = true;
            CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT en cours de connexion... ","CMQTTClientWrapper",true,1);
            break;
        }
    }
}

// Publishes a fixed test payload on "Otarcik/Test" (QoS 0, retain flag set).
// Returns RET_OK; the result of publish() is not checked.
int CMQTTClientWrapper::SendMQTTTestMessage()
{
    mMQTTClient.publish(QString("Otarcik/Test"),QString("Test Payload").toLocal8Bit(),0,true);
    qDebug("Sending MQTT test message");
    return RET_OK;
}

// Registers the list of CAN devices whose MQTT messages are published
// periodically. The list is not copied; only the pointer is kept.
// NOTE(review): the QList element type is reconstructed from its use in
// MQTTSendTimerExpired() (elements are assigned to CCANDevice*) - confirm against the header.
// Returns RET_OK.
int CMQTTClientWrapper::SetCANDevicesList(QList<CCANDevice*> *List)
{
    mCANDevicesList = List;
    return RET_OK;
}

// Slot fired by the refresh timer. For every CAN device, either publishes
// its messages right away (transmit mode) or appends copies of them to the
// FIFO (queue mode), then re-arms the single-shot refresh timer.
// NOTE(review): the two early returns below do not re-arm the timer, so the
// periodic send stops until StartMQTTClient() is called again - confirm intended.
void CMQTTClientWrapper::MQTTSendTimerExpired()
{
    if(mMessagesQueueMode == MQTT_DROP_MSG_MODE)
    {
        return;
    }
    if(mCANDevicesList == nullptr)
    {
        return;
    }

    //Send the CANbus devices messsages
    for(int DeviceIndex = 0; DeviceIndex < mCANDevicesList->size(); DeviceIndex++)
    {
        // The mode is sampled once per device: it stays fixed while that
        // device's messages are being processed.
        const auto DeviceMode = mMessagesQueueMode;
        const bool TransmitNow = (DeviceMode == MQTT_TRANSMIT_MSG_MODE);
        const bool QueueForLater = (DeviceMode == MQTT_QUEUE_MSG_MODE);
        if(TransmitNow == false && QueueForLater == false)
        {
            continue;
        }

        CCANDevice *Device = mCANDevicesList->at(DeviceIndex);
        if(Device == nullptr)
        {
            continue;
        }
        auto *MessagesList = Device->GetMQTTMessagesList();

        if(mCPUInterface)
        {
            // NOTE(review): the board state is fetched but never used; the call
            // is kept in case GetComputerBoardState() has side effects.
            CComputerBoardState tmp = mCPUInterface->GetComputerBoardState();
            Q_UNUSED(tmp);
        }

        if(MessagesList == nullptr)
        {
            continue;
        }

        for(int MsgIndex = 0; MsgIndex < MessagesList->size(); MsgIndex++)
        {
            const auto &Source = MessagesList->at(MsgIndex);

            if(TransmitNow)
            {
                //QoS 0, retain flag set.
                qint32 res = mMQTTClient.publish(Source.mMessageTopic,Source.mMessagePayload.toLocal8Bit(),0,true);
                QString LogMsg = QString("Envoi d'un message MQTT. Topic: %1 Payload: %2 Result: %3").arg(Source.mMessageTopic).arg(Source.mMessagePayload).arg(res);
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
            }
            else
            {
                CMQTTMessage *NewMsg = new CMQTTMessage(Source.mMessageTopic,Source.mMessagePayload);
                if(mMQTTMessagesQueue.size() >= MQTT_CLIENT_MSG_QUEUE_SIZE)
                {
                    //The FIFO is full: drop (and free) the oldest message to make room.
                    delete mMQTTMessagesQueue.takeFirst();
                    CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO pleine, retrait du premier message.","CMQTTClientWrapper",true,3);
                }
                mMQTTMessagesQueue.append(NewMsg);

                QString LogMsg = QString("Ajout d'un message MQTT à la FIFO. Topic: %1 Payload: %2 FIFO size: %3").arg(Source.mMessageTopic).arg(Source.mMessagePayload).arg(mMQTTMessagesQueue.size());
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);

                mCircularBufferStatusText = FormatQueueFillStatus(mMQTTMessagesQueue.size());
                UpdateGUIBufferingStatus();
            }
        }
//        qDebug("Sent %d MQTT messages",MessagesList->size());
    }

    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
}

// Slot fired by the reconnect timer: retries the broker connection, or
// stops the timer when the client is already connected.
void CMQTTClientWrapper::MQTTReconnectTimerExpired()
{
    if(mMQTTClient.state() == QMqttClient::Connected)
    {
        mMQTTReconnectTimer->stop(); //Already connected....
        return;
    }

    CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tentative de connexion au serveur MQTT...","CMQTTClientWrapper",true,1);
    ConnectToBroker();
}

// Slot fired by the flush timer. Only active when
// MQTT_USE_TIMER_TO_FLUSH_QUEUE is defined: sends one queued message per tick.
void CMQTTClientWrapper::MQTTQueueFlushTimerExipred()
{
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
    if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
    {
        mMQTTQueueFlushTimer->stop();
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        return;
    }
    SendNextQueueMessage();
#endif
}

// Removes the oldest message from the FIFO, publishes it (QoS 0, retain flag
// set) and frees it. When the FIFO becomes empty, switches back to normal
// transmit mode.
// NOTE(review): the result of publish() is not checked, so a message whose
// publication fails is dropped.
// Returns RET_OK (also when the FIFO was already empty, in which case nothing is sent).
int CMQTTClientWrapper::SendNextQueueMessage()
{
    if(mMQTTMessagesQueue.isEmpty())
    {
        return RET_OK; //Nothing to send; takeFirst() must not be called on an empty list.
    }

    CMQTTMessage *Msg = mMQTTMessagesQueue.takeFirst();
    mMQTTClient.publish(Msg->mMessageTopic,Msg->mMessagePayload.toLocal8Bit(),0,true);

    QString LogMsg = QString("Envoi d'un message MQTT provenant du buffer. Topic: %1 Payload: %2 FIFO Size: %3").arg(Msg->mMessageTopic).arg(Msg->mMessagePayload).arg(mMQTTMessagesQueue.size());
    CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
    delete Msg; //free memory

    //Keep the displayed fill level in sync with the FIFO while it drains.
    mCircularBufferStatusText = FormatQueueFillStatus(mMQTTMessagesQueue.size());

    if(mMQTTMessagesQueue.isEmpty())
    {
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
        mMQTTQueueFlushTimer->stop();
#endif
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        mBufferingModeText = "Non-buffering"; //Same text as the transmit-mode branch of StateChanged().
        CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tous les messages MQTT de la FIFO ont été envoyés au serveur, passage en mode de transmission normal","CMQTTClientWrapper",true,2);
    }
    UpdateGUIBufferingStatus();

    return RET_OK;
}

// Placeholder for pushing the buffering texts to the GUI; the forwarding
// call is currently disabled, so this does nothing.
// Returns RET_OK.
int CMQTTClientWrapper::UpdateGUIBufferingStatus()
{
    //mProgramPtr->UpdateMQTTBufferingStatusRequest(mBufferingModeText,mCircularBufferStatusText,mRemainingBufferingTimeText);
    return RET_OK;
}

// Slot connected to QMqttClient::errorChanged: logs the numeric error code.
void CMQTTClientWrapper::MQTTClientError(QMqttClient::ClientError error)
{
    CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Erreur du client MQTT: %1").arg(static_cast<int>(error)),"CMQTTClientWrapper",true,2);
}

//This function is used only when flushing the msg queue
// Slot connected to QMqttClient::messageSent. When the flush timer is not
// used, each sent notification triggers the publication of the next queued message.
void CMQTTClientWrapper::MQTTMessageSent(qint32 MsgID)
{
    Q_UNUSED(MsgID);
#ifndef MQTT_USE_TIMER_TO_FLUSH_QUEUE
    if(mMessagesQueueMode != MQTT_QUEUE_MSG_MODE)
    {
        return;
    }
    if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
    {
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        return;
    }
    SendNextQueueMessage();
#endif
}