thermferm/websocket.c

Thu, 02 May 2024 15:49:16 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Thu, 02 May 2024 15:49:16 +0200
changeset 716
5c30c8ef83a8
parent 711
844588d0df65
child 724
01e3936f62d4
permissions
-rw-r--r--

Version 0.9.19b3. The simulator thread can be paused to be able to add and delete simulators. Added simulated door and PSU status. Devices can now fully use multiple simulators. Better rounding of simulated temperature values. The server SIMULATOR DEL and ADD commands pause the simulator when the linked list is manipulated. Fixed SIGSEGV when a simulator is added. Added socket SO_REUSEADDR again to the server socket.

/**
 * @file websocket.c
 * @brief WebSockets interface
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2024
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */

#include "thermferm.h"
#include "xutil.h"
#include "devices.h"
#include "websocket.h"
#include <libwebsockets.h>


extern sys_config       Config;
extern int		debug;
extern const char	UNITMODE[5][8];
extern const char	UNITSTAGE[4][12];

int			my_ws_shutdown = 0;
int			my_ws_state = 0;
struct			lws_context *context;
int			ws_clients = 0;
long			ws_pingno = 0;
time_t			last_msg = 0;
pthread_mutex_t		ws_mutex;


/*
 * Based on lws-mirror-protocol
 */
#define MAX_MESSAGE_QUEUE 512

#define WS_INBUF	  2048


/*
 * one of these created for each message
 */
struct a_message {
    void		*payload; /* is malloc'd */
    size_t		len;
};


static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int ringbuffer_head;



