Sat, 20 Apr 2024 11:34:00 +0200
Version 0.9.19a1
/**
 * @file websocket.c
 * @brief WebSockets interface
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2024
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */

#include "thermferm.h"
#include "xutil.h"
#include "devices.h"
#include "websocket.h"
#include <libwebsockets.h>

extern sys_config       Config;
extern int              debug;
extern const char       UNITMODE[5][8];
extern const char       UNITSTAGE[4][12];
//extern const char     PROFSTATE[5][6];
//extern const char     TEMPSTATE[3][8];

int                     my_ws_shutdown = 0;     /* Non-zero makes my_ws_loop() leave its service loop */
int                     my_ws_state = 0;        /* 1 while my_ws_loop() is running, else 0           */
struct lws_context      *context;               /* Created and destroyed by my_ws_loop()              */
int                     ws_clients = 0;         /* Connections established minus connections closed   */
long                    ws_pingno = 0;          /* Sequence number of the last ping from ws_check()   */
time_t                  last_msg = 0;           /* Time of the last successful ws_broadcast()         */
pthread_mutex_t         ws_mutex;               /* Serialises ws_broadcast(); not initialised here    */


/*
 * Based on lws-mirror-protocol
 */
#define MAX_MESSAGE_QUEUE 512
#define WS_INBUF          2048


/*
 * one of these created for each message
 */
struct a_message {
    void        *payload;   /* is malloc'd, LWS_PRE bytes of headroom precede the text */
    size_t      len;        /* length of the text, headroom not included               */
};

static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int              ringbuffer_head;



/**
 * @brief Handle a setpoint change request.
 *        {"type":"fermenter","unit":"unit0","setpoint_low":20.3,"setpoint_high":20.7}
 *        Only acted upon while the unit is in FRIDGE or BEER mode.
 * @param unit The unit addressed by the message.
 * @param jobj The parsed message.
 * @return true if at least one setpoint changed (message consumed), false
 *         if nothing changed and the caller should try the next handler.
 */
static bool fermenter_ws_setpoints(units_list *unit, struct json_object *jobj)
{
    struct json_object  *val = NULL;
    bool                changed = false;
    double              sp;

    if ((unit->mode != UNITMODE_FRIDGE) && (unit->mode != UNITMODE_BEER))
        return false;

    if (json_object_object_get_ex(jobj, "setpoint_low", &val)) {
        sp = json_object_get_double(val);
        if (unit->PID_heat->SetP != sp) {
            changed = true;
            syslog(LOG_NOTICE, "ws: unit %s setpoint low from %.1f to %.1f", unit->alias, unit->PID_heat->SetP, sp);
            unit->PID_heat->SetP = sp;
        }
    }

    if (json_object_object_get_ex(jobj, "setpoint_high", &val)) {
        sp = json_object_get_double(val);
        if (unit->PID_cool->SetP != sp) {
            changed = true;
            syslog(LOG_NOTICE, "ws: unit %s setpoint high from %.1f to %.1f", unit->alias, unit->PID_cool->SetP, sp);
            unit->PID_cool->SetP = sp;
        }
    }

    if (! changed)
        return false;

    /* Mirror the PID setpoints into the settings that belong to the active mode. */
    if (unit->mode == UNITMODE_FRIDGE) {
        unit->fridge_set_lo = unit->PID_heat->SetP;
        unit->fridge_set_hi = unit->PID_cool->SetP;
    } else {
        unit->beer_set_lo = unit->PID_heat->SetP;
        unit->beer_set_hi = unit->PID_cool->SetP;
    }
    unit->mqtt_flag |= MQTT_FLAG_DATA;
    return true;
}



/**
 * @brief Handle a unit mode change request.
 *        {"type":"fermenter","unit":"unit0","mode":"NONE"}
 * @param unit The unit addressed by the message.
 * @param jobj The parsed message.
 * @return true if the message named a known mode and was consumed, false
 *         if there is no usable "mode" member, the name is unknown, or
 *         profile mode was refused; the caller then tries the next handler.
 */
