#include "MQTTClientWrapper.h"
#include "OtarcikCan.h"
#include "CANDevice.h"
#include "defines.h"

#ifdef ENABLE_CHIPSET_DRIVER
#include "ComputerBoardInterface.h"
#endif

// NOTE(review): this file reached review with its whitespace collapsed and with the
// template arguments of QList stripped. The element types used below
// (QList<CMQTTMessage>, QList<CCANDevice*>) are reconstructed from how the lists are
// used in this file - confirm them against MQTTClientWrapper.h / CANDevice.h.

/**
 * @brief Wires the QMqttClient signals to the handlers of this class and creates the timers.
 *
 * Three timers are created:
 *  - mMQTTRefreshTimer:    single-shot, re-armed by MQTTSendTimerExpired().
 *  - mMQTTReconnectTimer:  repeating, drives MQTTReconnectTimerExpired().
 *  - mMQTTQueueFlushTimer: repeating, drives MQTTQueueFlushTimerExipred().
 *
 * No connection is attempted here; see StartMQTTClient().
 */
CMQTTClientWrapper::CMQTTClientWrapper()
{
    connect(&mMQTTClient,&QMqttClient::stateChanged,this,&CMQTTClientWrapper::StateChanged);
    connect(&mMQTTClient,&QMqttClient::errorChanged,this,&CMQTTClientWrapper::MQTTClientError);
    connect(&mMQTTClient,&QMqttClient::messageSent,this,&CMQTTClientWrapper::MQTTMessageSent);
//    connect(&mMQTTClient,SIGNAL(messageSent(qint32)),this,SLOT(MQTTMessageSent(qint32)));
//    connect(&mMQTTClient,&QMqttClient::messageStatusChanged,this,&CMQTTClientWrapper::MQTTMessageStatusChanged);
//    connect(&mMQTTClient,SIGNAL(messageStatusChanged(qint32,QMqtt::MessageStatus,QMqttMessageStatusProperties)),this,SLOT(MQTTMessageStatusChanged(qint32,QMqtt::MessageStatus,QMqttMessageStatusProperties)));

    mMQTTRefreshTimer = new QTimer;
    mMQTTRefreshTimer->setSingleShot(true);
    connect(mMQTTRefreshTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTSendTimerExpired);

    mMQTTReconnectTimer = new QTimer;
    mMQTTReconnectTimer->setSingleShot(false);
    connect(mMQTTReconnectTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTReconnectTimerExpired);

    mMQTTQueueFlushTimer = new QTimer;
    mMQTTQueueFlushTimer->setSingleShot(false);
    mMQTTQueueFlushTimer->setInterval(MQTT_CLIENT_MSG_QUEUE_FLUSH_TIMEOUT);
    connect(mMQTTQueueFlushTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTQueueFlushTimerExipred);
    mMQTTQueueFlushTimer->stop();

    mProgramPtr = nullptr;
    mCANDevicesList = nullptr; //Was left uninitialized although MQTTSendTimerExpired() tests it.

#ifdef ENABLE_CHIPSET_DRIVER
    mCPUInterface = nullptr;
#endif

    mMQTTPresenceCANBit = 0;
    mMessagesQueueMode = MQTT_DROP_MSG_MODE;
    mDisconnectionIsVoluntary = false;
    mIsClientConnecting = false;
    mCircularBufferStatusText = mBufferingModeText = mUploadTimeText = "";

#ifdef ENABLE_DEVELOPMENT_DEBUG_TOOLS
    mMQTTDisconnectionForced = false;
#endif
}

/**
 * @brief Releases everything this object allocated with new.
 *
 * All three timers are created without a QObject parent, so each must be deleted here.
 * Messages still waiting in the FIFO are heap-allocated by MQTTSendTimerExpired()
 * and are freed as well.
 */
CMQTTClientWrapper::~CMQTTClientWrapper()
{
    delete mMQTTRefreshTimer;
    delete mMQTTReconnectTimer;
    delete mMQTTQueueFlushTimer;

    while(mMQTTMessagesQueue.isEmpty() == false)
    {
        delete mMQTTMessagesQueue.takeFirst();
    }
}

