summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorJoanna Jeremicz <joanna.jeremicz@nokia.com>2021-03-12 18:03:30 +0100
committerJoanna Jeremicz <joanna.jeremicz@nokia.com>2021-03-17 10:14:23 +0100
commit71d024482f4eb13e069aa240bea657109f4a2fd8 (patch)
tree1107c7a84e0ec38e54542b4f50a4edfdb209e589 /src/main
parent96413a6b5c82edc5c64e399d9ab1077236bf5628 (diff)
Incoming events cache - do not load files, which are in progress1.5.2
- Implement singleton cache for events being processed - Add JUnit tests - Bump snapshot version 1.5.1 -> 1.5.2 Issue-ID: DCAEGEN2-2659 Signed-off-by: Joanna Jeremicz <joanna.jeremicz@nokia.com> Change-Id: I79f32554a8c67d91d0d7b601bc8b060ff4984eab
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java21
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java68
3 files changed, 90 insertions, 1 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 4583df3..b020837 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -48,6 +48,7 @@ import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
import org.onap.dcaegen2.services.pmmapper.model.ServerResource;
import org.onap.dcaegen2.services.pmmapper.ssl.SSLContextFactory;
import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.dcaegen2.services.pmmapper.utils.IncomingEventsCache;
import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
@@ -80,6 +81,7 @@ public class App {
private static final int HTTPS_PORT = 8443;
private static final int INITIAL_RECONFIGURATION_PERIOD = 60;
private static final int RECONFIGURATION_PERIOD = 60;
+ private static final IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
@@ -139,6 +141,8 @@ public class App {
this.flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
+ .filter(event -> !isCached(event.getPublishIdentity()))
+ .doOnNext(App::addToCache)
.limitRate(processingConfig.getLimitRate())
.parallel(processingThreads)
.runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
@@ -204,6 +208,14 @@ public class App {
}
}
+ private boolean isCached(String id) {
+ boolean isPresent = eventsCache.contains(id);
+ if(isPresent) {
+ logger.unwrap().info("Skipping. This event is already waiting in cache to be processed: " + id);
+ }
+ return isPresent;
+ }
+
public static void main(String[] args) throws EnvironmentConfigException {
FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
@@ -280,6 +292,7 @@ public class App {
public static void sendEventProcessed(MapperConfig config, Event event) {
try {
DataRouterUtils.processEvent(config, event);
+ eventsCache.remove(event.getPublishIdentity());
} catch (ProcessEventException exception) {
logger.unwrap().error("Process event failure", exception);
}
@@ -300,6 +313,14 @@ public class App {
}
/**
+ * Adds received event to cache
+ * @param event to be cached.
+ */
+ public static void addToCache(@NonNull Event event) {
+ eventsCache.add(event.getPublishIdentity());
+ }
+
+ /**
* Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
* @param event to be received.
*/
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
index 57d9570..3bd4a37 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2020 Nordix Foundation.
+ * Copyright (C) 2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java
new file mode 100644
index 0000000..677a3c3
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public enum IncomingEventsCache {
+ INSTANCE;
+
+ private Set<String> eventsIds = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Gets thread safe, single instance of this enum
+ * @return the single instance of cache
+ */
+ public IncomingEventsCache getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Adds publishIdentity of an event to cache
+ * @param id to be added to cache
+ */
+ public void add(String id) {
+ eventsIds.add(id);
+ }
+
+ /**
+ * Remove publishIdentity of an event from cache
+ * @param id to be removed from cache
+ */
+ public void remove(String id) {
+ eventsIds.remove(id);
+ }
+
+ /**
+ * Checks if the cache contains a publishIdentity
+ * @param id to be found in cache
+ * @return true when the id exists, false otherwise
+ */
+ public boolean contains(String id) {
+ return eventsIds.contains(id);
+ }
+
+ Set<String> getCachedEvents() {
+ return new HashSet<>(eventsIds);
+ }
+}