recorder/recorder.c

1667 lines
42 KiB
C
Raw Normal View History

2015-09-01 08:19:52 -07:00
/*
2016-01-01 07:57:34 -07:00
* Copyright (C) 2015-2016 Jan-Piet Mens <jpmens@gmail.com> and OwnTracks
2015-09-01 08:19:52 -07:00
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* JAN-PIET MENS OR OWNTRACKS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
2015-08-14 09:40:35 -07:00
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <mosquitto.h>
#include <getopt.h>
#include <time.h>
#include <math.h>
2015-08-14 09:40:35 -07:00
#include "json.h"
#include <sys/utsname.h>
2015-10-14 23:51:18 -07:00
#include <regex.h>
2015-08-14 09:40:35 -07:00
#include "utstring.h"
#include "geo.h"
2015-09-01 04:31:49 -07:00
#include "geohash.h"
2015-08-14 09:40:35 -07:00
#include "base64.h"
2015-08-14 23:14:34 -07:00
#include "misc.h"
2015-08-22 06:53:11 -07:00
#include "util.h"
2015-08-27 14:42:18 -07:00
#include "storage.h"
2015-09-28 10:04:37 -07:00
#ifdef WITH_LMDB
2015-09-01 04:31:49 -07:00
# include "gcache.h"
#endif
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
2015-08-29 06:04:22 -07:00
# include "http.h"
2015-08-27 14:42:18 -07:00
#endif
2015-09-19 07:11:53 -07:00
#ifdef WITH_LUA
# include "hooks.h"
#endif
2015-08-14 09:40:35 -07:00
#define SSL_VERIFY_PEER (1)
#define SSL_VERIFY_NONE (0)
#define TOPIC_PARTS (4) /* owntracks/user/device/info */
#define DEFAULT_QOS (2)
#define CLEAN_SESSION false
#define GWNUMBERSMAX 50 /* number of batt,ext,status in array */
2015-08-14 09:40:35 -07:00
/*
 * Cleared by the signal handler to request shutdown. It is written from
 * signal context, so it has to be a volatile sig_atomic_t.
 */
static volatile sig_atomic_t run = 1;
/*
 * Return the numeric value of member `element' of the JSON object `j'.
 *
 * A JSON number is returned as is. A JSON string is converted with atof()
 * and the member is replaced in `j' by a JSON number, so that whoever
 * looks at `j' afterwards sees normalized data. If the member is missing
 * or has any other type, NAN is returned.
 */
double number(JsonNode *j, char *element)
{
	JsonNode *m;
	double d;

	if ((m = json_find_member(j, element)) == NULL) {
		return (NAN);
	}

	if (m->tag == JSON_NUMBER) {
		return (m->number_);
	}

	if (m->tag == JSON_STRING) {
		d = atof(m->string_);

		/*
		 * Normalize to number. The string node has to be freed, not
		 * just unlinked from its parent, or it is leaked on every
		 * message; json_delete() detaches and frees it.
		 */
		json_delete(m);
		json_append_member(j, element, json_mknumber(d));
		return (d);
	}

	return (NAN);
}
/*
 * Format `t' as local time "HH:MM:SS".
 *
 * Returns a pointer to a static buffer which is overwritten by the next
 * call. If `t' cannot be converted to local time, "--:--:--" is returned
 * instead of handing a NULL pointer to strftime().
 */
static const char *ltime(time_t t)
{
	static char buf[] = "HH:MM:SS";
	struct tm *tm = localtime(&t);

	if (tm == NULL || strftime(buf, sizeof(buf), "%T", tm) == 0) {
		snprintf(buf, sizeof(buf), "%s", "--:--:--");
	}
	return (buf);
}
/*
 * Process a CARD payload. The JSON is written to the file pathn() opens
 * for "cards"/username, and if the card carries a base64-encoded "face"
 * which decodes to a non-empty image, the decoded bytes are written to the
 * file pathn() opens for "photos"/username. Always returns TRUE.
 */
int do_info(void *userdata, UT_string *username, UT_string *device, JsonNode *json)
{
	static UT_string *name = NULL, *face = NULL;
	struct udata *ud = (struct udata *)userdata;
	JsonNode *member;
	FILE *out;
	char *text, *image;
	int nbytes;

	utstring_renew(name);
	utstring_renew(face);

	/* The payload is known to be valid JSON: write the card */
	out = pathn("wb", "cards", username, NULL, "json");
	if (out != NULL) {
		text = json_stringify(json, NULL);
		if (text != NULL) {
			fprintf(out, "%s\n", text);
			free(text);
		}
		fclose(out);
	}

	member = json_find_member(json, "name");
	if (member != NULL && member->tag == JSON_STRING) {
		utstring_printf(name, "%s", member->string_);
	}

	member = json_find_member(json, "face");
	if (member != NULL && member->tag == JSON_STRING) {
		utstring_printf(face, "%s", member->string_);
	}

	if (ud->verbose) {
		printf("* CARD: %s-%s %s\n", UB(username), UB(device), UB(name));
	}

	/* "face" is base64-encoded: decode it and store the binary image */
	image = malloc(utstring_len(face));
	if (image == NULL) {
		return (TRUE);
	}

	nbytes = base64_decode(UB(face), image);
	if (nbytes > 0) {
		out = pathn("wb", "photos", username, NULL, "png");
		if (out != NULL) {
			fwrite(image, sizeof(char), nbytes, out);
			fclose(out);
		}
	}
	free(image);

	return (TRUE);
}
/*
 * Process a msg payload: append its JSON as one line to the file pathn()
 * opens for "msg"/username and, when verbose, announce it on stdout.
 */
void do_msg(void *userdata, UT_string *username, UT_string *device, JsonNode *json)
{
	struct udata *ud = (struct udata *)userdata;
	FILE *out;
	char *text;

	/* The payload is known to be valid JSON: append the message */
	out = pathn("ab", "msg", username, NULL, "json");
	if (out != NULL) {
		text = json_stringify(json, NULL);
		if (text != NULL) {
			fprintf(out, "%s\n", text);
			free(text);
		}
		fclose(out);
	}

	if (ud->verbose) {
		printf("* MSG: %s-%s\n", UB(username), UB(device));
	}
}
void republish(struct mosquitto *mosq, struct udata *userdata, char *username, char *topic, double lat, double lon, char *cc, char *addr, long tst, char *t)
{
struct udata *ud = (struct udata *)userdata;
JsonNode *json;
static UT_string *newtopic = NULL;
char *payload;
if (ud->pubprefix == NULL)
return;
if ((json = json_mkobject()) == NULL) {
return;
}
utstring_renew(newtopic);
utstring_printf(newtopic, "%s/%s", ud->pubprefix, topic);
json_append_member(json, "username", json_mkstring(username));
json_append_member(json, "topic", json_mkstring(topic));
json_append_member(json, "cc", json_mkstring(cc));
json_append_member(json, "addr", json_mkstring(addr));
json_append_member(json, "t", json_mkstring(t));
json_append_member(json, "tst", json_mknumber(tst));
json_append_member(json, "lat", json_mknumber(lat));
json_append_member(json, "lon", json_mknumber(lon));
if ((payload = json_stringify(json, NULL)) != NULL) {
2015-09-21 07:07:31 -07:00
mosquitto_publish(mosq, NULL, UB(newtopic),
2015-08-14 09:40:35 -07:00
strlen(payload), payload, 1, true);
2015-09-21 07:07:31 -07:00
fprintf(stderr, "%s %s\n", UB(newtopic), payload);
2015-08-14 09:40:35 -07:00
free(payload);
}
json_delete(json);
}
2015-10-14 23:51:18 -07:00
/*
* Quickly check whether the payload looks like
* Greenwich CSV with a regex. We could use this
* to split out the fields, instead of reverting
* to sscanf
*/
// TID , TST , T , LAT , LON , COG , VEL , ALT , DIST , TRIP
#define CSV_RE "^([[:alnum:]]+),([[:xdigit:]]+),[[:alnum:]],[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+,[[:digit:]]+$"
static int csv_looks_sane(char *payload)
{
static int virgin = 1;
static regex_t regex;
2015-10-15 00:00:12 -07:00
int nomatch;
2015-10-14 23:51:18 -07:00
int cflags = REG_EXTENDED | REG_ICASE | REG_NOSUB;
if (virgin) {
virgin = !virgin;
if (regcomp(&regex, CSV_RE, cflags)) {
olog(LOG_ERR, "Cannot compile CSV RE");
return (FALSE);
}
}
2015-10-15 00:00:12 -07:00
nomatch = regexec(&regex, payload, 0, NULL, 0);
2015-10-14 23:51:18 -07:00
2015-10-15 00:00:12 -07:00
return (nomatch ? FALSE : TRUE);
2015-10-14 23:51:18 -07:00
}
2015-08-14 09:40:35 -07:00
/*
 * Decode OwnTracks CSV (Greenwich) and return a new JSON object
 * of _type = location. Returns NULL if the payload does not look like
 * such CSV, if a text field does not fit its buffer, or if the object
 * cannot be created. The returned object has to be released by the caller.
 *
 * #define CSV "X0,542A46AA,k,30365854,7575769,26,4,7,5,872"
 */