static bool fermenter_ws_mode(units_list *unit, struct json_object *jobj)
{
    struct json_object  *val = NULL;
    const char          *name;
    const int           modes = (int)(sizeof UNITMODE / sizeof UNITMODE[0]);

    if (! json_object_object_get_ex(jobj, "mode", &val))
        return false;

    /* A JSON null yields a NULL string, which must never reach strcmp(). */
    name = json_object_get_string(val);
    if (name == NULL)
        return false;

    for (int i = 0; i < modes; i++) {
        if (strcmp(name, UNITMODE[i]) != 0)
            continue;

        if (unit->mode == i)
            return true;            /* Already in this mode, nothing to do. */

        unit->mqtt_flag |= MQTT_FLAG_DATA;
        /* Initialize log if the unit is turned on */
        if ((unit->mode == UNITMODE_OFF) && (i != UNITMODE_OFF)) {
            unit->mqtt_flag |= MQTT_FLAG_BIRTH;
        }

        /* Refuse profile mode when no profile is loaded. */
        if ((i == UNITMODE_PROFILE) && (unit->profile_uuid == NULL)) {
            syslog(LOG_NOTICE, "ws: unit %s refuse mode profile, not loaded", unit->alias);
            return false;
        }

        syslog(LOG_NOTICE, "ws: unit %s mode to %s", unit->alias, UNITMODE[i]);
        unit->mode = i;
        if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
            unit->event_msg = xstrcpy((char *)UNITMODE[i]);

        /* Always turn everything off after a mode change */
        unit->PID_cool->OutP = unit->PID_heat->OutP = 0.0;
        unit->PID_cool->Mode = unit->PID_heat->Mode = PID_MODE_NONE;
        unit->heater_state = unit->cooler_state = unit->fan_state = unit->light_state = unit->light_timer = 0;
        unit->heater_wait = unit->cooler_wait = unit->fan_wait = unit->light_wait = 0;
        device_out(unit->heater_address, unit->heater_state);
        device_out(unit->cooler_address, unit->cooler_state);
        device_out(unit->fan_address, unit->fan_state);
        device_out(unit->light_address, unit->light_state);

        if (unit->mode == UNITMODE_PROFILE) {
            /*
             * Set a sane default until it will be overruled by the
             * main processing loop.
             */
            unit->prof_target_lo = unit->profile_inittemp_lo;
            unit->prof_target_hi = unit->profile_inittemp_hi;
            unit->prof_fridge_mode = 0;
            unit->prof_state = PROFILE_OFF;
            unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
            unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
        }
        return true;
    }

    return false;
}



/**
 * @brief Handle a unit stage change request.
 *        {"type":"fermenter","unit":"unit0","stage":"SECONDARY"}
 * @param unit The unit addressed by the message.
 * @param jobj The parsed message.
 * @return true if the message named a known stage and was consumed,
 *         false otherwise.
 */
static bool fermenter_ws_stage(units_list *unit, struct json_object *jobj)
{
    struct json_object  *val = NULL;
    const char          *name;
    const int           stages = (int)(sizeof UNITSTAGE / sizeof UNITSTAGE[0]);

    if (! json_object_object_get_ex(jobj, "stage", &val))
        return false;

    name = json_object_get_string(val);
    if (name == NULL)
        return false;

    for (int i = 0; i < stages; i++) {
        if (strcmp(name, UNITSTAGE[i]) != 0)
            continue;

        if (unit->stage != i) {
            syslog(LOG_NOTICE, "ws: unit %s: stage to %s", unit->alias, UNITSTAGE[i]);
            unit->mqtt_flag |= MQTT_FLAG_DATA;
            unit->stage = i;
            if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
                unit->event_msg = xstrcpy((char *)UNITSTAGE[i]);
        }
        return true;
    }

    return false;
}



