diff options
author | JoeOLeary <joseph.o.leary@est.tech> | 2019-02-22 15:10:41 +0000 |
---|---|---|
committer | JoeOLeary <joseph.o.leary@est.tech> | 2019-02-22 15:10:41 +0000 |
commit | f7e07b1bfda9ffe612b1339e4b95394448c700f4 (patch) | |
tree | 6d1b54e35d8a45901036fa8d4676f05eced44f07 /src/main/java/org/onap | |
parent | 5e40fbfdf79be48a5ff19393f65c0e09309e868a (diff) |
Add support for parallel event handling
*Add support for parallel event handling
*Add support for handling backpressure
*Update delivery end point
Issue-ID: DCAEGEN2-1268
Change-Id: Id467080fa135b58f2d3e366f00fd8ad5e1c6ec06
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src/main/java/org/onap')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/pmmapper/App.java | 61 |
1 files changed, 48 insertions, 13 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 72d1509..e083466 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -24,6 +24,7 @@ import io.undertow.Handlers; import io.undertow.Undertow; import io.undertow.util.StatusCodes; +import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; @@ -32,12 +33,16 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; +import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler; import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; import java.nio.file.Path; import java.nio.file.Paths; @@ -46,29 +51,59 @@ public class App { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class)); private static Path mappingTemplate = Paths.get("/opt/app/pm-mapper/etc/mapping.ftl"); private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd"); + private static FluxSink<Event> fluxSink; public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException { + Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); Mapper mapper = new Mapper(mappingTemplate); XMLValidator validator = new XMLValidator(xmlSchema); - DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> { - event.getHttpServerExchange().unDispatch(); - event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING); - MDC.setContextMap(event.getMdc()); - if(!validator.validate(event)){ - logger.unwrap().info("Event failed validation against schema."); - } else { - String ves = mapper.map(event); - logger.unwrap().info("Mapped Event: {}", ves); - } - }); + flux.onBackpressureDrop(App::handleBackPressure) + .doOnNext(App::receiveRequest) + .limitRate(1) + .parallel() + .runOn(Schedulers.newParallel(""), 1) + .doOnNext(event -> MDC.setContextMap(event.getMdc())) + .filter(validator::validate) + .map(mapper::map) + .subscribe(event -> logger.unwrap().info("Event Processed")); + + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next); MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); dataRouterSubscriber.start(mapperConfig); Undertow.builder() .addHttpListener(8081, "0.0.0.0") - .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber) - .add("get", "/healthcheck", healthCheckHandler)) + .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber) + .add("get", "/healthcheck", healthCheckHandler)) .build().start(); } + + /** + * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange. + * @param event to be ignored. + */ + public static void handleBackPressure(@NonNull Event event) { + logger.unwrap().debug("Event will not be processed, responding with 429"); + event.getHttpServerExchange() + .setStatusCode(StatusCodes.TOO_MANY_REQUESTS) + .getResponseSender() + .send(StatusCodes.TOO_MANY_REQUESTS_STRING); + event.getHttpServerExchange() + .unDispatch(); + } + + /** + * Takes the exchange from an event, responds with a 200 and un-dispatches the exchange. + * @param event to be received. + */ + public static void receiveRequest(@NonNull Event event) { + logger.unwrap().debug("Event will be processed, responding with 200"); + event.getHttpServerExchange() + .setStatusCode(StatusCodes.OK) + .getResponseSender() + .send(StatusCodes.OK_STRING); + event.getHttpServerExchange() + .unDispatch(); + } } |