diff options
Diffstat (limited to 'openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker')
7 files changed, 106 insertions, 151 deletions
diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java index c016a4797f..e597de0f20 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java @@ -17,96 +17,76 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.config; -import org.onap.sdc.tosca.services.YamlUtil; -import org.openecomp.sdc.logging.api.Logger; -import org.openecomp.sdc.logging.api.LoggerFactory; - import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.LinkedHashMap; import java.util.Map; import java.util.function.BiConsumer; +import org.onap.sdc.tosca.services.YamlUtil; +import org.openecomp.sdc.logging.api.Logger; +import org.openecomp.sdc.logging.api.LoggerFactory; public class ConfigurationManager { private static final String CONFIGURATION_YAML_FILE = "onboarding_configuration.yaml"; private static final String NOTIFICATIONS_CONFIG = "notifications"; - - private LinkedHashMap<String, Object> notificationsConfiguration; private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationManager.class); private static final ConfigurationManager SINGLETON = new ConfigurationManager(); - - public static ConfigurationManager getInstance() { - return SINGLETON; - } + private LinkedHashMap<String, Object> notificationsConfiguration; private ConfigurationManager() { initConfiguration(); } - private void initConfiguration() { + public static ConfigurationManager getInstance() { + return SINGLETON; + } + private void initConfiguration() { YamlUtil yamlUtil = new YamlUtil(); readConfigurationFromStream(yamlUtil, (filename, stream) -> { - if (stream == null) { LOGGER.warn("Configuration not found: " + filename + ". Using defaults"); return; } - Map<String, LinkedHashMap<String, Object>> configurationMap = yamlUtil.yamlToMap(stream); if (configurationMap == null) { LOGGER.warn("Configuration cannot be parsed: " + filename + ". Using defaults"); return; } - notificationsConfiguration = configurationMap.get(NOTIFICATIONS_CONFIG); if (notificationsConfiguration == null) { - LOGGER.error(NOTIFICATIONS_CONFIG + - " is missing in configuration file '" + filename + "'. Using defaults"); + LOGGER.error(NOTIFICATIONS_CONFIG + " is missing in configuration file '" + filename + "'. Using defaults"); } }); } - private void readConfigurationFromStream(YamlUtil yamlUtil, - BiConsumer<String, InputStream> reader) { - + private void readConfigurationFromStream(YamlUtil yamlUtil, BiConsumer<String, InputStream> reader) { String configurationYamlFile = System.getProperty(CONFIGURATION_YAML_FILE); - try { - if (configurationYamlFile == null) { - - try (InputStream inputStream = - yamlUtil.loadYamlFileIs("/" + CONFIGURATION_YAML_FILE)) { + try (InputStream inputStream = yamlUtil.loadYamlFileIs("/" + CONFIGURATION_YAML_FILE)) { reader.accept(CONFIGURATION_YAML_FILE, inputStream); } - } else { - try (InputStream inputStream = new FileInputStream(configurationYamlFile)) { reader.accept(configurationYamlFile, inputStream); } } - } catch (IOException e) { LOGGER.error("Failed to read configuration", e); } } public <T> T getConfigValue(String name, T defaultValue) { - Object value = notificationsConfiguration.get(name); - try { return value == null ? defaultValue : (T) value; } catch (ClassCastException e) { - LOGGER.warn(String.format("Failed to read configuration property '%s' as requested type. Using default '%s'", - name, defaultValue), e); + LOGGER.warn(String.format("Failed to read configuration property '%s' as requested type. Using default '%s'", name, defaultValue), e); return defaultValue; } } diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java index a48b16add6..9e92a2b983 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.types; import java.util.Map; diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java index 7aca3fd63d..c4522c4e04 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.types; import java.util.ArrayList; diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java index 75b9f5c786..8dd64c348b 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java @@ -17,13 +17,12 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.workers; -import org.openecomp.sdc.notification.types.NotificationsStatusDto; - import java.util.UUID; +import org.openecomp.sdc.notification.types.NotificationsStatusDto; public interface NewNotificationsReader { - NotificationsStatusDto getNewNotifications(String ownerId, UUID eventId, int limit); + + NotificationsStatusDto getNewNotifications(String ownerId, UUID eventId, int limit); } diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java index 4ce87427aa..e357fda982 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java @@ -17,28 +17,25 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.workers; -import org.openecomp.sdc.notification.types.NotificationsStatusDto; - import java.util.UUID; import java.util.function.Consumer; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; - +import org.openecomp.sdc.notification.types.NotificationsStatusDto; @Getter @Setter @AllArgsConstructor public class NotificationReceiver { - private String ownerId = null; - private UUID lastEventId = null; - private Consumer<NotificationsStatusDto> notesProcessor = null; + private String ownerId = null; + private UUID lastEventId = null; + private Consumer<NotificationsStatusDto> notesProcessor = null; - NotificationReceiver(String ownerId, Consumer<NotificationsStatusDto> notesProcessor) { - this(ownerId, null, notesProcessor); - } + NotificationReceiver(String ownerId, Consumer<NotificationsStatusDto> notesProcessor) { + this(ownerId, null, notesProcessor); + } } diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java index dce8edb356..c69c4ca524 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java @@ -17,103 +17,91 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.workers; -import org.apache.commons.collections4.CollectionUtils; -import org.openecomp.sdc.logging.api.Logger; -import org.openecomp.sdc.logging.api.LoggerFactory; -import org.openecomp.sdc.notification.config.ConfigurationManager; -import org.openecomp.sdc.notification.types.NotificationsStatusDto; - import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import org.apache.commons.collections4.CollectionUtils; +import org.openecomp.sdc.logging.api.Logger; +import org.openecomp.sdc.logging.api.LoggerFactory; +import org.openecomp.sdc.notification.config.ConfigurationManager; +import org.openecomp.sdc.notification.types.NotificationsStatusDto; public class NotificationWorker { - private static final int DEFAULT_POLLING_INTERVAL = 2000; - private static final String POLLING_INTERVAL = "pollingIntervalMsec"; - private static final int DEFAULT_SELECTION_LIMIT = 10; - private static final String SELECTION_SIZE = "selectionSize"; - - private static boolean stopRunning = false; - - private int selectionLimit = DEFAULT_SELECTION_LIMIT; - private int pollingSleepInterval = DEFAULT_POLLING_INTERVAL; - - private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorker.class); - - private static Map<String, NotificationReceiver> activeUsers = new ConcurrentHashMap<>(); - private NewNotificationsReader news = null; - - public NotificationWorker(NewNotificationsReader news) { - ConfigurationManager cm = ConfigurationManager.getInstance(); - pollingSleepInterval = cm.getConfigValue(POLLING_INTERVAL, DEFAULT_POLLING_INTERVAL); - selectionLimit = cm.getConfigValue(SELECTION_SIZE, DEFAULT_SELECTION_LIMIT); - - Objects.requireNonNull(news, "NotificationNews object is not initialized."); - this.news = news; - - NotificationWorker.Poller p = new Poller(); - Thread thread = new Thread(p); - thread.start(); - } - - public Map<String, NotificationReceiver> getActiveUsers() { - return activeUsers; - } - - public class Poller extends Thread { - @Override - public void run() { - try { - while (!stopRunning) { - pollNotifications(); - Thread.sleep(pollingSleepInterval); - } - } - catch (InterruptedException e) { - LOGGER.error("Interrupted Exception during Notification poller launch.", e); - Thread.currentThread().interrupt(); - } - } - - private void pollNotifications() { - - Map<String, NotificationReceiver> currUsers = new HashMap<>(); - currUsers.putAll(getActiveUsers()); - - for (NotificationReceiver receiver : currUsers.values()) { - String ownerId = receiver.getOwnerId(); - UUID eventId = receiver.getLastEventId(); - NotificationsStatusDto status = news.getNewNotifications(ownerId, eventId, selectionLimit); - if(Objects.nonNull(status) && CollectionUtils.isNotEmpty(status.getNotifications())) { - receiver.setLastEventId(status.getLastScanned()); - receiver.getNotesProcessor().accept(status); - } - } - } - - } - - public void register(String ownerId, UUID lastDelivered, Consumer<NotificationsStatusDto> notesProcessor) { - NotificationReceiver receiver = new NotificationReceiver(ownerId, lastDelivered, notesProcessor); - activeUsers.put(ownerId, receiver); - LOGGER.debug("User {} is registered with eventId: {}", ownerId, receiver.getLastEventId()); - } - - public void unregister(String ownerId) { - activeUsers.remove(ownerId); - LOGGER.debug("User {} is unregistered.", ownerId); - } - - public void stopPolling() { - LOGGER.debug("Stop notification polling."); - stopRunning = true; - } - + private static final int DEFAULT_POLLING_INTERVAL = 2000; + private static final String POLLING_INTERVAL = "pollingIntervalMsec"; + private static final int DEFAULT_SELECTION_LIMIT = 10; + private static final String SELECTION_SIZE = "selectionSize"; + private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorker.class); + private static boolean stopRunning = false; + private static Map<String, NotificationReceiver> activeUsers = new ConcurrentHashMap<>(); + private int selectionLimit = DEFAULT_SELECTION_LIMIT; + private int pollingSleepInterval = DEFAULT_POLLING_INTERVAL; + private NewNotificationsReader news = null; + + public NotificationWorker(NewNotificationsReader news) { + ConfigurationManager cm = ConfigurationManager.getInstance(); + pollingSleepInterval = cm.getConfigValue(POLLING_INTERVAL, DEFAULT_POLLING_INTERVAL); + selectionLimit = cm.getConfigValue(SELECTION_SIZE, DEFAULT_SELECTION_LIMIT); + Objects.requireNonNull(news, "NotificationNews object is not initialized."); + this.news = news; + NotificationWorker.Poller p = new Poller(); + Thread thread = new Thread(p); + thread.start(); + } + + public Map<String, NotificationReceiver> getActiveUsers() { + return activeUsers; + } + + public void register(String ownerId, UUID lastDelivered, Consumer<NotificationsStatusDto> notesProcessor) { + NotificationReceiver receiver = new NotificationReceiver(ownerId, lastDelivered, notesProcessor); + activeUsers.put(ownerId, receiver); + LOGGER.debug("User {} is registered with eventId: {}", ownerId, receiver.getLastEventId()); + } + + public void unregister(String ownerId) { + activeUsers.remove(ownerId); + LOGGER.debug("User {} is unregistered.", ownerId); + } + + public void stopPolling() { + LOGGER.debug("Stop notification polling."); + stopRunning = true; + } + + public class Poller extends Thread { + + @Override + public void run() { + try { + while (!stopRunning) { + pollNotifications(); + Thread.sleep(pollingSleepInterval); + } + } catch (InterruptedException e) { + LOGGER.error("Interrupted Exception during Notification poller launch.", e); + Thread.currentThread().interrupt(); + } + } + + private void pollNotifications() { + Map<String, NotificationReceiver> currUsers = new HashMap<>(); + currUsers.putAll(getActiveUsers()); + for (NotificationReceiver receiver : currUsers.values()) { + String ownerId = receiver.getOwnerId(); + UUID eventId = receiver.getLastEventId(); + NotificationsStatusDto status = news.getNewNotifications(ownerId, eventId, selectionLimit); + if (Objects.nonNull(status) && CollectionUtils.isNotEmpty(status.getNotifications())) { + receiver.setLastEventId(status.getLastScanned()); + receiver.getNotesProcessor().accept(status); + } + } + } + } } diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java index 86a3030bd3..7f2390b38a 100644 --- a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java @@ -17,25 +17,23 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.notification.workers.impl; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStreamReader; +import java.util.UUID; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; -import com.fasterxml.jackson.databind.ObjectMapper; import org.openecomp.sdc.logging.api.Logger; import org.openecomp.sdc.logging.api.LoggerFactory; import org.openecomp.sdc.notification.config.ConfigurationManager; import org.openecomp.sdc.notification.types.NotificationsStatusDto; import org.openecomp.sdc.notification.workers.NewNotificationsReader; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import java.io.InputStreamReader; -import java.util.UUID; - public class NewNotificationsReaderRestImpl implements NewNotificationsReader { private static final String USER_ID_HEADER_PARAM = "USER_ID"; @@ -47,12 +45,10 @@ public class NewNotificationsReaderRestImpl implements NewNotificationsReader { private static final int DEFAULT_BE_PORT = 8080; private static final String URL = "http://%s:%d/onboarding-api/v1.0/notifications/worker?"; private static final ObjectMapper mapper = new ObjectMapper(); - + private static final Logger LOGGER = LoggerFactory.getLogger(NewNotificationsReaderRestImpl.class); private static String beHost; private static int bePort; - private static final Logger LOGGER = LoggerFactory.getLogger(NewNotificationsReaderRestImpl.class); - public NewNotificationsReaderRestImpl() { ConfigurationManager cm = ConfigurationManager.getInstance(); bePort = cm.getConfigValue(BE_PORT, DEFAULT_BE_PORT); @@ -62,17 +58,14 @@ public class NewNotificationsReaderRestImpl implements NewNotificationsReader { public NotificationsStatusDto getNewNotifications(String ownerId, UUID eventId, int limit) { HttpClient client = HttpClientBuilder.create().build(); String url = String.format(URL, beHost, bePort); - url = url + LIMIT_QUERY_PARAM + "=" + limit; if (eventId != null) { url = url + "&" + LAST_DELIVERED_QUERY_PARAM + "=" + eventId; } - HttpGet request = new HttpGet(url); request.addHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON); request.addHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON); request.addHeader(USER_ID_HEADER_PARAM, ownerId); - try { HttpResponse response = client.execute(request); return mapper.readValue(new InputStreamReader(response.getEntity().getContent()), NotificationsStatusDto.class); |