summaryrefslogtreecommitdiffstats
path: root/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java
diff options
context:
space:
mode:
Diffstat (limited to 'openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java')
-rw-r--r--openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java164
1 files changed, 76 insertions, 88 deletions
diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java
index dce8edb356..c69c4ca524 100644
--- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java
+++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java
@@ -17,103 +17,91 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.openecomp.sdc.notification.workers;
-import org.apache.commons.collections4.CollectionUtils;
-import org.openecomp.sdc.logging.api.Logger;
-import org.openecomp.sdc.logging.api.LoggerFactory;
-import org.openecomp.sdc.notification.config.ConfigurationManager;
-import org.openecomp.sdc.notification.types.NotificationsStatusDto;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
+import org.apache.commons.collections4.CollectionUtils;
+import org.openecomp.sdc.logging.api.Logger;
+import org.openecomp.sdc.logging.api.LoggerFactory;
+import org.openecomp.sdc.notification.config.ConfigurationManager;
+import org.openecomp.sdc.notification.types.NotificationsStatusDto;
public class NotificationWorker {
- private static final int DEFAULT_POLLING_INTERVAL = 2000;
- private static final String POLLING_INTERVAL = "pollingIntervalMsec";
- private static final int DEFAULT_SELECTION_LIMIT = 10;
- private static final String SELECTION_SIZE = "selectionSize";
-
- private static boolean stopRunning = false;
-
- private int selectionLimit = DEFAULT_SELECTION_LIMIT;
- private int pollingSleepInterval = DEFAULT_POLLING_INTERVAL;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorker.class);
-
- private static Map<String, NotificationReceiver> activeUsers = new ConcurrentHashMap<>();
- private NewNotificationsReader news = null;
-
- public NotificationWorker(NewNotificationsReader news) {
- ConfigurationManager cm = ConfigurationManager.getInstance();
- pollingSleepInterval = cm.getConfigValue(POLLING_INTERVAL, DEFAULT_POLLING_INTERVAL);
- selectionLimit = cm.getConfigValue(SELECTION_SIZE, DEFAULT_SELECTION_LIMIT);
-
- Objects.requireNonNull(news, "NotificationNews object is not initialized.");
- this.news = news;
-
- NotificationWorker.Poller p = new Poller();
- Thread thread = new Thread(p);
- thread.start();
- }
-
- public Map<String, NotificationReceiver> getActiveUsers() {
- return activeUsers;
- }
-
- public class Poller extends Thread {
- @Override
- public void run() {
- try {
- while (!stopRunning) {
- pollNotifications();
- Thread.sleep(pollingSleepInterval);
- }
- }
- catch (InterruptedException e) {
- LOGGER.error("Interrupted Exception during Notification poller launch.", e);
- Thread.currentThread().interrupt();
- }
- }
-
- private void pollNotifications() {
-
- Map<String, NotificationReceiver> currUsers = new HashMap<>();
- currUsers.putAll(getActiveUsers());
-
- for (NotificationReceiver receiver : currUsers.values()) {
- String ownerId = receiver.getOwnerId();
- UUID eventId = receiver.getLastEventId();
- NotificationsStatusDto status = news.getNewNotifications(ownerId, eventId, selectionLimit);
- if(Objects.nonNull(status) && CollectionUtils.isNotEmpty(status.getNotifications())) {
- receiver.setLastEventId(status.getLastScanned());
- receiver.getNotesProcessor().accept(status);
- }
- }
- }
-
- }
-
- public void register(String ownerId, UUID lastDelivered, Consumer<NotificationsStatusDto> notesProcessor) {
- NotificationReceiver receiver = new NotificationReceiver(ownerId, lastDelivered, notesProcessor);
- activeUsers.put(ownerId, receiver);
- LOGGER.debug("User {} is registered with eventId: {}", ownerId, receiver.getLastEventId());
- }
-
- public void unregister(String ownerId) {
- activeUsers.remove(ownerId);
- LOGGER.debug("User {} is unregistered.", ownerId);
- }
-
- public void stopPolling() {
- LOGGER.debug("Stop notification polling.");
- stopRunning = true;
- }
-
+ private static final int DEFAULT_POLLING_INTERVAL = 2000;
+ private static final String POLLING_INTERVAL = "pollingIntervalMsec";
+ private static final int DEFAULT_SELECTION_LIMIT = 10;
+ private static final String SELECTION_SIZE = "selectionSize";
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorker.class);
+ private static boolean stopRunning = false;
+ private static Map<String, NotificationReceiver> activeUsers = new ConcurrentHashMap<>();
+ private int selectionLimit = DEFAULT_SELECTION_LIMIT;
+ private int pollingSleepInterval = DEFAULT_POLLING_INTERVAL;
+ private NewNotificationsReader news = null;
+
+ public NotificationWorker(NewNotificationsReader news) {
+ ConfigurationManager cm = ConfigurationManager.getInstance();
+ pollingSleepInterval = cm.getConfigValue(POLLING_INTERVAL, DEFAULT_POLLING_INTERVAL);
+ selectionLimit = cm.getConfigValue(SELECTION_SIZE, DEFAULT_SELECTION_LIMIT);
+ Objects.requireNonNull(news, "NotificationNews object is not initialized.");
+ this.news = news;
+ NotificationWorker.Poller p = new Poller();
+ Thread thread = new Thread(p);
+ thread.start();
+ }
+
+ public Map<String, NotificationReceiver> getActiveUsers() {
+ return activeUsers;
+ }
+
+ public void register(String ownerId, UUID lastDelivered, Consumer<NotificationsStatusDto> notesProcessor) {
+ NotificationReceiver receiver = new NotificationReceiver(ownerId, lastDelivered, notesProcessor);
+ activeUsers.put(ownerId, receiver);
+ LOGGER.debug("User {} is registered with eventId: {}", ownerId, receiver.getLastEventId());
+ }
+
+ public void unregister(String ownerId) {
+ activeUsers.remove(ownerId);
+ LOGGER.debug("User {} is unregistered.", ownerId);
+ }
+
+ public void stopPolling() {
+ LOGGER.debug("Stop notification polling.");
+ stopRunning = true;
+ }
+
+ public class Poller extends Thread {
+
+ @Override
+ public void run() {
+ try {
+ while (!stopRunning) {
+ pollNotifications();
+ Thread.sleep(pollingSleepInterval);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted Exception during Notification poller launch.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void pollNotifications() {
+ Map<String, NotificationReceiver> currUsers = new HashMap<>();
+ currUsers.putAll(getActiveUsers());
+ for (NotificationReceiver receiver : currUsers.values()) {
+ String ownerId = receiver.getOwnerId();
+ UUID eventId = receiver.getLastEventId();
+ NotificationsStatusDto status = news.getNewNotifications(ownerId, eventId, selectionLimit);
+ if (Objects.nonNull(status) && CollectionUtils.isNotEmpty(status.getNotifications())) {
+ receiver.setLastEventId(status.getLastScanned());
+ receiver.getNotesProcessor().accept(status);
+ }
+ }
+ }
+ }
}