summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/pmmapper/App.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java21
1 files changed, 21 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 4583df3..b020837 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -48,6 +48,7 @@ import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
import org.onap.dcaegen2.services.pmmapper.model.ServerResource;
import org.onap.dcaegen2.services.pmmapper.ssl.SSLContextFactory;
import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.dcaegen2.services.pmmapper.utils.IncomingEventsCache;
import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
@@ -80,6 +81,7 @@ public class App {
private static final int HTTPS_PORT = 8443;
private static final int INITIAL_RECONFIGURATION_PERIOD = 60;
private static final int RECONFIGURATION_PERIOD = 60;
+ private static final IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
@@ -139,6 +141,8 @@ public class App {
this.flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
+ .filter(event -> !isCached(event.getPublishIdentity()))
+ .doOnNext(App::addToCache)
.limitRate(processingConfig.getLimitRate())
.parallel(processingThreads)
.runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
@@ -204,6 +208,14 @@ public class App {
}
}
+ private boolean isCached(String id) {
+ boolean isPresent = eventsCache.contains(id);
+ if(isPresent) {
+ logger.unwrap().info("Skipping. This event is already waiting in cache to be processed: " + id);
+ }
+ return isPresent;
+ }
+
public static void main(String[] args) throws EnvironmentConfigException {
FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
@@ -280,6 +292,7 @@ public class App {
public static void sendEventProcessed(MapperConfig config, Event event) {
try {
DataRouterUtils.processEvent(config, event);
+ eventsCache.remove(event.getPublishIdentity());
} catch (ProcessEventException exception) {
logger.unwrap().error("Process event failure", exception);
}
@@ -300,6 +313,14 @@ public class App {
}
/**
+ * Adds received event to cache
+ * @param event to be cached.
+ */
+ public static void addToCache(@NonNull Event event) {
+ eventsCache.add(event.getPublishIdentity());
+ }
+
+ /**
* Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
* @param event to be received.
*/