zigbee2mqtt/lib/mqtt.js
Koen Kanters d83085ea7f
Zigbee-herdsman (#1945)
* Update zigbee-herdsman and zigbee-shepherd-converters.

* Force Aqara S2 Lock end devices (#1764)

* Start on zigbee-herdsman controller refactor.

* More updates.

* Cleanup zapp.

* updates.

* Propagate adapter disconnected event.

* Updates.

* Initial refactor to zigbee-herdsman.

* Refactor deviceReceive to zigbee-herdsman.

* Rename

* Refactor deviceConfigure.

* Finish bridge config.

* Refactor availability.

* Activate Home Assistant extension and more refactors.

* Refactor groups.

* Enable soft reset.

* Activate group membership

* Start on tests.

* Enable reporting.

* Add more controller tests.

* Add more tests

* Fix linting error.

* Data and deviceReceive tests.

* Move to zigbee-herdsman-converters.

* More device publish tests.

* Cleanup dependencies.

* Bring device publish coverage to 100.

* Bring home assistant test coverage to 100.

* Device configure tests.

* Attempt to fix tests.

* Another attempt.

* Another one.

* Another one.

* Another.

* Add wait.

* Longer wait.

* Debug.

* Update dependencies.

* Another.

* Begin on availability tests.

* Improve availability tests.

* Complete deviceAvailability tests.

* Device bind tests.

* More tests.

* Begin networkmap refactors.

* start on networkmap tests.

* Network map tests.

* Add utils tests.

* Logger tests.

* Settings and logger tests.

* Ignore some stuff for coverage and add todos.

* Add remaining missing tests.

* Enforce 100% test coverage.

* Start on groups test and refactor entityPublish to resolveEntity

* Remove joinPathStorage, not used anymore as group information is stored into zigbee-herdsman database.

* Fix linting issues.

* Improve tests.

* Add groups.

* fix group membership.

* Group: log names.

* Convert MQTT message to string by default.

* Fix group name.

* Updates.

* Revert configuration.yaml.

* Add new line.

* Fixes.

* Updates.

* Fix tests.

* Ignore soft reset extension.
2019-09-09 19:48:09 +02:00

118 lines
3.8 KiB
JavaScript

const mqtt = require('mqtt');
const logger = require('./util/logger');
const settings = require('./util/settings');
const fs = require('fs');
const events = require('events');
/**
 * Wrapper around an MQTT client created with `mqtt.connect`.
 *
 * Reads connection parameters from `settings.get().mqtt`, publishes the
 * bridge state ('online' / 'offline') under `<base_topic>/bridge/state`, and
 * re-emits every incoming MQTT message as a `message` event carrying
 * `{topic, message}` with the payload converted to a string.
 */
class MQTT extends events.EventEmitter {
    constructor() {
        super();
        // Bind once so the method can be handed to the client as a listener.
        this.onMessage = this.onMessage.bind(this);
    }

    /**
     * Create the MQTT client and wait for its first 'connect' event.
     *
     * @return {Promise<void>} Resolves once the client has emitted 'connect'.
     */
    async connect() {
        const mqttSettings = settings.get().mqtt;
        logger.info(`Connecting to MQTT server at ${mqttSettings.server}`);

        const options = {
            // Last will: the broker publishes this if the connection is lost.
            will: {
                topic: `${mqttSettings.base_topic}/bridge/state`,
                payload: 'offline',
                retain: true,
            },
        };

        if (mqttSettings.ca) {
            logger.debug(`MQTT SSL/TLS: Path to CA certificate = ${mqttSettings.ca}`);
            options.ca = fs.readFileSync(mqttSettings.ca);
        }

        // Client certificate authentication needs both the key and the certificate.
        if (mqttSettings.key && mqttSettings.cert) {
            logger.debug(`MQTT SSL/TLS: Path to client key = ${mqttSettings.key}`);
            logger.debug(`MQTT SSL/TLS: Path to client certificate = ${mqttSettings.cert}`);
            options.key = fs.readFileSync(mqttSettings.key);
            options.cert = fs.readFileSync(mqttSettings.cert);
        }

        if (mqttSettings.user && mqttSettings.password) {
            options.username = mqttSettings.user;
            options.password = mqttSettings.password;
        }

        if (mqttSettings.client_id) {
            logger.debug(`Using MQTT client ID: '${mqttSettings.client_id}'`);
            options.clientId = mqttSettings.client_id;
        }

        // Only disable verification when the setting is present and falsy;
        // an absent setting leaves the client default untouched.
        const hasRejectUnauthorized = Object.prototype.hasOwnProperty.call(mqttSettings, 'reject_unauthorized');
        if (hasRejectUnauthorized && !mqttSettings.reject_unauthorized) {
            logger.debug(`MQTT reject_unauthorized set false, ignoring certificate warnings.`);
            options.rejectUnauthorized = false;
        }

        // Create the client before starting the timer: if mqtt.connect throws,
        // no timer is left running against a client that does not exist.
        this.client = mqtt.connect(mqttSettings.server, options);
        this.client.on('message', this.onMessage);

        // Drop a timer left over from an earlier connect() call so it cannot leak.
        clearInterval(this.connectionTimer);

        // Set timer at interval to check if connected to MQTT server.
        const interval = 10 * 1000; // seconds * 1000.
        this.connectionTimer = setInterval(() => {
            if (this.client.reconnecting) {
                logger.error('Not connected to MQTT server!');
            }
        }, interval);

        return new Promise((resolve) => {
            // Runs each time the client emits 'connect'; resolving an already
            // resolved promise again has no effect.
            this.client.on('connect', () => {
                logger.info('Connected to MQTT server');
                this.publish('bridge/state', 'online', {retain: true, qos: 0});
                resolve();
            });
        });
    }

    /**
     * Stop the connection check timer, publish the 'offline' bridge state and
     * end the client. Safe to call when connect() was never called.
     *
     * @return {Promise<void>}
     */
    async disconnect() {
        // The handle comes from setInterval, so clear it with clearInterval.
        clearInterval(this.connectionTimer);
        this.connectionTimer = null;
        await this.publish('bridge/state', 'offline', {retain: true, qos: 0});
        logger.info('Disconnecting from MQTT server');
        if (this.client) {
            this.client.end();
        }
    }

    /**
     * Subscribe the client to a topic.
     *
     * @param {string} topic Topic passed unchanged to the client (no base topic is prepended).
     */
    subscribe(topic) {
        this.client.subscribe(topic);
    }

    /**
     * Listener for the client's 'message' event; re-emits it on this instance.
     *
     * @param {string} topic Topic the message arrived on.
     * @param {*} message Payload; converted with `toString()` before emitting.
     */
    onMessage(topic, message) {
        this.emit('message', {topic, message: message.toString()});
    }

    /**
     * Publish a payload under `<base>/<topic>`.
     *
     * When there is no client, or the client is reconnecting, the message is
     * dropped and an error is logged instead.
     *
     * @param {string} topic Topic relative to `base`.
     * @param {*} payload Payload handed to the client.
     * @param {Object} [options] Overrides for the defaults `{qos: 0, retain: false}`.
     * @param {string} [base] Topic prefix; defaults to the configured base topic.
     * @return {Promise<void>} Resolves when the client invokes the publish
     *     callback, or immediately when the message is dropped.
     */
    async publish(topic, payload, options, base=settings.get().mqtt.base_topic) {
        const fullTopic = `${base}/${topic}`;
        const publishOptions = {qos: 0, retain: false, ...options};

        if (!this.client || this.client.reconnecting) {
            logger.error(`Not connected to MQTT server!`);
            logger.error(`Cannot send message: topic: '${fullTopic}', payload: '${payload}'`);
            return;
        }

        logger.info(`MQTT publish: topic '${fullTopic}', payload '${payload}'`);

        return new Promise((resolve) => {
            this.client.publish(fullTopic, payload, publishOptions, (error) => {
                // Best effort: report a failure but still resolve so callers are not blocked.
                if (error) {
                    logger.error(`MQTT publish failed: topic '${fullTopic}', error '${error}'`);
                }
                resolve();
            });
        });
    }

    /**
     * Publish a log entry as JSON to `<base_topic>/bridge/log`.
     *
     * @param {string} type Value stored under the `type` key.
     * @param {*} message Value stored under the `message` key.
     * @param {*} [meta=null] Stored under the `meta` key only when truthy.
     */
    log(type, message, meta=null) {
        const payload = {type, message};
        if (meta) {
            payload.meta = meta;
        }

        // Fire-and-forget: the returned promise is intentionally not awaited.
        this.publish('bridge/log', JSON.stringify(payload), {retain: false});
    }
}
// Expose the MQTT class as this module's sole export.
module.exports = MQTT;