From f7e07b1bfda9ffe612b1339e4b95394448c700f4 Mon Sep 17 00:00:00 2001 From: JoeOLeary Date: Fri, 22 Feb 2019 15:10:41 +0000 Subject: Add support for parallel event handling *Add support for parallel event handling *Add support for handling backpressure *Update delivery end point Issue-ID: DCAEGEN2-1268 Change-Id: Id467080fa135b58f2d3e366f00fd8ad5e1c6ec06 Signed-off-by: JoeOLeary --- .../org/onap/dcaegen2/services/pmmapper/App.java | 61 +++++++++++++++----- .../onap/dcaegen2/services/pmmapper/AppTest.java | 66 ++++++++++++++++++++++ 2 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 72d1509..e083466 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -24,6 +24,7 @@ import io.undertow.Handlers; import io.undertow.Undertow; import io.undertow.util.StatusCodes; +import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; @@ -32,12 +33,16 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; +import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler; import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; import java.nio.file.Path; import java.nio.file.Paths; @@ -46,29 +51,59 @@ public class App { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class)); private static Path mappingTemplate = Paths.get("/opt/app/pm-mapper/etc/mapping.ftl"); private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd"); + private static FluxSink fluxSink; public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException { + Flux flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); Mapper mapper = new Mapper(mappingTemplate); XMLValidator validator = new XMLValidator(xmlSchema); - DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> { - event.getHttpServerExchange().unDispatch(); - event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING); - MDC.setContextMap(event.getMdc()); - if(!validator.validate(event)){ - logger.unwrap().info("Event failed validation against schema."); - } else { - String ves = mapper.map(event); - logger.unwrap().info("Mapped Event: {}", ves); - } - }); + flux.onBackpressureDrop(App::handleBackPressure) + .doOnNext(App::receiveRequest) + .limitRate(1) + .parallel() + .runOn(Schedulers.newParallel(""), 1) + .doOnNext(event -> MDC.setContextMap(event.getMdc())) + .filter(validator::validate) + .map(mapper::map) + .subscribe(event -> logger.unwrap().info("Event Processed")); + + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next); MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); dataRouterSubscriber.start(mapperConfig); Undertow.builder() .addHttpListener(8081, "0.0.0.0") - .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber) - .add("get", "/healthcheck", healthCheckHandler)) + .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber) + .add("get", "/healthcheck", healthCheckHandler)) .build().start(); } + + /** + * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange. + * @param event to be ignored. + */ + public static void handleBackPressure(@NonNull Event event) { + logger.unwrap().debug("Event will not be processed, responding with 429"); + event.getHttpServerExchange() + .setStatusCode(StatusCodes.TOO_MANY_REQUESTS) + .getResponseSender() + .send(StatusCodes.TOO_MANY_REQUESTS_STRING); + event.getHttpServerExchange() + .unDispatch(); + } + + /** + * Takes the exchange from an event, responds with a 200 and un-dispatches the exchange. + * @param event to be received. + */ + public static void receiveRequest(@NonNull Event event) { + logger.unwrap().debug("Event will be processed, responding with 200"); + event.getHttpServerExchange() + .setStatusCode(StatusCodes.OK) + .getResponseSender() + .send(StatusCodes.OK_STRING); + event.getHttpServerExchange() + .unDispatch(); + } } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java new file mode 100644 index 0000000..82650f3 --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java @@ -0,0 +1,66 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import io.undertow.util.StatusCodes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; + + +@ExtendWith(MockitoExtension.class) +class AppTest { + + @Test + void testHandleBackPressureNullValue() { + assertThrows(NullPointerException.class, () -> App.handleBackPressure(null)); + } + + @Test + void testHandleBackPressure() { + Event event = utils.EventUtils.makeMockEvent("", mock(EventMetadata.class)); + App.handleBackPressure(event); + verify(event.getHttpServerExchange(), times(1)).setStatusCode(StatusCodes.TOO_MANY_REQUESTS); + verify(event.getHttpServerExchange(), times(1)).unDispatch(); + } + + @Test + void testReceiveRequestNullValue() { + assertThrows(NullPointerException.class, () -> App.receiveRequest(null)); + } + + @Test + void testReceiveRequest() { + Event event = utils.EventUtils.makeMockEvent("", mock(EventMetadata.class)); + App.receiveRequest(event); + verify(event.getHttpServerExchange(), times(1)).setStatusCode(StatusCodes.OK); + verify(event.getHttpServerExchange(), times(1)).unDispatch(); + } + + +} -- cgit 1.2.3-korg