recorder/ot-recorder.c

754 lines
18 KiB
C
Raw Normal View History

2015-08-14 09:40:35 -07:00
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <mosquitto.h>
#include <getopt.h>
#include <time.h>
#include "json.h"
#include <sys/utsname.h>
#include "utstring.h"
#include "utarray.h"
#include "geo.h"
#include "ghash.h"
#include "config.h"
#include "file.h"
#include "safewrite.h"
#include "base64.h"
2015-08-14 23:14:34 -07:00
#include "misc.h"
2015-08-14 09:40:35 -07:00
/* Values for the cert_reqs argument of mosquitto_tls_opts_set() (see main()). */
#define SSL_VERIFY_PEER (1)
#define SSL_VERIFY_NONE (0)
/*
* Geohash precision (number of characters) => approximate cell dimensions:
*
* 1 => 5,009.4km x 4,992.6km
* 2 => 1,252.3km x 624.1km
* 3 => 156.5km x 156km
* 4 => 39.1km x 19.5km
* 5 => 4.9km x 4.9km
* 6 => 1.2km x 609.4m
* 7 => 152.9m x 152.4m
* 8 => 38.2m x 19m
* 9 => 4.8m x 4.8m
* 10 => 1.2m x 59.5cm
*/
/* Precision passed to geohash_encode() when a location is processed. */
#define GEOHASH_PREC (7)
/* Number of topic levels in a topic that may carry a card or msg payload. */
#define TOPIC_PARTS (4) /* owntracks/user/device/info */
/* Last topic level that makes on_message() hand the payload to do_info(). */
#define TOPIC_SUFFIX "info"
/* Main-loop flag: main() loops while non-zero; on_disconnect() clears it when reason == 0. */
static int run = 1;
/*
 * Fetch member `element` of JSON object `j` as a double.
 *
 * A JSON number is returned unchanged and a JSON string is converted with
 * atof(). If the member is absent, or is of any other type, -7.0 is returned.
 */
double number(JsonNode *j, char *element)
{
	JsonNode *member = json_find_member(j, element);

	if (member == NULL) {
		return (-7.0L);
	}

	switch (member->tag) {
		case JSON_NUMBER:
			return (member->number_);
		case JSON_STRING:
			return (atof(member->string_));
		default:
			break;
	}

	return (-7.0L);
}
JsonNode *extract(struct udata *ud, char *payload, char *tid, char *t, double *lat, double *lon, long *tst)
{
JsonNode *json, *j;
*tid = *t = 0;
*lat = *lon = -1.0L;
if ((json = json_decode(payload)) == NULL)
return (NULL);
if (ud->skipdemo && (json_find_member(json, "_demo") != NULL)) {
json_delete(json);
return (NULL);
}
if ((j = json_find_member(json, "_type")) == NULL) {
json_delete(json);
return (NULL);
}
if ((j->tag != JSON_STRING) || (strcmp(j->string_, "location") != 0)) {
json_delete(json);
return (NULL);
}
if ((j = json_find_member(json, "tid")) != NULL) {
if (j->tag == JSON_STRING) {
// printf("I got: [%s]\n", m->string_);
strcpy(tid, j->string_);
}
}
if ((j = json_find_member(json, "t")) != NULL) {
if (j && j->tag == JSON_STRING) {
strcpy(t, j->string_);
}
}
*tst = time(NULL);
if ((j = json_find_member(json, "tst")) != NULL) {
if (j && j->tag == JSON_STRING) {
*tst = strtoul(j->string_, NULL, 10);
} else {
*tst = (unsigned long)j->number_;
}
}
*lat = number(json, "lat");
*lon = number(json, "lon");
return (json);
}
/*
 * Format `t` as an ISO-8601 UTC timestamp ("YYYY-MM-DDTHH:MM:SSZ").
 * The result lives in a static buffer which is overwritten by the next call.
 */
static const char *isotime(time_t t) {
	static char stamp[] = "YYYY-MM-DDTHH:MM:SSZ";
	struct tm *utc = gmtime(&t);

	strftime(stamp, sizeof(stamp), "%FT%TZ", utc);
	return (stamp);
}
/*
 * Format `t` as local wall-clock time ("HH:MM:SS").
 *
 * The result lives in a static buffer which is overwritten by the next call.
 * `t` may originate from a received payload, so it can be out of range for
 * localtime(); in that case (or if formatting fails) "--:--:--" is returned
 * instead of handing a NULL pointer to strftime().
 */
