Début implémentation buffering des messages MQTT lors d'un blackout internet

This commit is contained in:
jfmartel 2023-06-28 13:59:58 -04:00
parent 7bbf2cb812
commit 3a44111079
6 changed files with 197 additions and 10 deletions

View File

@ -0,0 +1,97 @@
VERSION ""
NS_ :
NS_DESC_
CM_
BA_DEF_
BA_
VAL_
CAT_DEF_
CAT_
FILTER
BA_DEF_DEF_
EV_DATA_
ENVVAR_DATA_
SGTYPE_
SGTYPE_VAL_
BA_DEF_SGTYPE_
BA_SGTYPE_
SIG_TYPE_REF_
VAL_TABLE_
SIG_GROUP_
SIG_VALTYPE_
SIGTYPE_VALTYPE_
BO_TX_BU_
BA_DEF_REL_
BA_REL_
BA_DEF_DEF_REL_
BU_SG_REL_
BU_EV_REL_
BU_BO_REL_
SG_MUL_VAL_
BS_:
BU_: Parker_watchdog Parker_master PC
BO_ 304 temperatures1: 8 Parker_master
SG_ outside_container_temp : 8|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ inside_container_temp : 0|8@1- (1,0) [0|0] "°C" Vector__XXX
BO_ 1280 AC_chargers: 8 Parker_master
SG_ AC_chargers_power : 0|8@1+ (0.25,0) [0|0] "kW" Vector__XXX
BO_ 1024 Power_Inverter: 8 Parker_master
SG_ main_inverter_hotest_sensor : 16|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ power_inverter_DC_power : 0|16@1- (0.1,0) [0|0] "kW" Vector__XXX
BO_ 263 stationESS3: 8 Parker_master
SG_ ESS_total_power : 0|16@1- (0.1,0) [0|0] "kW" Vector__XXX
BO_ 288 LowVoltageStatus: 8 Parker_master
SG_ DCDC2outputCurrent : 32|8@1+ (1,0) [0|0] "A" Vector__XXX
SG_ DCDC1outputCurrent : 24|8@1+ (1,0) [0|0] "A" Vector__XXX
SG_ ESS2_12VBatteryVoltage : 16|8@1+ (0.1,0) [0|25.5] "V" Vector__XXX
SG_ ESS1_12VBatteryVoltage : 8|8@1+ (0.1,0) [0|25.5] "V" Vector__XXX
SG_ HouseBatteryVoltage : 0|8@1+ (0.1,0) [0|0] "V" Vector__XXX
BO_ 262 stationESS2: 8 Parker_master
SG_ batt4_low_cell_t : 56|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt3_low_cell_t : 48|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt2_low_cell_t : 40|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt1_low_cell_t : 32|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt4_high_cell_t : 24|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt3_high_cell_t : 16|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt2_high_cell_t : 8|8@1- (1,0) [0|0] "°C" Vector__XXX
SG_ batt1_high_cell_t : 0|8@1- (1,0) [0|0] "°C" Vector__XXX
BO_ 768 PC_dateTime: 8 PC
BO_ 512 WDT_state: 8 Parker_watchdog
BO_ 277 station_state1: 8 Parker_master
BO_ 272 station_ver: 8 Parker_master
BO_ 261 stationESS1: 8 Parker_master
SG_ DC_bus_voltage : 48|16@1+ (0.1,0) [0|0] "V" Vector__XXX
SG_ summed_ESS_current : 32|16@1- (0.1,0) [0|0] "A" Vector__XXX
SG_ SOC_battery4 : 24|8@1+ (0.5,0) [0|127.5] "%" Vector__XXX
SG_ SOC_battery3 : 16|8@1+ (0.5,0) [0|127.5] "%" Vector__XXX
SG_ SOC_battery2 : 8|8@1+ (0.5,0) [0|127.5] "%" Vector__XXX
SG_ SOC_battery1 : 0|8@1+ (0.5,0) [0|127.5] "%" Vector__XXX
BO_ 256 parker_dateTime: 8 Parker_master
SG_ parker_time : 16|16@1+ (0.001,0) [0|0] "h" Vector__XXX
SG_ parker_date : 0|16@1+ (1,0) [0|0] "d" Vector__XXX
CM_ SG_ 263 ESS_total_power "positive values = battery charging";
CM_ SG_ 256 parker_time "hours elapsed since midnight, for example 18.25";
CM_ SG_ 256 parker_date "number of days that have passed since 1899-12-30";
BA_DEF_ "BusType" STRING ;
BA_DEF_DEF_ "BusType" "";

View File