/**
 * @brief Handle manual heater and cooler switching.
 *        {"type":"fermenter","unit":"unit0","heater_state":100,"cooler_state":0}
 *        Both members must be present and the unit must be in NONE mode.
 * @param unit The unit addressed by the message.
 * @param jobj The parsed message.
 * @return true if the message was consumed, false otherwise.
 */
static bool fermenter_ws_heat_cool(units_list *unit, struct json_object *jobj)
{
    struct json_object  *heater = NULL, *cooler = NULL;

    if (! json_object_object_get_ex(jobj, "heater_state", &heater) ||
        ! json_object_object_get_ex(jobj, "cooler_state", &cooler) ||
        (unit->mode != UNITMODE_NONE))
        return false;

    unit->heater_state = json_object_get_int(heater);
    unit->cooler_state = json_object_get_int(cooler);
    if (unit->heater_state && unit->cooler_state)
        unit->heater_state = unit->cooler_state = 0;    // Safety
    unit->mqtt_flag |= MQTT_FLAG_DATA;
    syslog(LOG_NOTICE, "ws: unit %s heater_state to %d, cooler_state to %d", unit->alias, unit->heater_state, unit->cooler_state);
    return true;
}



/**
 * @brief Handle manual fan switching.
 *        {"type":"fermenter","unit":"unit0","fan_state":0}
 *        Only acted upon while the unit is in NONE mode.
 * @param unit The unit addressed by the message.
 * @param jobj The parsed message.
 * @return true if the message was consumed, false otherwise.
 */
static bool fermenter_ws_fan(units_list *unit, struct json_object *jobj)
{
    struct json_object  *val = NULL;

    if (! json_object_object_get_ex(jobj, "fan_state", &val) || (unit->mode != UNITMODE_NONE))
        return false;

    unit->fan_state = json_object_get_int(val);
    unit->mqtt_flag |= MQTT_FLAG_DATA;
    syslog(LOG_NOTICE, "ws: unit %s fan_state to %d", unit->alias, unit->fan_state);
    return true;
}



/**
 * @brief Clear the progress bookkeeping of a unit's profile.
 * @param unit The unit whose profile timers and peak values are zeroed.
 */
static void fermenter_ws_profile_reset(units_list *unit)
{
    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
}



/**
 * @brief Handle a profile command.
 *        {"type":"fermenter","unit":"unit0","profile":{"command":"pause"}}
 *        Known commands: off pause start abort done
 *
 *        We don't implement "light", "product", "profile" download here.
 *        But "profile" commands are implemented. That means a profile
 *        must be installed by bmsd via mqtt.
 * @param unit The unit addressed by the message.
 * @param jobj The parsed message.
 * @return true if the message has a "profile" member and the unit is in
 *         PROFILE mode (consumed, even when the command is unknown or not
 *         valid in the current profile state), false otherwise.
 */
