summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorJoeOLeary <joseph.o.leary@est.tech>2019-02-22 15:10:41 +0000
committerJoeOLeary <joseph.o.leary@est.tech>2019-02-22 15:10:41 +0000
commitf7e07b1bfda9ffe612b1339e4b95394448c700f4 (patch)
tree6d1b54e35d8a45901036fa8d4676f05eced44f07 /src/main
parent5e40fbfdf79be48a5ff19393f65c0e09309e868a (diff)
Add support for parallel event handling
*Add support for parallel event handling *Add support for handling backpressure *Update delivery end point Issue-ID: DCAEGEN2-1268 Change-Id: Id467080fa135b58f2d3e366f00fd8ad5e1c6ec06 Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java61
1 files changed, 48 insertions, 13 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 72d1509..e083466 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -24,6 +24,7 @@ import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.util.StatusCodes;
+import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
@@ -32,12 +33,16 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.scheduler.Schedulers;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -46,29 +51,59 @@ public class App {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
private static Path mappingTemplate = Paths.get("/opt/app/pm-mapper/etc/mapping.ftl");
private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd");
+ private static FluxSink<Event> fluxSink;
public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
+ Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
Mapper mapper = new Mapper(mappingTemplate);
XMLValidator validator = new XMLValidator(xmlSchema);
- DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> {
- event.getHttpServerExchange().unDispatch();
- event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING);
- MDC.setContextMap(event.getMdc());
- if(!validator.validate(event)){
- logger.unwrap().info("Event failed validation against schema.");
- } else {
- String ves = mapper.map(event);
- logger.unwrap().info("Mapped Event: {}", ves);
- }
- });
+ flux.onBackpressureDrop(App::handleBackPressure)
+ .doOnNext(App::receiveRequest)
+ .limitRate(1)
+ .parallel()
+ .runOn(Schedulers.newParallel(""), 1)
+ .doOnNext(event -> MDC.setContextMap(event.getMdc()))
+ .filter(validator::validate)
+ .map(mapper::map)
+ .subscribe(event -> logger.unwrap().info("Event Processed"));
+
+ DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
dataRouterSubscriber.start(mapperConfig);
Undertow.builder()
.addHttpListener(8081, "0.0.0.0")
- .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber)
- .add("get", "/healthcheck", healthCheckHandler))
+ .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber)
+ .add("get", "/healthcheck", healthCheckHandler))
.build().start();
}
+
+ /**
+ * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange.
+ * @param event to be ignored.
+ */
+ public static void handleBackPressure(@NonNull Event event) {
+ logger.unwrap().debug("Event will not be processed, responding with 429");
+ event.getHttpServerExchange()
+ .setStatusCode(StatusCodes.TOO_MANY_REQUESTS)
+ .getResponseSender()
+ .send(StatusCodes.TOO_MANY_REQUESTS_STRING);
+ event.getHttpServerExchange()
+ .unDispatch();
+ }
+
+ /**
+ * Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
+ * @param event to be received.
+ */
+ public static void receiveRequest(@NonNull Event event) {
+ logger.unwrap().debug("Event will be processed, responding with 200");
+ event.getHttpServerExchange()
+ .setStatusCode(StatusCodes.OK)
+ .getResponseSender()
+ .send(StatusCodes.OK_STRING);
+ event.getHttpServerExchange()
+ .unDispatch();
+ }
}