/**
 * @brief Stores a copy of the broker/cloud parameters used by ConnectToBroker().
 * @param Params Parameters to copy. Must not be null.
 * @return RET_OK on success, RET_GENERAL_ERROR when Params is null.
 */
int CMQTTClientWrapper::SetMQTTParams(CCloudParams *Params)
{
    if(Params == nullptr)
    {
        return RET_GENERAL_ERROR;
    }

    mMQTTParams = *Params;
//    mMQTTClientID = "Otarcik_Station_Ville_Emard";
    return RET_OK;
}

#ifdef ENABLE_CHIPSET_DRIVER
/**
 * @brief Registers the computer board interface read by GetSystemStatusMessage().
 * @param CPUIF Interface pointer; stored as is (may be null to disable status messages).
 * @return Always RET_OK.
 */
int CMQTTClientWrapper::SetCPUInterface(CComputerBoardInterface *CPUIF)
{
    mCPUInterface = CPUIF;
    return RET_OK;
}
#endif

/**
 * @brief Sets the bit index used by GetMQTTServerPresenceCANMask().
 * @param CANBit Zero-based bit position.
 * @return Always RET_OK.
 */
int CMQTTClientWrapper::SetMQTTServerPresenceCANBit(quint64 CANBit)
{
    mMQTTPresenceCANBit = CANBit;
    return RET_OK;
}

/**
 * @brief Arms the send timer, switches to queue mode and starts connecting to the broker.
 *
 * Messages are queued until StateChanged() sees the Connected state.
 * @return Always RET_OK.
 */
int CMQTTClientWrapper::StartMQTTClient()
{
    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
    mMessagesQueueMode = MQTT_QUEUE_MSG_MODE;
    ConnectToBroker();

    //UpdateGUIBufferingStatus() derives both status texts from the current mode and
    //queue size, so they are not pre-assigned here.
    UpdateGUIBufferingStatus();
    return RET_OK;
}

/**
 * @brief Configures the client from mMQTTParams and requests a connection.
 *
 * Clears the "voluntary disconnection" flag so that a later loss of connection
 * is treated as involuntary by StateChanged().
 * @return Always RET_OK (the outcome is reported through StateChanged()).
 */
int CMQTTClientWrapper::ConnectToBroker()
{
    //Setup the client before connecting.
    mMQTTClient.setAutoKeepAlive(true);
    mMQTTClient.setClientId(mMQTTParams.mMQTTClientID);
    mMQTTClient.setHostname(mMQTTParams.mMQTTBrokerHostName);
    mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort);
    mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword);
    mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName);

    mDisconnectionIsVoluntary = false;
    mMQTTClient.connectToHost();
    return RET_OK;
}

/**
 * @brief Disconnects from the broker.
 *
 * A voluntary disconnection puts the wrapper in drop mode (CAN messages are not queued).
 * With ENABLE_DEVELOPMENT_DEBUG_TOOLS, a forced disconnection is flagged as involuntary
 * instead, to emulate a connection loss.
 *
 * The flags are written BEFORE calling disconnectFromHost(): StateChanged() reads
 * mDisconnectionIsVoluntary, and the state change may be delivered from within that call.
 * @return Always RET_OK.
 */
int CMQTTClientWrapper::DisconnectFromBroker()
{
#ifdef ENABLE_DEVELOPMENT_DEBUG_TOOLS
    const bool Voluntary = (mMQTTDisconnectionForced == false); //forced => emulate an involuntary disconnection
#else
    const bool Voluntary = true;
#endif

    mDisconnectionIsVoluntary = Voluntary;
    if(Voluntary)
    {
        mMessagesQueueMode = MQTT_DROP_MSG_MODE; //It's a voluntary disconnection... don't queue the CAN messages.
    }

    mMQTTClient.disconnectFromHost();
    return RET_OK;
}

/**
 * @brief Currently a no-op; the former direct-publish implementation is kept commented out.
 * @param MessagesList Ignored.
 * @return Always RET_OK.
 */
