bmsd/websocket.c

changeset 678
14322825cb3d
parent 677
6e82fece1f8f
child 679
48f8f3fce7c0
equal deleted inserted replaced
677:6e82fece1f8f 678:14322825cb3d
25 */ 25 */
26 26
27 #include "bms.h" 27 #include "bms.h"
28 #include "xutil.h" 28 #include "xutil.h"
29 #include "websocket.h" 29 #include "websocket.h"
30 #include "co2meters.h"
30 #include <libwebsockets.h> 31 #include <libwebsockets.h>
31 32
32 33
33 extern sys_config Config; 34 extern sys_config Config;
34 extern int debug; 35 extern int debug;
62 63
63 64
64 static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) 65 static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
65 { 66 {
66 struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; 67 struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
67 int n, m; 68 int n, m;
69 char buf[513];
68 70
69 switch (reason) { 71 switch (reason) {
70 72
71 case LWS_CALLBACK_ESTABLISHED: { 73 case LWS_CALLBACK_ESTABLISHED: {
72 ws_clients++; 74 ws_clients++;
108 } 110 }
109 } 111 }
110 break; 112 break;
111 113
112 case LWS_CALLBACK_RECEIVE: 114 case LWS_CALLBACK_RECEIVE:
113 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { 115
114 syslog(LOG_NOTICE, "ws: dropping!"); 116 memcpy(buf, in, len);
115 goto choke; 117 buf[len] = '\0';
116 } 118 syslog(LOG_NOTICE, "ws: reveived %ld bytes %s", len, buf);
117 119 if (strncmp(buf, (char *)"{\"device\":\"co2meters\",", 22) == 0) {
118 if (ringbuffer[ringbuffer_head].payload) 120 co2meter_ws_receive(buf);
119 free(ringbuffer[ringbuffer_head].payload); 121 }
120 122
121 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
122 ringbuffer[ringbuffer_head].len = len;
123 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, in, len);
124 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
125 ringbuffer_head = 0;
126 else
127 ringbuffer_head++;
128
129 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
130 goto done;
131
132 choke:
133 syslog(LOG_NOTICE, "ws: LWS_CALLBACK_RECEIVE: throttling");
134 lws_rx_flow_control(wsi, 0);
135
136 done:
137 lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
138 break; 123 break;
139 124
140 case LWS_CALLBACK_CLOSED: 125 case LWS_CALLBACK_CLOSED:
141 ws_clients--; 126 ws_clients--;
142 syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients); 127 syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients);
150 } 135 }
151 136
152 137
153 138
154 static struct lws_protocols protocols[] = { 139 static struct lws_protocols protocols[] = {
155 { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 128 }, 140 { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 512 },
156 { NULL, NULL, 0, 0 } /* terminator */ 141 { NULL, NULL, 0, 0 } /* terminator */
157 }; 142 };
158 143
159 144
160 /* 145 /*

mercurial