components/websocket/websocket_server.c

changeset 0
b74b0e4902c3
child 55
38e1c91bfd88
equal deleted inserted replaced
-1:000000000000 0:b74b0e4902c3
1 /**
2 * @file websocket_server.c
3 * @brief Websocket server functions.
4 * @author Blake Felt - blake.w.felt@gmail.com
5 */
6
7 #include "websocket_server.h"
8 #include "freertos/FreeRTOS.h"
9 #include "freertos/semphr.h"
10 #include "freertos/task.h"
11 #include "freertos/queue.h"
12 #include "lwip/tcp.h"
13 #include <string.h>
14
15 static SemaphoreHandle_t xwebsocket_mutex; // to lock the client array
16 static QueueHandle_t xwebsocket_queue; // to hold the clients that send messages
17 static ws_client_t clients[WEBSOCKET_SERVER_MAX_CLIENTS]; // holds list of clients
18 static TaskHandle_t xtask; // the task itself
19
20 /**
21 * @brief Add a connection to the callback queue.
22 * @param conn The network connection.
23 * @param evt The event.
24 * @param len Not used.
25 */
26 static void background_callback(struct netconn* conn, enum netconn_evt evt,u16_t len) {
27 switch(evt) {
28 case NETCONN_EVT_RCVPLUS:
29 xQueueSendToBack(xwebsocket_queue,&conn,WEBSOCKET_SERVER_QUEUE_TIMEOUT);
30 break;
31 default:
32 break;
33 }
34 }
35
36 /**
37 * @brief Handle client read.
38 * @param num The client connection number.
39 */
40 static void handle_read(uint8_t num)
41 {
42 ws_header_t header;
43 char* msg;
44
45 header.received = 0;
46 msg = ws_read(&clients[num],&header);
47
48 if (!header.received) {
49 if (msg)
50 free(msg);
51 return NULL;
52 }
53
54 switch(clients[num].last_opcode) {
55 case WEBSOCKET_OPCODE_CONT:
56 break;
57 case WEBSOCKET_OPCODE_BIN:
58 clients[num].scallback(num,WEBSOCKET_BIN,msg,header.length);
59 break;
60 case WEBSOCKET_OPCODE_TEXT:
61 clients[num].scallback(num,WEBSOCKET_TEXT,msg,header.length);
62 break;
63 case WEBSOCKET_OPCODE_PING:
64 ws_send(&clients[num],WEBSOCKET_OPCODE_PONG,msg,header.length,0);
65 clients[num].scallback(num,WEBSOCKET_PING,msg,header.length);
66 break;
67 case WEBSOCKET_OPCODE_PONG:
68 if (clients[num].ping) {
69 clients[num].scallback(num,WEBSOCKET_PONG,NULL,0);
70 clients[num].ping = 0;
71 }
72 break;
73 case WEBSOCKET_OPCODE_CLOSE:
74 clients[num].scallback(num,WEBSOCKET_DISCONNECT_EXTERNAL,NULL,0);
75 ws_disconnect_client(&clients[num]);
76 break;
77 default:
78 break;
79 }
80
81 if(msg)
82 free(msg);
83 }
84
85 /**
86 * @brief The webserver task.
87 */
88 static void ws_server_task(void* pvParameters)
89 {
90 struct netconn* conn;
91
92 xwebsocket_mutex = xSemaphoreCreateMutex();
93 xwebsocket_queue = xQueueCreate(WEBSOCKET_SERVER_QUEUE_SIZE, sizeof(struct netconn*));
94
95 // initialize all clients
96 for (int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
97 clients[i].conn = NULL;
98 clients[i].connected = false;
99 clients[i].url = NULL;
100 clients[i].ping = 0;
101 clients[i].last_opcode = 0;
102 clients[i].contin = NULL;
103 clients[i].len = 0;
104 clients[i].ccallback = NULL;
105 clients[i].scallback = NULL;
106 }
107
108 for(;;) {
109 xQueueReceive(xwebsocket_queue, &conn, portMAX_DELAY);
110 if(!conn) continue; // if the connection was NULL, ignore it
111
112 if (xSemaphoreTake(xwebsocket_mutex, 25) == pdTRUE) { // take access
113 for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
114 if(clients[i].conn == conn) {
115 handle_read(i);
116 break;
117 }
118 }
119 xSemaphoreGive(xwebsocket_mutex); // return access
120 }
121 }
122 vTaskDelete(NULL);
123 }
124
125
126
127 int ws_server_start()
128 {
129 if(xtask)
130 return 0;
131
132 xTaskCreate(&ws_server_task, "ws_server_task", WEBSOCKET_SERVER_TASK_STACK_DEPTH, NULL, WEBSOCKET_SERVER_TASK_PRIORITY, &xtask);
133 return 1;
134 }
135
136
137
138 int ws_server_stop() {
139 if(!xtask)
140 return 0;
141
142 vTaskDelete(xtask);
143 return 1;
144 }
145
146
147
/**
 * @brief Prepare a response for a new websocket connection.
 *
 * The request must contain the "Upgrade: websocket" header and a
 * "Sec-WebSocket-Key: " header terminated by CRLF. The key is hashed with
 * ws_hash_handshake() and a "101 Switching Protocols" response is formatted
 * into @p handshake, with a "Sec-WebSocket-Protocol" line when @p protocol is
 * given.
 *
 * @param buf       The client message buffer. It is searched with strstr(), so
 *                  it must be NUL-terminated.
 * @param buflen    The length of the buffer; only used to reject an empty one.
 * @param handshake Output buffer for the response. It must hold at least 256
 *                  bytes; the response is never written past that size.
 * @param protocol  The requested protocol or NULL.
 * @return True if the response was written completely, else false (missing
 *         headers, hashing failure, or a response that does not fit).
 */
