/* * Copyright (C) 2015-2016 Jan-Piet Mens and OwnTracks * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * JAN-PIET MENS OR OWNTRACKS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. */ #include #include #include #include #include #include #include #include #include #include #include #include "json.h" #include #include #include "utstring.h" #include "geo.h" #include "geohash.h" #include "base64.h" #include "misc.h" #include "util.h" #include "storage.h" #ifdef WITH_LMDB # include "gcache.h" #endif #ifdef WITH_HTTP # include "http.h" #endif #ifdef WITH_LUA # include "hooks.h" #endif #if WITH_ENCRYPT # include #endif #define SSL_VERIFY_PEER (1) #define SSL_VERIFY_NONE (0) #define TOPIC_PARTS (4) /* owntracks/user/device/info */ #define DEFAULT_QOS (2) #define CLEAN_SESSION false #define GWNUMBERSMAX 50 /* number of batt,ext,status in array */ static int run = 1; double number(JsonNode *j, char *element) { JsonNode *m; double d; if ((m = json_find_member(j, element)) != NULL) { if (m->tag == JSON_NUMBER) { return (m->number_); } else if (m->tag == JSON_STRING) { d = atof(m->string_); /* Normalize to number */ json_remove_from_parent(m); json_append_member(j, element, json_mknumber(d)); return (d); } } return (NAN); } static const char *ltime(time_t t) { static char buf[] = "HH:MM:SS"; strftime(buf, sizeof(buf), "%T", localtime(&t)); return(buf); } /* * Process info/ message containing a CARD. If the payload is a card, return TRUE. */ int do_info(void *userdata, UT_string *username, UT_string *device, JsonNode *json) { struct udata *ud = (struct udata *)userdata; JsonNode *j; static UT_string *name = NULL, *face = NULL; FILE *fp; char *img; int rc = FALSE; size_t imglen; utstring_renew(name); utstring_renew(face); /* I know the payload is valid JSON: write card */ if ((fp = pathn("wb", "cards", username, NULL, "json")) != NULL) { char *js = json_stringify(json, NULL); if (js) { fprintf(fp, "%s\n", js); free(js); } fclose(fp); } rc = TRUE; if ((j = json_find_member(json, "name")) != NULL) { if (j->tag == JSON_STRING) { // printf("I got: [%s]\n", j->string_); utstring_printf(name, "%s", j->string_); } } if ((j = json_find_member(json, "face")) != NULL) { if (j->tag == JSON_STRING) { // printf("I got: [%s]\n", j->string_); utstring_printf(face, "%s", j->string_); } } if (ud->verbose) { printf("* CARD: %s-%s %s\n", UB(username), UB(device), UB(name)); } /* We have a base64-encoded "face". Decode it and store binary image */ if ((img = base64_decode(UB(face), &imglen)) != NULL) { if ((fp = pathn("wb", "photos", username, NULL, "png")) != NULL) { fwrite(img, sizeof(char), imglen, fp); fclose(fp); } free(img); } return (rc); } void do_msg(void *userdata, UT_string *username, UT_string *device, JsonNode *json) { struct udata *ud = (struct udata *)userdata; FILE *fp; /* I know the payload is valid JSON: write message */ if ((fp = pathn("ab", "msg", username, NULL, "json")) != NULL) { char *js = json_stringify(json, NULL); if (js) { fprintf(fp, "%s\n", js); free(js); } fclose(fp); } if (ud->verbose) { printf("* MSG: %s-%s\n", UB(username), UB(device)); } } void republish(struct mosquitto *mosq, struct udata *userdata, char *username, char *topic, double lat, double lon, char *cc, char *addr, long tst, char *t) { struct udata *ud = (struct udata *)userdata; JsonNode *json; static UT_string *newtopic = NULL; char *payload; if (ud->pubprefix == NULL) return; if ((json = json_mkobject()) == NULL) { return; } utstring_renew(newtopic); utstring_printf(newtopic, "%s/%s", ud->pubprefix, topic); json_append_member(json, "username", json_mkstring(username)); json_append_member(json, "topic", json_mkstring(topic)); json_append_member(json, "cc", json_mkstring(cc)); json_append_member(json, "addr", json_mkstring(addr)); json_append_member(json, "t", json_mkstring(t)); json_append_member(json, "tst", json_mknumber(tst)); json_append_member(json, "lat", json_mknumber(lat)); json_append_member(json, "lon", json_mknumber(lon)); if ((payload = json_stringify(json, NULL)) != NULL) { mosquitto_publish(mosq, NULL, UB(newtopic), strlen(payload), payload, 1, true); fprintf(stderr, "%s %s\n", UB(newtopic), payload); free(payload); } json_delete(json); } /* * Quickly check wheterh the payload looks like * Greenwich CSV with a regex. We could use this * to split out the fields, instead of reverting * to sscanf */ // TID , TST , T , LAT , LON , COG , VEL , ALT , DIST , TRIP #define CSV_RE "^([[:alnum:]]+),([[:xdigit:]]+),[[:alnum:]],[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+$" static int csv_looks_sane(char *payload) { static int virgin = 1; static regex_t regex; int nomatch; int cflags = REG_EXTENDED | REG_ICASE | REG_NOSUB; if (virgin) { virgin = !virgin; if (regcomp(®ex, CSV_RE, cflags)) { olog(LOG_ERR, "Cannot compile CSV RE"); return (FALSE); } } nomatch = regexec(®ex, payload, 0, NULL, 0); return (nomatch ? FALSE : TRUE); } /* * Decode OwnTracks CSV (Greenwich) and return a new JSON object * of _type = location. * #define CSV "X0,542A46AA,k,30365854,7575769,26,4,7,5,872" */ #define MILL 1000000.0 JsonNode *csv_to_json(char *payload) { JsonNode *json; char tid[64], t[10]; double dist = 0, lat, lon, vel, trip, alt, cog; long tst; char tmptst[40]; if (!csv_looks_sane(payload)) return (NULL); if (sscanf(payload, "%[^,],%[^,],%[^,],%lf,%lf,%lf,%lf,%lf,%lf,%lf", tid, tmptst, t, &lat, &lon, &cog, &vel, &alt, &dist, &trip) != 10) { // fprintf(stderr, "**** payload not CSV: %s\n", payload); return (NULL); } lat /= MILL; lon /= MILL; cog *= 10; alt *= 10; trip *= 1000; tst = strtoul(tmptst, NULL, 16); json = json_mkobject(); json_append_member(json, "_type", json_mkstring("location")); json_append_member(json, "t", json_mkstring(t)); json_append_member(json, "tid", json_mkstring(tid)); json_append_member(json, "tst", json_mknumber(tst)); json_append_member(json, "lat", json_mknumber(lat)); json_append_member(json, "lon", json_mknumber(lon)); json_append_member(json, "cog", json_mknumber(cog)); json_append_member(json, "vel", json_mknumber(vel)); json_append_member(json, "alt", json_mknumber(alt)); json_append_member(json, "dist", json_mknumber(dist)); json_append_member(json, "trip", json_mknumber(trip)); json_append_member(json, "csv", json_mkbool(1)); return (json); } #define RECFORMAT "%s\t%-18s\t%s\n" /* * Store payload in REC file unless our Lua putrec() function says * we shouldn't for this particular user/device combo. */ static void putrec(struct udata *ud, time_t now, UT_string *reltopic, UT_string *username, UT_string *device, char *string) { FILE *fp; int rc = 0; if (ud->norec) return; #ifdef WITH_LUA rc = hooks_norec(ud, UB(username), UB(device), string); #endif if (rc == 0) { if ((fp = pathn("a", "rec", username, device, "rec")) == NULL) { olog(LOG_ERR, "Cannot write REC for %s/%s: %m", UB(username), UB(device)); return; } fprintf(fp, RECFORMAT, isotime(now), UB(reltopic), string); fclose(fp); } } /* * Payload contains JSON string with a configuration obtained * via cmd `dump' to the device. Store it "pretty". */ static char *prettyfy(char *payloadstring) { JsonNode *json; char *pretty_js; if ((json = json_decode(payloadstring)) == NULL) { olog(LOG_ERR, "Cannot decode JSON from %s", payloadstring); return (NULL); } pretty_js = json_stringify(json, "\t"); json_delete(json); return (pretty_js); } static void xx_dump(struct udata *ud, UT_string *username, UT_string *device, char *payloadstring, char *type, char *extension) { static UT_string *ts = NULL; char *pretty_js = prettyfy(payloadstring); utstring_renew(ts); utstring_printf(ts, "%s/%s/%s/%s", STORAGEDIR, type, UB(username), UB(device)); if (mkpath(UB(ts)) < 0) { olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts)); if (pretty_js) free(pretty_js); return; } utstring_printf(ts, "/%s-%s.%s", UB(username), UB(device), extension); if (ud->verbose) { printf("Received %s dump, storing at %s\n", type, UB(ts)); } safewrite(UB(ts), (pretty_js) ? pretty_js : payloadstring); if (pretty_js) free(pretty_js); } /* Dump a config payload; get the 'configuration' element out of the dumped payloadstring */ void config_dump(struct udata *ud, UT_string *username, UT_string *device, char *payloadstring) { JsonNode *json = json_decode(payloadstring), *config; if (json == NULL) return; if ((config = json_find_member(json, "configuration")) != NULL) { char *js_string = json_stringify(config, NULL); if (js_string) { xx_dump(ud, username, device, js_string, "config", "otrc"); json_delete(json); free(js_string); } } } /* Dump a waypoints (plural) payload */ void waypoints_dump(struct udata *ud, UT_string *username, UT_string *device, char *payloadstring) { JsonNode *json = json_decode(payloadstring), *j; char *js = NULL; if (json == NULL) return; if ((j = json_find_member(json, "r")) != NULL) { json_remove_from_parent(j); js = json_stringify(json, NULL); json_delete(json); } xx_dump(ud, username, device, (js) ? js : payloadstring, "waypoints", "otrw"); if (js) free(js); } #ifdef WITH_RONLY static int is_ronly(struct udata *ud, UT_string *basetopic) { JsonNode *json, *j; char *key = UB(basetopic); int active = FALSE; if ((json = gcache_json_get(ud->ronlydb, key)) == NULL) return (FALSE); if ((j = json_find_member(json, "active")) != NULL) { active = j->bool_; } printf("**--- %s: return active = %d\n", key, active); return (active); } /* * Make an RONLYdb entry for basetopic, updating timestamp in the JSON * active is TRUE if the user is an RONLY user, else FALSE. */ static void ronly_set(struct udata *ud, UT_string *basetopic, int active) { JsonNode *json, *j; char *key = UB(basetopic); int rc, touch = FALSE; json = gcache_json_get(ud->ronlydb, key); if (json == NULL) { if (active == FALSE) /* Has never been r:true b/c not in RONLYdb */ return; json = json_mkobject(); } if ((j = json_find_member(json, "first")) == NULL) { json_append_member(json, "first", json_mknumber(time(0))); touch = TRUE; } if ((j = json_find_member(json, "last")) != NULL) json_remove_from_parent(j); if ((j = json_find_member(json, "active")) != NULL) { if (active != j->bool_) { json_remove_from_parent(j); json_append_member(json, "active", json_mkbool(active)); json_append_member(json, "last", json_mknumber(time(0))); touch = TRUE; } else if (active == TRUE) { json_append_member(json, "last", json_mknumber(time(0))); touch = TRUE; } } else { json_append_member(json, "active", json_mkbool(active)); touch = TRUE; } if (touch) { if ((rc = gcache_json_put(ud->ronlydb, key, json)) != 0) olog(LOG_ERR, "Cannot store %s in ronlydb: rc==%d", key, rc); printf("+++++++++ TOUCH db for %s\n", key); } json_delete(json); } #endif #ifdef WITH_GREENWICH /* * key is "batt", "ext", or "status" * value is a string which contains a number * * Open/create a file at gw/user/device/user-device.json. Append to the existing array, * limiting the number of array entries. */ void store_gwvalue(char *username, char *device, time_t tst, char *key, char *value) { static UT_string *ts = NULL; JsonNode *array, *o, *j; int count = 0; char *js; utstring_renew(ts); utstring_printf(ts, "%s/last/%s/%s", STORAGEDIR, username, device); if (mkpath(UB(ts)) < 0) { olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts)); return; } utstring_printf(ts, "/%s.json", key); /* Read file into array or create array on error */ if ((js = slurp_file(UB(ts), TRUE)) != NULL) { if ((array = json_decode(js)) == NULL) { array = json_mkarray(); } free(js); } else { array = json_mkarray(); } /* Count elements in array and pop first if too long */ json_foreach(j, array) { ++count; } if (count >= GWNUMBERSMAX) { j = json_first_child(array); json_delete(j); } o = json_mkobject(); json_append_member(o, "tst", json_mknumber(tst)); json_append_member(o, key, json_mknumber(atof(value))); json_append_element(array, o); if ((js = json_stringify(array, NULL)) != NULL) { safewrite(UB(ts), js); free(js); } json_delete(array); } #endif /* GREENWICH */ #if WITH_ENCRYPT /* * Create a new mosquitto message structure, decrypt and populate new. * p64 contains the base64-encoded payload from the device. `username' * and `device' are needed to obtain the decryption key for this object. */ struct mosquitto_message *decrypt(struct udata *ud, const struct mosquitto_message *m, char *p64, char *username, char *device) { struct mosquitto_message *msg; unsigned char key[crypto_secretbox_KEYBYTES]; unsigned char *ciphertext, *cleartext; size_t ciphertext_len; int n, klen; UT_string *userdev; utstring_new(userdev); utstring_printf(userdev, "%s-%s", username, device); memset(key, 0, sizeof(key)); klen = gcache_get(ud->keydb, (char *)UB(userdev), (char *)key, sizeof(key)); if (klen < 1) { olog(LOG_ERR, "no decryption key for %s in %s", UB(userdev), m->topic); return (NULL); } debug(ud, "Key for %s is [%s]", UB(userdev), key); if ((msg = malloc(sizeof(struct mosquitto_message))) == NULL) { return (NULL); } n = strlen(p64); /* This is more than enough */ msg->mid = m->mid; msg->topic = m->topic; msg->qos = m->qos; msg->retain = m->retain; if ((ciphertext = base64_decode(p64, &ciphertext_len)) == NULL) { olog(LOG_ERR, "payload of %s cannot be base64-decoded", m->topic); free(msg); return (NULL); } debug(ud, "START DECRYPT. clen==%lu", ciphertext_len); if ((cleartext = calloc(n, sizeof(unsigned char))) == NULL) { free(ciphertext); free(msg); return (NULL); } if (crypto_secretbox_open_easy(cleartext, // message ciphertext + crypto_secretbox_NONCEBYTES, // skip over nonce ciphertext_len - crypto_secretbox_NONCEBYTES, // len (- nonce) ciphertext, // nonce key) != 0) { olog(LOG_ERR, "payload of %s cannot be decrypted; forged?", m->topic); free(ciphertext); free(cleartext); free(msg); return (NULL); } debug(ud, "DECRYPTED: %s", (char *)cleartext); free(ciphertext); msg->payload = (void *)cleartext; msg->payloadlen = strlen((char *)cleartext); return (msg); } #endif /* ENCRYPT */ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m) { JsonNode *json, *j, *geo = NULL; char *tid = NULL, *t = NULL, *p; double lat, lon, acc; long tst; struct udata *ud = (struct udata *)userdata; char **topics; int count = 0, cached; static UT_string *basetopic = NULL, *username = NULL, *device = NULL, *addr = NULL, *cc = NULL, *ghash = NULL, *ts = NULL; static UT_string *reltopic = NULL; char *jsonstring, *_typestr = NULL; time_t now; int pingping = FALSE, skipslash = 0; int r_ok = TRUE; /* True if recording enabled for a publish */ payload_type _type; #ifdef WITH_ENCRYPT struct mosquitto_message *new_m; #endif /* * mosquitto_message-> * int mid; * char *topic; * void *payload; * int payloadlen; * int qos; * bool retain; */ time(&now); monitorhook(ud, now, m->topic); if (m->payloadlen == 0) { return; } if (m->retain == TRUE && ud->ignoreretained) { return; } // printf("%s %s\n", m->topic, bindump(m->payload, m->payloadlen)); fflush(stdout); utstring_renew(ts); utstring_renew(basetopic); utstring_renew(username); utstring_renew(device); if (mosquitto_sub_topic_tokenise(m->topic, &topics, &count) != MOSQ_ERR_SUCCESS) { return; } /* * Do we have a leading / in topic? * Also, if topic is too short, ignore and return. We *demand* 3 parts * i.e. "owntracks/user/device" */ if (topics[0] == NULL) { /* Topic has leading / */ skipslash = 1; } if (count - skipslash < 3) { fprintf(stderr, "Ignoring short topic %s\n", m->topic); mosquitto_sub_topic_tokens_free(&topics, count); return; } /* * Determine "relative topic", relative to base, i.e. whatever comes * behind ownntracks/user/device/. If it's the base topic, use "*". */ utstring_renew(reltopic); if (count != (3 + skipslash)) { int j; for (j = 3 + skipslash; j < count; j++) { utstring_printf(reltopic, "%s%c", topics[j], (j < count - 1) ? '/' : ' '); } } else { utstring_printf(reltopic, "*"); } if (utstring_len(reltopic) == 0) utstring_printf(reltopic, "-"); utstring_printf(basetopic, "%s/%s/%s", topics[0 + skipslash], topics[1 + skipslash], topics[2 + skipslash]); utstring_printf(username, "%s", topics[1 + skipslash]); utstring_printf(device, "%s", topics[2 + skipslash]); #ifdef WITH_PING if (!strcmp(UB(username), "ping") && !strcmp(UB(device), "ping")) { pingping = TRUE; } #endif #ifdef WITH_GREENWICH /* * For Greenwich: handle owntracks/user/device/voltage/batt, voltage/ext, and * status all of which have a numeric payload. */ if ((count == 5+skipslash && !strcmp(topics[3+skipslash], "voltage")) && (!strcmp(topics[4+skipslash], "batt") || !strcmp(topics[4+skipslash], "ext"))) { store_gwvalue(UB(username), UB(device), now, topics[4+skipslash], m->payload); } if (count == 4+skipslash && !strcmp(topics[3+skipslash], "status")) { store_gwvalue(UB(username), UB(device), now, "status", m->payload); } /* Fall through to store this payload in the REC file as well. */ #endif mosquitto_sub_topic_tokens_free(&topics, count); /* * Now let's see if this contains some sort of valid JSON * or an OwnTracks CSV. If it doesn't, just store this payload because * there's nothing left for us to do with it. */ if ((json = json_decode(m->payload)) == NULL) { if ((json = csv_to_json(m->payload)) == NULL) { #ifdef WITH_RONLY /* * If the base topic belongs to an RONLY user, store * the payload. */ if (is_ronly(ud, basetopic)) { // puts("*** storing plain publis"); putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen)); } #else /* It's not JSON or it's not a location CSV; store it */ putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen)); #endif return; } } if (ud->skipdemo && (json_find_member(json, "_demo") != NULL)) { json_delete(json); return; } #ifdef WITH_RONLY /* * This is a special mode in which location (and a few other) * publishes will be recorded only if r:true in the payload. * If we cannot find `r' in the JSON, or if `r' isn't true, * set r_ok to FALSE. We cannot just bail out here, because * we still want info, cards &c. */ if ((j = json_find_member(json, "r")) == NULL) { r_ok = FALSE; /* * This JSON payload might actually belong to an RONLY user * but it doesn't have an `r:true' in it. Determine whether * the basetopic belongs to such a user, and force r_ok * accordingly. If this is _type:location it holds the definitive * truth. */ if (is_ronly(ud, basetopic)) { r_ok = TRUE; // printf("*** forcing TRUE b/c ronlydb (blen=%ld)\n", blen); } if ((j = json_find_member(json, "_type")) != NULL) { if ((j->tag == JSON_STRING) && (strcmp(j->string_, "location") == 0)) { r_ok = FALSE; } } } else { r_ok = TRUE; if ((j->tag != JSON_BOOL) || (j->bool_ == FALSE)) { r_ok = FALSE; } } /* * Record the RONLY basetopic in RONLYdb, and indicate active or not */ ronly_set(ud, basetopic, r_ok); #endif _type = T_UNKNOWN; if ((j = json_find_member(json, "_type")) != NULL) { if (j->tag == JSON_STRING) { _typestr = strdup(j->string_); if (!strcmp(j->string_, "location")) _type = T_LOCATION; else if (!strcmp(j->string_, "beacon")) _type = T_BEACON; else if (!strcmp(j->string_, "card")) _type = T_CARD; else if (!strcmp(j->string_, "cmd")) _type = T_CMD; else if (!strcmp(j->string_, "lwt")) _type = T_LWT; else if (!strcmp(j->string_, "msg")) _type = T_MSG; else if (!strcmp(j->string_, "steps")) _type = T_STEPS; else if (!strcmp(j->string_, "transition")) _type = T_TRANSITION; else if (!strcmp(j->string_, "waypoint")) _type = T_WAYPOINT; else if (!strcmp(j->string_, "waypoints")) _type = T_WAYPOINTS; else if (!strcmp(j->string_, "dump")) _type = T_CONFIG; #if WITH_ENCRYPT else if (!strcmp(j->string_, "encrypted")) _type = T_ENCRYPTED; #endif /* WITH_ENCRYPT */ } } switch (_type) { case T_CARD: do_info(ud, username, device, json); goto cleanup; case T_MSG: do_msg(ud, username, device, json); goto cleanup; case T_BEACON: #ifdef WITH_HTTP if (ud->mgserver && !pingping) { json_append_member(json, "topic", json_mkstring(m->topic)); json_append_member(json, "username", json_mkstring(UB(username))); json_append_member(json, "device", json_mkstring(UB(device))); http_ws_push_json(ud->mgserver, json); } #endif if (r_ok) { putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen)); } goto cleanup; case T_CMD: case T_LWT: case T_STEPS: if (r_ok) { putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen)); } goto cleanup; case T_WAYPOINTS: waypoints_dump(ud, username, device, m->payload); goto cleanup; case T_CONFIG: config_dump(ud, username, device, m->payload); goto cleanup; case T_WAYPOINT: case T_TRANSITION: case T_LOCATION: break; #if WITH_ENCRYPT case T_ENCRYPTED: /* * Obtain the `data' element from JSON, and try and decrypt * that. If successful, we get a new mosquitto_message with * the decrypted message as payload, and invoke this function * again to do the heavy lifting. */ if ((j = json_find_member(json, "data")) != NULL) { if (j->tag == JSON_STRING) { new_m = decrypt(ud, m, j->string_, UB(username), UB(device)); if (new_m != NULL) { on_message(mosq, userdata, new_m); free(new_m->payload); free(new_m); } return; } } olog(LOG_ERR, "no `data' in encrypted %s", m->topic); return; break; #endif /* WITH_ENCRYPT */ default: if (r_ok) { putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen)); } goto cleanup; } if (r_ok == FALSE) goto cleanup; /* * We are now handling location-related JSON. Normalize tst, lat, lon * to numbers, particularly for Greenwich which produces strings * currently. We're normalizing *in* json which replaces strings by * numbers. */ tst = time(NULL); if ((j = json_find_member(json, "tst")) != NULL) { if (j->tag == JSON_STRING) { tst = strtoul(j->string_, NULL, 10); json_remove_from_parent(j); json_append_member(json, "tst", json_mknumber(tst)); } else { tst = (unsigned long)j->number_; } } if (isnan(lat = number(json, "lat")) || isnan(lon = number(json, "lon"))) { olog(LOG_ERR, "lat or lon for %s are NaN: %s", m->topic, bindump(m->payload, m->payloadlen)); goto cleanup; } if ((j = json_find_member(json, "acc")) != NULL) { if (j->tag == JSON_STRING) { acc = atof(j->string_); json_remove_from_parent(j); json_append_member(json, "acc", json_mknumber(acc)); } } if ((j = json_find_member(json, "tid")) != NULL) { if (j->tag == JSON_STRING) { tid = strdup(j->string_); } } if ((j = json_find_member(json, "t")) != NULL) { if (j && j->tag == JSON_STRING) { t = strdup(j->string_); } } #if 0 /* Haversine */ { double d = haversine_dist(lat, lon, 52.03431, 8.47654); printf("*** d=%lf meters\n", d); } #endif #ifdef WITH_LMDB /* * If the topic we are handling is in topic2tid, replace the TID * in this payload with that from the database. */ if (ud->t2t) { char newtid[BUFSIZ]; long blen; if ((blen = gcache_get(ud->t2t, m->topic, newtid, sizeof(newtid))) > 0) { if ((j = json_find_member(json, "tid")) != NULL) json_remove_from_parent(j); json_append_member(json, "tid", json_mkstring(newtid)); } } #endif /* * Chances are high that what we have now contains lat, lon. Attempt to * perform or retrieve reverse-geo. */ utstring_renew(ghash); utstring_renew(addr); utstring_renew(cc); p = geohash_encode(lat, lon, geohash_prec()); if (p != NULL) { utstring_printf(ghash, "%s", p); free(p); } cached = FALSE; if (ud->revgeo == TRUE) { #ifdef WITH_LMDB if ((geo = gcache_json_get(ud->gc, UB(ghash))) != NULL) { /* Habemus cached data */ cached = TRUE; if ((j = json_find_member(geo, "cc")) != NULL) { utstring_printf(cc, "%s", j->string_); } if ((j = json_find_member(geo, "addr")) != NULL) { utstring_printf(addr, "%s", j->string_); } } else { if ((geo = revgeo(ud, lat, lon, addr, cc)) != NULL) { gcache_json_put(ud->gc, UB(ghash), geo); } else { /* We didn't obtain reverse Geo, maybe because of over * quota; make a note of the missing geohash */ char gfile[BUFSIZ]; FILE *fp; snprintf(gfile, BUFSIZ, "%s/ghash/missing", STORAGEDIR); if ((fp = fopen(gfile, "a")) != NULL) { fprintf(fp, "%s %lf %lf\n", UB(ghash), lat, lon); fclose(fp); } } } #else /* !LMDB */ if ((geo = revgeo(ud, lat, lon, addr, cc)) != NULL) { ; } #endif /* LMDB */ } else { utstring_printf(cc, "??"); utstring_printf(addr, "n.a."); } /* * We have normalized data in the JSON, so we can now write it * out to the REC file. */ if (!pingping) { if ((jsonstring = json_stringify(json, NULL)) != NULL) { putrec(ud, now, reltopic, username, device, jsonstring); free(jsonstring); } } /* * Append a few bits to the location type to add to LAST and * for Lua / Websockets. * I need a unique "key" in the Websocket clients to keep track * of which device is being updated; use topic. */ json_append_member(json, "topic", json_mkstring(m->topic)); /* * We have to know which user/device this is for in order to * determine whether a connected Websocket client is authorized * to see this. Add user/device */ json_append_member(json, "username", json_mkstring(UB(username))); json_append_member(json, "device", json_mkstring(UB(device))); json_append_member(json, "ghash", json_mkstring(UB(ghash))); if (_type == T_LOCATION || _type == T_WAYPOINT) { UT_string *filename = NULL; char *component; utstring_renew(filename); if (_type == T_LOCATION) { component = "last"; utstring_printf(filename, "%s-%s.json", UB(username), UB(device)); } else if (_type == T_WAYPOINT) { component = "waypoints"; utstring_printf(filename, "%s.json", isotime(tst)); } if ((jsonstring = json_stringify(json, NULL)) != NULL) { utstring_printf(ts, "%s/%s/%s/%s", STORAGEDIR, component, UB(username), UB(device)); if (mkpath(UB(ts)) < 0) { olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts)); } utstring_printf(ts, "/%s", UB(filename)); safewrite(UB(ts), jsonstring); free(jsonstring); } } /* * Now add more bits for Lua and Websocket, in particular the * Geo data. */ if (geo) { json_copy_to_object(json, geo, FALSE); } #ifdef WITH_HTTP if (ud->mgserver && !pingping) { http_ws_push_json(ud->mgserver, json); } #endif #ifdef WITH_LUA # ifdef WITH_LMDB if (ud->luadata && !pingping) { hooks_hook(ud, m->topic, json); } # endif /* LMDB */ #endif if (ud->verbose) { if (_type == T_LOCATION) { printf("%c %s %-35s t=%-1.1s tid=%-2.2s loc=%.5f,%.5f [%s] %s (%s)\n", (cached) ? '*' : '-', ltime(tst), m->topic, (t) ? t : " ", (tid) ? tid : "", lat, lon, UB(cc), UB(addr), UB(ghash) ); } else if (_type == T_TRANSITION) { JsonNode *e, *d; e = json_find_member(json, "event"); d = json_find_member(json, "desc"); printf("transition: %s %s\n", (e) ? e->string_ : "unknown", (d) ? d->string_ : "unknown"); } else if (_type == T_WAYPOINT) { j = json_find_member(json, "desc"); printf("waypoint: %s\n", (j) ? j->string_ : "unknown desc"); } else { if ((jsonstring = json_stringify(json, NULL)) != NULL) { printf("%s %s\n", _typestr, jsonstring); free(jsonstring); } else { printf("%s received\n", _typestr); } } } cleanup: if (geo) json_delete(geo); if (json) json_delete(json); if (tid) free(tid); if (t) free(t); if (_typestr) free(_typestr); } void on_connect(struct mosquitto *mosq, void *userdata, int rc) { struct udata *ud = (struct udata *)userdata; int mid; JsonNode *t; json_foreach(t, ud->topics) { if (t->tag == JSON_STRING) { olog(LOG_DEBUG, "Subscribing to %s (qos=%d)", t->string_, ud->qos); mosquitto_subscribe(mosq, &mid, t->string_, ud->qos); } } } static char *mosquitto_reason(int rc) { static char *reasons[] = { "Connection accepted", /* 0x00 */ "Connection refused: incorrect protocol version", /* 0x01 */ "Connection refused: invalid client identifier", /* 0x02 */ "Connection refused: server unavailable", /* 0x03 */ "Connection refused: code=0x04", /* 0x04 */ "Connection refused: bad username or password", /* 0x05 */ "Connection refused: not authorized", /* 0x06 */ "Connection refused: TLS error", /* 0x07 */ }; return ((rc >= 0 && rc <= 0x07) ? reasons[rc] : "unknown reason"); } void on_disconnect(struct mosquitto *mosq, void *userdata, int reason) { #ifdef WITH_LMDB struct udata *ud = (struct udata *)userdata; #endif if (reason == 0) { // client wish #ifdef WITH_LMDB gcache_close(ud->gc); #endif } else { olog(LOG_INFO, "Disconnected. Reason: 0x%X [%s]", reason, mosquitto_reason(reason)); } } static void catcher(int sig) { fprintf(stderr, "Going down on signal %d\n", sig); run = 0; } void usage(char *prog) { printf("Usage: %s [options..] topic [topic ...]\n", prog); printf(" --help -h this message\n"); printf(" --storage -S storage dir (%s)\n", STORAGEDEFAULT); printf(" --norevgeo -G disable ghash to reverge-geo lookups\n"); printf(" --skipdemo -D do handle objects with _demo (default: don't)\n"); printf(" --useretained -R process retained messages (default: no)\n"); printf(" --clientid -i MQTT client-ID\n"); printf(" --qos -q MQTT QoS (dflt: 2)\n"); printf(" --pubprefix -P republish prefix (dflt: no republish)\n"); printf(" --host -H MQTT host (localhost)\n"); printf(" --port -p MQTT port (1883)\n"); printf(" --logfacility syslog facility (local0)\n"); printf(" --quiet disable printing of messages to stdout\n"); printf(" --initialize initialize storage\n"); printf(" --label