From 21e4a946cd24b8a03ea577352f0271ebf7669ffa Mon Sep 17 00:00:00 2001 From: Michael DÜrre Date: Thu, 8 Apr 2021 07:27:18 +0200 Subject: update odlux for notification change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit update due new notification protocol Issue-ID: CCSDK-3253 Signed-off-by: Michael DÜrre Change-Id: Iad65459fdc18603cd1ddbd97bb2211308744bd8b --- .../framework/src/services/notificationService.ts | 156 +++++++++++---------- 1 file changed, 80 insertions(+), 76 deletions(-) (limited to 'sdnr/wt/odlux/framework/src/services/notificationService.ts') diff --git a/sdnr/wt/odlux/framework/src/services/notificationService.ts b/sdnr/wt/odlux/framework/src/services/notificationService.ts index 30091b574..5625b1f55 100644 --- a/sdnr/wt/odlux/framework/src/services/notificationService.ts +++ b/sdnr/wt/odlux/framework/src/services/notificationService.ts @@ -15,7 +15,6 @@ * the License. * ============LICENSE_END========================================================================== */ -import * as X2JS from 'x2js'; import { ApplicationStore } from '../store/applicationStore'; import { SetWebsocketAction } from '../actions/websocketAction'; @@ -26,81 +25,95 @@ let userLoggedOut = false; let wasWebsocketConnectionEstablished: undefined | boolean; let applicationStore: ApplicationStore | null; - export interface IFormatedMessage { - notifType: string | null; - time: string; + "event-time": string, + "data": { + "counter": number, + "attribute-name": string, + "time-stamp": string, + "object-id-ref": string, + "new-value": string + }, + "node-id": string, + "type": { + "namespace": string, + "revision": string, + "type": string + } } export type SubscriptionCallback = (msg: TMessage) => void; -function formatData(event: MessageEvent): IFormatedMessage | undefined { - - var x2js = new X2JS(); - var jsonObj: { [key: string]: IFormatedMessage } = x2js.xml2js(event.data); - if (jsonObj && typeof (jsonObj) === 'object') { - - const notifType = Object.keys(jsonObj)[0]; - const formated = jsonObj[notifType]; - formated.notifType = notifType; - formated.time = new Date().toISOString(); - return formated; - } - return undefined; - +function setCurrentSubscriptions(notificationSocket: WebSocket) { + const scopesToSubscribe = Object.keys(subscriptions); + if (notificationSocket.readyState === notificationSocket.OPEN) { + const data = { + 'data': 'scopes', + 'scopes':[{ + "schema":{ + "namespace":"*", + "revision":"*", + "notification": scopesToSubscribe + } + }] + }; + notificationSocket.send(JSON.stringify(data)); + return true; + }; + return false; } -export function subscribe(scope: string | string[], callback: SubscriptionCallback): boolean { +function addScope(scope: string | string[], callback: SubscriptionCallback) { const scopes = scope instanceof Array ? scope : [scope]; - // send all new scopes to subscribe - const newScopesToSubscribe: string[] = scopes.reduce((acc: string[], cur: string) => { - const currentCallbacks = subscriptions[cur]; - if (currentCallbacks) { - if (!currentCallbacks.some(c => c === callback)) { - currentCallbacks.push(callback); + // send all new scopes to subscribe + const newScopesToSubscribe: string[] = scopes.reduce((acc: string[], cur: string) => { + const currentCallbacks = subscriptions[cur]; + if (currentCallbacks) { + if (!currentCallbacks.some(c => c === callback)) { + currentCallbacks.push(callback); + } + } else { + subscriptions[cur] = [callback]; + acc.push(cur); } - } else { - subscriptions[cur] = [callback]; - acc.push(cur); - } - return acc; - }, []); + return acc; + }, []); - if (newScopesToSubscribe.length === 0) { - return true; - } + if (newScopesToSubscribe.length === 0) { + return true; + } + return false; +} - return true; +function removeScope(scope: string | string[], callback: SubscriptionCallback) { + const scopes = scope instanceof Array ? scope : [scope]; + scopes.forEach(s => { + const callbacks = subscriptions[s]; + const index = callbacks && callbacks.indexOf(callback); + if (index > -1) { + callbacks.splice(index, 1); + } + if (callbacks.length === 0) { + subscriptions[s] === undefined; + } + }); } +export function subscribe(scope: string | string[], callback: SubscriptionCallback): Promise { + addScope(scope, callback) + return socketReady && socketReady.then((notificationSocket) => { + // send a subscription to all active scopes + return setCurrentSubscriptions(notificationSocket); + }) || true; +} export function unsubscribe(scope: string | string[], callback: SubscriptionCallback): Promise { - return socketReady.then((notificationSocket) => { - const scopes = scope instanceof Array ? scope : [scope]; - scopes.forEach(s => { - const callbacks = subscriptions[s]; - const index = callbacks && callbacks.indexOf(callback); - if (index > -1) { - callbacks.splice(index, 1); - } - if (callbacks.length === 0) { - subscriptions[s] === undefined; - } - }); - + removeScope(scope, callback); + return socketReady && socketReady.then((notificationSocket) => { // send a subscription to all active scopes - const scopesToSubscribe = Object.keys(subscriptions); - if (notificationSocket.readyState === notificationSocket.OPEN) { - const data = { - 'data': 'scopes', - 'scopes': scopesToSubscribe - }; - notificationSocket.send(JSON.stringify(data)); - return true; - } - return false; - }); + return setCurrentSubscriptions(notificationSocket); + }) || true; } export const startNotificationService = (store: ApplicationStore) => { @@ -111,24 +124,24 @@ const connect = (): Promise => { return new Promise((resolve, reject) => { const notificationSocket = new WebSocket(socketUrl); - notificationSocket.onmessage = (event) => { + notificationSocket.onmessage = (event: MessageEvent) => { // process received event - if (typeof event.data === 'string') { - const formated = formatData(event); - if (formated && formated.notifType) { - const callbacks = subscriptions[formated.notifType]; + + if (event.data && typeof event.data === "string" ) { + const msg = JSON.parse(event.data) as IFormatedMessage; + const callbacks = msg?.type?.type && subscriptions[msg.type.type]; if (callbacks) { callbacks.forEach(cb => { // ensure all callbacks will be called try { - return cb(formated); + return cb(msg); } catch (reason) { console.error(reason); } }); } } - } + }; notificationSocket.onerror = function (error) { @@ -148,14 +161,7 @@ const connect = (): Promise => { resolve(notificationSocket); // send a subscription to all active scopes - const scopesToSubscribe = Object.keys(subscriptions); - if (notificationSocket.readyState === notificationSocket.OPEN) { - const data = { - 'data': 'scopes', - 'scopes': scopesToSubscribe - }; - notificationSocket.send(JSON.stringify(data)); - }; + setCurrentSubscriptions(notificationSocket); }; notificationSocket.onclose = function (event) { @@ -171,8 +177,6 @@ const connect = (): Promise => { } - - export const startWebsocketSession = () => { socketReady = connect(); userLoggedOut = false; -- cgit 1.2.3-korg