From ec9e478b695fb01092002bd6b8d79d8f0ddf2136 Mon Sep 17 00:00:00 2001 From: strawmanbobi Date: Mon, 19 Feb 2024 15:20:42 +0800 Subject: [PATCH] implemented dual channel connection for iris-kit --- lib/AliyunIoTSDK/src/AliyunIoTSDK.cpp | 22 +++-------- lib/AliyunIoTSDK/src/AliyunIoTSDK.h | 19 +--------- src/aliot_client.cpp | 10 +++-- src/aliot_client.h | 5 ++- src/emq_client.cpp | 36 ++++++++---------- src/emq_client.h | 4 +- src/iot_hub.cpp | 53 ++++++++++++++++++++------- src/iot_hub.h | 15 ++++---- src/iris_kit.cpp | 2 +- 9 files changed, 83 insertions(+), 83 deletions(-) diff --git a/lib/AliyunIoTSDK/src/AliyunIoTSDK.cpp b/lib/AliyunIoTSDK/src/AliyunIoTSDK.cpp index 4ecc555..ecbc040 100644 --- a/lib/AliyunIoTSDK/src/AliyunIoTSDK.cpp +++ b/lib/AliyunIoTSDK/src/AliyunIoTSDK.cpp @@ -34,7 +34,6 @@ DeviceProperty PropertyMessageBuffer[MESSAGE_BUFFER_SIZE]; #define SHA256HMAC_SIZE 32 #define DATA_CALLBACK_SIZE 20 -#define SIGN_DEBUG_VERBOSE 1 #define MQTT_WAIT_GENERIC (10000) @@ -162,12 +161,15 @@ int AliyunIoTSDK::mqttCheckConnect() { return mqttStatus; } -int AliyunIoTSDK::begin(Client &espClient, +int AliyunIoTSDK::begin(PubSubClient &mqtt_client, const char *_productKey, const char *_deviceName, const char *_deviceSecret, const char *_region) { - client = new PubSubClient(espClient); + if (NULL == client) { + client = &mqtt_client; + } + productKey = _productKey; deviceName = _deviceName; deviceSecret = _deviceSecret; @@ -200,7 +202,7 @@ int AliyunIoTSDK::begin(Client &espClient, client->setServer(domain, MQTT_PORT); #if defined USE_STANDARD_THING_MODEL_TOPIC - client->setCallback(callback); + client.setCallback(callback); #endif Serial.print("INFO\tconnection check in begin\n"); return mqttCheckConnect(); @@ -237,18 +239,6 @@ void AliyunIoTSDK::sendEvent(const char *eventId) { sendEvent(eventId, "{}"); } -void AliyunIoTSDK::sendCustom(const char *topic, const char *eventBody) { - boolean d = client->publish(topic, eventBody); - Serial.print("INFO\tpublish:0 sucessfully:"); - Serial.println(d); -} - -void AliyunIoTSDK::sendCustomData(const char *topic, const uint8_t *data, int length) { - boolean d = client->publish(topic, data, length); - Serial.print("INFO\tpublish:0 sucessfully:"); - Serial.println(d); -} - boolean AliyunIoTSDK::subscribe(const char* topic, int qos) { return client->subscribe(topic, qos); } diff --git a/lib/AliyunIoTSDK/src/AliyunIoTSDK.h b/lib/AliyunIoTSDK/src/AliyunIoTSDK.h index 84fcc2d..62ea400 100644 --- a/lib/AliyunIoTSDK/src/AliyunIoTSDK.h +++ b/lib/AliyunIoTSDK/src/AliyunIoTSDK.h @@ -64,7 +64,7 @@ public: * @param _deviceSecret : AliyunIoT device secret * @param _region : AliyunIoT region */ - static int begin(Client &espClient, + static int begin(PubSubClient &mqtt_client, const char *_productKey, const char *_deviceName, const char *_deviceSecret, @@ -117,23 +117,6 @@ public: */ static void sendEvent(const char *eventId); - /** - * Send customized topic data - * - * @param topic : topic in string - * @param eventBody : event body in string - */ - static void sendCustom(const char *topic, const char *eventBody); - - /** - * Send customized topic payload data - * - * @param topic : topic in string - * @param data : data payload - * @param length : payload length - */ - static void sendCustomData(const char *topic, const uint8_t *data, int length); - /** * Subscribe MQTT topic for Aliot * diff --git a/src/aliot_client.cpp b/src/aliot_client.cpp index 34578f0..1023e04 100644 --- a/src/aliot_client.cpp +++ b/src/aliot_client.cpp @@ -55,10 +55,10 @@ static AliyunIoTSDK iot; // public function definitions -int connectToAliot() { +int connectToAliot(PubSubClient& mqtt_client) { String aliot_client_id = g_product_key + "." + g_device_name; - int res = iot.begin(wifi_client, g_product_key.c_str(), g_device_name.c_str(), g_device_token.c_str(), g_aliot_region.c_str()); + int res = iot.begin(mqtt_client, g_product_key.c_str(), g_device_name.c_str(), g_device_token.c_str(), g_aliot_region.c_str()); if (0 == res) { INFOLN("Aliyun IoT connected"); } else { @@ -67,6 +67,8 @@ int connectToAliot() { return res; } -void aliotKeepAlive() { +void aliotKeepAlive(PubSubClient& mqtt_client) { + (void) mqtt_client; iot.loop(); -} \ No newline at end of file +} + diff --git a/src/aliot_client.h b/src/aliot_client.h index b1e2bf1..1ebf351 100644 --- a/src/aliot_client.h +++ b/src/aliot_client.h @@ -22,12 +22,13 @@ */ #include +#include #ifndef IRIS_KIT_ALIOT_CLIENT_H #define IRIS_KIT_ALIOT_CLIENT_H -int connectToAliot(); +int connectToAliot(PubSubClient& mqtt_client); -void aliotKeepAlive(); +void aliotKeepAlive(PubSubClient& mqtt_client); #endif // IRIS_KIT_ALIOT_CLIENT_H \ No newline at end of file diff --git a/src/emq_client.cpp b/src/emq_client.cpp index 7569d80..a729a5b 100644 --- a/src/emq_client.cpp +++ b/src/emq_client.cpp @@ -46,7 +46,7 @@ extern int g_mqtt_port; // private variable definitions static bool force_disconnected = false; - +static PubSubClient* emqx_client = NULL; // private function declarations static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length); @@ -56,24 +56,25 @@ static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length); int connectToEMQXBroker(PubSubClient &mqtt_client) { int retry_times = 0; - mqtt_client.setServer(g_mqtt_server.c_str(), g_mqtt_port); - mqtt_client.setCallback(irisIrextIoTCallback); + if (NULL == emqx_client) { + emqx_client = &mqtt_client; + } + emqx_client->setServer(g_mqtt_server.c_str(), g_mqtt_port); force_disconnected = false; - while (!force_disconnected && !mqtt_client.connected() && retry_times < MQTT_RETRY_MAX) { + while (!force_disconnected && !emqx_client->connected() && retry_times < MQTT_RETRY_MAX) { INFOF("Connecting to MQTT Broker as %s.....\n", g_mqtt_client_id.c_str()); - if (mqtt_client.connect(g_mqtt_client_id.c_str(), g_mqtt_user_name.c_str(), g_mqtt_password.c_str())) { + if (emqx_client->connect(g_mqtt_client_id.c_str(), g_mqtt_user_name.c_str(), g_mqtt_password.c_str())) { INFOF("Connected to MQTT broker\n"); - mqtt_client.subscribe(g_downstream_topic.c_str()); } else { - ERRORF("Failed to connect to MQTT broker, rc = %d\n", mqtt_client.state()); + ERRORF("Failed to connect to MQTT broker, rc = %d\n", emqx_client->state()); INFOF("Try again in 5 seconds\n"); retry_times++; delay(MQTT_RETRY_DELAY); } } - if (mqtt_client.connected()) { + if (emqx_client->connected()) { INFOF("IRext IoT connect done\n"); return 0; } else { @@ -82,17 +83,12 @@ int connectToEMQXBroker(PubSubClient &mqtt_client) { } } -int disconnectFromEMQXBroker(PubSubClient &mqtt_client) { +void emqxClientKeepAlive() { + emqx_client->loop(); +} + +int disconnectFromEMQXBroker() { force_disconnected = true; - mqtt_client.disconnect(); + emqx_client->disconnect(); return 0; -} - - -// private function definitions -static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length) { - INFOF("downstream message received, topic = %s, length = %d\n", topic, length); - if (NULL != g_downstream_topic.c_str() && 0 == strcmp(topic, g_downstream_topic.c_str())) { - handleIrisKitMessage((const char*) data, length); - } -} +} \ No newline at end of file diff --git a/src/emq_client.h b/src/emq_client.h index dd3c909..fe92a98 100644 --- a/src/emq_client.h +++ b/src/emq_client.h @@ -29,6 +29,8 @@ int connectToEMQXBroker(PubSubClient &mqtt_client); -int disconnectFromEMQXBroker(PubSubClient &mqtt_client); +int disconnectFromEMQXBroker(); + +void emqxClientKeepAlive(); #endif // IRIS_KIT_EMQ_CLIENT_H \ No newline at end of file diff --git a/src/iot_hub.cpp b/src/iot_hub.cpp index 41bbe32..b8c9b66 100644 --- a/src/iot_hub.cpp +++ b/src/iot_hub.cpp @@ -58,11 +58,11 @@ String g_aliot_instance_id = "iot-060a2sie"; int g_mqtt_port = 1883; int g_app_id = 0; - +mqtt_type_t g_mqtt_type = MQTT_TYPE_MAX; +boolean g_subscribed = false; // private variable definitions static bool downstream_topic_subscribed = false; -static ep_state_t endpoint_state = FSM_IDLE; // private function declarations @@ -70,7 +70,7 @@ static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length); static int iot_retry = 0; -static PubSubClient mqtt_client(wifi_client); +static PubSubClient g_mqtt_client(wifi_client); // public function definitions @@ -95,13 +95,19 @@ int connectToIrextIoT() { INFOF("Try connecting to AliyunIoT, product_key = %s, device_name = %s, device_secret = %s\n", g_product_key.c_str(), g_device_name.c_str(), g_device_token.c_str()); - conn_ret = connectToAliot(); + conn_ret = connectToAliot(g_mqtt_client); if (0 != conn_ret) { INFOF("Try connecting to IRext IoT %s:%d, client_id = %s, user_name = %s, password.size = %d\n", g_mqtt_server.c_str(), g_mqtt_port, g_mqtt_client_id.c_str(), g_mqtt_user_name.c_str(), g_mqtt_password.length()); - conn_ret = connectToEMQXBroker(mqtt_client); + conn_ret = connectToEMQXBroker(g_mqtt_client); + if (0 == conn_ret) { + g_mqtt_type = MQTT_TYPE_EMQX; + } + } else { + g_mqtt_type = MQTT_TYPE_ALIOT; + } if (0 != conn_ret) { @@ -110,30 +116,51 @@ int connectToIrextIoT() { return -1; } + if (!g_subscribed) { + g_mqtt_client.setCallback(irisIoTCallback); + g_mqtt_client.subscribe(g_downstream_topic.c_str()); + g_subscribed = true; + } + // send connect request sendIrisKitConnect(); - return 0; + return conn_ret; } void irextIoTKeepAlive() { - if (!mqtt_client.connected()) { - connectToEMQXBroker(mqtt_client); + if (MQTT_TYPE_ALIOT == g_mqtt_type) { + aliotKeepAlive(g_mqtt_client); + } else if (MQTT_TYPE_EMQX == g_mqtt_type) { + emqxClientKeepAlive(); + } + + if (!g_mqtt_client.connected()) { + g_mqtt_client.unsubscribe(g_downstream_topic.c_str()); + g_subscribed = false; + connectToIrextIoT(); } - mqtt_client.loop(); } // not only for IRIS related topic based session void sendData(const char* topic, const uint8_t *data, int length) { - mqtt_client.publish(topic, data, length); + g_mqtt_client.publish(topic, data, length); } void* getSession() { - return &mqtt_client; + return &g_mqtt_client; } -void checkIrextIoT() { - if (mqtt_client.connected()) { +void checkIrisIoT() { + if (g_mqtt_client.connected()) { + INFOLN("send iris kit heart beat"); sendIrisKitHeartBeat(); } } + +void irisIoTCallback(char *topic, uint8_t *data, uint32_t length) { + INFOF("downstream message received, topic = %s, length = %d\n", topic, length); + if (NULL != g_downstream_topic.c_str() && 0 == strcmp(topic, g_downstream_topic.c_str())) { + handleIrisKitMessage((const char*) data, length); + } +} \ No newline at end of file diff --git a/src/iot_hub.h b/src/iot_hub.h index 1247ed1..6f2c6e5 100644 --- a/src/iot_hub.h +++ b/src/iot_hub.h @@ -33,24 +33,23 @@ #define TOPIC_DOWNSTREAM_REL "/user/iris_kit_downstream" #define TOPIC_UPSTREAM_REL "/user/iris_kit_upstream" -typedef enum { - FSM_IDLE = 0, - FSM_CONNECTED = 1, - FSM_ACTIVE = 2, - FSM_MAX = 7, -} ep_state_t; +typedef enum { + MQTT_TYPE_ALIOT = 0, + MQTT_TYPE_EMQX = 1, + MQTT_TYPE_MAX = 7, +} mqtt_type_t; int connectToIrextIoT(); void irextIoTKeepAlive(); -void checkIrextIoT(); +void checkIrisIoT(); void* getSession(); void sendData(const char* topic, const uint8_t *data, int length); -int securityPublish(const char *topic, const uint8_t *message, size_t msg_size, void *channel); +void irisIoTCallback(char *topic, uint8_t *data, uint32_t length); #endif // IRIS_KIT_IOT_H \ No newline at end of file diff --git a/src/iris_kit.cpp b/src/iris_kit.cpp index 1decf70..b05dcbc 100644 --- a/src/iris_kit.cpp +++ b/src/iris_kit.cpp @@ -223,7 +223,7 @@ void setup() { connectToIrextIoT(); - iotCheckTask.attach_scheduled(MQTT_CHECK_INTERVALS, checkIrextIoT); + iotCheckTask.attach_scheduled(MQTT_CHECK_INTERVALS, checkIrisIoT); disableIRTask.attach_scheduled(DISABLE_SIGNAL_INTERVALS, disableIR); }