summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/websocketmanager/provider/src/main/java
diff options
context:
space:
mode:
authorMichael DÜrre <michael.duerre@highstreet-technologies.com>2021-04-08 06:34:22 +0200
committerMichael DÜrre <michael.duerre@highstreet-technologies.com>2021-04-08 06:34:46 +0200
commitf3969004c6ccac18e742c5fc48c844e315991023 (patch)
treef5486a62e842bb16ca7d3af47a8663df08feef55 /sdnr/wt/websocketmanager/provider/src/main/java
parenta252be83694ae33260d99d5371ed48c1558aa2e8 (diff)
update websocketmanager
update complete notification flow Issue-ID: CCSDK-3252 Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com> Change-Id: I87ba00f615707b942471fcace57bcda50ce37e61
Diffstat (limited to 'sdnr/wt/websocketmanager/provider/src/main/java')
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java130
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java109
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java245
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java34
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java207
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java138
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java56
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java120
8 files changed, 1039 insertions, 0 deletions
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java
new file mode 100644
index 000000000..7b4916d5a
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerSocket.EventInputCallback;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterConfig;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterNodeInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.websocket.SyncWebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebSocketManager extends WebSocketServlet {
+
+ private static final long serialVersionUID = -681665669062744439L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
+ private static final String APPLICATION_NAME = WebSocketManager.class.getName();
+ private static final int PORT = 8181;
+ private final EventInputCallback rpcEventInputCallback;
+ private final AkkaConfig akkaConfig;
+ /**
+ * timeout for websocket with no messages in ms
+ */
+ //private static final long IDLE_TIMEOUT = 5 * 60 * 1000L;
+ private static final long IDLE_TIMEOUT = 0L;
+
+ private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
+
+ public WebSocketManager() {
+ this(null, null);
+ }
+
+ public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
+ super();
+ this.akkaConfig = akkaconfig;
+ if (cb != null) {
+ this.rpcEventInputCallback = cb;
+ } else {
+ this.rpcEventInputCallback = message -> {
+ LOG.debug("onMessagePushed: " + message);
+ SyncWebSocketClient client;
+ for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
+ client = new SyncWebSocketClient(clientURI);
+ LOG.debug("try to push message to " + client.getURI());
+ client.openAndSendAndCloseSync(message);
+ }
+ };
+ }
+ LOG.info("Create servlet for {}", APPLICATION_NAME);
+ }
+
+ @Override
+ public void configure(WebSocketServletFactory factory) {
+ LOG.info("Configure provider for {}", APPLICATION_NAME);
+ // set a second timeout
+ factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT);
+ factory.getPolicy().setMaxBinaryMessageSize(1);
+ factory.getPolicy().setMaxTextMessageSize(64 * 1024);
+
+ // register Socket as the WebSocket to create on Upgrade
+ factory.register(WebSocketManagerSocket.class);
+
+ AkkaConfig cfg = this.akkaConfig;
+ if (cfg == null) {
+ try {
+ cfg = AkkaConfig.load();
+ } catch (Exception e) {
+ LOG.warn("problem loading akka config: " + e.getMessage());
+ }
+ }
+ if (cfg != null && cfg.isCluster()) {
+ this.initWSClients(cfg.getClusterConfig());
+ }
+ }
+
+ /**********************************************************
+ * Private functions
+ */
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ if (req.getHeader("Upgrade") != null) {
+ /* Accept upgrade request */
+ resp.setStatus(101);
+ resp.setHeader("Upgrade", "XYZP");
+ resp.setHeader("Connection", "Upgrade");
+ resp.setHeader("OtherHeaderB", "Value");
+ }
+ }
+
+ private void initWSClients(ClusterConfig clusterConfig) {
+ for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
+ if (clusterConfig.isMe(nodeConfig)) {
+ continue;
+ }
+ String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT);
+ try {
+ LOG.debug("registering ws client for " + url);
+ clusterNodeClients.add(new URI(url));
+ } catch (URISyntaxException e) {
+ LOG.warn("problem instantiating wsclient for url: " + url);
+ }
+ }
+ }
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
new file mode 100644
index 000000000..0b6e9b453
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
+
+import java.time.Instant;
+import javax.servlet.ServletException;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.WebsocketManagerService;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
+import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.osgi.service.http.HttpService;
+import org.osgi.service.http.NamespaceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebSocketManagerProvider implements WebsocketManagerService, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class);
+ private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName();
+ private static final String ALIAS = "/websocket";
+
+ private WebSocketManager wsServlet = null;
+
+ public WebSocketManagerProvider() {
+ LOG.info("Creating provider for {}", APPLICATION_NAME);
+ }
+
+
+ public void init() {
+ LOG.info("Init provider for {}", APPLICATION_NAME);
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.info("Close provider for {}", APPLICATION_NAME);
+ }
+
+ public void onUnbindService(HttpService httpService) {
+ httpService.unregister(ALIAS);
+ wsServlet = null;
+ }
+
+ public void onBindService(HttpService httpService) throws ServletException, NamespaceException {
+ if (httpService == null) {
+ LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService");
+ } else {
+
+ if (wsServlet == null) {
+ wsServlet = new WebSocketManager();
+ httpService.registerServlet(ALIAS, wsServlet, null, null);
+ LOG.info("websocket servlet registered.");
+ } else {
+ LOG.warn("Servelt ");
+ }
+ }
+
+ }
+
+ public WebSocketManager getWsServlet() {
+ return wsServlet;
+ }
+
+ public void setWsServlet(WebSocketManager wsServlet) {
+ this.wsServlet = wsServlet;
+ }
+
+
+ @Override
+ public void sendNotification(Notification notification, String nodeId, QName eventType) {
+ this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now()));
+ }
+
+ @Override
+ public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) {
+ WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime));
+
+ }
+
+ @Override
+ public void sendNotification(DOMNotification notification, String nodeId, QName eventType) {
+ LOG.warn("not yet implemented");
+
+ }
+
+ @Override
+ public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) {
+ LOG.warn("not yet implemented");
+
+ }
+
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
new file mode 100644
index 000000000..945de3c1f
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
@@ -0,0 +1,245 @@
+/*
+* ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
+import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebSocketManagerSocket extends WebSocketAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName());
+ public static final String MSG_KEY_DATA = "data";
+ public static final DataType MSG_KEY_SCOPES = DataType.scopes;
+ public static final String MSG_KEY_PARAM = "param";
+ public static final String MSG_KEY_VALUE = "value";
+ public static final String MSG_KEY_SCOPE = "scope";
+
+ public static final String KEY_NODEID = "nodeId";
+ public static final String KEY_EVENTTYPE = "eventType";
+ private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
+ private static final Pattern PATTERN_SCOPEREGISTRATION =
+ Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
+ private static final Random RND = new Random();
+
+
+ /**
+ * list of all sessionids
+ */
+ private static final List<String> sessionIds = new ArrayList<>();
+ /**
+ * map of sessionid <=> UserScopes
+ */
+ private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
+ /**
+ * map of class.hashCode <=> class
+ */
+ private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
+
+ private static final YangToolsMapper mapper = new YangToolsMapper();
+ private final String myUniqueSessionId;
+
+ private Session session = null;
+
+ public interface EventInputCallback {
+ void onMessagePushed(final String message) throws Exception;
+ }
+
+ public WebSocketManagerSocket() {
+ this.myUniqueSessionId = _genSessionId();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ sessionIds.remove(this.myUniqueSessionId);
+ }
+
+ private static String _genSessionId() {
+ String sid = String.valueOf(RND.nextLong());
+ while (sessionIds.contains(sid)) {
+ sid = String.valueOf(RND.nextLong());
+ }
+ sessionIds.add(sid);
+ return sid;
+ }
+
+ @Override
+ public void onWebSocketText(String message) {
+ LOG.info("{} has sent {}", this.getRemoteAdr(), message);
+ if (!this.manageClientRequest(message)) {
+ this.manageClientRequest2(message);
+ }
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] payload, int offset, int len) {
+ LOG.debug("Binary not supported");
+ }
+
+ @Override
+ public void onWebSocketConnect(Session sess) {
+ this.session = sess;
+ clientList.put(String.valueOf(this.hashCode()), this);
+ LOG.debug("client connected from " + this.getRemoteAdr());
+ }
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason) {
+ clientList.remove(String.valueOf(this.hashCode()));
+ LOG.debug("client disconnected from " + this.getRemoteAdr());
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
+ // super.onWebSocketError(cause);
+ }
+
+ private String getRemoteAdr() {
+ String adr = "unknown";
+ try {
+ adr = this.session.getRemoteAddress().toString();
+ } catch (Exception e) {
+ LOG.debug("error resolving adr: {}", e.getMessage());
+ }
+ return adr;
+ }
+
+ /**
+ *
+ * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
+ * @return if handled
+ */
+ private boolean manageClientRequest(String request) {
+ boolean ret = false;
+ final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
+ if(!matcher.find()) {
+ return false;
+ }
+ try {
+ ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
+ if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
+ ret = true;
+ String sessionId = this.getSessionId();
+ UserScopes clientDto = new UserScopes();
+ clientDto.setScopes(registration.getScopes());
+ userScopesList.put(sessionId, clientDto);
+ this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
+ }
+
+ } catch (JsonProcessingException e) {
+ LOG.warn("problem set scope: " + e.getMessage());
+ try {
+ this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
+ } catch (JsonProcessingException e1) {
+ LOG.warn("problem sending error response via ws: " + e1);
+ }
+ }
+ return ret;
+ }
+
+ /*
+ * broadcast message to all your clients
+ */
+ private void manageClientRequest2(String request) {
+ try {
+ NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
+ if (notification.getNodeId() != null && notification.getType() != null) {
+ this.sendToAll(notification.getNodeId(), notification.getType(), request);
+ }
+ } catch (Exception e) {
+ LOG.warn("handle ws request failed:" + e.getMessage());
+ }
+ }
+
+ public void send(String msg) {
+ try {
+ LOG.trace("sending {}", msg);
+ this.session.getRemote().sendString(msg);
+ } catch (Exception e) {
+ LOG.warn("problem sending message: " + e.getMessage());
+ }
+ }
+
+ public String getSessionId() {
+ return this.myUniqueSessionId;
+ }
+
+ private void sendToAll(NotificationOutput output) {
+ try {
+ this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
+ } catch (JsonProcessingException e) {
+ LOG.warn("problem serializing noitifcation: ", e);
+ }
+ }
+
+ private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
+ if (clientList.size() > 0) {
+ for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
+ WebSocketManagerSocket socket = entry.getValue();
+ if (socket != null) {
+ try {
+ UserScopes clientScopes = userScopesList.get(socket.getSessionId());
+ if (clientScopes != null) {
+ if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
+ socket.send(notification);
+ } else {
+ LOG.debug("client has not scope {}", reducedSchemaInfo);
+ }
+ } else {
+ LOG.debug("no scopes for notifications registered");
+ }
+ } catch (Exception ioe) {
+ LOG.warn(ioe.getMessage());
+ }
+ } else {
+ LOG.debug("cannot broadcast. socket is null");
+ }
+ }
+ }
+ }
+
+ public static void broadCast(NotificationOutput output) {
+ if (clientList.size() > 0) {
+ Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
+ WebSocketManagerSocket s = e.iterator().next().getValue();
+ if (s != null) {
+ s.sendToAll(output);
+ }
+ }
+ }
+
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java
new file mode 100644
index 000000000..6627eeadf
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.data;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class TimeRateLimitingQueue<T> extends ArrayBlockingQueue<T>{
+
+ public TimeRateLimitingQueue(int capacity) {
+ super(capacity);
+ // TODO Auto-generated constructor stub
+ }
+
+
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java
new file mode 100644
index 000000000..794515bb2
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java
@@ -0,0 +1,207 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AkkaConfig {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
+
+ public static class ClusterNodeInfo {
+ @Override
+ public String toString() {
+ return "ClusterNodeInfo [protocol=" + protocol + ", clusterName=" + clusterName + ", remoteAdr=" + remoteAdr
+ + ", port=" + port + "]";
+ }
+
+ private final String protocol;
+ private final String clusterName;
+ private final String remoteAdr;
+ private final int port;
+
+ public String getRemoteAddress() {
+ return this.remoteAdr;
+ }
+
+ public ClusterNodeInfo(String s) throws Exception {
+ final String regex = "([a-z.]*):\\/\\/([a-zA-Z0-9-]*)@([a-zA-Z0-9.-]*):([0-9]*)";
+ final Pattern pattern = Pattern.compile(regex);
+ final Matcher matcher = pattern.matcher(s);
+ if (!matcher.find()) {
+ throw new Exception("invalid seedNode format");
+ }
+ this.protocol = matcher.group(1);
+ this.clusterName = matcher.group(2);
+ this.remoteAdr = matcher.group(3);
+ this.port = Integer.parseInt(matcher.group(4));
+ }
+ }
+ public static class ClusterRoleInfo {
+ @Override
+ public String toString() {
+ return "ClusterRoleInfo [Role=" + Role + ", Index=" + Index + "]";
+ }
+
+ private final String Role;
+ private final int Index;
+
+ public ClusterRoleInfo(String s) throws Exception {
+ final String regex = "([a-z]*)-([0-9]*)";
+ final Pattern pattern = Pattern.compile(regex);
+ final Matcher matcher = pattern.matcher(s);
+ if (!matcher.find()) {
+ throw new Exception("invalid role format");
+ }
+ this.Role = matcher.group(1);
+ this.Index = Integer.parseInt(matcher.group(2));
+ }
+
+ }
+ public static class ClusterConfig {
+ @Override
+ public String toString() {
+ return "ClusterConfig [seedNodes=" + seedNodes + ", roles=" + roles + "]";
+ }
+
+ private final List<ClusterNodeInfo> seedNodes;
+ private final List<ClusterRoleInfo> roles;
+ private final ClusterNodeInfo ismeInfo;
+
+ public ClusterConfig(Config o) throws Exception {
+ {
+ this.seedNodes = new ArrayList<>();
+ List<String> a = o.getStringList("seed-nodes");
+ for (int i = 0; i < a.size(); i++) {
+ ClusterNodeInfo info = new ClusterNodeInfo(a.get(i));
+ this.seedNodes.add(info);
+ }
+ this.roles = new ArrayList<>();
+ a = o.getStringList("roles");
+ for (int i = 0; i < a.size(); i++) {
+ ClusterRoleInfo s = new ClusterRoleInfo(a.get(i));
+ this.roles.add(s);
+ }
+ int idx = this.roles.get(0).Index - 1;
+ if (idx >= 0 && idx < this.seedNodes.size()) {
+ this.ismeInfo = this.seedNodes.get(idx);
+ } else {
+ this.ismeInfo = null;
+ }
+ }
+
+ }
+
+ public boolean isCluster() {
+ return this.seedNodes != null ? this.seedNodes.size() > 1 : false;
+ }
+
+ public boolean isMe(ClusterNodeInfo i) {
+ return this.ismeInfo != null ? this.ismeInfo.equals(i) : false;
+ }
+
+ public List<ClusterNodeInfo> getSeedNodes() {
+ return this.seedNodes;
+ }
+ }
+
+ private static final String DEFAULT_FILENAME = "configuration/initial/akka.conf";
+ private final File file;
+ private final String resourceFilename;
+ private final String fileContent;
+ private ClusterConfig cluserConfig;
+
+ public ClusterConfig getClusterConfig() {
+ return this.cluserConfig;
+ }
+
+ private AkkaConfig(File file, boolean isResource) {
+ this.file = isResource ? null : file;
+ this.fileContent = null;
+ this.resourceFilename = isResource ? file.getName() : null;
+ }
+
+ private AkkaConfig(String fileContent) {
+ this.file = null;
+ this.fileContent = fileContent;
+ this.resourceFilename = null;
+ }
+
+
+ @Override
+ public String toString() {
+ return "AkkaConfig [filename=" + file + ", cluserConfig=" + cluserConfig + "]";
+ }
+
+ private void loadFromFile() throws Exception {
+ Config cfg = null;
+ if (this.file != null) {
+ cfg = ConfigFactory.parseFile(this.file);
+ } else if (this.fileContent != null) {
+ cfg = ConfigFactory.parseString(this.fileContent);
+ } else if (this.resourceFilename != null) {
+ cfg = ConfigFactory.parseResources(this.getClass(), this.resourceFilename);
+ }
+
+ if (cfg != null) {
+ this.cluserConfig =
+ new ClusterConfig(cfg.getConfig("odl-cluster-data").getConfig("akka").getConfig("cluster"));
+ } else {
+ LOG.warn("unable to parse config file");
+ this.cluserConfig = null;
+ }
+ }
+
+ public boolean isCluster() {
+ return this.cluserConfig != null ? this.cluserConfig.isCluster() : false;
+ }
+
+ public static AkkaConfig load() throws Exception {
+ return load(DEFAULT_FILENAME);
+ }
+
+ public static AkkaConfig load(String filename) throws Exception {
+ return load(filename, false);
+ }
+
+ public static AkkaConfig load(String filename, boolean isResource) throws Exception {
+ AkkaConfig cfg = new AkkaConfig(new File(filename), isResource);
+ cfg.loadFromFile();
+
+ return cfg;
+ }
+
+ public static AkkaConfig loadContent(String content) throws Exception {
+ AkkaConfig cfg = new AkkaConfig(content);
+ cfg.loadFromFile();
+
+ return cfg;
+ }
+
+
+
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
new file mode 100644
index 000000000..5f3a5af2c
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
@@ -0,0 +1,138 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Problems of to many notifications during mount of thousand of devices:
+ * <ul>
+ * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
+ * <li>Notification processing blocks user -> App design with notifications popups
+ * </ul>
+ * Rate filter
+ * <ul>
+ * <li>Do not use a thread -> Do nothing if there are no notifications
+ * <li>Parameter1 integrationTime : Measurement or integration time for period
+ * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
+ * <li>Start measurement on event received that comes later then
+ * </ul>
+ *
+ * <pre>
+ * Example (e: Event received, rateMaxCount=3)
+ * eee e e e e e e e e e e e e e e
+ * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
+ * P1 P2 P1 P2 P3 P7 P1
+ *Overload no no yes yes no no
+ *
+ *
+ *Intention to use:
+ * 1. Construct with parameters for WS stream to handle
+ * 2.
+ * </pre>
+ */
+
+public class RateFilter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName());
+
+ private final Duration integrationTime; // Integration time to measure event rate
+ private final long rateMaxCount; //Rate for dropping packets
+ private Instant timeStampPeriodStart; //Time stamp period beginn
+ private Instant timeStampLastEvent; //Measurement interval
+ private long rateCount; // >0: integration running 0: no integration running
+ private boolean overload; //true means in overload status. Change at end of period only.
+ private GetNow get;
+
+ /**
+ * Allow testing with own timestamp provider
+ */
+ public interface GetNow {
+ Instant now();
+ }
+
+ public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) {
+ this.integrationTime = integrationTime;
+ this.rateMaxCount = rateMaxCount;
+ this.get = getNowMethod;
+ this.timeStampLastEvent = Instant.MIN;
+ }
+
+ public RateFilter(Duration integrationTime, long rateMaxCount) {
+ this(integrationTime, rateMaxCount, () -> Instant.now());
+ }
+
+ public synchronized boolean getOverloadStatus() {
+ return overload;
+ }
+
+ /**
+ * Handle filter on event received
+ */
+ public synchronized void filterEvent() {
+ final Instant now = get.now();
+ final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now);
+ this.timeStampLastEvent = now;
+
+ if (durationSinceLastEvent.compareTo(integrationTime) >= 0) {
+ //No measurement. Sync and start with period
+ LOG.debug("Sync");
+ timeStampPeriodStart = now;
+ rateCount = 1; //Reset event count .. is part of the
+ } else {
+ //Within period
+ Duration durationPeriod = Duration.between(timeStampPeriodStart, now);
+ rateCount++;
+ boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0;
+ LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime);
+ if (endOfPeriod) {
+ //Only if end of Period
+ overload = rateCount > rateMaxCount;
+ LOG.debug("Reset overload {}", overload);
+ timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime);
+ rateCount = 0;
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("RateFilter [integrationTime=");
+ builder.append(integrationTime);
+ builder.append(", rateMaxCount=");
+ builder.append(rateMaxCount);
+ builder.append(", timeStampPeriodStart=");
+ builder.append(timeStampPeriodStart);
+ builder.append(", timeStampLastEvent=");
+ builder.append(timeStampLastEvent);
+ builder.append(", rateCount=");
+ builder.append(rateCount);
+ builder.append(", overload=");
+ builder.append(overload);
+ builder.append("]");
+ return builder.toString();
+ }
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java
new file mode 100644
index 000000000..3969bcb15
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
+
+import java.util.List;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.Scope;
+
+public class UserScopes {
+
+ private List<Scope> scopes;
+
+ /**
+ *
+ * @param list array of Strings
+ */
+ public void setScopes(List<Scope> list) {
+ this.scopes = list;
+ }
+
+ public boolean hasScope(NotificationOutput output) {
+ return this.hasScope(output.getNodeId(), output.getType());
+ }
+
+ public boolean hasScope(ReducedSchemaInfo schema) {
+ return this.hasScope(null, schema);
+ }
+
+ public boolean hasScope(String nodeId, ReducedSchemaInfo reducedSchemaInfo) {
+ if (this.scopes == null)
+ return false;
+ for (Scope scope : this.scopes) {
+ if (scope.matches(nodeId, reducedSchemaInfo)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java
new file mode 100644
index 000000000..c9177205b
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.websocket;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncWebSocketClient extends WebSocketClient {
+
+ public interface WebsocketEventHandler {
+ void onMessageReceived(String message);
+
+ void onOpen(ServerHandshake arg0);
+
+ void onClose(int arg0, String arg1, boolean arg2);
+
+ void onError(Exception e);
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncWebSocketClient.class.getName());
+ private String messageToSend;
+ private final List<WebsocketEventHandler> handlers;
+
+ public SyncWebSocketClient(URI serverUri) {
+ super(serverUri);
+ this.handlers = new ArrayList<WebsocketEventHandler>();
+ }
+
+ public SyncWebSocketClient(String uri) throws URISyntaxException {
+ this(new URI(uri));
+ }
+
+ public void addEventHandler(WebsocketEventHandler h) {
+ this.handlers.add(h);
+ }
+
+ public void removeEventHandler(WebsocketEventHandler h) {
+ this.handlers.remove(h);
+ }
+
+ @Override
+ public void onClose(int arg0, String arg1, boolean arg2) {
+ LOG.debug("socket closed: {} {} {}", arg0, arg1, arg2);
+ for (WebsocketEventHandler h : this.handlers) {
+ h.onClose(arg0, arg1, arg2);
+ }
+ }
+
+ @Override
+ public void onError(Exception arg0) {
+ LOG.warn("error on socket: {}", arg0.getMessage());
+ for (WebsocketEventHandler h : this.handlers) {
+ h.onError(arg0);
+ }
+ }
+
+ @Override
+ public void onMessage(String arg0) {
+ LOG.debug("received message: {}", arg0);
+ for (WebsocketEventHandler h : this.handlers) {
+ h.onMessageReceived(arg0);
+ }
+ }
+
+ @Override
+ public void onOpen(ServerHandshake arg0) {
+ LOG.debug("socket opened");
+ if (this.messageToSend != null) {
+ LOG.debug("try to send: " + this.messageToSend);
+ this.send(this.messageToSend);
+ this.messageToSend = null;
+ }
+ for (WebsocketEventHandler h : this.handlers) {
+ h.onOpen(arg0);
+ }
+ }
+
+ public void openAndSendAsync(String message) {
+ this.messageToSend = message;
+ this.connect();
+ }
+
+ public void openAndSendAndCloseSync(String message) {
+ try {
+ this.connectBlocking();
+ } catch (InterruptedException e) {
+ LOG.warn("problem connecting:" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ this.send(message);
+ try {
+ this.closeBlocking();
+ } catch (InterruptedException e) {
+ LOG.warn("problem disconnecting:" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}