YULTek/Otarcik_CAN/Sources/CloudLogging/MQTTClientWrapper.cpp
2025-05-03 20:26:14 -04:00

607 lines
22 KiB
C++

#include "MQTTClientWrapper.h"
#include "OtarcikCan.h"
#include "CANDevice.h"
#include "defines.h"
#ifdef ENABLE_CHIPSET_DRIVER
#include "ComputerBoardInterface.h"
#endif
CMQTTClientWrapper::CMQTTClientWrapper()
{
connect(&mMQTTClient,&QMqttClient::stateChanged,this,&CMQTTClientWrapper::StateChanged);
connect(&mMQTTClient,&QMqttClient::errorChanged,this,&CMQTTClientWrapper::MQTTClientError);
connect(&mMQTTClient,&QMqttClient::messageSent,this,&CMQTTClientWrapper::MQTTMessageSent);
// connect(&mMQTTClient,SIGNAL(messageSent(qint32)),this,SLOT(MQTTMessageSent(qint32)));
// connect(&mMQTTClient,&QMqttClient::messageStatusChanged,this,&CMQTTClientWrapper::MQTTMessageStatusChanged);
// connect(&mMQTTClient,SIGNAL(messageStatusChanged(qint32,QMqtt::MessageStatus,QMqttMessageStatusProperties)),this,SLOT(MQTTMessageStatusChanged(qint32,QMqtt::MessageStatus,QMqttMessageStatusProperties)));
mMQTTRefreshTimer = new QTimer;
mMQTTRefreshTimer->setSingleShot(true);
connect(mMQTTRefreshTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTSendTimerExpired);
mMQTTReconnectTimer = new QTimer;
mMQTTReconnectTimer->setSingleShot(false);
connect(mMQTTReconnectTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTReconnectTimerExpired);
mMQTTQueueFlushTimer = new QTimer;
mMQTTQueueFlushTimer->setSingleShot(false);
mMQTTQueueFlushTimer->setInterval(MQTT_CLIENT_MSG_QUEUE_FLUSH_TIMEOUT);
connect(mMQTTQueueFlushTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTQueueFlushTimerExipred);
mMQTTQueueFlushTimer->stop();
mProgramPtr = 0;
#ifdef ENABLE_CHIPSET_DRIVER
mCPUInterface = 0;
#endif
mMQTTPresenceCANBit = 0;
mMessagesQueueMode = MQTT_DROP_MSG_MODE;
mDisconnectionIsVoluntary = false;
mIsClientConnecting = false;
mCircularBufferStatusText = mBufferingModeText = mUploadTimeText = "";
#ifdef ENABLE_DEVELOPMENT_DEBUG_TOOLS
mMQTTDisconnectionForced = false;
#endif
}
CMQTTClientWrapper::~CMQTTClientWrapper()
{
delete mMQTTRefreshTimer;
}
int CMQTTClientWrapper::SetMQTTParams(CCloudParams *Params)
{
mMQTTParams = *Params;
// mMQTTClientID = "Otarcik_Station_Ville_Emard";
return RET_OK;
}
#ifdef ENABLE_CHIPSET_DRIVER
int CMQTTClientWrapper::SetCPUInterface(CComputerBoardInterface *CPUIF)
{
mCPUInterface = CPUIF;
return RET_OK;
}
#endif
int CMQTTClientWrapper::SetMQTTServerPresenceCANBit(quint64 CANBit)
{
mMQTTPresenceCANBit = CANBit;
return RET_OK;
}
int CMQTTClientWrapper::StartMQTTClient()
{
// mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE;
ConnectToBroker();
mCircularBufferStatusText = QString("0/%1 messages (0\%)").arg(MQTT_CLIENT_MSG_QUEUE_SIZE);
mBufferingModeText = "Buffering";
UpdateGUIBufferingStatus();
return RET_OK;
}
int CMQTTClientWrapper::ConnectToBroker()
{
//Setup the client before connecting.
mMQTTClient.setAutoKeepAlive(true);
mMQTTClient.setClientId(mMQTTParams.mMQTTClientID);
mMQTTClient.setHostname(mMQTTParams.mMQTTBrokerHostName);
mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort);
mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword);
mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName);
mDisconnectionIsVoluntary = false;
mMQTTClient.connectToHost();
return RET_OK;
}
int CMQTTClientWrapper::DisconnectFromBroker()
{
#ifndef ENABLE_DEVELOPMENT_DEBUG_TOOLS
mMQTTClient.disconnectFromHost();
mDisconnectionIsVoluntary = true;
mMessagesQueueMode = MQTT_DROP_MSG_MODE; //It's a voluntary disconnection... don't queue the CAN messages.
#else
mMQTTClient.disconnectFromHost();
if(mMQTTDisconnectionForced == true)
{
mDisconnectionIsVoluntary = false; //emulate an involuntary disconnection
}
else
{
mDisconnectionIsVoluntary = true;
mMessagesQueueMode = MQTT_DROP_MSG_MODE; //It's a voluntary disconnection... don't queue the CAN messages.
}
#endif
return RET_OK;
}
int CMQTTClientWrapper::NewMQTTMessages(QList<CMQTTMessage> MessagesList)
{
Q_UNUSED(MessagesList)
// if(mMQTTClient.state() != QMqttClient::Connected)
// {
// return RET_GENERAL_ERROR;
// }
// if(mMQTTRefreshTimer->remainingTime() > 0)
// {
// // qDebug("MQTT timer value: %d",mMQTTRefreshTimer->remainingTime());
// return RET_OK;
// }
// if(MessagesList.isEmpty())
// {
// return RET_GENERAL_ERROR;
// }
// for(int i = 0; i < MessagesList.size(); i++)
// {
// qint32 res = mMQTTClient.publish(MessagesList.at(i).mMessageTopic,MessagesList.at(i).mMessagePayload.toLocal8Bit(),0,true);
// qDebug("%s : %s",qPrintable(MessagesList.at(i).mMessageTopic), qPrintable(MessagesList.at(i).mMessagePayload));
// }
// mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
// qDebug("Sent %d MQTT messages",MessagesList.size());
return RET_OK;
}
quint64 CMQTTClientWrapper::GetMQTTServerPresenceCANMask()
{
if(mMQTTClient.state() == QMqttClient::Connected)
{
return 0;
}
quint64 Mask = 1;
Mask <<= mMQTTPresenceCANBit;
return Mask;
}
void CMQTTClientWrapper::StateChanged()
{
switch(mMQTTClient.state())
{
case QMqttClient::Disconnected:
{
if(mIsClientConnecting)
{
//Connection attempt failed, just restart the timer...
mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Client MQTT %1 déconnecté pendant une reconnexion. ").arg(mMQTTParams.mMQTTBrokerHostName),"CMQTTClientWrapper",true,1);
}
else
{
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Client MQTT %1 déconnecté.").arg(mMQTTParams.mMQTTBrokerHostName),"CMQTTClientWrapper",true,1);
mProgramPtr->SetMQTTConnectionSatusRequest(false);
#ifndef ENABLE_DEVELOPMENT_DEBUG_TOOLS
mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
#else
if(mMQTTDisconnectionForced == false)
{
mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
}
#endif
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
mMQTTQueueFlushTimer->stop();
#endif
if(mDisconnectionIsVoluntary == false && mMQTTParams.mMQTTEnableDataLogging == true)
{
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //We're disconnected, queue all the messages.
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Passage en mode buffering des messages MQTT pour %1").arg(mMQTTParams.mMQTTBrokerHostName),"CMQTTClientWrapper",true,1);
mBufferingModeText = "Buffering";
UpdateGUIBufferingStatus();
}
}
break;
}
case QMqttClient::Connected:
{
mProgramPtr->SetMQTTConnectionSatusRequest(true);
mMQTTReconnectTimer->stop();
mIsClientConnecting = false;
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Client MQTT %1 connecté.").arg(mMQTTParams.mMQTTBrokerHostName),"CMQTTClientWrapper",true,1);
if(mMQTTMessagesQueue.isEmpty() == false)
{
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //Stay in (or enter) queue mode until we empty the buffer
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("FIFO non vide, passage au mode de vidage de la FIFO pour %1").arg(mMQTTParams.mMQTTBrokerHostName),"CMQTTClientWrapper",true,2);
mBufferingModeText = "Buffering";
mCircularBufferStatusText = QString("%2/%1 messages (0%3\%)").arg(MQTT_CLIENT_MSG_QUEUE_SIZE).arg(mMQTTMessagesQueue.size()).arg((mMQTTMessagesQueue.size()/MQTT_CLIENT_MSG_QUEUE_SIZE)*100);
UpdateGUIBufferingStatus();
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
mMQTTQueueFlushTimer->start();
#else
SendNextQueueMessage();
#endif
}
else
{
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
mBufferingModeText = "Non-buffering";
UpdateGUIBufferingStatus();
}
break;
}
case QMqttClient::Connecting:
{
mIsClientConnecting = true;
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Client MQTT %1 en cours de connexion... ").arg(mMQTTParams.mMQTTBrokerHostName),"CMQTTClientWrapper",true,1);
mProgramPtr->SetMQTTConnectionSatusRequest(false);
break;
}
}
}
int CMQTTClientWrapper::SendMQTTTestMessage()
{
mMQTTClient.publish(QString("Otarcik/Test"),QString("Test Payload").toLocal8Bit(),0,true);
qDebug("Sending MQTT test message");
return RET_OK;
}
int CMQTTClientWrapper::SetCANDevicesList(QList<CCANDevice *> *List)
{
mCANDevicesList = List;
return RET_OK;
}
int CMQTTClientWrapper::SendMQTTMessages(QList<CMQTTMessage> *MessagesList)
{
if(mMessagesQueueMode == MQTT_DROP_MSG_MODE)
{
return RET_OK;
}
if(mCANDevicesList == 0)
{
return RET_GENERAL_ERROR;
}
//Send the CANbus devices messsages
// for(int j = 0; j < mCANDevicesList->size(); j++)
{
switch(mMessagesQueueMode)
{
case MQTT_TRANSMIT_MSG_MODE:
{
// CCANDevice *Device = mCANDevicesList->at(j);
// QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
#ifdef ENABLE_CHIPSET_DRIVER
if(mCPUInterface != 0)
{
MessagesList->append(GetSystemStatusMessage());
}
#endif
if(MessagesList != 0)
{
for(int i = 0; i < MessagesList->size(); i++)
{
qint32 res = mMQTTClient.publish(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload.toLocal8Bit(),1,true);
QString LogMsg = QString("%4 : Envoi d'un message MQTT. Topic: %1 Payload: %2 Result: %3").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload).arg(res).arg(mMQTTParams.mMQTTBrokerHostName);
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
}
// qDebug("Sent %d MQTT messages",MessagesList->size());
}
break;
}
case MQTT_QUEUE_MSG_MODE:
{
// CCANDevice *Device = mCANDevicesList->at(j);
// QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
#ifdef ENABLE_CHIPSET_DRIVER
if(mCPUInterface != 0)
{
MessagesList->append(GetSystemStatusMessage());
}
#endif
if(MessagesList != 0)
{
for(int i = 0; i < MessagesList->size(); i++)
{
CMQTTMessage *NewMsg = new CMQTTMessage(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload);
if(mMQTTMessagesQueue.size() >= MQTT_CLIENT_MSG_QUEUE_SIZE)
{
delete mMQTTMessagesQueue.takeFirst();
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO pleine, retrait du premier message.","CMQTTClientWrapper",true,3);
}
mMQTTMessagesQueue.append(NewMsg);
QString LogMsg = QString("Ajout d'un message MQTT à la FIFO. Topic: %1 Payload: %2 FIFO size: %3").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload).arg(mMQTTMessagesQueue.size());
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
mCircularBufferStatusText = QString("%2/%1 messages (0%3\%)").arg(MQTT_CLIENT_MSG_QUEUE_SIZE).arg(mMQTTMessagesQueue.size()).arg((mMQTTMessagesQueue.size()/MQTT_CLIENT_MSG_QUEUE_SIZE)*100);
UpdateGUIBufferingStatus();
}
}
break;
}
case MQTT_DROP_MSG_MODE://Just to please the compiler.
break;
}
}
// mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
return RET_OK;
}
void CMQTTClientWrapper::MQTTSendTimerExpired()
{
// if(mMessagesQueueMode == MQTT_DROP_MSG_MODE)
// {
// return;
// }
// if(mCANDevicesList == 0)
// {
// return;
// }
// //Send the CANbus devices messsages
// for(int j = 0; j < mCANDevicesList->size(); j++)
// {
// switch(mMessagesQueueMode)
// {
// case MQTT_TRANSMIT_MSG_MODE:
// {
// CCANDevice *Device = mCANDevicesList->at(j);
// QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
//#ifdef ENABLE_CHIPSET_DRIVER
// if(mCPUInterface != 0)
// {
// MessagesList->append(GetSystemStatusMessage());
// }
//#endif
// if(MessagesList != 0)
// {
// for(int i = 0; i < MessagesList->size(); i++)
// {
// qint32 res = mMQTTClient.publish(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload.toLocal8Bit(),1,true);
// QString LogMsg = QString("Envoi d'un message MQTT. Topic: %1 Payload: %2 Result: %3").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload).arg(res);
// CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
// }
// // qDebug("Sent %d MQTT messages",MessagesList->size());
// }
// break;
// }
// case MQTT_QUEUE_MSG_MODE:
// {
// CCANDevice *Device = mCANDevicesList->at(j);
// QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
//#ifdef ENABLE_CHIPSET_DRIVER
// if(mCPUInterface != 0)
// {
// MessagesList->append(GetSystemStatusMessage());
// }
//#endif
// if(MessagesList != 0)
// {
// for(int i = 0; i < MessagesList->size(); i++)
// {
// CMQTTMessage *NewMsg = new CMQTTMessage(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload);
// if(mMQTTMessagesQueue.size() >= MQTT_CLIENT_MSG_QUEUE_SIZE)
// {
// delete mMQTTMessagesQueue.takeFirst();
// CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO pleine, retrait du premier message.","CMQTTClientWrapper",true,3);
// }
// mMQTTMessagesQueue.append(NewMsg);
// QString LogMsg = QString("Ajout d'un message MQTT à la FIFO. Topic: %1 Payload: %2 FIFO size: %3").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload).arg(mMQTTMessagesQueue.size());
// CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
// mCircularBufferStatusText = QString("%2/%1 messages (0%3\%)").arg(MQTT_CLIENT_MSG_QUEUE_SIZE).arg(mMQTTMessagesQueue.size()).arg((mMQTTMessagesQueue.size()/MQTT_CLIENT_MSG_QUEUE_SIZE)*100);
// UpdateGUIBufferingStatus();
// }
// }
// break;
// }
// case MQTT_DROP_MSG_MODE://Just to please the compiler.
// break;
// }
// }
// mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
}
void CMQTTClientWrapper::MQTTReconnectTimerExpired()
{
if(mMQTTClient.state() == QMqttClient::Connected)
{
mMQTTReconnectTimer->stop(); //Already connected....
return;
}
DisconnectFromBroker();
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tentative de connexion au serveur MQTT...","CMQTTClientWrapper",true,1);
ConnectToBroker();
}
void CMQTTClientWrapper::MQTTQueueFlushTimerExipred()
{
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
{
mMQTTQueueFlushTimer->stop();
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
return;
}
SendNextQueueMessage();
#endif
}
int CMQTTClientWrapper::SendNextQueueMessage()
{
if(mMQTTMessagesQueue.size() == 0)
{
//Should not happen, this is a logical problem with the program
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Erreur de logique dans SendNextQueueMessage(). MessageQueue vide!","CMQTTClientWrapper",true,2);
return RET_GENERAL_ERROR;
}
CMQTTMessage *Msg = mMQTTMessagesQueue.takeFirst();
mMQTTClient.publish(Msg->mMessageTopic,Msg->mMessagePayload.toLocal8Bit(),1,true);
QString LogMsg = QString("Envoi d'un message MQTT provenant du buffer. Topic: %1 Payload: %2 FIFO Size: %2").arg(Msg->mMessageTopic).arg(Msg->mMessagePayload).arg(mMQTTMessagesQueue.size());
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,"CMQTTClientWrapper",true,3);
delete Msg; //free memory
if(mMQTTMessagesQueue.isEmpty())
{
#ifdef MQTT_USE_TIMER_TO_FLUSH_QUEUE
mMQTTQueueFlushTimer->stop();
#endif
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tous les messages MQTT de la FIFO ont été envoyés au serveur, passage en mode de transmission normal","CMQTTClientWrapper",true,2);
}
UpdateGUIBufferingStatus();
return RET_OK;
}
int CMQTTClientWrapper::UpdateGUIBufferingStatus()
{
if(mMessagesQueueMode == MQTT_QUEUE_MSG_MODE)
{
mBufferingModeText = "Buffering";
}
else
{
mBufferingModeText = "Transmitting";
}
mCircularBufferStatusText = QString("%1").arg(mMQTTMessagesQueue.size());
mProgramPtr->UpdateMQTTBufferingStatusRequest(mBufferingModeText,mCircularBufferStatusText,0);
return RET_OK;
}
void CMQTTClientWrapper::MQTTClientError(QMqttClient::ClientError error)
{
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(QString("Erreur du client MQTT: %1").arg(error),"CMQTTClientWrapper",true,2);
}
bool CMQTTClientWrapper::IsMQTTClientConnected()
{
if(mMQTTClient.state() != QMqttClient::Connected)
return false;
else
return true;
}
QString CMQTTClientWrapper::GetMQTTClientConnectionState()
{
QString StateText;
if(mMQTTClient.state() == QMqttClient::Connected)
{
StateText = QString("%1 : Connecté").arg(mMQTTParams.mMQTTBrokerHostName);
}
else if(mMQTTClient.state() == QMqttClient::Connecting)
{
StateText = QString("%1 : En connexion...").arg(mMQTTParams.mMQTTBrokerHostName);
}
else
{
StateText = QString("%1 : Déconnecté").arg(mMQTTParams.mMQTTBrokerHostName);
}
return StateText;
}
//This function is used only when flushing the msg queue
void CMQTTClientWrapper::MQTTMessageSent(qint32 MsgID)
{
Q_UNUSED(MsgID)
// qDebug("MQTT msg ID %d sent",MsgID);
#ifndef MQTT_USE_TIMER_TO_FLUSH_QUEUE
if(mMessagesQueueMode != MQTT_QUEUE_MSG_MODE)
{
return;
}
if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
{
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
return;
}
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Message MQTT de la FIFO transmis, transmission du prochain...","CMQTTClientWrapper",true,3);
SendNextQueueMessage();
#endif
}
void CMQTTClientWrapper::MQTTMessageStatusChanged(qint32 id, QMqtt::MessageStatus s, const QMqttMessageStatusProperties &properties)
{
qDebug("Mqtt %d message status changed",id);
}
CMQTTMessage CMQTTClientWrapper::GetSystemStatusMessage()
{
#ifdef ENABLE_CHIPSET_DRIVER
if(mCPUInterface != 0)
{
CComputerBoardState tmp = mCPUInterface->GetComputerBoardState();
QString MsgPayload = "{";
QString MsgTopic = mMQTTParams.mMQTTTopicPrefix + "GeneralPCStatus";
//Millisecs to EPOCH timestamp format
qint64 TimeStamp = QDateTime::currentDateTime().toMSecsSinceEpoch();
MsgPayload.append(QString("\"timestamp\":%1,").arg(TimeStamp));
MsgPayload.append(QString("\"CPU Temperature\":%1,").arg(tmp.mSystemTemperature));
MsgPayload.append(QString("\"Board 12V\":%1,").arg(tmp.mSystem12V));
MsgPayload.append(QString("\"IO Port\":%1").arg(tmp.mIOPortState));
MsgPayload.append("}");
// qDebug("%s",qPrintable(MsgPayload));
CMQTTMessage NewMessage(MsgTopic, MsgPayload);
return NewMessage;
}
else
{
return CMQTTMessage();
}
#else
return CMQTTMessage();
#endif
}
#ifdef ENABLE_DEVELOPMENT_DEBUG_TOOLS
int CMQTTClientWrapper::ForceMQTTClientDisconnection(bool Disconnect)
{
if(Disconnect == true)
{
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Forcing MQTT client disconnection for debug tests...","CMQTTClientWrapper",true,3);
mMQTTDisconnectionForced = true;
DisconnectFromBroker();
}
else
{
mMQTTDisconnectionForced = false;
if(mMessagesQueueMode != MQTT_TRANSMIT_MSG_MODE)
{
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Reconnecting MQTT client after being forced OFF...","CMQTTClientWrapper",true,3);
ConnectToBroker();
}
}
}
#endif