implemented dual channel connection for iris-kit

This commit is contained in:
strawmanbobi
2024-02-19 15:20:42 +08:00
parent 47b4c31146
commit ec9e478b69
9 changed files with 83 additions and 83 deletions

View File

@@ -34,7 +34,6 @@ DeviceProperty PropertyMessageBuffer[MESSAGE_BUFFER_SIZE];
#define SHA256HMAC_SIZE 32
#define DATA_CALLBACK_SIZE 20
#define SIGN_DEBUG_VERBOSE 1
#define MQTT_WAIT_GENERIC (10000)
@@ -162,12 +161,15 @@ int AliyunIoTSDK::mqttCheckConnect() {
return mqttStatus;
}
int AliyunIoTSDK::begin(Client &espClient,
int AliyunIoTSDK::begin(PubSubClient &mqtt_client,
const char *_productKey,
const char *_deviceName,
const char *_deviceSecret,
const char *_region) {
client = new PubSubClient(espClient);
if (NULL == client) {
client = &mqtt_client;
}
productKey = _productKey;
deviceName = _deviceName;
deviceSecret = _deviceSecret;
@@ -200,7 +202,7 @@ int AliyunIoTSDK::begin(Client &espClient,
client->setServer(domain, MQTT_PORT);
#if defined USE_STANDARD_THING_MODEL_TOPIC
client->setCallback(callback);
client.setCallback(callback);
#endif
Serial.print("INFO\tconnection check in begin\n");
return mqttCheckConnect();
@@ -237,18 +239,6 @@ void AliyunIoTSDK::sendEvent(const char *eventId) {
sendEvent(eventId, "{}");
}
void AliyunIoTSDK::sendCustom(const char *topic, const char *eventBody) {
boolean d = client->publish(topic, eventBody);
Serial.print("INFO\tpublish:0 sucessfully:");
Serial.println(d);
}
void AliyunIoTSDK::sendCustomData(const char *topic, const uint8_t *data, int length) {
boolean d = client->publish(topic, data, length);
Serial.print("INFO\tpublish:0 sucessfully:");
Serial.println(d);
}
boolean AliyunIoTSDK::subscribe(const char* topic, int qos) {
return client->subscribe(topic, qos);
}

View File

@@ -64,7 +64,7 @@ public:
* @param _deviceSecret : AliyunIoT device secret
* @param _region : AliyunIoT region
*/
static int begin(Client &espClient,
static int begin(PubSubClient &mqtt_client,
const char *_productKey,
const char *_deviceName,
const char *_deviceSecret,
@@ -117,23 +117,6 @@ public:
*/
static void sendEvent(const char *eventId);
/**
* Send customized topic data
*
* @param topic : topic in string
* @param eventBody : event body in string
*/
static void sendCustom(const char *topic, const char *eventBody);
/**
* Send customized topic payload data
*
* @param topic : topic in string
* @param data : data payload
* @param length : payload length
*/
static void sendCustomData(const char *topic, const uint8_t *data, int length);
/**
* Subscribe MQTT topic for Aliot
*

View File

@@ -55,10 +55,10 @@ static AliyunIoTSDK iot;
// public function definitions
int connectToAliot() {
int connectToAliot(PubSubClient& mqtt_client) {
String aliot_client_id = g_product_key + "." + g_device_name;
int res = iot.begin(wifi_client, g_product_key.c_str(), g_device_name.c_str(), g_device_token.c_str(), g_aliot_region.c_str());
int res = iot.begin(mqtt_client, g_product_key.c_str(), g_device_name.c_str(), g_device_token.c_str(), g_aliot_region.c_str());
if (0 == res) {
INFOLN("Aliyun IoT connected");
} else {
@@ -67,6 +67,8 @@ int connectToAliot() {
return res;
}
void aliotKeepAlive() {
void aliotKeepAlive(PubSubClient& mqtt_client) {
(void) mqtt_client;
iot.loop();
}
}

View File

@@ -22,12 +22,13 @@
*/
#include <Arduino.h>
#include <PubSubClient.h>
#ifndef IRIS_KIT_ALIOT_CLIENT_H
#define IRIS_KIT_ALIOT_CLIENT_H
int connectToAliot();
int connectToAliot(PubSubClient& mqtt_client);
void aliotKeepAlive();
void aliotKeepAlive(PubSubClient& mqtt_client);
#endif // IRIS_KIT_ALIOT_CLIENT_H

View File

@@ -46,7 +46,7 @@ extern int g_mqtt_port;
// private variable definitions
static bool force_disconnected = false;
static PubSubClient* emqx_client = NULL;
// private function declarations
static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length);
@@ -56,24 +56,25 @@ static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length);
int connectToEMQXBroker(PubSubClient &mqtt_client) {
int retry_times = 0;
mqtt_client.setServer(g_mqtt_server.c_str(), g_mqtt_port);
mqtt_client.setCallback(irisIrextIoTCallback);
if (NULL == emqx_client) {
emqx_client = &mqtt_client;
}
emqx_client->setServer(g_mqtt_server.c_str(), g_mqtt_port);
force_disconnected = false;
while (!force_disconnected && !mqtt_client.connected() && retry_times < MQTT_RETRY_MAX) {
while (!force_disconnected && !emqx_client->connected() && retry_times < MQTT_RETRY_MAX) {
INFOF("Connecting to MQTT Broker as %s.....\n", g_mqtt_client_id.c_str());
if (mqtt_client.connect(g_mqtt_client_id.c_str(), g_mqtt_user_name.c_str(), g_mqtt_password.c_str())) {
if (emqx_client->connect(g_mqtt_client_id.c_str(), g_mqtt_user_name.c_str(), g_mqtt_password.c_str())) {
INFOF("Connected to MQTT broker\n");
mqtt_client.subscribe(g_downstream_topic.c_str());
} else {
ERRORF("Failed to connect to MQTT broker, rc = %d\n", mqtt_client.state());
ERRORF("Failed to connect to MQTT broker, rc = %d\n", emqx_client->state());
INFOF("Try again in 5 seconds\n");
retry_times++;
delay(MQTT_RETRY_DELAY);
}
}
if (mqtt_client.connected()) {
if (emqx_client->connected()) {
INFOF("IRext IoT connect done\n");
return 0;
} else {
@@ -82,17 +83,12 @@ int connectToEMQXBroker(PubSubClient &mqtt_client) {
}
}
int disconnectFromEMQXBroker(PubSubClient &mqtt_client) {
void emqxClientKeepAlive() {
emqx_client->loop();
}
int disconnectFromEMQXBroker() {
force_disconnected = true;
mqtt_client.disconnect();
emqx_client->disconnect();
return 0;
}
// private function definitions
static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length) {
INFOF("downstream message received, topic = %s, length = %d\n", topic, length);
if (NULL != g_downstream_topic.c_str() && 0 == strcmp(topic, g_downstream_topic.c_str())) {
handleIrisKitMessage((const char*) data, length);
}
}
}

View File

@@ -29,6 +29,8 @@
int connectToEMQXBroker(PubSubClient &mqtt_client);
int disconnectFromEMQXBroker(PubSubClient &mqtt_client);
int disconnectFromEMQXBroker();
void emqxClientKeepAlive();
#endif // IRIS_KIT_EMQ_CLIENT_H

View File

@@ -58,11 +58,11 @@ String g_aliot_instance_id = "iot-060a2sie";
int g_mqtt_port = 1883;
int g_app_id = 0;
mqtt_type_t g_mqtt_type = MQTT_TYPE_MAX;
boolean g_subscribed = false;
// private variable definitions
static bool downstream_topic_subscribed = false;
static ep_state_t endpoint_state = FSM_IDLE;
// private function declarations
@@ -70,7 +70,7 @@ static void irisIrextIoTCallback(char *topic, uint8_t *data, uint32_t length);
static int iot_retry = 0;
static PubSubClient mqtt_client(wifi_client);
static PubSubClient g_mqtt_client(wifi_client);
// public function definitions
@@ -95,13 +95,19 @@ int connectToIrextIoT() {
INFOF("Try connecting to AliyunIoT, product_key = %s, device_name = %s, device_secret = %s\n",
g_product_key.c_str(), g_device_name.c_str(), g_device_token.c_str());
conn_ret = connectToAliot();
conn_ret = connectToAliot(g_mqtt_client);
if (0 != conn_ret) {
INFOF("Try connecting to IRext IoT %s:%d, client_id = %s, user_name = %s, password.size = %d\n",
g_mqtt_server.c_str(), g_mqtt_port,
g_mqtt_client_id.c_str(), g_mqtt_user_name.c_str(), g_mqtt_password.length());
conn_ret = connectToEMQXBroker(mqtt_client);
conn_ret = connectToEMQXBroker(g_mqtt_client);
if (0 == conn_ret) {
g_mqtt_type = MQTT_TYPE_EMQX;
}
} else {
g_mqtt_type = MQTT_TYPE_ALIOT;
}
if (0 != conn_ret) {
@@ -110,30 +116,51 @@ int connectToIrextIoT() {
return -1;
}
if (!g_subscribed) {
g_mqtt_client.setCallback(irisIoTCallback);
g_mqtt_client.subscribe(g_downstream_topic.c_str());
g_subscribed = true;
}
// send connect request
sendIrisKitConnect();
return 0;
return conn_ret;
}
void irextIoTKeepAlive() {
if (!mqtt_client.connected()) {
connectToEMQXBroker(mqtt_client);
if (MQTT_TYPE_ALIOT == g_mqtt_type) {
aliotKeepAlive(g_mqtt_client);
} else if (MQTT_TYPE_EMQX == g_mqtt_type) {
emqxClientKeepAlive();
}
if (!g_mqtt_client.connected()) {
g_mqtt_client.unsubscribe(g_downstream_topic.c_str());
g_subscribed = false;
connectToIrextIoT();
}
mqtt_client.loop();
}
// not only for IRIS related topic based session
void sendData(const char* topic, const uint8_t *data, int length) {
mqtt_client.publish(topic, data, length);
g_mqtt_client.publish(topic, data, length);
}
void* getSession() {
return &mqtt_client;
return &g_mqtt_client;
}
void checkIrextIoT() {
if (mqtt_client.connected()) {
void checkIrisIoT() {
if (g_mqtt_client.connected()) {
INFOLN("send iris kit heart beat");
sendIrisKitHeartBeat();
}
}
void irisIoTCallback(char *topic, uint8_t *data, uint32_t length) {
INFOF("downstream message received, topic = %s, length = %d\n", topic, length);
if (NULL != g_downstream_topic.c_str() && 0 == strcmp(topic, g_downstream_topic.c_str())) {
handleIrisKitMessage((const char*) data, length);
}
}

View File

@@ -33,24 +33,23 @@
#define TOPIC_DOWNSTREAM_REL "/user/iris_kit_downstream"
#define TOPIC_UPSTREAM_REL "/user/iris_kit_upstream"
typedef enum {
FSM_IDLE = 0,
FSM_CONNECTED = 1,
FSM_ACTIVE = 2,
FSM_MAX = 7,
} ep_state_t;
typedef enum {
MQTT_TYPE_ALIOT = 0,
MQTT_TYPE_EMQX = 1,
MQTT_TYPE_MAX = 7,
} mqtt_type_t;
int connectToIrextIoT();
void irextIoTKeepAlive();
void checkIrextIoT();
void checkIrisIoT();
void* getSession();
void sendData(const char* topic, const uint8_t *data, int length);
int securityPublish(const char *topic, const uint8_t *message, size_t msg_size, void *channel);
void irisIoTCallback(char *topic, uint8_t *data, uint32_t length);
#endif // IRIS_KIT_IOT_H

View File

@@ -223,7 +223,7 @@ void setup() {
connectToIrextIoT();
iotCheckTask.attach_scheduled(MQTT_CHECK_INTERVALS, checkIrextIoT);
iotCheckTask.attach_scheduled(MQTT_CHECK_INTERVALS, checkIrisIoT);
disableIRTask.attach_scheduled(DISABLE_SIGNAL_INTERVALS, disableIR);
}