bmsd/websocket.c

Mon, 11 May 2020 17:32:08 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Mon, 11 May 2020 17:32:08 +0200
changeset 671
4b54d6f79d25
child 672
23f959713fcb
permissions
-rw-r--r--

Version 0.3.33 Added websockets framework. Added fermenter status messages to the websockets broadcast.

/**
 * @file websocket.c
 * @brief WebSockets interface
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2020
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the bms (Brewery Management System)
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */

#include "bms.h"
#include "xutil.h"
#include "websocket.h"
#include <libwebsockets.h>


extern sys_config       Config;
extern int		debug;
extern int		my_shutdown;

struct			lws_context *context;
int			ws_clients = 0;
time_t			last_msg = 0;
pthread_mutex_t		ws_mutex;


/*
 * Based on lws-mirror-protocol from libwebsockets v2.0.x
 * Debian ships v2.0.3, on Slackware we have 2.4.0 and there
 * are lots of changes in the api.
 */
#define MAX_MESSAGE_QUEUE 512

/*
 * one of these created for each message
 */
struct a_message {
    void		*payload; /* is malloc'd */
    size_t		len;
};


static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int ringbuffer_head;



static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
    struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
    int n, m;

    switch (reason) {

	case LWS_CALLBACK_ESTABLISHED: {
		ws_clients++;
		syslog(LOG_NOTICE, "ws: new connection, total %d", ws_clients);
		pss->ringbuffer_tail = ringbuffer_head;
                pss->wsi = wsi;
		break;
	}

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		syslog(LOG_NOTICE, "ws: protocol cleaning up");
                for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++)
                    if (ringbuffer[n].payload)
                	free(ringbuffer[n].payload);
                break;

        case LWS_CALLBACK_SERVER_WRITEABLE:
                while (pss->ringbuffer_tail != ringbuffer_head) {
                    m = ringbuffer[pss->ringbuffer_tail].len;
                    n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
                    if (n < 0) {
                        syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
                        return -1;
                    }
                    if (n < m)
                        syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);

                    if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
                        pss->ringbuffer_tail = 0;
                    else
                        pss->ringbuffer_tail++;

                    if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
                        lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));

                    if (lws_send_pipe_choked(wsi)) {
                        lws_callback_on_writable(wsi);
                        break;
                    }
                }
                break;

	case LWS_CALLBACK_RECEIVE:
                if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
                    syslog(LOG_NOTICE, "ws: dropping!");
                    goto choke;
                }

                if (ringbuffer[ringbuffer_head].payload)
                    free(ringbuffer[ringbuffer_head].payload);

                ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
                ringbuffer[ringbuffer_head].len = len;
                memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, in, len);
                if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
                    ringbuffer_head = 0;
                else
                    ringbuffer_head++;

                if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
                    goto done;

choke:
                syslog(LOG_NOTICE, "ws: LWS_CALLBACK_RECEIVE: throttling");
                lws_rx_flow_control(wsi, 0);

done:
                lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
                break;

	case LWS_CALLBACK_CLOSED:
		ws_clients--;
		syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients);
		break;

	default:
		break;
  }

  return 0;
}



static struct lws_protocols protocols[] = {
	{ "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 128 },
        { NULL, NULL, 0, 0 } /* terminator */
};


/*
 *  {"device":"fermenter","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
 */
void ws_broadcast(char *msg)
{
    int         len, err;

    err = pthread_mutex_lock(&ws_mutex);
    if (err) {
	syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
    } else {

    	len = strlen(msg);
    	if (ringbuffer[ringbuffer_head].payload)
            free(ringbuffer[ringbuffer_head].payload);

    	ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
    	ringbuffer[ringbuffer_head].len = len;
    	memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len);
    	if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
            ringbuffer_head = 0;
    	else
            ringbuffer_head++;

//    syslog(LOG_NOTICE, "ws: %d %s", ringbuffer_head, msg);
    	syslog(LOG_NOTICE, "ws: broadcast buffer=%d  len=%d", ringbuffer_head, len);

    	lws_callback_on_writable_all_protocol(context, &protocols[0]);
    	err = pthread_mutex_unlock(&ws_mutex);
    	if (err) {
            syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
    	}
    	last_msg = time(NULL);
    }
}



/*
 * Called every 5 seconds.
 */
void ws_check(void)
{
    time_t	now = time(NULL);

    if (((int)now - (int)last_msg) > 45) {
	ws_broadcast((char *)"{\"ping\":1}");
    }
}



void *ws_loop(void *threadid)
{
    struct	lws_context_creation_info info;
    int		n = 0;

    syslog(LOG_NOTICE, "Thread ws_loop started");

    memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
    info.port = 8010;
    info.protocols = protocols;
    info.gid = -1;
    info.uid = -1;
    info.keepalive_timeout = 900;
    info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;

    context = lws_create_context(&info);

    if (context == NULL) {
	syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
    }

    /*
     * Loop forever until external shutdown variable is set.
     */
    while (n >= 0 && ! my_shutdown) {

	n = lws_service(context, 50);
    }
    lws_context_destroy(context);

    syslog(LOG_NOTICE, "Thread ws_loop stopped");
    return 0;
}

mercurial