From 91cd8af37a7d5c285ac02ddcbd022bb14312106c Mon Sep 17 00:00:00 2001 From: emartin Date: Thu, 14 Mar 2019 15:50:49 +0000 Subject: Integrate split and filter in main App Change-Id: I9da0852072409854118e46aba63c491bdd53fd28 Issue-ID: DCAEGEN2-1038 Signed-off-by: emartin --- .../org/onap/dcaegen2/services/pmmapper/App.java | 65 +++++++++++++++++++- .../pmmapper/filtering/MeasFilterHandler.java | 69 ++++++++++++++++++---- .../dcaegen2/services/pmmapper/mapping/Mapper.java | 6 ++ .../dcaegen2/services/pmmapper/model/Event.java | 2 + .../services/pmmapper/utils/DataRouterUtils.java | 2 +- .../services/pmmapper/utils/MeasSplitter.java | 19 +++--- 6 files changed, 140 insertions(+), 23 deletions(-) (limited to 'src/main') diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 6ebc61c..9abe086 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -33,12 +33,17 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; +import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter; +import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler; +import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils; +import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter; +import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter; import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.slf4j.LoggerFactory; @@ -50,6 +55,7 @@ import reactor.core.scheduler.Schedulers; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.List; public class App { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class)); @@ -62,8 +68,12 @@ public class App { HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); MetadataFilter metadataFilter = new MetadataFilter(mapperConfig); + MeasConverter measConverter = new MeasConverter(); + MeasFilterHandler filterHandler = new MeasFilterHandler(measConverter); Mapper mapper = new Mapper(mappingTemplate); + MeasSplitter splitter = new MeasSplitter(measConverter); XMLValidator validator = new XMLValidator(xmlSchema); + flux.onBackpressureDrop(App::handleBackPressure) .doOnNext(App::receiveRequest) .limitRate(1) @@ -71,9 +81,13 @@ public class App { .runOn(Schedulers.newParallel(""), 1) .doOnNext(event -> MDC.setContextMap(event.getMdc())) .filter(metadataFilter::filter) + .filter(filterHandler::filterByFileType) .filter(validator::validate) - .map(mapper::map) - .subscribe(event -> logger.unwrap().info("Event Processed")); + .concatMap(event -> App.split(splitter,event, mapperConfig)) + .filter(events -> App.filter(filterHandler, events, mapperConfig)) + .concatMap(events -> App.map(mapper, events, mapperConfig)) + .subscribe(events -> logger.unwrap().info("Event Processed")); + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig); dataRouterSubscriber.start(); ArrayList configurables = new ArrayList<>(); @@ -89,6 +103,53 @@ public class App { .build().start(); } + public static boolean filter(MeasFilterHandler filterHandler, List events, MapperConfig config) { + Event event = events.get(0); + boolean hasMatchingFilter = false; + try { + hasMatchingFilter = filterHandler.filterByMeasType(events); + if(!hasMatchingFilter) { + sendEventProcessed(config,event); + } + } catch (Exception exception) { + logger.unwrap().error(exception.getMessage(),exception); + sendEventProcessed(config,event); + } + return hasMatchingFilter; + } + + public static Flux> map(Mapper mapper, List events, MapperConfig config) { + List mappedEvents = new ArrayList<>(); + try { + mappedEvents = mapper.mapEvents(events); + } catch (Exception exception) { + logger.unwrap().error(exception.getMessage(),exception); + sendEventProcessed(config,events.get(0)); + return Flux.>empty(); + } + return Flux.just(mappedEvents); + } + + public static Flux> split(MeasSplitter splitter, Event event, MapperConfig config) { + List splitEvents = new ArrayList<>(); + try { + splitEvents = splitter.split(event); + } catch (Exception exception) { + logger.unwrap().error(exception.getMessage(),exception); + sendEventProcessed(config,event); + return Flux.>empty(); + } + return Flux.just(splitEvents); + } + + public static void sendEventProcessed(MapperConfig config, Event event) { + try { + DataRouterUtils.processEvent(config, event); + } catch (ProcessEventException exception) { + logger.unwrap().error("Process event failure", exception); + } + } + /** * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange. * @param event to be ignored. diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java index a6017b6..7aec457 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MeasFilterHandler.java @@ -24,8 +24,10 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; + import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile; import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile.MeasData; @@ -43,28 +45,30 @@ import org.slf4j.LoggerFactory; **/ public class MeasFilterHandler { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MeasFilterHandler.class)); - private Filter filter; private MeasConverter converter; public MeasFilterHandler(MeasConverter converter) { this.converter = converter; } - public void setFilter(Filter filter) { - this.filter = filter; - } - /** * Filters each measInfo node for measTypes that match the given measTypes from filters. **/ public boolean filterByMeasType(Event event) { - logger.unwrap().debug("Filtering the measurement file."); - + Optional filter = Optional.ofNullable(event.getFilter()); MeasCollecFile measCollecFile = event.getMeasCollecFile(); - if(filter.getMeasTypes().isEmpty() || measCollecFile.getMeasData().isEmpty()) { + + if(hasNoFilters(filter)) { + logger.unwrap().info("Skipping filtering by measTypes as filter config does not contain measTypes."); + return true; + } + + if(measCollecFile.getMeasData().isEmpty()) { + logger.unwrap().info("Measurement file will not be processed further as MeasData is empty."); return false; } + logger.unwrap().info("Filtering the measurement file by measTypes."); MeasData measData = measCollecFile.getMeasData().get(0); List measInfos = measData.getMeasInfo(); List filteredMeasInfos = new ArrayList<>(); @@ -73,23 +77,62 @@ public class MeasFilterHandler { MeasInfo currentMeasInfo = measInfos.get(i); List measTypesNode = currentMeasInfo.getMeasTypes(); if(!measTypesNode.isEmpty()) { - setMeasInfosFromMeasTypes(currentMeasInfo,filteredMeasInfos); + setMeasInfosFromMeasTypes(currentMeasInfo,filteredMeasInfos, filter.get()); }else { - setMeasInfoFromMeasType(currentMeasInfo,filteredMeasInfos); + setMeasInfoFromMeasType(currentMeasInfo,filteredMeasInfos, filter.get()); } } if (filteredMeasInfos.isEmpty()) { + logger.unwrap().info("No filter match from the current measurement file."); return false; } - measData.setMeasInfo(filteredMeasInfos); String filteredXMl = converter.convert(measCollecFile); event.setBody(filteredXMl); + logger.unwrap().info("Successfully filtered the measurement by measTypes."); + return true; + } + + /** + * Filters each measInfo node in the list for measTypes that match the given measTypes from filters. + **/ + public boolean filterByMeasType(List events) { + boolean hasMatchAnyFilter = false; + for (int i = 0; i < events.size(); i++) { + Event currentEvent = events.get(i); + boolean hasMatchingFilter = filterByMeasType(currentEvent); + if (hasMatchingFilter) { + hasMatchAnyFilter = true; + } else { + events.remove(events.get(i)); + } + } + + if (!hasMatchAnyFilter) { + logger.unwrap().info("No filter match from all measurement files."); + return false; + } + return true; } - private void setMeasInfoFromMeasType(MeasInfo currentMeasInfo, List filteredMeasInfos) { + private boolean hasNoFilters(Optional filter) { + return !filter.isPresent() || filter.get().getMeasTypes().isEmpty(); + } + + + /** + * Filters the measurement by file type. Measurement files starting with A or C are valid. + **/ + public boolean filterByFileType(Event event) { + logger.unwrap().debug("Filtering the measurement by file type."); + String requestPath = event.getHttpServerExchange().getRequestPath(); + String fileName = requestPath.substring(requestPath.lastIndexOf('/')+1); + return (fileName.startsWith("C") || fileName.startsWith("A")); + } + + private void setMeasInfoFromMeasType(MeasInfo currentMeasInfo, List filteredMeasInfos, Filter filter) { MeasValue currentMeasValue = currentMeasInfo.getMeasValue() .get(0); List measResultsRNodes = currentMeasValue.getR(); @@ -114,7 +157,7 @@ public class MeasFilterHandler { } - private void setMeasInfosFromMeasTypes(MeasInfo currentMeasInfo, List filteredMeasInfos) { + private void setMeasInfosFromMeasTypes(MeasInfo currentMeasInfo, List filteredMeasInfos, Filter filter) { MeasValue currentMeasValue = currentMeasInfo.getMeasValue() .get(0); List measTypesNode = currentMeasInfo.getMeasTypes(); diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java index 0ad26e3..44bbc27 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/mapping/Mapper.java @@ -42,6 +42,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -62,6 +63,11 @@ public class Mapper { } } + public List mapEvents(List events) { + events.forEach(event -> event.setVes(this.map(event))); + return events; + } + public String map(@NonNull Event event) { logger.unwrap().info("Mapping event"); NodeModel pmNodeModel; diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java index fa41142..9eadb8a 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -45,4 +45,6 @@ public class Event { private MeasCollecFile measCollecFile; private Filter filter; + + private String ves; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java index 9525ec7..f30fb96 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java @@ -42,7 +42,7 @@ public class DataRouterUtils { logger.unwrap().info("Sending processed to DataRouter"); String baseDelete = config.getDmaapDRDeleteEndpoint(); String subscriberIdentity = config.getSubscriberIdentity(); - String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity()); + String delete = String.format("%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity()); try { return new RequestSender().send("DELETE", delete); } catch (Exception exception) { diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java index 009e454..1821803 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/MeasSplitter.java @@ -22,18 +22,16 @@ package org.onap.dcaegen2.services.pmmapper.utils; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.onap.dcaegen2.services.pmmapper.model.Event; -import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import java.util.NoSuchElementException; + import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile; import org.onap.dcaegen2.services.pmmapper.model.MeasCollecFile.MeasData; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.slf4j.LoggerFactory; -import io.undertow.server.HttpServerExchange; - /** * Splits the MeasCollecFile based on MeasData. **/ @@ -44,11 +42,17 @@ public class MeasSplitter { public MeasSplitter(MeasConverter converter) { this.converter = converter; } - + + /** + * Splits the MeasCollecFile to multiple MeasCollecFile based on the number of MeasData + **/ public List split(Event event) { - logger.unwrap().debug("Splitting 3GPP xml MeasData to MeasCollecFile"); + logger.unwrap().debug("Splitting 3GPP xml MeasData to individual MeasCollecFile"); MeasCollecFile currentMeasurement = converter.convert(event.getBody()); - + event.setMeasCollecFile(currentMeasurement); + if(currentMeasurement.getMeasData().isEmpty()) { + throw new NoSuchElementException("MeasData is empty."); + } return currentMeasurement.getMeasData().stream().map( measData -> { Event newEvent = generateNewEvent(event); MeasCollecFile newMeasCollec = generateNewMeasCollec(newEvent,measData); @@ -70,6 +74,7 @@ public class MeasSplitter { event.getBody(), event.getMetadata(), event.getMdc(), event.getPublishIdentity()); modifiedEvent.setMeasCollecFile(event.getMeasCollecFile()); + modifiedEvent.setFilter(event.getFilter()); return modifiedEvent; } } -- cgit 1.2.3-korg