bmsd/co2meters.c

Wed, 06 Dec 2023 20:26:00 +0100

author
Michiel Broek <mbroek@mbse.eu>
date
Wed, 06 Dec 2023 20:26:00 +0100
changeset 855
2d328a2a4025
parent 852
71f0fa38b634
permissions
-rw-r--r--

Fixed init scripts names in Makefile. Update crontasks to use the database to check the log entries for products.

/**
 * @file co2meters.c
 * @brief Handle co2meters status
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2019-2023
 *
 * This file is part of the bms (Brewery Management System)
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */


#include "bms.h"
#include "xutil.h"
#include "co2meters.h"
#include "mysql.h"
#include "websocket.h"
#include "nodes.h"


sys_co2meter_list	*co2meters = NULL;

extern int		debug;
extern sys_config       Config;
extern MYSQL		*con;
extern MYSQL_RES	*res_set;
extern MYSQL_ROW	row;



void co2meter_ws_send(sys_co2meter_list *co2meter)
{
    char	*msg = NULL, buf[65];

    msg = xstrcpy((char *)"{\"device\":\"co2meters\",\"node\":\"");
    msg = xstrcat(msg, co2meter->node);
    msg = xstrcat(msg, (char *)"\",\"unit\":\"");
    msg = xstrcat(msg, co2meter->alias);
    msg = xstrcat(msg, (char *)"\",\"online\":");
    msg = xstrcat(msg, co2meter->online ? (char *)"1":(char *)"0");
    msg = xstrcat(msg, (char *)",\"mode\":\"");
    msg = xstrcat(msg, co2meter->mode);
    msg = xstrcat(msg, (char *)"\",\"beeruuid\":\"");
    msg = xstrcat(msg, co2meter->beeruuid);
    msg = xstrcat(msg, (char *)"\",\"beercode\":\"");
    msg = xstrcat(msg, co2meter->beercode);
    msg = xstrcat(msg, (char *)"\",\"beername\":\"");
    msg = xstrcat(msg, co2meter->beername);
    msg = xstrcat(msg, (char *)"\",\"temperature_state\":\"");
    msg = xstrcat(msg, co2meter->temperature_state);
    msg = xstrcat(msg, (char *)"\",\"temperature_address\":\"");
    msg = xstrcat(msg, co2meter->temperature_address);
    msg = xstrcat(msg, (char *)"\",\"temperature\":");
    snprintf(buf, 64, "%.3f", co2meter->temperature);
    msg = xstrcat(msg, buf);
    msg = xstrcat(msg, (char *)",\"pressure_state\":\"");
    msg = xstrcat(msg, co2meter->pressure_state);
    msg = xstrcat(msg, (char *)"\",\"pressure_channel\":");
    snprintf(buf, 64, "%d", co2meter->pressure_channel);
    msg = xstrcat(msg, buf);
    msg = xstrcat(msg, (char *)",\"pressure_voltage\":");
    snprintf(buf, 64, "%.3f", co2meter->pressure_voltage);
    msg = xstrcat(msg, buf);
    msg = xstrcat(msg, (char *)",\"pressure_zero\":");
    snprintf(buf, 64, "%.3f", co2meter->pressure_zero);
    msg = xstrcat(msg, buf);
    msg = xstrcat(msg, (char *)",\"pressure_bar\":");
    snprintf(buf, 64, "%.3f", co2meter->pressure_bar);
    msg = xstrcat(msg, buf);
    msg = xstrcat(msg, (char *)",\"alarm\":");
    snprintf(buf, 64, "%d", co2meter->alarm);
    msg = xstrcat(msg, buf);
    msg = xstrcat(msg, (char *)"}");
    ws_broadcast(msg);
    free(msg);
    msg = NULL;
}