static bool fermenter_ws_profile(units_list *unit, struct json_object *jobj)
{
    struct json_object  *profile = NULL, *val = NULL;
    const char          *cmd;

    if (! json_object_object_get_ex(jobj, "profile", &profile) || (unit->mode != UNITMODE_PROFILE))
        return false;

    if (! json_object_object_get_ex(profile, "command", &val))
        return true;

    /* The string stays owned by the JSON tree; a JSON null gives NULL. */
    cmd = json_object_get_string(val);
    if (cmd == NULL)
        return true;

    syslog(LOG_NOTICE, "ws: profile command `%s'", cmd);

    if ((strcmp(cmd, "off") == 0) && (unit->prof_state == PROFILE_DONE)) {
        unit->prof_state = PROFILE_OFF;
        syslog(LOG_NOTICE, "ws: unit %s profile to OFF", unit->alias);
        unit->mqtt_flag |= MQTT_FLAG_DATA;
    } else if (strcmp(cmd, "pause") == 0) {
        /* "pause" toggles between running and paused. */
        if (unit->prof_state == PROFILE_RUN) {
            unit->prof_state = PROFILE_PAUSE;
            syslog(LOG_NOTICE, "ws: unit %s profile to PAUSE", unit->alias);
            unit->mqtt_flag |= MQTT_FLAG_DATA;
        } else if (unit->prof_state == PROFILE_PAUSE) {
            unit->prof_state = PROFILE_RUN;
            syslog(LOG_NOTICE, "ws: unit %s profile resume RUN", unit->alias);
            unit->mqtt_flag |= MQTT_FLAG_DATA;
        }
    } else if (strcmp(cmd, "start") == 0) {
        if (unit->prof_state == PROFILE_OFF) {
            fermenter_ws_profile_reset(unit);
            unit->prof_started = time(NULL);
            unit->prof_state = PROFILE_RUN;
            syslog(LOG_NOTICE, "ws: unit %s profile start RUN", unit->alias);
            unit->mqtt_flag |= MQTT_FLAG_DATA;
            if (! unit->event_msg)
                unit->event_msg = xstrcpy((char *)"Profile start");
        }
    } else if (strcmp(cmd, "abort") == 0) {
        if ((unit->prof_state == PROFILE_RUN) || (unit->prof_state == PROFILE_PAUSE)) {
            unit->prof_state = PROFILE_OFF;
            fermenter_ws_profile_reset(unit);
            syslog(LOG_NOTICE, "ws: unit %s profile ABORT", unit->alias);
            unit->mqtt_flag |= MQTT_FLAG_DATA;
            if (! unit->event_msg)
                unit->event_msg = xstrcpy((char *)"Profile abort");
        }
    } else if (strcmp(cmd, "done") == 0) {
        if (unit->prof_state == PROFILE_DONE) {
            unit->prof_state = PROFILE_OFF;
            fermenter_ws_profile_reset(unit);
            syslog(LOG_NOTICE, "ws: unit %s profile OFF", unit->alias);
            unit->mqtt_flag |= MQTT_FLAG_DATA;
        }
    }

    return true;
}



/**
 * @brief Process a {"type":"fermenter",...} message received from a client.
 *
 *        The message is parsed, the unit named by its "unit" member is
 *        looked up in Config.units and the request is offered to the
 *        setpoint, mode, stage, heater/cooler, fan and profile handlers in
 *        that order. The first handler that consumes the message ends the
 *        processing.
 *
 *        Messages that cannot be parsed, have no usable "unit" member or
 *        name an unknown unit are only written to the log.
 * @param buf The nul terminated JSON text. It is not modified.
 */
void fermenter_ws_receive(char *buf)
{
    struct json_object  *jobj, *val = NULL;
    const char          *alias = NULL;
    units_list          *unit;

    jobj = json_tokener_parse(buf);
    if ((jobj != NULL) && json_object_object_get_ex(jobj, "unit", &val))
        alias = json_object_get_string(val);

    if (alias != NULL) {
        for (unit = Config.units; unit; unit = unit->next) {
            if (strcmp(alias, unit->alias) != 0)
                continue;

            /* Short-circuit: stop at the first handler that consumed the message. */
            (void)(fermenter_ws_setpoints(unit, jobj) ||
                   fermenter_ws_mode(unit, jobj) ||
                   fermenter_ws_stage(unit, jobj) ||
                   fermenter_ws_heat_cool(unit, jobj) ||
                   fermenter_ws_fan(unit, jobj) ||
                   fermenter_ws_profile(unit, jobj));

            json_object_put(jobj);
            return;
        }
    }

    syslog(LOG_NOTICE, "fermenter_ws_receive(%s)", buf);
    json_object_put(jobj);      /* Safe on NULL; releases the parsed tree. */
}



/**
 * @brief Process a {"pong":N} reply and compare N with the last ping sent.
 *        A mismatch is logged; a missing or unparsable number counts as 0.
 * @param buf The nul terminated JSON text. It is not modified.
 */
