summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java65
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java69
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java6
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java19
6 files changed, 140 insertions, 23 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 6ebc61c..9abe086 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -33,12 +33,17 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
+import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
+import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
+import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
@@ -50,6 +55,7 @@ import reactor.core.scheduler.Schedulers;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.List;
public class App {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
@@ -62,8 +68,12 @@ public class App {
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
MetadataFilter metadataFilter = new MetadataFilter(mapperConfig);
+ MeasConverter measConverter = new MeasConverter();
+ MeasFilterHandler filterHandler = new MeasFilterHandler(measConverter);
Mapper mapper = new Mapper(mappingTemplate);
+ MeasSplitter splitter = new MeasSplitter(measConverter);
XMLValidator validator = new XMLValidator(xmlSchema);
+
flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
.limitRate(1)
@@ -71,9 +81,13 @@ public class App {
.runOn(Schedulers.newParallel(""), 1)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
.filter(metadataFilter::filter)
+ .filter(filterHandler::filterByFileType)
.filter(validator::validate)
- .map(mapper::map)
- .subscribe(event -> logger.unwrap().info("Event Processed"));
+ .concatMap(event -> App.split(splitter,event, mapperConfig))
+ .filter(events -> App.filter(filterHandler, events, mapperConfig))
+ .concatMap(events -> App.map(mapper, events, mapperConfig))
+ .subscribe(events -> logger.unwrap().info("Event Processed"));
+
DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
dataRouterSubscriber.start();
ArrayList<Configurable> configurables = new ArrayList<>();
@@ -89,6 +103,53 @@ public class App {
.build().start();
}
+ public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
+ Event event = events.get(0);
+ boolean hasMatchingFilter = false;
+ try {
+ hasMatchingFilter = filterHandler.filterByMeasType(events);
+ if(!hasMatchingFilter) {
+ sendEventProcessed(config,event);
+ }
+ } catch (Exception exception) {
+ logger.unwrap().error(exception.getMessage(),exception);
+ sendEventProcessed(config,event);
+ }
+ return hasMatchingFilter;
+ }
+
+ public static Flux<List<Event>> map(Mapper mapper, List<Event> events, MapperConfig config) {
+ List<Event> mappedEvents = new ArrayList<>();
+ try {
+ mappedEvents = mapper.mapEvents(events);
+ } catch (Exception exception) {
+ logger.unwrap().error(exception.getMessage(),exception);
+ sendEventProcessed(config,events.get(0));
+ return Flux.<List<Event>>empty();
+ }
+ return Flux.just(mappedEvents);
+ }
+
+ public static Flux<List<Event>> split(MeasSplitter splitter, Event event, MapperConfig config) {
+ List<Event> splitEvents = new ArrayList<>();
+ try {
+ splitEvents = splitter.split(event);
+ } catch (Exception exception) {
+ logger.unwrap().error(exception.getMessage(),exception);
+ sendEventProcessed(config,event);
+ return Flux.<List<Event>>empty();
+ }
+ return Flux.just(splitEvents);
+ }
+
+ public static void sendEventProcessed(MapperConfig config, Event event) {
+ try {
+ DataRouterUtils.processEvent(config, event);
+ } catch (ProcessEventException exception) {
+ logger.unwrap().error("Process event failure", exception);
+ }
+ }
+
/**
* Takes the exchange from an event, responds with a 429 and un-dispatches the exchange.
* @param event to be ignored.
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java
index a6017b6..7aec457 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java
@@ -24,8 +24,10 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
+
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile;
import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile.MeasData;
@@ -43,28 +45,30 @@ import org.slf4j.LoggerFactory;
**/
public class MeasFilterHandler {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MeasFilterHandler.class));
- private Filter filter;
private MeasConverter converter;
public MeasFilterHandler(MeasConverter converter) {
this.converter = converter;
}
- public void setFilter(Filter filter) {
- this.filter = filter;
- }
-
/**
* Filters each measInfo node for measTypes that match the given measTypes from filters.
**/
public boolean filterByMeasType(Event event) {
- logger.unwrap().debug("Filtering the measurement file.");
-
+ Optional<Filter> filter = Optional.ofNullable(event.getFilter());
MeasCollecFile measCollecFile = event.getMeasCollecFile();
- if(filter.getMeasTypes().isEmpty() || measCollecFile.getMeasData().isEmpty()) {
+
+ if(hasNoFilters(filter)) {
+ logger.unwrap().info("Skipping filtering by measTypes as filter config does not contain measTypes.");
+ return true;
+ }
+
+ if(measCollecFile.getMeasData().isEmpty()) {
+ logger.unwrap().info("Measurement file will not be processed further as MeasData is empty.");
return false;
}
+ logger.unwrap().info("Filtering the measurement file by measTypes.");
MeasData measData = measCollecFile.getMeasData().get(0);
List<MeasInfo> measInfos = measData.getMeasInfo();
List<MeasInfo> filteredMeasInfos = new ArrayList<>();
@@ -73,23 +77,62 @@ public class MeasFilterHandler {
MeasInfo currentMeasInfo = measInfos.get(i);
List<String> measTypesNode = currentMeasInfo.getMeasTypes();
if(!measTypesNode.isEmpty()) {
- setMeasInfosFromMeasTypes(currentMeasInfo,filteredMeasInfos);
+ setMeasInfosFromMeasTypes(currentMeasInfo,filteredMeasInfos, filter.get());
}else {
- setMeasInfoFromMeasType(currentMeasInfo,filteredMeasInfos);
+ setMeasInfoFromMeasType(currentMeasInfo,filteredMeasInfos, filter.get());
}
}
if (filteredMeasInfos.isEmpty()) {
+ logger.unwrap().info("No filter match from the current measurement file.");
return false;
}
-
measData.setMeasInfo(filteredMeasInfos);
String filteredXMl = converter.convert(measCollecFile);
event.setBody(filteredXMl);
+ logger.unwrap().info("Successfully filtered the measurement by measTypes.");
+ return true;
+ }
+
+ /**
+ * Filters each measInfo node in the list for measTypes that match the given measTypes from filters.
+ **/
+ public boolean filterByMeasType(List<Event> events) {
+ boolean hasMatchAnyFilter = false;
+ for (int i = 0; i < events.size(); i++) {
+ Event currentEvent = events.get(i);
+ boolean hasMatchingFilter = filterByMeasType(currentEvent);
+ if (hasMatchingFilter) {
+ hasMatchAnyFilter = true;
+ } else {
+ events.remove(events.get(i));
+ }
+ }
+
+ if (!hasMatchAnyFilter) {
+ logger.unwrap().info("No filter match from all measurement files.");
+ return false;
+ }
+
return true;
}
- private void setMeasInfoFromMeasType(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos) {
+ private boolean hasNoFilters(Optional<Filter> filter) {
+ return !filter.isPresent() || filter.get().getMeasTypes().isEmpty();
+ }
+
+
+ /**
+ * Filters the measurement by file type. Measurement files starting with A or C are valid.
+ **/
+ public boolean filterByFileType(Event event) {
+ logger.unwrap().debug("Filtering the measurement by file type.");
+ String requestPath = event.getHttpServerExchange().getRequestPath();
+ String fileName = requestPath.substring(requestPath.lastIndexOf('/')+1);
+ return (fileName.startsWith("C") || fileName.startsWith("A"));
+ }
+
+ private void setMeasInfoFromMeasType(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos, Filter filter) {
MeasValue currentMeasValue = currentMeasInfo.getMeasValue()
.get(0);
List<R> measResultsRNodes = currentMeasValue.getR();
@@ -114,7 +157,7 @@ public class MeasFilterHandler {
}
- private void setMeasInfosFromMeasTypes(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos) {
+ private void setMeasInfosFromMeasTypes(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos, Filter filter) {
MeasValue currentMeasValue = currentMeasInfo.getMeasValue()
.get(0);
List<String> measTypesNode = currentMeasInfo.getMeasTypes();
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java
index 0ad26e3..44bbc27 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java
@@ -42,6 +42,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -62,6 +63,11 @@ public class Mapper {
}
}
+ public List<Event> mapEvents(List<Event> events) {
+ events.forEach(event -> event.setVes(this.map(event)));
+ return events;
+ }
+
public String map(@NonNull Event event) {
logger.unwrap().info("Mapping event");
NodeModel pmNodeModel;
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
index fa41142..9eadb8a 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -45,4 +45,6 @@ public class Event {
private MeasCollecFile measCollecFile;
private Filter filter;
+
+ private String ves;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
index 9525ec7..f30fb96 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
@@ -42,7 +42,7 @@ public class DataRouterUtils {
logger.unwrap().info("Sending processed to DataRouter");
String baseDelete = config.getDmaapDRDeleteEndpoint();
String subscriberIdentity = config.getSubscriberIdentity();
- String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
+ String delete = String.format("%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
try {
return new RequestSender().send("DELETE", delete);
} catch (Exception exception) {
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java
index 009e454..1821803 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java
@@ -22,18 +22,16 @@ package org.onap.dcaegen2.services.pmmapper.utils;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import org.onap.dcaegen2.services.pmmapper.model.Event;
-import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import java.util.NoSuchElementException;
+
import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile;
import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile.MeasData;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
-import io.undertow.server.HttpServerExchange;
-
/**
* Splits the MeasCollecFile based on MeasData.
**/
@@ -44,11 +42,17 @@ public class MeasSplitter {
public MeasSplitter(MeasConverter converter) {
this.converter = converter;
}
-
+
+ /**
+ * Splits the MeasCollecFile to multiple MeasCollecFile based on the number of MeasData
+ **/
public List<Event> split(Event event) {
- logger.unwrap().debug("Splitting 3GPP xml MeasData to MeasCollecFile");
+ logger.unwrap().debug("Splitting 3GPP xml MeasData to individual MeasCollecFile");
MeasCollecFile currentMeasurement = converter.convert(event.getBody());
-
+ event.setMeasCollecFile(currentMeasurement);
+ if(currentMeasurement.getMeasData().isEmpty()) {
+ throw new NoSuchElementException("MeasData is empty.");
+ }
return currentMeasurement.getMeasData().stream().map( measData -> {
Event newEvent = generateNewEvent(event);
MeasCollecFile newMeasCollec = generateNewMeasCollec(newEvent,measData);
@@ -70,6 +74,7 @@ public class MeasSplitter {
event.getBody(), event.getMetadata(), event.getMdc(),
event.getPublishIdentity());
modifiedEvent.setMeasCollecFile(event.getMeasCollecFile());
+ modifiedEvent.setFilter(event.getFilter());
return modifiedEvent;
}
}