void fermenter_ws_receive(char *buf)
{
    struct json_object	*val, *val2, *jobj, *profile;
    units_list		*unit;
    bool		changed;

    jobj = json_tokener_parse(buf);
    json_object_object_get_ex(jobj, "unit", &val);

    for (unit = Config.units ; unit; unit = unit->next) {
	if (strcmp((char *)json_object_get_string(val), unit->alias) == 0) {
	    /*
	     * Setpoints
	     * {"type":"fermenter","unit":"unit0","setpoint_low":20.3,"setpoint_high":20.7}
	     */
	    if ((unit->mode == UNITMODE_FRIDGE) || (unit->mode == UNITMODE_BEER)) {
		changed = false;
		if (json_object_object_get_ex(jobj, "setpoint_low", &val)) {
		    if (unit->PID_heat->SetP != json_object_get_double(val)) {
			changed = true;
		    	syslog(LOG_NOTICE, "ws: unit %s setpoint low from %.1f to %.1f", unit->alias, unit->PID_heat->SetP, json_object_get_double(val));
		    	unit->PID_heat->SetP = json_object_get_double(val);
		    }
		}
		if (json_object_object_get_ex(jobj, "setpoint_high", &val)) {
		    if (unit->PID_cool->SetP != json_object_get_double(val)) {
			changed = true;
		    	syslog(LOG_NOTICE, "ws: unit %s setpoint high from %.1f to %.1f", unit->alias, unit->PID_cool->SetP, json_object_get_double(val));
		    	unit->PID_cool->SetP = json_object_get_double(val);
		    }
		}
		if (changed) {
		    if (unit->mode == UNITMODE_FRIDGE) {
		    	unit->fridge_set_lo = unit->PID_heat->SetP;
		    	unit->fridge_set_hi = unit->PID_cool->SetP;
                    } else {
		    	unit->beer_set_lo = unit->PID_heat->SetP;
		    	unit->beer_set_hi = unit->PID_cool->SetP;
                    }
		    unit->mqtt_flag |= MQTT_FLAG_DATA;
		    return;
		}
	    }

	    /*
	     * Unit mode
	     * {"type":"fermenter","unit":"unit0","mode":"NONE"}
	     */
	    if (json_object_object_get_ex(jobj, "mode", &val)) {
		for (int i = 0; i < 5; i++) {
		    if (strcmp((char *)json_object_get_string(val), UNITMODE[i]) == 0) {
			if (unit->mode != i) {
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    /* Initialize log if the unit is turned on */
			    if ((unit->mode == UNITMODE_OFF) && (i != UNITMODE_OFF)) {
				unit->mqtt_flag |= MQTT_FLAG_BIRTH;
			    }
			    if (i == UNITMODE_PROFILE) {
				/* Do some checks and refuse profile mode cannot be set */
				if (unit->profile_uuid == NULL) {
				    syslog(LOG_NOTICE, "ws: unit %s refuse mode profile, not loaded", unit->alias);
				    break;
				}
			    }
			    syslog(LOG_NOTICE, "ws: unit %s mode to %s", unit->alias, UNITMODE[i]);
			    unit->mode = i;
			    if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
				unit->event_msg = xstrcpy((char *)UNITMODE[i]);
			    /* Allways turn everything off after a mode change */
			    unit->PID_cool->OutP = unit->PID_heat->OutP = 0.0;
			    unit->PID_cool->Mode = unit->PID_heat->Mode = PID_MODE_NONE;
			    unit->heater_state = unit->cooler_state = unit->fan_state = unit->light_state = unit->light_timer = 0;
			    unit->heater_wait = unit->cooler_wait = unit->fan_wait = unit->light_wait = 0;
			    device_out(unit->heater_address, unit->heater_state);
			    device_out(unit->cooler_address, unit->cooler_state);
			    device_out(unit->fan_address, unit->fan_state);
			    device_out(unit->light_address, unit->light_state);
			    if (unit->mode == UNITMODE_PROFILE) {
				/*
				 * Set a sane default until it will be overruled by the
				 * main processing loop.
				 */
				unit->prof_target_lo = unit->profile_inittemp_lo;
				unit->prof_target_hi = unit->profile_inittemp_hi;;
				unit->prof_fridge_mode = 0;
				unit->prof_state = PROFILE_OFF;
				unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
				unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    }
			}
			return;
		    }
		}
	    }

	    /*
	     * Unit stage
	     * {"type":"fermenter","unit":"unit0","stage":"SECONDARY"}
	     */
	    if (json_object_object_get_ex(jobj, "stage", &val)) {
		for (int i = 0; i < 4; i++) {
		    if (strcmp((char *)json_object_get_string(val), UNITSTAGE[i]) == 0) {
			if (unit->stage != i) {
			    syslog(LOG_NOTICE, "ws: unit %s: stage to %s", unit->alias, UNITSTAGE[i]);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    unit->stage = i;
			    if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
				unit->event_msg = xstrcpy((char *)UNITSTAGE[i]);
			}
			return;
		    }
		}
	    }

	    /*
	     * Unit heater and cooler switch
	     * {"type":"fermenter","unit":"unit0","heater_state":100,"cooler_state":0}
	     */
	    if ((json_object_object_get_ex(jobj, "heater_state", &val)) &&
		(json_object_object_get_ex(jobj, "cooler_state", &val2)) &&
		(unit->mode == UNITMODE_NONE)) {
		if (json_object_get_int(val) != unit->heater_state)
		    unit->heater_state = json_object_get_int(val);
		if (json_object_get_int(val2) != unit->cooler_state)
		    unit->cooler_state = json_object_get_int(val2);
		if (unit->heater_state && unit->cooler_state)
		    unit->heater_state = unit->cooler_state = 0;	// Safety
		unit->mqtt_flag |= MQTT_FLAG_DATA;
		syslog(LOG_NOTICE, "ws: unit %s heater_state to %d, cooler_state to %d", unit->alias, unit->heater_state, unit->cooler_state);
		return;
	    }

	    /*
             * Unit fan switch
	     * {"type":"fermenter","unit":"unit0","fan_state":0}
             */
	    if ((json_object_object_get_ex(jobj, "fan_state", &val)) && (unit->mode == UNITMODE_NONE)) {
                if (json_object_get_int(val) != unit->fan_state)
                    unit->fan_state = json_object_get_int(val);
                unit->mqtt_flag |= MQTT_FLAG_DATA;
                syslog(LOG_NOTICE, "ws: unit %s fan_state to %d", unit->alias, unit->fan_state);
                return;
            }

	    /*
	     * We don't implement "light", "product", "profile" download here.
	     * But "profile" commands are implemented. That means a profile
	     * must be installed by bmsd via mqtt.
	     * {"type":"fermenter","unit":"unit0","profile":{"command":"pause"}}
	     * off pause start abort done
	     */
	    if ((json_object_object_get_ex(jobj, "profile", &profile)) && (unit->mode == UNITMODE_PROFILE)) {
		if (json_object_object_get_ex(profile, "command", &val)) {
		    char *cmd = xstrcpy((char *)json_object_get_string(val));
		    syslog(LOG_NOTICE, "ws: profile command `%s'", cmd);
		    if ((! strcmp(cmd, (char *)"off")) && (unit->prof_state == PROFILE_DONE)) {
			unit->prof_state = PROFILE_OFF;
			syslog(LOG_NOTICE, "ws: unit %s profile to OFF", unit->alias);
			unit->mqtt_flag |= MQTT_FLAG_DATA;
		    } else if (! strcmp(cmd, (char *)"pause")) {
			if (unit->prof_state == PROFILE_RUN) {
			    unit->prof_state = PROFILE_PAUSE;
			    syslog(LOG_NOTICE, "ws: unit %s profile to PAUSE", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			} else if (unit->prof_state == PROFILE_PAUSE) {
			    unit->prof_state = PROFILE_RUN;
			    syslog(LOG_NOTICE, "ws: unit %s profile resume RUN", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			}
		    } else if (! strcmp(cmd, (char *)"start")) {
			if (unit->prof_state == PROFILE_OFF) {
			    unit->prof_state = PROFILE_RUN;
			    unit->prof_started = time(NULL);
			    unit->prof_paused = unit->prof_primary_done = 0;
			    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    syslog(LOG_NOTICE, "ws: unit %s profile start RUN", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    if (! unit->event_msg)
				unit->event_msg = xstrcpy((char *)"Profile start");
			}
		    } else if (! strcmp(cmd, (char *)"abort")) {
			if ((unit->prof_state == PROFILE_RUN) || (unit->prof_state == PROFILE_PAUSE)) {
			    unit->prof_state = PROFILE_OFF;
			    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
			    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    syslog(LOG_NOTICE, "ws: unit %s profile ABORT", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    if (! unit->event_msg)
				unit->event_msg = xstrcpy((char *)"Profile abort");
			}
		    } else if (! strcmp(cmd, (char *)"done")) {
			if (unit->prof_state == PROFILE_DONE) {
			    unit->prof_state = PROFILE_OFF;
			    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
			    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    syslog(LOG_NOTICE, "ws: unit %s profile OFF", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			}
		    }
		    free(cmd);
		    cmd = NULL;
		}
		return;
	    }

	    return;
	}
    }
    syslog(LOG_NOTICE, "fermenter_ws_receive(%s)", buf);
}


void pong_ws_receive(char *buf)
{
    struct json_object  *val, *jobj;

    jobj = json_tokener_parse(buf);
    json_object_object_get_ex(jobj, "pong", &val);
    long ws_pongno  = json_object_get_int(val);
    if (ws_pongno != ws_pingno) {
	syslog(LOG_NOTICE, "ws: ping/pong error %ld/%ld", ws_pingno, ws_pongno);
    }
}


static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
    struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
    int		n, m;
    char	buf[WS_INBUF + 1];

    switch (reason) {

	case LWS_CALLBACK_ESTABLISHED: {
		ws_clients++;
		pss->ringbuffer_tail = ringbuffer_head;
                pss->wsi = wsi;
		syslog(LOG_NOTICE, "Websocket: new client, now %d", ws_clients);
		break;
	}

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		syslog(LOG_NOTICE, "Websocket: protocol cleaning up");
                for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++)
                    if (ringbuffer[n].payload)
                	free(ringbuffer[n].payload);
                break;

        case LWS_CALLBACK_SERVER_WRITEABLE:
                while (pss->ringbuffer_tail != ringbuffer_head) {
                    m = ringbuffer[pss->ringbuffer_tail].len;
                    n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
                    if (n < 0) {
                        syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
                        return -1;
                    }
                    if (n < m)
                        syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);

                    if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
                        pss->ringbuffer_tail = 0;
                    else
                        pss->ringbuffer_tail++;

                    if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
                        lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));

                    if (lws_send_pipe_choked(wsi)) {
                        lws_callback_on_writable(wsi);
                        break;
                    }
                }
                break;

	case LWS_CALLBACK_RECEIVE:

		memcpy(buf, in, len);
		buf[len] = '\0';
		syslog(LOG_NOTICE, "ws: received %ld bytes %s", (long)len, buf);
		/*
		 * These are send by bmsapp to bmsd. Then bmsd resends these via MQTT.
		 * Do we want to change that? Or use it for the new web pages.
		 * {"node":"rpi01","group_id":"fermenters","control":"reboot"}
		 * {"node":"rpi01","group_id":"fermenters","control":"rebirth"}
		 */
		if (strncmp(buf, (char *)"{\"type\":\"fermenter\",", 20) == 0) {
		    fermenter_ws_receive(buf);
		} else if (strncmp(buf, (char *)"{\"type\":\"device\",", 17) == 0) {

		} else if (strncmp(buf, (char *)"{\"type\":\"global\",", 17) == 0) {

#ifdef USE_SIMULATOR
		} else if (strncmp(buf, (char *)"{\"type\":\"simulator\",", 20) == 0) {

#endif
		} else if (strncmp(buf, (char *)"{\"pong\":", 8) == 0) {
		    pong_ws_receive(buf);
		}

                break;

	case LWS_CALLBACK_CLOSED:
		ws_clients--;
		syslog(LOG_NOTICE, "Websocket: del client, now %d", ws_clients);
		break;

	default:
		break;
  }

  return 0;
}