#define MILL 1000000.0

JsonNode *csv_to_json(char *payload)
{
	JsonNode *json;
	char tid[64], t[10];
	char tmptst[40];
	double dist = 0, lat, lon, vel, trip, alt, cog;
	long tst;

	if (!csv_looks_sane(payload)) {
		return (NULL);
	}

	/*
	 * The payload arrives from the network and the sanity check does not
	 * limit field lengths, so each %[ conversion carries a width of one
	 * less than its buffer. An over-long field then stops short of the
	 * following comma and the conversion count is not reached.
	 */
	if (sscanf(payload, "%63[^,],%39[^,],%9[^,],%lf,%lf,%lf,%lf,%lf,%lf,%lf",
		tid, tmptst, t, &lat, &lon, &cog, &vel, &alt, &dist, &trip) != 10) {
		return (NULL);
	}

	lat /= MILL;
	lon /= MILL;
	cog *= 10;
	alt *= 10;
	trip *= 1000;

	/* The timestamp field is hexadecimal */
	tst = strtoul(tmptst, NULL, 16);

	if ((json = json_mkobject()) == NULL) {
		return (NULL);
	}

	json_append_member(json, "_type", json_mkstring("location"));
	json_append_member(json, "t", json_mkstring(t));
	json_append_member(json, "tid", json_mkstring(tid));
	json_append_member(json, "tst", json_mknumber(tst));
	json_append_member(json, "lat", json_mknumber(lat));
	json_append_member(json, "lon", json_mknumber(lon));
	json_append_member(json, "cog", json_mknumber(cog));
	json_append_member(json, "vel", json_mknumber(vel));
	json_append_member(json, "alt", json_mknumber(alt));
	json_append_member(json, "dist", json_mknumber(dist));
	json_append_member(json, "trip", json_mknumber(trip));

	/* Mark the object as having originated from CSV */
	json_append_member(json, "csv", json_mkbool(1));

	return (json);
}
2015-08-14 09:40:35 -07:00
#define RECFORMAT "%s\t%-18s\t%s\n"
/*
 * Store payload in REC file unless recording is disabled altogether
 * (ud->norec) or our Lua putrec() function says we shouldn't for this
 * particular user/device combo.
 *
 * If the REC file cannot be opened the failure is logged and the record
 * is dropped.
 */

static void putrec(struct udata *ud, time_t now, UT_string *reltopic, UT_string *username, UT_string *device, char *string)
{
	FILE *fp;
	int rc = 0;

	if (ud->norec) {
		return;
	}

#ifdef WITH_LUA
	rc = hooks_norec(ud, UB(username), UB(device), string);
#endif

	if (rc != 0) {
		return;
	}

	if ((fp = pathn("a", "rec", username, device, "rec")) == NULL) {
		olog(LOG_ERR, "Cannot write REC for %s/%s: %m",
			UB(username), UB(device));
		/* No stream to write to; fprintf()/fclose() on NULL would crash */
		return;
	}

	fprintf(fp, RECFORMAT, isotime(now),
		UB(reltopic), string);
	fclose(fp);
}
2015-10-20 00:37:24 -07:00
/*
* Payload contains JSON string with a configuration obtained
2015-10-20 00:49:56 -07:00
* via cmd `dump' to the device. Store it "pretty".
2015-10-20 00:37:24 -07:00
*/
static char *prettyfy(char *payloadstring)
2015-10-20 00:37:24 -07:00
{
2015-10-20 00:49:56 -07:00
JsonNode *json;
char *pretty_js;
if ((json = json_decode(payloadstring)) == NULL) {
olog(LOG_ERR, "Cannot decode JSON from %s", payloadstring);
return (NULL);
2015-10-20 00:49:56 -07:00
}
pretty_js = json_stringify(json, "\t");
json_delete(json);
2015-10-20 00:37:24 -07:00
return (pretty_js);
}
/*
 * Write `payloadstring' to STORAGEDIR/<type>/<user>/<device>/<user>-<device>.<extension>,
 * creating the directory if necessary. The payload is written in its
 * prettified form if it can be decoded as JSON, else verbatim.
 */
static void xx_dump(struct udata *ud, UT_string *username, UT_string *device, char *payloadstring, char *type, char *extension)
{
	static UT_string *ts = NULL;
	char *pretty = prettyfy(payloadstring);
	char *content;

	utstring_renew(ts);
	utstring_printf(ts, "%s/%s/%s/%s",
		STORAGEDIR,
		type,
		UB(username),
		UB(device));

	if (mkpath(UB(ts)) >= 0) {
		utstring_printf(ts, "/%s-%s.%s", UB(username), UB(device), extension);

		if (ud->verbose) {
			printf("Received %s dump, storing at %s\n", type, UB(ts));
		}

		content = (pretty != NULL) ? pretty : payloadstring;
		safewrite(UB(ts), content);
	} else {
		olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts));
	}

	/* free(NULL) is a no-op, so this covers the undecodable case too */
	free(pretty);
}
/* Dump a config payload; get the 'configuration' element out of the dumped payloadstring */
2015-10-22 08:56:57 -07:00
void config_dump(struct udata *ud, UT_string *username, UT_string *device, char *payloadstring)
{
JsonNode *json = json_decode(payloadstring), *config;
if (json == NULL)
return;
if ((config = json_find_member(json, "configuration")) != NULL) {
char *js_string = json_stringify(config, NULL);
if (js_string) {
xx_dump(ud, username, device, js_string, "config", "otrc");
json_delete(json);
free(js_string);
}
}
2015-10-22 08:56:57 -07:00
}
2015-10-22 08:56:57 -07:00
/*
 * Dump a waypoints (plural) payload via xx_dump() as type "waypoints" with
 * extension "otrw". If the payload has an `r' member, that member is
 * dropped before storing; otherwise the payload is stored as received.
 * Payloads which are not JSON are ignored.
 */

void waypoints_dump(struct udata *ud, UT_string *username, UT_string *device, char *payloadstring)
{
	JsonNode *json, *j;
	char *js = NULL;

	if ((json = json_decode(payloadstring)) == NULL) {
		return;
	}

	if ((j = json_find_member(json, "r")) != NULL) {
		/* json_delete() detaches the member and frees it */
		json_delete(j);
		js = json_stringify(json, NULL);
	}

	/* The tree is no longer needed, whether or not `r' was present */
	json_delete(json);

	xx_dump(ud, username, device, (js) ? js : payloadstring, "waypoints", "otrw");

	free(js);
}
2015-10-22 02:22:14 -07:00
#ifdef WITH_RONLY
/*
 * Look up basetopic in RONLYdb and return the value of its boolean
 * "active" member. Returns FALSE if there is no entry for basetopic or
 * the entry has no boolean "active".
 */
