diff -r 862de87f9f89 -r cdcd07bbee30 thermferm/mqtt.c --- a/thermferm/mqtt.c Sat Apr 29 17:07:36 2017 +0200 +++ b/thermferm/mqtt.c Mon May 08 16:26:02 2017 +0200 @@ -1,5 +1,5 @@ /***************************************************************************** - * Copyright (C) 2016 + * Copyright (C) 2016-2017 * * Michiel Broek * @@ -26,6 +26,12 @@ extern sys_config Config; extern int debug; +extern const char UNITMODE[5][8]; +extern const char PROFSTATE[5][6]; +extern const char TEMPSTATE[3][8]; + +int Sequence = 0; + #ifdef HAVE_MOSQUITTO_H @@ -48,17 +54,58 @@ char my_hostname[256]; + +char *payload_header(void) +{ + char *tmp, buf[128]; + + tmp = xstrcpy((char *)"{\"timestamp\":"); + sprintf(buf, "%ld", time(NULL)); + tmp = xstrcat(tmp, buf); + tmp = xstrcat(tmp, (char *)",\"seq\":"); + sprintf(buf, "%d", Sequence++); + tmp = xstrcat(tmp, buf); + tmp = xstrcat(tmp, (char *)",\"metric\":"); + return tmp; +} + + + +char *topic_base(char *msgtype) +{ + char *tmp; + + tmp = xstrcpy((char *)"mbv1.0/fermenters/"); + tmp = xstrcat(tmp, msgtype); + tmp = xstrcat(tmp, (char *)"/"); + tmp = xstrcat(tmp, my_hostname); + return tmp; +} + + + void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) { + char *topic = NULL; + if (mqtt_connect_lost) { - mqtt_connect_lost = FALSE; - syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); + mqtt_connect_lost = FALSE; + syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); } if (!result) { - mqtt_status = STATUS_CONNACK_RECVD; + topic = topic_base((char *)"NCMD"); + topic = xstrcat(topic, (char *)"/#"); + mosquitto_subscribe(mosq, NULL, topic, 0); + free(topic); + topic = topic_base((char *)"DCMD"); + topic = xstrcat(topic, (char *)"/#"); + mosquitto_subscribe(mosq, NULL, topic, 0); + free(topic); + topic = NULL; + mqtt_status = STATUS_CONNACK_RECVD; } else { - syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); + syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); } } @@ -87,6 +134,18 @@ +void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) +{ + int i; + + syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); + for (i = 1; i < qos_count; i++) { + syslog(LOG_NOTICE, " %d", granted_qos[i]); + } +} + + + void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) { syslog(LOG_NOTICE, "MQTT: %s", str); @@ -96,15 +155,389 @@ +void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) +{ + if (message->payloadlen) { + syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); + // TODO: process subscribed topics here. + + } else { + syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); + } +} + + + +void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) { + // publish the data + if (payload) + mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); + else + mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); +} + + + +char *unit_data(units_list *unit, bool birth) +{ + char *payload = NULL; + char buf[128]; + bool comma = false; + profiles_list *profile; + prof_step *pstep; + + payload = xstrcat(payload, (char *)"{"); + if (birth || unit->mqtt_flag & MQTT_FLAG_MODE) { + // Also send these on mode change + payload = xstrcat(payload, (char *)"\"uuid\":\""); + payload = xstrcat(payload, unit->uuid); + payload = xstrcat(payload, (char *)"\",\"alias\":\""); + payload = xstrcat(payload, unit->alias); + payload = xstrcat(payload, (char *)"\",\"name\":\""); + payload = xstrcat(payload, unit->name); + payload = xstrcat(payload, (char *)"\""); + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_AIR) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->air_address) { + payload = xstrcat(payload, (char *)"\"air\":{\"address\":\""); + payload = xstrcat(payload, unit->air_address); + payload = xstrcat(payload, (char *)"\",\"state\":\""); + payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]); + payload = xstrcat(payload, (char *)"\",\"temperature\":"); + sprintf(buf, "%.3f", unit->air_temperature / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"air\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_BEER) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->beer_address) { + payload = xstrcat(payload, (char *)"\"beer\":{\"address\":\""); + payload = xstrcat(payload, unit->beer_address); + payload = xstrcat(payload, (char *)"\",\"state\":\""); + payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]); + payload = xstrcat(payload, (char *)"\",\"temperature\":"); + sprintf(buf, "%.3f", unit->beer_temperature / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"beer\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_HEATER) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->heater_address) { + payload = xstrcat(payload, (char *)"\"heater\":{\"address\":\""); + payload = xstrcat(payload, unit->heater_address); + payload = xstrcat(payload, (char *)"\",\"state\":"); + sprintf(buf, "%d", unit->heater_state); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"heater\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_COOLER) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->cooler_address) { + payload = xstrcat(payload, (char *)"\"cooler\":{\"address\":\""); + payload = xstrcat(payload, unit->cooler_address); + payload = xstrcat(payload, (char *)"\",\"state\":"); + sprintf(buf, "%d", unit->cooler_state); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"cooler\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_FAN) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->fan_address) { + payload = xstrcat(payload, (char *)"\"fan\":{\"address\":\""); + payload = xstrcat(payload, unit->fan_address); + payload = xstrcat(payload, (char *)"\",\"state\":"); + sprintf(buf, "%d", unit->fan_state); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"fan\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_DOOR) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->door_address) { + payload = xstrcat(payload, (char *)"\"door\":{\"address\":\""); + payload = xstrcat(payload, unit->door_address); + payload = xstrcat(payload, (char *)"\",\"state\":"); + sprintf(buf, "%d", unit->door_state); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"door\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_LIGHT) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->light_address) { + payload = xstrcat(payload, (char *)"\"light\":{\"address\":\""); + payload = xstrcat(payload, unit->light_address); + payload = xstrcat(payload, (char *)"\",\"state\":"); + sprintf(buf, "%d", unit->light_state); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"light\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_PSU) { + if (comma) + payload = xstrcat(payload, (char *)","); + if (unit->psu_address) { + payload = xstrcat(payload, (char *)"\"psu\":{\"address\":\""); + payload = xstrcat(payload, unit->psu_address); + payload = xstrcat(payload, (char *)"\",\"state\":"); + sprintf(buf, "%d", unit->psu_state); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + } else { + payload = xstrcat(payload, (char *)"\"psu\":null"); + } + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_MODE) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"mode\":\""); + payload = xstrcat(payload, (char *)UNITMODE[unit->mode]); + payload = xstrcat(payload, (char *)"\""); + comma = true; + } + if (birth || unit->mqtt_flag & MQTT_FLAG_SP) { + if (unit->mode != UNITMODE_OFF) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"setpoint\":{\"low\":"); + sprintf(buf, "%.1f", unit->PID_heat->SetP); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"high\":"); + sprintf(buf, "%.1f", unit->PID_cool->SetP); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + comma = true; + } + } + if (birth || unit->mqtt_flag & MQTT_FLAG_PROFILE || unit->mqtt_flag & MQTT_FLAG_PERCENT) { + if (unit->mode == UNITMODE_PROFILE && unit->profile) { + for (profile = Config.profiles; profile; profile = profile->next) { + if (strcmp(unit->profile, profile->uuid) == 0) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"profile\":{\"uuid\":\""); + payload = xstrcat(payload, unit->profile); + payload = xstrcat(payload, (char *)",\"name\":\""); + payload = xstrcat(payload, profile->name); + payload = xstrcat(payload, (char *)"\",\"inittemp\":{\"low\":"); + sprintf(buf, "%.1f", profile->inittemp_lo); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"high\":"); + sprintf(buf, "%.1f", profile->inittemp_hi); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"},\"fridgemode\":"); + sprintf(buf, "%d", profile->fridge_mode); + payload = xstrcat(payload, buf); + comma = false; + if (profile->steps) { + payload = xstrcat(payload, (char *)",\"steps\":["); + for (pstep = profile->steps; pstep; pstep = pstep->next) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"{\"resttime\":"); + sprintf(buf, "%d", pstep->resttime); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"steptime\":"); + sprintf(buf, "%d", pstep->steptime); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"target\":{\"low\":"); + sprintf(buf, "%.1f", pstep->target_lo); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"high\":"); + sprintf(buf, "%.1f", pstep->target_hi); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"},\"fridgemode\":"); + sprintf(buf, "%d", pstep->fridge_mode); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + comma = true; + } + payload = xstrcat(payload, (char *)"]"); + } else { + payload = xstrcat(payload, (char *)",\"steps\":null"); + } + payload = xstrcat(payload, (char *)"}"); + break; + } + } + } else { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"profile\":null"); + } + } + payload = xstrcat(payload, (char *)"}"); + + return payload; +} + + + +void publishDBirth(void) +{ + char *payload = NULL; + units_list *unit; + int comma = FALSE; + + payload = payload_header(); + payload = xstrcat(payload, (char *)"{\"units\":["); + for (unit = Config.units; unit; unit = unit->next) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, unit_data(unit, true)); + comma = TRUE; + } + payload = xstrcat(payload, (char *)"]}}"); + publisher(mosq, topic_base((char *)"DBIRTH"), payload, true); + free(payload); + payload = NULL; +} + #endif +void publishDData(units_list *unit) +{ +#ifdef HAVE_MOSQUITTO_H + + char *payload = NULL, *topic = NULL; + + if (mqtt_use) { + payload = payload_header(); + payload = xstrcat(payload, unit_data(unit, false)); + payload = xstrcat(payload, (char *)"}"); + topic = xstrcat(topic_base((char *)"DDATA"), (char *)"/"); + topic = xstrcat(topic, unit->alias); + publisher(mosq, topic, payload, false); + free(payload); + payload = NULL; + free(topic); + topic = NULL; + } +#endif +} + + + +void publishNData(bool birth, int flag) +{ +#ifdef HAVE_MOSQUITTO_H + char *payload = NULL, buf[64]; + struct utsname ubuf; + bool comma = false; + + payload = payload_header(); + payload = xstrcat(payload, (char *)"{"); + + if (birth || flag & MQTT_NODE_CONTROL) { + payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); + comma = true; + } + + if (birth) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); + if (uname(&ubuf) == 0) { + payload = xstrcat(payload, (char *)",\"os\":\""); + payload = xstrcat(payload, ubuf.sysname); + payload = xstrcat(payload, (char *)"\",\"os_version\":\""); + payload = xstrcat(payload, ubuf.release); + payload = xstrcat(payload, (char *)"\""); + } else { + payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); + } + + payload = xstrcat(payload, (char *)",\"FW\":\""); + payload = xstrcat(payload, (char *)VERSION); + payload = xstrcat(payload, (char *)"\"}"); + comma = true; + } + + if (birth || flag & MQTT_NODE_HT) { + if (Config.temp_address || Config.hum_address) { + if (comma) + payload = xstrcat(payload, (char *)","); + payload = xstrcat(payload, (char *)"\"HT\":{"); + if (Config.temp_address) { + payload = xstrcat(payload, (char *)"\"temperature\":"); + sprintf(buf, "%.1f", Config.temp_value / 1000.0); + payload = xstrcat(payload, buf); + } + if (Config.temp_address && Config.hum_address) + payload = xstrcat(payload, (char *)","); + if (Config.hum_address) { + payload = xstrcat(payload, (char *)"\"humidity\":"); + sprintf(buf, "%.1f", Config.hum_value / 1000.0); + payload = xstrcat(payload, buf); + } + payload = xstrcat(payload, (char *)"}"); + } + } + payload = xstrcat(payload, (char *)"}}"); + + if (birth) + publisher(mosq, topic_base((char *)"NBIRTH"), payload, true); + else + publisher(mosq, topic_base((char *)"NDATA"), payload, false); + + free(payload); + payload = NULL; +#endif +} + + + +void publishBirth(void) +{ + publishNData(true, 0); + publishDBirth(); +} + + + + void mqtt_connect(void) { #ifdef HAVE_MOSQUITTO_H - char *id = NULL; - char err[1024]; - int rc; + char *id = NULL; + char err[1024]; + int rc; /* * Initialize mosquitto communication @@ -134,32 +567,23 @@ return; } - if (debug) { - mosquitto_log_callback_set(mosq, my_log_callback); - } - /* * Set our will */ - state = xstrcpy((char *)"clients/"); - state = xstrcat(state, my_hostname); - state = xstrcat(state, (char *)"/thermferm/state"); - if ((rc = mosquitto_will_set(mosq, state, 1, (char *)"0", mqtt_qos, TRUE))) { - if (rc == MOSQ_ERR_INVAL) { - syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: input parameters invalid"); - } else if (rc == MOSQ_ERR_NOMEM) { - syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: Out of Memory"); - } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) { - syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: invalid payload size"); - } + if ((rc = mosquitto_will_set(mosq, topic_base((char *)"NDEATH"), 0, NULL, mqtt_qos, false))) { + if (rc > MOSQ_ERR_SUCCESS) + syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); mosquitto_lib_cleanup(); return; } + mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_max_inflight_messages_set(mosq, max_inflight); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); + mosquitto_message_callback_set(mosq, my_message_callback); + mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { if (rc == MOSQ_ERR_ERRNO) { @@ -178,7 +602,7 @@ * Initialise is complete, report our presence state */ mosquitto_loop_start(mosq); - mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, (char *)"1", mqtt_qos, 1); + publishBirth(); } #endif } @@ -189,7 +613,6 @@ { #ifdef HAVE_MOSQUITTO_H int rc; - char buf[128]; if (mqtt_use) { /* @@ -197,9 +620,8 @@ * After that, remove the retained topic. */ syslog(LOG_NOTICE, "MQTT disconnecting"); - sprintf(buf, "0"); - mosquitto_publish(mosq, &mqtt_mid_sent, state, strlen(buf), buf, mqtt_qos, true); - mosquitto_publish(mosq, &mqtt_mid_sent, state, 0, NULL, mqtt_qos, true); + publisher(mosq, topic_base((char *)"DBIRTH"), NULL, true); + publisher(mosq, topic_base((char *)"NBIRTH"), NULL, true); mqtt_last_mid = mqtt_mid_sent; mqtt_status = STATUS_WAITING; mqtt_my_shutdown = TRUE; @@ -220,6 +642,7 @@ mosquitto_loop_stop(mosq, FALSE); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); + mqtt_use = FALSE; syslog(LOG_NOTICE, "MQTT disconnected"); } #endif @@ -227,60 +650,3 @@ -void mqtt_publish_int(char *uuid, char *tail, int value) -{ -#ifdef HAVE_MOSQUITTO_H - char topic[1024], buf[128]; - - if (mqtt_use) { - snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail); - snprintf(buf, 127, "%d", value); - mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1); - } -#endif -} - - - -void mqtt_publish_float(char *uuid, char *tail, float value, int decimals) -{ -#ifdef HAVE_MOSQUITTO_H - char topic[1024], buf[128]; - - if (mqtt_use) { - snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail); - snprintf(buf, 127, "%.*f", decimals, value); - mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1); - } -#endif -} - - - -void mqtt_publish_str(char *uuid, char *tail, char *value) -{ -#ifdef HAVE_MOSQUITTO_H - char topic[1024], buf[128]; - - if (mqtt_use) { - snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail); - snprintf(buf, 127, "%s", value); - mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1); - } -#endif -} - - - -void mqtt_publish_clear(char *uuid, char *tail) -{ -#ifdef HAVE_MOSQUITTO_H - char topic[1024]; - - if (mqtt_use) { - snprintf(topic, 1023, "bmsd/%s/%s/%s", my_hostname, uuid, tail); - mosquitto_publish(mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, 1); - } -#endif -} -