@ -12,7 +12,15 @@ CMQTTClientWrapper::CMQTTClientWrapper()
mMQTTReconnectTimer = new QTimer; mMQTTReconnectTimer = new QTimer;
mMQTTReconnectTimer->setSingleShot(true); mMQTTReconnectTimer->setSingleShot(true);
connect(mMQTTReconnectTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTReconnectTimerExpired); connect(mMQTTReconnectTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTReconnectTimerExpired);
mMQTTQueueFlushTimer = new QTimer;
mMQTTQueueFlushTimer->setSingleShot(false);
connect(mMQTTQueueFlushTimer,&QTimer::timeout,this,&CMQTTClientWrapper::MQTTQueueFlushTimerExipred);
mMQTTQueueFlushTimer->stop();
mProgramPtr = 0; mProgramPtr = 0;
mMessagesQueueMode = MQTT_DROP_MSG_MODE;
mDisconnectionIsVoluntary = false;
} }
CMQTTClientWrapper::~CMQTTClientWrapper() CMQTTClientWrapper::~CMQTTClientWrapper()
@ -37,6 +45,7 @@ int CMQTTClientWrapper::ConnectToBroker()
mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort); mMQTTClient.setPort(mMQTTParams.mMQTTBrokerPort);
mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword); mMQTTClient.setPassword(mMQTTParams.mMQTTBrokerPassword);
mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName); mMQTTClient.setUsername(mMQTTParams.mMQTTBrokerUserName);
mDisconnectionIsVoluntary = false;
mMQTTClient.connectToHost(); mMQTTClient.connectToHost();
@ -46,6 +55,9 @@ int CMQTTClientWrapper::ConnectToBroker()
int CMQTTClientWrapper::DisconnectFromBroker() int CMQTTClientWrapper::DisconnectFromBroker()
{ {
mMQTTClient.disconnectFromHost(); mMQTTClient.disconnectFromHost();
mDisconnectionIsVoluntary = true;
mMessagesQueueMode = MQTT_DROP_MSG_MODE; //It's a voluntary disconnection... don't queue the CAN messages.
return RET_OK; return RET_OK;
} }
@ -89,6 +101,11 @@ void CMQTTClientWrapper::StateChanged()
mProgramPtr->SetMQTTConnectionSatusRequest(false); mProgramPtr->SetMQTTConnectionSatusRequest(false);
mMQTTRefreshTimer->stop(); mMQTTRefreshTimer->stop();
mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT); mMQTTReconnectTimer->start(MQTT_CLIENT_RECONNECT_TIMEOUT);
mMQTTQueueFlushTimer->stop();
if(mDisconnectionIsVoluntary == false)
{
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //We're disconnected, queue all the messages.
}
break; break;
} }
case QMqttClient::Connected: case QMqttClient::Connected:
@ -96,6 +113,16 @@ void CMQTTClientWrapper::StateChanged()
mProgramPtr->SetMQTTConnectionSatusRequest(true); mProgramPtr->SetMQTTConnectionSatusRequest(true);
mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout); mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
mMQTTReconnectTimer->stop(); mMQTTReconnectTimer->stop();
if(mMQTTMessagesQueue.isEmpty() == false)
{
mMessagesQueueMode = MQTT_QUEUE_MSG_MODE; //Stay in (or enter) queue mode until we empty the buffer
mMQTTQueueFlushTimer->start();
}
else
{
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
}
qDebug("MQTT client Connected"); qDebug("MQTT client Connected");
break; break;
} }
@ -122,7 +149,7 @@ int CMQTTClientWrapper::SetCANDevicesList(QList<CCANDevice *> *List)
void CMQTTClientWrapper::MQTTSendTimerExpired() void CMQTTClientWrapper::MQTTSendTimerExpired()
{ {
if(mMQTTClient.state() != QMqttClient::Connected) if(mMessagesQueueMode == MQTT_DROP_MSG_MODE)
{ {
return; return;
} }
@ -133,6 +160,10 @@ void CMQTTClientWrapper::MQTTSendTimerExpired()
//Send the CANbus devices messsages //Send the CANbus devices messsages
for(int j = 0; j < mCANDevicesList->size(); j++) for(int j = 0; j < mCANDevicesList->size(); j++)
{
switch(mMessagesQueueMode)
{
case MQTT_TRANSMIT_MSG_MODE:
{ {
CCANDevice *Device = mCANDevicesList->at(j); CCANDevice *Device = mCANDevicesList->at(j);
QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList(); QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
@ -148,6 +179,24 @@ void CMQTTClientWrapper::MQTTSendTimerExpired()
} }
qDebug("Sent %d MQTT messages",MessagesList->size()); qDebug("Sent %d MQTT messages",MessagesList->size());
} }
break;
}
case MQTT_QUEUE_MSG_MODE:
{
CCANDevice *Device = mCANDevicesList->at(j);
QList<CMQTTMessage> *MessagesList = Device->GetMQTTMessagesList();
for(int i = 0; i < MessagesList->size(); i++)
{
CMQTTMessage *NewMsg = new CMQTTMessage(MessagesList->at(i).mMessageTopic,MessagesList->at(i).mMessagePayload);
if(mMQTTMessagesQueue.size() >= MQTT_CLIENT_MSG_QUEUE_SIZE)
{
delete mMQTTMessagesQueue.takeFirst();
}
mMQTTMessagesQueue.append(NewMsg);
}
break;
}
}
} }
mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout); mMQTTRefreshTimer->start(mMQTTParams.mMQTTTransmitTimeout);
@ -158,3 +207,28 @@ void CMQTTClientWrapper::MQTTReconnectTimerExpired()
{ {
ConnectToBroker(); ConnectToBroker();
} }
void CMQTTClientWrapper::MQTTQueueFlushTimerExipred()
{
if(mMQTTMessagesQueue.isEmpty()) //Shouldn't happen... but just to be safe
{
mMQTTQueueFlushTimer->stop();
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
return;
}
CMQTTMessage *Msg = mMQTTMessagesQueue.takeFirst();
qint32 res = mMQTTClient.publish(Msg->mMessageTopic,Msg->mMessagePayload.toLocal8Bit(),0,true);
qDebug("Flushing MQTT Msg queue... %s : %s",qPrintable(Msg->mMessageTopic), qPrintable(Msg->mMessagePayload));
QString LogMsg = QString("Envoi d'un message MQTT de la queue. Topic: %1 Payload: %2").arg(Msg->mMessageTopic).arg(Msg->mMessagePayload);
CGeneralMessagesLogDispatcher::instance()->AddLogMessage(LogMsg,true,3);
delete Msg;
if(mMQTTMessagesQueue.isEmpty())
{
mMQTTQueueFlushTimer->stop();
mMessagesQueueMode = MQTT_TRANSMIT_MSG_MODE;
}
}