static int is_ronly(struct udata *ud, UT_string *basetopic)
{
	JsonNode *json, *j;
	char *key = UB(basetopic);
	int active = FALSE;

	if ((json = gcache_json_get(ud->ronlydb, key)) == NULL) {
		return (FALSE);
	}

	if ((j = json_find_member(json, "active")) != NULL) {
		/* Only a real boolean carries a meaningful bool_ */
		if (j->tag == JSON_BOOL) {
			active = j->bool_;
		}
	}

	/* The object obtained from the cache is ours to release */
	json_delete(json);

	printf("**--- %s: return active = %d\n", key, active);
	return (active);
}
/*
 * Make an RONLYdb entry for basetopic, updating timestamp in the JSON
 * active is TRUE if the user is an RONLY user, else FALSE.
 *
 * The entry holds "first" (set once), "active", and "last" (set when
 * "active" changes or is confirmed TRUE). Nothing is created for a
 * basetopic which is not yet in the database and is not active. The
 * database is written only if something changed.
 */

static void ronly_set(struct udata *ud, UT_string *basetopic, int active)
{
	JsonNode *json, *j;
	char *key = UB(basetopic);
	int rc, touch = FALSE;

	json = gcache_json_get(ud->ronlydb, key);
	if (json == NULL) {
		if (active == FALSE) {	/* Has never been r:true b/c not in RONLYdb */
			return;
		}
		json = json_mkobject();
	}

	if (json_find_member(json, "first") == NULL) {
		json_append_member(json, "first", json_mknumber(time(0)));
		touch = TRUE;
	}

	/*
	 * Members which are replaced are deleted, not merely unlinked from
	 * the object, so that the old nodes are not leaked.
	 */
	if ((j = json_find_member(json, "last")) != NULL) {
		json_delete(j);
	}

	if ((j = json_find_member(json, "active")) != NULL) {
		if (active != j->bool_) {
			json_delete(j);
			json_append_member(json, "active", json_mkbool(active));
			json_append_member(json, "last", json_mknumber(time(0)));
			touch = TRUE;
		} else if (active == TRUE) {
			json_append_member(json, "last", json_mknumber(time(0)));
			touch = TRUE;
		}
	} else {
		json_append_member(json, "active", json_mkbool(active));
		touch = TRUE;
	}

	if (touch) {
		if ((rc = gcache_json_put(ud->ronlydb, key, json)) != 0) {
			olog(LOG_ERR, "Cannot store %s in ronlydb: rc==%d", key, rc);
		}
		printf("+++++++++ TOUCH db for %s\n", key);
	}

	json_delete(json);
}
#endif
#ifdef WITH_GREENWICH
/*
 * Record a numeric Greenwich value.
 *
 * key is "batt", "ext", or "status"; value is a string containing a number.
 *
 * The file STORAGEDIR/last/<username>/<device>/<key>.json holds a JSON
 * array of { "tst": ..., "<key>": ... } objects. The new value is appended
 * to it, and the oldest entry is dropped first once the array has reached
 * GWNUMBERSMAX entries. A missing or undecodable file starts a new array.
 */

