Handle zigbee commands sequentially (and refactor it). #529

This commit is contained in:
Koenkk 2018-10-23 20:39:48 +02:00
parent e54de4bea7
commit ddb83cc6e4
7 changed files with 184 additions and 120 deletions

View File

@ -6,14 +6,12 @@ const settings = require('./util/settings');
const ExtensionNetworkMap = require('./extension/networkMap');
const ExtensionSoftReset = require('./extension/softReset');
const ExtensionRouterPollXiaomi = require('./extension/routerPollXiaomi');
const ExtensionDeviceCommand = require('./extension/deviceCommand');
const zigbeeShepherdConverters = require('zigbee-shepherd-converters');
const homeassistant = require('./homeassistant');
const objectAssignDeep = require('object-assign-deep');
const mqttConfigRegex = new RegExp(`${settings.get().mqtt.base_topic}/bridge/config/\\w+`, 'g');
const mqttDeviceRegex = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/set`, 'g');
const mqttDevicePrefixRegex = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/[\\w\\s\\d.-]+/set`, 'g');
const allowedLogLevels = ['error', 'warn', 'info', 'debug'];
@ -67,8 +65,6 @@ class Controller {
// Connect to MQTT broker
const subscriptions = [
`${settings.get().mqtt.base_topic}/+/set`,
`${settings.get().mqtt.base_topic}/+/+/set`,
`${settings.get().mqtt.base_topic}/bridge/config/+`,
];
@ -96,9 +92,10 @@ class Controller {
// Initialize extensions.
this.extensions = [
new ExtensionNetworkMap(this.zigbee, this.mqtt, this.state),
new ExtensionSoftReset(this.zigbee, this.mqtt, this.state),
new ExtensionRouterPollXiaomi(this.zigbee, this.mqtt, this.state),
new ExtensionDeviceCommand(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState),
new ExtensionNetworkMap(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState),
new ExtensionSoftReset(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState),
new ExtensionRouterPollXiaomi(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState),
];
// Resend all cached states.
@ -320,18 +317,15 @@ class Controller {
}
handleMQTTMessage(topic, message) {
logger.debug(`Received mqtt message on topic '${topic}' with data '${message}'`);
logger.debug(`Received MQTT message on '${topic}' with data '${message}'`);
// Find extensions that could handle this.
const extensions = this.extensions.filter((e) => e.handleMQTTMessage);
// Call extensions.
const extensionResults = extensions.map((e) => e.handleMQTTMessage(topic, message));
// Find extensions that can handle MQTT messages and get results
const results = this.extensions
.filter((e) => e.handleMQTTMessage)
.map((e) => e.handleMQTTMessage(topic, message));
if (topic.match(mqttConfigRegex)) {
this.handleMQTTMessageConfig(topic, message);
} else if (topic.match(mqttDeviceRegex) || topic.match(mqttDevicePrefixRegex)) {
this.handleMQTTMessageDevice(topic, message, topic.match(mqttDevicePrefixRegex));
} else if (topic === 'hass/status') {
if (message.toString().toLowerCase() === 'online') {
const timer = setTimeout(() => {
@ -339,8 +333,8 @@ class Controller {
clearTimeout(timer);
}, 20000);
}
} else if (!extensionResults.includes(true)) {
logger.warn(`Cannot handle MQTT message with topic '${topic}' and message '${message}'`);
} else if (!results.includes(true)) {
logger.warn(`Cannot handle MQTT message on '${topic}' with data '${message}'`);
}
}
@ -448,105 +442,6 @@ class Controller {
}
}
handleMQTTMessageDevice(topic, message, withPrefix) {
const friendlyName = topic.split('/').slice(withPrefix ? -3 : -2)[0];
const topicPrefix = withPrefix ? topic.split('/').slice(-2)[0] : '';
// Map friendlyName to deviceID.
const deviceID = settings.getIDByFriendlyName(friendlyName);
if (!deviceID) {
logger.error(`Cannot handle '${topic}' because deviceID of '${friendlyName}' cannot be found`);
return;
}
// Convert the MQTT message to a Zigbee message.
let json = null;
try {
json = JSON.parse(message);
} catch (e) {
// Cannot be parsed to JSON, assume state message.
json = {state: message.toString()};
}
// Find ep for this device
const device = this.zigbee.getDevice(deviceID);
if (!device) {
logger.error(`Failed to find device with deviceID ${deviceID}`);
return;
}
const mappedModel = zigbeeShepherdConverters.findByZigbeeModel(device.modelId);
if (!mappedModel) {
logger.warn(`Device with modelID '${device.modelId}' is not supported.`);
logger.warn(`Please see: https://github.com/Koenkk/zigbee2mqtt/wiki/How-to-support-new-devices`);
return;
}
const ep = mappedModel.ep && mappedModel.ep[topicPrefix] ? mappedModel.ep[topicPrefix] : null;
const published = [];
Object.keys(json).forEach((key) => {
// Find converter for this key.
const converter = mappedModel.toZigbee.find((c) => c.key === key);
if (!converter) {
logger.error(`No converter available for '${key}' (${json[key]})`);
return;
}
const message = converter.convert(json[key], json);
if (!message) {
return;
}
const callback = (error) => {
// Devices do not report when they go off, this ensures state (on/off) is always in sync.
if (!error && (key.startsWith('state') || key === 'brightness')) {
const msg = {};
const _key = topicPrefix ? `state_${topicPrefix}` : 'state';
msg[_key] = key === 'brightness' ? 'ON' : json['state'];
this.mqttPublishDeviceState(device, msg, true);
}
};
this.zigbee.publish(deviceID, message.cid, message.cmd, message.zclData,
message.cfg, ep, message.type, callback);
published.push({message: message, converter: converter});
});
/**
* After publishing a command to a zigbee device we want to monitor the changed attribute(s) so that
* everything stays in sync.
*/
published.forEach((p) => {
let counter = 0;
let secondsToMonitor = 1;
// In case of a transition we need to monitor for the whole transition time.
if (p.message.zclData.hasOwnProperty('transtime')) {
// Note that: transtime 10 = 0.1 seconds, 100 = 1 seconds, etc.
secondsToMonitor = (p.message.zclData.transtime / 10) + 1;
}
const timer = setInterval(() => {
counter++;
// Doing a 'read' will result in the device sending a zigbee message with the current attribute value.
// which will be handled by this.handleZigbeeMessage.
p.converter.attr.forEach((attribute) => {
this.zigbee.read(deviceID, p.message.cid, attribute, ep, () => null);
});
if (counter >= secondsToMonitor) {
clearTimeout(timer);
}
}, 1000);
});
}
mqttPublishDeviceState(device, payload, cache) {
const deviceID = device.ieeeAddr;
const appSettings = settings.get();

View File

@ -0,0 +1,157 @@
const settings = require('../util/settings');
const zigbeeShepherdConverters = require('zigbee-shepherd-converters');
const Queue = require('queue');
const logger = require('../util/logger');
const setTopic = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/set`, 'g');
const setWithPrefixTopic = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/[\\w\\s\\d.-]+/set`, 'g');
class DeviceCommand {
constructor(zigbee, mqtt, state, mqttPublishDeviceState) {
this.zigbee = zigbee;
this.mqtt = mqtt;
this.state = state;
// TODO -> remove this; move to publish device state method to mqtt.js
this.mqttPublishDeviceState = mqttPublishDeviceState;
/**
* Setup command queue.
* The command queue ensures that only 1 command is executed at a time.
* When executing multiple commands at the same time, some commands may fail.
*/
this.queue = new Queue();
this.queue.concurrency = 1;
this.queue.autostart = true;
// Subscribe to topics.
this.mqtt.subscribe(`${settings.get().mqtt.base_topic}/+/set`);
this.mqtt.subscribe(`${settings.get().mqtt.base_topic}/+/+/set`);
}
stop() {
this.queue.stop();
}
handleMQTTMessage(topic, message) {
if (!topic.match(setTopic) && !topic.match(setWithPrefixTopic)) {
// Can't handle this message
return false;
}
// Parse topic
const hasPrefix = topic.match(setWithPrefixTopic);
const friendlyName = topic.split('/').slice(hasPrefix ? -3 : -2)[0];
const prefix = hasPrefix ? topic.split('/').slice(-2)[0] : '';
// Map friendlyName to ieeeAddr.
const ieeeAddr = settings.getIeeAddrByFriendlyName(friendlyName);
if (!ieeeAddr) {
logger.error(`Cannot handle '${topic}' because ieeAddr of '${friendlyName}' cannot be found`);
return;
}
// Get device
const device = this.zigbee.getDevice(ieeeAddr);
if (!device) {
logger.error(`Failed to find device with ieeAddr: '${ieeeAddr}'`);
return;
}
// Map device to a model
const model = zigbeeShepherdConverters.findByZigbeeModel(device.modelId);
if (!model) {
logger.warn(`Device with modelID '${device.modelId}' is not supported.`);
logger.warn(`Please see: https://github.com/Koenkk/zigbee2mqtt/wiki/How-to-support-new-devices`);
return;
}
// Convert the MQTT message to a Zigbee message.
let json = null;
try {
json = JSON.parse(message);
} catch (e) {
// Cannot be parsed to JSON, assume state message.
json = {state: message.toString()};
}
// Determine endpoint to publish to.
const endpoint = model.hasOwnProperty('ep') && model.ep.hasOwnProperty(prefix) ? model.ep[prefix] : null;
// For each key in the JSON message find the matching converter.
Object.keys(json).forEach((key) => {
const converter = model.toZigbee.find((c) => c.key === key);
if (!converter) {
logger.error(`No converter available for '${key}' (${json[key]})`);
return;
}
// Converter didn't return a result, skip
const converted = converter.convert(json[key], json);
if (!converted) {
return;
}
// Add job to queue
this.queue.push((queueCallback) => {
this.zigbee.publish(
ieeeAddr,
converted.cid,
converted.cmd,
converted.zclData,
converted.cfg,
endpoint,
converted.type,
(error) => {
// Devices do not report when they go off, this ensures state (on/off) is always in sync.
if (!error && (key.startsWith('state') || key === 'brightness')) {
const msg = {};
const _key = prefix ? `state_${prefix}` : 'state';
msg[_key] = key === 'brightness' ? 'ON' : json['state'];
this.mqttPublishDeviceState(device, msg, true);
}
queueCallback();
}
);
});
});
return true;
// TODO
// Is this still needed??????
/**
* After publishing a command to a zigbee device we want to monitor the changed attribute(s) so that
* everything stays in sync.
*/
// published.forEach((p) => {
// let counter = 0;
// let secondsToMonitor = 1;
// // In case of a transition we need to monitor for the whole transition time.
// if (p.message.zclData.hasOwnProperty('transtime')) {
// // Note that: transtime 10 = 0.1 seconds, 100 = 1 seconds, etc.
// secondsToMonitor = (p.message.zclData.transtime / 10) + 1;
// }
// const timer = setInterval(() => {
// counter++;
// // Doing a 'read' will result in the device sending a zigbee message with the
// //current attribute value.
// // which will be handled by this.handleZigbeeMessage.
// p.converter.attr.forEach((attribute) => {
// this.zigbee.read(deviceID, p.message.cid, attribute, ep, () => null);
// });
// if (counter >= secondsToMonitor) {
// clearTimeout(timer);
// }
// }, 1000);
// });
}
}
module.exports = DeviceCommand;

View File

@ -30,6 +30,8 @@ class NetworkMap {
return true;
}
return false;
}
raw(zigbee, topology) {

View File

@ -75,4 +75,5 @@ module.exports = {
removeDevice: (id) => removeDevice(id),
getIDByFriendlyName: (friendlyName) => getIDByFriendlyName(friendlyName),
changeFriendlyName: (old, new_) => changeFriendlyName(old, new_),
getIeeAddrByFriendlyName: (friendlyName) => getIDByFriendlyName(friendlyName),
};

View File

@ -177,8 +177,8 @@ class Zigbee {
return this.shepherd.list();
}
getDevice(deviceID) {
return this.getDevices().find((d) => d.ieeeAddr === deviceID);
getDevice(ieeeAddr) {
return this.getDevices().find((d) => d.ieeeAddr === ieeeAddr);
}
getCoordinator() {

10
npm-shrinkwrap.json generated
View File

@ -3544,7 +3544,7 @@
},
"pify": {
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/pify/-/pify-2.3.0.tgz",
"resolved": "http://registry.npmjs.org/pify/-/pify-2.3.0.tgz",
"integrity": "sha1-7RQaasBDqEnqWISY59yosVMw6Qw=",
"dev": true
},
@ -3844,6 +3844,14 @@
"resolved": "https://registry.npmjs.org/q/-/q-1.5.1.tgz",
"integrity": "sha1-fjL3W0E4EpHQRhHxvxQQmsAGUdc="
},
"queue": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/queue/-/queue-4.5.0.tgz",
"integrity": "sha512-DwxpAnqJuoQa+wyDgQuwkSshkhlqIlWEvwvdAY27fDPunZ2cVJzXU4JyjY+5l7zs7oGLaYAQm4MbLOVFAHFBzA==",
"requires": {
"inherits": "~2.0.0"
}
},
"radio-symbol": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/radio-symbol/-/radio-symbol-2.0.0.tgz",

View File

@ -38,6 +38,7 @@
"moment": "*",
"mqtt": "*",
"object-assign-deep": "*",
"queue": "*",
"rimraf": "*",
"winston": "2.4.2",
"zcl-packet": "git+https://github.com/Koenkk/zcl-packet.git#fbd8c936bbd4be0597ad3e934be0ca722b0128a6",