int CMQTTClientWrapper::NewMQTTMessages(QList<CMQTTMessage> MessagesList)
{
    Q_UNUSED(MessagesList);
//    if(mMQTTClient.state() != QMqttClient::Connected)
//    {
//        return RET_GENERAL_ERROR;
//    }
//    if(mMQTTRefreshTimer->remainingTime() > 0)
//    {
//        // qDebug("MQTT timer value: %d",mMQTTRefreshTimer->remainingTime());
//        return RET_OK;
//    }
//    if(MessagesList.isEmpty())
//    {
//        return RET_GENERAL_ERROR;
//    }
//    for(int i = 0; i < MessagesList.size(); i++)
//    {
//        qint32 res = mMQTTClient.publish(MessagesList.at(i).mMessageTopic,MessagesList.at(i).mMessagePayload.toLocal8Bit(),0,true);
//        qDebug("%s : %s",qPrintable(MessagesList.at(i).mMessageTopic), qPrintable(MessagesList.at(i).mMessagePayload));
//    }
//    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
//    qDebug("Sent %d MQTT messages",MessagesList.size());
    return RET_OK;
}

/**
 * @brief Returns the CAN mask bit signalling that the MQTT server is NOT reachable.
 * @return 0 when the client is connected; otherwise a mask with bit mMQTTPresenceCANBit set.
 *         An out-of-range bit index (>= 64) yields 0 instead of an undefined shift.
 */
quint64 CMQTTClientWrapper::GetMQTTServerPresenceCANMask()
{
    if(mMQTTClient.state() == QMqttClient::Connected)
    {
        return 0;
    }

    if(mMQTTPresenceCANBit >= 64)
    {
        return 0; //Shifting a 64-bit value by 64 or more is undefined behaviour.
    }

    quint64 Mask = 1;
    Mask <<= mMQTTPresenceCANBit;
    return Mask;
}

/**
 * @brief Slot for QMqttClient::stateChanged; drives reconnection and the messages mode.
 *
 *  - Disconnected during a connection attempt: restart the reconnect timer.
 *  - Disconnected otherwise: notify the program, start the reconnect timer (unless the
 *    disconnection was forced by the debug tools) and, when the disconnection was not
 *    voluntary, switch to queue mode.
 *  - Connected: notify the program, stop reconnecting, then either start flushing the
 *    FIFO (queue mode) or go straight to transmit mode when the FIFO is empty.
 *  - Connecting: remember that a connection attempt is in progress.
 */
void CMQTTClientWrapper::StateChanged()
{
    switch(mMQTTClient.state())
    {
        case QMqttClient::Disconnected:
        {
            if(mIsClientConnecting)
            {
                //Connection attempt failed, just restart the timer...
                mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT déconnecté pendant une reconnexion. ","CMQTTClientWrapper",true,1);
            }
            else
            {
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT déconnecté.","CMQTTClientWrapper",true,1);
                if(mProgramPtr != nullptr)
                {
                    mProgramPtr->SetMQTTConnectionSatusRequest(false);
                }

#ifndef ENABLE_DEVELOPMENT_DEBUG_TOOLS
                mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
#else
                if(mMQTTDisconnectionForced == false)
                {
                    mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
                }
#endif

#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
                mMQTTQueueFlushTimer->stop();
#endif

                if(mDisconnectionIsVoluntary == false)
                {
                    mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //We're disconnected, queue all the messages.
                    CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Passage en mode buffering des messages MQTT","CMQTTClientWrapper",true,1);
                    UpdateGUIBufferingStatus();
                }
            }
            break;
        }
        case QMqttClient::Connected:
        {
            if(mProgramPtr != nullptr)
            {
                mProgramPtr->SetMQTTConnectionSatusRequest(true);
            }
            mMQTTReconnectTimer->stop();
            mIsClientConnecting = false;
            CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT connecté.","CMQTTClientWrapper",true,1);

            if(mMQTTMessagesQueue.isEmpty() == false)
            {
                mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //Stay in (or enter) queue mode until we empty the buffer
                CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO non vide, passage au mode de vidage de la FIFO","CMQTTClientWrapper",true,2);
                UpdateGUIBufferingStatus();
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
                mMQTTQueueFlushTimer->start();
#else
                SendNextQueueMessage();
#endif
            }
            else
            {
                mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
                UpdateGUIBufferingStatus();
            }
            break;
        }
        case QMqttClient::Connecting:
        {
            mIsClientConnecting = true;
            CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT en cours de connexion... ","CMQTTClientWrapper",true,1);
            break;
        }
    }
}