void pong_ws_receive(char *buf)
{
    struct json_object  *val = NULL, *jobj;
    long                ws_pongno;

    jobj = json_tokener_parse(buf);
    if (jobj != NULL)
        json_object_object_get_ex(jobj, "pong", &val);

    ws_pongno = json_object_get_int(val);
    if (ws_pongno != ws_pingno) {
        syslog(LOG_NOTICE, "ws: ping/pong error %ld/%ld", ws_pingno, ws_pongno);
    }

    json_object_put(jobj);
}



/**
 * @brief libwebsockets callback for the "thermferm-protocol" protocol.
 * @param wsi    The connection the event is for.
 * @param reason The event being reported.
 * @param user   Per connection storage, used as
 *               struct per_session_data__lws_mirror.
 * @param in     Received data for LWS_CALLBACK_RECEIVE.
 * @param len    Number of bytes at @p in.
 * @return 0 to keep the connection, -1 after a failed write.
 *
 * NOTE(review): the ring buffer is read here without taking ws_mutex while
 * ws_broadcast() writes it under that mutex -- confirm both always run on
 * the same thread or that this is acceptable.
 */
static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
    struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
    int         n, m;
    char        buf[WS_INBUF + 1];

    switch (reason) {

        case LWS_CALLBACK_ESTABLISHED:
                /* A new client only gets messages queued from now on. */
                ws_clients++;
                pss->ringbuffer_tail = ringbuffer_head;
                pss->wsi = wsi;
                syslog(LOG_NOTICE, "Websocket: new client, now %d", ws_clients);
                break;

        case LWS_CALLBACK_PROTOCOL_DESTROY:
                syslog(LOG_NOTICE, "Websocket: protocol cleaning up");
                for (n = 0; n < MAX_MESSAGE_QUEUE; n++) {
                    free(ringbuffer[n].payload);
                    /* Clear the slot so a later ws_broadcast() cannot free it twice. */
                    ringbuffer[n].payload = NULL;
                    ringbuffer[n].len = 0;
                }
                break;

        case LWS_CALLBACK_SERVER_WRITEABLE:
                /* Send everything this client has not seen yet. */
                while (pss->ringbuffer_tail != ringbuffer_head) {
                    m = ringbuffer[pss->ringbuffer_tail].len;
                    n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
                    if (n < 0) {
                        syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
                        return -1;
                    }
                    if (n < m)
                        syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);

                    if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
                        pss->ringbuffer_tail = 0;
                    else
                        pss->ringbuffer_tail++;

                    /* Flow control threshold taken over from lws-mirror-protocol. */
                    if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
                        lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));

                    /* Socket is full; ask to be called again and stop for now. */
                    if (lws_send_pipe_choked(wsi)) {
                        lws_callback_on_writable(wsi);
                        break;
                    }
                }
                break;

        case LWS_CALLBACK_RECEIVE:
                /* Never copy more than the local buffer can hold. */
                if (len > WS_INBUF) {
                    syslog(LOG_NOTICE, "ws: received %zu bytes, too large, dropped", len);
                    break;
                }
                if ((len > 0) && (in != NULL))
                    memcpy(buf, in, len);
                else
                    len = 0;
                buf[len] = '\0';
                syslog(LOG_NOTICE, "ws: received %zu bytes %s", len, buf);
                /*
                 * These are send by bmsapp to bmsd. Then bmsd resends these via MQTT.
                 * Do we want to change that? Or use it for the new web pages.
                 *  {"node":"rpi01","group_id":"fermenters","control":"reboot"}
                 *  {"node":"rpi01","group_id":"fermenters","control":"rebirth"}
                 */
                if (strncmp(buf, (char *)"{\"type\":\"fermenter\",", 20) == 0) {
                    fermenter_ws_receive(buf);
                } else if (strncmp(buf, (char *)"{\"type\":\"device\",", 17) == 0) {
                } else if (strncmp(buf, (char *)"{\"type\":\"global\",", 17) == 0) {
#ifdef USE_SIMULATOR
                } else if (strncmp(buf, (char *)"{\"type\":\"simulator\",", 20) == 0) {
#endif
                } else if (strncmp(buf, (char *)"{\"pong\":", 8) == 0) {
                    pong_ws_receive(buf);
                }
                break;

        case LWS_CALLBACK_CLOSED:
                ws_clients--;
                syslog(LOG_NOTICE, "Websocket: del client, now %d", ws_clients);
                break;

        default:
                break;
    }

    return 0;
}



