components/websocket/websocket_server.c

Thu, 29 Jul 2021 22:36:17 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Thu, 29 Jul 2021 22:36:17 +0200
changeset 114
1413c4c5cd8c
parent 55
38e1c91bfd88
permissions
-rw-r--r--

Fixed Brewfather beerxml import.

/**
 * @file websocket_server.c
 * @brief Websocket server functions.
 * @author Blake Felt - blake.w.felt@gmail.com`
 */

#include "websocket_server.h"
#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "lwip/tcp.h"
#include <string.h>

static SemaphoreHandle_t xwebsocket_mutex; // to lock the client array
static QueueHandle_t xwebsocket_queue; // to hold the clients that send messages
static ws_client_t clients[WEBSOCKET_SERVER_MAX_CLIENTS]; // holds list of clients
static TaskHandle_t xtask; // the task itself

/**
 * @brief Add a connection to the callback queue.
 * @param conn The network connection.
 * @param evt The event.
 * @param len Not used.
 */
static void background_callback(struct netconn* conn, enum netconn_evt evt,u16_t len) {
  switch(evt) {
    case NETCONN_EVT_RCVPLUS:
      xQueueSendToBack(xwebsocket_queue,&conn,WEBSOCKET_SERVER_QUEUE_TIMEOUT);
      break;
    default:
      break;
  }
}

/**
 * @brief Handle client read.
 * @param num The client connection number.
 */
static void handle_read(uint8_t num) 
{
    ws_header_t header;
    char* msg;

    header.received = 0;
    msg = ws_read(&clients[num],&header);

    if (!header.received) {
    	if (msg) 
	    free(msg);
    	return;
    }

    switch(clients[num].last_opcode) {
    	case WEBSOCKET_OPCODE_CONT:
      		break;
    	case WEBSOCKET_OPCODE_BIN:
      		clients[num].scallback(num,WEBSOCKET_BIN,msg,header.length);
      		break;
    	case WEBSOCKET_OPCODE_TEXT:
      		clients[num].scallback(num,WEBSOCKET_TEXT,msg,header.length);
      		break;
    	case WEBSOCKET_OPCODE_PING:
      		ws_send(&clients[num],WEBSOCKET_OPCODE_PONG,msg,header.length,0);
      		clients[num].scallback(num,WEBSOCKET_PING,msg,header.length);
      		break;
    	case WEBSOCKET_OPCODE_PONG:
      		if (clients[num].ping) {
        	    clients[num].scallback(num,WEBSOCKET_PONG,NULL,0);
        	    clients[num].ping = 0;
      		}
      		break;
    	case WEBSOCKET_OPCODE_CLOSE:
      		clients[num].scallback(num,WEBSOCKET_DISCONNECT_EXTERNAL,NULL,0);
      		ws_disconnect_client(&clients[num]);
      		break;
    	default:
      		break;
    }

    if(msg) 
	free(msg);
}

/**
 * @brief The webserver task.
 */
static void ws_server_task(void* pvParameters) 
{
    struct netconn* conn;

    xwebsocket_mutex = xSemaphoreCreateMutex();
    xwebsocket_queue = xQueueCreate(WEBSOCKET_SERVER_QUEUE_SIZE, sizeof(struct netconn*));

    // initialize all clients
    for (int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
    	clients[i].conn = NULL;
	clients[i].connected = false;
    	clients[i].url  = NULL;
    	clients[i].ping = 0;
    	clients[i].last_opcode = 0;
    	clients[i].contin = NULL;
    	clients[i].len = 0;
    	clients[i].ccallback = NULL;
    	clients[i].scallback = NULL;
    }

    for(;;) {
    	xQueueReceive(xwebsocket_queue, &conn, portMAX_DELAY);
    	if(!conn) continue; // if the connection was NULL, ignore it

    	if (xSemaphoreTake(xwebsocket_mutex, 25) == pdTRUE) { // take access
    	    for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
      	    	if(clients[i].conn == conn) {
        	    handle_read(i);
        	    break;
      	    	}
    	    }
    	    xSemaphoreGive(xwebsocket_mutex); // return access
    	}
    }
    vTaskDelete(NULL);
}