/**
 * @brief Publishes a fixed test payload on the "Otarcik/Test" topic (QoS 0, retained).
 * @return Always RET_OK; the publish result is not checked.
 */
int CMQTTClientWrapper::SendMQTTTestMessage()
{
    mMQTTClient.publish(QString("Otarcik/Test"),QString("Test Payload").toLocal8Bit(),0,true);
    qDebug("Sending MQTT test message");
    return RET_OK;
}

/**
 * @brief Registers the list of CAN devices polled by MQTTSendTimerExpired().
 * @param List Device list; only the pointer is stored, the list is not copied.
 * @return Always RET_OK.
 */
int CMQTTClientWrapper::SetCANDevicesList(QList<CCANDevice*> *List)
{
    mCANDevicesList = List;
    return RET_OK;
}

/**
 * @brief Slot for the send timer: collects each CAN device's MQTT messages and either
 *        publishes them (transmit mode) or appends them to the FIFO (queue mode).
 *
 * In queue mode the FIFO is bounded by MQTT_CLIENT_MSG_QUEUE_SIZE; when full, the oldest
 * message is discarded. Nothing is collected in drop mode or when no device list is set.
 *
 * The single-shot timer is re-armed on every path. Previously the early exits left it
 * stopped, so periodic transmission could silently never resume after a period spent
 * in drop mode.
 */
void CMQTTClientWrapper::MQTTSendTimerExpired()
{
    if(mMessagesQueueMode != MQTT_DROP_MSG_MODE && mCANDevicesList != nullptr)
    {
        //Send the CANbus devices messsages
        for(int j = 0; j < mCANDevicesList->size(); j++)
        {
            CCANDevice *Device = mCANDevicesList->at(j);
            if(Device == nullptr)
            {
                continue;
            }

            QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
            if(MessagesList == nullptr)
            {
                continue; //Checked before any use: the status message used to be appended first.
            }

#ifdef ENABLE_CHIPSET_DRIVER
            //NOTE(review): the system status message is appended to every device's list,
            //so it goes out once per device per cycle - confirm this is intended.
            if(mCPUInterface != nullptr)
            {
                MessagesList->append(GetSystemStatusMessage());
            }
#endif

            switch(mMessagesQueueMode)
            {
                case MQTT_TRANSMIT_MSG_MODE:
                {
                    for(int i = 0; i < MessagesList->size(); i++)
                    {
                        const CMQTTMessage &Msg = MessagesList->at(i);
                        qint32 res = mMQTTClient.publish(Msg.mMessageTopic,Msg.mMessagePayload.toLocal8Bit(),1,true);
                        QString LogMsg = QString("Envoi d'un message MQTT. Topic: %1 Payload: %2 Result: %3").arg(Msg.mMessageTopic).arg(Msg.mMessagePayload).arg(res);
                        CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
                    }
//                    qDebug("Sent %d MQTT messages",MessagesList->size());
                    break;
                }
                case MQTT_QUEUE_MSG_MODE:
                {
                    for(int i = 0; i < MessagesList->size(); i++)
                    {
                        const CMQTTMessage &Msg = MessagesList->at(i);
                        CMQTTMessage *NewMsg = new CMQTTMessage(Msg.mMessageTopic,Msg.mMessagePayload);

                        if(mMQTTMessagesQueue.size() >= MQTT_CLIENT_MSG_QUEUE_SIZE)
                        {
                            //Bounded FIFO: make room by dropping the oldest entry.
                            delete mMQTTMessagesQueue.takeFirst();
                            CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO pleine, retrait du premier message.","CMQTTClientWrapper",true,3);
                        }
                        mMQTTMessagesQueue.append(NewMsg);

                        QString LogMsg = QString("Ajout d'un message MQTT à la FIFO. Topic: %1 Payload: %2 FIFO size: %3").arg(Msg.mMessageTopic).arg(Msg.mMessagePayload).arg(mMQTTMessagesQueue.size());
                        CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
                        UpdateGUIBufferingStatus();
                    }
                    break;
                }
                case MQTT_DROP_MSG_MODE://Just to please the compiler.
                    break;
            }
        }
    }

    mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
}

