/* * Copyright (C) 2015 Jan-Piet Mens and OwnTracks * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * JAN-PIET MENS OR OWNTRACKS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include "json.h" #include #include "utstring.h" #include "geo.h" #include "geohash.h" #include "base64.h" #include "misc.h" #include "util.h" #include "storage.h" #ifdef WITH_LMDB # include "gcache.h" #endif #ifdef WITH_HTTP # include "http.h" #endif #ifdef WITH_LUA # include "hooks.h" #endif #define SSL_VERIFY_PEER (1) #define SSL_VERIFY_NONE (0) #define TOPIC_PARTS (4) /* owntracks/user/device/info */ #define DEFAULT_QOS (2) #define CLEAN_SESSION false static int run = 1; double number(JsonNode *j, char *element) { JsonNode *m; double d; if ((m = json_find_member(j, element)) != NULL) { if (m->tag == JSON_NUMBER) { return (m->number_); } else if (m->tag == JSON_STRING) { d = atof(m->string_); /* Normalize to number */ json_remove_from_parent(m); json_append_member(j, element, json_mknumber(d)); return (d); } } return (NAN); } static const char *ltime(time_t t) { static char buf[] = "HH:MM:SS"; strftime(buf, sizeof(buf), "%T", localtime(&t)); return(buf); } /* * Process info/ message containing a CARD. If the payload is a card, return TRUE. 
*/ int do_info(void *userdata, UT_string *username, UT_string *device, JsonNode *json) { struct udata *ud = (struct udata *)userdata; JsonNode *j; static UT_string *name = NULL, *face = NULL; FILE *fp; char *img; int rc = FALSE; utstring_renew(name); utstring_renew(face); /* I know the payload is valid JSON: write card */ if ((fp = pathn("wb", "cards", username, NULL, "json")) != NULL) { char *js = json_stringify(json, NULL); if (js) { fprintf(fp, "%s\n", js); free(js); } fclose(fp); } rc = TRUE; if ((j = json_find_member(json, "name")) != NULL) { if (j->tag == JSON_STRING) { // printf("I got: [%s]\n", j->string_); utstring_printf(name, "%s", j->string_); } } if ((j = json_find_member(json, "face")) != NULL) { if (j->tag == JSON_STRING) { // printf("I got: [%s]\n", j->string_); utstring_printf(face, "%s", j->string_); } } if (ud->verbose) { printf("* CARD: %s-%s %s\n", UB(username), UB(device), UB(name)); } /* We have a base64-encoded "face". Decode it and store binary image */ if ((img = malloc(utstring_len(face))) != NULL) { int imglen; if ((imglen = base64_decode(UB(face), img)) > 0) { if ((fp = pathn("wb", "photos", username, NULL, "png")) != NULL) { fwrite(img, sizeof(char), imglen, fp); fclose(fp); } } free(img); } return (rc); } void do_msg(void *userdata, UT_string *username, UT_string *device, JsonNode *json) { struct udata *ud = (struct udata *)userdata; FILE *fp; /* I know the payload is valid JSON: write message */ if ((fp = pathn("ab", "msg", username, NULL, "json")) != NULL) { char *js = json_stringify(json, NULL); if (js) { fprintf(fp, "%s\n", js); free(js); } fclose(fp); } if (ud->verbose) { printf("* MSG: %s-%s\n", UB(username), UB(device)); } } void republish(struct mosquitto *mosq, struct udata *userdata, char *username, char *topic, double lat, double lon, char *cc, char *addr, long tst, char *t) { struct udata *ud = (struct udata *)userdata; JsonNode *json; static UT_string *newtopic = NULL; char *payload; if (ud->pubprefix == NULL) return; if 
((json = json_mkobject()) == NULL) { return; } utstring_renew(newtopic); utstring_printf(newtopic, "%s/%s", ud->pubprefix, topic); json_append_member(json, "username", json_mkstring(username)); json_append_member(json, "topic", json_mkstring(topic)); json_append_member(json, "cc", json_mkstring(cc)); json_append_member(json, "addr", json_mkstring(addr)); json_append_member(json, "t", json_mkstring(t)); json_append_member(json, "tst", json_mknumber(tst)); json_append_member(json, "lat", json_mknumber(lat)); json_append_member(json, "lon", json_mknumber(lon)); if ((payload = json_stringify(json, NULL)) != NULL) { mosquitto_publish(mosq, NULL, UB(newtopic), strlen(payload), payload, 1, true); fprintf(stderr, "%s %s\n", UB(newtopic), payload); free(payload); } json_delete(json); } /* * Decode OwnTracks CSV (Greenwich) and return a new JSON object * of _type = location. */ #define MILL 1000000.0 JsonNode *csv_to_json(char *payload) { JsonNode *json; char tid[64], t[10]; double dist = 0, lat, lon, vel, trip, alt, cog; long tst; char tmptst[40]; if (sscanf(payload, "%[^,],%[^,],%[^,],%lf,%lf,%lf,%lf,%lf,%lf,%lf", tid, tmptst, t, &lat, &lon, &cog, &vel, &alt, &dist, &trip) != 10) { // fprintf(stderr, "**** payload not CSV: %s\n", payload); return (NULL); } lat /= MILL; lon /= MILL; cog *= 10; alt *= 10; trip *= 1000; tst = strtoul(tmptst, NULL, 16); json = json_mkobject(); json_append_member(json, "_type", json_mkstring("location")); json_append_member(json, "t", json_mkstring(t)); json_append_member(json, "tid", json_mkstring(tid)); json_append_member(json, "tst", json_mknumber(tst)); json_append_member(json, "lat", json_mknumber(lat)); json_append_member(json, "lon", json_mknumber(lon)); json_append_member(json, "cog", json_mknumber(cog)); json_append_member(json, "vel", json_mknumber(vel)); json_append_member(json, "alt", json_mknumber(alt)); json_append_member(json, "dist", json_mknumber(dist)); json_append_member(json, "trip", json_mknumber(trip)); 
json_append_member(json, "csv", json_mkbool(1)); return (json); } #define RECFORMAT "%s\t%-18s\t%s\n" /* * Store payload in REC file unless our Lua putrec() function says * we shouldn't for this particular user/device combo. */ static void putrec(struct udata *ud, time_t now, UT_string *reltopic, UT_string *username, UT_string *device, char *string) { FILE *fp; int rc = 0; #ifdef WITH_LUA rc = hooks_norec(ud, UB(username), UB(device), string); #endif if (rc == 0) { if ((fp = pathn("a", "rec", username, device, "rec")) == NULL) { olog(LOG_ERR, "Cannot write REC for %s/%s: %m", UB(username), UB(device)); } fprintf(fp, RECFORMAT, isotime(now), UB(reltopic), string); fclose(fp); } } void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m) { JsonNode *json, *j, *geo = NULL; char *tid = NULL, *t = NULL, *p; double lat, lon; long tst; struct udata *ud = (struct udata *)userdata; char **topics; int count = 0, cached; static UT_string *basetopic = NULL, *username = NULL, *device = NULL, *addr = NULL, *cc = NULL, *ghash = NULL, *ts = NULL; static UT_string *reltopic = NULL; char *jsonstring, *_typestr = NULL; time_t now; int pingping = FALSE, skipslash = 0; payload_type _type; /* * mosquitto_message-> * int mid; * char *topic; * void *payload; * int payloadlen; * int qos; * bool retain; */ time(&now); monitorhook(ud, now, m->topic); if (m->payloadlen == 0) { return; } if (m->retain == TRUE && ud->ignoreretained) { return; } // printf("%s %s\n", m->topic, bindump(m->payload, m->payloadlen)); fflush(stdout); utstring_renew(ts); utstring_renew(basetopic); utstring_renew(username); utstring_renew(device); if (mosquitto_sub_topic_tokenise(m->topic, &topics, &count) != MOSQ_ERR_SUCCESS) { return; } /* * Do we have a leading / in topic? * Also, if topic is too short, ignore and return. We *demand* 3 parts * i.e. 
"owntracks/user/device" */ if (topics[0] == NULL) { /* Topic has leading / */ skipslash = 1; } if (count - skipslash < 3) { fprintf(stderr, "Ignoring short topic %s\n", m->topic); mosquitto_sub_topic_tokens_free(&topics, count); return; } /* * Determine "relative topic", relative to base, i.e. whatever comes * behind ownntracks/user/device/. If it's the base topic, use "*". */ utstring_renew(reltopic); if (count != (3 + skipslash)) { int j; for (j = 3 + skipslash; j < count; j++) { utstring_printf(reltopic, "%s%c", topics[j], (j < count - 1) ? '/' : ' '); } } else { utstring_printf(reltopic, "*"); } if (utstring_len(reltopic) == 0) utstring_printf(reltopic, "-"); utstring_printf(basetopic, "%s/%s/%s", topics[0 + skipslash], topics[1 + skipslash], topics[2 + skipslash]); utstring_printf(username, "%s", topics[1 + skipslash]); utstring_printf(device, "%s", topics[2 + skipslash]); mosquitto_sub_topic_tokens_free(&topics, count); #ifdef WITH_PING if (!strcmp(UB(username), "ping") && !strcmp(UB(device), "ping")) { pingping = TRUE; } #endif /* * First things first: let's see if this contains some sort of valid JSON * or an OwnTracks CSV. If it doesn't, just store this payload because * there's nothing left for us to do with it. 
*/

	/* Try JSON first, then the Greenwich CSV format */
	if ((json = json_decode(m->payload)) == NULL) {
		if ((json = csv_to_json(m->payload)) == NULL) {
			/* It's not JSON or it's not a location CSV; store it */
			putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
			return;
		}
	}

	/* Optionally drop payloads which carry a "_demo" member */
	if (ud->skipdemo && (json_find_member(json, "_demo") != NULL)) {
		json_delete(json);
		return;
	}

	/*
	 * Map the "_type" string onto payload_type. A missing, non-string or
	 * unrecognized "_type" leaves _type at T_UNKNOWN. _typestr is a
	 * private copy of the string which is freed at the cleanup label.
	 */
	_type = T_UNKNOWN;
	if ((j = json_find_member(json, "_type")) != NULL) {
		if (j->tag == JSON_STRING) {
			_typestr = strdup(j->string_);
			if (!strcmp(j->string_, "location"))		_type = T_LOCATION;
			else if (!strcmp(j->string_, "beacon"))		_type = T_BEACON;
			else if (!strcmp(j->string_, "card"))		_type = T_CARD;
			else if (!strcmp(j->string_, "cmd"))		_type = T_CMD;
			else if (!strcmp(j->string_, "configuration"))	_type = T_CONFIG;
			else if (!strcmp(j->string_, "lwt"))		_type = T_LWT;
			else if (!strcmp(j->string_, "msg"))		_type = T_MSG;
			else if (!strcmp(j->string_, "steps"))		_type = T_STEPS;
			else if (!strcmp(j->string_, "transition"))	_type = T_TRANSITION;
			else if (!strcmp(j->string_, "waypoint"))	_type = T_WAYPOINT;
			else if (!strcmp(j->string_, "waypoints"))	_type = T_WAYPOINTS;
		}
	}

	/*
	 * Cards and messages have their own handlers. Waypoint, transition
	 * and location payloads continue below. Everything else, including
	 * unknown types, is stored verbatim in the REC file and we're done.
	 */
	switch (_type) {
		case T_CARD:
			do_info(ud, username, device, json);
			goto cleanup;
		case T_MSG:
			do_msg(ud, username, device, json);
			goto cleanup;
		case T_BEACON:
		case T_CMD:
		case T_CONFIG:
		case T_LWT:
		case T_STEPS:
		case T_WAYPOINTS:
			putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
			goto cleanup;
		case T_WAYPOINT:
		case T_TRANSITION:
		case T_LOCATION:
			break;
		default:
			putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
			goto cleanup;
	}

	/*
	 * We are now handling location-related JSON. Normalize tst, lat, lon
	 * to numbers, particularly for Greenwich which produces strings
	 * currently. We're normalizing *in* json which replaces strings by
	 * numbers. 
*/ tst = time(NULL); if ((j = json_find_member(json, "tst")) != NULL) { if (j && j->tag == JSON_STRING) { tst = strtoul(j->string_, NULL, 10); json_remove_from_parent(j); json_append_member(json, "tst", json_mknumber(tst)); } else { tst = (unsigned long)j->number_; } } if (isnan(lat = number(json, "lat")) || isnan(lon = number(json, "lon"))) { olog(LOG_ERR, "lat or lon for %s are NaN: %s", m->topic, bindump(m->payload, m->payloadlen)); goto cleanup; } if ((j = json_find_member(json, "tid")) != NULL) { if (j->tag == JSON_STRING) { tid = strdup(j->string_); } } if ((j = json_find_member(json, "t")) != NULL) { if (j && j->tag == JSON_STRING) { t = strdup(j->string_); } } #ifdef WITH_LMDB /* * If the topic we are handling is in topic2tid, replace the TID * in this payload with that from the database. */ if (ud->t2t) { char newtid[BUFSIZ]; long blen; if ((blen = gcache_get(ud->t2t, m->topic, newtid, sizeof(newtid))) > 0) { if ((j = json_find_member(json, "tid")) != NULL) json_remove_from_parent(j); json_append_member(json, "tid", json_mkstring(newtid)); } } #endif /* * Chances are high that what we have now contains lat, lon. Attempt to * perform or retrieve reverse-geo. 
*/

	utstring_renew(ghash);
	utstring_renew(addr);
	utstring_renew(cc);

	/* geohash_encode() hands back a string which we copy and then free */
	p = geohash_encode(lat, lon, geohash_prec());
	if (p != NULL) {
		utstring_printf(ghash, "%s", p);
		free(p);
	}

	/*
	 * With reverse-geo enabled, look in the cache (keyed by geohash)
	 * first; on a miss ask revgeo() and cache its answer. `cached` is
	 * only used for the verbose output further down.
	 * NOTE(review): the gcache_json_*() calls are not guarded by
	 * WITH_LMDB although gcache.h is only included when it is defined --
	 * confirm this builds without LMDB.
	 */
	cached = FALSE;
	if (ud->revgeo == TRUE) {
		if ((geo = gcache_json_get(ud->gc, UB(ghash))) != NULL) {
			/* Habemus cached data */

			cached = TRUE;

			/*
			 * NOTE(review): string_ is read without checking that
			 * the member's tag is JSON_STRING.
			 */
			if ((j = json_find_member(geo, "cc")) != NULL) {
				utstring_printf(cc, "%s", j->string_);
			}
			if ((j = json_find_member(geo, "addr")) != NULL) {
				utstring_printf(addr, "%s", j->string_);
			}
		} else {
			if ((geo = revgeo(lat, lon, addr, cc)) != NULL) {
				gcache_json_put(ud->gc, UB(ghash), geo);
			} else {
				/* We didn't obtain reverse Geo, maybe because of over
				 * quota; make a note of the missing geohash */

				char gfile[BUFSIZ];
				FILE *fp;

				snprintf(gfile, BUFSIZ, "%s/ghash/missing", STORAGEDIR);
				if ((fp = fopen(gfile, "a")) != NULL) {
					fprintf(fp, "%s %lf %lf\n", UB(ghash), lat, lon);
					fclose(fp);
				}
			}
		}
	} else {
		/* Reverse-geo is disabled: use placeholders */
		utstring_printf(cc, "??");
		utstring_printf(addr, "n.a.");
	}

	/*
	 * We have normalized data in the JSON, so we can now write it
	 * out to the REC file.
	 */

	if (!pingping) {
		if ((jsonstring = json_stringify(json, NULL)) != NULL) {
			putrec(ud, now, reltopic, username, device, jsonstring);
			free(jsonstring);
		}
	}

	/*
	 * Append a few bits to the location type to add to LAST and
	 * for Lua / Websockets.
	 * I need a unique "key" in the Websocket clients to keep track
	 * of which device is being updated; use topic.
	 */

	json_append_member(json, "topic", json_mkstring(m->topic));

	/*
	 * We have to know which user/device this is for in order to
	 * determine whether a connected Websocket client is authorized
	 * to see this. 
Add user/device */

	json_append_member(json, "username", json_mkstring(UB(username)));
	json_append_member(json, "device", json_mkstring(UB(device)));

	json_append_member(json, "ghash",    json_mkstring(UB(ghash)));

	/*
	 * Locations are written to last/<user>/<device>/<user>-<device>.json,
	 * waypoints to waypoints/<user>/<device>/<isotime(tst)>.json, both
	 * below STORAGEDIR.
	 */
	if (_type == T_LOCATION || _type == T_WAYPOINT) {
		UT_string *filename = NULL;
		char *component;

		utstring_renew(filename);

		if (_type == T_LOCATION) {
			component = "last";
			utstring_printf(filename, "%s-%s.json", UB(username), UB(device));
		} else {
			/* Can only be T_WAYPOINT here */
			component = "waypoints";
			utstring_printf(filename, "%s.json", isotime(tst));
		}

		if ((jsonstring = json_stringify(json, NULL)) != NULL) {
			/* `ts` first holds the directory, then the full path */
			utstring_printf(ts, "%s/%s/%s/%s",
				STORAGEDIR,
				component,
				UB(username),
				UB(device));
			if (mkpath(UB(ts)) < 0) {
				olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts));
			}

			utstring_printf(ts, "/%s", UB(filename));
			safewrite(UB(ts), jsonstring);
			free(jsonstring);
		}

		/*
		 * `filename` is a non-static local which is allocated anew on
		 * every call; release it or we leak one string per message.
		 */
		utstring_free(filename);
	}

	/*
	 * Now add more bits for Lua and Websocket, in particular the
	 * Geo data.
	 */

	if (geo) {
		json_copy_to_object(json, geo, FALSE);
	}

