258 lines
8.9 KiB
C++
258 lines
8.9 KiB
C++
#include "MQTTClientWrapper.h"
|
|
#include "OtarcikCan.h"
|
|
#include "CANDevice.h"
|
|
|
|
/**
 * Builds the wrapper in its idle state.
 *
 * Hooks the MQTT client's stateChanged notification to StateChanged(), creates
 * the three timers used by the class and puts every state member in a known
 * value. No connection is attempted here; see StartMQTTClient().
 */
CMQTTClientWrapper::CMQTTClientWrapper()
{
    connect(&mMQTTClient,&QMqttClient::stateChanged,this,&CMQTTClientWrapper::StateChanged);

    //Single-shot timer pacing the periodic transmission; it is re-armed at the
    //end of MQTTSendTimerExpired().
    mMQTTRefreshTimer = new QTimer;
    mMQTTRefreshTimer->setSingleShot(true);
    connect(mMQTTRefreshTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTSendTimerExpired);

    //Single-shot timer that triggers a new connection attempt.
    mMQTTReconnectTimer = new QTimer;
    mMQTTReconnectTimer->setSingleShot(true);
    connect(mMQTTReconnectTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTReconnectTimerExpired);

    //Repeating timer that drains the buffered messages one per tick.
    mMQTTQueueFlushTimer = new QTimer;
    mMQTTQueueFlushTimer->setSingleShot(false);
    mMQTTQueueFlushTimer->setInterval(MQTT_CLIENT_MSG_QUEUE_FLUSH_TIMEOUT);
    connect(mMQTTQueueFlushTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTQueueFlushTimerExipred);
    mMQTTQueueFlushTimer->stop();

    mProgramPtr = nullptr;
    //MQTTSendTimerExpired() tests this pointer against null, so it must never
    //be left uninitialised.
    mCANDevicesList = nullptr;
    mMessagesQueueMode = MQTT_DROP_MSG_MODE;
    mDisconnectionIsVoluntary = false;
    mIsClientConnecting = false;
}
|
|
|
|
/**
 * Releases everything the wrapper allocated: the three timers created by the
 * constructor and any message still waiting in the buffering FIFO.
 */
CMQTTClientWrapper::~CMQTTClientWrapper()
{
    //All three timers are created with new and without a parent in the
    //constructor, so each one has to be deleted explicitly.
    delete mMQTTRefreshTimer;
    delete mMQTTReconnectTimer;
    delete mMQTTQueueFlushTimer;

    //The FIFO stores heap-allocated messages (see MQTTSendTimerExpired());
    //free whatever was never flushed to the broker.
    while(mMQTTMessagesQueue.isEmpty() == false)
    {
        delete mMQTTMessagesQueue.takeFirst();
    }
}
|
|
|
|
/**
 * Stores a private copy of the cloud/MQTT parameters.
 *
 * The copy is what ConnectToBroker() and the refresh timer read later on, so
 * the caller's object does not need to outlive this call.
 *
 * @param Params Parameters to copy.
 *               NOTE(review): dereferenced without a null check - callers must
 *               pass a valid pointer.
 * @return RET_OK, always.
 */
int CMQTTClientWrapper::SetMQTTParams(CCloudParams *Params)
{
    CCloudParams &Incoming = *Params;
    mMQTTParams = Incoming;

    //A formerly hard-coded client ID ("Otarcik_Station_Ville_Emard") used to be
    //assigned here; that assignment is disabled.

    return RET_OK;
}
|
|
|
|
int CMQTTClientWrapper::StartMQTTClient()
|
|
{
|
|
mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
|
|
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE;
|
|
ConnectToBroker();
|
|
return RET_OK;
|
|
}
|
|
|
|
int CMQTTClientWrapper::ConnectToBroker()
|
|
{
|
|
//Setup the client before connecting.
|
|
mMQTTClient.setAutoKeepAlive(true);
|
|
mMQTTClient.setClientId(mMQTTParams.mMQTTClientID);
|
|
mMQTTClient.setHostname(mMQTTParams.mMQTTBrokerHostName);
|
|
mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort);
|
|
mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword);
|
|
mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName);
|
|
mDisconnectionIsVoluntary = false;
|
|
|
|
mMQTTClient.connectToHost();
|
|
|
|
return RET_OK;
|
|
}
|
|
|
|
int CMQTTClientWrapper::DisconnectFromBroker()
|
|
{
|
|
mMQTTClient.disconnectFromHost();
|
|
mDisconnectionIsVoluntary = true;
|
|
mMessagesQueueMode = MQTT_DROP_MSG_MODE; //It's a voluntary disconnection... don't queue the CAN messages.
|
|
|
|
return RET_OK;
|
|
}
|
|
|
|
/**
 * Currently a no-op kept for interface compatibility.
 *
 * The former implementation is disabled. It rejected the call when the client
 * was not connected or the list was empty, skipped it while the refresh timer
 * was still running, and otherwise published every message of the list
 * (QoS 0, retained) before re-arming the refresh timer. Periodic publishing is
 * now done from MQTTSendTimerExpired().
 *
 * @param MessagesList Ignored.
 * @return RET_OK, always.
 */
