diff options
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/pmmapper/App.java')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/pmmapper/App.java | 50 |
1 files changed, 37 insertions, 13 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 25e3918..a5eb68d 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -28,13 +28,11 @@ import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; import org.onap.dcaegen2.services.pmmapper.config.Configurable; import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration; -import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; -import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; +import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException; -import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter; import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; @@ -67,7 +65,7 @@ public class App { private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd"); private static FluxSink<Event> fluxSink; - public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException, IOException { + public static void main(String[] args) throws EnvironmentConfigException, CBSServerError, MapperConfigException, IOException { Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); @@ -86,18 +84,16 @@ public class App { .runOn(Schedulers.newParallel(""), 1) .doOnNext(event -> MDC.setContextMap(event.getMdc())) .filter(metadataFilter::filter) - .filter(filterHandler::filterByFileType) - .filter(validator::validate) + .filter(event -> App.filterByFileType(filterHandler, event, mapperConfig)) + .filter(event -> App.validate(validator, event, mapperConfig)) .concatMap(event -> App.split(splitter,event, mapperConfig)) .filter(events -> App.filter(filterHandler, events, mapperConfig)) .concatMap(events -> App.map(mapper, events, mapperConfig)) .concatMap(vesPublisher::publish) .subscribe(event -> App.sendEventProcessed(mapperConfig, event)); - DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig); - dataRouterSubscriber.start(); + DeliveryHandler deliveryHandler = new DeliveryHandler(fluxSink::next); ArrayList<Configurable> configurables = new ArrayList<>(); - configurables.add(dataRouterSubscriber); configurables.add(mapperConfig); DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig); @@ -113,12 +109,40 @@ public class App { builder.addHttpsListener(8443, "0.0.0.0", sslContext) .setHandler(Handlers.routing() - .add("put", "/delivery/{filename}", dataRouterSubscriber) + .add("put", "/delivery/{filename}", deliveryHandler) .add("get", "/healthcheck", healthCheckHandler) .add("get", "/reconfigure", dynamicConfiguration)) .build().start(); } + public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) { + boolean hasValidFileName = false; + try { + hasValidFileName = filterHandler.filterByFileType(event); + if(!hasValidFileName) { + sendEventProcessed(config,event); + } + } catch (Exception exception) { + logger.unwrap().error("Unable to filter by file type", exception); + sendEventProcessed(config,event); + } + return hasValidFileName; + } + + public static boolean validate(XMLValidator validator, Event event, MapperConfig config) { + boolean isValidXML = false; + try { + isValidXML = validator.validate(event); + if(!isValidXML) { + sendEventProcessed(config,event); + } + } catch (Exception exception) { + logger.unwrap().error("Unable to validate XML",exception); + sendEventProcessed(config,event); + } + return isValidXML; + } + public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) { Event event = events.get(0); boolean hasMatchingFilter = false; @@ -128,7 +152,7 @@ public class App { sendEventProcessed(config,event); } } catch (Exception exception) { - logger.unwrap().error(exception.getMessage(),exception); + logger.unwrap().error("Unable to filter by Meas Types",exception); sendEventProcessed(config,event); } return hasMatchingFilter; @@ -139,7 +163,7 @@ public class App { try { mappedEvents = mapper.mapEvents(events); } catch (Exception exception) { - logger.unwrap().error(exception.getMessage(),exception); + logger.unwrap().error("Unable to map XML to VES",exception); sendEventProcessed(config,events.get(0)); return Flux.<List<Event>>empty(); } @@ -151,7 +175,7 @@ public class App { try { splitEvents = splitter.split(event); } catch (Exception exception) { - logger.unwrap().error(exception.getMessage(),exception); + logger.unwrap().error("Unable to split MeasCollecFile",exception); sendEventProcessed(config,event); return Flux.<List<Event>>empty(); } |