components/websocket/websocket_server.c

changeset 0
b74b0e4902c3
child 55
38e1c91bfd88
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/components/websocket/websocket_server.c	Sat Oct 20 13:23:15 2018 +0200
@@ -0,0 +1,489 @@
+/**
+ * @file websocket_server.c
+ * @brief Websocket server functions.
+ * @author Blake Felt - blake.w.felt@gmail.com`
+ */
+
+#include "websocket_server.h"
+#include "freertos/FreeRTOS.h"
+#include "freertos/semphr.h"
+#include "freertos/task.h"
+#include "freertos/queue.h"
+#include "lwip/tcp.h"
+#include <string.h>
+
+static SemaphoreHandle_t xwebsocket_mutex; // to lock the client array
+static QueueHandle_t xwebsocket_queue; // to hold the clients that send messages
+static ws_client_t clients[WEBSOCKET_SERVER_MAX_CLIENTS]; // holds list of clients
+static TaskHandle_t xtask; // the task itself
+
+/**
+ * @brief Add a connection to the callback queue.
+ * @param conn The network connection.
+ * @param evt The event.
+ * @param len Not used.
+ */
+static void background_callback(struct netconn* conn, enum netconn_evt evt,u16_t len) {
+  switch(evt) {
+    case NETCONN_EVT_RCVPLUS:
+      xQueueSendToBack(xwebsocket_queue,&conn,WEBSOCKET_SERVER_QUEUE_TIMEOUT);
+      break;
+    default:
+      break;
+  }
+}
+
+/**
+ * @brief Handle client read.
+ * @param num The client connection number.
+ */
+static void handle_read(uint8_t num) 
+{
+    ws_header_t header;
+    char* msg;
+
+    header.received = 0;
+    msg = ws_read(&clients[num],&header);
+
+    if (!header.received) {
+    	if (msg) 
+	    free(msg);
+    	return NULL;
+    }
+
+    switch(clients[num].last_opcode) {
+    	case WEBSOCKET_OPCODE_CONT:
+      		break;
+    	case WEBSOCKET_OPCODE_BIN:
+      		clients[num].scallback(num,WEBSOCKET_BIN,msg,header.length);
+      		break;
+    	case WEBSOCKET_OPCODE_TEXT:
+      		clients[num].scallback(num,WEBSOCKET_TEXT,msg,header.length);
+      		break;
+    	case WEBSOCKET_OPCODE_PING:
+      		ws_send(&clients[num],WEBSOCKET_OPCODE_PONG,msg,header.length,0);
+      		clients[num].scallback(num,WEBSOCKET_PING,msg,header.length);
+      		break;
+    	case WEBSOCKET_OPCODE_PONG:
+      		if (clients[num].ping) {
+        	    clients[num].scallback(num,WEBSOCKET_PONG,NULL,0);
+        	    clients[num].ping = 0;
+      		}
+      		break;
+    	case WEBSOCKET_OPCODE_CLOSE:
+      		clients[num].scallback(num,WEBSOCKET_DISCONNECT_EXTERNAL,NULL,0);
+      		ws_disconnect_client(&clients[num]);
+      		break;
+    	default:
+      		break;
+    }
+
+    if(msg) 
+	free(msg);
+}
+
+/**
+ * @brief The webserver task.
+ */
+static void ws_server_task(void* pvParameters) 
+{
+    struct netconn* conn;
+
+    xwebsocket_mutex = xSemaphoreCreateMutex();
+    xwebsocket_queue = xQueueCreate(WEBSOCKET_SERVER_QUEUE_SIZE, sizeof(struct netconn*));
+
+    // initialize all clients
+    for (int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
+    	clients[i].conn = NULL;
+	clients[i].connected = false;
+    	clients[i].url  = NULL;
+    	clients[i].ping = 0;
+    	clients[i].last_opcode = 0;
+    	clients[i].contin = NULL;
+    	clients[i].len = 0;
+    	clients[i].ccallback = NULL;
+    	clients[i].scallback = NULL;
+    }
+
+    for(;;) {
+    	xQueueReceive(xwebsocket_queue, &conn, portMAX_DELAY);
+    	if(!conn) continue; // if the connection was NULL, ignore it
+
+    	if (xSemaphoreTake(xwebsocket_mutex, 25) == pdTRUE) { // take access
+    	    for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
+      	    	if(clients[i].conn == conn) {
+        	    handle_read(i);
+        	    break;
+      	    	}
+    	    }
+    	    xSemaphoreGive(xwebsocket_mutex); // return access
+    	}
+    }
+    vTaskDelete(NULL);
+}
+
+
+
+int ws_server_start() 
+{
+    if(xtask)
+	return 0;
+  
+    xTaskCreate(&ws_server_task, "ws_server_task", WEBSOCKET_SERVER_TASK_STACK_DEPTH, NULL, WEBSOCKET_SERVER_TASK_PRIORITY, &xtask);
+    return 1;
+}
+
+
+
+int ws_server_stop() {
+    if(!xtask) 
+	return 0;
+  
+    vTaskDelete(xtask);
+    return 1;
+}
+
+
+
+/**
+ * @brief Prepare a response for a new websocket connection.
+ * @param buf The client message buffer.
+ * @param buflen The length of the buffer.
+ * @param handshake The jandshake hash.
+ * @param protocol The requested protocol or NULL.
+ * @return True iif success, else false.
+ */
+static bool prepare_response(char* buf,uint32_t buflen,char* handshake,char* protocol) {
+  const char WS_HEADER[] = "Upgrade: websocket\r\n";
+  const char WS_KEY[] = "Sec-WebSocket-Key: ";
+  const char WS_RSP[] = "HTTP/1.1 101 Switching Protocols\r\n" \
+                        "Upgrade: websocket\r\n" \
+                        "Connection: Upgrade\r\n" \
+                        "Sec-WebSocket-Accept: %s\r\n" \
+                        "%s\r\n";
+
+  char* key_start;
+  char* key_end;
+  char* hashed_key;
+
+  if(!strstr(buf,WS_HEADER)) return 0;
+  if(!buflen) return 0;
+  key_start = strstr(buf,WS_KEY);
+  if(!key_start) return 0;
+  key_start += 19;
+  key_end = strstr(key_start,"\r\n");
+  if(!key_end) return 0;
+
+  hashed_key = ws_hash_handshake(key_start,key_end-key_start);
+  if(!hashed_key) return 0;
+  if(protocol) {
+    char tmp[256];
+    sprintf(tmp,WS_RSP,hashed_key,"Sec-WebSocket-Protocol: %s\r\n");
+    sprintf(handshake,tmp,protocol);
+  }
+  else {
+    sprintf(handshake,WS_RSP,hashed_key,"");
+  }
+  free(hashed_key);
+  return 1;
+}
+
+int ws_server_add_client_protocol(struct netconn* conn,
+                         char* msg,
+                         uint16_t len,
+                         char* url,
+                         char* protocol,
+                         void (*callback)(uint8_t num,
+                                          WEBSOCKET_TYPE_t type,
+                                          char* msg,
+                                          uint64_t len)) {
+  int ret;
+  char handshake[256];
+
+    if(!prepare_response(msg,len,handshake,protocol)) {
+    	netconn_close(conn);
+    	netconn_delete(conn);
+    	return -2;
+    }
+
+    ret = -1;
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	conn->callback = background_callback;
+  	netconn_write(conn,handshake,strlen(handshake),NETCONN_COPY);
+
+    	for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
+    	    if(clients[i].conn) continue;
+    	    callback(i,WEBSOCKET_CONNECT,NULL,0);
+    	    clients[i] = ws_connect_client(conn,url,NULL,callback);
+    	    if(!ws_is_connected(clients[i])) {
+      	    	callback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
+      	    	ws_disconnect_client(&clients[i]);
+      	    	break;
+    	    }
+    	    ret = i;
+    	    break;
+    	}
+    	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+
+
+int ws_server_len_url(char* url) 
+{
+    int ret;
+    ret = 0;
+  
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
+    	    if(clients[i].url && !strcmp(url,clients[i].url)) 
+		ret++;
+  	}
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+int ws_server_add_client(struct netconn* conn,
+                         char* msg,
+                         uint16_t len,
+                         char* url,
+                         void (*callback)(uint8_t num,
+                                          WEBSOCKET_TYPE_t type,
+                                          char* msg,
+                                          uint64_t len)) {
+
+  return ws_server_add_client_protocol(conn,msg,len,url,NULL,callback);
+
+}
+
+int ws_server_len_all() 
+{
+    int ret;
+    ret = 0;
+  
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
+    	    if(clients[i].conn) 
+		ret++;
+  	}
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+
+
+int ws_server_remove_client(int num) 
+{
+    int ret = 0;
+  
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	if(ws_is_connected(clients[num])) {
+    	    clients[num].scallback(num,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
+    	    ws_disconnect_client(&clients[num]);
+    	    ret = 1;
+  	}
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+
+
+int ws_server_remove_clients(char* url) 
+{
+    int ret = 0;
+  
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
+    	    if(ws_is_connected(clients[i]) && !strcmp(url,clients[i].url)) {
+      		clients[i].scallback(i,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
+      		ws_disconnect_client(&clients[i]);
+      		ret += 1;
+    	    }
+  	}
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+int ws_server_remove_all() 
+{
+    int ret = 0;
+
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
+    	    if(ws_is_connected(clients[i])) {
+      		clients[i].scallback(i,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
+      		ws_disconnect_client(&clients[i]);
+      		ret += 1;
+    	    }
+  	}
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+// The following functions are already written below, but without the mutex.
+
+int ws_server_send_text_client(int num,char* msg,uint64_t len)
+{
+    int ret = 0;
+
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	ret = ws_server_send_text_client_from_callback(num, msg, len);
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+
+
+int ws_server_send_text_clients(char* url,char* msg,uint64_t len) 
+{
+    int ret = 0;
+
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	ret = ws_server_send_text_clients_from_callback(url, msg, len);
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+
+
+int ws_server_send_text_all(char* msg,uint64_t len) 
+{
+    int ret = 0;
+
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+  	ret = ws_server_send_text_all_from_callback(msg, len);
+  	xSemaphoreGive(xwebsocket_mutex);
+    }
+  
+    return ret;
+}
+
+
+
+// the following functions should be used inside of the callback. The regular versions
+// grab the mutex, but it is already grabbed from inside the callback so it will hang.
+
+int ws_server_send_text_client_from_callback(int num,char* msg,uint64_t len) 
+{
+    int ret = 0;
+  
+    if (ws_is_connected(clients[num])) {
+    	if ((ws_send(&clients[num],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
+    	    ret = 1;
+	} else {
+      	    clients[num].scallback(num,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
+            ws_disconnect_client(&clients[num]);
+      	    ret = 0;
+    	}
+    }
+    return ret;
+}
+
+
+
+int ws_server_send_text_clients_from_callback(char* url,char* msg,uint64_t len) 
+{
+    int ret = 0;
+    
+    for (int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
+    	if (ws_is_connected(clients[i]) && !strcmp(clients[i].url,url)) {
+      	    if ((ws_send(&clients[i],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
+		ret++;
+	    } else {
+        	clients[i].scallback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
+        	ws_disconnect_client(&clients[i]);
+      	    }
+    	}
+    }
+    return ret;
+}
+
+
+
+int ws_server_send_text_all_from_callback(char* msg,uint64_t len) 
+{
+    int ret = 0;
+  
+    for (int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
+    	if (ws_is_connected(clients[i])) {
+      	    if ((ws_send(&clients[i],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
+		ret++;
+	    } else {
+        	clients[i].scallback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
+        	ws_disconnect_client(&clients[i]);
+      	    }
+    	}
+    }
+    return ret;
+}
+
+
+
+int ws_server_send_bin_client_from_callback(int num,char* msg,uint64_t len)
+{
+    int ret = 0;
+
+    if (ws_is_connected(clients[num])) {
+        if ((ws_send(&clients[num],WEBSOCKET_OPCODE_BIN,msg,len,0) == ERR_OK)) {
+	    ret = 1;
+	} else {
+	    clients[num].scallback(num,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
+	    ws_disconnect_client(&clients[num]);
+	    ret = 0;
+	}
+    }
+
+    return ret;
+}
+
+
+
+int ws_server_send_bin_client(int num,char* msg,uint64_t len) 
+{
+    int	ret = 0;
+
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+    	ret = ws_server_send_bin_client_from_callback(num, msg, len);
+    	xSemaphoreGive(xwebsocket_mutex);
+    }
+    return ret;
+}
+
+
+
+int ws_server_ping(int num) 
+{
+    int ret = 0;
+
+    if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
+    	if (ws_is_connected(clients[num])) {
+	    if ((ws_send(&clients[num], WEBSOCKET_OPCODE_PING, (char *)"BrewPing", 8, 0) == ERR_OK)) {
+	    	ret = 1;
+	    } else {
+	    	clients[num].scallback(num, WEBSOCKET_DISCONNECT_ERROR, NULL, 0);
+	    	ws_disconnect_client(&clients[num]);
+	    	ret = 0;
+	    }
+    	}
+    	xSemaphoreGive(xwebsocket_mutex);
+    }
+    
+    return ret;
+}
+

mercurial