static const char *ltime(time_t t) {
	static char buf[] = "HH:MM:SS";
	struct tm *tm = localtime(&t);

	if (tm == NULL || strftime(buf, sizeof(buf), "%T", tm) == 0) {
		memcpy(buf, "--:--:--", sizeof(buf));
	}
	return (buf);
}
/*
 * Handle a payload received on a topic ending in TOPIC_SUFFIX: a "card"
 * carrying a user's name and base64-encoded face image.
 *
 *   userdata  pointer to the recorder's struct udata
 *   username  user part of the topic
 *   device    device part of the topic (used for logging only)
 *   payload   NUL-terminated JSON text
 *
 * Payloads which are not JSON, are skipped demo payloads, or are not of
 * `_type: card` are ignored. Otherwise the card is written to the "cards"
 * store, the decoded image to the "photos" store (both only if usefiles),
 * and, when built with Redis and useredis is set, to card:<user> and
 * photo:<user>.
 */
void do_info(void *userdata, UT_string *username, UT_string *device, char *payload)
{
	struct udata *ud = (struct udata *)userdata;
	JsonNode *json, *j;
	static UT_string *name = NULL, *face = NULL;
#ifdef HAVE_REDIS
	redisReply *r;
#endif
	FILE *fp;
	char *img;

	utstring_renew(name);
	utstring_renew(face);

	if ((json = json_decode(payload)) == NULL) {
		fprintf(stderr, "Can't decode INFO payload for username=%s\n", utstring_body(username));
		return;
	}

	if (ud->skipdemo && (json_find_member(json, "_demo") != NULL)) {
		goto cleanup;
	}

	if ((j = json_find_member(json, "_type")) == NULL) {
		goto cleanup;
	}
	if ((j->tag != JSON_STRING) || (strcmp(j->string_, "card") != 0)) {
		goto cleanup;
	}

	if (ud->usefiles) {
		/* I know the payload is valid JSON: write card */
		if ((fp = pathn("wb", "cards", username, NULL, "json")) != NULL) {
			fprintf(fp, "%s\n", payload);
			fclose(fp);
		}
	}

	if ((j = json_find_member(json, "name")) != NULL) {
		if (j->tag == JSON_STRING) {
			utstring_printf(name, "%s", j->string_);
		}
	}

	if ((j = json_find_member(json, "face")) != NULL) {
		if (j->tag == JSON_STRING) {
			utstring_printf(face, "%s", j->string_);
		}
	}

	fprintf(stderr, "* CARD: %s-%s %s\n", utstring_body(username), utstring_body(device), utstring_body(name));

#ifdef HAVE_REDIS
	if (ud->useredis) {
		redis_ping(&ud->redis);
		r = redisCommand(ud->redis, "HMSET card:%s name %s face %s", utstring_body(username), utstring_body(name), utstring_body(face));
		/* Every reply returned by redisCommand() must be released. */
		if (r != NULL) {
			freeReplyObject(r);
		}
	}
#endif

	/*
	 * We have a base64-encoded "face". Decode it and store binary image.
	 * Nothing to do (and nothing to allocate) when no face was supplied.
	 */
	if (utstring_len(face) > 0 && (img = malloc(utstring_len(face))) != NULL) {
		int imglen;

		if ((imglen = base64_decode(utstring_body(face), img)) > 0) {
			if (ud->usefiles) {
				if ((fp = pathn("wb", "photos", username, NULL, "png")) != NULL) {
					fwrite(img, sizeof(char), imglen, fp);
					fclose(fp);
				}
			}
#ifdef HAVE_REDIS
			if (ud->useredis) {
				/* Add photo (binary) to Redis as photo:username */
				redis_ping(&ud->redis);
				r = redisCommand(ud->redis, "SET photo:%s %b", utstring_body(username), img, imglen);
				if (r != NULL) {
					freeReplyObject(r);
				}
			}
#endif
		}
		free(img);
	}

cleanup:
	json_delete(json);
}
/*
 * Handle a payload received on an owntracks/user/device/msg topic.
 *
 * The payload must decode as JSON and be of `_type: msg`; demo payloads are
 * dropped when skipdemo is set. An accepted message is appended to the
 * user's "msg" store (only if usefiles) and announced on stderr.
 */