#ifdef WITH_HTTP
	if (ud->mgserver && !pingping) {
		http_ws_push_json(ud->mgserver, json);
	}
#endif

#ifdef WITH_LUA
	if (ud->luadata && !pingping) {
		hooks_hook(ud, m->topic, json);
	}
#endif

	if (ud->verbose) {
		if (_type == T_LOCATION) {
			/* '*' marks an address served from the cache, '-' a fresh one */
			printf("%c %s %-35s t=%-1.1s tid=%-2.2s loc=%.5f,%.5f [%s] %s (%s)\n",
				(cached) ? '*' : '-',
				ltime(tst),
				m->topic,
				(t) ? t : " ",
				(tid) ? tid : "",
				lat, lon,
				UB(cc),
				UB(addr),
				UB(ghash)
			);
		} else if (_type == T_TRANSITION) {
			JsonNode *e, *d;

			e = json_find_member(json, "event");
			d = json_find_member(json, "desc");
			/* Only read string_ from members which really are strings */
			printf("transition: %s %s\n",
				(e && e->tag == JSON_STRING) ? e->string_ : "unknown",
				(d && d->tag == JSON_STRING) ? d->string_ : "unknown");
		} else if (_type == T_WAYPOINT) {
			j = json_find_member(json, "desc");
			printf("waypoint: %s\n", (j && j->tag == JSON_STRING) ? 
j->string_ : "unknown desc"); } else { if ((jsonstring = json_stringify(json, NULL)) != NULL) { printf("%s %s\n", _typestr, jsonstring); free(jsonstring); } else { printf("%s received\n", _typestr); } } } cleanup: if (geo) json_delete(geo); if (json) json_delete(json); if (tid) free(tid); if (t) free(t); if (_typestr) free(_typestr); } void on_connect(struct mosquitto *mosq, void *userdata, int rc) { struct udata *ud = (struct udata *)userdata; int mid; JsonNode *t; json_foreach(t, ud->topics) { if (t->tag == JSON_STRING) { olog(LOG_DEBUG, "Subscribing to %s (qos=%d)", t->string_, ud->qos); mosquitto_subscribe(mosq, &mid, t->string_, ud->qos); } } } static char *mosquitto_reason(int rc) { static char *reasons[] = { "Connection accepted", /* 0x00 */ "Connection refused: incorrect protocol version", /* 0x01 */ "Connection refused: invalid client identifier", /* 0x02 */ "Connection refused: server unavailable", /* 0x03 */ "Connection refused: bad username or password", /* 0x05 */ "Connection refused: not authorized", /* 0x06 */ "Connection refused: TLS error", /* 0x07 */ }; return ((rc >= 0 && rc <= 0x07) ? reasons[rc] : "unknown reason"); } void on_disconnect(struct mosquitto *mosq, void *userdata, int reason) { #ifdef WITH_LMDB struct udata *ud = (struct udata *)userdata; #endif if (reason == 0) { // client wish #ifdef WITH_LMDB gcache_close(ud->gc); #endif } else { olog(LOG_INFO, "Disconnected. Reason: 0x%X [%s]", reason, mosquitto_reason(reason)); } } static void catcher(int sig) { fprintf(stderr, "Going down on signal %d\n", sig); run = 0; } void usage(char *prog) { printf("Usage: %s [options..] 
topic [topic ...]\n", prog); printf(" --help -h this message\n"); printf(" --storage -S storage dir (%s)\n", STORAGEDEFAULT); printf(" --norevgeo -G disable ghash to reverge-geo lookups\n"); printf(" --skipdemo -D do not handle objects with _demo\n"); printf(" --useretained -R process retained messages (default: no)\n"); printf(" --clientid -i MQTT client-ID\n"); printf(" --qos -q MQTT QoS (dflt: 2)\n"); printf(" --pubprefix -P republish prefix (dflt: no republish)\n"); printf(" --host -H MQTT host (localhost)\n"); printf(" --port -p MQTT port (1883)\n"); printf(" --logfacility syslog facility (local0)\n"); printf(" --quiet disable printing of messages to stdout\n"); printf(" --initialize initialize storage\n"); printf(" --label