summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
diff options
context:
space:
mode:
authorJoanna Jeremicz <joanna.jeremicz@nokia.com>2021-03-12 18:03:30 +0100
committerJoanna Jeremicz <joanna.jeremicz@nokia.com>2021-03-18 10:55:33 +0100
commit45ead28846e909d2675d2dc933d60d854a958c89 (patch)
tree02592e284a759fc0a728a38efab9ca9893d006f2 /src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
parentc641992ced6b848a4127fb830669ff0c89e4d55e (diff)
Incoming events cache - do not load files, which are in progresshonolulu
- Implement singleton cache for events being processed - Add JUnit tests - Bump snapshot version 1.5.1 -> 1.5.2 Issue-ID: DCAEGEN2-2659 Signed-off-by: Joanna Jeremicz <joanna.jeremicz@nokia.com> Change-Id: I79f32554a8c67d91d0d7b601bc8b060ff4984eab
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/pmmapper/App.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java21
1 files changed, 21 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 4583df3..b020837 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -48,6 +48,7 @@ import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
import org.onap.dcaegen2.services.pmmapper.model.ServerResource;
import org.onap.dcaegen2.services.pmmapper.ssl.SSLContextFactory;
import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.dcaegen2.services.pmmapper.utils.IncomingEventsCache;
import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
@@ -80,6 +81,7 @@ public class App {
private static final int HTTPS_PORT = 8443;
private static final int INITIAL_RECONFIGURATION_PERIOD = 60;
private static final int RECONFIGURATION_PERIOD = 60;
+ private static final IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
@@ -139,6 +141,8 @@ public class App {
this.flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
+ .filter(event -> !isCached(event.getPublishIdentity()))
+ .doOnNext(App::addToCache)
.limitRate(processingConfig.getLimitRate())
.parallel(processingThreads)
.runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
@@ -204,6 +208,14 @@ public class App {
}
}
+ private boolean isCached(String id) {
+ boolean isPresent = eventsCache.contains(id);
+ if(isPresent) {
+ logger.unwrap().info("Skipping. This event is already waiting in cache to be processed: " + id);
+ }
+ return isPresent;
+ }
+
public static void main(String[] args) throws EnvironmentConfigException {
FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
@@ -280,6 +292,7 @@ public class App {
public static void sendEventProcessed(MapperConfig config, Event event) {
try {
DataRouterUtils.processEvent(config, event);
+ eventsCache.remove(event.getPublishIdentity());
} catch (ProcessEventException exception) {
logger.unwrap().error("Process event failure", exception);
}
@@ -300,6 +313,14 @@ public class App {
}
/**
+ * Adds received event to cache
+ * @param event to be cached.
+ */
+ public static void addToCache(@NonNull Event event) {
+ eventsCache.add(event.getPublishIdentity());
+ }
+
+ /**
* Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
* @param event to be received.
*/