static bool prepare_response(char* buf,uint32_t buflen,char* handshake,char* protocol) {
    /* Capacity the caller must provide for handshake. */
    enum { HANDSHAKE_CAPACITY = 256 };
    static const char WS_HEADER[] = "Upgrade: websocket\r\n";
    static const char WS_KEY[] = "Sec-WebSocket-Key: ";

    char* key_start;
    char* key_end;
    char* hashed_key;
    int written;

    if (!buf || !buflen || !handshake) return false;
    if (!strstr(buf, WS_HEADER)) return false;

    key_start = strstr(buf, WS_KEY);
    if (!key_start) return false;
    key_start += sizeof(WS_KEY) - 1; /* skip the header name itself */
    key_end = strstr(key_start, "\r\n");
    if (!key_end) return false;

    hashed_key = ws_hash_handshake(key_start, key_end - key_start);
    if (!hashed_key) return false;

    /* One bounded call: the protocol text is always passed as an argument and
       never becomes part of the format string. */
    written = snprintf(handshake, HANDSHAKE_CAPACITY,
                       "HTTP/1.1 101 Switching Protocols\r\n"
                       "Upgrade: websocket\r\n"
                       "Connection: Upgrade\r\n"
                       "Sec-WebSocket-Accept: %s\r\n"
                       "%s%s%s\r\n",
                       hashed_key,
                       protocol ? "Sec-WebSocket-Protocol: " : "",
                       protocol ? protocol : "",
                       protocol ? "\r\n" : "");
    free(hashed_key);

    /* A truncated handshake is useless to the client: report failure. */
    return written > 0 && written < HANDSHAKE_CAPACITY;
}
190
191 int ws_server_add_client_protocol(struct netconn* conn,
192 char* msg,
193 uint16_t len,
194 char* url,
195 char* protocol,
196 void (*callback)(uint8_t num,
197 WEBSOCKET_TYPE_t type,
198 char* msg,
199 uint64_t len)) {
200 int ret;
201 char handshake[256];
202
203 if(!prepare_response(msg,len,handshake,protocol)) {
204 netconn_close(conn);
205 netconn_delete(conn);
206 return -2;
207 }
208
209 ret = -1;
210 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
211 conn->callback = background_callback;
212 netconn_write(conn,handshake,strlen(handshake),NETCONN_COPY);
213
214 for(int i = 0; i < WEBSOCKET_SERVER_MAX_CLIENTS; i++) {
215 if(clients[i].conn) continue;
216 callback(i,WEBSOCKET_CONNECT,NULL,0);
217 clients[i] = ws_connect_client(conn,url,NULL,callback);
218 if(!ws_is_connected(clients[i])) {
219 callback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
220 ws_disconnect_client(&clients[i]);
221 break;
222 }
223 ret = i;
224 break;
225 }
226 xSemaphoreGive(xwebsocket_mutex);
227 }
228
229 return ret;
230 }
231
232
233
234 int ws_server_len_url(char* url)
235 {
236 int ret;
237 ret = 0;
238
239 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
240 for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
241 if(clients[i].url && !strcmp(url,clients[i].url))
242 ret++;
243 }
244 xSemaphoreGive(xwebsocket_mutex);
245 }
246
247 return ret;
248 }
249
250 int ws_server_add_client(struct netconn* conn,
251 char* msg,
252 uint16_t len,
253 char* url,
254 void (*callback)(uint8_t num,
255 WEBSOCKET_TYPE_t type,
256 char* msg,
257 uint64_t len)) {
258
259 return ws_server_add_client_protocol(conn,msg,len,url,NULL,callback);
260
261 }
262
263 int ws_server_len_all()
264 {
265 int ret;
266 ret = 0;
267
268 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
269 for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
270 if(clients[i].conn)
271 ret++;
272 }
273 xSemaphoreGive(xwebsocket_mutex);
274 }
275
276 return ret;
277 }
278
279
280
281 int ws_server_remove_client(int num)
282 {
283 int ret = 0;
284
285 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
286 if(ws_is_connected(clients[num])) {
287 clients[num].scallback(num,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
288 ws_disconnect_client(&clients[num]);
289 ret = 1;
290 }
291 xSemaphoreGive(xwebsocket_mutex);
292 }
293
294 return ret;
295 }
296
297
298
299 int ws_server_remove_clients(char* url)
300 {
301 int ret = 0;
302
303 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
304 for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
305 if(ws_is_connected(clients[i]) && !strcmp(url,clients[i].url)) {
306 clients[i].scallback(i,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
307 ws_disconnect_client(&clients[i]);
308 ret += 1;
309 }
310 }
311 xSemaphoreGive(xwebsocket_mutex);
312 }
313
314 return ret;
315 }
316
317 int ws_server_remove_all()
318 {
319 int ret = 0;
320
321 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
322 for(int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
323 if(ws_is_connected(clients[i])) {
324 clients[i].scallback(i,WEBSOCKET_DISCONNECT_INTERNAL,NULL,0);
325 ws_disconnect_client(&clients[i]);
326 ret += 1;
327 }
328 }
329 xSemaphoreGive(xwebsocket_mutex);
330 }
331
332 return ret;
333 }
334
335 // The following functions are already written below, but without the mutex.
336
337 int ws_server_send_text_client(int num,char* msg,uint64_t len)
338 {
339 int ret = 0;
340
341 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
342 ret = ws_server_send_text_client_from_callback(num, msg, len);
343 xSemaphoreGive(xwebsocket_mutex);
344 }
345
346 return ret;
347 }
348
349
350
351 int ws_server_send_text_clients(char* url,char* msg,uint64_t len)
352 {
353 int ret = 0;
354
355 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
356 ret = ws_server_send_text_clients_from_callback(url, msg, len);
357 xSemaphoreGive(xwebsocket_mutex);
358 }
359
360 return ret;
361 }
362
363
364
365 int ws_server_send_text_all(char* msg,uint64_t len)
366 {
367 int ret = 0;
368
369 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
370 ret = ws_server_send_text_all_from_callback(msg, len);
371 xSemaphoreGive(xwebsocket_mutex);
372 }
373
374 return ret;
375 }
376
377
378
379 // the following functions should be used inside of the callback. The regular versions
380 // grab the mutex, but it is already grabbed from inside the callback so it will hang.
381
382 int ws_server_send_text_client_from_callback(int num,char* msg,uint64_t len)
383 {
384 int ret = 0;
385
386 if (ws_is_connected(clients[num])) {
387 if ((ws_send(&clients[num],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
388 ret = 1;
389 } else {
390 clients[num].scallback(num,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
391 ws_disconnect_client(&clients[num]);
392 ret = 0;
393 }
394 }
395 return ret;
396 }
397
398
399
400 int ws_server_send_text_clients_from_callback(char* url,char* msg,uint64_t len)
401 {
402 int ret = 0;
403
404 for (int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
405 if (ws_is_connected(clients[i]) && !strcmp(clients[i].url,url)) {
406 if ((ws_send(&clients[i],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
407 ret++;
408 } else {
409 clients[i].scallback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
410 ws_disconnect_client(&clients[i]);
411 }
412 }
413 }
414 return ret;
415 }
416
417
418
419 int ws_server_send_text_all_from_callback(char* msg,uint64_t len)
420 {
421 int ret = 0;
422
423 for (int i=0;i<WEBSOCKET_SERVER_MAX_CLIENTS;i++) {
424 if (ws_is_connected(clients[i])) {
425 if ((ws_send(&clients[i],WEBSOCKET_OPCODE_TEXT,msg,len,0) == ERR_OK)) {
426 ret++;
427 } else {
428 clients[i].scallback(i,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
429 ws_disconnect_client(&clients[i]);
430 }
431 }
432 }
433 return ret;
434 }
435
436
437
438 int ws_server_send_bin_client_from_callback(int num,char* msg,uint64_t len)
439 {
440 int ret = 0;
441
442 if (ws_is_connected(clients[num])) {
443 if ((ws_send(&clients[num],WEBSOCKET_OPCODE_BIN,msg,len,0) == ERR_OK)) {
444 ret = 1;
445 } else {
446 clients[num].scallback(num,WEBSOCKET_DISCONNECT_ERROR,NULL,0);
447 ws_disconnect_client(&clients[num]);
448 ret = 0;
449 }
450 }
451
452 return ret;
453 }
454
455
456
457 int ws_server_send_bin_client(int num,char* msg,uint64_t len)
458 {
459 int ret = 0;
460
461 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
462 ret = ws_server_send_bin_client_from_callback(num, msg, len);
463 xSemaphoreGive(xwebsocket_mutex);
464 }
465 return ret;
466 }
467
468
469
470 int ws_server_ping(int num)
471 {
472 int ret = 0;
473
474 if (xSemaphoreTake(xwebsocket_mutex, 10) == pdTRUE) {
475 if (ws_is_connected(clients[num])) {
476 if ((ws_send(&clients[num], WEBSOCKET_OPCODE_PING, (char *)"BrewPing", 8, 0) == ERR_OK)) {
477 ret = 1;
478 } else {
479 clients[num].scallback(num, WEBSOCKET_DISCONNECT_ERROR, NULL, 0);
480 ws_disconnect_client(&clients[num]);
481 ret = 0;
482 }
483 }
484 xSemaphoreGive(xwebsocket_mutex);
485 }
486
487 return ret;
488 }
489

mercurial