thermferm/websocket.c

Sun, 05 May 2024 17:24:54 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Sun, 05 May 2024 17:24:54 +0200
changeset 730
6eba006ed8f5
parent 724
01e3936f62d4
permissions
-rw-r--r--

Much faster shutdown of the websocket service.

/**
 * @file websocket.c
 * @brief WebSockets interface
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2024
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the mbsePi-apps
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */

#include "thermferm.h"
#include "xutil.h"
#include "devices.h"
#include "websocket.h"
#include <libwebsockets.h>


extern sys_config       Config;
extern int		debug;
extern const char	UNITMODE[5][8];
extern const char	UNITSTAGE[4][12];

int			my_ws_shutdown = 0;
int			my_ws_state = 0;
struct			lws_context *context;
int			ws_clients = 0;
long			ws_pingno = 0;
time_t			last_msg = 0;
pthread_mutex_t		ws_mutex;


/*
 * Based on lws-mirror-protocol
 */
#define MAX_MESSAGE_QUEUE 512

#define WS_INBUF	  2048


/*
 * one of these created for each message
 */
struct a_message {
    void		*payload; /* is malloc'd */
    size_t		len;
};


static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int ringbuffer_head;



void fermenter_ws_receive(char *buf)
{
    struct json_object	*val, *val2, *jobj, *profile;
    units_list		*unit;
    bool		changed;

    jobj = json_tokener_parse(buf);
    json_object_object_get_ex(jobj, "unit", &val);

    for (unit = Config.units ; unit; unit = unit->next) {
	if (strcmp((char *)json_object_get_string(val), unit->alias) == 0) {
	    /*
	     * Setpoints
	     * {"type":"fermenter","unit":"unit0","setpoint_low":20.3,"setpoint_high":20.7}
	     */
	    if ((unit->mode == UNITMODE_FRIDGE) || (unit->mode == UNITMODE_BEER)) {
		changed = false;
		if (json_object_object_get_ex(jobj, "setpoint_low", &val)) {
		    if (unit->PID_heat->SetP != json_object_get_double(val)) {
			changed = true;
		    	syslog(LOG_NOTICE, "ws: unit %s setpoint low from %.1f to %.1f", unit->alias, unit->PID_heat->SetP, json_object_get_double(val));
		    	unit->PID_heat->SetP = json_object_get_double(val);
		    }
		}
		if (json_object_object_get_ex(jobj, "setpoint_high", &val)) {
		    if (unit->PID_cool->SetP != json_object_get_double(val)) {
			changed = true;
		    	syslog(LOG_NOTICE, "ws: unit %s setpoint high from %.1f to %.1f", unit->alias, unit->PID_cool->SetP, json_object_get_double(val));
		    	unit->PID_cool->SetP = json_object_get_double(val);
		    }
		}
		if (changed) {
		    if (unit->mode == UNITMODE_FRIDGE) {
		    	unit->fridge_set_lo = unit->PID_heat->SetP;
		    	unit->fridge_set_hi = unit->PID_cool->SetP;
                    } else {
		    	unit->beer_set_lo = unit->PID_heat->SetP;
		    	unit->beer_set_hi = unit->PID_cool->SetP;
                    }
		    unit->mqtt_flag |= MQTT_FLAG_DATA;
		    return;
		}
	    }

	    /*
	     * Unit mode
	     * {"type":"fermenter","unit":"unit0","mode":"NONE"}
	     */
	    if (json_object_object_get_ex(jobj, "mode", &val)) {
		for (int i = 0; i < 5; i++) {
		    if (strcmp((char *)json_object_get_string(val), UNITMODE[i]) == 0) {
			if (unit->mode != i) {
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    /* Initialize log if the unit is turned on */
			    if ((unit->mode == UNITMODE_OFF) && (i != UNITMODE_OFF)) {
				unit->mqtt_flag |= MQTT_FLAG_BIRTH;
			    }
			    if (i == UNITMODE_PROFILE) {
				/* Do some checks and refuse profile mode cannot be set */
				if (unit->profile_uuid == NULL) {
				    syslog(LOG_NOTICE, "ws: unit %s refuse mode profile, not loaded", unit->alias);
				    break;
				}
			    }
			    syslog(LOG_NOTICE, "ws: unit %s mode to %s", unit->alias, UNITMODE[i]);
			    unit->mode = i;
			    if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
				unit->event_msg = xstrcpy((char *)UNITMODE[i]);
			    /* Allways turn everything off after a mode change */
			    unit->PID_cool->OutP = unit->PID_heat->OutP = 0.0;
			    unit->PID_cool->Mode = unit->PID_heat->Mode = PID_MODE_NONE;
			    unit->heater_state = unit->cooler_state = unit->fan_state = unit->light_state = unit->light_timer = 0;
			    unit->heater_wait = unit->cooler_wait = unit->fan_wait = unit->light_wait = 0;
			    device_out(unit->heater_address, unit->heater_state);
			    device_out(unit->cooler_address, unit->cooler_state);
			    device_out(unit->fan_address, unit->fan_state);
			    device_out(unit->light_address, unit->light_state);
			    if (unit->mode == UNITMODE_PROFILE) {
				/*
				 * Set a sane default until it will be overruled by the
				 * main processing loop.
				 */
				unit->prof_target_lo = unit->profile_inittemp_lo;
				unit->prof_target_hi = unit->profile_inittemp_hi;;
				unit->prof_fridge_mode = 0;
				unit->prof_state = PROFILE_OFF;
				unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
				unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    }
			}
			return;
		    }
		}
	    }

	    /*
	     * Unit stage
	     * {"type":"fermenter","unit":"unit0","stage":"SECONDARY"}
	     */
	    if (json_object_object_get_ex(jobj, "stage", &val)) {
		for (int i = 0; i < 4; i++) {
		    if (strcmp((char *)json_object_get_string(val), UNITSTAGE[i]) == 0) {
			if (unit->stage != i) {
			    syslog(LOG_NOTICE, "ws: unit %s: stage to %s", unit->alias, UNITSTAGE[i]);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    unit->stage = i;
			    if ((unit->mode != UNITMODE_OFF) && ! unit->event_msg)
				unit->event_msg = xstrcpy((char *)UNITSTAGE[i]);
			}
			return;
		    }
		}
	    }

	    /*
	     * Unit heater and cooler switch
	     * {"type":"fermenter","unit":"unit0","heater_state":100,"cooler_state":0}
	     */
	    if ((json_object_object_get_ex(jobj, "heater_state", &val)) &&
		(json_object_object_get_ex(jobj, "cooler_state", &val2)) &&
		(unit->mode == UNITMODE_NONE)) {
		if (json_object_get_int(val) != unit->heater_state)
		    unit->heater_state = json_object_get_int(val);
		if (json_object_get_int(val2) != unit->cooler_state)
		    unit->cooler_state = json_object_get_int(val2);
		if (unit->heater_state && unit->cooler_state)
		    unit->heater_state = unit->cooler_state = 0;	// Safety
		unit->mqtt_flag |= MQTT_FLAG_DATA;
		syslog(LOG_NOTICE, "ws: unit %s heater_state to %d, cooler_state to %d", unit->alias, unit->heater_state, unit->cooler_state);
		return;
	    }

	    /*
             * Unit fan switch
	     * {"type":"fermenter","unit":"unit0","fan_state":0}
             */
	    if ((json_object_object_get_ex(jobj, "fan_state", &val)) && (unit->mode == UNITMODE_NONE)) {
                if (json_object_get_int(val) != unit->fan_state)
                    unit->fan_state = json_object_get_int(val);
                unit->mqtt_flag |= MQTT_FLAG_DATA;
                syslog(LOG_NOTICE, "ws: unit %s fan_state to %d", unit->alias, unit->fan_state);
                return;
            }

	    /*
	     * We don't implement "light", "product", "profile" download here.
	     * But "profile" commands are implemented. That means a profile
	     * must be installed by bmsd via mqtt.
	     * {"type":"fermenter","unit":"unit0","profile":{"command":"pause"}}
	     * off pause start abort done
	     */
	    if ((json_object_object_get_ex(jobj, "profile", &profile)) && (unit->mode == UNITMODE_PROFILE)) {
		if (json_object_object_get_ex(profile, "command", &val)) {
		    char *cmd = xstrcpy((char *)json_object_get_string(val));
		    syslog(LOG_NOTICE, "ws: profile command `%s'", cmd);
		    if ((! strcmp(cmd, (char *)"off")) && (unit->prof_state == PROFILE_DONE)) {
			unit->prof_state = PROFILE_OFF;
			syslog(LOG_NOTICE, "ws: unit %s profile to OFF", unit->alias);
			unit->mqtt_flag |= MQTT_FLAG_DATA;
		    } else if (! strcmp(cmd, (char *)"pause")) {
			if (unit->prof_state == PROFILE_RUN) {
			    unit->prof_state = PROFILE_PAUSE;
			    syslog(LOG_NOTICE, "ws: unit %s profile to PAUSE", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			} else if (unit->prof_state == PROFILE_PAUSE) {
			    unit->prof_state = PROFILE_RUN;
			    syslog(LOG_NOTICE, "ws: unit %s profile resume RUN", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			}
		    } else if (! strcmp(cmd, (char *)"start")) {
			if (unit->prof_state == PROFILE_OFF) {
			    unit->prof_state = PROFILE_RUN;
			    unit->prof_started = time(NULL);
			    unit->prof_paused = unit->prof_primary_done = 0;
			    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    syslog(LOG_NOTICE, "ws: unit %s profile start RUN", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    if (! unit->event_msg)
				unit->event_msg = xstrcpy((char *)"Profile start");
			}
		    } else if (! strcmp(cmd, (char *)"abort")) {
			if ((unit->prof_state == PROFILE_RUN) || (unit->prof_state == PROFILE_PAUSE)) {
			    unit->prof_state = PROFILE_OFF;
			    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
			    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    syslog(LOG_NOTICE, "ws: unit %s profile ABORT", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			    if (! unit->event_msg)
				unit->event_msg = xstrcpy((char *)"Profile abort");
			}
		    } else if (! strcmp(cmd, (char *)"done")) {
			if (unit->prof_state == PROFILE_DONE) {
			    unit->prof_state = PROFILE_OFF;
			    unit->prof_started = unit->prof_paused = unit->prof_primary_done = 0;
			    unit->prof_peak_abs = unit->prof_peak_rel = 0.0;
			    syslog(LOG_NOTICE, "ws: unit %s profile OFF", unit->alias);
			    unit->mqtt_flag |= MQTT_FLAG_DATA;
			}
		    }
		    free(cmd);
		    cmd = NULL;
		}
		return;
	    }

	    return;
	}
    }
    syslog(LOG_NOTICE, "fermenter_ws_receive(%s)", buf);
}


void pong_ws_receive(char *buf)
{
    struct json_object  *val, *jobj;

    jobj = json_tokener_parse(buf);
    json_object_object_get_ex(jobj, "pong", &val);
    long ws_pongno  = json_object_get_int(val);
    if (ws_pongno != ws_pingno) {
	syslog(LOG_NOTICE, "ws: ping/pong error %ld/%ld", ws_pingno, ws_pongno);
    }
}


static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
    struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
    int		n, m;
    char	buf[WS_INBUF + 1];

    switch (reason) {

	case LWS_CALLBACK_ESTABLISHED: {
		ws_clients++;
		pss->ringbuffer_tail = ringbuffer_head;
                pss->wsi = wsi;
		syslog(LOG_NOTICE, "Websocket: new client, now %d", ws_clients);
		break;
	}

	case LWS_CALLBACK_PROTOCOL_DESTROY:
                for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++)
                    if (ringbuffer[n].payload)
                	free(ringbuffer[n].payload);
                break;

        case LWS_CALLBACK_SERVER_WRITEABLE:
                while (pss->ringbuffer_tail != ringbuffer_head) {
                    m = ringbuffer[pss->ringbuffer_tail].len;
                    n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
                    if (n < 0) {
                        syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
                        return -1;
                    }
                    if (n < m)
                        syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);

                    if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
                        pss->ringbuffer_tail = 0;
                    else
                        pss->ringbuffer_tail++;

                    if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
                        lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));

                    if (lws_send_pipe_choked(wsi)) {
                        lws_callback_on_writable(wsi);
                        break;
                    }
                }
                break;

	case LWS_CALLBACK_RECEIVE:

		memcpy(buf, in, len);
		buf[len] = '\0';
		syslog(LOG_NOTICE, "ws: received %ld bytes %s", (long)len, buf);
		/*
		 * These are send by bmsapp to bmsd. Then bmsd resends these via MQTT.
		 * Do we want to change that? Or use it for the new web pages.
		 * {"node":"rpi01","group_id":"fermenters","control":"reboot"}
		 * {"node":"rpi01","group_id":"fermenters","control":"rebirth"}
		 */
		if (strncmp(buf, (char *)"{\"type\":\"fermenter\",", 20) == 0) {
		    fermenter_ws_receive(buf);
		} else if (strncmp(buf, (char *)"{\"type\":\"device\",", 17) == 0) {

		} else if (strncmp(buf, (char *)"{\"type\":\"global\",", 17) == 0) {

#ifdef USE_SIMULATOR
		} else if (strncmp(buf, (char *)"{\"type\":\"simulator\",", 20) == 0) {

#endif
		} else if (strncmp(buf, (char *)"{\"pong\":", 8) == 0) {
		    pong_ws_receive(buf);
		}

                break;

	case LWS_CALLBACK_CLOSED:
		ws_clients--;
		syslog(LOG_NOTICE, "Websocket: del client, now %d", ws_clients);
		break;

	default:
		break;
  }

  return 0;
}



