diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2021-05-25 18:57:29 +0530 |
---|---|---|
committer | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2021-06-14 10:22:54 +0530 |
commit | 17614362f2550c29dcd746ee2c1bc01d0df5de65 (patch) | |
tree | 97930a14a08c610efceb4aebb4f457e0cf42b2f8 /sdnr/wt/websocketmanager | |
parent | db9f267b3930a28054e967c75db228e27663aedc (diff) |
Improve Websocket notification interface
Improve websocket notification interface
Issue-ID: CCSDK-3315
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: I0ded865adddb546ade98df4760e0a32ec964295a
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/websocketmanager')
13 files changed, 668 insertions, 204 deletions
diff --git a/sdnr/wt/websocketmanager/installer/pom.xml b/sdnr/wt/websocketmanager/installer/pom.xml index a13f064e3..661a9b3fb 100755 --- a/sdnr/wt/websocketmanager/installer/pom.xml +++ b/sdnr/wt/websocketmanager/installer/pom.xml @@ -47,7 +47,7 @@ <dependencies> <dependency> - <groupId>org.onap.ccsdk.features.sdnr.wt</groupId> + <groupId>${project.groupId}</groupId> <artifactId>${application.name}-feature</artifactId> <version>${project.version}</version> <type>xml</type> @@ -60,7 +60,7 @@ </exclusions> </dependency> <dependency> - <groupId>org.onap.ccsdk.features.sdnr.wt</groupId> + <groupId>${project.groupId}</groupId> <artifactId>${application.name}-provider</artifactId> <version>${project.version}</version> </dependency> diff --git a/sdnr/wt/websocketmanager/model/pom.xml b/sdnr/wt/websocketmanager/model/pom.xml index 7026b3329..ec384d7c6 100644 --- a/sdnr/wt/websocketmanager/model/pom.xml +++ b/sdnr/wt/websocketmanager/model/pom.xml @@ -56,6 +56,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.opendaylight.mdsal.model</groupId> + <artifactId>ietf-topology</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> diff --git a/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/WebsocketManagerService.java b/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/WebsocketManagerService.java index bfceb373e..305d7453c 100644 --- a/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/WebsocketManagerService.java +++ b/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/WebsocketManagerService.java @@ -2,6 +2,7 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager.model; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.QName; @@ -57,7 +58,7 @@ public interface WebsocketManagerService { * @param nodeId * @param eventType */ - void sendNotification(Notification notification, String nodeId, QName eventType); + void sendNotification(Notification notification, NodeId nodeId, QName eventType); /** * Send notification via Websocket to the connected clients. * @param notification @@ -65,7 +66,7 @@ public interface WebsocketManagerService { * @param eventType * @param eventTime */ - void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime); + void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime); /** * Send notification via Websocket to the connected clients. @@ -73,7 +74,7 @@ public interface WebsocketManagerService { * @param nodeId * @param eventType */ - void sendNotification(DOMNotification notification, String nodeId, QName eventType); + void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType); /** * Send notification via Websocket to the connected clients. * @param notification @@ -81,7 +82,7 @@ public interface WebsocketManagerService { * @param eventType * @param eventTime */ - void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime); + void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime); diff --git a/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/data/SchemaInfo.java b/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/data/SchemaInfo.java index c587a7997..4d3975379 100644 --- a/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/data/SchemaInfo.java +++ b/sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/data/SchemaInfo.java @@ -36,12 +36,17 @@ public class SchemaInfo { public SchemaInfo(QName qname) { - this.namespace = qname.getNamespace().toString(); - this.revision = qname.getRevision().isPresent() ? qname.getRevision().get().toString() : null; - this.notification = new ArrayList<>(); + this(qname.getNamespace().toString(), + qname.getRevision().isPresent() ? qname.getRevision().get().toString() : null, new ArrayList<>()); this.notification.add(qname.getLocalName()); } + public SchemaInfo(String namespace, String revision, List<String> notifications) { + this.namespace = namespace; + this.revision = revision; + this.notification = notifications; + } + public String getNamespace() { return namespace; } @@ -66,6 +71,11 @@ public class SchemaInfo { this.notification = notification; } + /** + * SchemaInfo Validation restrictions: namespace!=null notification=null or if notification list set, then size>0 + * + * @return + */ @JsonIgnore public boolean isValid() { return this.namespace != null @@ -74,6 +84,7 @@ public class SchemaInfo { /** * Check if schema(qname based info of notification) matches into this scope + * * @param schema * @return */ @@ -87,8 +98,8 @@ public class SchemaInfo { if (!this.namespace.equals(schema.getNamespace().toString())) { return false; } - //if revision of scope is set and it does not match => false - if (this.revision != null && !this.revision.equals(schema.getRevision())){ + //if revision of scope is set and it does not match and is not '*' => false + if (this.revision != null && (!this.revision.equals(schema.getRevision()) && !this.revision.equals("*"))) { return false; } //if notification of scope is set and is current notification is not in the list @@ -117,7 +128,7 @@ public class SchemaInfo { @JsonIgnore public void addNotification(String notification) { - if(this.notification ==null) { + if (this.notification == null) { this.notification = new ArrayList<>(); } this.notification.add(notification); diff --git a/sdnr/wt/websocketmanager/provider/pom.xml b/sdnr/wt/websocketmanager/provider/pom.xml index 0366ed2a4..fcdaa5e2f 100644 --- a/sdnr/wt/websocketmanager/provider/pom.xml +++ b/sdnr/wt/websocketmanager/provider/pom.xml @@ -57,6 +57,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.opendaylight.mdsal.model</groupId> + <artifactId>ietf-topology</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>sdnr-wt-yang-utils</artifactId> <version>${project.version}</version> @@ -136,7 +141,7 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> - <artifactId>sdnr-wt-devicemanager-provider</artifactId> + <artifactId>sdnr-wt-devicemanager-core-provider</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java index 0b6e9b453..610001775 100644 --- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java @@ -24,6 +24,7 @@ import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationO import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.QName; import org.osgi.service.http.HttpService; @@ -84,24 +85,44 @@ public class WebSocketManagerProvider implements WebsocketManagerService, AutoCl @Override - public void sendNotification(Notification notification, String nodeId, QName eventType) { + public void sendNotification(Notification notification, NodeId nodeId, QName eventType) { + if(!assertNotificationType(notification, eventType)){ + return; + } this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now())); } + public static boolean assertNotificationType(Notification notification, QName eventType) { + final String yangTypeName = eventType.getLocalName(); + final Class<?> cls = notification.getClass(); + final String clsNameToTest = YangToolsMapperHelper.toCamelCaseClassName(yangTypeName); + if(cls.getSimpleName().equals(clsNameToTest)) { + return true; + } + Class<?>[] ifs = cls.getInterfaces(); + for(Class<?> clsif:ifs) { + if(clsif.getSimpleName().equals(clsNameToTest)) { + return true; + } + } + return false; + } + + @Override - public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) { - WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime)); + public void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) { + WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId.getValue(), eventType, eventTime)); } @Override - public void sendNotification(DOMNotification notification, String nodeId, QName eventType) { + public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType) { LOG.warn("not yet implemented"); } @Override - public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) { + public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) { LOG.warn("not yet implemented"); } diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java index 945de3c1f..a642bda69 100644 --- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java @@ -25,6 +25,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.eclipse.jetty.websocket.api.Session; @@ -41,7 +45,7 @@ import org.slf4j.LoggerFactory; public class WebSocketManagerSocket extends WebSocketAdapter { - private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class); public static final String MSG_KEY_DATA = "data"; public static final DataType MSG_KEY_SCOPES = DataType.scopes; public static final String MSG_KEY_PARAM = "param"; @@ -54,7 +58,47 @@ public class WebSocketManagerSocket extends WebSocketAdapter { private static final Pattern PATTERN_SCOPEREGISTRATION = Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE); private static final Random RND = new Random(); + private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500; + private static final int QUEUE_SIZE = 100; + private final Thread sendingSyncThread; + private final ArrayBlockingQueue<String> messageQueue; + private boolean closed; + + private final Runnable sendingRunner = new Runnable() { + @Override + public void run() { + LOG.debug("isrunning"); + while (!closed) { + try { + + String message = messageQueue.poll(); + if (message != null) { + WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message) + .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + LOG.info("message sent"); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("problem pushing message: ", e); + } + + if (messageQueue.isEmpty()) { + trySleep(1000); + } + + } + LOG.debug("isstopped"); + + }; + }; + + private static void trySleep(int sleepMs) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } /** * list of all sessionids @@ -80,6 +124,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter { public WebSocketManagerSocket() { this.myUniqueSessionId = _genSessionId(); + this.sendingSyncThread = new Thread(this.sendingRunner); + this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); } @Override @@ -112,6 +158,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketConnect(Session sess) { this.session = sess; + closed = false; + this.sendingSyncThread.start(); clientList.put(String.valueOf(this.hashCode()), this); LOG.debug("client connected from " + this.getRemoteAdr()); } @@ -119,13 +167,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketClose(int statusCode, String reason) { clientList.remove(String.valueOf(this.hashCode())); + this.sendingSyncThread.interrupt(); + closed = true; LOG.debug("client disconnected from " + this.getRemoteAdr()); } @Override public void onWebSocketError(Throwable cause) { LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage()); - // super.onWebSocketError(cause); } private String getRemoteAdr() { @@ -146,12 +195,12 @@ public class WebSocketManagerSocket extends WebSocketAdapter { private boolean manageClientRequest(String request) { boolean ret = false; final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request); - if(!matcher.find()) { + if (!matcher.find()) { return false; } try { ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class); - if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) { + if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) { ret = true; String sessionId = this.getSessionId(); UserScopes clientDto = new UserScopes(); @@ -188,9 +237,9 @@ public class WebSocketManagerSocket extends WebSocketAdapter { public void send(String msg) { try { LOG.trace("sending {}", msg); - this.session.getRemote().sendString(msg); - } catch (Exception e) { - LOG.warn("problem sending message: " + e.getMessage()); + this.messageQueue.put(msg); + } catch (InterruptedException e) { + LOG.warn("problem putting message into sending queue: " + e.getMessage()); } } @@ -200,7 +249,7 @@ public class WebSocketManagerSocket extends WebSocketAdapter { private void sendToAll(NotificationOutput output) { try { - this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output)); + sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output)); } catch (JsonProcessingException e) { LOG.warn("problem serializing noitifcation: ", e); } diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java deleted file mode 100644 index 5f3a5af2c..000000000 --- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : ccsdk features - * ================================================================================ - * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. - * All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - */ -package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; - -import java.time.Duration; -import java.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Problems of to many notifications during mount of thousand of devices: - * <ul> - * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate - * <li>Notification processing blocks user -> App design with notifications popups - * </ul> - * Rate filter - * <ul> - * <li>Do not use a thread -> Do nothing if there are no notifications - * <li>Parameter1 integrationTime : Measurement or integration time for period - * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload - * <li>Start measurement on event received that comes later then - * </ul> - * - * <pre> - * Example (e: Event received, rateMaxCount=3) - * eee e e e e e e e e e e e e e e - * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| - * P1 P2 P1 P2 P3 P7 P1 - *Overload no no yes yes no no - * - * - *Intention to use: - * 1. Construct with parameters for WS stream to handle - * 2. - * </pre> - */ - -public class RateFilter { - - private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName()); - - private final Duration integrationTime; // Integration time to measure event rate - private final long rateMaxCount; //Rate for dropping packets - private Instant timeStampPeriodStart; //Time stamp period beginn - private Instant timeStampLastEvent; //Measurement interval - private long rateCount; // >0: integration running 0: no integration running - private boolean overload; //true means in overload status. Change at end of period only. - private GetNow get; - - /** - * Allow testing with own timestamp provider - */ - public interface GetNow { - Instant now(); - } - - public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) { - this.integrationTime = integrationTime; - this.rateMaxCount = rateMaxCount; - this.get = getNowMethod; - this.timeStampLastEvent = Instant.MIN; - } - - public RateFilter(Duration integrationTime, long rateMaxCount) { - this(integrationTime, rateMaxCount, () -> Instant.now()); - } - - public synchronized boolean getOverloadStatus() { - return overload; - } - - /** - * Handle filter on event received - */ - public synchronized void filterEvent() { - final Instant now = get.now(); - final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now); - this.timeStampLastEvent = now; - - if (durationSinceLastEvent.compareTo(integrationTime) >= 0) { - //No measurement. Sync and start with period - LOG.debug("Sync"); - timeStampPeriodStart = now; - rateCount = 1; //Reset event count .. is part of the - } else { - //Within period - Duration durationPeriod = Duration.between(timeStampPeriodStart, now); - rateCount++; - boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0; - LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime); - if (endOfPeriod) { - //Only if end of Period - overload = rateCount > rateMaxCount; - LOG.debug("Reset overload {}", overload); - timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime); - rateCount = 0; - } - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("RateFilter [integrationTime="); - builder.append(integrationTime); - builder.append(", rateMaxCount="); - builder.append(rateMaxCount); - builder.append(", timeStampPeriodStart="); - builder.append(timeStampPeriodStart); - builder.append(", timeStampLastEvent="); - builder.append(timeStampLastEvent); - builder.append(", rateCount="); - builder.append(rateCount); - builder.append(", overload="); - builder.append(overload); - builder.append("]"); - return builder.toString(); - } -} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java new file mode 100644 index 000000000..7ffa29e89 --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java @@ -0,0 +1,323 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; + +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Problems of to many notifications during mount of thousand of devices: + * <ul> + * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate + * <li>Notification processing blocks user -> App design with notifications popups + * </ul> + * Rate filter requirements + * <ul> + * <li>Use a single thread + * <li>Parameter1 integrationTime : Measurement or integration time for period + * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload + * <li>Start measurement on event received that comes later then + * </ul> + * + * <pre> + * Example for behavior (e: Event received, rateMaxCount=3) + * eee e e e e e e e e e e e e e e + * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| + * P1 P2 P1 P2 P3 P7 P1 + *Overload no no yes yes no no + * </pre> + * + * Interface to use: + * <ul> + * <li>construct RateFilterManager. Parameters are integration time and function to get the actual time + * <li>RateFilterManager.getRateFilter() provides rateFilter object for a stream to count events and provide overload + * status. + * <li>rateFilter.event() count the events during measurement period + * <li>rateFilter.getOverloadStatus() indicates status + * <li>rateFilter.close() to release this object + * </ul> + */ + +public class RateFilterManager implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(RateFilterManager.class.getName()); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("dd.mm.yy hh:mm:ss_SSS") + .withLocale(Locale.GERMAN).withZone(ZoneId.systemDefault()); + private static final long CLIENTS_NUMBER_WARNLEVEL = 1000; + + //Configuration + private final Duration integrationTime; // Integration time to measure event rate + private GetNow get; //Provides actual system time + private final Map<Long, RateFilter> rateFilterList; + @SuppressWarnings("unused") + private final Timer timerTask; + + /** + * Allow testing with own timestamp provider Provide actual system time. + */ + public interface GetNow { + Instant now(); + } + + /** + * Constructor with all parameters, intended to be used for unit test + * + * @param integrationTime is the interval length for counting events. + * @param rateMaxCountDefault if event count exceed this limit, status changes to overload. + * @param startTimer true start time with intervall time + * @param get function to provide actual system time. + */ + public RateFilterManager(Duration integrationTime, boolean startTimer, GetNow get) { + this.integrationTime = integrationTime; + this.get = get; + + this.rateFilterList = Collections.synchronizedMap(new HashMap<Long, RateFilter>()); + this.timerTask = startTimer ? startTimerTask(integrationTime) : null; + } + + /** + * Get RateFilter manager + * + * @param integrationTime is the time to measure events + * @param rateMaxCountDefault if exceeded state overload is true + */ + public RateFilterManager(Duration integrationTime) { + this(integrationTime, true, () -> Instant.now()); + } + + /** + */ + /** + * Get a specific rate filter for one stream. Use close() to release. + * + * @param ratePerMinute Rate per Minute for this filter. If 0 never overloaded. + * @return RateFilter object for each event stream. + * @throws IllegalArgumentException on negative rate + */ + public synchronized RateFilter getRateFilter(long maxRatePerMinute) throws IllegalArgumentException { + long maxEventsPerIntegration = convertRPMToMaxCount(maxRatePerMinute); + if (maxEventsPerIntegration < 0) + throw new IllegalArgumentException( + "Resulting in illegal maxEventsPerIntegration=" + maxEventsPerIntegration); + return getRateFilterInstance(maxEventsPerIntegration); + } + + @Override + public void close() { + if (timerTask != null) { + timerTask.cancel(); + timerTask.purge(); + } + rateFilterList.clear(); + } + + /** + * Function to get a new Ratefilter for a connection + * + * @param maxEventsPerIntegration + * @return reference to object with filter status + */ + private RateFilter getRateFilterInstance(long maxEventsPerIntegration) { + RateFilter rateFilter; + synchronized (rateFilterList) { + rateFilter = rateFilterList.get(maxEventsPerIntegration); + if (rateFilter == null) { + rateFilter = new RateFilter(maxEventsPerIntegration); + synchronized (rateFilterList) { + rateFilterList.put(maxEventsPerIntegration, rateFilter); + } + } else { + if (rateFilter.addClient() > CLIENTS_NUMBER_WARNLEVEL) + LOG.warn("Warnlevel {} exceeded for client connections", CLIENTS_NUMBER_WARNLEVEL); + } + } + return rateFilter; + } + + private Timer startTimerTask(Duration integrationTime) { + long milliseconds = integrationTime.toMillis(); + LOG.debug("Start startTimerTask with {} ms", milliseconds); + Timer time = new Timer(); + time.scheduleAtFixedRate(new TimeoutHandler(), 0L, milliseconds); + return time; + } + + private class TimeoutHandler extends TimerTask { + @Override + public void run() { + LOG.debug("Run timeout task at {}", f(get.now())); + synchronized (rateFilterList) { + rateFilterList.forEach((k, f) -> f.timer()); + } + } + } + + /** + * Provide nice debug output for Instant and Duration + * + * @param i with instant + * @return output string + */ + private static String f(Instant i) { + return i != null ? FORMATTER.format(i) : "null"; + } + + /** + * Convert a rate per minute into events per integration time. + * + * @param ratePerMinute + * @return events per integration time. + */ + private long convertRPMToMaxCount(long ratePerMinute) { + return ratePerMinute * integrationTime.toSeconds() / TimeUnit.MINUTES.toSeconds(1); + } + + /** + * Ratefilter class contains status informaton for one event stream. + */ + public class RateFilter implements Closeable { + private final long maxEventsPerIntegration; //uuid and maximum of events without overload + private Long clients; // Number of clients for this filter. + private long rateCount; // number of events during integration period + private boolean overload; //true means in overload status. Change at end of period only. + + /** + * Create a new Filter + * + * @param maxEventsPerIntegration >= 1 characteristics and uuid of this filter. < 1 switched off + * @see {@link #close} + */ + private RateFilter(long maxEventsPerIntegration) { + synchronized (this) { + this.clients = 1L; + this.maxEventsPerIntegration = maxEventsPerIntegration; + this.rateCount = 0; + } + } + + /** + * Add a client to this filter + * + * @return number of clients, handled by this filter + * @see {@link #close} + */ + private synchronized long addClient() { + if (clients >= 1) { + ++clients; + } else { + LOG.warn("Misalligned open/close for {} with number {}", maxEventsPerIntegration, clients); + } + return clients; + } + + /** + * Provide actual overload status + * + * @return status true means overloaded false not overloaded + */ + public synchronized boolean getOverloadStatus() { + return overload; + } + + /** + * Handle filter on event received + */ + public synchronized void event() { + rateCount++; + LOG.debug("event rc:{}", rateCount); + } + + /** + * Called if measurement period ends. Device if overload and reset counter. + */ + public synchronized void timer() { + //Change overload only at end of period + //Always inactive if maxEventsPerIntegration== 0 + if (maxEventsPerIntegration > 0) { + overload = rateCount > maxEventsPerIntegration; + } + rateCount = 0; + LOG.debug("Timer ol:{} rc:{}", overload, rateCount); + } + + /** + * Get maximum events allowed per integration period + * + * @return 1 ... + */ + public synchronized long getMaxEventsPerIntegration() { + return maxEventsPerIntegration; + } + + /** + * Get number of client streams. + * + * @return 1 ... + */ + public synchronized long getClients() { + return clients; + } + + @Override + public void close() { + synchronized (rateFilterList) { + if (clients == 1) { + LOG.debug("Close and remove last client {}", maxEventsPerIntegration); + rateFilterList.remove(this.maxEventsPerIntegration); + clients--; + } else if (clients > 1) { + LOG.debug("Close one client of {} for events {}", clients, maxEventsPerIntegration); + clients--; + } else { + LOG.warn("Misaligned new/close for events {}", maxEventsPerIntegration); + } + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("RateFilter [maxEventsPerIntegration="); + builder.append(maxEventsPerIntegration); + builder.append(", clients="); + builder.append(clients); + builder.append(", rateCount="); + builder.append(rateCount); + builder.append(", overload="); + builder.append(overload); + builder.append("]"); + return builder.toString(); + } + } +} diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java index f3cf09545..df04c388f 100644 --- a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java +++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java @@ -20,13 +20,11 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileReader; import java.io.IOException; import java.net.URISyntaxException; -import java.nio.file.Paths; +import java.nio.file.Files; import org.junit.Test; import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig; @@ -58,19 +56,6 @@ public class AkkaConfigTest { public static String loadResourceContentAsString(String resourceName) throws URISyntaxException, FileNotFoundException, IOException { - StringBuilder sb = new StringBuilder(); - - ClassLoader classLoader = AkkaConfigTest.class.getClassLoader(); - File file = Paths.get(classLoader.getResource(resourceName).toURI()).toFile(); - try (BufferedReader br = new BufferedReader(new FileReader(file))) { - String line = br.readLine(); - - while (line != null) { - sb.append(line); - sb.append(System.lineSeparator()); - line = br.readLine(); - } - } - return sb.toString(); + return Files.readString(new File("src/test/resources/"+resourceName).toPath()); } } diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java index f4fab6810..d5a940f73 100644 --- a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java +++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java @@ -21,10 +21,15 @@ */ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import java.time.Duration; import java.time.Instant; +import java.util.Timer; +import java.util.TimerTask; import org.junit.Test; -import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilter; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilterManager; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilterManager.RateFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,11 +38,13 @@ import org.slf4j.LoggerFactory; * * <pre> * Testcase (e: 17 Event received, rateMaxCount=3) - * eee e e e e e e e e e e e e e e - * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| - * P1:1 P2:1 P1:2 P2:2 P3:2 P4:2 P1:3 - * 1000-1002 2000 3500 10 millis - *Overload no no yes yes no no + * 1 3 4 5 6 7 8 9 10 11 14 15 16 17 18 + * t t t t t t t t t + * eee e e e e e e e e e e e e e e e + * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| + * P1:1 P2:1 P1:2 P2:2 P3:2 P4:2 P1:3 + * ms 500 1000-1002 2000 3500 4500 5500 6500 7500 8500 + *Overload no no yes yes no no * </pre> * */ @@ -45,37 +52,140 @@ public class RateFilterTest { private static final Logger LOG = LoggerFactory.getLogger(RateFilterTest.class.getName()); - private static int MILLIS = 1000; - private static long[] now = { 1000, 1001, 1002, //P1:1 0-2 - 3500, 3550, 3560, 3570, 3580, 3590, 3800, //P1:2 3500 3-9 - 4510, 4520, 4530, 4540, 4900, //P2:2 4500 10-14 - 5700, //P3:2 5500 15 - 7000, 8000};//P1:3 16-17 + private static int INTEGRATIONTIMEMILLIS = 1000; + private static long EVENTS_PER_INTERVALL = 4; + private static long RATE_PER_MINUTE = EVENTS_PER_INTERVALL * 60; + /* Negative event time indicates timer event */ + private static long[] now = {-500, 1000, 1010, 1020, //P1:1 1-3 + -1500, -2500, -3500, 3500, 3550, 3560, 3570, 3580, 3590, 3800, //P1:2 3500 4-10 + -4500, 4510, 4520, 4530, 4540, 4900, //P2:2 4500 11-15 + -5500, 5700, //P3:2 5500 16 + -6500, -7500, 7500, 8000};//P1:3 17-18 + private static boolean[] overload = {false, false, false, false, //P1:1 1-3 + false, false, false, false, false, false, false, false, false, false, //P1:2 3500 4-10 + true, true, true, true, true, true, //P2:2 4500 11-15 + true, true, //P3:2 5500 16 + false, false, false, false};//P1:3 17-18 + private static int idx; + private static long millis; @Test - public void test() { - RateFilter rateFilter = new RateFilter(Duration.ofMillis(MILLIS), 4, () -> getNow()); + public void testStates() { + reset(); + RateFilterManager rateFilterManager = + new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS), false, () -> getNow()); + RateFilter rateFilter = rateFilterManager.getRateFilter(RATE_PER_MINUTE); LOG.info("Init done"); + assertEquals("Events per integration period", EVENTS_PER_INTERVALL, rateFilter.getMaxEventsPerIntegration()); - for (int t=0; t < 20; t++) { - LOG.info("{}", t); - rateFilter.filterEvent(); - LOG.info("{}", rateFilter.getOverloadStatus()); + for (int t = 1; t < 30; t++) { + boolean expected = tick(); + if (millis < 0) { + LOG.info("{} - timer {}", t, millis); + rateFilter.timer(); + } else { + LOG.info("{} - event {}", t, millis); + rateFilter.event(); + } + LOG.info("Overload={} {}", rateFilter.getOverloadStatus(), expected); + assertEquals("Filter activity", expected, rateFilter.getOverloadStatus()); } + rateFilter.close(); + } + + @Test + public void testThread() throws InterruptedException { + LOG.info("testThread"); + reset(); + RateFilterManager rateFilterManager = new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS)); + RateFilter rateFilter = rateFilterManager.getRateFilter(RATE_PER_MINUTE); + + tick(); + Thread.sleep(2000); + + Object objectYouNeedToLockOn = new Object(); + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + long localMillis; + + @Override + public void run() { + long xLocalMillis = localMillis += 10; + long xMillis = Math.abs(millis); + if (xLocalMillis >= xMillis) { + LOG.info("aTime:{} Millis:{} Idx={}", xLocalMillis, xMillis, idx); + boolean expected = tick(); + if (millis > 0) { + //Skip negatives .. handled by timer + rateFilter.event(); + boolean actual = rateFilter.getOverloadStatus(); + LOG.info("bTime:{} Millis:{} Idx={} Overload={} Expected={} {}", xLocalMillis, xMillis, idx, + actual, expected, actual == expected ? "" : "XXXX"); + if (idx >= 30) { + LOG.info("Test is ending"); + synchronized (objectYouNeedToLockOn) { + objectYouNeedToLockOn.notify(); + } + timer.cancel(); + } + assertEquals("Filter activity", expected, rateFilter.getOverloadStatus()); + } + } + } + }, 0, 10); + synchronized (objectYouNeedToLockOn) { + objectYouNeedToLockOn.wait(); + } + //rateFilter.close(); + LOG.info("Test end"); + } + + @Test + public void testMultipleClients() { + RateFilterManager rateFilterManager = new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS)); + RateFilter rateFilter1 = rateFilterManager.getRateFilter(RATE_PER_MINUTE); + assertEquals("Multiple clients", 1, rateFilter1.getClients()); + RateFilter rateFilter2 = rateFilterManager.getRateFilter(RATE_PER_MINUTE); + assertEquals("Multiple clients", 2, rateFilter1.getClients()); + RateFilter rateFilter3 = rateFilterManager.getRateFilter(RATE_PER_MINUTE); + assertEquals("Multiple clients", 3, rateFilter1.getClients()); + + assertEquals("Similar instances", rateFilter1, rateFilter3); + + RateFilter rateFilterOther = rateFilterManager.getRateFilter(2*RATE_PER_MINUTE); + assertNotEquals("Different instances", rateFilter1, rateFilterOther); + rateFilterOther.close(); + + rateFilter3.close(); + assertEquals("Multiple clients", 2, rateFilter1.getClients()); + rateFilter2.close(); + assertEquals("Multiple clients", 1, rateFilter1.getClients()); + rateFilter1.close(); + assertEquals("Multiple clients", 0, rateFilter1.getClients()); + + rateFilterManager.close(); + } + + private Instant getNow() { + LOG.debug("Now:{}", millis); + return Instant.ofEpochMilli(Math.abs(millis)); + } + private void reset() { + idx = 0; } - Instant getNow() { - long res; + private boolean tick() { if (idx < now.length) { - res = now[idx]; + millis = now[idx]; } else { int lastIdx = now.length - 1; - res = now[lastIdx] + (idx - lastIdx) * MILLIS; + millis = now[lastIdx] + (idx - lastIdx) * INTEGRATIONTIMEMILLIS; } + boolean expected = idx < overload.length ? overload[idx] : false; idx++; - return Instant.ofEpochMilli(res); + return expected; } } diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java new file mode 100644 index 000000000..885ded348 --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.SchemaInfo; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.Scope; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectCreationNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification; +import org.opendaylight.yangtools.yang.common.QName; + +public class UserScopeTest { + + + @Test + public void testAllNodes() { + UserScopes scopes1 = new UserScopes(); + scopes1.setScopes(Arrays.asList(buildScope(null, ProblemNotification.QNAME))); + + assertTrue(scopes1.hasScope(new ReducedSchemaInfo(ProblemNotification.QNAME))); + assertFalse(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ObjectCreationNotification.QNAME))); + + assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ProblemNotification.QNAME))); + + } + + @Test + public void testRevisionStar() { + UserScopes scopes1 = new UserScopes(); + scopes1.setScopes( + Arrays.asList(buildScope(null, ProblemNotification.QNAME.getNamespace().toString(), "*", null))); + + assertTrue(scopes1.hasScope(new ReducedSchemaInfo(ProblemNotification.QNAME))); + assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ObjectCreationNotification.QNAME))); + + assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ProblemNotification.QNAME))); + + } + + private static final Scope buildScope(String nodeId, String namespace, String revision, + List<String> notifications) { + Scope scope = new Scope(); + scope.setNodeId(nodeId); + scope.setSchema(new SchemaInfo(namespace, revision, notifications)); + return scope; + } + + private static final Scope buildScope(String nodeId, QName qname) { + Scope scope = new Scope(); + scope.setNodeId(nodeId); + scope.setSchema(new SchemaInfo(qname)); + return scope; + } + +} diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java index bc3cd10f8..2e6462462 100644 --- a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java +++ b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java @@ -17,11 +17,17 @@ */ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.junit.Test; import org.mockito.Mockito; import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerProvider; import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectCreationNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotificationBuilder; +import org.opendaylight.yangtools.yang.binding.Notification; import org.osgi.service.http.HttpService; public class WebsockerProviderTest extends Mockito { @@ -42,4 +48,14 @@ public class WebsockerProviderTest extends Mockito { } + @Test + public void testTypeAssertion() { + + Notification problemNotification = new ProblemNotificationBuilder().build(); + assertTrue(WebSocketManagerProvider.assertNotificationType(problemNotification, ProblemNotification.QNAME)); + assertFalse( + WebSocketManagerProvider.assertNotificationType(problemNotification, ObjectCreationNotification.QNAME)); + + } + } |