/**
 * Static wrapper around a single Paho MQTT client.
 *
 * Callbacks are stored in `Mqtt.subscribes` as `{ [context]: { [topic]: callback } }`.
 * The registry is kept across reconnects so that every registered topic can be
 * re-subscribed when a connection is (re-)established.
 */
class Mqtt {
    /**
     * Connect to MQTT server.
     *
     * Requests the connection info through `Core.API.call` and, one second after
     * the response arrives, creates a new Paho client and starts connecting.
     * On success `onConnected()` runs; on failure `reconnect()` is scheduled.
     */
    static connect() {
        this.connected = false;
        // Keep callbacks registered earlier: connect() is also the reconnect path,
        // and onConnected() re-subscribes from this registry.
        this.subscribes = this.subscribes || {};

        // make an api call to get the mqtt info
        Core.API.call({
            url: `/api/${Page.Meta.api_version}/server/configuration/index.php`,
            data: {
                func: 'apiGetMqttConnectionInfo'
            },
            callback: (data) => {
                setTimeout(
                    () => {
                        const client = new Paho.MQTT.Client(data.mqtt_uri, Page.Meta.page_id);

                        // Attach both handlers before connecting so no event can be missed.
                        client.onMessageArrived = (message) => this.onMessageArrived(message);
                        client.onConnectionLost = (error) => {
                            this.onConnectionLost();
                            console.log(error);
                        };

                        this.client = client;
                        client.connect({
                            userName: data.mqtt_username,
                            password: data.mqtt_password,
                            useSSL: true,
                            onSuccess: () => this.onConnected(),
                            onFailure: () => this.reconnect()
                        });
                    },
                    1000
                );
            },
            fadeParameters: {
                hideLoader: true
            }
        });
    }

    /**
     * Reconnect MQTT.
     *
     * Starts a timer that calls `Mqtt.connect()` every 5 seconds until
     * `onConnected()` clears it. Does nothing when such a timer is already
     * running, so repeated failures do not stack additional timers.
     */
    static reconnect() {
        if (this.reconnectInterval) {
            return;
        }

        this.reconnectInterval = setInterval(
            () => Mqtt.connect(),
            5000
        );
    }

    /**
     * Process connection success.
     *
     * Stops the reconnect timer, marks the client as connected and subscribes
     * to every topic currently present in the registry (each topic once, even
     * when several contexts registered it).
     */
    static onConnected() {
        if (this.reconnectInterval) {
            clearInterval(this.reconnectInterval);
            this.reconnectInterval = null;
        }

        this.connected = true;
        console.info('MQTT connected.');

        const registry = this.subscribes || {};
        const topics = new Set();
        Object.keys(registry).forEach((context) => {
            Object.keys(registry[context]).forEach((topic) => topics.add(topic));
        });

        topics.forEach((topic) => this.subscribeToBroker(topic));
    }

    /**
     * Process received message.
     *
     * Invokes the callback of every context that registered exactly
     * `message.destinationName` as its topic.
     * NOTE(review): the lookup is an exact key match, so callbacks registered
     * under a wildcard topic filter will not be found here — confirm whether
     * wildcard subscriptions are expected to work.
     *
     * @param {object} message
     */
    static onMessageArrived(message) {
        const registry = this.subscribes || {};

        Object.keys(registry).forEach((context) => {
            // A callback may have removed this context in the meantime.
            const callbacks = this.subscribes[context];
            if (callbacks && callbacks[message.destinationName]) {
                callbacks[message.destinationName](message);
            }
        });
    }

    /**
     * Process connection lost.
     *
     * Marks the client as disconnected and schedules reconnect attempts.
     */
    static onConnectionLost() {
        this.connected = false;
        this.reconnect();
    }

    /**
     * Subscribe to topic.
     *
     * Stores the callback in the registry; when currently connected the topic is
     * subscribed at the broker immediately, otherwise `onConnected()` does it later.
     *
     * @param {string} context
     * @param {string} topic
     * @param {callable} callback
     */
    static subscribe(context, topic, callback) {
        this.subscribes = this.subscribes || {};

        if (!(context in this.subscribes)) {
            this.subscribes[context] = {};
        }

        this.subscribes[context][topic] = callback;

        if (this.connected) {
            this.subscribeToBroker(topic);
        }
    }

    /**
     * Unsubscribe to topic.
     *
     * Removes the callback from the registry. The broker subscription is only
     * dropped when connected and when no other context still uses the topic.
     *
     * @param {string} context
     * @param {string} topic
     */
    static unsubscribe(context, topic) {
        if (!this.subscribes || !(context in this.subscribes)) {
            return;
        }

        if (!(topic in this.subscribes[context])) {
            return;
        }

        console.info(`MQTT: Unsubscribing ${context}:${topic}`);

        // Update the registry first so it stays consistent even if the client call fails.
        delete this.subscribes[context][topic];
        if (Object.keys(this.subscribes[context]).length === 0) {
            delete this.subscribes[context];
        }

        const usedElsewhere = Object.keys(this.subscribes)
            .some((otherContext) => topic in this.subscribes[otherContext]);

        // Skip the broker call while disconnected: there is no live session to
        // unsubscribe from, and the topic is already gone from the registry.
        if (this.connected && this.client && !usedElsewhere) {
            this.client.unsubscribe(
                topic,
                {
                    onFailure: (invocationContext, errorCode, errorMessage) => console.error(`Failed to unsubscribe to ${topic}: ${errorMessage}`)
                }
            );
        }
    }

    /**
     * Unsubscribe all topics from context.
     *
     * @param {string} context
     */
    static unsubscribeContext(context) {
        if (!this.subscribes || !(context in this.subscribes)) {
            return;
        }

        // Iterate over a snapshot: unsubscribe() deletes entries as it goes.
        Object.keys(this.subscribes[context]).forEach((topic) => {
            this.unsubscribe(
                context,
                topic
            );
        });
    }

    /**
     * Publish MQTT message
     *
     * Sends `data` to `topic` with QoS 2 through the current client.
     *
     * @param {string} topic
     * @param {*} data payload handed to the `Paho.MQTT.Message` constructor
     */
    static publish(topic, data) {
        const message = new Paho.MQTT.Message(data);
        message.destinationName = topic;
        message.qos = 2;
        this.client.send(message);
    }

    /**
     * Internal helper: subscribe the current client to a topic with QoS 2 and
     * log subscription failures.
     *
     * @param {string} topic
     */
    static subscribeToBroker(topic) {
        console.info(`Subscribing to ${topic}`);
        this.client.subscribe(
            topic,
            {
                qos: 2,
                onFailure: (invocationContext, errorCode, errorMessage) => console.error(`Failed to subscribe to ${topic}: ${errorMessage}`)
            }
        );
    }
}