void do_msg(void *userdata, UT_string *username, UT_string *device, char *payload)
{
	struct udata *ud = (struct udata *)userdata;
	JsonNode *root, *type;
	FILE *out;

	root = json_decode(payload);
	if (root == NULL) {
		fprintf(stderr, "Can't decode MSG payload for username=%s, device=%s\n",
			utstring_body(username), utstring_body(device));
		return;
	}

	if (!(ud->skipdemo && (json_find_member(root, "_demo") != NULL))) {
		type = json_find_member(root, "_type");

		if ((type != NULL) && (type->tag == JSON_STRING) && (strcmp(type->string_, "msg") == 0)) {
			if (ud->usefiles) {
				/* The payload is known to be valid JSON: append it verbatim */
				out = pathn("ab", "msg", username, NULL, "json");
				if (out != NULL) {
					fprintf(out, "%s\n", payload);
					fclose(out);
				}
			}

			fprintf(stderr, "* MSG: %s-%s\n", utstring_body(username), utstring_body(device));
		}
	}

	json_delete(root);
}
/*
 * Re-publish an enriched location as JSON to "<pubprefix>/<topic>" with
 * QoS 1 and the retain flag set, and echo topic and payload on stderr.
 * Does nothing when no publish prefix is configured.
 */
void republish(struct mosquitto *mosq, struct udata *userdata, char *username, char *topic, double lat, double lon, char *cc, char *addr, long tst, char *t)
{
	static UT_string *newtopic = NULL;
	JsonNode *obj;
	char *text;

	if (userdata->pubprefix == NULL) {
		return;
	}

	obj = json_mkobject();
	if (obj == NULL) {
		return;
	}

	utstring_renew(newtopic);
	utstring_printf(newtopic, "%s/%s", userdata->pubprefix, topic);

	json_append_member(obj, "username",	json_mkstring(username));
	json_append_member(obj, "topic",	json_mkstring(topic));
	json_append_member(obj, "cc",		json_mkstring(cc));
	json_append_member(obj, "addr",		json_mkstring(addr));
	json_append_member(obj, "t",		json_mkstring(t));
	json_append_member(obj, "tst",		json_mknumber(tst));
	json_append_member(obj, "lat",		json_mknumber(lat));
	json_append_member(obj, "lon",		json_mknumber(lon));

	text = json_stringify(obj, NULL);
	if (text != NULL) {
		mosquitto_publish(mosq, NULL, utstring_body(newtopic),
				strlen(text), text, 1, true);
		fprintf(stderr, "%s %s\n", utstring_body(newtopic), text);
		free(text);
	}

	json_delete(obj);
}
/*
 * Decode OwnTracks CSV and return a new JsonNode to a JSON object.
 *
 * Expected layout (10 comma-separated fields):
 *   tid,tst(hex),t,lat,lon,cog,vel,alt,dist,trip
 *
 * lat and lon are divided by MILL, cog and alt are multiplied by 10 and
 * trip by 1000 before being stored.
 *
 *   payload  NUL-terminated text to parse
 *   tid, t   output strings; at most 63 characters are accepted for each, so
 *            the destination buffers must hold at least 64 bytes
 *   lat,lon  output coordinates
 *   tst      output timestamp
 *
 * The outputs are only written when the whole line parsed. Returns a new
 * JSON object the caller must json_delete(), or NULL if the payload is not
 * in this format or the object cannot be created.
 */