void store_gwvalue(char *username, char *device, time_t tst, char *key, char *value)
{
	static UT_string *ts = NULL;
	JsonNode *array = NULL, *entry, *node;
	char *text;
	int nelems = 0;

	utstring_renew(ts);
	utstring_printf(ts, "%s/last/%s/%s",
		STORAGEDIR,
		username,
		device);
	if (mkpath(UB(ts)) < 0) {
		olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts));
		return;
	}
	utstring_printf(ts, "/%s.json", key);

	/* Load the existing array; fall back to an empty one */
	text = slurp_file(UB(ts), TRUE);
	if (text != NULL) {
		array = json_decode(text);
		free(text);
	}
	if (array == NULL) {
		array = json_mkarray();
	}

	/* Count the elements and pop the first if the array is full */
	json_foreach(node, array) {
		nelems++;
	}
	if (nelems >= GWNUMBERSMAX) {
		node = json_first_child(array);
		json_delete(node);
	}

	entry = json_mkobject();
	json_append_member(entry, "tst", json_mknumber(tst));
	json_append_member(entry, key, json_mknumber(atof(value)));
	json_append_element(array, entry);

	text = json_stringify(array, NULL);
	if (text != NULL) {
		safewrite(UB(ts), text);
		free(text);
	}

	json_delete(array);
}
#endif /* GREENWICH */
2015-08-14 09:40:35 -07:00
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m)
{
JsonNode *json, *j, *geo = NULL;
char *tid = NULL, *t = NULL, *p;
2015-12-09 04:52:20 -07:00
double lat, lon, acc;
2015-08-14 09:40:35 -07:00
long tst;
struct udata *ud = (struct udata *)userdata;
char **topics;
int count = 0, cached;
static UT_string *basetopic = NULL, *username = NULL, *device = NULL, *addr = NULL, *cc = NULL, *ghash = NULL, *ts = NULL;
2015-08-15 06:19:27 -07:00
static UT_string *reltopic = NULL;
char *jsonstring, *_typestr = NULL;
2015-08-14 09:40:35 -07:00
time_t now;
int pingping = FALSE, skipslash = 0;
int r_ok = TRUE; /* True if recording enabled for a publish */
payload_type _type;
2015-08-14 09:40:35 -07:00
/*
* mosquitto_message->
* int mid;
* char *topic;
* void *payload;
* int payloadlen;
* int qos;
* bool retain;
*/
time(&now);
monitorhook(ud, now, m->topic);
2015-08-14 09:40:35 -07:00
if (m->payloadlen == 0) {
return;
}
if (m->retain == TRUE && ud->ignoreretained) {
return;
}
2015-08-16 08:38:11 -07:00
// printf("%s %s\n", m->topic, bindump(m->payload, m->payloadlen)); fflush(stdout);
2015-08-14 09:40:35 -07:00
utstring_renew(ts);
utstring_renew(basetopic);
utstring_renew(username);
utstring_renew(device);
if (mosquitto_sub_topic_tokenise(m->topic, &topics, &count) != MOSQ_ERR_SUCCESS) {
return;
}
/*
2015-09-26 05:06:35 -07:00
* Do we have a leading / in topic?
* Also, if topic is too short, ignore and return. We *demand* 3 parts
* i.e. "owntracks/user/device"
*/
if (topics[0] == NULL) {
/* Topic has leading / */
skipslash = 1;
}
2015-09-26 05:06:35 -07:00
if (count - skipslash < 3) {
fprintf(stderr, "Ignoring short topic %s\n", m->topic);
mosquitto_sub_topic_tokens_free(&topics, count);
return;
}
/*
* Determine "relative topic", relative to base, i.e. whatever comes
* behind ownntracks/user/device/. If it's the base topic, use "*".
*/
utstring_renew(reltopic);
if (count != (3 + skipslash)) {
int j;
for (j = 3 + skipslash; j < count; j++) {
utstring_printf(reltopic, "%s%c", topics[j], (j < count - 1) ? '/' : ' ');
}
} else {
utstring_printf(reltopic, "*");
}
if (utstring_len(reltopic) == 0)
utstring_printf(reltopic, "-");
2015-08-15 01:42:55 -07:00
utstring_printf(basetopic, "%s/%s/%s", topics[0 + skipslash], topics[1 + skipslash], topics[2 + skipslash]);
utstring_printf(username, "%s", topics[1 + skipslash]);
utstring_printf(device, "%s", topics[2 + skipslash]);
2015-08-14 09:40:35 -07:00
2015-09-28 10:04:37 -07:00
#ifdef WITH_PING
2015-09-21 07:07:31 -07:00
if (!strcmp(UB(username), "ping") && !strcmp(UB(device), "ping")) {
pingping = TRUE;
}
#endif
#ifdef WITH_GREENWICH
/*
* For Greenwich: handle owntracks/user/device/voltage/batt, voltage/ext, and
* status all of which have a numeric payload.
*/
if ((count == 5+skipslash && !strcmp(topics[3+skipslash], "voltage")) &&
(!strcmp(topics[4+skipslash], "batt") || !strcmp(topics[4+skipslash], "ext"))) {
store_gwvalue(UB(username), UB(device), now, topics[4+skipslash], m->payload);
}
if (count == 4+skipslash && !strcmp(topics[3+skipslash], "status")) {
store_gwvalue(UB(username), UB(device), now, "status", m->payload);
}
2015-11-11 13:29:50 -07:00
/* Fall through to store this payload in the REC file as well. */
#endif
mosquitto_sub_topic_tokens_free(&topics, count);
2015-08-15 06:19:27 -07:00
/*
* Now let's see if this contains some sort of valid JSON
* or an OwnTracks CSV. If it doesn't, just store this payload because
* there's nothing left for us to do with it.
2015-08-15 06:19:27 -07:00
*/
if ((json = json_decode(m->payload)) == NULL) {
if ((json = csv_to_json(m->payload)) == NULL) {
2015-10-22 01:48:13 -07:00
#ifdef WITH_RONLY
/*
* If the base topic belongs to an RONLY user, store
* the payload.
*/
2015-10-22 02:22:14 -07:00
if (is_ronly(ud, basetopic)) {
2015-10-22 01:48:13 -07:00
// puts("*** storing plain publis");
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
}
#else
/* It's not JSON or it's not a location CSV; store it */
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
2015-10-22 01:48:13 -07:00
#endif
return;
2015-08-14 09:40:35 -07:00
}
}
2015-08-14 09:40:35 -07:00
if (ud->skipdemo && (json_find_member(json, "_demo") != NULL)) {
json_delete(json);
2015-08-14 09:40:35 -07:00
return;
}
2015-08-15 07:26:41 -07:00
#ifdef WITH_RONLY
/*
* This is a special mode in which location (and a few other)
* publishes will be recorded only if r:true in the payload.
* If we cannot find `r' in the JSON, or if `r' isn't true,
* set r_ok to FALSE. We cannot just bail out here, because
* we still want info, cards &c.
*/
if ((j = json_find_member(json, "r")) == NULL) {
2015-10-22 01:48:13 -07:00
r_ok = FALSE;
2015-10-22 01:48:13 -07:00
/*
* This JSON payload might actually belong to an RONLY user
* but it doesn't have an `r:true' in it. Determine whether
* the basetopic belongs to such a user, and force r_ok
2015-10-22 23:05:27 -07:00
* accordingly. If this is _type:location it holds the definitive
* truth.
2015-10-22 01:48:13 -07:00
*/
2015-10-22 02:22:14 -07:00
if (is_ronly(ud, basetopic)) {
2015-10-22 01:48:13 -07:00
r_ok = TRUE;
// printf("*** forcing TRUE b/c ronlydb (blen=%ld)\n", blen);
}
2015-10-22 23:05:27 -07:00
if ((j = json_find_member(json, "_type")) != NULL) {
if ((j->tag == JSON_STRING) && (strcmp(j->string_, "location") == 0)) {
r_ok = FALSE;
}
}
} else {
r_ok = TRUE;
if ((j->tag != JSON_BOOL) || (j->bool_ == FALSE)) {
r_ok = FALSE;
}
}
2015-10-22 01:48:13 -07:00
/*
2015-10-22 05:47:16 -07:00
* Record the RONLY basetopic in RONLYdb, and indicate active or not
2015-10-22 01:48:13 -07:00
*/
2015-10-22 08:56:57 -07:00
2015-10-22 05:47:16 -07:00
ronly_set(ud, basetopic, r_ok);
2015-10-22 01:48:13 -07:00
#endif
_type = T_UNKNOWN;
if ((j = json_find_member(json, "_type")) != NULL) {
if (j->tag == JSON_STRING) {
_typestr = strdup(j->string_);
if (!strcmp(j->string_, "location")) _type = T_LOCATION;
else if (!strcmp(j->string_, "beacon")) _type = T_BEACON;
else if (!strcmp(j->string_, "card")) _type = T_CARD;
else if (!strcmp(j->string_, "cmd")) _type = T_CMD;
else if (!strcmp(j->string_, "lwt")) _type = T_LWT;
else if (!strcmp(j->string_, "msg")) _type = T_MSG;
else if (!strcmp(j->string_, "steps")) _type = T_STEPS;
else if (!strcmp(j->string_, "transition")) _type = T_TRANSITION;
else if (!strcmp(j->string_, "waypoint")) _type = T_WAYPOINT;
else if (!strcmp(j->string_, "waypoints")) _type = T_WAYPOINTS;
2015-10-24 11:46:41 -07:00
else if (!strcmp(j->string_, "dump")) _type = T_CONFIG;
}
}
2015-08-14 09:40:35 -07:00
switch (_type) {
case T_CARD:
do_info(ud, username, device, json);
goto cleanup;
case T_MSG:
do_msg(ud, username, device, json);
goto cleanup;
case T_BEACON:
#ifdef WITH_HTTP
if (ud->mgserver && !pingping) {
json_append_member(json, "topic", json_mkstring(m->topic));
json_append_member(json, "username", json_mkstring(UB(username)));
json_append_member(json, "device", json_mkstring(UB(device)));
http_ws_push_json(ud->mgserver, json);
}
#endif
if (r_ok) {
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
}
goto cleanup;
case T_CMD:
case T_LWT:
case T_STEPS:
if (r_ok) {
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
}
goto cleanup;
case T_WAYPOINTS:
waypoints_dump(ud, username, device, m->payload);
goto cleanup;
2015-10-24 11:46:41 -07:00
case T_CONFIG:
2015-10-20 00:37:24 -07:00
config_dump(ud, username, device, m->payload);
goto cleanup;
case T_WAYPOINT:
case T_TRANSITION:
case T_LOCATION:
break;
default:
2015-10-22 01:48:13 -07:00
if (r_ok) {
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
}
goto cleanup;
}
2015-08-14 09:40:35 -07:00
2015-10-21 07:51:46 -07:00
if (r_ok == FALSE)
2015-10-21 07:51:46 -07:00
goto cleanup;
2015-08-15 00:09:09 -07:00
/*
* We are now handling location-related JSON. Normalize tst, lat, lon
* to numbers, particularly for Greenwich which produces strings
* currently. We're normalizing *in* json which replaces strings by
* numbers.
2015-08-15 00:09:09 -07:00
*/
tst = time(NULL);
if ((j = json_find_member(json, "tst")) != NULL) {
2015-12-09 04:52:20 -07:00
if (j->tag == JSON_STRING) {
tst = strtoul(j->string_, NULL, 10);
json_remove_from_parent(j);
json_append_member(json, "tst", json_mknumber(tst));
} else {
tst = (unsigned long)j->number_;
}
}
if (isnan(lat = number(json, "lat")) || isnan(lon = number(json, "lon"))) {
olog(LOG_ERR, "lat or lon for %s are NaN: %s", m->topic, bindump(m->payload, m->payloadlen));
goto cleanup;
}
2015-08-14 09:40:35 -07:00
2015-12-09 04:52:20 -07:00
if ((j = json_find_member(json, "acc")) != NULL) {
if (j->tag == JSON_STRING) {
acc = atof(j->string_);
json_remove_from_parent(j);
json_append_member(json, "acc", json_mknumber(acc));
}
}
if ((j = json_find_member(json, "tid")) != NULL) {
if (j->tag == JSON_STRING) {
tid = strdup(j->string_);
2015-08-14 09:40:35 -07:00
}
}
if ((j = json_find_member(json, "t")) != NULL) {
if (j && j->tag == JSON_STRING) {
t = strdup(j->string_);
}
2015-08-14 09:40:35 -07:00
}
#if 0
/* Haversine */
{
double d = haversine_dist(lat, lon, 52.03431, 8.47654);
printf("*** d=%lf meters\n", d);
}
#endif
2015-09-28 10:04:37 -07:00
#ifdef WITH_LMDB
2015-09-24 04:18:16 -07:00
/*
* If the topic we are handling is in topic2tid, replace the TID
* in this payload with that from the database.
*/
if (ud->t2t) {
char newtid[BUFSIZ];
long blen;
if ((blen = gcache_get(ud->t2t, m->topic, newtid, sizeof(newtid))) > 0) {
if ((j = json_find_member(json, "tid")) != NULL)
json_remove_from_parent(j);
json_append_member(json, "tid", json_mkstring(newtid));
}
}
#endif
2015-08-15 00:09:09 -07:00
/*
* Chances are high that what we have now contains lat, lon. Attempt to
* perform or retrieve reverse-geo.
2015-08-15 00:09:09 -07:00
*/
2015-08-14 09:40:35 -07:00
utstring_renew(ghash);
utstring_renew(addr);
utstring_renew(cc);
p = geohash_encode(lat, lon, geohash_prec());
2015-08-14 09:40:35 -07:00
if (p != NULL) {
utstring_printf(ghash, "%s", p);
free(p);
}
cached = FALSE;
if (ud->revgeo == TRUE) {
2015-10-22 12:10:15 -07:00
#ifdef WITH_LMDB
2015-09-21 07:07:31 -07:00
if ((geo = gcache_json_get(ud->gc, UB(ghash))) != NULL) {
2015-09-01 04:31:49 -07:00
/* Habemus cached data */
cached = TRUE;
2015-08-14 09:40:35 -07:00
2015-09-01 04:31:49 -07:00
if ((j = json_find_member(geo, "cc")) != NULL) {
utstring_printf(cc, "%s", j->string_);
}
if ((j = json_find_member(geo, "addr")) != NULL) {
utstring_printf(addr, "%s", j->string_);
}
} else {
if ((geo = revgeo(ud, lat, lon, addr, cc)) != NULL) {
2015-09-21 07:07:31 -07:00
gcache_json_put(ud->gc, UB(ghash), geo);
2015-08-21 10:31:26 -07:00
} else {
/* We didn't obtain reverse Geo, maybe because of over
* quota; make a note of the missing geohash */
char gfile[BUFSIZ];
FILE *fp;
snprintf(gfile, BUFSIZ, "%s/ghash/missing", STORAGEDIR);
if ((fp = fopen(gfile, "a")) != NULL) {
2015-09-21 07:07:31 -07:00
fprintf(fp, "%s %lf %lf\n", UB(ghash), lat, lon);
2015-08-21 10:31:26 -07:00
fclose(fp);
}
}
2015-08-14 09:40:35 -07:00
}
2015-10-22 12:10:15 -07:00
#else /* !LMDB */
if ((geo = revgeo(ud, lat, lon, addr, cc)) != NULL) {
2015-10-22 12:10:15 -07:00
;
}
#endif /* LMDB */
} else {
utstring_printf(cc, "??");
utstring_printf(addr, "n.a.");
2015-08-14 09:40:35 -07:00
}
2015-09-19 07:11:53 -07:00
/*
* We have normalized data in the JSON, so we can now write it
* out to the REC file.
2015-09-19 07:11:53 -07:00
*/
2015-09-08 03:57:09 -07:00
if (!pingping) {
if ((jsonstring = json_stringify(json, NULL)) != NULL) {
putrec(ud, now, reltopic, username, device, jsonstring);
free(jsonstring);
}
2015-09-19 07:11:53 -07:00
}
2015-09-08 03:57:09 -07:00
2015-09-19 07:11:53 -07:00
/*
* Append a few bits to the location type to add to LAST and
* for Lua / Websockets.
2015-09-19 07:11:53 -07:00
* I need a unique "key" in the Websocket clients to keep track
* of which device is being updated; use topic.
*/
2015-09-08 03:57:09 -07:00
json_append_member(json, "topic", json_mkstring(m->topic));
2015-09-08 03:57:09 -07:00
2015-09-19 07:11:53 -07:00
/*
* We have to know which user/device this is for in order to
* determine whether a connected Websocket client is authorized
* to see this. Add user/device
*/
2015-09-21 07:07:31 -07:00
json_append_member(json, "username", json_mkstring(UB(username)));
json_append_member(json, "device", json_mkstring(UB(device)));
2015-08-14 09:40:35 -07:00
2015-09-21 07:07:31 -07:00
json_append_member(json, "ghash", json_mkstring(UB(ghash)));
2015-09-21 07:07:31 -07:00
if (_type == T_LOCATION || _type == T_WAYPOINT) {
UT_string *filename = NULL;
char *component;
2015-09-21 07:07:31 -07:00
utstring_renew(filename);
2015-09-01 04:31:49 -07:00
2015-09-21 07:07:31 -07:00
if (_type == T_LOCATION) {
component = "last";
utstring_printf(filename, "%s-%s.json",
UB(username), UB(device));
} else if (_type == T_WAYPOINT) {
component = "waypoints";
utstring_printf(filename, "%s.json", isotime(tst));
2015-09-21 03:05:16 -07:00
}
2015-09-21 07:07:31 -07:00
2015-09-21 03:05:16 -07:00
if ((jsonstring = json_stringify(json, NULL)) != NULL) {
2015-09-21 07:07:31 -07:00
utstring_printf(ts, "%s/%s/%s/%s",
STORAGEDIR,
component,
UB(username),
UB(device));
if (mkpath(UB(ts)) < 0) {
olog(LOG_ERR, "Cannot mkdir %s: %m", UB(ts));
2015-09-21 03:05:16 -07:00
}
2015-09-21 07:07:31 -07:00
utstring_printf(ts, "/%s", UB(filename));
safewrite(UB(ts), jsonstring);
free(jsonstring);
2015-08-14 09:40:35 -07:00
}
}
/*
* Now add more bits for Lua and Websocket, in particular the
* Geo data.
*/
if (geo) {
json_copy_to_object(json, geo, FALSE);
}
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
if (ud->mgserver && !pingping) {
http_ws_push_json(ud->mgserver, json);
}
#endif
2015-08-14 09:40:35 -07:00
#ifdef WITH_LUA
2015-10-22 12:10:15 -07:00
# ifdef WITH_LMDB
if (ud->luadata && !pingping) {
hooks_hook(ud, m->topic, json);
2015-08-16 08:38:11 -07:00
}
2015-10-22 12:10:15 -07:00
# endif /* LMDB */
#endif
2015-08-16 08:38:11 -07:00
if (ud->verbose) {
if (_type == T_LOCATION) {
printf("%c %s %-35s t=%-1.1s tid=%-2.2s loc=%.5f,%.5f [%s] %s (%s)\n",
(cached) ? '*' : '-',
ltime(tst),
m->topic,
(t) ? t : " ",
(tid) ? tid : "",
lat, lon,
2015-09-21 07:07:31 -07:00
UB(cc),
UB(addr),
UB(ghash)
);
2015-09-21 03:05:16 -07:00
} else if (_type == T_TRANSITION) {
JsonNode *e, *d;
e = json_find_member(json, "event");
d = json_find_member(json, "desc");
printf("transition: %s %s\n",
(e) ? e->string_ : "unknown",
(d) ? d->string_ : "unknown");
} else if (_type == T_WAYPOINT) {
j = json_find_member(json, "desc");
printf("waypoint: %s\n", (j) ? j->string_ : "unknown desc");
} else {
if ((jsonstring = json_stringify(json, NULL)) != NULL) {
printf("%s %s\n", _typestr, jsonstring);
free(jsonstring);
} else {
printf("%s received\n", _typestr);
}
}
}
2015-08-14 09:40:35 -07:00
cleanup:
if (geo) json_delete(geo);
if (json) json_delete(json);
if (tid) free(tid);
if (t) free(t);
if (_typestr) free(_typestr);
2015-08-14 09:40:35 -07:00
}
void on_connect(struct mosquitto *mosq, void *userdata, int rc)
{
struct udata *ud = (struct udata *)userdata;
int mid;
2015-09-17 23:23:10 -07:00
JsonNode *t;
2015-08-14 09:40:35 -07:00
2015-09-17 23:23:10 -07:00
json_foreach(t, ud->topics) {
if (t->tag == JSON_STRING) {
olog(LOG_DEBUG, "Subscribing to %s (qos=%d)", t->string_, ud->qos);
mosquitto_subscribe(mosq, &mid, t->string_, ud->qos);
}
2015-08-14 09:40:35 -07:00
}
}
2015-09-25 08:03:07 -07:00
/*
 * Return a human-readable description for reason code `rc', or
 * "unknown reason" if `rc' is outside of the table below.
 */
