thermometers/main.c

changeset 6
9db76e20e21e
parent 3
e854e3d704de
child 7
d74b26b2f217
--- a/thermometers/main.c	Tue Apr 22 17:34:24 2014 +0200
+++ b/thermometers/main.c	Tue Apr 22 23:28:17 2014 +0200
@@ -26,11 +26,40 @@
 #include <getopt.h>
 #include <sys/types.h>
 #include <signal.h>
+#include <string.h>
+#include <errno.h>
+
+#include <mosquitto.h>
 
 
 #include "main.h"
 
 
+#define STATUS_CONNECTING 0
+#define STATUS_CONNACK_RECVD 1
+#define STATUS_WAITING 2
+
+/* Global variables for use in callbacks. See sub_client.c for an example of
+ * using a struct to hold variables for use in callbacks. */
+static char *topic = NULL;
+//static char *message = NULL;
+//static long msglen = 0;
+static int qos = 0;
+//static int retain = 0;
+static int status = STATUS_CONNECTING;
+static int mid_sent = 0;
+static int last_mid = -1;
+static int last_mid_sent = -1;
+static bool connected = true;
+//static char *username = NULL;
+//static char *password = NULL;
+static bool disconnect_sent = false;
+static bool quiet = false;
+static bool debug = false;
+static bool shutdown = false;
+
+
+
 void help(void)
 {
     fprintf(stdout, "Usage: thermomeneters [-d] [-h]\n");
@@ -43,23 +72,78 @@
 void die(int onsig)
 {
     switch (onsig) {
-	case SIGHUP:	fprintf(stdout, "[main] Hangup detected");
+	case SIGHUP:	fprintf(stdout, "[main] Hangup detected\n");
 			break;
-	case SIGINT:	fprintf(stdout, "[main] Interrupt from keyboard");
+	case SIGINT:	fprintf(stdout, "[main] Interrupt from keyboard\n");
 			break;
-	case SIGTERM:	fprintf(stdout, "[main] Termination signal received");
+	case SIGTERM:	fprintf(stdout, "[main] Termination signal received\n");
 			break;
-	default:	fprintf(stdout, "[main] die on signal %d", onsig);
+	default:	fprintf(stdout, "[main] die on signal %d\n", onsig);
     }
 
-    exit(onsig);
+    shutdown = true;
+}
+
+
+
+void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
+{
+    int rc = MOSQ_ERR_SUCCESS;
+
+    fprintf(stdout, (char *)"my_connect_callback result=%d\n", result);
+    if (!result) {
+	status = STATUS_CONNACK_RECVD;
+    } else {
+	fprintf(stderr, "%s\n", mosquitto_connack_string(result));
+    }
+}
+
+
+
+void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+    fprintf(stdout, (char *)"my_disconnect_callback\n");
+    connected = false;
+}
+
+
+
+void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
+{
+    fprintf(stdout, (char *)"my_publish_callback mid=%d\n", mid);
+
+    last_mid_sent = mid;
+//        if(mode == MSGMODE_STDIN_LINE){
+//                if(mid == last_mid){
+//                        mosquitto_disconnect(mosq);
+//                        disconnect_sent = true;
+//                }
+//        }else if(disconnect_sent == false){
+//                mosquitto_disconnect(mosq);
+//                disconnect_sent = true;
+//        }
+}
+
+
+
+void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
+{
+        printf("%s\n", str);
 }
 
 
 
 int main(int argc, char *argv[])
 {
-    int		i, c;
+    int			i, c, len, rc, rc2;
+    char		*id = NULL;
+    char		*host = (char *)"lx02.mbse.ym";
+    int			port = 1883;
+    struct mosquitto	*mosq = NULL;
+    char		hostname[256], buf[1024];
+    int			keepalive = 60;
+    unsigned int	max_inflight = 20;
+    char		err[1024];
 
     while (1) {
 	int option_index = 0;
@@ -74,7 +158,7 @@
 	    break;
 
 	switch (c) {
-	    case 'd':	
+	    case 'd':	debug = true;
 			break;
 	    case 'h':	help();
 			return 1;
@@ -91,7 +175,140 @@
 	    signal(i, (void (*))die);
     }
 
+    /*
+     * Initialize mosquitto communication
+     */
+    mosquitto_lib_init();
+    hostname[0] = '\0';
+    gethostname(hostname, 256);
+    hostname[255] = '\0';
+    len = strlen("thermometers/") + 1 + strlen(hostname);
+    id = malloc(len);
+    if(!id) {
+	if (!quiet)
+	    fprintf(stderr, "Error: Out of memory.\n");
+	mosquitto_lib_cleanup();
+	return 1;
+    }
+    snprintf(id, len, "thermometers/%s", hostname);
+    if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
+	/*
+	 * Enforce maximum client id length of 23 characters
+	 */
+	id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+    }
 
+    fprintf(stdout, "id: %s\n", id);
+
+    mosq = mosquitto_new(id, true, NULL);
+    if(!mosq) {
+	switch(errno) {
+	    case ENOMEM:
+		if (!quiet)
+		    fprintf(stderr, "Error: Out of memory.\n");
+		break;
+	    case EINVAL:
+		if (!quiet)
+		    fprintf(stderr, "Error: Invalid id.\n");
+		break;
+	}
+	mosquitto_lib_cleanup();
+	return 1;
+    }
+
+    if(debug) {
+	mosquitto_log_callback_set(mosq, my_log_callback);
+    }
+
+    /*
+     * Set our will
+     */
+    topic = malloc(28 + strlen(hostname));
+    sprintf(topic, "clients/%s/thermometers/state", hostname);
+    sprintf(buf, "0");
+    rc = mosquitto_will_set(mosq, topic, strlen(buf), buf, qos, true);
+    if (rc) {
+        if (rc == MOSQ_ERR_INVAL) {
+            fprintf(stderr, "Input parameters invalid\n");
+        } else if (rc == MOSQ_ERR_NOMEM) {
+            fprintf(stderr, "Out of Memory\n");
+	} else if (rc == MOSQ_ERR_PAYLOAD_SIZE) {
+	    fprintf(stderr, "Invalid payload size\n");
+        }
+        mosquitto_lib_cleanup();
+        return rc;
+    }
+
+    mosquitto_max_inflight_messages_set(mosq, max_inflight);
+    mosquitto_connect_callback_set(mosq, my_connect_callback);
+    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
+    mosquitto_publish_callback_set(mosq, my_publish_callback);
+
+    rc = mosquitto_connect(mosq, host, port, keepalive);
+    if (rc) {
+	if (rc == MOSQ_ERR_ERRNO) {
+	    strerror_r(errno, err, 1024);
+	    fprintf(stderr, "Error: %s\n", err);
+	} else {
+	    fprintf(stderr, "Unable to connect (%d).\n", rc);
+	}
+	mosquitto_lib_cleanup();
+	return rc;
+    }
+
+    /*
+     * Initialise is complete, report our presence state
+     */
+    mosquitto_loop_start(mosq);
+
+//    topic = malloc(28 + strlen(hostname));
+    sprintf(topic, "clients/%s/thermometers/state", hostname);
+    sprintf(buf, "1");
+    rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, 1);
+    free(topic);
+
+
+    fprintf(stdout, (char *)"Enter loop, connected %d\n", connected);
+    do {
+	if (status == STATUS_CONNACK_RECVD) {
+//	    fprintf(stdout, (char *)"Ok\n");
+//                                if(fgets(buf, 1024, stdin)){
+//                                        buf[strlen(buf)-1] = '\0';
+//                                        rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, retain);
+//                                        if(rc2){
+//                                                if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
+//                                                mosquitto_disconnect(mosq);
+//                                        }
+//	    } else
+	    if (shutdown) {
+		fprintf(stdout, (char *)"Shutdown\n");
+		topic = malloc(28 + strlen(hostname));
+		sprintf(topic, "clients/%s/thermometers/state", hostname);
+		sprintf(buf, "0");
+		rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, true);
+		free(topic);
+		last_mid = mid_sent;
+		status = STATUS_WAITING;
+	    }
+	} else if (status == STATUS_WAITING) {
+	    fprintf(stdout, (char *)"Waiting\n");
+	    if (last_mid_sent == last_mid && disconnect_sent == false) {
+		mosquitto_disconnect(mosq);
+		disconnect_sent = true;
+	    }
+	    usleep(100000);
+	}
+	rc = MOSQ_ERR_SUCCESS;
+
+    } while(rc == MOSQ_ERR_SUCCESS && connected);
+    fprintf(stdout, (char *)"Out of loop\n");
+
+    mosquitto_loop_stop(mosq, false);
+
+    mosquitto_destroy(mosq);
+    mosquitto_lib_cleanup();
+
+    fprintf(stdout, (char *)"Bye Bye\n");
     return 0;
 }
 

mercurial