static struct lws_protocols protocols[] = {
	{ "thermferm-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF },
        { NULL, NULL, 0, 0 } /* terminator */
};


/*
 * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1}
 * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
 */
void ws_broadcast(char *msg)
{
    int         len, err;

    err = pthread_mutex_lock(&ws_mutex);
    if (err) {
	syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
    } else {

    	len = strlen(msg);
    	if (ringbuffer[ringbuffer_head].payload)
            free(ringbuffer[ringbuffer_head].payload);

    	ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
    	ringbuffer[ringbuffer_head].len = len;
    	memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len);
    	if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
            ringbuffer_head = 0;
    	else
            ringbuffer_head++;

//    	syslog(LOG_NOTICE, "ws: broadcast buffer=%d  len=%d", ringbuffer_head, len);

    	lws_callback_on_writable_all_protocol(context, &protocols[0]);
    	err = pthread_mutex_unlock(&ws_mutex);
    	if (err) {
            syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
    	}
    	last_msg = time(NULL);
    }
}



/*
 * Called every 45 seconds.
 */
void ws_check(void)
{
    time_t	now = time(NULL);
    char	buf[64];

    if (((int)now - (int)last_msg) > 45) {
	snprintf(buf, 63, "{\"ping\":%ld}", ++ws_pingno);
	ws_broadcast(buf);
    }
}



void *my_ws_loop(void *threadid)
{
    struct	lws_context_creation_info info;
    int		n = 0;

    my_ws_state = 1;
    memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
    info.port = Config.websocket_port;
    info.protocols = protocols;
    info.gid = -1;
    info.uid = -1;
    info.keepalive_timeout = 900;
    info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;

    context = lws_create_context(&info);

    if (context == NULL) {
	syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
	my_ws_state = 0;
	return (void *)1;
    }
    syslog(LOG_NOTICE, "Websocket: server started port %d", info.port);

    /*
     * Loop forever until external shutdown variable is set.
     */
    while (n >= 0 && ! my_ws_shutdown) {

	n = lws_service(context, 50);
    }
    lws_context_destroy(context);

    my_ws_state = 0;
    syslog(LOG_NOTICE, "Websocket: server stopped");
    return (void *)0;
}

mercurial