static char *mosquitto_reason(int rc)
{
	static char *reasons[] = {
		"Connection accepted",					/* 0x00 */
		"Connection refused: incorrect protocol version",	/* 0x01 */
		"Connection refused: invalid client identifier",	/* 0x02 */
		"Connection refused: server unavailable",		/* 0x03 */
		"Connection refused: bad username or password",		/* 0x04 */
		"Connection refused: not authorized",			/* 0x05 */
		"Connection refused: TLS error",			/* 0x06 */
	};
	size_t nreasons = sizeof(reasons) / sizeof(reasons[0]);

	/* Bound by the real table size; there is no entry for 0x07 */
	if (rc < 0 || (size_t)rc >= nreasons) {
		return ("unknown reason");
	}
	return (reasons[rc]);
}
2015-08-14 09:40:35 -07:00
/*
 * libmosquitto disconnect callback (registered in main() via
 * mosquitto_disconnect_callback_set()).
 *
 * mosq      client instance (unused)
 * userdata  the `struct udata *' handed to mosquitto_new()
 * reason    0 when the disconnect was requested by this client, non-zero
 *           otherwise
 *
 * On a client-requested disconnect the reverse-geo cache (LMDB builds
 * only) is closed; any other reason is merely logged.
 */
void on_disconnect(struct mosquitto *mosq, void *userdata, int reason)
{
#ifdef WITH_LMDB
	struct udata *ud = (struct udata *)userdata;
#endif

	if (reason != 0) {
		olog(LOG_INFO, "Disconnected. Reason: 0x%X [%s]", reason, mosquitto_reason(reason));
		return;
	}

	/* reason == 0: client wish */
#ifdef WITH_LMDB
	/*
	 * ud->gc stays NULL when reverse-geo is disabled (see main()), so
	 * guard the close; reset the handle afterwards so that a repeated
	 * callback cannot close it twice.
	 */
	if (ud->gc != NULL) {
		gcache_close(ud->gc);
		ud->gc = NULL;
	}
#endif
}
static void catcher(int sig)
{
fprintf(stderr, "Going down on signal %d\n", sig);
2015-09-19 07:11:53 -07:00
run = 0;
2015-08-14 09:40:35 -07:00
}
void usage(char *prog)
{
2015-08-26 12:32:12 -07:00
printf("Usage: %s [options..] topic [topic ...]\n", prog);
printf(" --help -h this message\n");
printf(" --storage -S storage dir (%s)\n", STORAGEDEFAULT);
2015-08-26 12:32:12 -07:00
printf(" --norevgeo -G disable ghash to reverge-geo lookups\n");
printf(" --skipdemo -D do not handle objects with _demo\n");
printf(" --useretained -R process retained messages (default: no)\n");
printf(" --clientid -i MQTT client-ID\n");
printf(" --qos -q MQTT QoS (dflt: 2)\n");
printf(" --pubprefix -P republish prefix (dflt: no republish)\n");
printf(" --host -H MQTT host (localhost)\n");
printf(" --port -p MQTT port (1883)\n");
printf(" --logfacility syslog facility (local0)\n");
printf(" --quiet disable printing of messages to stdout\n");
printf(" --initialize initialize storage\n");
printf(" --label <label> server label (dflt: Recorder)\n");
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
printf(" --http-host <host> HTTP addr to bind to (localhost)\n");
2015-09-18 22:55:17 -07:00
printf(" --http-port <port> -A HTTP port (8083); 0 to disable HTTP\n");
2015-09-11 06:48:56 -07:00
printf(" --doc-root <directory> document root (%s)\n", DOCROOT);
2015-09-19 07:11:53 -07:00
#endif
#ifdef WITH_LUA
printf(" --lua-script <script.lua> path to Lua script. If unset, no Lua hooks\n");
2015-08-27 14:42:18 -07:00
#endif
printf(" --precision ghash precision (dflt: %d)\n", GHASHPREC);
2015-09-11 08:32:40 -07:00
printf(" --hosted use OwnTracks Hosted\n");
2015-10-23 08:03:19 -07:00
printf(" --norec don't maintain REC files\n");
2016-01-05 02:02:52 -07:00
printf(" --geokey optional Google reverse-geo API key\n");
printf("\n");
printf("Options override these environment variables:\n");
printf(" $OTR_HOST MQTT hostname\n");
printf(" $OTR_PORT MQTT port\n");
printf(" $OTR_STORAGEDIR\n");
printf(" $OTR_USER\n");
printf(" $OTR_PASS\n");
printf(" $OTR_CAFILE PEM CA certificate chain\n");
2015-09-11 08:32:40 -07:00
printf("For --hosted:\n");
printf(" $OTR_USER username as registered on Hosted\n");
printf(" $OTR_DEVICE connect as device\n");
printf(" $OTR_TOKEN device token\n");
2015-08-26 12:32:12 -07:00
exit(1);
2015-08-14 09:40:35 -07:00
}
2015-08-26 12:32:12 -07:00
2015-08-14 09:40:35 -07:00
int main(int argc, char **argv)
{
struct mosquitto *mosq = NULL;
2015-09-11 08:32:40 -07:00
char err[1024], *p, *username, *password, *cafile, *device;
char *hostname = "localhost", *logfacility = "local0";
2015-09-19 07:11:53 -07:00
#ifdef WITH_LUA
char *luascript = NULL;
#endif
2015-08-14 09:40:35 -07:00
int port = 1883;
2015-10-23 09:58:45 -07:00
int loop_timeout = 0;
int rc, i, ch, hosted = FALSE, initialize = FALSE;
2015-08-14 09:40:35 -07:00
static struct udata udata, *ud = &udata;
struct utsname uts;
UT_string *clientid;
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
2015-08-27 14:42:18 -07:00
int http_port = 8083;
2015-09-11 06:48:56 -07:00
char *doc_root = DOCROOT;
char *http_host = "localhost";
2015-08-14 09:40:35 -07:00
#endif
char *progname = *argv;
udata.qos = DEFAULT_QOS;
2015-08-14 09:40:35 -07:00
udata.ignoreretained = TRUE;
udata.pubprefix = NULL;
udata.skipdemo = TRUE;
2015-08-19 08:39:00 -07:00
udata.revgeo = TRUE;
udata.verbose = TRUE;
2015-10-23 08:03:19 -07:00
udata.norec = FALSE;
2015-09-28 10:04:37 -07:00
#ifdef WITH_LMDB
2015-09-01 04:31:49 -07:00
udata.gc = NULL;
2015-09-24 04:18:16 -07:00
udata.t2t = NULL; /* Topic to TID */
2015-10-22 01:48:13 -07:00
# ifdef WITH_RONLY
udata.ronlydb = NULL; /* RONLY db */
# endif
2015-09-01 04:31:49 -07:00
#endif
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
udata.mgserver = NULL;
2015-08-27 14:42:18 -07:00
#endif
2015-09-19 07:11:53 -07:00
#ifdef WITH_LUA
2015-09-28 10:04:37 -07:00
# ifdef WITH_LMDB
2015-10-22 12:10:15 -07:00
udata.luadata = NULL;
2015-09-25 08:43:17 -07:00
udata.luadb = NULL;
# endif /* WITH_LMDB */
#endif /* WITH_LUA */
udata.label = strdup("Recorder");
udata.geokey = NULL; /* default: no API key */
2015-08-14 09:40:35 -07:00
2015-08-26 12:32:12 -07:00
if ((p = getenv("OTR_HOST")) != NULL) {
hostname = strdup(p);
}
if ((p = getenv("OTR_PORT")) != NULL) {
port = atoi(p);
}
if ((p = getenv("OTR_STORAGEDIR")) != NULL) {
strcpy(STORAGEDIR, p);
}
utstring_new(clientid);
utstring_printf(clientid, "ot-recorder");
if (uname(&uts) == 0) {
utstring_printf(clientid, "-%s", uts.nodename);
}
utstring_printf(clientid, "-%d", getpid());
2015-08-26 12:32:12 -07:00
while (1) {
static struct option long_options[] = {
{ "help", no_argument, 0, 'h'},
{ "skipdemo", no_argument, 0, 'D'},
{ "norevgeo", no_argument, 0, 'G'},
{ "useretained", no_argument, 0, 'R'},
{ "clientid", required_argument, 0, 'i'},
{ "pubprefix", required_argument, 0, 'P'},
{ "qos", required_argument, 0, 'q'},
{ "host", required_argument, 0, 'H'},
{ "port", required_argument, 0, 'p'},
{ "storage", required_argument, 0, 'S'},
{ "logfacility", required_argument, 0, 4},
{ "precision", required_argument, 0, 5},
2015-09-11 08:32:40 -07:00
{ "hosted", no_argument, 0, 6},
{ "quiet", no_argument, 0, 8},
{ "initialize", no_argument, 0, 9},
{ "label", required_argument, 0, 10},
2015-10-23 08:03:19 -07:00
{ "norec", no_argument, 0, 11},
{ "geokey", required_argument, 0, 12},
2015-09-19 07:11:53 -07:00
#ifdef WITH_LUA
{ "lua-script", required_argument, 0, 7},
#endif
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
{ "http-host", required_argument, 0, 3},
2015-08-27 14:42:18 -07:00
{ "http-port", required_argument, 0, 'A'},
{ "doc-root", required_argument, 0, 2},
#endif
2015-08-26 12:32:12 -07:00
{0, 0, 0, 0}
};
int optindex = 0;
2015-09-01 04:31:49 -07:00
ch = getopt_long(argc, argv, "hDGRi:P:q:S:H:p:A:", long_options, &optindex);
2015-08-26 12:32:12 -07:00
if (ch == -1)
break;
2015-08-14 09:40:35 -07:00
switch (ch) {
case 12:
udata.geokey = strdup(optarg);
break;
2015-10-23 08:03:19 -07:00
case 11:
udata.norec = TRUE;
break;
case 10:
free(udata.label);
udata.label = strdup(optarg);
break;
case 9:
initialize = TRUE;
break;
case 8:
ud->verbose = FALSE;
break;
2015-09-19 07:11:53 -07:00
#ifdef WITH_LUA
case 7:
luascript = strdup(optarg);
break;
#endif
2015-09-11 08:32:40 -07:00
case 6:
hosted = TRUE;
break;
case 5:
geohash_setprec(atoi(optarg));
break;
case 4:
logfacility = strdup(optarg);
break;
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
2015-08-27 14:42:18 -07:00
case 'A': /* API */
http_port = atoi(optarg);
break;
case 2: /* no short char */
doc_root = strdup(optarg);
break;
case 3: /* no short char */
http_host = strdup(optarg);
break;
2015-08-27 14:42:18 -07:00
#endif
2015-08-14 09:40:35 -07:00
case 'D':
ud->skipdemo = FALSE;
break;
case 'G':
ud->revgeo = FALSE;
break;
case 'i':
utstring_clear(clientid);
utstring_printf(clientid, "%s", optarg);
break;
2015-08-14 09:40:35 -07:00
case 'P':
2015-08-26 12:32:12 -07:00
udata.pubprefix = strdup(optarg); /* TODO: do we want this? */
2015-08-14 09:40:35 -07:00
break;
case 'q':
ud->qos = atoi(optarg);
if (ud->qos < 0 || ud->qos > 2) {
fprintf(stderr, "%s: illegal qos\n", progname);
exit(2);
}
break;
2015-08-14 09:40:35 -07:00
case 'R':
ud->ignoreretained = FALSE;
break;
2015-08-26 12:32:12 -07:00
case 'H':
hostname = strdup(optarg);
break;
case 'p':
port = atoi(optarg);
2015-08-14 09:40:35 -07:00
break;
2015-08-26 12:32:12 -07:00
case 'S':
strcpy(STORAGEDIR, optarg);
break;
case 'h':
usage(progname);
exit(0);
default:
2015-09-11 08:32:40 -07:00
exit(1);
2015-08-14 09:40:35 -07:00
}
2015-08-26 12:32:12 -07:00
2015-08-14 09:40:35 -07:00
}
/*
* If requested to, attempt to create ghash storage and
* initialize (non-destructively -- just open for write)
* the LMDB databases.
*/
if (initialize == TRUE) {
2015-10-22 12:10:15 -07:00
#ifdef WITH_LMDB
struct gcache *gt;
2015-10-22 12:10:15 -07:00
#endif
char path[BUFSIZ], *pp;
snprintf(path, BUFSIZ, "%s/ghash", STORAGEDIR);
if (!is_directory(path)) {
pp = strdup(path);
if (mkpath(pp) < 0) {
fprintf(stderr, "Cannot mkdir %s: %s", path, strerror(errno));
exit(2);
}
free(pp);
}
#ifdef WITH_LMDB
if ((gt = gcache_open(path, NULL, FALSE)) == NULL) {
fprintf(stderr, "Cannot lmdb-open MainDB\n");
exit(2);
}
gcache_close(gt);
if ((gt = gcache_open(path, "topic2tid", FALSE)) == NULL) {
fprintf(stderr, "Cannot lmdb-open `topic2tid'\n");
exit(2);
}
gcache_close(gt);
#ifdef WITH_LUA
if ((gt = gcache_open(path, "luadb", FALSE)) == NULL) {
fprintf(stderr, "Cannot lmdb-open `luadb'\n");
exit(2);
}
gcache_close(gt);
#endif /* !LUA */
2015-10-22 01:48:13 -07:00
#ifdef WITH_RONLY
if ((gt = gcache_open(path, "ronlydb", FALSE)) == NULL) {
fprintf(stderr, "Cannot lmdb-open `ronly'\n");
exit(2);
}
gcache_close(gt);
#endif /* !RONLY */
#endif
exit(0);
}
2015-08-26 12:32:12 -07:00
argc -= (optind);
argv += (optind);
2015-08-14 09:40:35 -07:00
2015-08-26 12:32:12 -07:00
if (argc < 1) {
2015-08-14 09:40:35 -07:00
usage(progname);
return (-1);
}
2015-09-11 08:32:40 -07:00
if (hosted) {
char tmp[BUFSIZ];
2015-09-12 05:40:19 -07:00
hostname = strdup("hosted-mqtt.owntracks.org");
2015-09-11 08:32:40 -07:00
port = 8883;
if ((username = getenv("OTR_USER")) == NULL) {
fprintf(stderr, "%s requires $OTR_USER\n", progname);
exit(1);
}
if ((device = getenv("OTR_DEVICE")) == NULL) {
fprintf(stderr, "%s requires $OTR_DEVICE\n", progname);
exit(1);
}
if ((password = getenv("OTR_TOKEN")) == NULL) {
fprintf(stderr, "%s requires $OTR_TOKEN\n", progname);
exit(1);
}
if ((cafile = getenv("OTR_CAFILE")) == NULL) {
fprintf(stderr, "%s requires $OTR_CAFILE\n", progname);
exit(1);
}
utstring_renew(clientid);
utstring_printf(clientid, "ot-RECORDER-%s-%s", username, device);
if (uname(&uts) == 0) {
utstring_printf(clientid, "-%s", uts.nodename);
}
snprintf(tmp, sizeof(tmp), "%s|%s", username, device);
username = strdup(tmp);
} else {
username = getenv("OTR_USER");
password = getenv("OTR_PASS");
}
openlog("ot-recorder", LOG_PID | LOG_PERROR, syslog_facility_code(logfacility));
2015-09-01 05:08:28 -07:00
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
if (http_port) {
if (!is_directory(doc_root)) {
2015-09-03 23:06:53 -07:00
olog(LOG_ERR, "%s is not a directory", doc_root);
exit(1);
}
/* First arg is user data which I can grab via conn->server_param */
udata.mgserver = mg_create_server(ud, ev_handler);
2015-09-01 05:08:28 -07:00
}
#endif
2015-09-03 23:06:53 -07:00
olog(LOG_DEBUG, "starting");
if (ud->revgeo == TRUE) {
2015-09-28 10:04:37 -07:00
#ifdef WITH_LMDB
2015-09-01 04:31:49 -07:00
char db_filename[BUFSIZ], *pa;
snprintf(db_filename, BUFSIZ, "%s/ghash", STORAGEDIR);
pa = strdup(db_filename);
mkpath(pa);
free(pa);
2015-09-07 05:08:09 -07:00
udata.gc = gcache_open(db_filename, NULL, FALSE);
2015-09-01 04:31:49 -07:00
if (udata.gc == NULL) {
2015-09-03 23:06:53 -07:00
olog(LOG_ERR, "Can't initialize gcache in %s", db_filename);
2015-09-01 04:31:49 -07:00
exit(1);
}
storage_init(ud->revgeo); /* For the HTTP server */
2015-09-01 04:31:49 -07:00
#endif
revgeo_init();
}
2015-08-14 09:40:35 -07:00
2015-09-28 10:04:37 -07:00
#ifdef WITH_LMDB
2015-09-24 04:18:16 -07:00
snprintf(err, sizeof(err), "%s/ghash", STORAGEDIR);
ud->t2t = gcache_open(err, "topic2tid", TRUE);
2015-09-25 08:43:17 -07:00
# ifdef WITH_LUA
ud->luadb = gcache_open(err, "luadb", FALSE);
# endif
2015-10-22 01:48:13 -07:00
# ifdef WITH_RONLY
ud->ronlydb = gcache_open(err, "ronlydb", FALSE);
# endif
2015-09-25 08:43:17 -07:00
#endif
2015-10-22 12:10:15 -07:00
#if WITH_LUA && WITH_LMDB
2015-09-25 08:43:17 -07:00
/*
* If option for lua-script has not been given, ignore all hooks.
*/
if (luascript) {
2015-10-18 23:48:45 -07:00
if ((udata.luadata = hooks_init(ud, luascript)) == NULL) {
2015-10-18 23:47:10 -07:00
olog(LOG_ERR, "Stopping because Lua load failed");
2015-10-18 23:48:45 -07:00
exit(1);
}
2015-09-25 08:43:17 -07:00
}
2015-09-24 04:18:16 -07:00
#endif
2015-08-14 09:40:35 -07:00
mosquitto_lib_init();
signal(SIGINT, catcher);
2015-09-19 07:11:53 -07:00
signal(SIGTERM, catcher);
2015-08-14 09:40:35 -07:00
2015-09-21 07:07:31 -07:00
mosq = mosquitto_new(UB(clientid), CLEAN_SESSION, (void *)&udata);
2015-08-14 09:40:35 -07:00
if (!mosq) {
fprintf(stderr, "Error: Out of memory.\n");
mosquitto_lib_cleanup();
return 1;
}
/*
* Pushing list of topics into the array so that we can (re)subscribe on_connect()
*/
2015-09-17 23:23:10 -07:00
ud->topics = json_mkarray();
2015-08-26 12:32:12 -07:00
for (i = 0; i < argc; i++) {
2015-09-17 23:23:10 -07:00
json_append_element(ud->topics, json_mkstring(argv[i]));
2015-08-14 09:40:35 -07:00
}
mosquitto_reconnect_delay_set(mosq,
2, /* delay */
20, /* delay_max */
0); /* exponential backoff */
mosquitto_message_callback_set(mosq, on_message);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
2015-09-11 08:32:40 -07:00
if (username && password) {
2015-08-14 09:40:35 -07:00
mosquitto_username_pw_set(mosq, username, password);
}
cafile = getenv("OTR_CAFILE");
if (cafile && *cafile) {
rc = mosquitto_tls_set(mosq,
cafile, /* cafile */
NULL, /* capath */
NULL, /* certfile */
NULL, /* keyfile */
NULL /* pw_callback() */
);
if (rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "Cannot set TLS CA: %s (check path names)\n",
mosquitto_strerror(rc));
exit(3);
}
mosquitto_tls_opts_set(mosq,
SSL_VERIFY_PEER,
NULL, /* tls_version: "tlsv1.2", "tlsv1" */
NULL /* ciphers */
);
}
2015-09-13 02:06:38 -07:00
if (hosted) {
2015-09-21 07:07:31 -07:00
olog(LOG_INFO, "connecting to Hosted as clientID %s", UB(clientid));
2015-09-20 08:06:56 -07:00
} else {
olog(LOG_INFO, "connecting to MQTT on %s:%d as clientID %s %s TLS",
hostname, port,
2015-09-21 07:07:31 -07:00
UB(clientid),
2015-09-20 08:06:56 -07:00
(cafile) ? "with" : "without");
2015-09-13 02:06:38 -07:00
}
2015-08-14 09:40:35 -07:00
rc = mosquitto_connect(mosq, hostname, port, 60);
if (rc) {
if (rc == MOSQ_ERR_ERRNO) {
strerror_r(errno, err, 1024);
fprintf(stderr, "Error: %s\n", err);
} else {
2015-09-25 08:03:07 -07:00
fprintf(stderr, "Unable to connect (%d) [%s]: %s.\n",
rc, mosquitto_strerror(rc), mosquitto_reason(rc));
2015-08-14 09:40:35 -07:00
}
mosquitto_lib_cleanup();
return rc;
}
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
if (http_port) {
char address[BUFSIZ];
2015-09-28 03:47:13 -07:00
const char *addressinfo;
sprintf(address, "%s:%d", http_host, http_port);
mg_set_option(udata.mgserver, "listening_port", address);
// mg_set_option(udata.mgserver, "listening_port", "8090,ssl://8091:cert.pem");
2015-08-27 14:42:18 -07:00
// mg_set_option(udata.mgserver, "ssl_certificate", "cert.pem");
// mg_set_option(udata.mgserver, "listening_port", "8091");
2015-08-27 14:42:18 -07:00
mg_set_option(udata.mgserver, "document_root", doc_root);
mg_set_option(udata.mgserver, "enable_directory_listing", "yes");
2015-09-17 23:15:59 -07:00
mg_set_option(udata.mgserver, "auth_domain", "owntracks-recorder");
// mg_set_option(udata.mgserver, "access_log_file", "access.log");
// mg_set_option(udata.mgserver, "cgi_pattern", "**.cgi");
2015-08-27 14:42:18 -07:00
2015-09-28 03:47:13 -07:00
addressinfo = mg_get_option(udata.mgserver, "listening_port");
olog(LOG_INFO, "HTTP listener started on %s", addressinfo);
if (addressinfo == NULL || *addressinfo == 0) {
olog(LOG_ERR, "HTTP port is in use. Exiting.");
exit(2);
}
}
2015-08-27 14:42:18 -07:00
#endif
2015-10-13 22:20:41 -07:00
olog(LOG_INFO, "Using storage at %s with precision %d", STORAGEDIR, geohash_prec());
2015-10-23 09:58:45 -07:00
#ifdef WITH_HTTP
if (udata.mgserver == NULL)
loop_timeout = 1000;
#else
loop_timeout = 1000;
#endif
2015-08-14 09:40:35 -07:00
while (run) {
2015-10-23 09:58:45 -07:00
rc = mosquitto_loop(mosq, loop_timeout, /* max-packets */ 1);
if (run && rc) {
2015-09-03 23:06:53 -07:00
olog(LOG_INFO, "MQTT connection: rc=%d [%s]. Sleeping...", rc, mosquitto_strerror(rc));
sleep(10);
mosquitto_reconnect(mosq);
}
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
if (udata.mgserver) {
2015-09-29 00:15:04 -07:00
mg_poll_server(udata.mgserver, 100);
}
2015-08-27 14:42:18 -07:00
#endif
2015-08-14 09:40:35 -07:00
}
2015-09-17 23:23:10 -07:00
json_delete(ud->topics);
2015-09-28 10:04:37 -07:00
#ifdef WITH_LMDB
2015-09-24 04:18:16 -07:00
if (ud->t2t)
gcache_close(ud->t2t);
2015-09-25 08:43:17 -07:00
# ifdef WITH_LUA
if (ud->luadb)
gcache_close(ud->luadb);
# endif
2015-09-24 04:18:16 -07:00
#endif
free(ud->label);
2015-09-28 10:04:37 -07:00
#ifdef WITH_HTTP
mg_destroy_server(&udata.mgserver);
#endif
2015-09-19 07:11:53 -07:00
2015-10-22 12:10:15 -07:00
#if WITH_LUA && WITH_LMDB
2015-09-19 07:11:53 -07:00
hooks_exit(ud->luadata, "recorder stops");
#endif
2015-08-14 09:40:35 -07:00
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return (0);
}