thermferm/mqtt.c

changeset 550
04c942cded91
parent 549
ee1bcad035f0
child 554
ab9f22ab57b5
--- a/thermferm/mqtt.c	Sat Jul 21 20:40:02 2018 +0200
+++ b/thermferm/mqtt.c	Mon Jul 23 14:41:21 2018 +0200
@@ -26,6 +26,8 @@
 
 extern sys_config       Config;
 extern int		debug;
+extern int		my_shutdown;
+extern int		my_reboot;
 extern const char	UNITMODE[5][8];
 extern const char	PROFSTATE[5][6];
 extern const char	TEMPSTATE[3][8];
@@ -157,10 +159,58 @@
 
 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
 {
+    char		*message_type;
+    struct json_object	*jobj, *metric, *val;
+    time_t		timestamp;
+    int			timediff;
+
     if (message->payloadlen) {
+	/*
+	 * Process received commands
+	 */
+	strtok(message->topic, "/");	// Ignore mbv1.0
+	strtok(NULL, "/");		// Ignore group_id
+	message_type = strtok(NULL, "/");
+
+	jobj = json_tokener_parse(message->payload);
+	if (json_object_object_get_ex(jobj, "timestamp", &val)) {
+	    timestamp = json_object_get_int(val);
+	    timediff = (int)timestamp - time(NULL);
+	    if ((timediff < 61) && (timediff > -61)) {
+		if (json_object_object_get_ex(jobj, "metric", &metric)) {
+		    if ((json_object_object_get_ex(metric, "Node Control/Reboot", &val)) && (strcmp(message_type, "NCMD") == 0)) {
+			if (json_object_get_boolean(val) == true) {
+			    syslog(LOG_NOTICE, "MQTT: `Node Control/Reboot' command");
+			    /*
+			     * Reboot. The erver process will restart which is handled
+			     * in the main thread loop.
+			     */
+			    my_reboot = my_shutdown = TRUE;
+			    return;
+			}
+		    }
+		    if ((json_object_object_get_ex(metric, "Node Control/Rebirth", &val)) && (strcmp(message_type, "NCMD") == 0)) {
+			if (json_object_get_boolean(val) == true) {
+			    /*
+			     * Resend all birth certificates.
+			     */
+			    syslog(LOG_NOTICE, "MQTT: `Node Control/Rebirth' command");
+			    publishNData(true, 0);
+			    publishDBirthAll();
+			    return;
+			}
+		    }
+		    printf("metric: %s\n", (char *)json_object_get_string(metric));
+		    syslog(LOG_NOTICE, "MQTT: %s payload not understood\n", (char *)message->payload);
+		    return;
+		}
+	    } else {
+		syslog(LOG_NOTICE, "MQTT: got payload with timestamp %d seconds error", timediff);
+		return;
+	    }
+	}
+
 	syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
-        // TODO: process subscribed topics here.
-
     } else {
 	syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
     }

mercurial