#define MILL 1000000.0
JsonNode *csv(char *payload, char *tid, char *t, double *lat, double *lon, long *tst)
{
	JsonNode *json = NULL;
	double dist = 0;
	char tmptst[40], tmptid[64], tmpt[64];
	double vel, trip, alt, cog;
	double rawlat, rawlon;

	/*
	 * Field widths are bounded: the payload is untrusted, and an unbounded
	 * %[^,] would overflow the destination on an overlong field.
	 */
	if (sscanf(payload, "%63[^,],%39[^,],%63[^,],%lf,%lf,%lf,%lf,%lf,%lf,%lf",
			tmptid, tmptst, tmpt, &rawlat, &rawlon,
			&cog, &vel, &alt, &dist, &trip) != 10) {
		fprintf(stderr, "**** payload not CSV: %s\n", payload);
		return (NULL);
	}

	if ((json = json_mkobject()) == NULL) {
		return (NULL);
	}

	strcpy(tid, tmptid);
	strcpy(t, tmpt);

	*lat = rawlat / MILL;
	*lon = rawlon / MILL;
	cog *= 10;
	alt *= 10;
	trip *= 1000;
	*tst = strtoul(tmptst, NULL, 16);

	json_append_member(json, "_type", json_mkstring("location"));
	json_append_member(json, "t",	  json_mkstring(t));
	json_append_member(json, "tid",	  json_mkstring(tid));
	json_append_member(json, "tst",	  json_mknumber(*tst));
	json_append_member(json, "lat",	  json_mknumber(*lat));
	json_append_member(json, "lon",	  json_mknumber(*lon));
	json_append_member(json, "cog",	  json_mknumber(cog));
	json_append_member(json, "vel",	  json_mknumber(vel));
	json_append_member(json, "alt",	  json_mknumber(alt));
	json_append_member(json, "dist",  json_mknumber(dist));
	json_append_member(json, "trip",  json_mknumber(trip));
	json_append_member(json, "csv",	  json_mkbool(1));

	return (json);
}
/* Line layout of a .rec file: ISO timestamp, relative topic (or "*"), payload */
#define RECFORMAT "%s\t%-18s\t%s\n"

