summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoeOLeary <joseph.o.leary@est.tech>2019-02-22 15:10:41 +0000
committerJoeOLeary <joseph.o.leary@est.tech>2019-02-22 15:10:41 +0000
commitf7e07b1bfda9ffe612b1339e4b95394448c700f4 (patch)
tree6d1b54e35d8a45901036fa8d4676f05eced44f07
parent5e40fbfdf79be48a5ff19393f65c0e09309e868a (diff)
Add support for parallel event handling
*Add support for parallel event handling *Add support for handling backpressure *Update delivery end point Issue-ID: DCAEGEN2-1268 Change-Id: Id467080fa135b58f2d3e366f00fd8ad5e1c6ec06 Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java61
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java66
2 files changed, 114 insertions, 13 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 72d1509..e083466 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -24,6 +24,7 @@ import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.util.StatusCodes;
+import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
@@ -32,12 +33,16 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.scheduler.Schedulers;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -46,29 +51,59 @@ public class App {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
private static Path mappingTemplate = Paths.get("/opt/app/pm-mapper/etc/mapping.ftl");
private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd");
+ private static FluxSink<Event> fluxSink;
public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
+ Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
Mapper mapper = new Mapper(mappingTemplate);
XMLValidator validator = new XMLValidator(xmlSchema);
- DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> {
- event.getHttpServerExchange().unDispatch();
- event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING);
- MDC.setContextMap(event.getMdc());
- if(!validator.validate(event)){
- logger.unwrap().info("Event failed validation against schema.");
- } else {
- String ves = mapper.map(event);
- logger.unwrap().info("Mapped Event: {}", ves);
- }
- });
+ flux.onBackpressureDrop(App::handleBackPressure)
+ .doOnNext(App::receiveRequest)
+ .limitRate(1)
+ .parallel()
+ .runOn(Schedulers.newParallel(""), 1)
+ .doOnNext(event -> MDC.setContextMap(event.getMdc()))
+ .filter(validator::validate)
+ .map(mapper::map)
+ .subscribe(event -> logger.unwrap().info("Event Processed"));
+
+ DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
dataRouterSubscriber.start(mapperConfig);
Undertow.builder()
.addHttpListener(8081, "0.0.0.0")
- .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber)
- .add("get", "/healthcheck", healthCheckHandler))
+ .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber)
+ .add("get", "/healthcheck", healthCheckHandler))
.build().start();
}
+
+ /**
+ * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange.
+ * @param event to be ignored.
+ */
+ public static void handleBackPressure(@NonNull Event event) {
+ logger.unwrap().debug("Event will not be processed, responding with 429");
+ event.getHttpServerExchange()
+ .setStatusCode(StatusCodes.TOO_MANY_REQUESTS)
+ .getResponseSender()
+ .send(StatusCodes.TOO_MANY_REQUESTS_STRING);
+ event.getHttpServerExchange()
+ .unDispatch();
+ }
+
+ /**
+ * Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
+ * @param event to be received.
+ */
+ public static void receiveRequest(@NonNull Event event) {
+ logger.unwrap().debug("Event will be processed, responding with 200");
+ event.getHttpServerExchange()
+ .setStatusCode(StatusCodes.OK)
+ .getResponseSender()
+ .send(StatusCodes.OK_STRING);
+ event.getHttpServerExchange()
+ .unDispatch();
+ }
}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
new file mode 100644
index 0000000..82650f3
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.undertow.util.StatusCodes;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+
+
+@ExtendWith(MockitoExtension.class)
+class AppTest {
+
+ @Test
+ void testHandleBackPressureNullValue() {
+ assertThrows(NullPointerException.class, () -> App.handleBackPressure(null));
+ }
+
+ @Test
+ void testHandleBackPressure() {
+ Event event = utils.EventUtils.makeMockEvent("", mock(EventMetadata.class));
+ App.handleBackPressure(event);
+ verify(event.getHttpServerExchange(), times(1)).setStatusCode(StatusCodes.TOO_MANY_REQUESTS);
+ verify(event.getHttpServerExchange(), times(1)).unDispatch();
+ }
+
+ @Test
+ void testReceiveRequestNullValue() {
+ assertThrows(NullPointerException.class, () -> App.receiveRequest(null));
+ }
+
+ @Test
+ void testReceiveRequest() {
+ Event event = utils.EventUtils.makeMockEvent("", mock(EventMetadata.class));
+ App.receiveRequest(event);
+ verify(event.getHttpServerExchange(), times(1)).setStatusCode(StatusCodes.OK);
+ verify(event.getHttpServerExchange(), times(1)).unDispatch();
+ }
+
+
+}