diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2021-05-25 18:57:29 +0530 |
---|---|---|
committer | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2021-06-14 10:22:54 +0530 |
commit | 17614362f2550c29dcd746ee2c1bc01d0df5de65 (patch) | |
tree | 97930a14a08c610efceb4aebb4f457e0cf42b2f8 /sdnr/wt/websocketmanager/provider/src/main | |
parent | db9f267b3930a28054e967c75db228e27663aedc (diff) |
Improve Websocket notification interface
Improve websocket notification interface
Issue-ID: CCSDK-3315
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: I0ded865adddb546ade98df4760e0a32ec964295a
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/websocketmanager/provider/src/main')
4 files changed, 406 insertions, 151 deletions
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java index 0b6e9b453..610001775 100644 --- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java @@ -24,6 +24,7 @@ import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationO import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.QName; import org.osgi.service.http.HttpService; @@ -84,24 +85,44 @@ public class WebSocketManagerProvider implements WebsocketManagerService, AutoCl @Override - public void sendNotification(Notification notification, String nodeId, QName eventType) { + public void sendNotification(Notification notification, NodeId nodeId, QName eventType) { + if(!assertNotificationType(notification, eventType)){ + return; + } this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now())); } + public static boolean assertNotificationType(Notification notification, QName eventType) { + final String yangTypeName = eventType.getLocalName(); + final Class<?> cls = notification.getClass(); + final String clsNameToTest = YangToolsMapperHelper.toCamelCaseClassName(yangTypeName); + if(cls.getSimpleName().equals(clsNameToTest)) { + return true; + } + Class<?>[] ifs = cls.getInterfaces(); + for(Class<?> clsif:ifs) { + if(clsif.getSimpleName().equals(clsNameToTest)) { + return true; + } + } + return false; + } + + @Override - public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) { - WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime)); + public void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) { + WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId.getValue(), eventType, eventTime)); } @Override - public void sendNotification(DOMNotification notification, String nodeId, QName eventType) { + public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType) { LOG.warn("not yet implemented"); } @Override - public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) { + public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) { LOG.warn("not yet implemented"); } diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java index 945de3c1f..a642bda69 100644 --- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java @@ -25,6 +25,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.eclipse.jetty.websocket.api.Session; @@ -41,7 +45,7 @@ import org.slf4j.LoggerFactory; public class WebSocketManagerSocket extends WebSocketAdapter { - private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class); public static final String MSG_KEY_DATA = "data"; public static final DataType MSG_KEY_SCOPES = DataType.scopes; public static final String MSG_KEY_PARAM = "param"; @@ -54,7 +58,47 @@ public class WebSocketManagerSocket extends WebSocketAdapter { private static final Pattern PATTERN_SCOPEREGISTRATION = Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE); private static final Random RND = new Random(); + private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500; + private static final int QUEUE_SIZE = 100; + private final Thread sendingSyncThread; + private final ArrayBlockingQueue<String> messageQueue; + private boolean closed; + + private final Runnable sendingRunner = new Runnable() { + @Override + public void run() { + LOG.debug("isrunning"); + while (!closed) { + try { + + String message = messageQueue.poll(); + if (message != null) { + WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message) + .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + LOG.info("message sent"); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("problem pushing message: ", e); + } + + if (messageQueue.isEmpty()) { + trySleep(1000); + } + + } + LOG.debug("isstopped"); + + }; + }; + + private static void trySleep(int sleepMs) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } /** * list of all sessionids @@ -80,6 +124,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter { public WebSocketManagerSocket() { this.myUniqueSessionId = _genSessionId(); + this.sendingSyncThread = new Thread(this.sendingRunner); + this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); } @Override @@ -112,6 +158,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketConnect(Session sess) { this.session = sess; + closed = false; + this.sendingSyncThread.start(); clientList.put(String.valueOf(this.hashCode()), this); LOG.debug("client connected from " + this.getRemoteAdr()); } @@ -119,13 +167,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketClose(int statusCode, String reason) { clientList.remove(String.valueOf(this.hashCode())); + this.sendingSyncThread.interrupt(); + closed = true; LOG.debug("client disconnected from " + this.getRemoteAdr()); } @Override public void onWebSocketError(Throwable cause) { LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage()); - // super.onWebSocketError(cause); } private String getRemoteAdr() { @@ -146,12 +195,12 @@ public class WebSocketManagerSocket extends WebSocketAdapter { private boolean manageClientRequest(String request) { boolean ret = false; final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request); - if(!matcher.find()) { + if (!matcher.find()) { return false; } try { ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class); - if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) { + if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) { ret = true; String sessionId = this.getSessionId(); UserScopes clientDto = new UserScopes(); @@ -188,9 +237,9 @@ public class WebSocketManagerSocket extends WebSocketAdapter { public void send(String msg) { try { LOG.trace("sending {}", msg); - this.session.getRemote().sendString(msg); - } catch (Exception e) { - LOG.warn("problem sending message: " + e.getMessage()); + this.messageQueue.put(msg); + } catch (InterruptedException e) { + LOG.warn("problem putting message into sending queue: " + e.getMessage()); } } @@ -200,7 +249,7 @@ public class WebSocketManagerSocket extends WebSocketAdapter { private void sendToAll(NotificationOutput output) { try { - this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output)); + sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output)); } catch (JsonProcessingException e) { LOG.warn("problem serializing noitifcation: ", e); } diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java deleted file mode 100644 index 5f3a5af2c..000000000 --- a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : ccsdk features - * ================================================================================ - * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. - * All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - */ -package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; - -import java.time.Duration; -import java.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Problems of to many notifications during mount of thousand of devices: - * <ul> - * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate - * <li>Notification processing blocks user -> App design with notifications popups - * </ul> - * Rate filter - * <ul> - * <li>Do not use a thread -> Do nothing if there are no notifications - * <li>Parameter1 integrationTime : Measurement or integration time for period - * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload - * <li>Start measurement on event received that comes later then - * </ul> - * - * <pre> - * Example (e: Event received, rateMaxCount=3) - * eee e e e e e e e e e e e e e e - * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| - * P1 P2 P1 P2 P3 P7 P1 - *Overload no no yes yes no no - * - * - *Intention to use: - * 1. Construct with parameters for WS stream to handle - * 2. - * </pre> - */ - -public class RateFilter { - - private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName()); - - private final Duration integrationTime; // Integration time to measure event rate - private final long rateMaxCount; //Rate for dropping packets - private Instant timeStampPeriodStart; //Time stamp period beginn - private Instant timeStampLastEvent; //Measurement interval - private long rateCount; // >0: integration running 0: no integration running - private boolean overload; //true means in overload status. Change at end of period only. - private GetNow get; - - /** - * Allow testing with own timestamp provider - */ - public interface GetNow { - Instant now(); - } - - public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) { - this.integrationTime = integrationTime; - this.rateMaxCount = rateMaxCount; - this.get = getNowMethod; - this.timeStampLastEvent = Instant.MIN; - } - - public RateFilter(Duration integrationTime, long rateMaxCount) { - this(integrationTime, rateMaxCount, () -> Instant.now()); - } - - public synchronized boolean getOverloadStatus() { - return overload; - } - - /** - * Handle filter on event received - */ - public synchronized void filterEvent() { - final Instant now = get.now(); - final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now); - this.timeStampLastEvent = now; - - if (durationSinceLastEvent.compareTo(integrationTime) >= 0) { - //No measurement. Sync and start with period - LOG.debug("Sync"); - timeStampPeriodStart = now; - rateCount = 1; //Reset event count .. is part of the - } else { - //Within period - Duration durationPeriod = Duration.between(timeStampPeriodStart, now); - rateCount++; - boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0; - LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime); - if (endOfPeriod) { - //Only if end of Period - overload = rateCount > rateMaxCount; - LOG.debug("Reset overload {}", overload); - timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime); - rateCount = 0; - } - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("RateFilter [integrationTime="); - builder.append(integrationTime); - builder.append(", rateMaxCount="); - builder.append(rateMaxCount); - builder.append(", timeStampPeriodStart="); - builder.append(timeStampPeriodStart); - builder.append(", timeStampLastEvent="); - builder.append(timeStampLastEvent); - builder.append(", rateCount="); - builder.append(rateCount); - builder.append(", overload="); - builder.append(overload); - builder.append("]"); - return builder.toString(); - } -} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java new file mode 100644 index 000000000..7ffa29e89 --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java @@ -0,0 +1,323 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; + +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Problems of to many notifications during mount of thousand of devices: + * <ul> + * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate + * <li>Notification processing blocks user -> App design with notifications popups + * </ul> + * Rate filter requirements + * <ul> + * <li>Use a single thread + * <li>Parameter1 integrationTime : Measurement or integration time for period + * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload + * <li>Start measurement on event received that comes later then + * </ul> + * + * <pre> + * Example for behavior (e: Event received, rateMaxCount=3) + * eee e e e e e e e e e e e e e e + * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| + * P1 P2 P1 P2 P3 P7 P1 + *Overload no no yes yes no no + * </pre> + * + * Interface to use: + * <ul> + * <li>construct RateFilterManager. Parameters are integration time and function to get the actual time + * <li>RateFilterManager.getRateFilter() provides rateFilter object for a stream to count events and provide overload + * status. + * <li>rateFilter.event() count the events during measurement period + * <li>rateFilter.getOverloadStatus() indicates status + * <li>rateFilter.close() to release this object + * </ul> + */ + +public class RateFilterManager implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(RateFilterManager.class.getName()); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("dd.mm.yy hh:mm:ss_SSS") + .withLocale(Locale.GERMAN).withZone(ZoneId.systemDefault()); + private static final long CLIENTS_NUMBER_WARNLEVEL = 1000; + + //Configuration + private final Duration integrationTime; // Integration time to measure event rate + private GetNow get; //Provides actual system time + private final Map<Long, RateFilter> rateFilterList; + @SuppressWarnings("unused") + private final Timer timerTask; + + /** + * Allow testing with own timestamp provider Provide actual system time. + */ + public interface GetNow { + Instant now(); + } + + /** + * Constructor with all parameters, intended to be used for unit test + * + * @param integrationTime is the interval length for counting events. + * @param rateMaxCountDefault if event count exceed this limit, status changes to overload. + * @param startTimer true start time with intervall time + * @param get function to provide actual system time. + */ + public RateFilterManager(Duration integrationTime, boolean startTimer, GetNow get) { + this.integrationTime = integrationTime; + this.get = get; + + this.rateFilterList = Collections.synchronizedMap(new HashMap<Long, RateFilter>()); + this.timerTask = startTimer ? startTimerTask(integrationTime) : null; + } + + /** + * Get RateFilter manager + * + * @param integrationTime is the time to measure events + * @param rateMaxCountDefault if exceeded state overload is true + */ + public RateFilterManager(Duration integrationTime) { + this(integrationTime, true, () -> Instant.now()); + } + + /** + */ + /** + * Get a specific rate filter for one stream. Use close() to release. + * + * @param ratePerMinute Rate per Minute for this filter. If 0 never overloaded. + * @return RateFilter object for each event stream. + * @throws IllegalArgumentException on negative rate + */ + public synchronized RateFilter getRateFilter(long maxRatePerMinute) throws IllegalArgumentException { + long maxEventsPerIntegration = convertRPMToMaxCount(maxRatePerMinute); + if (maxEventsPerIntegration < 0) + throw new IllegalArgumentException( + "Resulting in illegal maxEventsPerIntegration=" + maxEventsPerIntegration); + return getRateFilterInstance(maxEventsPerIntegration); + } + + @Override + public void close() { + if (timerTask != null) { + timerTask.cancel(); + timerTask.purge(); + } + rateFilterList.clear(); + } + + /** + * Function to get a new Ratefilter for a connection + * + * @param maxEventsPerIntegration + * @return reference to object with filter status + */ + private RateFilter getRateFilterInstance(long maxEventsPerIntegration) { + RateFilter rateFilter; + synchronized (rateFilterList) { + rateFilter = rateFilterList.get(maxEventsPerIntegration); + if (rateFilter == null) { + rateFilter = new RateFilter(maxEventsPerIntegration); + synchronized (rateFilterList) { + rateFilterList.put(maxEventsPerIntegration, rateFilter); + } + } else { + if (rateFilter.addClient() > CLIENTS_NUMBER_WARNLEVEL) + LOG.warn("Warnlevel {} exceeded for client connections", CLIENTS_NUMBER_WARNLEVEL); + } + } + return rateFilter; + } + + private Timer startTimerTask(Duration integrationTime) { + long milliseconds = integrationTime.toMillis(); + LOG.debug("Start startTimerTask with {} ms", milliseconds); + Timer time = new Timer(); + time.scheduleAtFixedRate(new TimeoutHandler(), 0L, milliseconds); + return time; + } + + private class TimeoutHandler extends TimerTask { + @Override + public void run() { + LOG.debug("Run timeout task at {}", f(get.now())); + synchronized (rateFilterList) { + rateFilterList.forEach((k, f) -> f.timer()); + } + } + } + + /** + * Provide nice debug output for Instant and Duration + * + * @param i with instant + * @return output string + */ + private static String f(Instant i) { + return i != null ? FORMATTER.format(i) : "null"; + } + + /** + * Convert a rate per minute into events per integration time. + * + * @param ratePerMinute + * @return events per integration time. + */ + private long convertRPMToMaxCount(long ratePerMinute) { + return ratePerMinute * integrationTime.toSeconds() / TimeUnit.MINUTES.toSeconds(1); + } + + /** + * Ratefilter class contains status informaton for one event stream. + */ + public class RateFilter implements Closeable { + private final long maxEventsPerIntegration; //uuid and maximum of events without overload + private Long clients; // Number of clients for this filter. + private long rateCount; // number of events during integration period + private boolean overload; //true means in overload status. Change at end of period only. + + /** + * Create a new Filter + * + * @param maxEventsPerIntegration >= 1 characteristics and uuid of this filter. < 1 switched off + * @see {@link #close} + */ + private RateFilter(long maxEventsPerIntegration) { + synchronized (this) { + this.clients = 1L; + this.maxEventsPerIntegration = maxEventsPerIntegration; + this.rateCount = 0; + } + } + + /** + * Add a client to this filter + * + * @return number of clients, handled by this filter + * @see {@link #close} + */ + private synchronized long addClient() { + if (clients >= 1) { + ++clients; + } else { + LOG.warn("Misalligned open/close for {} with number {}", maxEventsPerIntegration, clients); + } + return clients; + } + + /** + * Provide actual overload status + * + * @return status true means overloaded false not overloaded + */ + public synchronized boolean getOverloadStatus() { + return overload; + } + + /** + * Handle filter on event received + */ + public synchronized void event() { + rateCount++; + LOG.debug("event rc:{}", rateCount); + } + + /** + * Called if measurement period ends. Device if overload and reset counter. + */ + public synchronized void timer() { + //Change overload only at end of period + //Always inactive if maxEventsPerIntegration== 0 + if (maxEventsPerIntegration > 0) { + overload = rateCount > maxEventsPerIntegration; + } + rateCount = 0; + LOG.debug("Timer ol:{} rc:{}", overload, rateCount); + } + + /** + * Get maximum events allowed per integration period + * + * @return 1 ... + */ + public synchronized long getMaxEventsPerIntegration() { + return maxEventsPerIntegration; + } + + /** + * Get number of client streams. + * + * @return 1 ... + */ + public synchronized long getClients() { + return clients; + } + + @Override + public void close() { + synchronized (rateFilterList) { + if (clients == 1) { + LOG.debug("Close and remove last client {}", maxEventsPerIntegration); + rateFilterList.remove(this.maxEventsPerIntegration); + clients--; + } else if (clients > 1) { + LOG.debug("Close one client of {} for events {}", clients, maxEventsPerIntegration); + clients--; + } else { + LOG.warn("Misaligned new/close for events {}", maxEventsPerIntegration); + } + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("RateFilter [maxEventsPerIntegration="); + builder.append(maxEventsPerIntegration); + builder.append(", clients="); + builder.append(clients); + builder.append(", rateCount="); + builder.append(rateCount); + builder.append(", overload="); + builder.append(overload); + builder.append("]"); + return builder.toString(); + } + } +} |