aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/websocketmanager/provider
diff options
context:
space:
mode:
authorRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2021-05-25 18:57:29 +0530
committerRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2021-06-14 10:22:54 +0530
commit17614362f2550c29dcd746ee2c1bc01d0df5de65 (patch)
tree97930a14a08c610efceb4aebb4f457e0cf42b2f8 /sdnr/wt/websocketmanager/provider
parentdb9f267b3930a28054e967c75db228e27663aedc (diff)
Improve Websocket notification interface
Improve websocket notification interface Issue-ID: CCSDK-3315 Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> Change-Id: I0ded865adddb546ade98df4760e0a32ec964295a Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/websocketmanager/provider')
-rw-r--r--sdnr/wt/websocketmanager/provider/pom.xml7
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java31
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java65
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java138
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java323
-rw-r--r--sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java19
-rw-r--r--sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java156
-rw-r--r--sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java76
-rw-r--r--sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java16
9 files changed, 639 insertions, 192 deletions
diff --git a/sdnr/wt/websocketmanager/provider/pom.xml b/sdnr/wt/websocketmanager/provider/pom.xml
index 0366ed2a4..fcdaa5e2f 100644
--- a/sdnr/wt/websocketmanager/provider/pom.xml
+++ b/sdnr/wt/websocketmanager/provider/pom.xml
@@ -57,6 +57,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.opendaylight.mdsal.model</groupId>
+ <artifactId>ietf-topology</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>sdnr-wt-yang-utils</artifactId>
<version>${project.version}</version>
@@ -136,7 +141,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>sdnr-wt-devicemanager-provider</artifactId>
+ <artifactId>sdnr-wt-devicemanager-core-provider</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
index 0b6e9b453..610001775 100644
--- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
@@ -24,6 +24,7 @@ import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationO
import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.common.QName;
import org.osgi.service.http.HttpService;
@@ -84,24 +85,44 @@ public class WebSocketManagerProvider implements WebsocketManagerService, AutoCl
@Override
- public void sendNotification(Notification notification, String nodeId, QName eventType) {
+ public void sendNotification(Notification notification, NodeId nodeId, QName eventType) {
+ if(!assertNotificationType(notification, eventType)){
+ return;
+ }
this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now()));
}
+ public static boolean assertNotificationType(Notification notification, QName eventType) {
+ final String yangTypeName = eventType.getLocalName();
+ final Class<?> cls = notification.getClass();
+ final String clsNameToTest = YangToolsMapperHelper.toCamelCaseClassName(yangTypeName);
+ if(cls.getSimpleName().equals(clsNameToTest)) {
+ return true;
+ }
+ Class<?>[] ifs = cls.getInterfaces();
+ for(Class<?> clsif:ifs) {
+ if(clsif.getSimpleName().equals(clsNameToTest)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
@Override
- public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) {
- WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime));
+ public void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) {
+ WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId.getValue(), eventType, eventTime));
}
@Override
- public void sendNotification(DOMNotification notification, String nodeId, QName eventType) {
+ public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType) {
LOG.warn("not yet implemented");
}
@Override
- public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) {
+ public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) {
LOG.warn("not yet implemented");
}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
index 945de3c1f..a642bda69 100644
--- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
@@ -25,6 +25,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.websocket.api.Session;
@@ -41,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class WebSocketManagerSocket extends WebSocketAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
public static final String MSG_KEY_DATA = "data";
public static final DataType MSG_KEY_SCOPES = DataType.scopes;
public static final String MSG_KEY_PARAM = "param";
@@ -54,7 +58,47 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
private static final Pattern PATTERN_SCOPEREGISTRATION =
Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
private static final Random RND = new Random();
+ private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
+ private static final int QUEUE_SIZE = 100;
+ private final Thread sendingSyncThread;
+ private final ArrayBlockingQueue<String> messageQueue;
+ private boolean closed;
+
+ private final Runnable sendingRunner = new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("isrunning");
+ while (!closed) {
+ try {
+
+ String message = messageQueue.poll();
+ if (message != null) {
+ WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
+ .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ LOG.info("message sent");
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("problem pushing message: ", e);
+ }
+
+ if (messageQueue.isEmpty()) {
+ trySleep(1000);
+ }
+
+ }
+ LOG.debug("isstopped");
+
+ };
+ };
+
+ private static void trySleep(int sleepMs) {
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
/**
* list of all sessionids
@@ -80,6 +124,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
public WebSocketManagerSocket() {
this.myUniqueSessionId = _genSessionId();
+ this.sendingSyncThread = new Thread(this.sendingRunner);
+ this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
}
@Override
@@ -112,6 +158,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
@Override
public void onWebSocketConnect(Session sess) {
this.session = sess;
+ closed = false;
+ this.sendingSyncThread.start();
clientList.put(String.valueOf(this.hashCode()), this);
LOG.debug("client connected from " + this.getRemoteAdr());
}
@@ -119,13 +167,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
@Override
public void onWebSocketClose(int statusCode, String reason) {
clientList.remove(String.valueOf(this.hashCode()));
+ this.sendingSyncThread.interrupt();
+ closed = true;
LOG.debug("client disconnected from " + this.getRemoteAdr());
}
@Override
public void onWebSocketError(Throwable cause) {
LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
- // super.onWebSocketError(cause);
}
private String getRemoteAdr() {
@@ -146,12 +195,12 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
private boolean manageClientRequest(String request) {
boolean ret = false;
final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
- if(!matcher.find()) {
+ if (!matcher.find()) {
return false;
}
try {
ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
- if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
+ if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
ret = true;
String sessionId = this.getSessionId();
UserScopes clientDto = new UserScopes();
@@ -188,9 +237,9 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
public void send(String msg) {
try {
LOG.trace("sending {}", msg);
- this.session.getRemote().sendString(msg);
- } catch (Exception e) {
- LOG.warn("problem sending message: " + e.getMessage());
+ this.messageQueue.put(msg);
+ } catch (InterruptedException e) {
+ LOG.warn("problem putting message into sending queue: " + e.getMessage());
}
}
@@ -200,7 +249,7 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
private void sendToAll(NotificationOutput output) {
try {
- this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
+ sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
} catch (JsonProcessingException e) {
LOG.warn("problem serializing noitifcation: ", e);
}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
deleted file mode 100644
index 5f3a5af2c..000000000
--- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : ccsdk features
- * ================================================================================
- * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
- * All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- */
-package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
-
-import java.time.Duration;
-import java.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Problems of to many notifications during mount of thousand of devices:
- * <ul>
- * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
- * <li>Notification processing blocks user -> App design with notifications popups
- * </ul>
- * Rate filter
- * <ul>
- * <li>Do not use a thread -> Do nothing if there are no notifications
- * <li>Parameter1 integrationTime : Measurement or integration time for period
- * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
- * <li>Start measurement on event received that comes later then
- * </ul>
- *
- * <pre>
- * Example (e: Event received, rateMaxCount=3)
- * eee e e e e e e e e e e e e e e
- * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
- * P1 P2 P1 P2 P3 P7 P1
- *Overload no no yes yes no no
- *
- *
- *Intention to use:
- * 1. Construct with parameters for WS stream to handle
- * 2.
- * </pre>
- */
-
-public class RateFilter {
-
- private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName());
-
- private final Duration integrationTime; // Integration time to measure event rate
- private final long rateMaxCount; //Rate for dropping packets
- private Instant timeStampPeriodStart; //Time stamp period beginn
- private Instant timeStampLastEvent; //Measurement interval
- private long rateCount; // >0: integration running 0: no integration running
- private boolean overload; //true means in overload status. Change at end of period only.
- private GetNow get;
-
- /**
- * Allow testing with own timestamp provider
- */
- public interface GetNow {
- Instant now();
- }
-
- public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) {
- this.integrationTime = integrationTime;
- this.rateMaxCount = rateMaxCount;
- this.get = getNowMethod;
- this.timeStampLastEvent = Instant.MIN;
- }
-
- public RateFilter(Duration integrationTime, long rateMaxCount) {
- this(integrationTime, rateMaxCount, () -> Instant.now());
- }
-
- public synchronized boolean getOverloadStatus() {
- return overload;
- }
-
- /**
- * Handle filter on event received
- */
- public synchronized void filterEvent() {
- final Instant now = get.now();
- final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now);
- this.timeStampLastEvent = now;
-
- if (durationSinceLastEvent.compareTo(integrationTime) >= 0) {
- //No measurement. Sync and start with period
- LOG.debug("Sync");
- timeStampPeriodStart = now;
- rateCount = 1; //Reset event count .. is part of the
- } else {
- //Within period
- Duration durationPeriod = Duration.between(timeStampPeriodStart, now);
- rateCount++;
- boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0;
- LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime);
- if (endOfPeriod) {
- //Only if end of Period
- overload = rateCount > rateMaxCount;
- LOG.debug("Reset overload {}", overload);
- timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime);
- rateCount = 0;
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("RateFilter [integrationTime=");
- builder.append(integrationTime);
- builder.append(", rateMaxCount=");
- builder.append(rateMaxCount);
- builder.append(", timeStampPeriodStart=");
- builder.append(timeStampPeriodStart);
- builder.append(", timeStampLastEvent=");
- builder.append(timeStampLastEvent);
- builder.append(", rateCount=");
- builder.append(rateCount);
- builder.append(", overload=");
- builder.append(overload);
- builder.append("]");
- return builder.toString();
- }
-}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java
new file mode 100644
index 000000000..7ffa29e89
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java
@@ -0,0 +1,323 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Problems of to many notifications during mount of thousand of devices:
+ * <ul>
+ * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
+ * <li>Notification processing blocks user -> App design with notifications popups
+ * </ul>
+ * Rate filter requirements
+ * <ul>
+ * <li>Use a single thread
+ * <li>Parameter1 integrationTime : Measurement or integration time for period
+ * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
+ * <li>Start measurement on event received that comes later then
+ * </ul>
+ *
+ * <pre>
+ * Example for behavior (e: Event received, rateMaxCount=3)
+ * eee e e e e e e e e e e e e e e
+ * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
+ * P1 P2 P1 P2 P3 P7 P1
+ *Overload no no yes yes no no
+ * </pre>
+ *
+ * Interface to use:
+ * <ul>
+ * <li>construct RateFilterManager. Parameters are integration time and function to get the actual time
+ * <li>RateFilterManager.getRateFilter() provides rateFilter object for a stream to count events and provide overload
+ * status.
+ * <li>rateFilter.event() count the events during measurement period
+ * <li>rateFilter.getOverloadStatus() indicates status
+ * <li>rateFilter.close() to release this object
+ * </ul>
+ */
+
+public class RateFilterManager implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RateFilterManager.class.getName());
+ private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("dd.mm.yy hh:mm:ss_SSS")
+ .withLocale(Locale.GERMAN).withZone(ZoneId.systemDefault());
+ private static final long CLIENTS_NUMBER_WARNLEVEL = 1000;
+
+ //Configuration
+ private final Duration integrationTime; // Integration time to measure event rate
+ private GetNow get; //Provides actual system time
+ private final Map<Long, RateFilter> rateFilterList;
+ @SuppressWarnings("unused")
+ private final Timer timerTask;
+
+ /**
+ * Allow testing with own timestamp provider Provide actual system time.
+ */
+ public interface GetNow {
+ Instant now();
+ }
+
+ /**
+ * Constructor with all parameters, intended to be used for unit test
+ *
+ * @param integrationTime is the interval length for counting events.
+ * @param rateMaxCountDefault if event count exceed this limit, status changes to overload.
+ * @param startTimer true start time with intervall time
+ * @param get function to provide actual system time.
+ */
+ public RateFilterManager(Duration integrationTime, boolean startTimer, GetNow get) {
+ this.integrationTime = integrationTime;
+ this.get = get;
+
+ this.rateFilterList = Collections.synchronizedMap(new HashMap<Long, RateFilter>());
+ this.timerTask = startTimer ? startTimerTask(integrationTime) : null;
+ }
+
+ /**
+ * Get RateFilter manager
+ *
+ * @param integrationTime is the time to measure events
+ * @param rateMaxCountDefault if exceeded state overload is true
+ */
+ public RateFilterManager(Duration integrationTime) {
+ this(integrationTime, true, () -> Instant.now());
+ }
+
+ /**
+ */
+ /**
+ * Get a specific rate filter for one stream. Use close() to release.
+ *
+ * @param ratePerMinute Rate per Minute for this filter. If 0 never overloaded.
+ * @return RateFilter object for each event stream.
+ * @throws IllegalArgumentException on negative rate
+ */
+ public synchronized RateFilter getRateFilter(long maxRatePerMinute) throws IllegalArgumentException {
+ long maxEventsPerIntegration = convertRPMToMaxCount(maxRatePerMinute);
+ if (maxEventsPerIntegration < 0)
+ throw new IllegalArgumentException(
+ "Resulting in illegal maxEventsPerIntegration=" + maxEventsPerIntegration);
+ return getRateFilterInstance(maxEventsPerIntegration);
+ }
+
+ @Override
+ public void close() {
+ if (timerTask != null) {
+ timerTask.cancel();
+ timerTask.purge();
+ }
+ rateFilterList.clear();
+ }
+
+ /**
+ * Function to get a new Ratefilter for a connection
+ *
+ * @param maxEventsPerIntegration
+ * @return reference to object with filter status
+ */
+ private RateFilter getRateFilterInstance(long maxEventsPerIntegration) {
+ RateFilter rateFilter;
+ synchronized (rateFilterList) {
+ rateFilter = rateFilterList.get(maxEventsPerIntegration);
+ if (rateFilter == null) {
+ rateFilter = new RateFilter(maxEventsPerIntegration);
+ synchronized (rateFilterList) {
+ rateFilterList.put(maxEventsPerIntegration, rateFilter);
+ }
+ } else {
+ if (rateFilter.addClient() > CLIENTS_NUMBER_WARNLEVEL)
+ LOG.warn("Warnlevel {} exceeded for client connections", CLIENTS_NUMBER_WARNLEVEL);
+ }
+ }
+ return rateFilter;
+ }
+
+ private Timer startTimerTask(Duration integrationTime) {
+ long milliseconds = integrationTime.toMillis();
+ LOG.debug("Start startTimerTask with {} ms", milliseconds);
+ Timer time = new Timer();
+ time.scheduleAtFixedRate(new TimeoutHandler(), 0L, milliseconds);
+ return time;
+ }
+
+ private class TimeoutHandler extends TimerTask {
+ @Override
+ public void run() {
+ LOG.debug("Run timeout task at {}", f(get.now()));
+ synchronized (rateFilterList) {
+ rateFilterList.forEach((k, f) -> f.timer());
+ }
+ }
+ }
+
+ /**
+ * Provide nice debug output for Instant and Duration
+ *
+ * @param i with instant
+ * @return output string
+ */
+ private static String f(Instant i) {
+ return i != null ? FORMATTER.format(i) : "null";
+ }
+
+ /**
+ * Convert a rate per minute into events per integration time.
+ *
+ * @param ratePerMinute
+ * @return events per integration time.
+ */
+ private long convertRPMToMaxCount(long ratePerMinute) {
+ return ratePerMinute * integrationTime.toSeconds() / TimeUnit.MINUTES.toSeconds(1);
+ }
+
+ /**
+ * Ratefilter class contains status informaton for one event stream.
+ */
+ public class RateFilter implements Closeable {
+ private final long maxEventsPerIntegration; //uuid and maximum of events without overload
+ private Long clients; // Number of clients for this filter.
+ private long rateCount; // number of events during integration period
+ private boolean overload; //true means in overload status. Change at end of period only.
+
+ /**
+ * Create a new Filter
+ *
+ * @param maxEventsPerIntegration >= 1 characteristics and uuid of this filter. < 1 switched off
+ * @see {@link #close}
+ */
+ private RateFilter(long maxEventsPerIntegration) {
+ synchronized (this) {
+ this.clients = 1L;
+ this.maxEventsPerIntegration = maxEventsPerIntegration;
+ this.rateCount = 0;
+ }
+ }
+
+ /**
+ * Add a client to this filter
+ *
+ * @return number of clients, handled by this filter
+ * @see {@link #close}
+ */
+ private synchronized long addClient() {
+ if (clients >= 1) {
+ ++clients;
+ } else {
+ LOG.warn("Misalligned open/close for {} with number {}", maxEventsPerIntegration, clients);
+ }
+ return clients;
+ }
+
+ /**
+ * Provide actual overload status
+ *
+ * @return status true means overloaded false not overloaded
+ */
+ public synchronized boolean getOverloadStatus() {
+ return overload;
+ }
+
+ /**
+ * Handle filter on event received
+ */
+ public synchronized void event() {
+ rateCount++;
+ LOG.debug("event rc:{}", rateCount);
+ }
+
+ /**
+ * Called if measurement period ends. Device if overload and reset counter.
+ */
+ public synchronized void timer() {
+ //Change overload only at end of period
+ //Always inactive if maxEventsPerIntegration== 0
+ if (maxEventsPerIntegration > 0) {
+ overload = rateCount > maxEventsPerIntegration;
+ }
+ rateCount = 0;
+ LOG.debug("Timer ol:{} rc:{}", overload, rateCount);
+ }
+
+ /**
+ * Get maximum events allowed per integration period
+ *
+ * @return 1 ...
+ */
+ public synchronized long getMaxEventsPerIntegration() {
+ return maxEventsPerIntegration;
+ }
+
+ /**
+ * Get number of client streams.
+ *
+ * @return 1 ...
+ */
+ public synchronized long getClients() {
+ return clients;
+ }
+
+ @Override
+ public void close() {
+ synchronized (rateFilterList) {
+ if (clients == 1) {
+ LOG.debug("Close and remove last client {}", maxEventsPerIntegration);
+ rateFilterList.remove(this.maxEventsPerIntegration);
+ clients--;
+ } else if (clients > 1) {
+ LOG.debug("Close one client of {} for events {}", clients, maxEventsPerIntegration);
+ clients--;
+ } else {
+ LOG.warn("Misaligned new/close for events {}", maxEventsPerIntegration);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("RateFilter [maxEventsPerIntegration=");
+ builder.append(maxEventsPerIntegration);
+ builder.append(", clients=");
+ builder.append(clients);
+ builder.append(", rateCount=");
+ builder.append(rateCount);
+ builder.append(", overload=");
+ builder.append(overload);
+ builder.append("]");
+ return builder.toString();
+ }
+ }
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java
index f3cf09545..df04c388f 100644
--- a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java
+++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java
@@ -20,13 +20,11 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.nio.file.Paths;
+import java.nio.file.Files;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig;
@@ -58,19 +56,6 @@ public class AkkaConfigTest {
public static String loadResourceContentAsString(String resourceName)
throws URISyntaxException, FileNotFoundException, IOException {
- StringBuilder sb = new StringBuilder();
-
- ClassLoader classLoader = AkkaConfigTest.class.getClassLoader();
- File file = Paths.get(classLoader.getResource(resourceName).toURI()).toFile();
- try (BufferedReader br = new BufferedReader(new FileReader(file))) {
- String line = br.readLine();
-
- while (line != null) {
- sb.append(line);
- sb.append(System.lineSeparator());
- line = br.readLine();
- }
- }
- return sb.toString();
+ return Files.readString(new File("src/test/resources/"+resourceName).toPath());
}
}
diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java
index f4fab6810..d5a940f73 100644
--- a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java
+++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java
@@ -21,10 +21,15 @@
*/
package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import java.time.Duration;
import java.time.Instant;
+import java.util.Timer;
+import java.util.TimerTask;
import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilter;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilterManager;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilterManager.RateFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,11 +38,13 @@ import org.slf4j.LoggerFactory;
*
* <pre>
* Testcase (e: 17 Event received, rateMaxCount=3)
- * eee e e e e e e e e e e e e e e
- * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
- * P1:1 P2:1 P1:2 P2:2 P3:2 P4:2 P1:3
- * 1000-1002 2000 3500 10 millis
- *Overload no no yes yes no no
+ * 1 3 4 5 6 7 8 9 10 11 14 15 16 17 18
+ * t t t t t t t t t
+ * eee e e e e e e e e e e e e e e e
+ * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
+ * P1:1 P2:1 P1:2 P2:2 P3:2 P4:2 P1:3
+ * ms 500 1000-1002 2000 3500 4500 5500 6500 7500 8500
+ *Overload no no yes yes no no
* </pre>
*
*/
@@ -45,37 +52,140 @@ public class RateFilterTest {
private static final Logger LOG = LoggerFactory.getLogger(RateFilterTest.class.getName());
- private static int MILLIS = 1000;
- private static long[] now = { 1000, 1001, 1002, //P1:1 0-2
- 3500, 3550, 3560, 3570, 3580, 3590, 3800, //P1:2 3500 3-9
- 4510, 4520, 4530, 4540, 4900, //P2:2 4500 10-14
- 5700, //P3:2 5500 15
- 7000, 8000};//P1:3 16-17
+ private static int INTEGRATIONTIMEMILLIS = 1000;
+ private static long EVENTS_PER_INTERVALL = 4;
+ private static long RATE_PER_MINUTE = EVENTS_PER_INTERVALL * 60;
+ /* Negative event time indicates timer event */
+ private static long[] now = {-500, 1000, 1010, 1020, //P1:1 1-3
+ -1500, -2500, -3500, 3500, 3550, 3560, 3570, 3580, 3590, 3800, //P1:2 3500 4-10
+ -4500, 4510, 4520, 4530, 4540, 4900, //P2:2 4500 11-15
+ -5500, 5700, //P3:2 5500 16
+ -6500, -7500, 7500, 8000};//P1:3 17-18
+ private static boolean[] overload = {false, false, false, false, //P1:1 1-3
+ false, false, false, false, false, false, false, false, false, false, //P1:2 3500 4-10
+ true, true, true, true, true, true, //P2:2 4500 11-15
+ true, true, //P3:2 5500 16
+ false, false, false, false};//P1:3 17-18
+
private static int idx;
+ private static long millis;
@Test
- public void test() {
- RateFilter rateFilter = new RateFilter(Duration.ofMillis(MILLIS), 4, () -> getNow());
+ public void testStates() {
+ reset();
+ RateFilterManager rateFilterManager =
+ new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS), false, () -> getNow());
+ RateFilter rateFilter = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
LOG.info("Init done");
+ assertEquals("Events per integration period", EVENTS_PER_INTERVALL, rateFilter.getMaxEventsPerIntegration());
- for (int t=0; t < 20; t++) {
- LOG.info("{}", t);
- rateFilter.filterEvent();
- LOG.info("{}", rateFilter.getOverloadStatus());
+ for (int t = 1; t < 30; t++) {
+ boolean expected = tick();
+ if (millis < 0) {
+ LOG.info("{} - timer {}", t, millis);
+ rateFilter.timer();
+ } else {
+ LOG.info("{} - event {}", t, millis);
+ rateFilter.event();
+ }
+ LOG.info("Overload={} {}", rateFilter.getOverloadStatus(), expected);
+ assertEquals("Filter activity", expected, rateFilter.getOverloadStatus());
}
+ rateFilter.close();
+ }
+
+ @Test
+ public void testThread() throws InterruptedException {
+ LOG.info("testThread");
+ reset();
+ RateFilterManager rateFilterManager = new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS));
+ RateFilter rateFilter = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+
+ tick();
+ Thread.sleep(2000);
+
+ Object objectYouNeedToLockOn = new Object();
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ long localMillis;
+
+ @Override
+ public void run() {
+ long xLocalMillis = localMillis += 10;
+ long xMillis = Math.abs(millis);
+ if (xLocalMillis >= xMillis) {
+ LOG.info("aTime:{} Millis:{} Idx={}", xLocalMillis, xMillis, idx);
+ boolean expected = tick();
+ if (millis > 0) {
+ //Skip negatives .. handled by timer
+ rateFilter.event();
+ boolean actual = rateFilter.getOverloadStatus();
+ LOG.info("bTime:{} Millis:{} Idx={} Overload={} Expected={} {}", xLocalMillis, xMillis, idx,
+ actual, expected, actual == expected ? "" : "XXXX");
+ if (idx >= 30) {
+ LOG.info("Test is ending");
+ synchronized (objectYouNeedToLockOn) {
+ objectYouNeedToLockOn.notify();
+ }
+ timer.cancel();
+ }
+ assertEquals("Filter activity", expected, rateFilter.getOverloadStatus());
+ }
+ }
+ }
+ }, 0, 10);
+ synchronized (objectYouNeedToLockOn) {
+ objectYouNeedToLockOn.wait();
+ }
+ //rateFilter.close();
+ LOG.info("Test end");
+ }
+
+ @Test
+ public void testMultipleClients() {
+ RateFilterManager rateFilterManager = new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS));
+ RateFilter rateFilter1 = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+ assertEquals("Multiple clients", 1, rateFilter1.getClients());
+ RateFilter rateFilter2 = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+ assertEquals("Multiple clients", 2, rateFilter1.getClients());
+ RateFilter rateFilter3 = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+ assertEquals("Multiple clients", 3, rateFilter1.getClients());
+
+ assertEquals("Similar instances", rateFilter1, rateFilter3);
+
+ RateFilter rateFilterOther = rateFilterManager.getRateFilter(2*RATE_PER_MINUTE);
+ assertNotEquals("Different instances", rateFilter1, rateFilterOther);
+ rateFilterOther.close();
+
+ rateFilter3.close();
+ assertEquals("Multiple clients", 2, rateFilter1.getClients());
+ rateFilter2.close();
+ assertEquals("Multiple clients", 1, rateFilter1.getClients());
+ rateFilter1.close();
+ assertEquals("Multiple clients", 0, rateFilter1.getClients());
+
+ rateFilterManager.close();
+ }
+
+ private Instant getNow() {
+ LOG.debug("Now:{}", millis);
+ return Instant.ofEpochMilli(Math.abs(millis));
+ }
+ private void reset() {
+ idx = 0;
}
- Instant getNow() {
- long res;
+ private boolean tick() {
if (idx < now.length) {
- res = now[idx];
+ millis = now[idx];
} else {
int lastIdx = now.length - 1;
- res = now[lastIdx] + (idx - lastIdx) * MILLIS;
+ millis = now[lastIdx] + (idx - lastIdx) * INTEGRATIONTIMEMILLIS;
}
+ boolean expected = idx < overload.length ? overload[idx] : false;
idx++;
- return Instant.ofEpochMilli(res);
+ return expected;
}
}
diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java
new file mode 100644
index 000000000..885ded348
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.SchemaInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.Scope;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectCreationNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public class UserScopeTest {
+
+
+ @Test
+ public void testAllNodes() {
+ UserScopes scopes1 = new UserScopes();
+ scopes1.setScopes(Arrays.asList(buildScope(null, ProblemNotification.QNAME)));
+
+ assertTrue(scopes1.hasScope(new ReducedSchemaInfo(ProblemNotification.QNAME)));
+ assertFalse(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ObjectCreationNotification.QNAME)));
+
+ assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ProblemNotification.QNAME)));
+
+ }
+
+ @Test
+ public void testRevisionStar() {
+ UserScopes scopes1 = new UserScopes();
+ scopes1.setScopes(
+ Arrays.asList(buildScope(null, ProblemNotification.QNAME.getNamespace().toString(), "*", null)));
+
+ assertTrue(scopes1.hasScope(new ReducedSchemaInfo(ProblemNotification.QNAME)));
+ assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ObjectCreationNotification.QNAME)));
+
+ assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ProblemNotification.QNAME)));
+
+ }
+
+ private static final Scope buildScope(String nodeId, String namespace, String revision,
+ List<String> notifications) {
+ Scope scope = new Scope();
+ scope.setNodeId(nodeId);
+ scope.setSchema(new SchemaInfo(namespace, revision, notifications));
+ return scope;
+ }
+
+ private static final Scope buildScope(String nodeId, QName qname) {
+ Scope scope = new Scope();
+ scope.setNodeId(nodeId);
+ scope.setSchema(new SchemaInfo(qname));
+ return scope;
+ }
+
+}
diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java
index bc3cd10f8..2e6462462 100644
--- a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java
+++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java
@@ -17,11 +17,17 @@
*/
package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
import org.mockito.Mockito;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerProvider;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectCreationNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotificationBuilder;
+import org.opendaylight.yangtools.yang.binding.Notification;
import org.osgi.service.http.HttpService;
public class WebsockerProviderTest extends Mockito {
@@ -42,4 +48,14 @@ public class WebsockerProviderTest extends Mockito {
}
+ @Test
+ public void testTypeAssertion() {
+
+ Notification problemNotification = new ProblemNotificationBuilder().build();
+ assertTrue(WebSocketManagerProvider.assertNotificationType(problemNotification, ProblemNotification.QNAME));
+ assertFalse(
+ WebSocketManagerProvider.assertNotificationType(problemNotification, ObjectCreationNotification.QNAME));
+
+ }
+
}