aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/odlux/framework/src/services/notificationService.ts
diff options
context:
space:
mode:
authorMichael DÜrre <michael.duerre@highstreet-technologies.com>2021-04-08 07:27:18 +0200
committerMichael DÜrre <michael.duerre@highstreet-technologies.com>2021-04-08 07:27:28 +0200
commit21e4a946cd24b8a03ea577352f0271ebf7669ffa (patch)
tree4227d8566770b75c2c25b67c764038288cacfe3d /sdnr/wt/odlux/framework/src/services/notificationService.ts
parenta252be83694ae33260d99d5371ed48c1558aa2e8 (diff)
update odlux for notification change
update due new notification protocol Issue-ID: CCSDK-3253 Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com> Change-Id: Iad65459fdc18603cd1ddbd97bb2211308744bd8b
Diffstat (limited to 'sdnr/wt/odlux/framework/src/services/notificationService.ts')
-rw-r--r--sdnr/wt/odlux/framework/src/services/notificationService.ts156
1 files changed, 80 insertions, 76 deletions
diff --git a/sdnr/wt/odlux/framework/src/services/notificationService.ts b/sdnr/wt/odlux/framework/src/services/notificationService.ts
index 30091b574..5625b1f55 100644
--- a/sdnr/wt/odlux/framework/src/services/notificationService.ts
+++ b/sdnr/wt/odlux/framework/src/services/notificationService.ts
@@ -15,7 +15,6 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-import * as X2JS from 'x2js';
import { ApplicationStore } from '../store/applicationStore';
import { SetWebsocketAction } from '../actions/websocketAction';
@@ -26,81 +25,95 @@ let userLoggedOut = false;
let wasWebsocketConnectionEstablished: undefined | boolean;
let applicationStore: ApplicationStore | null;
-
export interface IFormatedMessage {
- notifType: string | null;
- time: string;
+ "event-time": string,
+ "data": {
+ "counter": number,
+ "attribute-name": string,
+ "time-stamp": string,
+ "object-id-ref": string,
+ "new-value": string
+ },
+ "node-id": string,
+ "type": {
+ "namespace": string,
+ "revision": string,
+ "type": string
+ }
}
export type SubscriptionCallback<TMessage extends IFormatedMessage = IFormatedMessage> = (msg: TMessage) => void;
-function formatData(event: MessageEvent): IFormatedMessage | undefined {
-
- var x2js = new X2JS();
- var jsonObj: { [key: string]: IFormatedMessage } = x2js.xml2js(event.data);
- if (jsonObj && typeof (jsonObj) === 'object') {
-
- const notifType = Object.keys(jsonObj)[0];
- const formated = jsonObj[notifType];
- formated.notifType = notifType;
- formated.time = new Date().toISOString();
- return formated;
- }
- return undefined;
-
+function setCurrentSubscriptions(notificationSocket: WebSocket) {
+ const scopesToSubscribe = Object.keys(subscriptions);
+ if (notificationSocket.readyState === notificationSocket.OPEN) {
+ const data = {
+ 'data': 'scopes',
+ 'scopes':[{
+ "schema":{
+ "namespace":"*",
+ "revision":"*",
+ "notification": scopesToSubscribe
+ }
+ }]
+ };
+ notificationSocket.send(JSON.stringify(data));
+ return true;
+ };
+ return false;
}
-export function subscribe<TMessage extends IFormatedMessage = IFormatedMessage>(scope: string | string[], callback: SubscriptionCallback<TMessage>): boolean {
+function addScope<TMessage extends IFormatedMessage = IFormatedMessage>(scope: string | string[], callback: SubscriptionCallback<TMessage>) {
const scopes = scope instanceof Array ? scope : [scope];
- // send all new scopes to subscribe
- const newScopesToSubscribe: string[] = scopes.reduce((acc: string[], cur: string) => {
- const currentCallbacks = subscriptions[cur];
- if (currentCallbacks) {
- if (!currentCallbacks.some(c => c === callback)) {
- currentCallbacks.push(callback);
+ // send all new scopes to subscribe
+ const newScopesToSubscribe: string[] = scopes.reduce((acc: string[], cur: string) => {
+ const currentCallbacks = subscriptions[cur];
+ if (currentCallbacks) {
+ if (!currentCallbacks.some(c => c === callback)) {
+ currentCallbacks.push(callback);
+ }
+ } else {
+ subscriptions[cur] = [callback];
+ acc.push(cur);
}
- } else {
- subscriptions[cur] = [callback];
- acc.push(cur);
- }
- return acc;
- }, []);
+ return acc;
+ }, []);
- if (newScopesToSubscribe.length === 0) {
- return true;
- }
+ if (newScopesToSubscribe.length === 0) {
+ return true;
+ }
+ return false;
+}
- return true;
+function removeScope<TMessage extends IFormatedMessage = IFormatedMessage>(scope: string | string[], callback: SubscriptionCallback<TMessage>) {
+ const scopes = scope instanceof Array ? scope : [scope];
+ scopes.forEach(s => {
+ const callbacks = subscriptions[s];
+ const index = callbacks && callbacks.indexOf(callback);
+ if (index > -1) {
+ callbacks.splice(index, 1);
+ }
+ if (callbacks.length === 0) {
+ subscriptions[s] === undefined;
+ }
+ });
}
+export function subscribe<TMessage extends IFormatedMessage = IFormatedMessage>(scope: string | string[], callback: SubscriptionCallback<TMessage>): Promise<boolean> {
+ addScope(scope, callback)
+ return socketReady && socketReady.then((notificationSocket) => {
+ // send a subscription to all active scopes
+ return setCurrentSubscriptions(notificationSocket);
+ }) || true;
+}
export function unsubscribe<TMessage extends IFormatedMessage = IFormatedMessage>(scope: string | string[], callback: SubscriptionCallback<TMessage>): Promise<boolean> {
- return socketReady.then((notificationSocket) => {
- const scopes = scope instanceof Array ? scope : [scope];
- scopes.forEach(s => {
- const callbacks = subscriptions[s];
- const index = callbacks && callbacks.indexOf(callback);
- if (index > -1) {
- callbacks.splice(index, 1);
- }
- if (callbacks.length === 0) {
- subscriptions[s] === undefined;
- }
- });
-
+ removeScope(scope, callback);
+ return socketReady && socketReady.then((notificationSocket) => {
// send a subscription to all active scopes
- const scopesToSubscribe = Object.keys(subscriptions);
- if (notificationSocket.readyState === notificationSocket.OPEN) {
- const data = {
- 'data': 'scopes',
- 'scopes': scopesToSubscribe
- };
- notificationSocket.send(JSON.stringify(data));
- return true;
- }
- return false;
- });
+ return setCurrentSubscriptions(notificationSocket);
+ }) || true;
}
export const startNotificationService = (store: ApplicationStore) => {
@@ -111,24 +124,24 @@ const connect = (): Promise<WebSocket> => {
return new Promise((resolve, reject) => {
const notificationSocket = new WebSocket(socketUrl);
- notificationSocket.onmessage = (event) => {
+ notificationSocket.onmessage = (event: MessageEvent<string>) => {
// process received event
- if (typeof event.data === 'string') {
- const formated = formatData(event);
- if (formated && formated.notifType) {
- const callbacks = subscriptions[formated.notifType];
+
+ if (event.data && typeof event.data === "string" ) {
+ const msg = JSON.parse(event.data) as IFormatedMessage;
+ const callbacks = msg?.type?.type && subscriptions[msg.type.type];
if (callbacks) {
callbacks.forEach(cb => {
// ensure all callbacks will be called
try {
- return cb(formated);
+ return cb(msg);
} catch (reason) {
console.error(reason);
}
});
}
}
- }
+
};
notificationSocket.onerror = function (error) {
@@ -148,14 +161,7 @@ const connect = (): Promise<WebSocket> => {
resolve(notificationSocket);
// send a subscription to all active scopes
- const scopesToSubscribe = Object.keys(subscriptions);
- if (notificationSocket.readyState === notificationSocket.OPEN) {
- const data = {
- 'data': 'scopes',
- 'scopes': scopesToSubscribe
- };
- notificationSocket.send(JSON.stringify(data));
- };
+ setCurrentSubscriptions(notificationSocket);
};
notificationSocket.onclose = function (event) {
@@ -171,8 +177,6 @@ const connect = (): Promise<WebSocket> => {
}
-
-
export const startWebsocketSession = () => {
socketReady = connect();
userLoggedOut = false;