/**
 * @brief Slot for the reconnect timer: retries the connection unless already connected.
 */
void CMQTTClientWrapper::MQTTReconnectTimerExpired()
{
    if(mMQTTClient.state() == QMqttClient::Connected)
    {
        mMQTTReconnectTimer->stop(); //Already connected....
        return;
    }

    CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tentative de connexion au serveur MQTT...","CMQTTClientWrapper",true,1);
    ConnectToBroker();
}

/**
 * @brief Slot for the flush timer: sends one queued message per tick.
 *
 * Only active when MQTT_USE_TIMER_TO_FLUSH_QUEUE is defined; otherwise the flush is
 * chained from MQTTMessageSent() and this slot does nothing.
 */
void CMQTTClientWrapper::MQTTQueueFlushTimerExipred()
{
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
    if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
    {
        mMQTTQueueFlushTimer->stop();
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        return;
    }
    SendNextQueueMessage();
#endif
}

/**
 * @brief Publishes the oldest message of the FIFO (QoS 1, retained) and frees it.
 *
 * When the FIFO becomes empty the wrapper switches to transmit mode.
 * @return RET_OK when a message was published, RET_GENERAL_ERROR when the FIFO was
 *         already empty (in which case the wrapper is put in transmit mode).
 */
int CMQTTClientWrapper::SendNextQueueMessage()
{
    if(mMQTTMessagesQueue.size() == 0)
    {
        //Should not happen, this is a logical problem with the program
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Erreur de logique dans SendNextQueueMessage(). MessageQueue vide!","CMQTTClientWrapper",true,2);
        return RET_GENERAL_ERROR;
    }

    CMQTTMessage *Msg = mMQTTMessagesQueue.takeFirst();
    mMQTTClient.publish(Msg->mMessageTopic,Msg->mMessagePayload.toLocal8Bit(),1,true);

    //The third place marker was "%2", so the FIFO size was never printed.
    QString LogMsg = QString("Envoi d'un message MQTT provenant du buffer. Topic: %1 Payload: %2 FIFO Size: %3").arg(Msg->mMessageTopic).arg(Msg->mMessagePayload).arg(mMQTTMessagesQueue.size());
    CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
    delete Msg; //free memory

    if(mMQTTMessagesQueue.isEmpty())
    {
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
        mMQTTQueueFlushTimer->stop();
#endif
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tous les messages MQTT de la FIFO ont été envoyés au serveur, passage en mode de transmission normal","CMQTTClientWrapper",true,2);
    }

    UpdateGUIBufferingStatus();
    return RET_OK;
}

/**
 * @brief Recomputes the buffering texts and forwards them to the program for display.
 *
 * The mode text is "Buffering" in queue mode and "Transmitting" otherwise; the buffer
 * text is the current FIFO size.
 * @return RET_OK, or RET_GENERAL_ERROR when no program pointer is set.
 */
int CMQTTClientWrapper::UpdateGUIBufferingStatus()
{
    if(mMessagesQueueMode == MQTT_QUEUE_MSG_MODE)
    {
        mBufferingModeText = "Buffering";
    }
    else
    {
        mBufferingModeText = "Transmitting";
    }
    mCircularBufferStatusText = QString("%1").arg(mMQTTMessagesQueue.size());

    if(mProgramPtr == nullptr)
    {
        return RET_GENERAL_ERROR;
    }

    mProgramPtr->UpdateMQTTBufferingStatusRequest(mBufferingModeText,mCircularBufferStatusText,0);
    return RET_OK;
}

/**
 * @brief Slot for QMqttClient::errorChanged; logs the numeric error code.
 * @param error Error reported by the client.
 */