int ws_server_start() 
{
    if(xtask)
	return 0;
  
    xTaskCreate(&ws_server_task, "ws_server_task", WEBSOCKET_SERVER_TASK_STACK_DEPTH, NULL, WEBSOCKET_SERVER_TASK_PRIORITY, &xtask);
    return 1;
}



int ws_server_stop() {
    if(!xtask) 
	return 0;
  
    vTaskDelete(xtask);
    return 1;
}



/**
 * @brief Prepare a response for a new websocket connection.
 * @param buf The client message buffer.
 * @param buflen The length of the buffer.
 * @param handshake The jandshake hash.
 * @param protocol The requested protocol or NULL.
 * @return True iif success, else false.
 */
static bool prepare_response(char* buf,uint32_t buflen,char* handshake,char* protocol) {
  const char WS_HEADER[] = "Upgrade: websocket\r\n";
  const char WS_KEY[] = "Sec-WebSocket-Key: ";
  const char WS_RSP[] = "HTTP/1.1 101 Switching Protocols\r\n" \
                        "Upgrade: websocket\r\n" \
                        "Connection: Upgrade\r\n" \
                        "Sec-WebSocket-Accept: %s\r\n" \
                        "%s\r\n";

  char* key_start;
  char* key_end;
  char* hashed_key;

  if(!strstr(buf,WS_HEADER)) return 0;
  if(!buflen) return 0;
  key_start = strstr(buf,WS_KEY);
  if(!key_start) return 0;
  key_start += 19;
  key_end = strstr(key_start,"\r\n");
  if(!key_end) return 0;

  hashed_key = ws_hash_handshake(key_start,key_end-key_start);
  if(!hashed_key) return 0;
  if(protocol) {
    char tmp[256];
    sprintf(tmp,WS_RSP,hashed_key,"Sec-WebSocket-Protocol: %s\r\n");
    sprintf(handshake,tmp,protocol);
  }
  else {
    sprintf(handshake,WS_RSP,hashed_key,"");
  }
  free(hashed_key);
  return 1;
}

int ws_server_add_client_protocol(struct netconn* conn,
                         char* msg,
                         uint16_t len,
                         char* url,
                         char* protocol,
                         void (*callback)(uint8_t num,
                                          WEBSOCKET_TYPE_t type,
                                          char* msg,
                                          uint64_t len)) {
  int ret;
  char handshake[256];

    if(!prepare_response(msg,len,handshake,protocol)) {
    	netconn_close(conn);
    	netconn_delete(conn);
    	return -2;
    }

    ret = -1;
    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	conn->callback = background_callback;
  	netconn_write(conn,handshake,strlen(handshake),NETCONN_COPY);

    	for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
    	    if(clients[i].conn) continue;
    	    callback(i,WEBSOCKET_CONNECT,NULL,0);
    	    clients[i] = ws_connect_client(conn,url,NULL,callback);
    	    if(!ws_is_connected(clients[i])) {
      	    	callback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
      	    	ws_disconnect_client(&clients[i]);
      	    	break;
    	    }
    	    ret = i;
    	    break;
    	}
    	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}



int ws_server_len_url(char* url) 
{
    int ret;
    ret = 0;
  
    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
    	    if(clients[i].url && !strcmp(url,clients[i].url)) 
		ret++;
  	}
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}

int ws_server_add_client(struct netconn* conn,
                         char* msg,
                         uint16_t len,
                         char* url,
                         void (*callback)(uint8_t num,
                                          WEBSOCKET_TYPE_t type,
                                          char* msg,
                                          uint64_t len)) {

  return ws_server_add_client_protocol(conn,msg,len,url,NULL,callback);

}

int ws_server_len_all() 
{
    int ret;
    ret = 0;
  
    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
    	    if(clients[i].conn) 
		ret++;
  	}
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}



