diff options
Diffstat (limited to 'openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main')
9 files changed, 595 insertions, 0 deletions
diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java new file mode 100644 index 0000000000..d960b8b9dc --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/config/ConfigurationManager.java @@ -0,0 +1,113 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.notification.config; + +import org.openecomp.sdc.logging.api.Logger; +import org.openecomp.sdc.logging.api.LoggerFactory; +import org.openecomp.sdc.tosca.services.YamlUtil; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.BiConsumer; + +public class ConfigurationManager { + + private static final String CONFIGURATION_YAML_FILE = "onboarding_configuration.yaml"; + private static final String NOTIFICATIONS_CONFIG = "notifications"; + + private LinkedHashMap<String, Object> notificationsConfiguration; + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationManager.class); + private static final ConfigurationManager SINGLETON = new ConfigurationManager(); + + public static ConfigurationManager getInstance() { + return SINGLETON; + } + + private ConfigurationManager() { + initConfiguration(); + } + + private void initConfiguration() { + + YamlUtil yamlUtil = new YamlUtil(); + readConfigurationFromStream(yamlUtil, (filename, stream) -> { + + if (stream == null) { + LOGGER.warn("Configuration not found: " + filename + ". Using defaults"); + return; + } + + Map<String, LinkedHashMap<String, Object>> configurationMap = yamlUtil.yamlToMap(stream); + if (configurationMap == null) { + LOGGER.warn("Configuration cannot be parsed: " + filename + ". Using defaults"); + return; + } + + notificationsConfiguration = configurationMap.get(NOTIFICATIONS_CONFIG); + if (notificationsConfiguration == null) { + LOGGER.error(NOTIFICATIONS_CONFIG + + " is missing in configuration file '" + filename + "'. Using defaults"); + } + }); + } + + private void readConfigurationFromStream(YamlUtil yamlUtil, + BiConsumer<String, InputStream> reader) { + + String configurationYamlFile = System.getProperty(CONFIGURATION_YAML_FILE); + + try { + + if (configurationYamlFile == null) { + + try (InputStream inputStream = + yamlUtil.loadYamlFileIs("/" + CONFIGURATION_YAML_FILE)) { + reader.accept(CONFIGURATION_YAML_FILE, inputStream); + } + + } else { + + try (InputStream inputStream = new FileInputStream(configurationYamlFile)) { + reader.accept(configurationYamlFile, inputStream); + } + } + + } catch (IOException e) { + LOGGER.error("Failed to read configuration", e); + } + } + + public <T> T getConfigValue(String name, T defaultValue) { + + Object value = notificationsConfiguration.get(name); + + try { + return value == null ? defaultValue : (T) value; + } catch (ClassCastException e) { + LOGGER.warn(String.format("Failed to read configuration property '%s' as requested type. Using default '%s'", + name, defaultValue), e); + return defaultValue; + } + } +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java new file mode 100644 index 0000000000..a415d9e081 --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationEntityDto.java @@ -0,0 +1,103 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.notification.types; + +import java.util.Map; +import java.util.UUID; + +public class NotificationEntityDto { + private boolean read; + private UUID eventId; + private String dateTime; + private String eventType; + private Map<String, Object> eventAttributes; + + public NotificationEntityDto() { + } + public NotificationEntityDto(boolean read, UUID eventId, String eventType, + Map<String, Object> eventAttributes) { + this.read = read; + this.eventId = eventId; + this.eventType = eventType; + this.eventAttributes = eventAttributes; + } + + public NotificationEntityDto(boolean read, UUID eventId,String eventType, + Map<String, Object> eventAttributes, String dateTime) { + this.read = read; + this.eventId = eventId; + this.dateTime = dateTime; + this.eventType = eventType; + this.eventAttributes = eventAttributes; + } + + public boolean isRead() { + return read; + } + + public void setRead(boolean read) { + this.read = read; + } + + public UUID getEventId() { + return eventId; + } + + public void setEventId(UUID eventId) { + this.eventId = eventId; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public Map<String, Object> getEventAttributes() { + return eventAttributes; + } + + public void setEventAttributes(Map<String, Object> eventAttributes) { + this.eventAttributes = eventAttributes; + } + + public String getDateTime() { + return dateTime; + } + + public void setDateTime(String dateTime) { + this.dateTime = dateTime; + } + + @Override + public String toString() { + return "NotificationEntityDto {" + + ", state='" + (read ? "Read" : "Noread") + '\'' + + ", dateTime='" + dateTime + '\'' + + ", eventId='" + eventId + '\'' + + ", eventType='" + eventType + '\'' + + ", eventAttributes='" + eventAttributes + '\'' + + '}'; + } + +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java new file mode 100644 index 0000000000..fd8d32090e --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/types/NotificationsStatusDto.java @@ -0,0 +1,72 @@ +package org.openecomp.sdc.notification.types; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * @author avrahamg + * @since June 29, 2017 + */ +public class NotificationsStatusDto { + private List<NotificationEntityDto> notifications; + private List<UUID> newEntries = new ArrayList<>(); + private UUID lastScanned; + private UUID endOfPage; + private long numOfNotSeenNotifications; + + public NotificationsStatusDto() { + } + + public List<NotificationEntityDto> getNotifications() { + return notifications; + } + + public void setNotifications( + List<NotificationEntityDto> notifications) { + this.notifications = notifications; + } + + public List<UUID> getNewEntries() { + return newEntries; + } + + public void setNewEntries(List<UUID> newEntries) { + this.newEntries = newEntries; + } + + public UUID getLastScanned() { + return lastScanned; + } + + public void setLastScanned(UUID lastScanned) { + this.lastScanned = lastScanned; + } + + public UUID getEndOfPage() { + return endOfPage; + } + + public void setEndOfPage(UUID endOfPage) { + this.endOfPage = endOfPage; + } + + public long getNumOfNotSeenNotifications() { + return numOfNotSeenNotifications; + } + + public void setNumOfNotSeenNotifications(long numOfNotSeenNotifications) { + this.numOfNotSeenNotifications = numOfNotSeenNotifications; + } + + @Override + public String toString() { + return "NotificationsStatusDto{" + + "notifications=" + notifications + + ", newEntries=" + newEntries + + ", lastScanned=" + lastScanned + + ", endOfPage=" + endOfPage + + ", numOfNotSeenNotifications=" + numOfNotSeenNotifications + + '}'; + } +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java new file mode 100644 index 0000000000..75b9f5c786 --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NewNotificationsReader.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.notification.workers; + +import org.openecomp.sdc.notification.types.NotificationsStatusDto; + +import java.util.UUID; + +public interface NewNotificationsReader { + NotificationsStatusDto getNewNotifications(String ownerId, UUID eventId, int limit); +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java new file mode 100644 index 0000000000..ee802640c4 --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationReceiver.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.notification.workers; + +import org.openecomp.sdc.notification.types.NotificationsStatusDto; + +import java.util.UUID; +import java.util.function.Consumer; + + + +public class NotificationReceiver { + + public String ownerId = null; + public Consumer<NotificationsStatusDto> notesProcessor = null; + public UUID lastEventId = null; + + NotificationReceiver(String ownerId, UUID lastEventId, Consumer<NotificationsStatusDto> notesProcessor) { + this.ownerId = ownerId; + this.lastEventId = lastEventId; + this.notesProcessor = notesProcessor; + } + + NotificationReceiver(String ownerId, Consumer<NotificationsStatusDto> notesProcessor) { + this(ownerId, null, notesProcessor); + } + + public void setOwnerId(String ownerId) { + this.ownerId = ownerId; + } + + public String getOwnerId() { + return this.ownerId; + } + + public void setNotesProcessor(Consumer<NotificationsStatusDto> notesProcessor) { + this.notesProcessor = notesProcessor; + } + + public Consumer<NotificationsStatusDto> getNotesProcessor() { + return this.notesProcessor; + } + + public void setLastEventId(UUID lastEventId) { + this.lastEventId = lastEventId; + } + + public UUID getlastEventId() { + return this.lastEventId; + } +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java new file mode 100644 index 0000000000..e8c2006d5c --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/NotificationWorker.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.notification.workers; + +import org.apache.commons.collections4.CollectionUtils; +import org.openecomp.sdc.logging.api.Logger; +import org.openecomp.sdc.logging.api.LoggerFactory; +import org.openecomp.sdc.notification.config.ConfigurationManager; +import org.openecomp.sdc.notification.types.NotificationsStatusDto; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public class NotificationWorker { + + private static final int DEFAULT_POLLING_INTERVAL = 2000; + private static final String POLLING_INTERVAL = "pollingIntervalMsec"; + private static final int DEFAULT_SELECTION_LIMIT = 10; + private static final String SELECTION_SIZE = "selectionSize"; + + private static boolean stopRunning = false; + + private int selectionLimit = DEFAULT_SELECTION_LIMIT; + private int pollingSleepInterval = DEFAULT_POLLING_INTERVAL; + + private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorker.class); + + private static Map<String, NotificationReceiver> activeUsers = new ConcurrentHashMap<>(); + private NewNotificationsReader news = null; + + public NotificationWorker(NewNotificationsReader news) { + ConfigurationManager cm = ConfigurationManager.getInstance(); + pollingSleepInterval = cm.getConfigValue(POLLING_INTERVAL, DEFAULT_POLLING_INTERVAL); + selectionLimit = cm.getConfigValue(SELECTION_SIZE, DEFAULT_SELECTION_LIMIT); + + Objects.requireNonNull(news, "NotificationNews object is not initialized."); + this.news = news; + + NotificationWorker.Poller p = new Poller(); + Thread thread = new Thread(p); + thread.start(); + } + + public Map<String, NotificationReceiver> getActiveUsers() { + return activeUsers; + } + + public class Poller extends Thread { + public void run() { + try { + while (!stopRunning) { + pollNotifications(); + Thread.sleep(pollingSleepInterval); + } + } + catch (InterruptedException e) { + LOGGER.error("Interrupted Exception during Notification poller launch.", e); + } + } + + private void pollNotifications() { + + Map<String, NotificationReceiver> currUsers = new HashMap<>(); + currUsers.putAll(getActiveUsers()); + + for (NotificationReceiver receiver : currUsers.values()) { + String ownerId = receiver.getOwnerId(); + UUID eventId = receiver.getlastEventId(); + NotificationsStatusDto status = news.getNewNotifications(ownerId, eventId, selectionLimit); + if(Objects.nonNull(status) && CollectionUtils.isNotEmpty(status.getNotifications())) { + receiver.setLastEventId(status.getLastScanned()); + receiver.getNotesProcessor().accept(status); + } + } + } + + } + + public void register(String ownerId, UUID lastDelivered, Consumer<NotificationsStatusDto> notesProcessor) { + NotificationReceiver receiver = new NotificationReceiver(ownerId, lastDelivered, notesProcessor); + activeUsers.put(ownerId, receiver); + LOGGER.debug("User {} is registered with eventId: {}", ownerId, receiver.getlastEventId()); + } + + public void unregister(String ownerId) { + activeUsers.remove(ownerId); + LOGGER.debug("User {} is unregistered.", ownerId); + } + + public void stopPolling() { + LOGGER.debug("Stop notification polling."); + stopRunning = true; + } + +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java new file mode 100644 index 0000000000..a332efaf5a --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/java/org/openecomp/sdc/notification/workers/impl/NewNotificationsReaderRestImpl.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.notification.workers.impl; + +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.codehaus.jackson.map.ObjectMapper; +import org.openecomp.sdc.logging.api.Logger; +import org.openecomp.sdc.logging.api.LoggerFactory; +import org.openecomp.sdc.notification.config.ConfigurationManager; +import org.openecomp.sdc.notification.types.NotificationsStatusDto; +import org.openecomp.sdc.notification.workers.NewNotificationsReader; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import java.io.InputStreamReader; +import java.util.UUID; + +public class NewNotificationsReaderRestImpl implements NewNotificationsReader { + + private static final String USER_ID_HEADER_PARAM = "USER_ID"; + private static final String LAST_DELIVERED_QUERY_PARAM = "LAST_DELIVERED_EVENT_ID"; + private static final String LIMIT_QUERY_PARAM = "NOTIFICATION_ROWS_LIMIT"; + private static final String BE_HOST = "beHost"; + private static final String BE_PORT = "beHttpPort"; + private static final String DEFAULT_BE_HOST = "localhost"; + private static final int DEFAULT_BE_PORT = 8080; + private static final String URL = "http://%s:%d/onboarding-api/v1.0/notifications/worker?"; + private static final ObjectMapper mapper = new ObjectMapper(); + + private static String beHost; + private static int bePort; + + private static final Logger LOGGER = LoggerFactory.getLogger(NewNotificationsReaderRestImpl.class); + + public NewNotificationsReaderRestImpl() { + ConfigurationManager cm = ConfigurationManager.getInstance(); + bePort = cm.getConfigValue(BE_PORT, DEFAULT_BE_PORT); + beHost = cm.getConfigValue(BE_HOST, DEFAULT_BE_HOST); + } + + public NotificationsStatusDto getNewNotifications(String ownerId, UUID eventId, int limit) { + HttpClient client = HttpClientBuilder.create().build(); + String url = String.format(URL, beHost, bePort); + + url = url + LIMIT_QUERY_PARAM + "=" + limit; + if (eventId != null) { + url = url + "&" + LAST_DELIVERED_QUERY_PARAM + "=" + eventId; + } + + HttpGet request = new HttpGet(url); + request.addHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON); + request.addHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON); + request.addHeader(USER_ID_HEADER_PARAM, ownerId); + + try { + HttpResponse response = client.execute(request); + return mapper.readValue(new InputStreamReader(response.getEntity().getContent()), NotificationsStatusDto.class); + } catch (Exception e) { + LOGGER.error("Failed to execute the request {}", url, e); + return null; + } + } +} diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/resources/factoryConfiguration.json b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/resources/factoryConfiguration.json new file mode 100644 index 0000000000..5f5693fb92 --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/resources/factoryConfiguration.json @@ -0,0 +1,3 @@ +{ + "org.openecomp.config.api.ConfigurationManager": "org.openecomp.config.impl.ConfigurationImpl" +}
\ No newline at end of file diff --git a/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/resources/onboarding_configuration.yaml b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/resources/onboarding_configuration.yaml new file mode 100644 index 0000000000..a1b7ba5d9e --- /dev/null +++ b/openecomp-be/lib/openecomp-sdc-notification-lib/openecomp-sdc-notification-worker/src/main/resources/onboarding_configuration.yaml @@ -0,0 +1,5 @@ +notifications: + pollingIntervalMsec: 2000 + selectionSize: 10 + beHost: localhost + beHttpPort: 8080 |