Availability refactor part 1 (#8243)

* Initial

* Updates

* update

* Updates

* Updates

* Updates
This commit is contained in:
Koen Kanters 2021-08-14 16:04:03 +02:00 committed by GitHub
parent f74047d090
commit c18a698e6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 488 additions and 112 deletions

View File

@ -21,6 +21,7 @@ const ExtensionBridgeLegacy = require('./extension/legacy/bridgeLegacy');
const ExtensionBridge = require('./extension/bridge');
const ExtensionGroups = require('./extension/groups');
const ExtensionAvailability = require('./extension/availability');
const ExtensionAvailabilityNew = require('./extension/availabilityNew');
const ExtensionBind = require('./extension/bind');
const ExtensionReport = require('./extension/legacy/report');
const ExtensionOnEvent = require('./extension/onEvent');
@ -32,7 +33,7 @@ const AllExtensions = [
ExtensionPublish, ExtensionReceive, ExtensionNetworkMap, ExtensionSoftReset, ExtensionHomeAssistant,
ExtensionConfigure, ExtensionDeviceGroupMembership, ExtensionBridgeLegacy, ExtensionBridge, ExtensionGroups,
ExtensionAvailability, ExtensionBind, ExtensionReport, ExtensionOnEvent, ExtensionOTAUpdate,
ExtensionExternalConverters, ExtensionFrontend, ExtensionExternalExtension,
ExtensionExternalConverters, ExtensionFrontend, ExtensionExternalExtension, ExtensionAvailabilityNew,
];
class Controller {
@ -87,8 +88,12 @@ class Controller {
this.extensions.push(new ExtensionSoftReset(...args));
}
if (settings.get().advanced.availability_timeout) {
this.extensions.push(new ExtensionAvailability(...args));
if (settings.get().advanced.availability_timeout || settings.get().availability) {
if (settings.get().experimental.availability_new) {
this.extensions.push(new ExtensionAvailabilityNew(...args));
} else {
this.extensions.push(new ExtensionAvailability(...args));
}
}
this.extensions.push(new ExtensionExternalExtension(...args));
}

View File

@ -0,0 +1,130 @@
import ExtensionTS from './extensionts';
import logger from '../util/logger';
const hours = (hours: number): number => 1000 * 60 * 60 * hours;
const minutes = (minutes: number): number => 1000 * 60 * minutes;
const seconds = (seconds: number): number => 1000 * seconds;
const ActiveTimeout = minutes(10);
const PassiveTimeout = hours(25);
class AvailabilityNew extends ExtensionTS {
private timers: {[s: string]: NodeJS.Timeout} = {};
private availabilityCache: {[s: string]: boolean} = {};
private pingQueue: ResolvedEntity[] = [];
private pingQueueExecuting = false;
constructor(zigbee: TempZigbee, mqtt: TempMQTT, state: TempState,
publishEntityState: TempPublishEntityState, eventBus: TempEventBus) {
super(zigbee, mqtt, state, publishEntityState, eventBus);
this.lastSeenChanged = this.lastSeenChanged.bind(this);
logger.warn('Using experimental new availability feature');
}
private isActiveDevice(re: ResolvedEntity): boolean {
return (re.device.type === 'Router' && re.device.powerSource !== 'Battery') ||
re.device.powerSource === 'Mains (single phase)';
}
private isAvailable(re: ResolvedEntity): boolean {
const ago = Date.now() - re.device.lastSeen;
if (this.isActiveDevice(re)) {
logger.debug(`Active device '${re.name}' was last seen '${(ago / minutes(1)).toFixed(2)}' minutes ago.`);
return ago < ActiveTimeout;
} else {
logger.debug(`Passive device '${re.name}' was last seen '${(ago / hours(1)).toFixed(2)}' hours ago.`);
return ago < PassiveTimeout;
}
}
private resetTimer(re: ResolvedEntity): void {
clearTimeout(this.timers[re.device.ieeeAddr]);
// If the timer triggers, the device is not avaiable anymore otherwise resetTimer already have been called
if (this.isActiveDevice(re)) {
// If device did not check in, ping it, if that fails it will be marked as offline
this.timers[re.device.ieeeAddr] = setTimeout(
() => this.addToPingQueue(re), ActiveTimeout + seconds(1));
} else {
this.timers[re.device.ieeeAddr] = setTimeout(
() => this.publishAvailability(re), PassiveTimeout + seconds(1));
}
}
private addToPingQueue(re: ResolvedEntity): void {
this.pingQueue.push(re);
this.pingQueueExecuteNext();
}
private removeFromPingQueue(re: ResolvedEntity): void {
const index = this.pingQueue.findIndex((r) => r.device.ieeeAddr === re.device.ieeeAddr);
index != -1 && this.pingQueue.splice(index, 1);
}
private async pingQueueExecuteNext(): Promise<void> {
if (this.pingQueue.length === 0 || this.pingQueueExecuting) return;
this.pingQueueExecuting = true;
const re = this.pingQueue[0];
try {
await re.device.ping();
logger.debug(`Succesfully pinged '${re.name}'`);
} catch {
logger.error(`Failed to ping '${re.name}'`);
}
this.publishAvailability(re);
this.resetTimer(re);
this.removeFromPingQueue(re);
this.pingQueueExecuting = false;
this.pingQueueExecuteNext();
}
override onMQTTConnected(): void {
for (const device of this.zigbee.getClients()) {
const re: ResolvedEntity = this.zigbee.resolveEntity(device);
this.resetTimer(re);
// Publish initial availablility
this.publishAvailability(re);
// If an active device is initially unavailable, ping it.
if (this.isActiveDevice(re) && !this.isAvailable(re)) {
this.addToPingQueue(re);
}
}
}
override onZigbeeStarted(): void {
this.zigbee.on('lastSeenChanged', this.lastSeenChanged);
}
private publishAvailability(re: ResolvedEntity): void {
const available = this.isAvailable(re);
if (this.availabilityCache[re.device.ieeeAddr] == available) {
return;
}
const topic = `${re.name}/availability`;
const payload = available ? 'online' : 'offline';
this.availabilityCache[re.device.ieeeAddr] = available;
this.mqtt.publish(topic, payload, {retain: true, qos: 0});
}
private lastSeenChanged(data: {device: Device}): void {
const re = this.zigbee.resolveEntity(data.device);
// Remove from ping queue, not necessary anymore since we know the device is online.
this.removeFromPingQueue(re);
this.resetTimer(re);
this.publishAvailability(re);
}
override stop(): void {
Object.values(this.timers).forEach((t) => clearTimeout(t));
this.zigbee.removeListener('lastSeenChanged', this.lastSeenChanged);
super.stop();
}
}
module.exports = AvailabilityNew;

View File

@ -0,0 +1,69 @@
abstract class ExtensionTS {
protected zigbee: TempZigbee;
protected mqtt: TempMQTT;
protected state: TempState;
protected publishEntityState: TempPublishEntityState;
protected eventBus: TempEventBus;
/**
* Besides intializing variables, the constructor should do nothing!
*
* @param {Zigbee} zigbee Zigbee controller
* @param {MQTT} mqtt MQTT controller
* @param {State} state State controller
* @param {Function} publishEntityState Method to publish device state to MQTT.
* @param {EventBus} eventBus The event bus
*/
constructor(zigbee: TempZigbee, mqtt: TempMQTT, state: TempState,
publishEntityState: TempPublishEntityState, eventBus: TempEventBus) {
this.zigbee = zigbee;
this.mqtt = mqtt;
this.state = state;
this.publishEntityState = publishEntityState;
this.eventBus = eventBus;
}
/**
* This method is called by the controller once Zigbee has been started.
*/
/* istanbul ignore next */
onZigbeeStarted(): void {}
/**
* This method is called by the controller once connected to the MQTT server.
*/
/* istanbul ignore next */
onMQTTConnected(): void {}
/**
* Is called when a Zigbee message from a device is received.
* @param {string} type Type of the message
* @param {Object} data Data of the message
* @param {Object?} resolvedEntity Resolved entity returned from this.zigbee.resolveEntity()
* @param {Object?} settingsDevice Device settings
*/
/* istanbul ignore next */
/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
onZigbeeEvent(type: string, data: unknown, resolvedEntity: ResolvedEntity): void {}
/**
* Is called when a MQTT message is received
* @param {string} topic Topic on which the message was received
* @param {Object} message The received message
* @return {boolean} if the message was handled
*/
/* istanbul ignore next */
/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
onMQTTMessage(topic: string, message: string): boolean {
return false;
}
/**
* Is called once the extension has to stop
*/
stop(): void {
this.eventBus.removeListenersExtension(this.constructor.name);
}
}
export default ExtensionTS;

View File

@ -855,7 +855,7 @@ class HomeAssistant extends Extension {
// Availability payload
payload.availability = [{topic: `${settings.get().mqtt.base_topic}/bridge/state`}];
if (settings.get().advanced.availability_timeout) {
if (settings.get().advanced.availability_timeout || settings.get().availability) {
payload.availability.push({topic: `${settings.get().mqtt.base_topic}/${friendlyName}/availability`});
}

View File

@ -1,108 +1,142 @@
/* eslint-disable */
import type {Device as ZHDevice} from 'zigbee-herdsman/dist/controller/model';
interface KeyValue {
[s: string]: any,
}
declare global {
type Device = ZHDevice;
interface Settings {
devices?: {[s: string]: {friendly_name: string, retention?: number}},
groups?: {[s: string]: {friendly_name: string, devices?: string[]}},
passlist: string[],
blocklist: string[],
whitelist: string[],
ban: string[],
permit_join: boolean,
frontend?: {
auth_token?: string,
},
mqtt: {
include_device_information: boolean,
force_disable_retain: boolean
version?: number,
user?: string,
password?: string,
},
serial: {
disable_led: boolean,
},
device_options: {[s: string]: unknown},
map_options: {
graphviz: {
colors: {
fill: {
enddevice: string,
coordinator: string,
router: string,
},
font: {
coordinator: string,
router: string,
enddevice: string,
},
line: {
active: string,
inactive: string,
/* eslint-disable */
// Controller
interface KeyValue {
[s: string]: any,
}
interface Settings {
devices?: {[s: string]: {friendly_name: string, retention?: number}},
groups?: {[s: string]: {friendly_name: string, devices?: string[]}},
passlist: string[],
blocklist: string[],
whitelist: string[],
ban: string[],
permit_join: boolean,
frontend?: {
auth_token?: string,
},
mqtt: {
include_device_information: boolean,
force_disable_retain: boolean
version?: number,
user?: string,
password?: string,
},
serial: {
disable_led: boolean,
},
device_options: {[s: string]: unknown},
map_options: {
graphviz: {
colors: {
fill: {
enddevice: string,
coordinator: string,
router: string,
},
font: {
coordinator: string,
router: string,
enddevice: string,
},
line: {
active: string,
inactive: string,
},
},
},
},
},
experimental: {
output: 'json' | 'attribute' | 'attribute_and_json',
},
advanced: {
legacy_api: boolean,
log_rotation: boolean,
log_symlink_current: boolean,
log_output: ('console' | 'file')[],
log_directory: string,
log_file: string,
log_level: 'debug' | 'info' | 'error' | 'warn',
log_syslog: {},
soft_reset_timeout: number,
pan_id: number,
ext_pan_id: number[],
channel: number,
adapter_concurrent: number | null,
adapter_delay: number | null,
availability_timeout: number,
availability_blocklist: string[],
availability_passlist: string[],
availability_blacklist: string[],
availability_whitelist: string[],
cache_state: boolean,
cache_state_persistent: boolean,
cache_state_send_on_startup: boolean,
last_seen: 'disable' | 'ISO_8601' | 'ISO_8601_local' | 'epoch',
elapsed: boolean,
network_key: number[],
report: boolean,
homeassistant_discovery_topic: string,
homeassistant_status_topic: string,
homeassistant_legacy_entity_attributes: boolean,
homeassistant_legacy_triggers: boolean,
timestamp_format: string,
},
ota: {
update_check_interval: number,
disable_automatic_update_check: boolean
},
external_converters: string[],
}
experimental: {
output: 'json' | 'attribute' | 'attribute_and_json',
},
advanced: {
legacy_api: boolean,
log_rotation: boolean,
log_symlink_current: boolean,
log_output: ('console' | 'file')[],
log_directory: string,
log_file: string,
log_level: 'debug' | 'info' | 'error' | 'warn',
log_syslog: {},
soft_reset_timeout: number,
pan_id: number,
ext_pan_id: number[],
channel: number,
adapter_concurrent: number | null,
adapter_delay: number | null,
availability_timeout: number,
availability_blocklist: string[],
availability_passlist: string[],
availability_blacklist: string[],
availability_whitelist: string[],
cache_state: boolean,
cache_state_persistent: boolean,
cache_state_send_on_startup: boolean,
last_seen: 'disable' | 'ISO_8601' | 'ISO_8601_local' | 'epoch',
elapsed: boolean,
network_key: number[],
report: boolean,
homeassistant_discovery_topic: string,
homeassistant_status_topic: string,
homeassistant_legacy_entity_attributes: boolean,
homeassistant_legacy_triggers: boolean,
timestamp_format: string,
},
ota: {
update_check_interval: number,
disable_automatic_update_check: boolean
},
external_converters: string[],
}
interface DeviceSettings {
friendlyName: string,
ID: string,
retention?: number,
}
interface DeviceSettings {
friendlyName: string,
ID: string,
retention?: number,
}
interface GroupSettings {
friendlyName: string,
devices: string[],
ID: number,
}
interface GroupSettings {
friendlyName: string,
devices: string[],
ID: number,
}
type EntitySettings = {
type: 'device' | 'group'
ID: number | string,
friendlyName: string,
}
type EntitySettings = {
type: 'device' | 'group'
ID: number | string,
friendlyName: string,
}
interface ResolvedEntity {
type: 'device' | 'group',
definition?: {model: string},
name: string,
device?: Device,
}
type lastSeenChangedHandler = (data: {device: Device}) => void;
interface TempZigbee {
getClients: () => Device[];
on: (event: 'lastSeenChanged', handler: lastSeenChangedHandler) => void;
removeListener: (event: 'lastSeenChanged', handler: lastSeenChangedHandler) => void;
resolveEntity: (device: Device) => ResolvedEntity;
}
interface TempMQTT {
publish: (topic: string, payload: string, options: {}, base?: string, skipLog?: boolean, skipReceive?: boolean) => Promise<void>;
}
interface TempState {}
interface TempEventBus {
removeListenersExtension: (extension: string) => void;
}
type TempPublishEntityState = () => void;
}

View File

@ -79,6 +79,7 @@ class Zigbee extends events.EventEmitter {
this.herdsman.on('deviceLeave', (data) => this.emit('event', 'deviceLeave', data));
this.herdsman.on('message', (data) => this.emit('event', 'message', data));
this.herdsman.on('permitJoinChanged', (data) => this.emit('permitJoinChanged', data));
this.herdsman.on('lastSeenChanged', (data) => this.emit('lastSeenChanged', data));
logger.info(`zigbee-herdsman started (${startResult})`);
logger.info(`Coordinator firmware version: '${stringify(await this.getCoordinatorVersion())}'`);

View File

@ -87,7 +87,8 @@
}
},
"collectCoverageFrom": [
"lib/**/*.js"
"lib/**/*.js",
"lib/**/*.ts"
]
},
"bin": {

View File

@ -0,0 +1,142 @@
const data = require('./stub/data');
const logger = require('./stub/logger');
const stringify = require('json-stable-stringify-without-jsonify');
const zigbeeHerdsman = require('./stub/zigbeeHerdsman');
zigbeeHerdsman.returnDevices.push('0x000b57fffec6a5b3');
zigbeeHerdsman.returnDevices.push('0x000b57fffec6a5b4');
zigbeeHerdsman.returnDevices.push('0x00124b00120144ae');
zigbeeHerdsman.returnDevices.push('0x0017880104e45517');
const MQTT = require('./stub/mqtt');
const settings = require('../lib/util/settings');
const Controller = require('../lib/controller');
const flushPromises = require('./lib/flushPromises');
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const mocks = [MQTT.publish, logger.warn, logger.debug];
const hours = (hours) => 1000 * 60 * 60 * hours;
const minutes = (minutes) => 1000 * 60 * minutes;
describe('Availability', () => {
let controller;
let extension;
let devices = zigbeeHerdsman.devices;
let resetExtension = async () => {
await controller.enableDisableExtension(false, 'AvailabilityNew');
await controller.enableDisableExtension(true, 'AvailabilityNew');
extension = controller.extensions.find((e) => e.constructor.name === 'AvailabilityNew');
}
const advancedTime = async (value) => {
jest.setSystemTime(Date.now() + value);
jest.advanceTimersByTime(value);
await flushPromises();
}
beforeAll(async () => {
jest.useFakeTimers('modern');
settings.reRead();
settings.set(['availability'], true);
settings.set(['experimental', 'availability_new'], true);
controller = new Controller(jest.fn(), jest.fn());
await controller.start();
await flushPromises();
});
beforeEach(async () => {
jest.useFakeTimers('modern').setSystemTime(minutes(1));
data.writeDefaultConfiguration();
// @ts-ignore
Object.values(zigbeeHerdsman.devices).forEach(d => d.lastSeen = minutes(1));
mocks.forEach((m) => m.mockClear());
await resetExtension();
// @ts-ignore
Object.values(devices).forEach((d) => d.ping.mockClear());
});
afterEach(async () => {
// @ts-ignore
Object.values(zigbeeHerdsman.devices).forEach(d => d.lastSeen = minutes(1));
})
afterAll(async () => {
jest.useRealTimers();
})
it('Should publish availabilty on startup', async () => {
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
});
it('Should publish offline for active device when not seen for 10 minutes', async () => {
MQTT.publish.mockClear();
await advancedTime(minutes(5));
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(0);
await advancedTime(minutes(7));
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(1);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
});
it('Should publish offline for passive device when not seen for 25 hours', async () => {
MQTT.publish.mockClear();
await advancedTime(hours(26));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
});
it('Should reset ping timer when device last seen changes for active device', async () => {
MQTT.publish.mockClear();
await advancedTime(minutes(5));
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(0);
await zigbeeHerdsman.events.lastSeenChanged({device: devices.bulb_color});
await advancedTime(minutes(7));
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(0);
devices.bulb_color.ping.mockImplementationOnce(() => {throw new Error('failed')});
await advancedTime(minutes(10));
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(1);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
});
it('Should reset ping timer when device last seen changes for passive device', async () => {
MQTT.publish.mockClear();
await advancedTime(hours(24));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
await zigbeeHerdsman.events.lastSeenChanged({device: devices.remote});
await advancedTime(hours(25));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
devices.remote.ping.mockImplementationOnce(() => {throw new Error('failed')});
await advancedTime(hours(3));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
});
it('Should immediately mark device as online when it lastSeen changes', async () => {
MQTT.publish.mockClear();
await advancedTime(minutes(15));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
devices.bulb_color.lastSeen = Date.now();
await zigbeeHerdsman.events.lastSeenChanged({device: devices.bulb_color});
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
});
});

View File

@ -14,13 +14,7 @@
"allowJs": true,
"rootDir": "lib",
"inlineSourceMap": true,
"resolveJsonModule": true,
"paths": {
"*": [
"node_modules/*",
"lib/types/*"
]
},
"resolveJsonModule": true
},
"include": [
"lib/**/*"