aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/websocketmanager/provider/src/main
diff options
context:
space:
mode:
authorRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2021-05-25 18:57:29 +0530
committerRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2021-06-14 10:22:54 +0530
commit17614362f2550c29dcd746ee2c1bc01d0df5de65 (patch)
tree97930a14a08c610efceb4aebb4f457e0cf42b2f8 /sdnr/wt/websocketmanager/provider/src/main
parentdb9f267b3930a28054e967c75db228e27663aedc (diff)
Improve Websocket notification interface
Improve websocket notification interface Issue-ID: CCSDK-3315 Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> Change-Id: I0ded865adddb546ade98df4760e0a32ec964295a Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/websocketmanager/provider/src/main')
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java31
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java65
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java138
-rw-r--r--sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java323
4 files changed, 406 insertions, 151 deletions
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
index 0b6e9b453..610001775 100644
--- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
@@ -24,6 +24,7 @@ import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationO
import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.common.QName;
import org.osgi.service.http.HttpService;
@@ -84,24 +85,44 @@ public class WebSocketManagerProvider implements WebsocketManagerService, AutoCl
@Override
- public void sendNotification(Notification notification, String nodeId, QName eventType) {
+ public void sendNotification(Notification notification, NodeId nodeId, QName eventType) {
+ if(!assertNotificationType(notification, eventType)){
+ return;
+ }
this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now()));
}
+ public static boolean assertNotificationType(Notification notification, QName eventType) {
+ final String yangTypeName = eventType.getLocalName();
+ final Class<?> cls = notification.getClass();
+ final String clsNameToTest = YangToolsMapperHelper.toCamelCaseClassName(yangTypeName);
+ if(cls.getSimpleName().equals(clsNameToTest)) {
+ return true;
+ }
+ Class<?>[] ifs = cls.getInterfaces();
+ for(Class<?> clsif:ifs) {
+ if(clsif.getSimpleName().equals(clsNameToTest)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
@Override
- public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) {
- WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime));
+ public void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) {
+ WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId.getValue(), eventType, eventTime));
}
@Override
- public void sendNotification(DOMNotification notification, String nodeId, QName eventType) {
+ public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType) {
LOG.warn("not yet implemented");
}
@Override
- public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) {
+ public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) {
LOG.warn("not yet implemented");
}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
index 945de3c1f..a642bda69 100644
--- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
@@ -25,6 +25,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.websocket.api.Session;
@@ -41,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class WebSocketManagerSocket extends WebSocketAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
public static final String MSG_KEY_DATA = "data";
public static final DataType MSG_KEY_SCOPES = DataType.scopes;
public static final String MSG_KEY_PARAM = "param";
@@ -54,7 +58,47 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
private static final Pattern PATTERN_SCOPEREGISTRATION =
Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
private static final Random RND = new Random();
+ private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
+ private static final int QUEUE_SIZE = 100;
+ private final Thread sendingSyncThread;
+ private final ArrayBlockingQueue<String> messageQueue;
+ private boolean closed;
+
+ private final Runnable sendingRunner = new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("isrunning");
+ while (!closed) {
+ try {
+
+ String message = messageQueue.poll();
+ if (message != null) {
+ WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
+ .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ LOG.info("message sent");
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("problem pushing message: ", e);
+ }
+
+ if (messageQueue.isEmpty()) {
+ trySleep(1000);
+ }
+
+ }
+ LOG.debug("isstopped");
+
+ };
+ };
+
+ private static void trySleep(int sleepMs) {
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
/**
* list of all sessionids
@@ -80,6 +124,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
public WebSocketManagerSocket() {
this.myUniqueSessionId = _genSessionId();
+ this.sendingSyncThread = new Thread(this.sendingRunner);
+ this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
}
@Override
@@ -112,6 +158,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
@Override
public void onWebSocketConnect(Session sess) {
this.session = sess;
+ closed = false;
+ this.sendingSyncThread.start();
clientList.put(String.valueOf(this.hashCode()), this);
LOG.debug("client connected from " + this.getRemoteAdr());
}
@@ -119,13 +167,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
@Override
public void onWebSocketClose(int statusCode, String reason) {
clientList.remove(String.valueOf(this.hashCode()));
+ this.sendingSyncThread.interrupt();
+ closed = true;
LOG.debug("client disconnected from " + this.getRemoteAdr());
}
@Override
public void onWebSocketError(Throwable cause) {
LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
- // super.onWebSocketError(cause);
}
private String getRemoteAdr() {
@@ -146,12 +195,12 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
private boolean manageClientRequest(String request) {
boolean ret = false;
final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
- if(!matcher.find()) {
+ if (!matcher.find()) {
return false;
}
try {
ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
- if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
+ if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
ret = true;
String sessionId = this.getSessionId();
UserScopes clientDto = new UserScopes();
@@ -188,9 +237,9 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
public void send(String msg) {
try {
LOG.trace("sending {}", msg);
- this.session.getRemote().sendString(msg);
- } catch (Exception e) {
- LOG.warn("problem sending message: " + e.getMessage());
+ this.messageQueue.put(msg);
+ } catch (InterruptedException e) {
+ LOG.warn("problem putting message into sending queue: " + e.getMessage());
}
}
@@ -200,7 +249,7 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
private void sendToAll(NotificationOutput output) {
try {
- this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
+ sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
} catch (JsonProcessingException e) {
LOG.warn("problem serializing noitifcation: ", e);
}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
deleted file mode 100644
index 5f3a5af2c..000000000
--- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : ccsdk features
- * ================================================================================
- * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
- * All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- */
-package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
-
-import java.time.Duration;
-import java.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Problems of to many notifications during mount of thousand of devices:
- * <ul>
- * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
- * <li>Notification processing blocks user -> App design with notifications popups
- * </ul>
- * Rate filter
- * <ul>
- * <li>Do not use a thread -> Do nothing if there are no notifications
- * <li>Parameter1 integrationTime : Measurement or integration time for period
- * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
- * <li>Start measurement on event received that comes later then
- * </ul>
- *
- * <pre>
- * Example (e: Event received, rateMaxCount=3)
- * eee e e e e e e e e e e e e e e
- * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
- * P1 P2 P1 P2 P3 P7 P1
- *Overload no no yes yes no no
- *
- *
- *Intention to use:
- * 1. Construct with parameters for WS stream to handle
- * 2.
- * </pre>
- */
-
-public class RateFilter {
-
- private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName());
-
- private final Duration integrationTime; // Integration time to measure event rate
- private final long rateMaxCount; //Rate for dropping packets
- private Instant timeStampPeriodStart; //Time stamp period beginn
- private Instant timeStampLastEvent; //Measurement interval
- private long rateCount; // >0: integration running 0: no integration running
- private boolean overload; //true means in overload status. Change at end of period only.
- private GetNow get;
-
- /**
- * Allow testing with own timestamp provider
- */
- public interface GetNow {
- Instant now();
- }
-
- public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) {
- this.integrationTime = integrationTime;
- this.rateMaxCount = rateMaxCount;
- this.get = getNowMethod;
- this.timeStampLastEvent = Instant.MIN;
- }
-
- public RateFilter(Duration integrationTime, long rateMaxCount) {
- this(integrationTime, rateMaxCount, () -> Instant.now());
- }
-
- public synchronized boolean getOverloadStatus() {
- return overload;
- }
-
- /**
- * Handle filter on event received
- */
- public synchronized void filterEvent() {
- final Instant now = get.now();
- final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now);
- this.timeStampLastEvent = now;
-
- if (durationSinceLastEvent.compareTo(integrationTime) >= 0) {
- //No measurement. Sync and start with period
- LOG.debug("Sync");
- timeStampPeriodStart = now;
- rateCount = 1; //Reset event count .. is part of the
- } else {
- //Within period
- Duration durationPeriod = Duration.between(timeStampPeriodStart, now);
- rateCount++;
- boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0;
- LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime);
- if (endOfPeriod) {
- //Only if end of Period
- overload = rateCount > rateMaxCount;
- LOG.debug("Reset overload {}", overload);
- timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime);
- rateCount = 0;
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("RateFilter [integrationTime=");
- builder.append(integrationTime);
- builder.append(", rateMaxCount=");
- builder.append(rateMaxCount);
- builder.append(", timeStampPeriodStart=");
- builder.append(timeStampPeriodStart);
- builder.append(", timeStampLastEvent=");
- builder.append(timeStampLastEvent);
- builder.append(", rateCount=");
- builder.append(rateCount);
- builder.append(", overload=");
- builder.append(overload);
- builder.append("]");
- return builder.toString();
- }
-}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java
new file mode 100644
index 000000000..7ffa29e89
--- /dev/null
+++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java
@@ -0,0 +1,323 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Problems of to many notifications during mount of thousand of devices:
+ * <ul>
+ * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
+ * <li>Notification processing blocks user -> App design with notifications popups
+ * </ul>
+ * Rate filter requirements
+ * <ul>
+ * <li>Use a single thread
+ * <li>Parameter1 integrationTime : Measurement or integration time for period
+ * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
+ * <li>Start measurement on event received that comes later then
+ * </ul>
+ *
+ * <pre>
+ * Example for behavior (e: Event received, rateMaxCount=3)
+ * eee e e e e e e e e e e e e e e
+ * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
+ * P1 P2 P1 P2 P3 P7 P1
+ *Overload no no yes yes no no
+ * </pre>
+ *
+ * Interface to use:
+ * <ul>
+ * <li>construct RateFilterManager. Parameters are integration time and function to get the actual time
+ * <li>RateFilterManager.getRateFilter() provides rateFilter object for a stream to count events and provide overload
+ * status.
+ * <li>rateFilter.event() count the events during measurement period
+ * <li>rateFilter.getOverloadStatus() indicates status
+ * <li>rateFilter.close() to release this object
+ * </ul>
+ */
+
+public class RateFilterManager implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RateFilterManager.class.getName());
+ private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("dd.mm.yy hh:mm:ss_SSS")
+ .withLocale(Locale.GERMAN).withZone(ZoneId.systemDefault());
+ private static final long CLIENTS_NUMBER_WARNLEVEL = 1000;
+
+ //Configuration
+ private final Duration integrationTime; // Integration time to measure event rate
+ private GetNow get; //Provides actual system time
+ private final Map<Long, RateFilter> rateFilterList;
+ @SuppressWarnings("unused")
+ private final Timer timerTask;
+
+ /**
+ * Allow testing with own timestamp provider Provide actual system time.
+ */
+ public interface GetNow {
+ Instant now();
+ }
+
+ /**
+ * Constructor with all parameters, intended to be used for unit test
+ *
+ * @param integrationTime is the interval length for counting events.
+ * @param rateMaxCountDefault if event count exceed this limit, status changes to overload.
+ * @param startTimer true start time with intervall time
+ * @param get function to provide actual system time.
+ */
+ public RateFilterManager(Duration integrationTime, boolean startTimer, GetNow get) {
+ this.integrationTime = integrationTime;
+ this.get = get;
+
+ this.rateFilterList = Collections.synchronizedMap(new HashMap<Long, RateFilter>());
+ this.timerTask = startTimer ? startTimerTask(integrationTime) : null;
+ }
+
+ /**
+ * Get RateFilter manager
+ *
+ * @param integrationTime is the time to measure events
+ * @param rateMaxCountDefault if exceeded state overload is true
+ */
+ public RateFilterManager(Duration integrationTime) {
+ this(integrationTime, true, () -> Instant.now());
+ }
+
+ /**
+ */
+ /**
+ * Get a specific rate filter for one stream. Use close() to release.
+ *
+ * @param ratePerMinute Rate per Minute for this filter. If 0 never overloaded.
+ * @return RateFilter object for each event stream.
+ * @throws IllegalArgumentException on negative rate
+ */
+ public synchronized RateFilter getRateFilter(long maxRatePerMinute) throws IllegalArgumentException {
+ long maxEventsPerIntegration = convertRPMToMaxCount(maxRatePerMinute);
+ if (maxEventsPerIntegration < 0)
+ throw new IllegalArgumentException(
+ "Resulting in illegal maxEventsPerIntegration=" + maxEventsPerIntegration);
+ return getRateFilterInstance(maxEventsPerIntegration);
+ }
+
+ @Override
+ public void close() {
+ if (timerTask != null) {
+ timerTask.cancel();
+ timerTask.purge();
+ }
+ rateFilterList.clear();
+ }
+
+ /**
+ * Function to get a new Ratefilter for a connection
+ *
+ * @param maxEventsPerIntegration
+ * @return reference to object with filter status
+ */
+ private RateFilter getRateFilterInstance(long maxEventsPerIntegration) {
+ RateFilter rateFilter;
+ synchronized (rateFilterList) {
+ rateFilter = rateFilterList.get(maxEventsPerIntegration);
+ if (rateFilter == null) {
+ rateFilter = new RateFilter(maxEventsPerIntegration);
+ synchronized (rateFilterList) {
+ rateFilterList.put(maxEventsPerIntegration, rateFilter);
+ }
+ } else {
+ if (rateFilter.addClient() > CLIENTS_NUMBER_WARNLEVEL)
+ LOG.warn("Warnlevel {} exceeded for client connections", CLIENTS_NUMBER_WARNLEVEL);
+ }
+ }
+ return rateFilter;
+ }
+
+ private Timer startTimerTask(Duration integrationTime) {
+ long milliseconds = integrationTime.toMillis();
+ LOG.debug("Start startTimerTask with {} ms", milliseconds);
+ Timer time = new Timer();
+ time.scheduleAtFixedRate(new TimeoutHandler(), 0L, milliseconds);
+ return time;
+ }
+
+ private class TimeoutHandler extends TimerTask {
+ @Override
+ public void run() {
+ LOG.debug("Run timeout task at {}", f(get.now()));
+ synchronized (rateFilterList) {
+ rateFilterList.forEach((k, f) -> f.timer());
+ }
+ }
+ }
+
+ /**
+ * Provide nice debug output for Instant and Duration
+ *
+ * @param i with instant
+ * @return output string
+ */
+ private static String f(Instant i) {
+ return i != null ? FORMATTER.format(i) : "null";
+ }
+
+ /**
+ * Convert a rate per minute into events per integration time.
+ *
+ * @param ratePerMinute
+ * @return events per integration time.
+ */
+ private long convertRPMToMaxCount(long ratePerMinute) {
+ return ratePerMinute * integrationTime.toSeconds() / TimeUnit.MINUTES.toSeconds(1);
+ }
+
+ /**
+ * Ratefilter class contains status informaton for one event stream.
+ */
+ public class RateFilter implements Closeable {
+ private final long maxEventsPerIntegration; //uuid and maximum of events without overload
+ private Long clients; // Number of clients for this filter.
+ private long rateCount; // number of events during integration period
+ private boolean overload; //true means in overload status. Change at end of period only.
+
+ /**
+ * Create a new Filter
+ *
+ * @param maxEventsPerIntegration >= 1 characteristics and uuid of this filter. < 1 switched off
+ * @see {@link #close}
+ */
+ private RateFilter(long maxEventsPerIntegration) {
+ synchronized (this) {
+ this.clients = 1L;
+ this.maxEventsPerIntegration = maxEventsPerIntegration;
+ this.rateCount = 0;
+ }
+ }
+
+ /**
+ * Add a client to this filter
+ *
+ * @return number of clients, handled by this filter
+ * @see {@link #close}
+ */
+ private synchronized long addClient() {
+ if (clients >= 1) {
+ ++clients;
+ } else {
+ LOG.warn("Misalligned open/close for {} with number {}", maxEventsPerIntegration, clients);
+ }
+ return clients;
+ }
+
+ /**
+ * Provide actual overload status
+ *
+ * @return status true means overloaded false not overloaded
+ */
+ public synchronized boolean getOverloadStatus() {
+ return overload;
+ }
+
+ /**
+ * Handle filter on event received
+ */
+ public synchronized void event() {
+ rateCount++;
+ LOG.debug("event rc:{}", rateCount);
+ }
+
+ /**
+ * Called if measurement period ends. Device if overload and reset counter.
+ */
+ public synchronized void timer() {
+ //Change overload only at end of period
+ //Always inactive if maxEventsPerIntegration== 0
+ if (maxEventsPerIntegration > 0) {
+ overload = rateCount > maxEventsPerIntegration;
+ }
+ rateCount = 0;
+ LOG.debug("Timer ol:{} rc:{}", overload, rateCount);
+ }
+
+ /**
+ * Get maximum events allowed per integration period
+ *
+ * @return 1 ...
+ */
+ public synchronized long getMaxEventsPerIntegration() {
+ return maxEventsPerIntegration;
+ }
+
+ /**
+ * Get number of client streams.
+ *
+ * @return 1 ...
+ */
+ public synchronized long getClients() {
+ return clients;
+ }
+
+ @Override
+ public void close() {
+ synchronized (rateFilterList) {
+ if (clients == 1) {
+ LOG.debug("Close and remove last client {}", maxEventsPerIntegration);
+ rateFilterList.remove(this.maxEventsPerIntegration);
+ clients--;
+ } else if (clients > 1) {
+ LOG.debug("Close one client of {} for events {}", clients, maxEventsPerIntegration);
+ clients--;
+ } else {
+ LOG.warn("Misaligned new/close for events {}", maxEventsPerIntegration);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("RateFilter [maxEventsPerIntegration=");
+ builder.append(maxEventsPerIntegration);
+ builder.append(", clients=");
+ builder.append(clients);
+ builder.append(", rateCount=");
+ builder.append(rateCount);
+ builder.append(", overload=");
+ builder.append(overload);
+ builder.append("]");
+ return builder.toString();
+ }
+ }
+}