summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/pmmapper/App.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java50
1 files changed, 37 insertions, 13 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 25e3918..a5eb68d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -28,13 +28,11 @@ import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
import org.onap.dcaegen2.services.pmmapper.config.Configurable;
import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
-import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
@@ -67,7 +65,7 @@ public class App {
private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd");
private static FluxSink<Event> fluxSink;
- public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
+ public static void main(String[] args) throws EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
@@ -86,18 +84,16 @@ public class App {
.runOn(Schedulers.newParallel(""), 1)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
.filter(metadataFilter::filter)
- .filter(filterHandler::filterByFileType)
- .filter(validator::validate)
+ .filter(event -> App.filterByFileType(filterHandler, event, mapperConfig))
+ .filter(event -> App.validate(validator, event, mapperConfig))
.concatMap(event -> App.split(splitter,event, mapperConfig))
.filter(events -> App.filter(filterHandler, events, mapperConfig))
.concatMap(events -> App.map(mapper, events, mapperConfig))
.concatMap(vesPublisher::publish)
.subscribe(event -> App.sendEventProcessed(mapperConfig, event));
- DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
- dataRouterSubscriber.start();
+ DeliveryHandler deliveryHandler = new DeliveryHandler(fluxSink::next);
ArrayList<Configurable> configurables = new ArrayList<>();
- configurables.add(dataRouterSubscriber);
configurables.add(mapperConfig);
DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig);
@@ -113,12 +109,40 @@ public class App {
builder.addHttpsListener(8443, "0.0.0.0", sslContext)
.setHandler(Handlers.routing()
- .add("put", "/delivery/{filename}", dataRouterSubscriber)
+ .add("put", "/delivery/{filename}", deliveryHandler)
.add("get", "/healthcheck", healthCheckHandler)
.add("get", "/reconfigure", dynamicConfiguration))
.build().start();
}
+ public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
+ boolean hasValidFileName = false;
+ try {
+ hasValidFileName = filterHandler.filterByFileType(event);
+ if(!hasValidFileName) {
+ sendEventProcessed(config,event);
+ }
+ } catch (Exception exception) {
+ logger.unwrap().error("Unable to filter by file type", exception);
+ sendEventProcessed(config,event);
+ }
+ return hasValidFileName;
+ }
+
+ public static boolean validate(XMLValidator validator, Event event, MapperConfig config) {
+ boolean isValidXML = false;
+ try {
+ isValidXML = validator.validate(event);
+ if(!isValidXML) {
+ sendEventProcessed(config,event);
+ }
+ } catch (Exception exception) {
+ logger.unwrap().error("Unable to validate XML",exception);
+ sendEventProcessed(config,event);
+ }
+ return isValidXML;
+ }
+
public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
Event event = events.get(0);
boolean hasMatchingFilter = false;
@@ -128,7 +152,7 @@ public class App {
sendEventProcessed(config,event);
}
} catch (Exception exception) {
- logger.unwrap().error(exception.getMessage(),exception);
+ logger.unwrap().error("Unable to filter by Meas Types",exception);
sendEventProcessed(config,event);
}
return hasMatchingFilter;
@@ -139,7 +163,7 @@ public class App {
try {
mappedEvents = mapper.mapEvents(events);
} catch (Exception exception) {
- logger.unwrap().error(exception.getMessage(),exception);
+ logger.unwrap().error("Unable to map XML to VES",exception);
sendEventProcessed(config,events.get(0));
return Flux.<List<Event>>empty();
}
@@ -151,7 +175,7 @@ public class App {
try {
splitEvents = splitter.split(event);
} catch (Exception exception) {
- logger.unwrap().error(exception.getMessage(),exception);
+ logger.unwrap().error("Unable to split MeasCollecFile",exception);
sendEventProcessed(config,event);
return Flux.<List<Event>>empty();
}