void CMQTTClientWrapper::MQTTClientError(QMqttClient::ClientError error)
{
    CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Erreur du client MQTT: %1").arg(static_cast<int>(error)),"CMQTTClientWrapper",true,2);
}

//This function is used only when flushing the msg queue
/**
 * @brief Slot for QMqttClient::messageSent; chains the transmission of the next queued
 *        message when the FIFO is flushed without the flush timer.
 * @param MsgID Identifier of the sent message (unused).
 */
void CMQTTClientWrapper::MQTTMessageSent(qint32 MsgID)
{
    Q_UNUSED(MsgID);
//    qDebug("MQTT msg ID %d sent",MsgID);

#ifndef MQTT_USE_TIMER_TO_FLUSH_QUEUE
    if(mMessagesQueueMode != MQTT_QUEUE_MSG_MODE)
    {
        return;
    }

    if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
    {
        mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
        return;
    }

    CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Message MQTT de la FIFO transmis, transmission du prochain...","CMQTTClientWrapper",true,3);
    SendNextQueueMessage();
#endif
}

/**
 * @brief Debug handler for message status changes (its connection is commented out in
 *        the constructor).
 * @param id         Identifier of the message.
 * @param s          New status (unused).
 * @param properties Status properties (unused).
 */
void CMQTTClientWrapper::MQTTMessageStatusChanged(qint32 id, QMqtt::MessageStatus s, const QMqttMessageStatusProperties &properties)
{
    Q_UNUSED(s);
    Q_UNUSED(properties);
    qDebug("Mqtt %d message status changed",id);
}

/**
 * @brief Builds the "GeneralPCStatus" message from the computer board state.
 *
 * The payload is a JSON object holding a millisecond epoch timestamp, the CPU
 * temperature, the 12V rail reading and the IO port state.
 * @return The status message, or a default-constructed CMQTTMessage when the chipset
 *         driver is compiled out or no CPU interface is registered.
 */
CMQTTMessage CMQTTClientWrapper::GetSystemStatusMessage()
{
#ifdef ENABLE_CHIPSET_DRIVER
    if(mCPUInterface != nullptr)
    {
        CComputerBoardState tmp = mCPUInterface->GetComputerBoardState();

        QString MsgPayload = "{";
        QString MsgTopic = mMQTTParams.mMQTTTopicPrefix + "GeneralPCStatus";

        //Millisecs to EPOCH timestamp format
        qint64 TimeStamp = QDateTime::currentDateTime().toMSecsSinceEpoch();
        MsgPayload.append(QString("\"timestamp\":%1,").arg(TimeStamp));
        MsgPayload.append(QString("\"CPU Temperature\":%1,").arg(tmp.mSystemTemperature));
        MsgPayload.append(QString("\"Board 12V\":%1,").arg(tmp.mSystem12V));
        MsgPayload.append(QString("\"IO Port\":%1").arg(tmp.mIOPortState));
        MsgPayload.append("}");
//        qDebug("%s",qPrintable(MsgPayload));

        CMQTTMessage NewMessage(MsgTopic, MsgPayload);
        return NewMessage;
    }
    else
    {
        return CMQTTMessage();
    }
#else
    return CMQTTMessage();
#endif
}

#ifdef ENABLE_DEVELOPMENT_DEBUG_TOOLS
/**
 * @brief Debug tool: forces a disconnection that is handled like a connection loss, or
 *        lifts that condition and reconnects.
 * @param Disconnect true to force the disconnection, false to release it.
 * @return Always RET_OK (the function previously fell off its end without returning).
 */
int CMQTTClientWrapper::ForceMQTTClientDisconnection(bool Disconnect)
{
    if(Disconnect == true)
    {
        CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Forcing MQTT client disconnection for debug tests...","CMQTTClientWrapper",true,3);
        mMQTTDisconnectionForced = true;
        DisconnectFromBroker();
    }
    else
    {
        mMQTTDisconnectionForced = false;
        if(mMessagesQueueMode != MQTT_TRANSMIT_MSG_MODE)
        {
            CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Reconnecting MQTT client after being forced OFF...","CMQTTClientWrapper",true,3);
            ConnectToBroker();
        }
    }
    return RET_OK;
}
#endif