/*
 * Mosquitto message callback.
 *
 *   mosq      client instance (used for re-publishing)
 *   userdata  pointer to the recorder's struct udata
 *   m         the received message
 *
 * Empty payloads are ignored, as are topics with fewer than three usable
 * levels. For base/user/device/... topics with more than three levels the
 * payload is handed to do_info()/do_msg() where applicable and recorded
 * verbatim. For exactly three levels the payload is decoded as a location
 * (JSON or CSV), enriched with geohash and reverse-geo data, recorded,
 * stored as "last" position and optionally re-published.
 */
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m)
{
	JsonNode *json;
	char tid[BUFSIZ], t[BUFSIZ], *p;
	double lat, lon;
	long tst;
	struct udata *ud = (struct udata *)userdata;
	FILE *fp;
	char **topics;
	char *last;
	int count = 0, cached;
	static UT_string *basetopic = NULL, *username = NULL, *device = NULL, *addr = NULL, *cc = NULL, *ghash = NULL, *ts = NULL;
	static UT_string *reltopic = NULL;
	char *jsonstring;
	time_t now;

	/*
	 * mosquitto_message->
	 * 	 int mid;
	 * 	 char *topic;
	 * 	 void *payload;
	 * 	 int payloadlen;
	 * 	 int qos;
	 * 	 bool retain;
	 */

	if (m->payloadlen == 0) {
		return;
	}

	printf("%s %s\n", m->topic, bindump(m->payload, m->payloadlen)); fflush(stdout);

	utstring_renew(ts);
	utstring_renew(basetopic);
	utstring_renew(username);
	utstring_renew(device);

	if (mosquitto_sub_topic_tokenise(m->topic, &topics, &count) != MOSQ_ERR_SUCCESS) {
		return;
	}

	time(&now);
	monitorhook(ud, now, m->topic);

	/*
	 * Everything below needs base/user/device. Bail out on shorter topics
	 * instead of reading past the end of topics[], and on NULL tokens
	 * (which the tokeniser is expected to produce for empty levels such
	 * as a leading `/') instead of passing NULL to printf-style functions.
	 */
	if (count < 3 || topics[0] == NULL || topics[1] == NULL || topics[2] == NULL) {
		mosquitto_sub_topic_tokens_free(&topics, count);
		return;
	}

	utstring_printf(basetopic, "%s/%s/%s", topics[0], topics[1], topics[2]);
	utstring_printf(username, "%s", topics[1]);
	utstring_printf(device, "%s", topics[2]);

	last = topics[count - 1];
	if ((count == TOPIC_PARTS) && (last != NULL)) {
		if (strcmp(last, TOPIC_SUFFIX) == 0) {
			do_info(ud, username, device, m->payload);
		}

		/* owntracks/user/device/msg */
		if (strcmp(last, "msg") == 0) {
			if (m->retain == FALSE || ud->ignoreretained == FALSE) {
				do_msg(ud, username, device, m->payload);
			}
		}
	}

	/*
	 * Determine "relative topic", relative to base, i.e. whatever comes behind
	 * owntracks/user/device/
	 */

	utstring_renew(reltopic);

	if (count != 3) {
		/*
		 * Not a normal location publish. Build up a string consisting of the remaining
		 * topic parts, i.e. whatever is after base topic, and record with whatever
		 * (hopefully non-binary) payload we got.
		 */

		int j;

		for (j = 3; j < count; j++) {
			utstring_printf(reltopic, "%s%c",
				(topics[j] != NULL) ? topics[j] : "",
				(j < count - 1) ? '/' : ' ');
		}

		if (ud->usefiles) {
			if ((fp = pathn("a", "rec", username, device, "rec")) != NULL) {
				fprintf(fp, RECFORMAT, isotime(now), utstring_body(reltopic), bindump(m->payload, m->payloadlen));
				fclose(fp);
			}
		}

		mosquitto_sub_topic_tokens_free(&topics, count);
		return;
	}

	if (utstring_len(reltopic) == 0)
		utstring_printf(reltopic, "-");

	mosquitto_sub_topic_tokens_free(&topics, count);

	if (m->retain == TRUE && ud->ignoreretained) {
		return;
	}

	/*
	 * Try to decode JSON payload to find _type: location. If that doesn't work,
	 * see if it's OwnTracks' Greenwich CSV
	 */

	json = extract(ud, m->payload, tid, t, &lat, &lon, &tst);
	if (json == NULL) {
		/* Is it OwnTracks Greenwich CSV? */
		if ((json = csv(m->payload, tid, t, &lat, &lon, &tst)) == NULL) {
			/*
			 * It's not JSON or it's not a location CSV; store it, but
			 * like every other writer only when file storage is enabled.
			 */
			if (ud->usefiles) {
				if ((fp = pathn("a", "rec", username, device, "rec")) != NULL) {
					fprintf(fp, RECFORMAT, isotime(now),
						utstring_body(reltopic),
						bindump(m->payload, m->payloadlen));
					fclose(fp);
				}
			}
			return;
		}
	}

	/*
	 * We are now processing a _type location.
	 */

	utstring_renew(ghash);
	p = geohash_encode(lat, lon, GEOHASH_PREC);
	if (p != NULL) {
		utstring_printf(ghash, "%s", p);
		free(p);
	}

	utstring_renew(addr);
	utstring_renew(cc);

	/* Use the cached reverse-geo result for this geohash if there is one */
	cached = ghash_readcache(ud, utstring_body(ghash), addr, cc);
	if (!cached) {
		JsonNode *geo;

		if ((geo = revgeo(lat, lon, addr, cc)) != NULL) {
			ghash_storecache(ud, geo, utstring_body(ghash), utstring_body(addr), utstring_body(cc));
			json_delete(geo);
		}
	}

	/*
	 * We have exactly three topic parts (owntracks/user/device), and valid JSON.
	 * Add a few bits to the JSON, and record it on a per-user/device basis.
	 */

	json_append_member(json, "ghash", json_mkstring(utstring_body(ghash)));

	if ((jsonstring = json_stringify(json, NULL)) != NULL) {
#ifdef HAVE_REDIS
		if (ud->useredis) {
			/* add last to Redis as "lastpos:username-device" */
			last_storeredis(&ud->redis, utstring_body(username), utstring_body(device), jsonstring);
		}
#endif

		if (ud->usefiles) {
			if ((fp = pathn("a", "rec", username, device, "rec")) != NULL) {
				fprintf(fp, RECFORMAT, isotime(now), "*", jsonstring);

				/* Now safewrite the last location */
				utstring_printf(ts, "%s/last/%s/%s",
					JSONDIR, utstring_body(username), utstring_body(device));
				if (mkpath(utstring_body(ts)) < 0) {
					perror(utstring_body(ts));
				}
				utstring_printf(ts, "/%s-%s.json",
					utstring_body(username), utstring_body(device));

				safewrite(utstring_body(ts), jsonstring);
				fclose(fp);
			}
		}
		free(jsonstring);
	}

	/* publish */
	republish(mosq, ud, utstring_body(username), m->topic, lat, lon, utstring_body(cc), utstring_body(addr), tst, t);

	fprintf(stderr, "%c %s %-7s %s [%s] %s\n",
		(cached) ? '*' : '-',
		utstring_body(ghash), m->topic, ltime(tst), utstring_body(cc), utstring_body(addr));

	json_delete(json);
}
/*
 * Mosquitto connect callback: subscribe (QoS 0) to every topic stored in
 * ud->topics, announcing each one on stderr.
 */