void co2meter_ws_receive(char *payload)
{
    struct json_object	*jobj, *val;
    sys_co2meter_list	*tmpp;
    char		*node = NULL, *alias = NULL, *beeruuid = NULL, *beercode = NULL, *beername = NULL;
    char		query[512], *end;
    MYSQL               *con2 = NULL;

    /*
     * Process the JSON formatted payload.
     */
    jobj = json_tokener_parse(payload);
    if (json_object_object_get_ex(jobj, "node", &val))
	node = xstrcpy((char *)json_object_get_string(val));
    if (json_object_object_get_ex(jobj, "unit", &val))
	alias = xstrcpy((char *)json_object_get_string(val));
    if (json_object_object_get_ex(jobj, "beeruuid", &val))
	beeruuid = xstrcpy((char *)json_object_get_string(val));
    if (json_object_object_get_ex(jobj, "beercode", &val))
	beercode = xstrcpy((char *)json_object_get_string(val));
    if (json_object_object_get_ex(jobj, "beername", &val))
	beername = xstrcpy((char *)json_object_get_string(val));
    json_object_put(jobj);

    /*
     * Search co2meter record in the memory array and use it if found.
     */
    if (co2meters) {
        for (tmpp = co2meters; tmpp; tmpp = tmpp->next) {
            if ((strcmp(tmpp->alias, alias) == 0) && (strcmp(tmpp->node, node) == 0)) {
		if (beeruuid && beercode && beername) {
		    con2 = mysql_init(NULL);
		    if (con2 == NULL) {
			syslog(LOG_NOTICE, "MySQL: mysql_init() failed");
		    } else {
			if (mysql_real_connect(con2, Config.mysql_host, Config.mysql_user, Config.mysql_pass, Config.mysql_database, Config.mysql_port, NULL, 0) == NULL) {
			    syslog(LOG_NOTICE, "MySQL: mysql_real_connect() %s", mysql_error(con2));
			} else {
			    end = stpcpy(query, "UPDATE mon_co2meters SET beeruuid='");
			    end += mysql_real_escape_string(con2, end, beeruuid, strlen(beeruuid));
			    end = stpcpy(end, "', beercode='");
			    end += mysql_real_escape_string(con2, end, beercode, strlen(beercode));
			    end = stpcpy(end, "', beername='");
			    end += mysql_real_escape_string(con2, end, beername, strlen(beername));
			    end = stpcpy(end, "' WHERE node='");
			    end += mysql_real_escape_string(con2, end, node, strlen(node));
			    end = stpcpy(end, "' AND alias='");
			    end += mysql_real_escape_string(con2, end, alias, strlen(alias));
			    end = stpcpy(end, "'");

			    if (mysql_real_query(con2, query, (unsigned int) (end - query))) {
				syslog(LOG_NOTICE, "MySQL: `%s' error %u (%s))", query, mysql_errno(con2), mysql_error(con2));
			    } else {
				/* Database updated, now update internal memory */
				//syslog(LOG_NOTICE, "MySQL: `%s' Ok", query);
				if (tmpp->beercode)
                                    free(tmpp->beercode);
                            	tmpp->beercode = xstrcpy(beercode);
                            	if (tmpp->beername)
                                    free(tmpp->beername);
                            	tmpp->beername = xstrcpy(beername);
				if (tmpp->beeruuid)
				    free(tmpp->beeruuid);
				tmpp->beeruuid = xstrcpy(beeruuid);
				/* Report new state to the client */
				co2meter_ws_send(tmpp);
				syslog(LOG_NOTICE, "Set co2meter %s/%s new beer %s %s", node, alias, beercode, beername);
			    }
			    mysql_close(con2);
			}
		    }
		}
                break;
            }
        }
    }

    if (node)
	free(node);
    if (alias)
	free(alias);
    if (beeruuid)
	free(beeruuid);
    if (beercode)
	free(beercode);
    if (beername)
	free(beername);
}