int ws_server_remove_client(int num) 
{
    int ret = 0;
  
    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	if(ws_is_connected(clients[num])) {
    	    clients[num].scallback(num,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
    	    ws_disconnect_client(&clients[num]);
    	    ret = 1;
  	}
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}



int ws_server_remove_clients(char* url) 
{
    int ret = 0;
  
    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
    	    if(ws_is_connected(clients[i]) && !strcmp(url,clients[i].url)) {
      		clients[i].scallback(i,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
      		ws_disconnect_client(&clients[i]);
      		ret += 1;
    	    }
  	}
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}

int ws_server_remove_all() 
{
    int ret = 0;

    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
    	    if(ws_is_connected(clients[i])) {
      		clients[i].scallback(i,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
      		ws_disconnect_client(&clients[i]);
      		ret += 1;
    	    }
  	}
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}

// The following functions are already written below, but without the mutex.

int ws_server_send_text_client(int num,char* msg,uint64_t len)
{
    int ret = 0;

    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	ret = ws_server_send_text_client_from_callback(num, msg, len);
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}



int ws_server_send_text_clients(char* url,char* msg,uint64_t len) 
{
    int ret = 0;

    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	ret = ws_server_send_text_clients_from_callback(url, msg, len);
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}



int ws_server_send_text_all(char* msg,uint64_t len) 
{
    int ret = 0;

    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
  	ret = ws_server_send_text_all_from_callback(msg, len);
  	xSemaphoreGive(xwebsocket_mutex);
    }
  
    return ret;
}



// the following functions should be used inside of the callback. The regular versions
// grab the mutex, but it is already grabbed from inside the callback so it will hang.

int ws_server_send_text_client_from_callback(int num,char* msg,uint64_t len) 
{
    int ret = 0;
  
    if (ws_is_connected(clients[num])) {
    	if ((ws_send(&clients[num],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
    	    ret = 1;
	} else {
      	    clients[num].scallback(num,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
            ws_disconnect_client(&clients[num]);
      	    ret = 0;
    	}
    }
    return ret;
}



int ws_server_send_text_clients_from_callback(char* url,char* msg,uint64_t len) 
{
    int ret = 0;
    
    for (int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
    	if (ws_is_connected(clients[i]) && !strcmp(clients[i].url,url)) {
      	    if ((ws_send(&clients[i],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
		ret++;
	    } else {
        	clients[i].scallback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
        	ws_disconnect_client(&clients[i]);
      	    }
    	}
    }
    return ret;
}



int ws_server_send_text_all_from_callback(char* msg,uint64_t len) 
{
    int ret = 0;
  
    for (int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
    	if (ws_is_connected(clients[i])) {
      	    if ((ws_send(&clients[i],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
		ret++;
	    } else {
        	clients[i].scallback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
        	ws_disconnect_client(&clients[i]);
      	    }
    	}
    }
    return ret;
}



int ws_server_send_bin_client_from_callback(int num,char* msg,uint64_t len)
{
    int ret = 0;

    if (ws_is_connected(clients[num])) {
        if ((ws_send(&clients[num],WEBSOCKET_OPCODE_BIN,msg,len,0) == ERR_OK)) {
	    ret = 1;
	} else {
	    clients[num].scallback(num,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
	    ws_disconnect_client(&clients[num]);
	    ret = 0;
	}
    }

    return ret;
}



int ws_server_send_bin_client(int num,char* msg,uint64_t len) 
{
    int	ret = 0;

    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
    	ret = ws_server_send_bin_client_from_callback(num, msg, len);
    	xSemaphoreGive(xwebsocket_mutex);
    }
    return ret;
}



int ws_server_ping(int num) 
{
    int ret = 0;

    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
    	if (ws_is_connected(clients[num])) {
	    if ((ws_send(&clients[num], WEBSOCKET_OPCODE_PING, (char *)"BrewPing", 8, 0) == ERR_OK)) {
	    	ret = 1;
	    } else {
	    	clients[num].scallback(num, WEBSOCKET_DISCONNECT_ERROR, NULL, 0);
	    	ws_disconnect_client(&clients[num]);
	    	ret = 0;
	    }
    	}
    	xSemaphoreGive(xwebsocket_mutex);
    }
    
    return ret;
}

mercurial