int CMQTTClientWrapper::NewMQTTMessages(QList<CMQTTMessage> MessagesList)
{
    (void)MessagesList; //Intentionally unused, see the function description.

    return RET_OK;
}
|
|
|
|
void CMQTTClientWrapper::StateChanged()
|
|
{
|
|
switch(mMQTTClient.state())
|
|
{
|
|
case QMqttClient::Disconnected:
|
|
{
|
|
if(mIsClientConnecting)
|
|
{
|
|
//Connection attempt failed, just restart the timer...
|
|
mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
|
|
}
|
|
else
|
|
{
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT déconnecté.",true,1);
|
|
mProgramPtr->SetMQTTConnectionSatusRequest(false);
|
|
mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
|
|
mMQTTQueueFlushTimer->stop();
|
|
if(mDisconnectionIsVoluntary == false)
|
|
{
|
|
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //We're disconnected, queue all the messages.
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Passage en mode buffering des messages MQTT",true,2);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
case QMqttClient::Connected:
|
|
{
|
|
mProgramPtr->SetMQTTConnectionSatusRequest(true);
|
|
mMQTTReconnectTimer->stop();
|
|
mIsClientConnecting = false;
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Client MQTT connecté.",true,1);
|
|
if(mMQTTMessagesQueue.isEmpty() == false)
|
|
{
|
|
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //Stay in (or enter) queue mode until we empty the buffer
|
|
mMQTTQueueFlushTimer->start();
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("FIFO non vide, passage au mode de vidage de la FIFO",true,2);
|
|
}
|
|
else
|
|
{
|
|
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
|
|
}
|
|
break;
|
|
}
|
|
case QMqttClient::Connecting:
|
|
{
|
|
mIsClientConnecting = true;
|
|
qDebug("MQTT client Connecting...");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
int CMQTTClientWrapper::SendMQTTTestMessage()
|
|
{
|
|
mMQTTClient.publish(QString("Otarcik/Test"),QString("Test Payload").toLocal8Bit(),0,true);
|
|
qDebug("Sending MQTT test message");
|
|
return RET_OK;
|
|
}
|
|
|
|
/**
 * Registers the list of CAN devices polled for MQTT messages by
 * MQTTSendTimerExpired().
 *
 * Only the pointer is stored: the list is not copied.
 *
 * @param List Devices list to use.
 * @return RET_OK, always.
 */
int CMQTTClientWrapper::SetCANDevicesList(QList<CCANDevice *> *List)
{
    this->mCANDevicesList = List;

    return RET_OK;
}
|
|
|
|
void CMQTTClientWrapper::MQTTSendTimerExpired()
|
|
{
|
|
if(mMessagesQueueMode == MQTT_DROP_MSG_MODE)
|
|
{
|
|
return;
|
|
}
|
|
if(mCANDevicesList == 0)
|
|
{
|
|
return;
|
|
}
|
|
|
|
//Send the CANbus devices messsages
|
|
for(int j = 0; j < mCANDevicesList->size(); j++)
|
|
{
|
|
switch(mMessagesQueueMode)
|
|
{
|
|
case MQTT_TRANSMIT_MSG_MODE:
|
|
{
|
|
CCANDevice *Device = mCANDevicesList->at(j);
|
|
QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
|
|
if(MessagesList != 0)
|
|
{
|
|
for(int i = 0; i < MessagesList->size(); i++)
|
|
{
|
|
qint32 res = mMQTTClient.publish(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload.toLocal8Bit(),0,true);
|
|
QString LogMsg = QString("Envoi d'un message MQTT. Topic: %1 Payload: %2").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload);
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,true,3);
|
|
|
|
}
|
|
qDebug("Sent %d MQTT messages",MessagesList->size());
|
|
}
|
|
break;
|
|
}
|
|
case MQTT_QUEUE_MSG_MODE:
|
|
{
|
|
CCANDevice *Device = mCANDevicesList->at(j);
|
|
QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
|
|
if(MessagesList != 0)
|
|
{
|
|
for(int i = 0; i < MessagesList->size(); i++)
|
|
{
|
|
CMQTTMessage *NewMsg = new CMQTTMessage(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload);
|
|
if(mMQTTMessagesQueue.size() >= MQTT_CLIENT_MSG_QUEUE_SIZE)
|
|
{
|
|
delete mMQTTMessagesQueue.takeFirst();
|
|
}
|
|
mMQTTMessagesQueue.append(NewMsg);
|
|
QString LogMsg = QString("Ajout d'un message MQTT à la FIFO. Topic: %1 Payload: %2 FIFO size: %3").arg(MessagesList->at(i).mMessageTopic).arg(MessagesList->at(i).mMessagePayload).arg(mMQTTMessagesQueue.size());
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,true,3);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
|
|
|
|
}
|
|
|
|
void CMQTTClientWrapper::MQTTReconnectTimerExpired()
|
|
{
|
|
ConnectToBroker();
|
|
}
|
|
|
|
void CMQTTClientWrapper::MQTTQueueFlushTimerExipred()
|
|
{
|
|
if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
|
|
{
|
|
mMQTTQueueFlushTimer->stop();
|
|
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
|
|
return;
|
|
}
|
|
|
|
CMQTTMessage *Msg = mMQTTMessagesQueue.takeFirst();
|
|
qint32 res = mMQTTClient.publish(Msg->mMessageTopic,Msg->mMessagePayload.toLocal8Bit(),0,true);
|
|
QString LogMsg = QString("Envoi d'un message MQTT provenant du buffer. Topic: %1 Payload: %2 Buffer Size: %3").arg(Msg->mMessageTopic).arg(Msg->mMessagePayload).arg(mMQTTMessagesQueue.size());
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,true,3);
|
|
|
|
delete Msg; //free memory
|
|
|
|
if(mMQTTMessagesQueue.isEmpty())
|
|
{
|
|
mMQTTQueueFlushTimer->stop();
|
|
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
|
|
CGeneralMessagesLogDispatcher::instance()->AddLogMessage("Tous les messages MQTT de la FIFO ont été envoyés au serveur",true,2);
|
|
}
|
|
|
|
}
|