void co2meter_set(char *edge_node, char *alias, char *payload)
{
    struct json_object	*jobj, *val, *sensor;
    sys_co2meter_list	*co2meter, *tmpp;
    bool		new_co2meter = true;

//    fprintf(stdout, "co2meter_set: %s/%s %s\n", edge_node, alias, payload);

    /*
     * Search co2meter record in the memory array and use it if found.
     */
    if (co2meters) {
	for (tmpp = co2meters; tmpp; tmpp = tmpp->next) {
	    if ((strcmp(tmpp->alias, alias) == 0) && (strcmp(tmpp->node, edge_node) == 0)) {
		new_co2meter = false;
		co2meter = tmpp;
		break;
	    }
	}
    }

//    printf("new_co2meter %s\n", new_co2meter ? "true":"false");

    /*
     * Allocate new co2meter if not yet known.
     */
    if (new_co2meter) {
	co2meter = (sys_co2meter_list *)malloc(sizeof(sys_co2meter_list));
	memset(co2meter, 0, sizeof(sys_co2meter_list));
	co2meter->alias = xstrcpy(alias);
	co2meter->node = xstrcpy(edge_node);
	co2meter->mode = xstrcpy((char *)"OFF");
    }

    if (! co2meter->online) {
    	co2meter->online = true;
    	syslog(LOG_NOTICE, "Online co2meter %s/%s mode %s", edge_node, alias, co2meter->mode);
    }

    /*
     * Process the JSON formatted payload. 
     * Update only the fields that are found in the payload.
     */
    jobj = json_tokener_parse(payload);

    if (json_object_object_get_ex(jobj, "uuid", &val)) {
	if (co2meter->uuid)
	    free(co2meter->uuid);
	co2meter->uuid = xstrcpy((char *)json_object_get_string(val));
    }
    if (json_object_object_get_ex(jobj, "mode", &val)) {
	if (co2meter->mode) {
	    if (strcmp(co2meter->mode, (char *)json_object_get_string(val))) {
		syslog(LOG_NOTICE, "Change mode co2meter %s/%s: %s to %s", edge_node, alias, co2meter->mode, (char *)json_object_get_string(val));
	    }
	    free(co2meter->mode);
	}
	co2meter->mode = xstrcpy((char *)json_object_get_string(val));
    }
    if (json_object_object_get_ex(jobj, "alarm", &val)) {
	co2meter->alarm = json_object_get_int(val);
    }
    if (json_object_object_get_ex(jobj, "temperature", &sensor)) {
	if (json_object_object_get_ex(sensor, "address", &val)) {
	    if (co2meter->temperature_address)
		free(co2meter->temperature_address);
	    co2meter->temperature_address = xstrcpy((char *)json_object_get_string(val));
	}
	if (json_object_object_get_ex(sensor, "state", &val)) {
	    if (co2meter->temperature_state)
		free(co2meter->temperature_state);
	    co2meter->temperature_state = xstrcpy((char *)json_object_get_string(val));
	}
	if (json_object_object_get_ex(sensor, "temperature", &val)) {
	    co2meter->temperature = json_object_get_double(val);
	}
    }
    if (json_object_object_get_ex(jobj, "pressure", &sensor)) {
	if (json_object_object_get_ex(sensor, "state", &val)) {
            if (co2meter->pressure_state)
                free(co2meter->pressure_state);
            co2meter->pressure_state = xstrcpy((char *)json_object_get_string(val));
        }
	if (json_object_object_get_ex(sensor, "channel", &val)) {
            co2meter->pressure_channel = json_object_get_int(val);
        }
	if (json_object_object_get_ex(sensor, "voltage", &val)) {
            co2meter->pressure_voltage = json_object_get_double(val);
        }
	if (json_object_object_get_ex(sensor, "zero", &val)) {
            co2meter->pressure_zero = json_object_get_double(val);
        }
	if (json_object_object_get_ex(sensor, "bar", &val)) {
            co2meter->pressure_bar = json_object_get_double(val);
        }
    }
    json_object_put(jobj);

    co2meter_ws_send(co2meter);

//    co2meter_dump(co2meter);

    if (new_co2meter) {
    	if (co2meters == NULL) {
	    co2meters = co2meter;
	} else {
	    for (tmpp = co2meters; tmpp; tmpp = tmpp->next) {
		if (tmpp->next == NULL) {
		    tmpp->next = co2meter;
		    break;
		}
	    }
	}
	co2meter_mysql_insert(co2meter);
    } else {
	co2meter_mysql_update(co2meter);
    }
}



/*
 * With DBIRTH all active co2meters are publishd in an array.
 */
void co2meter_birth_data(char *topic, char *payload)
{
    char		*message_type, *edge_node, *alias;
    struct json_object  *jobj, *val, *metric, *units, *unit;
    int			arraylen;

    strtok(topic, "/"); // ignore namespace
    strtok(NULL, "/");
    message_type = strtok(NULL, "/");
    edge_node = strtok(NULL, "/\0");
    alias = strtok(NULL, "/\0");

    if ((alias == NULL) && (strcmp("DBIRTH", message_type) == 0)) {
	/*
	 * Global initial DBIRTH message with array of co2meters.
	 */
	jobj = json_tokener_parse(payload);

	if (json_object_object_get_ex(jobj, "metric", &metric)) {
	    if (json_object_object_get_ex(metric, "units", &units)) {
		arraylen = json_object_array_length(units);
		for (int i = 0; i < arraylen; i++) {
		    /*
		     * Parse the array of units
		     */
		    unit = json_object_array_get_idx(units, i);

		    if (json_object_object_get_ex(unit, "alias", &val)) {
			if (alias)
			    free(alias);
			alias = xstrcpy((char *)json_object_get_string(val));
		    	co2meter_set(edge_node, alias, (char *)json_object_to_json_string_ext(unit, 0));
			free(alias);
			alias = NULL;
		    }
		}
	    }
	}
	json_object_put(jobj);
	return;
    }

    /*
     * The rest are errors.
     */
    printf("ERROR co2meter_birth_data: %s %s %s\n", message_type, edge_node, alias);
}