void on_connect(struct mosquitto *mosq, void *userdata, int rc)
{
	struct udata *ud = (struct udata *)userdata;
	char **topic = NULL;
	int mid;

	for (;;) {
		topic = (char **)utarray_next(ud->topics, topic);
		if (topic == NULL) {
			break;
		}

		fprintf(stderr, "Subscribing to %s\n", *topic);
		mosquitto_subscribe(mosq, &mid, *topic, 0);
	}
}
/*
 * Mosquitto disconnect callback: log the reason. A reason of 0 means the
 * client itself asked to disconnect; in that case the Redis context is
 * released (if built with Redis) and the main loop is told to stop.
 */
void on_disconnect(struct mosquitto *mosq, void *userdata, int reason)
{
#ifdef HAVE_REDIS
	struct udata *ud = (struct udata *)userdata;
#endif

	fprintf(stderr, "Disconnected. Reason: %d [%s]\n", reason, mosquitto_strerror(reason));

	if (reason != 0) {
		return;
	}

	/* client wish */
#ifdef HAVE_REDIS
	redisFree(ud->redis);
#endif
	run = FALSE;
}
/*
 * Signal handler: report the signal on stderr and terminate with status 1.
 *
 * Only async-signal-safe calls are made here (write() and _exit()), since
 * stdio and exit() must not be used from a handler. Consequently atexit
 * handlers are not run and stdio buffers are not flushed on this path.
 */
static void catcher(int sig)
{
	static const char msg[] = "Going down on signal ";
	char num[16];
	size_t pos = sizeof(num);
	unsigned int v = (sig < 0) ? 0u - (unsigned int)sig : (unsigned int)sig;

	/* Build "<sig>\n" backwards; snprintf() is not safe to call here */
	num[--pos] = '\n';
	do {
		num[--pos] = (char)('0' + (v % 10));
		v /= 10;
	} while (v != 0);
	if (sig < 0) {
		num[--pos] = '-';
	}

	/* Best effort: nothing useful can be done if the write fails */
	if (write(STDERR_FILENO, msg, sizeof(msg) - 1) < 0) {
		_exit(1);
	}
	if (write(STDERR_FILENO, num + pos, sizeof(num) - pos) < 0) {
		_exit(1);
	}
	_exit(1);
}
/*
 * Print the command-line synopsis for `prog` on stderr and terminate the
 * process with exit status 2. Does not return.
 */
void usage(char *prog)
{
	fprintf(stderr,
		"Usage: %s [-D] [-F] [-N] [-P prefix] [-R] topic [topic...]\n",
		prog);

	exit(2);
}
/*
 * Program entry point.
 *
 * Options:
 *   -D  do not skip payloads carrying a "_demo" element
 *   -F  do not use file storage
 *   -N  do not use Redis
 *   -P  prefix under which locations are re-published
 *   -R  do not ignore retained messages
 * The remaining arguments are the topics to subscribe to (at least one).
 *
 * Environment: OTR_HOST, OTR_PORT (broker, default localhost:1883),
 * OTR_USER and OTR_PASS (credentials, used only if both are set),
 * OTR_CAFILE (enables TLS with peer verification).
 *
 * Returns 0 after a client-requested disconnect, non-zero on setup errors.
 */
