summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoanna Jeremicz <joanna.jeremicz@nokia.com>2021-03-12 18:03:30 +0100
committerJoanna Jeremicz <joanna.jeremicz@nokia.com>2021-03-17 10:14:23 +0100
commit71d024482f4eb13e069aa240bea657109f4a2fd8 (patch)
tree1107c7a84e0ec38e54542b4f50a4edfdb209e589
parent96413a6b5c82edc5c64e399d9ab1077236bf5628 (diff)
Incoming events cache - do not load files, which are in progress1.5.2
- Implement singleton cache for events being processed - Add JUnit tests - Bump snapshot version 1.5.1 -> 1.5.2 Issue-ID: DCAEGEN2-2659 Signed-off-by: Joanna Jeremicz <joanna.jeremicz@nokia.com> Change-Id: I79f32554a8c67d91d0d7b601bc8b060ff4984eab
-rw-r--r--Changelog.md7
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java21
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java68
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCacheTest.java63
-rw-r--r--version.properties2
7 files changed, 161 insertions, 4 deletions
diff --git a/Changelog.md b/Changelog.md
index f546520..7114f05 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -4,7 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [1.5.2] - 16/03/2021
+
+- Implement singleton cache for events being processed
+- Add JUnit tests
+
## [1.5.1] - 03/02/2021
- Add configuration number of threads and limit rate in files processing
-- Fix vulnerability - update undertow from 2.0.3.Final to 2.2.3.Final and commons.io from 2.6.0 to 2.8.0 \ No newline at end of file
+- Fix vulnerability - update undertow from 2.0.3.Final to 2.2.3.Final and commons.io from 2.6.0 to 2.8.0
diff --git a/pom.xml b/pom.xml
index a594d5d..a6c2187 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>pm-mapper</artifactId>
- <version>1.5.1-SNAPSHOT</version>
+ <version>1.5.2-SNAPSHOT</version>
<name>dcaegen2-services-pm-mapper</name>
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 4583df3..b020837 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -48,6 +48,7 @@ import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
import org.onap.dcaegen2.services.pmmapper.model.ServerResource;
import org.onap.dcaegen2.services.pmmapper.ssl.SSLContextFactory;
import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.dcaegen2.services.pmmapper.utils.IncomingEventsCache;
import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
@@ -80,6 +81,7 @@ public class App {
private static final int HTTPS_PORT = 8443;
private static final int INITIAL_RECONFIGURATION_PERIOD = 60;
private static final int RECONFIGURATION_PERIOD = 60;
+ private static final IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
@@ -139,6 +141,8 @@ public class App {
this.flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
+ .filter(event -> !isCached(event.getPublishIdentity()))
+ .doOnNext(App::addToCache)
.limitRate(processingConfig.getLimitRate())
.parallel(processingThreads)
.runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
@@ -204,6 +208,14 @@ public class App {
}
}
+ private boolean isCached(String id) {
+ boolean isPresent = eventsCache.contains(id);
+ if(isPresent) {
+ logger.unwrap().info("Skipping. This event is already waiting in cache to be processed: " + id);
+ }
+ return isPresent;
+ }
+
public static void main(String[] args) throws EnvironmentConfigException {
FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
@@ -280,6 +292,7 @@ public class App {
public static void sendEventProcessed(MapperConfig config, Event event) {
try {
DataRouterUtils.processEvent(config, event);
+ eventsCache.remove(event.getPublishIdentity());
} catch (ProcessEventException exception) {
logger.unwrap().error("Process event failure", exception);
}
@@ -300,6 +313,14 @@ public class App {
}
/**
+ * Adds received event to cache
+ * @param event to be cached.
+ */
+ public static void addToCache(@NonNull Event event) {
+ eventsCache.add(event.getPublishIdentity());
+ }
+
+ /**
* Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
* @param event to be received.
*/
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
index 57d9570..3bd4a37 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2020 Nordix Foundation.
+ * Copyright (C) 2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java
new file mode 100644
index 0000000..677a3c3
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCache.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public enum IncomingEventsCache {
+ INSTANCE;
+
+ private Set<String> eventsIds = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Gets thread safe, single instance of this enum
+ * @return the single instance of cache
+ */
+ public IncomingEventsCache getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Adds publishIdentity of an event to cache
+ * @param id to be added to cache
+ */
+ public void add(String id) {
+ eventsIds.add(id);
+ }
+
+ /**
+ * Remove publishIdentity of an event from cache
+ * @param id to be removed from cache
+ */
+ public void remove(String id) {
+ eventsIds.remove(id);
+ }
+
+ /**
+ * Checks if the cache contains a publishIdentity
+ * @param id to be found in cache
+ * @return true when the id exists, false otherwise
+ */
+ public boolean contains(String id) {
+ return eventsIds.contains(id);
+ }
+
+ Set<String> getCachedEvents() {
+ return new HashSet<>(eventsIds);
+ }
+}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCacheTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCacheTest.java
new file mode 100644
index 0000000..6a77e0f
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/IncomingEventsCacheTest.java
@@ -0,0 +1,63 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class IncomingEventsCacheTest {
+
+ private static final String id1 = "123.dmaap-dr-node";
+ private static final String id2 = "987.dmaap-dr-node";
+
+ @After
+ public void resetCache() {
+ IncomingEventsCache cache = IncomingEventsCache.INSTANCE;
+ for(String id: cache.getCachedEvents()) {
+ cache.remove(id);
+ }
+ }
+
+ @Test
+ public void shouldContainEventAfterAddingItToCache() {
+ IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
+ eventsCache.add(id1);
+ assertEquals(1, IncomingEventsCache.INSTANCE.getCachedEvents().size());
+ assertTrue(IncomingEventsCache.INSTANCE.getCachedEvents().contains(id1));
+ }
+
+ @Test
+ public void shouldRemoveEventFromCache() {
+ IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
+
+ assertEquals(0, IncomingEventsCache.INSTANCE.getCachedEvents().size());
+ eventsCache.add(id1);
+ eventsCache.add(id1);
+ eventsCache.add(id2);
+ assertEquals(2, IncomingEventsCache.INSTANCE.getCachedEvents().size());
+ eventsCache.remove(id1);
+ assertEquals(1, IncomingEventsCache.INSTANCE.getCachedEvents().size());
+ assertTrue(IncomingEventsCache.INSTANCE.getCachedEvents().contains(id2));
+ }
+}
diff --git a/version.properties b/version.properties
index 303a703..3f9d877 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=1
minor=5
-patch=1
+patch=2
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT