diff -r 000000000000 -r b74b0e4902c3 components/websocket/websocket_server.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/components/websocket/websocket_server.c Sat Oct 20 13:23:15 2018 +0200 @@ -0,0 +1,489 @@ +/** + * @file websocket_server.c + * @brief Websocket server functions. + * @author Blake Felt - blake.w.felt@gmail.com` + */ + +#include "websocket_server.h" +#include "freertos/FreeRTOS.h" +#include "freertos/semphr.h" +#include "freertos/task.h" +#include "freertos/queue.h" +#include "lwip/tcp.h" +#include + +static SemaphoreHandle_t xwebsocket_mutex; // to lock the client array +static QueueHandle_t xwebsocket_queue; // to hold the clients that send messages +static ws_client_t clients[WEBSOCKET_SERVER_MAX_CLIENTS]; // holds list of clients +static TaskHandle_t xtask; // the task itself + +/** + * @brief Add a connection to the callback queue. + * @param conn The network connection. + * @param evt The event. + * @param len Not used. + */ +static void background_callback(struct netconn* conn, enum netconn_evt evt,u16_t len) { + switch(evt) { + case NETCONN_EVT_RCVPLUS: + xQueueSendToBack(xwebsocket_queue,&conn,WEBSOCKET_SERVER_QUEUE_TIMEOUT); + break; + default: + break; + } +} + +/** + * @brief Handle client read. + * @param num The client connection number. + */ +static void handle_read(uint8_t num) +{ + ws_header_t header; + char* msg; + + header.received = 0; + msg = ws_read(&clients[num],&header); + + if (!header.received) { + if (msg) + free(msg); + return NULL; + } + + switch(clients[num].last_opcode) { + case WEBSOCKET_OPCODE_CONT: + break; + case WEBSOCKET_OPCODE_BIN: + clients[num].scallback(num,WEBSOCKET_BIN,msg,header.length); + break; + case WEBSOCKET_OPCODE_TEXT: + clients[num].scallback(num,WEBSOCKET_TEXT,msg,header.length); + break; + case WEBSOCKET_OPCODE_PING: + ws_send(&clients[num],WEBSOCKET_OPCODE_PONG,msg,header.length,0); + clients[num].scallback(num,WEBSOCKET_PING,msg,header.length); + break; + case WEBSOCKET_OPCODE_PONG: + if (clients[num].ping) { + clients[num].scallback(num,WEBSOCKET_PONG,NULL,0); + clients[num].ping = 0; + } + break; + case WEBSOCKET_OPCODE_CLOSE: + clients[num].scallback(num,WEBSOCKET_DISCONNECT_EXTERNAL,NULL,0); + ws_disconnect_client(&clients[num]); + break; + default: + break; + } + + if(msg) + free(msg); +} + +/** + * @brief The webserver task. + */ +static void ws_server_task(void* pvParameters) +{ + struct netconn* conn; + + xwebsocket_mutex = xSemaphoreCreateMutex(); + xwebsocket_queue = xQueueCreate(WEBSOCKET_SERVER_QUEUE_SIZE, sizeof(struct netconn*)); + + // initialize all clients + for (int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) { + clients[i].conn = NULL; + clients[i].connected = false; + clients[i].url = NULL; + clients[i].ping = 0; + clients[i].last_opcode = 0; + clients[i].contin = NULL; + clients[i].len = 0; + clients[i].ccallback = NULL; + clients[i].scallback = NULL; + } + + for(;;) { + xQueueReceive(xwebsocket_queue, &conn, portMAX_DELAY); + if(!conn) continue; // if the connection was NULL, ignore it + + if (xSemaphoreTake(xwebsocket_mutex, 25) == pdTRUE) { // take access + for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) { + if(clients[i].conn == conn) { + handle_read(i); + break; + } + } + xSemaphoreGive(xwebsocket_mutex); // return access + } + } + vTaskDelete(NULL); +} + + + +int ws_server_start() +{ + if(xtask) + return 0; + + xTaskCreate(&ws_server_task, "ws_server_task", WEBSOCKET_SERVER_TASK_STACK_DEPTH, NULL, WEBSOCKET_SERVER_TASK_PRIORITY, &xtask); + return 1; +} + + + +int ws_server_stop() { + if(!xtask) + return 0; + + vTaskDelete(xtask); + return 1; +} + + + +/** + * @brief Prepare a response for a new websocket connection. + * @param buf The client message buffer. + * @param buflen The length of the buffer. + * @param handshake The jandshake hash. + * @param protocol The requested protocol or NULL. + * @return True iif success, else false. + */ +static bool prepare_response(char* buf,uint32_t buflen,char* handshake,char* protocol) { + const char WS_HEADER[] = "Upgrade: websocket\r\n"; + const char WS_KEY[] = "Sec-WebSocket-Key: "; + const char WS_RSP[] = "HTTP/1.1 101 Switching Protocols\r\n" \ + "Upgrade: websocket\r\n" \ + "Connection: Upgrade\r\n" \ + "Sec-WebSocket-Accept: %s\r\n" \ + "%s\r\n"; + + char* key_start; + char* key_end; + char* hashed_key; + + if(!strstr(buf,WS_HEADER)) return 0; + if(!buflen) return 0; + key_start = strstr(buf,WS_KEY); + if(!key_start) return 0; + key_start += 19; + key_end = strstr(key_start,"\r\n"); + if(!key_end) return 0; + + hashed_key = ws_hash_handshake(key_start,key_end-key_start); + if(!hashed_key) return 0; + if(protocol) { + char tmp[256]; + sprintf(tmp,WS_RSP,hashed_key,"Sec-WebSocket-Protocol: %s\r\n"); + sprintf(handshake,tmp,protocol); + } + else { + sprintf(handshake,WS_RSP,hashed_key,""); + } + free(hashed_key); + return 1; +} + +int ws_server_add_client_protocol(struct netconn* conn, + char* msg, + uint16_t len, + char* url, + char* protocol, + void (*callback)(uint8_t num, + WEBSOCKET_TYPE_t type, + char* msg, + uint64_t len)) { + int ret; + char handshake[256]; + + if(!prepare_response(msg,len,handshake,protocol)) { + netconn_close(conn); + netconn_delete(conn); + return -2; + } + + ret = -1; + if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) { + conn->callback = background_callback; + netconn_write(conn,handshake,strlen(handshake),NETCONN_COPY); + + for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) { + if(clients[i].conn) continue; + callback(i,WEBSOCKET_CONNECT,NULL,0); + clients[i] = ws_connect_client(conn,url,NULL,callback); + if(!ws_is_connected(clients[i])) { + callback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0); + ws_disconnect_client(&clients[i]); + break; + } + ret = i; + break; + } + xSemaphoreGive(xwebsocket_mutex); + } + + return ret; +} + + + +int ws_server_len_url(char* url) +{ + int ret; + ret = 0; + + if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) { + for(int i=0;i