void co2meter_log(char *topic, char *payload)
{
    char                *edge_node, *alias, buf[128], *query = malloc(512);
    struct json_object  *jobj, *val, *metric;
    co2pressure_log	*log;
    struct tm		*mytime;
    time_t		timestamp;
    bool		trigger = false;

    strtok(topic, "/"); // ignore namespace
    strtok(NULL, "/");	// group_id
    strtok(NULL, "/");	// message_type
    edge_node = strtok(NULL, "/\0");
    alias = strtok(NULL, "/\0");

    log = (co2pressure_log *)malloc(sizeof(co2pressure_log));
    memset(log, 0, sizeof(co2pressure_log));

    log->node = xstrcpy(edge_node);
    log->alias = xstrcpy(alias);
    jobj = json_tokener_parse(payload);

    timestamp = time(NULL);
    log->datetime = malloc(73);
    mytime = localtime(&timestamp);
    snprintf(log->datetime, 72, "%04d-%02d-%02d %02d:%02d:%02d",
	mytime->tm_year + 1900, mytime->tm_mon + 1, mytime->tm_mday, mytime->tm_hour, mytime->tm_min, mytime->tm_sec);
    if ((mytime->tm_min % 10) == 0)
	trigger = true;

    if (json_object_object_get_ex(jobj, "metric", &metric)) {
	if (json_object_object_get_ex(metric, "uuid", &val)) {
	    if (strcmp((char *)"(null)", json_object_get_string(val)))
	    	log->uuid = xstrcpy((char *)json_object_get_string(val));
	}
	if (json_object_object_get_ex(metric, "temperature", &val)) {
	    log->temperature = json_object_get_double(val);
	}
	if (json_object_object_get_ex(metric, "pressure", &val)) {
            log->pressure = json_object_get_double(val);
        }
    }
    json_object_put(jobj);

    /*
     * Because co2meters are not so smart and don't hold product information
     * search the missing pieces in the database.
     */
    snprintf(buf, 127, "SELECT beercode,beername,beeruuid FROM mon_co2meters WHERE uuid='%s'", log->uuid);
    if (mysql_query(con, buf)) {
        syslog(LOG_NOTICE, "MySQL: %s error %u (%s))", buf, mysql_errno(con), mysql_error(con));
    } else {
        res_set = mysql_store_result(con);
        if (res_set == NULL) {
            syslog(LOG_NOTICE, "MySQL: mysq_store_result error %u (%s))", mysql_errno(con), mysql_error(con));
        } else {
            if ((row = mysql_fetch_row(res_set)) != NULL) {
		/*
		 * Ignore when the beer_name or beer_code is not set.
		 */
		if ((int)strlen(row[0]) == 0 || (int)strlen(row[1]) == 0) {
		    if (log->datetime)
        		free(log->datetime);
    		    if (log->uuid)
        		free(log->uuid);
    		    if (log->node)
        		free(log->node);
    		    if (log->alias)
        		free(log->alias);
    		    free(log);
		    return;
		}
		log->product_code = xstrcpy(row[0]);
		log->product_name = xstrcpy(row[1]);
		log->product_uuid = xstrcpy(row[2]);
	    }
	}
    }

    /*
     * Build the MySQL log and insert if trigger is set.
     */
    if (trigger) {
	snprintf(query, 511, "INSERT IGNORE INTO log_co2pressure SET code='%s', datetime='%s', temperature='%.4f', " \
			"pressure='%.4f', uuid='%s'",
			log->product_code, log->datetime, log->temperature, log->pressure, log->uuid);
	//syslog(LOG_NOTICE, "%s", query);
	bms_mysql_query(query);
    }

    if (log->datetime)
       free(log->datetime);
    if (log->product_uuid )
       free(log->product_uuid );
    if (log->product_code )
       free(log->product_code );
    if (log->product_name )
       free(log->product_name );
    if (log->uuid)
       free(log->uuid);
    if (log->node)
       free(log->node);
    if (log->alias)
       free(log->alias);
    free(log);
}



void co2meter_dump(sys_co2meter_list *co2meter)
{
    if (debug) {
    	printf("uuid        %s\n", co2meter->uuid);
    	printf("alias       %s\n", co2meter->alias);
    	printf("node        %s\n", co2meter->node);
	printf("online      %s\n", co2meter->online ? "yes":"no");
	printf("product     %s / %s\n", co2meter->beercode, co2meter->beername);
	printf("Temperature %-16s %10s %8.3f\n", co2meter->temperature_address, co2meter->temperature_state, co2meter->temperature);
	printf("Pressure    %10s %d %.3f %.3f %.3f\n", co2meter->pressure_state, co2meter->pressure_channel,
				co2meter->pressure_voltage, co2meter->pressure_zero, co2meter->pressure_bar);
	printf("mode        %s\n", co2meter->mode);
    }
}

mercurial