Thu, 10 Jan 2019 16:33:42 +0100
Version 0.9.0. Implemented DCMD via mqtt to set stage, mode, setpoint low and high. Implemeted DCMD via mqtt to set heater, cooler, fan and light state. Implemented DCMD via mqtt to set product code and name. Set the PID's in fridge mode without idle range offset, that was an old leftover setting that was obsolete.
/***************************************************************************** * Copyright (C) 2016-2019 * * Michiel Broek <mbroek at mbse dot eu> * * This file is part of the mbsePi-apps * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2, or (at your option) any * later version. * * mbsePi-apps is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ThermFerm; see the file COPYING. If not, write to the Free * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. *****************************************************************************/ #include "thermferm.h" #include "logger.h" #include "devices.h" #include "xutil.h" #include "mqtt.h" extern sys_config Config; extern int debug; extern int my_shutdown; extern int my_reboot; extern const char UNITMODE[5][8]; extern const char UNITSTAGE[4][12]; extern const char PROFSTATE[5][6]; extern const char TEMPSTATE[3][8]; int Sequence = 0; /* Global variables for use in callbacks. */ int mqtt_qos = 0; int mqtt_status = STATUS_CONNECTING; int mqtt_mid_sent = 0; int mqtt_last_mid = -1; int mqtt_last_mid_sent = -1; int mqtt_connected = TRUE; int mqtt_disconnect_sent = FALSE; int mqtt_connect_lost = FALSE; int mqtt_my_shutdown = FALSE; int mqtt_use = FALSE; int keepalive = 60; unsigned int max_inflight = 20; struct mosquitto *mosq = NULL; char *state = NULL; char my_hostname[256]; char *payload_header(void) { char *tmp, buf[128]; tmp = xstrcpy((char *)"{\"timestamp\":"); sprintf(buf, "%ld", time(NULL)); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"seq\":"); sprintf(buf, "%d", Sequence++); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"metric\":"); return tmp; } char *topic_base(char *msgtype) { char *tmp; tmp = xstrcpy((char *)"mbv1.0/fermenters/"); tmp = xstrcat(tmp, msgtype); tmp = xstrcat(tmp, (char *)"/"); tmp = xstrcat(tmp, my_hostname); return tmp; } void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) { char *topic = NULL; if (mqtt_connect_lost) { mqtt_connect_lost = FALSE; syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); } if (!result) { topic = topic_base((char *)"NCMD"); topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = topic_base((char *)"DCMD"); topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = NULL; mqtt_status = STATUS_CONNACK_RECVD; } else { syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); } } void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) { if (mqtt_my_shutdown) { syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); mqtt_connected = FALSE; } else { /* * The remote server was brought down. We must keep running */ syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); mqtt_connect_lost = TRUE; } } void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) { mqtt_last_mid_sent = mid; } void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); for (i = 1; i < qos_count; i++) { syslog(LOG_NOTICE, " %d", granted_qos[i]); } } void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) { syslog(LOG_NOTICE, "MQTT: %s", str); if (debug) fprintf(stdout, "MQTT: %s\n", str); } void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) { char *message_type, *message_node, *message_alias; units_list *unit; struct json_object *jobj, *metric, *val, *setpoint; time_t timestamp; int timediff; if (message->payloadlen) { /* * Process received commands */ strtok(message->topic, "/"); // Ignore mbv1.0 strtok(NULL, "/"); // Ignore group_id message_type = strtok(NULL, "/"); message_node = strtok(NULL, "/\0"); message_alias = strtok(NULL, "\0"); jobj = json_tokener_parse(message->payload); if (json_object_object_get_ex(jobj, "timestamp", &val)) { timestamp = json_object_get_int(val); timediff = (int)timestamp - time(NULL); if ((timediff < 61) && (timediff > -61)) { if (json_object_object_get_ex(jobj, "metric", &metric)) { if ((json_object_object_get_ex(metric, "Node Control/Reboot", &val)) && (strcmp(message_type, "NCMD") == 0)) { if (json_object_get_boolean(val) == true) { syslog(LOG_NOTICE, "MQTT: `Node Control/Reboot' command"); /* * Reboot. The erver process will restart which is handled * in the main thread loop. */ my_reboot = my_shutdown = TRUE; return; } } if ((json_object_object_get_ex(metric, "Node Control/Rebirth", &val)) && (strcmp(message_type, "NCMD") == 0)) { if (json_object_get_boolean(val) == true) { /* * Resend all birth certificates. */ syslog(LOG_NOTICE, "MQTT: `Node Control/Rebirth' command"); publishNData(true, 0); publishDBirthAll(); return; } } if ((strcmp(message_type, "DCMD") == 0) && message_node && message_alias) { syslog(LOG_NOTICE, "%s", (char *)json_object_get_string(metric)); for (unit = Config.units ; unit; unit = unit->next) { if (strcmp(unit->alias, message_alias) == 0) { syslog(LOG_NOTICE, "MQTT: DCMD for %s/%s", (char *)message_node, (char *)message_alias); if (json_object_object_get_ex(metric, "stage", &val)) { // syslog(LOG_NOTICE, "Change state %s", UNITSTAGE[unit->stage]); for (int i = 0; i < 4; i++) { if (strcmp((char *)json_object_get_string(val), UNITSTAGE[i]) == 0) { if (unit->stage != i) { syslog(LOG_NOTICE, "DCMD change fermenter %s: stage to %s", message_alias, UNITSTAGE[i]); unit->mqtt_flag |= MQTT_FLAG_DATA; unit->stage = i; } break; } } } printf("start mode\n"); if (json_object_object_get_ex(metric, "mode", &val)) { for (int i = 0; i < 4; i++) { if (strcmp((char *)json_object_get_string(val), UNITMODE[i]) == 0) { if (unit->mode != i) { syslog(LOG_NOTICE, "DCMD change fermenter %s: mode to %s", message_alias, UNITMODE[i]); unit->mqtt_flag |= MQTT_FLAG_DATA; /* Initialize log if the unit is turned on */ if ((unit->mode == UNITMODE_OFF) && (i != UNITMODE_OFF)) { initlog(unit->product_code, unit->product_name); unit->mqtt_flag |= MQTT_FLAG_BIRTH; } else if ((unit->mode != UNITMODE_OFF) && (i == UNITMODE_OFF)) { unit->mqtt_flag |= MQTT_FLAG_DEATH; } syslog(LOG_NOTICE, "Fermenter unit %s mode %s to %s", unit->uuid, UNITMODE[unit->mode], UNITMODE[i]); unit->mode = i; /* Allways turn everything off after a mode change */ unit->PID_cool->OutP = unit->PID_heat->OutP = 0.0; unit->PID_cool->Mode = unit->PID_heat->Mode = PID_MODE_NONE; unit->heater_state = unit->cooler_state = unit->fan_state = unit->light_state = 0; unit->heater_wait = unit->cooler_wait = unit->fan_wait = unit->light_wait = 0; device_out(unit->heater_address, unit->heater_state); device_out(unit->cooler_address, unit->cooler_state); device_out(unit->fan_address, unit->fan_state); device_out(unit->light_address, unit->light_state); if (unit->mode == UNITMODE_PROFILE) { /* * Set a sane default until it will be overruled by the * main processing loop. */ unit->prof_target_lo = unit->prof_target_hi = 20.0; unit->prof_fridge_mode = 0; if (unit->profile) { unit->mqtt_flag |= MQTT_FLAG_DATA; } } } break; } } } printf("start setpoint\n"); if (json_object_object_get_ex(metric, "setpoint", &setpoint)) { if ((unit->mode == UNITMODE_FRIDGE) || (unit->mode == UNITMODE_BEER)) { /* * Only set new setpoints if running as FRIDGE or in BEER mode. */ if (json_object_object_get_ex(setpoint, "low", &val)) { unit->PID_heat->SetP = json_object_get_double(val); } if (json_object_object_get_ex(setpoint, "high", &val)) { unit->PID_cool->SetP = json_object_get_double(val); } if (unit->mode == UNITMODE_FRIDGE) { unit->fridge_set = unit->PID_heat->SetP + ((unit->PID_cool->SetP - unit->PID_heat->SetP) / 2); } else { unit->beer_set = unit->PID_heat->SetP + ((unit->PID_cool->SetP - unit->PID_heat->SetP) / 2); } unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: setpoints %.1f %.1f", message_alias, unit->PID_heat->SetP, unit->PID_cool->SetP); } } printf("start heater\n"); if ((json_object_object_get_ex(metric, "heater", &setpoint)) && (unit->mode == UNITMODE_NONE)) { if (json_object_object_get_ex(setpoint, "state", &val)) { if (json_object_get_int(val) != unit->heater_state) { unit->heater_state = json_object_get_int(val); if (unit->heater_state) // Safety unit->cooler_state = 0; unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: heater_state to %d", message_alias, unit->heater_state); } } } if ((json_object_object_get_ex(metric, "cooler", &setpoint)) && (unit->mode == UNITMODE_NONE)) { if (json_object_object_get_ex(setpoint, "state", &val)) { if (json_object_get_int(val) != unit->cooler_state) { unit->cooler_state = json_object_get_int(val); if (unit->cooler_state) unit->heater_state = 0; unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: cooler_state to %d", message_alias, unit->cooler_state); } } } if ((json_object_object_get_ex(metric, "fan", &setpoint)) && (unit->mode == UNITMODE_NONE)) { if (json_object_object_get_ex(setpoint, "state", &val)) { if (json_object_get_int(val) != unit->fan_state) { unit->fan_state = json_object_get_int(val); unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: fan_state to %d", message_alias, unit->fan_state); } } } if ((json_object_object_get_ex(metric, "light", &setpoint)) && (unit->mode == UNITMODE_NONE)) { if (json_object_object_get_ex(setpoint, "state", &val)) { if (json_object_get_int(val) != unit->light_state) { unit->light_state = json_object_get_int(val); unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: light_state to %d", message_alias, unit->light_state); } } } printf("start product\n"); if ((json_object_object_get_ex(metric, "product", &setpoint)) && (unit->mode == UNITMODE_OFF)) { if (json_object_object_get_ex(setpoint, "code", &val)) { if (strcmp((char *)json_object_get_string(val), unit->product_code)) { free(unit->product_code); unit->product_code = xstrcpy((char *)json_object_get_string(val)); unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: product_code to `%s'", message_alias, unit->product_code); } } if (json_object_object_get_ex(setpoint, "name", &val)) { if (strcmp((char *)json_object_get_string(val), unit->product_name)) { free(unit->product_name); unit->product_name = xstrcpy((char *)json_object_get_string(val)); unit->mqtt_flag |= MQTT_FLAG_DATA; syslog(LOG_NOTICE, "DCMD change fermenter %s: product_name to `%s'", message_alias, unit->product_name); } } } } if (unit->mqtt_flag) { printf("do mqtt flag\n"); if (debug) fprintf(stdout, "flag value %d\n", unit->mqtt_flag); if (unit->mqtt_flag & MQTT_FLAG_BIRTH) { publishDBirth(unit); } else { publishDData(unit); } if (unit->mqtt_flag & MQTT_FLAG_DEATH) { publishDDeath(unit); } unit->mqtt_flag |= MQTT_FLAG_DLOG; // Something to log } printf("einde unit %s\n", unit->alias); } printf("return\n"); return; } printf("metric: %s\n", (char *)json_object_get_string(metric)); syslog(LOG_NOTICE, "MQTT: %s payload not understood\n", (char *)message->payload); return; } } else { syslog(LOG_NOTICE, "MQTT: got payload with timestamp %d seconds error", timediff); return; } } syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); } else { syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); } } void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) { // publish the data if (payload) mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); else mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); } void pub_domoticz_temp(int idx, int value) { char *dload = NULL; char sidx[10], sval[20]; if (idx == 0) return; sprintf(sidx, "%d", idx); sprintf(sval, "%.3f", value / 1000.0); dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":"); dload = xstrcat(dload, sidx); dload = xstrcat(dload, (char *)",\"nvalue\":0,\"svalue\":\""); dload = xstrcat(dload, sval); dload = xstrcat(dload, (char *)"\"}"); publisher(mosq, (char *)"domoticz/in", dload, false); free(dload); dload = NULL; } void pub_domoticz_output(int idx, int value) { char *dload = NULL; char sidx[10], sval[10]; if (idx == 0) return; sprintf(sidx, "%d", idx); sprintf(sval, "%d", value); dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":"); dload = xstrcat(dload, sidx); dload = xstrcat(dload, (char *)",\"nvalue\":"); if (value >= 50) dload = xstrcat(dload, (char *)"1"); else dload = xstrcat(dload, (char *)"0"); dload = xstrcat(dload, (char *)",\"svalue\":\""); dload = xstrcat(dload, sval); dload = xstrcat(dload, (char *)"\"}"); publisher(mosq, (char *)"domoticz/in", dload, false); free(dload); dload = NULL; } char *unit_data(units_list *unit, bool birth) { char *payload = NULL; char buf[128]; bool comma = false; profiles_list *profile; prof_step *pstep; payload = xstrcpy((char *)"{"); /* * Fixed unit values, never change these! */ if (birth) { payload = xstrcat(payload, (char *)"\"uuid\":\""); payload = xstrcat(payload, unit->uuid); payload = xstrcat(payload, (char *)"\",\"alias\":\""); payload = xstrcat(payload, unit->alias); payload = xstrcat(payload, (char *)"\","); } /* * Product (beer) loaded information. */ if ((unit->product_name && strlen(unit->product_name)) || (unit->product_code && strlen(unit->product_code)) || (unit->product_uuid && strlen(unit->product_uuid))) { comma = false; payload = xstrcat(payload, (char *)"\"product\":{"); if (unit->product_uuid && strlen(unit->product_uuid) && strcmp((char *)"(null)", unit->product_uuid)) { payload = xstrcat(payload, (char *)"\"uuid\":\""); payload = xstrcat(payload, unit->product_uuid); payload = xstrcat(payload, (char *)"\""); comma = true; } if (unit->product_code && strlen(unit->product_code)) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"code\":\""); payload = xstrcat(payload, unit->product_code); payload = xstrcat(payload, (char *)"\""); comma = true; } if (unit->product_name && strlen(unit->product_name)) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"name\":\""); payload = xstrcat(payload, unit->product_name); payload = xstrcat(payload, (char *)"\""); } payload = xstrcat(payload, (char *)"}"); } /* * Air temperature sensor */ if (unit->air_address) { payload = xstrcat(payload, (char *)",\"air\":{\"address\":\""); payload = xstrcat(payload, unit->air_address); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", unit->air_temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"air\":null"); } /* * Beer temperature sensor */ if (unit->beer_address) { payload = xstrcat(payload, (char *)",\"beer\":{\"address\":\""); payload = xstrcat(payload, unit->beer_address); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", unit->beer_temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"beer\":null"); } /* * External chiller temperature sensor */ if (unit->chiller_address) { payload = xstrcat(payload, (char *)",\"chiller\":{\"address\":\""); payload = xstrcat(payload, unit->chiller_address); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)TEMPSTATE[unit->chiller_state]); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", unit->chiller_temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"chiller\":null"); } /* * Heater control, power 0..100% and usage count. */ if (unit->heater_address) { payload = xstrcat(payload, (char *)",\"heater\":{\"address\":\""); payload = xstrcat(payload, unit->heater_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->heater_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->heater_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"heater\":null"); } /* * Cooler control, power 0..100% and usage counter. */ if (unit->cooler_address) { payload = xstrcat(payload, (char *)",\"cooler\":{\"address\":\""); payload = xstrcat(payload, unit->cooler_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->cooler_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->cooler_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"cooler\":null"); } /* * Fan control, 0..100% and usage counter. */ if (unit->fan_address) { payload = xstrcat(payload, (char *)",\"fan\":{\"address\":\""); payload = xstrcat(payload, unit->fan_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->fan_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->fan_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"fan\":null"); } /* * Interior lights control, 0..100% and usage counter. */ if (unit->light_address) { payload = xstrcat(payload, (char *)",\"light\":{\"address\":\""); payload = xstrcat(payload, unit->light_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->light_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->light_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"light\":null"); } /* * Door sensor. */ if (unit->door_address) { payload = xstrcat(payload, (char *)",\"door\":{\"address\":\""); payload = xstrcat(payload, unit->door_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->door_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"door\":null"); } /* * PSU status */ if (unit->psu_address) { payload = xstrcat(payload, (char *)",\"psu\":{\"address\":\""); payload = xstrcat(payload, unit->psu_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->psu_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"psu\":null"); } /* * Working mode and setpoints */ payload = xstrcat(payload, (char *)",\"stage\":\""); payload = xstrcat(payload, (char *)UNITSTAGE[unit->stage]); payload = xstrcat(payload, (char *)"\",\"mode\":\""); payload = xstrcat(payload, (char *)UNITMODE[unit->mode]); payload = xstrcat(payload, (char *)"\",\"setpoint\":{\"low\":"); sprintf(buf, "%.1f", unit->PID_heat->SetP); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", unit->PID_cool->SetP); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"alarm\":"); sprintf(buf, "%d", unit->alarm_flag); payload = xstrcat(payload, buf); /* * Loaded profile and state */ if (unit->profile) { for (profile = Config.profiles; profile; profile = profile->next) { if (strcmp(unit->profile, profile->uuid) == 0) { payload = xstrcat(payload, (char *)",\"profile\":{\"uuid\":\""); payload = xstrcat(payload, unit->profile); payload = xstrcat(payload, (char *)"\",\"name\":\""); payload = xstrcat(payload, profile->name); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)PROFSTATE[unit->prof_state]); payload = xstrcat(payload, (char *)"\",\"percent\":"); sprintf(buf, "%d", unit->prof_percent); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"inittemp\":{\"low\":"); sprintf(buf, "%.1f", profile->inittemp_lo); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", profile->inittemp_hi); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"fridgemode\":"); sprintf(buf, "%d", profile->fridge_mode); payload = xstrcat(payload, buf); comma = false; if (profile->steps) { payload = xstrcat(payload, (char *)",\"steps\":["); for (pstep = profile->steps; pstep; pstep = pstep->next) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"{\"resttime\":"); sprintf(buf, "%d", pstep->resttime); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"steptime\":"); sprintf(buf, "%d", pstep->steptime); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"target\":{\"low\":"); sprintf(buf, "%.1f", pstep->target_lo); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", pstep->target_hi); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"fridgemode\":"); sprintf(buf, "%d", pstep->fridge_mode); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); comma = true; } payload = xstrcat(payload, (char *)"]"); } else { payload = xstrcat(payload, (char *)",\"steps\":null"); } payload = xstrcat(payload, (char *)"}"); break; } } } else { payload = xstrcat(payload, (char *)",\"profile\":null"); } payload = xstrcat(payload, (char *)"}"); return payload; } /** * @brief Publish DBIRTH for all active units. If there are no active units, don't * publish anything. This function should be called at program start. */ void publishDBirthAll(void) { char *topic = NULL, *payload = NULL, *payloadu = NULL; units_list *unit; int comma = FALSE; payload = payload_header(); payload = xstrcat(payload, (char *)"{\"units\":["); for (unit = Config.units; unit; unit = unit->next) { if (unit->mode != UNITMODE_OFF) { if (comma) payload = xstrcat(payload, (char *)","); payloadu = unit_data(unit, true); payload = xstrcat(payload, payloadu); comma = TRUE; free(payloadu); payloadu = NULL; } } if (comma) { // Only publish if there is at least one unit active. payload = xstrcat(payload, (char *)"]}}"); topic = topic_base((char *)"DBIRTH"); publisher(mosq, topic, payload, true); free(topic); topic = NULL; } free(payload); payload = NULL; } void publishDData(units_list *unit) { char *payload = NULL, *payloadu = NULL, *topic = NULL; if (mqtt_use) { payload = payload_header(); payloadu = unit_data(unit, false); payload = xstrcat(payload, payloadu); payload = xstrcat(payload, (char *)"}"); topic = xstrcat(topic_base((char *)"DDATA"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, payload, false); free(payload); payload = NULL; free(payloadu); payloadu = NULL; free(topic); topic = NULL; } } void publishDBirth(units_list *unit) { char *payload = NULL, *topic = NULL; if (mqtt_use) { payload = payload_header(); payload = xstrcat(payload, unit_data(unit, true)); payload = xstrcat(payload, (char *)"}"); topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, payload, true); free(payload); payload = NULL; free(topic); topic = NULL; } } void publishDDeath(units_list *unit) { char *topic = NULL; if (mqtt_use) { // First delete presistent DBIRTH topic topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; topic = xstrcat(topic_base((char *)"DDEATH"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; } } void publishDLog(units_list *unit) { char buf[32], *payload = NULL, *topic = NULL; bool comma = false; if (mqtt_use) { payload = payload_header(); payload = xstrcat(payload, (char *)"{"); if ((unit->product_name && strlen(unit->product_name)) || (unit->product_code && strlen(unit->product_code)) || (unit->product_uuid && strlen(unit->product_uuid))) { comma = false; payload = xstrcat(payload, (char *)"\"product\":{"); if (unit->product_uuid && strlen(unit->product_uuid) && strcmp((char *)"(null)", unit->product_uuid)) { payload = xstrcat(payload, (char *)"\"uuid\":\""); payload = xstrcat(payload, unit->product_uuid); payload = xstrcat(payload, (char *)"\""); comma = true; } if (unit->product_code && strlen(unit->product_code)) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"code\":\""); payload = xstrcat(payload, unit->product_code); payload = xstrcat(payload, (char *)"\""); comma = true; } if (unit->product_name && strlen(unit->product_name)) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"name\":\""); payload = xstrcat(payload, unit->product_name); payload = xstrcat(payload, (char *)"\""); } payload = xstrcat(payload, (char *)"}"); comma = true; } if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"stage\":\""); payload = xstrcat(payload, (char *)UNITSTAGE[unit->stage]); payload = xstrcat(payload, (char *)"\",\"mode\":\""); payload = xstrcat(payload, (char *)UNITMODE[unit->mode]); payload = xstrcat(payload, (char *)"\",\"temperature\":{"); comma = false; if (unit->air_address) { payload = xstrcat(payload, (char *)"\"air\":"); sprintf(buf, "%.3f", unit->air_temperature / 1000.0); payload = xstrcat(payload, buf); comma = true; } if (unit->beer_address) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"beer\":"); sprintf(buf, "%.3f", unit->beer_temperature / 1000.0); payload = xstrcat(payload, buf); comma = true; } if (unit->chiller_address) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"chiller\":"); sprintf(buf, "%.3f", unit->chiller_temperature / 1000.0); payload = xstrcat(payload, buf); comma = true; } if (Config.temp_address) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"room\":"); sprintf(buf, "%.1f", Config.temp_value / 1000.0); payload = xstrcat(payload, buf); } payload = xstrcat(payload, (char *)"},\"setpoint\":{\"low\":"); sprintf(buf, "%.1f", unit->PID_heat->SetP); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", unit->PID_cool->SetP); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); if (unit->heater_address) { payload = xstrcat(payload, (char *)",\"heater\":{\"power\":"); sprintf(buf, "%d", unit->heater_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->heater_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } if (unit->cooler_address) { payload = xstrcat(payload, (char *)",\"cooler\":{\"power\":"); sprintf(buf, "%d", unit->cooler_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->cooler_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } if (unit->fan_address) { payload = xstrcat(payload, (char *)",\"fan\":{\"power\":"); sprintf(buf, "%d", unit->fan_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"usage\":"); sprintf(buf, "%d", unit->fan_usage); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } // sg if (unit->event_msg) { payload = xstrcat(payload, (char *)",\"event\":\""); payload = xstrcat(payload, unit->event_msg); payload = xstrcat(payload, (char *)"\""); } payload = xstrcat(payload, (char *)",\"fermenter_uuid\":\""); payload = xstrcat(payload, unit->uuid); payload = xstrcat(payload, (char *)"\"}}"); topic = xstrcat(topic_base((char *)"DLOG"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, payload, true); free(payload); payload = NULL; free(topic); topic = NULL; } } void publishNData(bool birth, int flag) { char *topic = NULL, *payload = NULL, sidx[10], buf[64]; struct utsname ubuf; bool comma = false; payload = payload_header(); payload = xstrcat(payload, (char *)"{"); if (birth) { payload = xstrcat(payload, (char *)"\"uuid\":\""); payload = xstrcat(payload, Config.uuid); payload = xstrcat(payload, (char *)"\","); #ifdef HAVE_WIRINGPI_H /* * Get the info from the WiringPi libary */ int model, rev, mem, maker, warranty; piBoardId (&model, &rev, &mem, &maker, &warranty); payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Raspberry Pi "); payload = xstrcat(payload, (char *)piMakerNames[maker]); payload = xstrcat(payload, (char *)"\",\"hardwaremodel\":\""); payload = xstrcat(payload, (char *)piModelNames[model]); payload = xstrcat(payload, (char *)" rev "); payload = xstrcat(payload, (char *)piRevisionNames[rev]); payload = xstrcat(payload, (char *)"\""); #else if (uname(&ubuf) == 0) { payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\""); payload = xstrcat(payload, ubuf.machine); payload = xstrcat(payload, (char *)"\",\"hardwaremodel\":\"Unknown\""); } else { payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); } #endif if (uname(&ubuf) == 0) { payload = xstrcat(payload, (char *)",\"os\":\""); payload = xstrcat(payload, ubuf.sysname); payload = xstrcat(payload, (char *)"\",\"os_version\":\""); payload = xstrcat(payload, ubuf.release); payload = xstrcat(payload, (char *)"\""); } else { payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); } payload = xstrcat(payload, (char *)",\"FW\":\""); payload = xstrcat(payload, (char *)VERSION); payload = xstrcat(payload, (char *)"\"}"); comma = true; } if (Config.temp_address || Config.hum_address) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"THB\":{"); if (Config.temp_address) { payload = xstrcat(payload, (char *)"\"temperature\":"); sprintf(buf, "%.1f", Config.temp_value / 1000.0); payload = xstrcat(payload, buf); } if (Config.temp_address && Config.hum_address) payload = xstrcat(payload, (char *)","); if (Config.hum_address) { payload = xstrcat(payload, (char *)"\"humidity\":"); sprintf(buf, "%.1f", Config.hum_value / 1000.0); payload = xstrcat(payload, buf); } payload = xstrcat(payload, (char *)"}"); } /* * Find our network information */ FILE *f; char line[100], *ifname, *c, ip[NI_MAXHOST]; struct ifaddrs *ifaddr, *ifa; int family, s; if (birth && (f = fopen("/proc/net/route" , "r"))) { while (fgets(line, 100, f)) { ifname = strtok(line , " \t"); c = strtok(NULL , " \t"); // Take the entry with destination '00000000' if (ifname != NULL && c != NULL && (strcmp(c , "00000000") == 0)) { if (getifaddrs(&ifaddr) == -1) { syslog(LOG_NOTICE, "error getifaddrs error %d", errno); goto neterr; } //Walk through linked list, maintaining head pointer so we can free list later for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { if (ifa->ifa_addr == NULL) { continue; } family = ifa->ifa_addr->sa_family; if ((strcmp( ifa->ifa_name, ifname) == 0) && (family == AF_INET)) { s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), ip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); if (s != 0) { syslog(LOG_NOTICE, "getnameinfo() error=%d for %s", errno, ifname); goto neterr; } payload = xstrcat(payload, (char *)",\"net\":{\"address\":\""); payload = xstrcat(payload, ip); payload = xstrcat(payload, (char *)"\",\"ifname\":\""); payload = xstrcat(payload, ifname); payload = xstrcat(payload, (char *)"\",\"rssi\":0}"); // TODO: get rssi if wlan interface. } } freeifaddrs(ifaddr); } } fclose(f); } neterr: payload = xstrcat(payload, (char *)"}}"); if (birth) { topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, payload, true); } else { topic = topic_base((char *)"NDATA"); publisher(mosq, topic, payload, false); } free(topic); topic = NULL; free(payload); payload = NULL; if ((Config.temp_address || Config.hum_address) && Config.temp_hum_idx) { sprintf(sidx, "%d", Config.temp_hum_idx); sprintf(buf, "%.1f;%.1f;0", Config.temp_value / 1000.0, Config.hum_value / 1000.0); payload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":"); payload = xstrcat(payload, sidx); payload = xstrcat(payload, (char *)",\"nvalue\":0,\"svalue\":\""); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"\"}"); publisher(mosq, (char *)"domoticz/in", payload, false); free(payload); payload = NULL; } } void mqtt_connect(void) { char *id = NULL, *topic; char err[1024]; int rc; /* * Initialize mosquitto communication */ gethostname(my_hostname, 255); mosquitto_lib_init(); id = xstrcpy((char *)"thermferm/"); id = xstrcat(id, my_hostname); if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { /* * Enforce maximum client id length of 23 characters */ id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; } mosq = mosquitto_new(id, TRUE, NULL); if (!mosq) { switch(errno) { case ENOMEM: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); break; case EINVAL: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); break; } mosquitto_lib_cleanup(); return; } free(id); id = NULL; /* * Set our will */ topic = topic_base((char *)"NDEATH"); if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { if (rc > MOSQ_ERR_SUCCESS) syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); mosquitto_lib_cleanup(); return; } free(topic); if (debug) mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_max_inflight_messages_set(mosq, max_inflight); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { if (rc == MOSQ_ERR_ERRNO) { strerror_r(errno, err, 1024); syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); } else { syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); } mosquitto_lib_cleanup(); syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); } else { mqtt_use = TRUE; syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); /* * Initialise is complete, report our presence state */ mosquitto_loop_start(mosq); publishNData(true, 0); } } void mqtt_disconnect(void) { int rc; char *topic; if (mqtt_use) { /* * Final publish 0 to clients/<hostname>/thermferm/state * After that, remove the retained topic. */ syslog(LOG_NOTICE, "MQTT disconnecting"); topic = topic_base((char *)"DBIRTH"); publisher(mosq, topic, NULL, true); // Not always needed, but ... free(topic); topic = topic_base((char *)"DDEATH"); publisher(mosq, topic, NULL, true); free(topic); topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, NULL, true); free(topic); topic = topic_base((char *)"NDEATH"); publisher(mosq, topic, NULL, true); free(topic); mqtt_last_mid = mqtt_mid_sent; mqtt_status = STATUS_WAITING; mqtt_my_shutdown = TRUE; do { if (mqtt_status == STATUS_WAITING) { if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { mosquitto_disconnect(mosq); mqtt_disconnect_sent = TRUE; } usleep(100000); } rc = MOSQ_ERR_SUCCESS; } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); mosquitto_loop_stop(mosq, FALSE); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); mqtt_use = FALSE; mqtt_status = STATUS_CONNECTING; mqtt_mid_sent = 0; mqtt_last_mid = -1; mqtt_last_mid_sent = -1; mqtt_connected = TRUE; mqtt_disconnect_sent = FALSE; mqtt_connect_lost = FALSE; mqtt_my_shutdown = FALSE; syslog(LOG_NOTICE, "MQTT disconnected"); } }