int main(int argc, char **argv)
{
	struct mosquitto *mosq = NULL;
	char *p, *username, *password, *cafile;
	char *hostname = "localhost";
	int port = 1883;
	int rc, i, ch;
	static struct udata udata, *ud = &udata;
	struct utsname uts;
	UT_string *clientid;
#ifdef HAVE_REDIS
	struct timeval timeout = { 1, 500000 }; // 1.5 seconds
#endif
	char *progname = *argv;

	udata.usefiles		= TRUE;
	udata.ignoreretained	= TRUE;
	udata.pubprefix		= NULL;
	udata.skipdemo		= TRUE;
	udata.useredis		= TRUE;

	while ((ch = getopt(argc, argv, "DFRNP:")) != EOF) {
		switch (ch) {
			case 'D':
				ud->skipdemo = FALSE;
				break;
			case 'F':
				ud->usefiles = FALSE;
				break;
			case 'N':
				ud->useredis = FALSE;
				break;
			case 'P':
				udata.pubprefix = strdup(optarg);
				break;
			case 'R':
				ud->ignoreretained = FALSE;
				break;
			default:
				usage(progname);
				break;
		}
	}

	/* Keep argv[0] in place so that the topics start at argv[1] */
	argc -= (optind - 1);
	argv += (optind - 1);

	if (argc < 2) {
		usage(progname);
		return (-1);
	}

	if ((p = getenv("OTR_HOST")) != NULL) {
		hostname = p;
	}

	if ((p = getenv("OTR_PORT")) != NULL) {
		char *end;
		long val;

		/* Reject garbage instead of silently connecting to port 0 */
		errno = 0;
		val = strtol(p, &end, 10);
		if (errno != 0 || end == p || *end != '\0' || val < 1 || val > 65535) {
			fprintf(stderr, "Invalid OTR_PORT `%s'\n", p);
			return (2);
		}
		port = (int)val;
	}

	revgeo_init();

	utstring_new(clientid);

	mosquitto_lib_init();

	/* Client id: ot-recorder[-nodename]-pid */
	utstring_printf(clientid, "ot-recorder");
	if (uname(&uts) == 0) {
		utstring_printf(clientid, "-%s", uts.nodename);
	}
	utstring_printf(clientid, "-%d", getpid());

	signal(SIGINT, catcher);

#ifdef HAVE_REDIS
	ud->redis = redisConnectWithTimeout("localhost", 6379, timeout);
#endif

	mosq = mosquitto_new(utstring_body(clientid), true, (void *)&udata);
	if (!mosq) {
		fprintf(stderr, "Error: Out of memory.\n");
		mosquitto_lib_cleanup();
		return 1;
	}

	/*
	 * Pushing list of topics into the array so that we can (re)subscribe on_connect()
	 */

	utarray_new(ud->topics, &ut_str_icd);
	for (i = 1; i < argc; i++) {
		utarray_push_back(ud->topics, &argv[i]);
	}

	mosquitto_reconnect_delay_set(mosq,
			2, 	/* delay */
			20,	/* delay_max */
			0);	/* exponential backoff */

	mosquitto_message_callback_set(mosq, on_message);
	mosquitto_connect_callback_set(mosq, on_connect);
	mosquitto_disconnect_callback_set(mosq, on_disconnect);

	if ((username = getenv("OTR_USER")) != NULL) {
		if ((password = getenv("OTR_PASS")) != NULL) {
			mosquitto_username_pw_set(mosq, username, password);
		}
	}

	cafile = getenv("OTR_CAFILE");

	if (cafile && *cafile) {
		rc = mosquitto_tls_set(mosq,
			cafile,			/* cafile */
			NULL,			/* capath */
			NULL,			/* certfile */
			NULL,			/* keyfile */
			NULL			/* pw_callback() */
			);
		if (rc != MOSQ_ERR_SUCCESS) {
			fprintf(stderr, "Cannot set TLS CA: %s (check path names)\n",
				mosquitto_strerror(rc));
			exit(3);
		}

		mosquitto_tls_opts_set(mosq,
			SSL_VERIFY_PEER,
			NULL,			/* tls_version: "tlsv1.2", "tlsv1" */
			NULL			/* ciphers */
			);
	}

	rc = mosquitto_connect(mosq, hostname, port, 60);
	if (rc) {
		if (rc == MOSQ_ERR_ERRNO) {
			/*
			 * strerror() rather than strerror_r(): the GNU variant of
			 * strerror_r() need not fill the caller's buffer, and no
			 * other thread is running at this point.
			 */
			fprintf(stderr, "Error: %s\n", strerror(errno));
		} else {
			fprintf(stderr, "Unable to connect (%d) [%s].\n", rc, mosquitto_strerror(rc));
		}
		mosquitto_destroy(mosq);
		mosquitto_lib_cleanup();
		return rc;
	}

	/* on_disconnect() clears `run' once the client has asked to disconnect */
	while (run) {
		mosquitto_loop_forever(mosq, -1, 1);
	}

	mosquitto_disconnect(mosq);

	mosquitto_destroy(mosq);
	mosquitto_lib_cleanup();

	return (0);
}