static struct lws_protocols protocols[] = {
	{ "thermferm-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF },
        { NULL, NULL, 0, 0 } /* terminator */
};


/*
 * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1}
 * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
 */
void ws_broadcast(char *msg)
{
    int         len, err;

    err = pthread_mutex_lock(&ws_mutex);
    if (err) {
	syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
    } else {

    	len = strlen(msg);
    	if (ringbuffer[ringbuffer_head].payload)
            free(ringbuffer[ringbuffer_head].payload);

    	ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
    	ringbuffer[ringbuffer_head].len = len;
    	memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len);
    	if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
            ringbuffer_head = 0;
    	else
            ringbuffer_head++;

//    	syslog(LOG_NOTICE, "ws: broadcast buffer=%d  len=%d", ringbuffer_head, len);

    	lws_callback_on_writable_all_protocol(context, &protocols[0]);
    	err = pthread_mutex_unlock(&ws_mutex);
    	if (err) {
            syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
    	}
    	last_msg = time(NULL);
    }
}



/*
 * Called every 45 seconds.
 */
void ws_check(void)
{
    time_t	now = time(NULL);
    char	buf[64];

    if (((int)now - (int)last_msg) > 45) {
	snprintf(buf, 63, "{\"ping\":%ld}", ++ws_pingno);
	ws_broadcast(buf);
    }
}


/*
 * Fast way to stop the websocket service.
 */
void my_ws_stop(void)
{
    my_ws_shutdown = 1;
    lws_cancel_service(context);
}


void *my_ws_loop(void *threadid)
{
    struct	lws_context_creation_info info;
    int		n = 0;
    pid_t	pid = gettid();

    my_ws_state = 1;
    memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
    info.port = Config.websocket_port;
    info.protocols = protocols;
    info.gid = -1;
    info.uid = -1;
    info.keepalive_timeout = 900;
    info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;

    context = lws_create_context(&info);

    if (context == NULL) {
	syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
	my_ws_state = 0;
	return (void *)1;
    }
    syslog(LOG_NOTICE, "Thread my_ws_loop started port %d, pid=%d", info.port, pid);

    /*
     * Loop forever until external shutdown variable is set.
     */
    while (n >= 0 && ! my_ws_shutdown) {

	n = lws_service(context, 50);
    }
    lws_context_destroy(context);

    my_ws_state = 0;
    syslog(LOG_NOTICE, "Thread my_ws_loop stopped");
    return (void *)0;
}

mercurial