diff -r 8c44725dc4ad -r 9db76e20e21e thermometers/main.c --- a/thermometers/main.c Tue Apr 22 17:34:24 2014 +0200 +++ b/thermometers/main.c Tue Apr 22 23:28:17 2014 +0200 @@ -26,11 +26,40 @@ #include #include #include +#include +#include + +#include #include "main.h" +#define STATUS_CONNECTING 0 +#define STATUS_CONNACK_RECVD 1 +#define STATUS_WAITING 2 + +/* Global variables for use in callbacks. See sub_client.c for an example of + * using a struct to hold variables for use in callbacks. */ +static char *topic = NULL; +//static char *message = NULL; +//static long msglen = 0; +static int qos = 0; +//static int retain = 0; +static int status = STATUS_CONNECTING; +static int mid_sent = 0; +static int last_mid = -1; +static int last_mid_sent = -1; +static bool connected = true; +//static char *username = NULL; +//static char *password = NULL; +static bool disconnect_sent = false; +static bool quiet = false; +static bool debug = false; +static bool shutdown = false; + + + void help(void) { fprintf(stdout, "Usage: thermomeneters [-d] [-h]\n"); @@ -43,23 +72,78 @@ void die(int onsig) { switch (onsig) { - case SIGHUP: fprintf(stdout, "[main] Hangup detected"); + case SIGHUP: fprintf(stdout, "[main] Hangup detected\n"); break; - case SIGINT: fprintf(stdout, "[main] Interrupt from keyboard"); + case SIGINT: fprintf(stdout, "[main] Interrupt from keyboard\n"); break; - case SIGTERM: fprintf(stdout, "[main] Termination signal received"); + case SIGTERM: fprintf(stdout, "[main] Termination signal received\n"); break; - default: fprintf(stdout, "[main] die on signal %d", onsig); + default: fprintf(stdout, "[main] die on signal %d\n", onsig); } - exit(onsig); + shutdown = true; +} + + + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result) +{ + int rc = MOSQ_ERR_SUCCESS; + + fprintf(stdout, (char *)"my_connect_callback result=%d\n", result); + if (!result) { + status = STATUS_CONNACK_RECVD; + } else { + fprintf(stderr, "%s\n", mosquitto_connack_string(result)); + } +} + + + +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + fprintf(stdout, (char *)"my_disconnect_callback\n"); + connected = false; +} + + + +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) +{ + fprintf(stdout, (char *)"my_publish_callback mid=%d\n", mid); + + last_mid_sent = mid; +// if(mode == MSGMODE_STDIN_LINE){ +// if(mid == last_mid){ +// mosquitto_disconnect(mosq); +// disconnect_sent = true; +// } +// }else if(disconnect_sent == false){ +// mosquitto_disconnect(mosq); +// disconnect_sent = true; +// } +} + + + +void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) +{ + printf("%s\n", str); } int main(int argc, char *argv[]) { - int i, c; + int i, c, len, rc, rc2; + char *id = NULL; + char *host = (char *)"lx02.mbse.ym"; + int port = 1883; + struct mosquitto *mosq = NULL; + char hostname[256], buf[1024]; + int keepalive = 60; + unsigned int max_inflight = 20; + char err[1024]; while (1) { int option_index = 0; @@ -74,7 +158,7 @@ break; switch (c) { - case 'd': + case 'd': debug = true; break; case 'h': help(); return 1; @@ -91,7 +175,140 @@ signal(i, (void (*))die); } + /* + * Initialize mosquitto communication + */ + mosquitto_lib_init(); + hostname[0] = '\0'; + gethostname(hostname, 256); + hostname[255] = '\0'; + len = strlen("thermometers/") + 1 + strlen(hostname); + id = malloc(len); + if(!id) { + if (!quiet) + fprintf(stderr, "Error: Out of memory.\n"); + mosquitto_lib_cleanup(); + return 1; + } + snprintf(id, len, "thermometers/%s", hostname); + if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { + /* + * Enforce maximum client id length of 23 characters + */ + id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; + } + fprintf(stdout, "id: %s\n", id); + + mosq = mosquitto_new(id, true, NULL); + if(!mosq) { + switch(errno) { + case ENOMEM: + if (!quiet) + fprintf(stderr, "Error: Out of memory.\n"); + break; + case EINVAL: + if (!quiet) + fprintf(stderr, "Error: Invalid id.\n"); + break; + } + mosquitto_lib_cleanup(); + return 1; + } + + if(debug) { + mosquitto_log_callback_set(mosq, my_log_callback); + } + + /* + * Set our will + */ + topic = malloc(28 + strlen(hostname)); + sprintf(topic, "clients/%s/thermometers/state", hostname); + sprintf(buf, "0"); + rc = mosquitto_will_set(mosq, topic, strlen(buf), buf, qos, true); + if (rc) { + if (rc == MOSQ_ERR_INVAL) { + fprintf(stderr, "Input parameters invalid\n"); + } else if (rc == MOSQ_ERR_NOMEM) { + fprintf(stderr, "Out of Memory\n"); + } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) { + fprintf(stderr, "Invalid payload size\n"); + } + mosquitto_lib_cleanup(); + return rc; + } + + mosquitto_max_inflight_messages_set(mosq, max_inflight); + mosquitto_connect_callback_set(mosq, my_connect_callback); + mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); + mosquitto_publish_callback_set(mosq, my_publish_callback); + + rc = mosquitto_connect(mosq, host, port, keepalive); + if (rc) { + if (rc == MOSQ_ERR_ERRNO) { + strerror_r(errno, err, 1024); + fprintf(stderr, "Error: %s\n", err); + } else { + fprintf(stderr, "Unable to connect (%d).\n", rc); + } + mosquitto_lib_cleanup(); + return rc; + } + + /* + * Initialise is complete, report our presence state + */ + mosquitto_loop_start(mosq); + +// topic = malloc(28 + strlen(hostname)); + sprintf(topic, "clients/%s/thermometers/state", hostname); + sprintf(buf, "1"); + rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, 1); + free(topic); + + + fprintf(stdout, (char *)"Enter loop, connected %d\n", connected); + do { + if (status == STATUS_CONNACK_RECVD) { +// fprintf(stdout, (char *)"Ok\n"); +// if(fgets(buf, 1024, stdin)){ +// buf[strlen(buf)-1] = '\0'; +// rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, retain); +// if(rc2){ +// if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2); +// mosquitto_disconnect(mosq); +// } +// } else + if (shutdown) { + fprintf(stdout, (char *)"Shutdown\n"); + topic = malloc(28 + strlen(hostname)); + sprintf(topic, "clients/%s/thermometers/state", hostname); + sprintf(buf, "0"); + rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, true); + free(topic); + last_mid = mid_sent; + status = STATUS_WAITING; + } + } else if (status == STATUS_WAITING) { + fprintf(stdout, (char *)"Waiting\n"); + if (last_mid_sent == last_mid && disconnect_sent == false) { + mosquitto_disconnect(mosq); + disconnect_sent = true; + } + usleep(100000); + } + rc = MOSQ_ERR_SUCCESS; + + } while(rc == MOSQ_ERR_SUCCESS && connected); + fprintf(stdout, (char *)"Out of loop\n"); + + mosquitto_loop_stop(mosq, false); + + mosquitto_destroy(mosq); + mosquitto_lib_cleanup(); + + fprintf(stdout, (char *)"Bye Bye\n"); return 0; }