static struct lws_protocols protocols[] = {
    { "thermferm-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF },
    { NULL, NULL, 0, 0 } /* terminator */
};



/**
 * @brief Queue a text message for all connected clients.
 *
 *        The message is copied into the next ring buffer slot, replacing
 *        whatever was stored there, and all connections of the protocol
 *        are asked for a writeable callback. Example messages:
 *
 * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1}
 * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
 *
 *        Nothing is queued when @p msg is NULL, the mutex cannot be
 *        locked or memory cannot be allocated.
 * @param msg The nul terminated text to send. It is copied, not kept.
 */
void ws_broadcast(char *msg)
{
    int         len, err;
    void        *payload;

    if (msg == NULL)
        return;

    err = pthread_mutex_lock(&ws_mutex);
    if (err) {
        syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
        return;
    }

    len = strlen(msg);
    payload = malloc(LWS_PRE + (size_t)len);
    if (payload == NULL) {
        /* Keep the old slot contents; this message is lost. */
        syslog(LOG_NOTICE, "ws_broadcast malloc error, message dropped");
        err = pthread_mutex_unlock(&ws_mutex);
        if (err) {
            syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
        }
        return;
    }

    /* lws_write() needs LWS_PRE bytes of headroom in front of the text. */
    memcpy((char *)payload + LWS_PRE, msg, (size_t)len);
    free(ringbuffer[ringbuffer_head].payload);
    ringbuffer[ringbuffer_head].payload = payload;
    ringbuffer[ringbuffer_head].len = len;

    if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
        ringbuffer_head = 0;
    else
        ringbuffer_head++;

    syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len);
    lws_callback_on_writable_all_protocol(context, &protocols[0]);

    err = pthread_mutex_unlock(&ws_mutex);
    if (err) {
        syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
    }
    last_msg = time(NULL);
}



/**
 * @brief Send a ping when nothing was broadcast for more than 45 seconds.
 *        Called every 45 seconds.
 */
void ws_check(void)
{
    time_t      now = time(NULL);
    char        buf[64];

    if ((now - last_msg) > 45) {
        snprintf(buf, sizeof buf, "{\"ping\":%ld}", ++ws_pingno);
        ws_broadcast(buf);
    }
}



/**
 * @brief Thread body of the websocket server.
 *
 *        Creates the libwebsockets context on Config.websocket_port and
 *        services it until my_ws_shutdown is set or lws_service() reports
 *        an error. my_ws_state is 1 while this function runs.
 * @param threadid Not used.
 * @return (void *)0 after a normal stop, (void *)1 if the context could
 *         not be created.
 */
void *my_ws_loop(void *threadid)
{
    struct lws_context_creation_info    info;
    int                                 n = 0;

    (void)threadid;

    my_ws_state = 1;
    memset(&info, 0, sizeof info);      /* otherwise uninitialized garbage */
    info.port = Config.websocket_port;
    info.protocols = protocols;
    info.gid = -1;
    info.uid = -1;
    info.keepalive_timeout = 900;
    info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;

    context = lws_create_context(&info);
    if (context == NULL) {
        syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
        my_ws_state = 0;
        return (void *)1;
    }
    syslog(LOG_NOTICE, "Websocket: server started port %d", info.port);

    /*
     * Loop forever until external shutdown variable is set.
     */
    while (n >= 0 && ! my_ws_shutdown) {
        n = lws_service(context, 50);
    }

    lws_context_destroy(context);

    my_ws_state = 0;
    syslog(LOG_NOTICE, "Websocket: server stopped");
    return (void *)0;
}