View File

@ -17,6 +17,13 @@ class CMQTTClientWrapper : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
typedef enum
{
MQTT_DROP_MSG_MODE,
MQTT_TRANSMIT_MSG_MODE,
MQTT_QUEUE_MSG_MODE
}eMQTTMsgQueuingMode;
CMQTTClientWrapper(); CMQTTClientWrapper();
~CMQTTClientWrapper(); ~CMQTTClientWrapper();
int SetMQTTParams(CCloudParams *Params); int SetMQTTParams(CCloudParams *Params);
@ -30,17 +37,22 @@ public:
// QString mMQTTClientID; // QString mMQTTClientID;
QTimer *mMQTTRefreshTimer; QTimer *mMQTTRefreshTimer;
QTimer *mMQTTReconnectTimer; QTimer *mMQTTReconnectTimer;
QTimer *mMQTTQueueFlushTimer;
eMQTTMsgQueuingMode mMessagesQueueMode;
bool mDisconnectionIsVoluntary;
private: private:
QMqttClient mMQTTClient; QMqttClient mMQTTClient;
CCloudParams mMQTTParams; CCloudParams mMQTTParams;
QList<CCANDevice*> *mCANDevicesList; QList<CCANDevice*> *mCANDevicesList;
QList<CMQTTMessage*> mMQTTMessagesQueue;
public slots: public slots:
void StateChanged(); void StateChanged();
void MQTTSendTimerExpired(); void MQTTSendTimerExpired();
void MQTTReconnectTimerExpired(); void MQTTReconnectTimerExpired();
void MQTTQueueFlushTimerExipred();
}; };

View File

@ -5,6 +5,8 @@
#define GENERAL_MESSAGES_MAX_LOG_LINES 5000 //The number of lines of general status log we keep in the general status window (avoids fucking up because you know.... RAM) #define GENERAL_MESSAGES_MAX_LOG_LINES 5000 //The number of lines of general status log we keep in the general status window (avoids fucking up because you know.... RAM)
#define MQTT_CLIENT_RECONNECT_TIMEOUT 5000 //Time in ms between reconnect attemps to MQTT broker when connection is lost #define MQTT_CLIENT_RECONNECT_TIMEOUT 5000 //Time in ms between reconnect attemps to MQTT broker when connection is lost
#define MQTT_CLIENT_MSG_QUEUE_FLUSH_TIMEOUT 500 //The delay in ms between the TX messages when flushing the msg queue after a server reconnection
#define MQTT_CLIENT_MSG_QUEUE_SIZE 1000 //The maximum number of messages to be queued. Older messages are lost when adding a new one in a full queue
//#define USE_TOPIC_PREFIX //#define USE_TOPIC_PREFIX
#define FORCE_FLOAT_REPRESENTATION #define FORCE_FLOAT_REPRESENTATION

Binary file not shown.

View File

@ -14,4 +14,6 @@ x Valider pourquoi le MQTT ne se reconnecte pas après une déconnexion
x Ajouter un affichage des payloads JSON x Ajouter un affichage des payloads JSON
x Contrôler le niveau de log x Contrôler le niveau de log
x Assurer de ne pas avoir à redémarrer le soft quand on change un paramètre CAN (CAN data repopulate) x Assurer de ne pas avoir à redémarrer le soft quand on change un paramètre CAN (CAN data repopulate)
- Implémenter un client/serveur
- Ajouter le buffering lors d'un blackout internet