thermometers/main.c

changeset 8
e584bc0177df
parent 7
d74b26b2f217
child 10
5600a1789644
equal deleted inserted replaced
7:d74b26b2f217 8:e584bc0177df
18 * You should have received a copy of the GNU General Public License 18 * You should have received a copy of the GNU General Public License
19 * along with EC-65K; see the file COPYING. If not, write to the Free 19 * along with EC-65K; see the file COPYING. If not, write to the Free
20 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 20 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
21 *****************************************************************************/ 21 *****************************************************************************/
22 22
23 #include <stdio.h> 23 #include "../lib/mbselib.h"
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <getopt.h>
27 #include <sys/types.h>
28 #include <signal.h>
29 #include <string.h>
30 #include <errno.h>
31
32 #include <mosquitto.h>
33
34
35 #include "main.h" 24 #include "main.h"
36 25
37 26
38 #define STATUS_CONNECTING 0 27 #define STATUS_CONNECTING 0
39 #define STATUS_CONNACK_RECVD 1 28 #define STATUS_CONNACK_RECVD 1
49 static int status = STATUS_CONNECTING; 38 static int status = STATUS_CONNECTING;
50 static int mid_sent = 0; 39 static int mid_sent = 0;
51 static int last_mid = -1; 40 static int last_mid = -1;
52 static int last_mid_sent = -1; 41 static int last_mid_sent = -1;
53 static bool connected = true; 42 static bool connected = true;
54 //static char *username = NULL;
55 //static char *password = NULL;
56 static bool disconnect_sent = false; 43 static bool disconnect_sent = false;
57 static bool quiet = false;
58 static bool debug = false;
59 static bool shutdown = false; 44 static bool shutdown = false;
60 45
46 extern bool debug;
47 extern sys_config Config;
48
49 int server(void);
50 void help(void);
51 void die(int);
61 52
62 53
63 void help(void) 54 void help(void)
64 { 55 {
65 fprintf(stdout, "Usage: thermomeneters [-d] [-h]\n"); 56 fprintf(stdout, "Usage: thermomeneters [-d] [-h]\n");
70 61
71 62
72 void die(int onsig) 63 void die(int onsig)
73 { 64 {
74 switch (onsig) { 65 switch (onsig) {
75 case SIGHUP: fprintf(stdout, "[main] Hangup detected\n"); 66 case SIGHUP: syslog(LOG_NOTICE, "Got SIGHUP, shutting down");
76 break; 67 break;
77 case SIGINT: fprintf(stdout, "[main] Interrupt from keyboard\n"); 68 case SIGINT: syslog(LOG_NOTICE, "Keyboard interrupt, shutting down");
78 break; 69 break;
79 case SIGTERM: fprintf(stdout, "[main] Termination signal received\n"); 70 case SIGTERM: syslog(LOG_NOTICE, "Got SIGTERM, shutting down");
80 break; 71 break;
81 default: fprintf(stdout, "[main] die on signal %d\n", onsig); 72 default: syslog(LOG_NOTICE, "die() on signal %d", onsig);
82 } 73 }
83 74
84 shutdown = true; 75 shutdown = true;
85 } 76 }
86 77
87 78
88 79
89 void my_connect_callback(struct mosquitto *mosq, void *obj, int result) 80 void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
90 { 81 {
91 int rc = MOSQ_ERR_SUCCESS;
92
93 fprintf(stdout, (char *)"my_connect_callback result=%d\n", result);
94 if (!result) { 82 if (!result) {
95 status = STATUS_CONNACK_RECVD; 83 status = STATUS_CONNACK_RECVD;
96 } else { 84 } else {
97 fprintf(stderr, "%s\n", mosquitto_connack_string(result)); 85 syslog(LOG_NOTICE, "my_connect_callback: %s\n", mosquitto_connack_string(result));
98 } 86 }
99 } 87 }
100 88
101 89
102 90
103 void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) 91 void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
104 { 92 {
105 fprintf(stdout, (char *)"my_disconnect_callback\n"); 93 if (shutdown) {
106 connected = false; 94 syslog(LOG_NOTICE, "Acknowledged DISCONNECT from %s", Config.mosq_host);
95 connected = false;
96 } else {
97 /*
98 * The remove server was brought down. We must keep running
99 */
100 syslog(LOG_NOTICE, "Received DISCONNECT from %s but we want to run", Config.mosq_host);
101 /*
102 * We need a temp state
103 */
104 }
107 } 105 }
108 106
109 107
110 108
111 void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) 109 void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
117 115
118 116
119 117
120 void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) 118 void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
121 { 119 {
122 printf("log: %s\n", str); 120 syslog(LOG_NOTICE, "MQTT: %s", str);
121 printf("MQTT: %s\n", str);
123 } 122 }
124 123
125 124
126 125
127 int main(int argc, char *argv[]) 126 int main(int argc, char *argv[])
128 { 127 {
129 int i, c, len, rc, rc2; 128 int rc, c, i;
130 char *id = NULL;
131 char *host = (char *)"lx02.mbse.ym";
132 int port = 1883;
133 struct mosquitto *mosq = NULL;
134 char hostname[256], buf[1024];
135 int keepalive = 60;
136 unsigned int max_inflight = 20;
137 char err[1024];
138 129
139 while (1) { 130 while (1) {
140 int option_index = 0; 131 int option_index = 0;
141 static struct option long_options[] = { 132 static struct option long_options[] = {
142 {"debug", 0, 0, 'c'}, 133 {"debug", 0, 0, 'c'},
154 case 'h': help(); 145 case 'h': help();
155 return 1; 146 return 1;
156 } 147 }
157 } 148 }
158 149
150 openlog("thermometers", LOG_PID|LOG_CONS|LOG_NOWAIT, LOG_USER);
151 syslog(LOG_NOTICE, "mbsePi-apps thermometers v%s starting", VERSION);
152
153 if (rdconfig()) {
154 fprintf(stderr, "Error reading configuration\n");
155 syslog(LOG_NOTICE, "halted");
156 return 1;
157 }
158
159 /* 159 /*
160 * Catch all the signals we can, and ignore the rest. Note that SIGKILL can't be ignored 160 * Catch all the signals we can, and ignore the rest. Note that SIGKILL can't be ignored
161 * but that's live. This daemon should only be stopped by SIGTERM. 161 * but that's live. This daemon should only be stopped by SIGTERM.
162 * Don't catch SIGCHLD. 162 * Don't catch SIGCHLD.
163 */ 163 */
164 for (i = 0; i < NSIG; i++) { 164 for (i = 0; i < NSIG; i++) {
165 if ((i != SIGCHLD) && (i != SIGKILL) && (i != SIGSTOP)) 165 if ((i != SIGCHLD) && (i != SIGKILL) && (i != SIGSTOP))
166 signal(i, (void (*))die); 166 signal(i, (void (*))die);
167 } 167 }
168 168
169 rc = server();
170 syslog(LOG_NOTICE, "Finished, rc=%d", rc);
171 return rc;
172 }
173
174
175
176 int server(void)
177 {
178 char *id = NULL, *state = NULL;
179 struct mosquitto *mosq = NULL;
180 char hostname[256], buf[1024];
181 int rc, keepalive = 60;
182 unsigned int max_inflight = 20;
183 char err[1024];
184
169 /* 185 /*
170 * Initialize mosquitto communication 186 * Initialize mosquitto communication
171 */ 187 */
172 mosquitto_lib_init(); 188 mosquitto_lib_init();
189
190 /*
191 * Build MQTT id
192 */
173 hostname[0] = '\0'; 193 hostname[0] = '\0';
174 gethostname(hostname, 256); 194 gethostname(hostname, 256);
175 hostname[255] = '\0'; 195 hostname[255] = '\0';
176 len = strlen("thermometers/") + 1 + strlen(hostname); 196
177 id = malloc(len); 197 id = xstrcpy((char *)"thermometers/");
178 if(!id) { 198 id = xstrcat(id, hostname);
179 if (!quiet)
180 fprintf(stderr, "Error: Out of memory.\n");
181 mosquitto_lib_cleanup();
182 return 1;
183 }
184 snprintf(id, len, "thermometers/%s", hostname);
185 if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { 199 if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
186 /* 200 /*
187 * Enforce maximum client id length of 23 characters 201 * Enforce maximum client id length of 23 characters
188 */ 202 */
189 id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; 203 id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
190 } 204 }
191 205
192 fprintf(stdout, "id: %s\n", id);
193
194 mosq = mosquitto_new(id, true, NULL); 206 mosq = mosquitto_new(id, true, NULL);
195 if(!mosq) { 207 if(!mosq) {
196 switch(errno) { 208 switch(errno) {
197 case ENOMEM: 209 case ENOMEM:
198 if (!quiet) 210 syslog(LOG_NOTICE, "mosquitto_new: Out of memory");
199 fprintf(stderr, "Error: Out of memory.\n");
200 break; 211 break;
201 case EINVAL: 212 case EINVAL:
202 if (!quiet) 213 syslog(LOG_NOTICE, "mosquitto_new: Invalid id");
203 fprintf(stderr, "Error: Invalid id.\n");
204 break; 214 break;
205 } 215 }
206 mosquitto_lib_cleanup(); 216 mosquitto_lib_cleanup();
207 return 1; 217 return 1;
208 } 218 }
209 219
210 if(debug) { 220 if (debug) {
211 mosquitto_log_callback_set(mosq, my_log_callback); 221 mosquitto_log_callback_set(mosq, my_log_callback);
212 } 222 }
213 223
214 /* 224 /*
215 * Set our will 225 * Set our will
216 */ 226 */
217 topic = malloc(28 + strlen(hostname)); 227 state = xstrcpy((char *)"clients/");
218 sprintf(topic, "clients/%s/thermometers/state", hostname); 228 state = xstrcat(state, hostname);
229 state = xstrcat(state, (char *)"/thermometers/state");
219 sprintf(buf, "0"); 230 sprintf(buf, "0");
220 rc = mosquitto_will_set(mosq, topic, strlen(buf), buf, qos, true); 231
232 rc = mosquitto_will_set(mosq, state, strlen(buf), buf, qos, true);
221 if (rc) { 233 if (rc) {
222 if (rc == MOSQ_ERR_INVAL) { 234 if (rc == MOSQ_ERR_INVAL) {
223 fprintf(stderr, "Input parameters invalid\n"); 235 syslog(LOG_NOTICE, "mosquitto_will_set: input parameters invalid");
224 } else if (rc == MOSQ_ERR_NOMEM) { 236 } else if (rc == MOSQ_ERR_NOMEM) {
225 fprintf(stderr, "Out of Memory\n"); 237 syslog(LOG_NOTICE, "mosquitto_will_set: Out of Memory");
226 } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) { 238 } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) {
227 fprintf(stderr, "Invalid payload size\n"); 239 syslog(LOG_NOTICE, "mosquitto_will_set: invalid payload size");
228 } 240 }
229 mosquitto_lib_cleanup(); 241 mosquitto_lib_cleanup();
230 return rc; 242 return rc;
231 } 243 }
232 244
233 mosquitto_max_inflight_messages_set(mosq, max_inflight); 245 mosquitto_max_inflight_messages_set(mosq, max_inflight);
234 mosquitto_connect_callback_set(mosq, my_connect_callback); 246 mosquitto_connect_callback_set(mosq, my_connect_callback);
235 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); 247 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
236 mosquitto_publish_callback_set(mosq, my_publish_callback); 248 mosquitto_publish_callback_set(mosq, my_publish_callback);
237 249
238 rc = mosquitto_connect(mosq, host, port, keepalive); 250 rc = mosquitto_connect(mosq, Config.mosq_host, Config.mosq_port, keepalive);
239 if (rc) { 251 if (rc) {
240 if (rc == MOSQ_ERR_ERRNO) { 252 if (rc == MOSQ_ERR_ERRNO) {
241 strerror_r(errno, err, 1024); 253 strerror_r(errno, err, 1024);
242 fprintf(stderr, "Error: %s\n", err); 254 syslog(LOG_NOTICE, "mosquitto_connect: error: %s", err);
243 } else { 255 } else {
244 fprintf(stderr, "Unable to connect (%d).\n", rc); 256 syslog(LOG_NOTICE, "mosquitto_connect: unable to connect (%d)", rc);
245 } 257 }
246 mosquitto_lib_cleanup(); 258 mosquitto_lib_cleanup();
247 return rc; 259 return rc;
248 } 260 }
261 syslog(LOG_NOTICE, "Connected with %s:%d", Config.mosq_host, Config.mosq_port);
249 262
250 /* 263 /*
251 * Initialise is complete, report our presence state 264 * Initialise is complete, report our presence state
252 */ 265 */
253 mosquitto_loop_start(mosq); 266 mosquitto_loop_start(mosq);
254 267
255 // topic = malloc(28 + strlen(hostname));
256 sprintf(topic, "clients/%s/thermometers/state", hostname);
257 sprintf(buf, "1"); 268 sprintf(buf, "1");
258 rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, 1); 269 rc = mosquitto_publish(mosq, &mid_sent, state, strlen(buf), buf, qos, 1);
259 free(topic);
260
261 270
262 fprintf(stdout, (char *)"Enter loop, connected %d\n", connected); 271 fprintf(stdout, (char *)"Enter loop, connected %d\n", connected);
263 do { 272 do {
264 if (status == STATUS_CONNACK_RECVD) { 273 if (status == STATUS_CONNACK_RECVD) {
265 // fprintf(stdout, (char *)"Ok\n"); 274 /*
266 // if(fgets(buf, 1024, stdin)){ 275 * Sleep just log enough to keep the system load low.
267 // buf[strlen(buf)-1] = '\0'; 276 */
268 // rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, retain); 277 usleep(1);
269 // if(rc2){ 278 /*
270 // if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2); 279 * Here send our sensors values
271 // mosquitto_disconnect(mosq); 280 */
272 // } 281
273 // } else
274 if (shutdown) { 282 if (shutdown) {
275 fprintf(stdout, (char *)"Shutdown\n"); 283 /*
276 topic = malloc(28 + strlen(hostname)); 284 * Final publish 0 to clients/<hostname>/thermometers/state
277 sprintf(topic, "clients/%s/thermometers/state", hostname); 285 */
278 sprintf(buf, "0"); 286 sprintf(buf, "0");
279 rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, true); 287 mosquitto_publish(mosq, &mid_sent, state, strlen(buf), buf, qos, true);
280 free(topic); 288 free(topic);
281 last_mid = mid_sent; 289 last_mid = mid_sent;
282 status = STATUS_WAITING; 290 status = STATUS_WAITING;
283 } 291 }
284 } else if (status == STATUS_WAITING) { 292 } else if (status == STATUS_WAITING) {
297 mosquitto_loop_stop(mosq, false); 305 mosquitto_loop_stop(mosq, false);
298 306
299 mosquitto_destroy(mosq); 307 mosquitto_destroy(mosq);
300 mosquitto_lib_cleanup(); 308 mosquitto_lib_cleanup();
301 309
302 fprintf(stdout, (char *)"Bye Bye\n");
303 return 0; 310 return 0;
304 } 311 }
305 312

mercurial