diff options
author | emartin <ephraim.martin@est.tech> | 2019-03-14 15:50:49 +0000 |
---|---|---|
committer | emartin <ephraim.martin@est.tech> | 2019-03-14 15:50:49 +0000 |
commit | 91cd8af37a7d5c285ac02ddcbd022bb14312106c (patch) | |
tree | 6890e0b5c9bc3a76d92c133ea957cc9388b16f05 /src/main | |
parent | f4e3a4319568de5bcadaaa2e9f5ed668ae80db80 (diff) |
Integrate split and filter in main App
Change-Id: I9da0852072409854118e46aba63c491bdd53fd28
Issue-ID: DCAEGEN2-1038
Signed-off-by: emartin <ephraim.martin@est.tech>
Diffstat (limited to 'src/main')
6 files changed, 140 insertions, 23 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 6ebc61c..9abe086 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -33,12 +33,17 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; +import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter; +import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler; +import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils; +import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter; +import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter; import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.slf4j.LoggerFactory; @@ -50,6 +55,7 @@ import reactor.core.scheduler.Schedulers; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.List; public class App { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class)); @@ -62,8 +68,12 @@ public class App { HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); MetadataFilter metadataFilter = new MetadataFilter(mapperConfig); + MeasConverter measConverter = new MeasConverter(); + MeasFilterHandler filterHandler = new MeasFilterHandler(measConverter); Mapper mapper = new Mapper(mappingTemplate); + MeasSplitter splitter = new MeasSplitter(measConverter); XMLValidator validator = new XMLValidator(xmlSchema); + flux.onBackpressureDrop(App::handleBackPressure) .doOnNext(App::receiveRequest) .limitRate(1) @@ -71,9 +81,13 @@ public class App { .runOn(Schedulers.newParallel(""), 1) .doOnNext(event -> MDC.setContextMap(event.getMdc())) .filter(metadataFilter::filter) + .filter(filterHandler::filterByFileType) .filter(validator::validate) - .map(mapper::map) - .subscribe(event -> logger.unwrap().info("Event Processed")); + .concatMap(event -> App.split(splitter,event, mapperConfig)) + .filter(events -> App.filter(filterHandler, events, mapperConfig)) + .concatMap(events -> App.map(mapper, events, mapperConfig)) + .subscribe(events -> logger.unwrap().info("Event Processed")); + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig); dataRouterSubscriber.start(); ArrayList<Configurable> configurables = new ArrayList<>(); @@ -89,6 +103,53 @@ public class App { .build().start(); } + public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) { + Event event = events.get(0); + boolean hasMatchingFilter = false; + try { + hasMatchingFilter = filterHandler.filterByMeasType(events); + if(!hasMatchingFilter) { + sendEventProcessed(config,event); + } + } catch (Exception exception) { + logger.unwrap().error(exception.getMessage(),exception); + sendEventProcessed(config,event); + } + return hasMatchingFilter; + } + + public static Flux<List<Event>> map(Mapper mapper, List<Event> events, MapperConfig config) { + List<Event> mappedEvents = new ArrayList<>(); + try { + mappedEvents = mapper.mapEvents(events); + } catch (Exception exception) { + logger.unwrap().error(exception.getMessage(),exception); + sendEventProcessed(config,events.get(0)); + return Flux.<List<Event>>empty(); + } + return Flux.just(mappedEvents); + } + + public static Flux<List<Event>> split(MeasSplitter splitter, Event event, MapperConfig config) { + List<Event> splitEvents = new ArrayList<>(); + try { + splitEvents = splitter.split(event); + } catch (Exception exception) { + logger.unwrap().error(exception.getMessage(),exception); + sendEventProcessed(config,event); + return Flux.<List<Event>>empty(); + } + return Flux.just(splitEvents); + } + + public static void sendEventProcessed(MapperConfig config, Event event) { + try { + DataRouterUtils.processEvent(config, event); + } catch (ProcessEventException exception) { + logger.unwrap().error("Process event failure", exception); + } + } + /** * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange. * @param event to be ignored. diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java index a6017b6..7aec457 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java @@ -24,8 +24,10 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; + import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile; import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile.MeasData; @@ -43,28 +45,30 @@ import org.slf4j.LoggerFactory; **/ public class MeasFilterHandler { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MeasFilterHandler.class)); - private Filter filter; private MeasConverter converter; public MeasFilterHandler(MeasConverter converter) { this.converter = converter; } - public void setFilter(Filter filter) { - this.filter = filter; - } - /** * Filters each measInfo node for measTypes that match the given measTypes from filters. **/ public boolean filterByMeasType(Event event) { - logger.unwrap().debug("Filtering the measurement file."); - + Optional<Filter> filter = Optional.ofNullable(event.getFilter()); MeasCollecFile measCollecFile = event.getMeasCollecFile(); - if(filter.getMeasTypes().isEmpty() || measCollecFile.getMeasData().isEmpty()) { + + if(hasNoFilters(filter)) { + logger.unwrap().info("Skipping filtering by measTypes as filter config does not contain measTypes."); + return true; + } + + if(measCollecFile.getMeasData().isEmpty()) { + logger.unwrap().info("Measurement file will not be processed further as MeasData is empty."); return false; } + logger.unwrap().info("Filtering the measurement file by measTypes."); MeasData measData = measCollecFile.getMeasData().get(0); List<MeasInfo> measInfos = measData.getMeasInfo(); List<MeasInfo> filteredMeasInfos = new ArrayList<>(); @@ -73,23 +77,62 @@ public class MeasFilterHandler { MeasInfo currentMeasInfo = measInfos.get(i); List<String> measTypesNode = currentMeasInfo.getMeasTypes(); if(!measTypesNode.isEmpty()) { - setMeasInfosFromMeasTypes(currentMeasInfo,filteredMeasInfos); + setMeasInfosFromMeasTypes(currentMeasInfo,filteredMeasInfos, filter.get()); }else { - setMeasInfoFromMeasType(currentMeasInfo,filteredMeasInfos); + setMeasInfoFromMeasType(currentMeasInfo,filteredMeasInfos, filter.get()); } } if (filteredMeasInfos.isEmpty()) { + logger.unwrap().info("No filter match from the current measurement file."); return false; } - measData.setMeasInfo(filteredMeasInfos); String filteredXMl = converter.convert(measCollecFile); event.setBody(filteredXMl); + logger.unwrap().info("Successfully filtered the measurement by measTypes."); + return true; + } + + /** + * Filters each measInfo node in the list for measTypes that match the given measTypes from filters. + **/ + public boolean filterByMeasType(List<Event> events) { + boolean hasMatchAnyFilter = false; + for (int i = 0; i < events.size(); i++) { + Event currentEvent = events.get(i); + boolean hasMatchingFilter = filterByMeasType(currentEvent); + if (hasMatchingFilter) { + hasMatchAnyFilter = true; + } else { + events.remove(events.get(i)); + } + } + + if (!hasMatchAnyFilter) { + logger.unwrap().info("No filter match from all measurement files."); + return false; + } + return true; } - private void setMeasInfoFromMeasType(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos) { + private boolean hasNoFilters(Optional<Filter> filter) { + return !filter.isPresent() || filter.get().getMeasTypes().isEmpty(); + } + + + /** + * Filters the measurement by file type. Measurement files starting with A or C are valid. + **/ + public boolean filterByFileType(Event event) { + logger.unwrap().debug("Filtering the measurement by file type."); + String requestPath = event.getHttpServerExchange().getRequestPath(); + String fileName = requestPath.substring(requestPath.lastIndexOf('/')+1); + return (fileName.startsWith("C") || fileName.startsWith("A")); + } + + private void setMeasInfoFromMeasType(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos, Filter filter) { MeasValue currentMeasValue = currentMeasInfo.getMeasValue() .get(0); List<R> measResultsRNodes = currentMeasValue.getR(); @@ -114,7 +157,7 @@ public class MeasFilterHandler { } - private void setMeasInfosFromMeasTypes(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos) { + private void setMeasInfosFromMeasTypes(MeasInfo currentMeasInfo, List<MeasInfo> filteredMeasInfos, Filter filter) { MeasValue currentMeasValue = currentMeasInfo.getMeasValue() .get(0); List<String> measTypesNode = currentMeasInfo.getMeasTypes(); diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java index 0ad26e3..44bbc27 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java @@ -42,6 +42,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -62,6 +63,11 @@ public class Mapper { } } + public List<Event> mapEvents(List<Event> events) { + events.forEach(event -> event.setVes(this.map(event))); + return events; + } + public String map(@NonNull Event event) { logger.unwrap().info("Mapping event"); NodeModel pmNodeModel; diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java index fa41142..9eadb8a 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -45,4 +45,6 @@ public class Event { private MeasCollecFile measCollecFile; private Filter filter; + + private String ves; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java index 9525ec7..f30fb96 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java @@ -42,7 +42,7 @@ public class DataRouterUtils { logger.unwrap().info("Sending processed to DataRouter"); String baseDelete = config.getDmaapDRDeleteEndpoint(); String subscriberIdentity = config.getSubscriberIdentity(); - String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity()); + String delete = String.format("%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity()); try { return new RequestSender().send("DELETE", delete); } catch (Exception exception) { diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java index 009e454..1821803 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java @@ -22,18 +22,16 @@ package org.onap.dcaegen2.services.pmmapper.utils; import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import org.onap.dcaegen2.services.pmmapper.model.Event;
-import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import java.util.NoSuchElementException;
+
import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile;
import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile.MeasData;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
-import io.undertow.server.HttpServerExchange;
-
/**
* Splits the MeasCollecFile based on MeasData.
**/
@@ -44,11 +42,17 @@ public class MeasSplitter { public MeasSplitter(MeasConverter converter) {
this.converter = converter;
}
-
+ + /**
+ * Splits the MeasCollecFile to multiple MeasCollecFile based on the number of MeasData
+ **/
public List<Event> split(Event event) {
- logger.unwrap().debug("Splitting 3GPP xml MeasData to MeasCollecFile");
+ logger.unwrap().debug("Splitting 3GPP xml MeasData to individual MeasCollecFile");
MeasCollecFile currentMeasurement = converter.convert(event.getBody());
-
+ event.setMeasCollecFile(currentMeasurement);
+ if(currentMeasurement.getMeasData().isEmpty()) {
+ throw new NoSuchElementException("MeasData is empty.");
+ }
return currentMeasurement.getMeasData().stream().map( measData -> {
Event newEvent = generateNewEvent(event);
MeasCollecFile newMeasCollec = generateNewMeasCollec(newEvent,measData);
@@ -70,6 +74,7 @@ public class MeasSplitter { event.getBody(), event.getMetadata(), event.getMdc(),
event.getPublishIdentity());
modifiedEvent.setMeasCollecFile(event.getMeasCollecFile());
+ modifiedEvent.setFilter(event.getFilter());
return modifiedEvent;
}
}
|