From f789f254efe024085663bbe04f99f10af4a98fe7 Mon Sep 17 00:00:00 2001 From: tkogut Date: Tue, 13 Apr 2021 10:11:35 +0200 Subject: [DCAE/PM-Mapper] Utilize DMaaP-Client in PM-Mapper - Bump mockserver libraries - Use dmaap-client for sending events to dmapp-mr - Extract Retry/Timeout configs to separate class - Extract logging utils to separate class Issue-ID: DCAEGEN2-2732 Change-Id: I5d406e99fe1def078f102ff704df5312f5ae996b Signed-off-by: tkogut --- Changelog.md | 1 + pom.xml | 26 ++++- .../pmmapper/exceptions/MRPublisherException.java | 6 +- .../pmmapper/messagerouter/VESPublisher.java | 80 ++++++++------ .../pmmapper/utils/DmaapRequestSender.java | 120 +++++++++++++++++++++ .../services/pmmapper/utils/LoggingUtils.java | 45 ++++++++ .../services/pmmapper/utils/RequestSender.java | 47 ++++---- .../services/pmmapper/utils/SendersConfig.java | 34 ++++++ .../pmmapper/messagerouter/VESPublisherTest.java | 116 ++++++++++++++------ .../onap/dcaegen2/services/pmmapper/AppTest.java | 2 +- .../services/pmmapper/mapping/MapperTest.java | 6 +- .../pmmapper/utils/DataRouterUtilsTest.java | 8 +- .../pmmapper/utils/DmaapRequestSenderTest.java | 112 +++++++++++++++++++ .../pmmapper/utils/RequestSenderTests.java | 19 ++-- 14 files changed, 508 insertions(+), 114 deletions(-) create mode 100644 src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java create mode 100644 src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java create mode 100644 src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java create mode 100644 src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java diff --git a/Changelog.md b/Changelog.md index 3b10d37..7990b6b 100644 --- a/Changelog.md +++ b/Changelog.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [1.6.0] - 25/03/2021 ### Changed +- Utilize DMaaP-Client in PM-Mapper - Switched Dockerfile to integration image (alpine-based) ## [1.5.2] - 18/03/2021 diff --git a/pom.xml b/pom.xml index e978515..3b82491 100644 --- a/pom.xml +++ b/pom.xml @@ -48,19 +48,22 @@ 1.2.3 3.4.0 2.2.3.Final - 2.8.5 + 2.8.6 2.3.28 2.8.0 2.3.1 2.3.0.1 + 1.8.2 5.3.2 2.23.4 2.23.4 2.0.7 - 3.10.8 + 5.11.2 4.12 - 1.3.0 + 1.5.1 + 20210307 + 2.12.2 2.11.0 3.4.0 @@ -119,6 +122,11 @@ jaxb-core ${jaxb.version} + + org.onap.dcaegen2.services.sdk.rest.services + dmaap-client + ${dmaap-clinet.version} + com.sun.xml.bind jaxb-impl @@ -209,6 +217,18 @@ ${mockserver.version} test + + org.json + json + ${json.version} + test + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + test + org.junit.jupiter junit-jupiter-params diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java index 6b5c157..248debe 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +22,7 @@ package org.onap.dcaegen2.services.pmmapper.exceptions; public class MRPublisherException extends RuntimeException{ - public MRPublisherException(String message, Throwable cause) { - super(message, cause); + public MRPublisherException(String message) { + super(message); } - } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java index 6aaf1d6..9e0b87c 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,57 +21,74 @@ package org.onap.dcaegen2.services.pmmapper.messagerouter; -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.List; - import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; -import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; +import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; +import java.util.List; +import java.util.stream.Collectors; + public class VESPublisher { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class)); - private RequestSender sender; - private MapperConfig config; + private final DmaapRequestSender sender; + private final MapperConfig config; public VESPublisher(MapperConfig config) { - this(config, new RequestSender()); + this(config, new DmaapRequestSender()); } - public VESPublisher(MapperConfig config, RequestSender sender) { + public VESPublisher(MapperConfig config, DmaapRequestSender sender) { this.sender = sender; this.config = config; } public Flux publish(List events) { logger.unwrap().info("Publishing VES events to messagerouter."); - Event event = events.get(0); - try { - events.forEach(e -> this.publish(e.getVes())); - logger.unwrap().info("Successfully published VES events to messagerouter."); - } catch (MRPublisherException e) { - logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", e); - return Flux.empty(); - } - return Flux.just(event); + Event first = events.get(0); + List vesEvents = minifiedVesEvents(events); + return publishEvents(vesEvents) + .filter(DmaapResponse::failed) + .takeLast(1) + .flatMap(this::toFluxError) + .defaultIfEmpty(first) + .doOnComplete(() -> logger.unwrap().info("Successfully published VES events to messagerouter.")) + .onErrorResume(this::resume); + } + + private List minifiedVesEvents(List events) { + return events.stream() + .map(Event::getVes) + .map(vesEvent -> vesEvent.replace("\n", "")) + .collect(Collectors.toList()); + } + + private Flux publishEvents(List vesEvents) { + String topicUrl = config.getPublisherTopicUrl(); + AafCredentials credentials = aafCredentials(); + return sender.send(topicUrl, vesEvents, credentials); + } + + private Flux toFluxError(MessageRouterPublishResponse response) { + return Flux.error(new MRPublisherException(response.failReason())); + } + + private Flux resume(Throwable t) { + logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", t); + return Flux.empty(); } - private void publish(String ves) { - try { - String topicUrl = config.getPublisherTopicUrl(); - ves = ves.replaceAll("\n", ""); - String userCredentials = Base64.getEncoder() - .encodeToString((this.config.getPublisherUserName() + ":" + - this.config.getPublisherPassword()) - .getBytes(StandardCharsets.UTF_8)); - sender.send("POST", topicUrl, ves, userCredentials); - } catch (Exception e) { - throw new MRPublisherException(e.getMessage(), e); - } + private AafCredentials aafCredentials() { + return ImmutableAafCredentials.builder() + .username(config.getPublisherUserName()) + .password(config.getPublisherPassword()) + .build(); } } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java new file mode 100644 index 0000000..1a7c59e --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java @@ -0,0 +1,120 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import io.vavr.control.Try; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.MAX_RETRIES; +import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.RETRY_INTERVAL; + +public class DmaapRequestSender { + private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(DmaapRequestSender.class)); + + private static final DmaapRetryConfig RETRY_CONFIG = ImmutableDmaapRetryConfig.builder() + .retryCount(MAX_RETRIES) + .retryIntervalInSeconds((int) RETRY_INTERVAL.getSeconds()) + .build(); + private static final MessageRouterPublisherConfig CLIENT_CONFIGURATION = + ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(RETRY_CONFIG) + .build(); + private static final DmaapTimeoutConfig READ_TIMEOUT = ImmutableDmaapTimeoutConfig.builder() + .timeout(SendersConfig.READ_TIMEOUT) + .build(); + private static final MessageRouterPublisher PUBLISHER = + DmaapClientFactory.createMessageRouterPublisher(CLIENT_CONFIGURATION); + + /** + * Sends an http request to a given dmaap-mr topic. + * + * @param topicUrl representing given topic + * @param vesEvents of the requests as json + * @param credentials base64-encoded username password credentials + * @return dmaap-mr response + */ + public Flux send(final String topicUrl, final List vesEvents, final AafCredentials credentials) { + MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() + .contentType(ContentType.TEXT_PLAIN) + .sinkDefinition(sink(topicUrl, credentials)) + .timeoutConfig(READ_TIMEOUT) + .diagnosticContext(diagnosticContext()) + .build(); + + return PUBLISHER.put(request, jsonBatch(vesEvents)); + } + + private static MessageRouterSink sink(String topicUrl, AafCredentials credentials) { + return ImmutableMessageRouterSink.builder() + .aafCredentials(credentials) + .topicUrl(topicUrl) + .build(); + } + + private static RequestDiagnosticContext diagnosticContext() { + UUID invocationId = uuid(LoggingUtils.invocationID(LOGGER)); + UUID requestId = uuid(LoggingUtils.requestID()); + return ImmutableRequestDiagnosticContext.builder() + .invocationId(invocationId) + .requestId(requestId) + .build(); + } + + private static Flux jsonBatch(List events) { + return Flux.fromIterable(getAsJsonElements(events)); + } + + private static List getAsJsonElements(List events) { + return events.stream() + .map(JsonParser::parseString) + .collect(Collectors.toList()); + } + + private static UUID uuid(String s) { + return Try.of(() -> UUID.fromString(s)) + .getOrElse(UUID::randomUUID); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java new file mode 100644 index 0000000..9e7fc8e --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import org.jboss.logging.MDC; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.onap.logging.ref.slf4j.ONAPLogConstants; + +import java.util.Optional; +import java.util.UUID; + +final class LoggingUtils { + + private LoggingUtils() { + throw new IllegalStateException("Utility class;shouldn't be constructed"); + } + + static String invocationID(ONAPLogAdapter logger) { + return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID)) + .orElseGet(()-> logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString()); + } + + static String requestID() { + return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.REQUEST_ID)) + .orElseGet(() -> UUID.randomUUID().toString()); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java index 4993a10..3ab8ab6 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,30 +29,23 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; -import java.util.Optional; -import java.util.UUID; import java.util.stream.Collectors; import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure; import org.onap.dcaegen2.services.pmmapper.exceptions.ServerResponseException; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.onap.logging.ref.slf4j.ONAPLogConstants; import org.slf4j.LoggerFactory; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; -import org.jboss.logging.MDC; public class RequestSender { - private static final int MAX_RETRIES = 5; - private static final int RETRY_INTERVAL = 1000; private static final String SERVER_ERROR_MESSAGE = "Error on Server"; private static final int ERROR_START_RANGE = 300; - private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class)); - public static final String DELETE = "DELETE"; - public static final String DEFAULT_CONTENT_TYPE = "text/plain"; - public static final int DEFAULT_READ_TIMEOUT = 5000; + private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class)); /** * Works just like {@link RequestSender#send(method,urlString)}, except {@code method } @@ -91,20 +85,16 @@ public class RequestSender { */ public String send(String method, final String urlString, final String body, final String encodedCredentials) throws InterruptedException { - String invocationID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID)) - .orElse(logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString()); - String requestID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID)) - .orElse(UUID.randomUUID().toString()); String result = ""; boolean status = false; - int attempts = 1; + int attempts = 0; try { - while (!status && attempts <= MAX_RETRIES) { - if(attempts != 1) { - Thread.sleep(RETRY_INTERVAL); + while (!status && attempts <= SendersConfig.MAX_RETRIES) { + if(attempts != 0) { + Thread.sleep(SendersConfig.RETRY_INTERVAL.toMillis()); } final URL url = new URL(urlString); - final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID); + final HttpURLConnection connection = getHttpURLConnection(method, url); if ("https".equalsIgnoreCase(url.getProtocol())) { HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory()); @@ -122,26 +112,25 @@ public class RequestSender { attempts++; } } catch (IOException | NoSuchAlgorithmException ex) { - logger.unwrap().warn("Request failure", ex); + LOGGER.unwrap().warn("Request failure", ex); throw new RequestFailure(ex); } return result; } - private HttpURLConnection getHttpURLConnection(String method, URL url, String invocationID, String requestID) + private HttpURLConnection getHttpURLConnection(String method, URL url) throws IOException { HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setReadTimeout(DEFAULT_READ_TIMEOUT); - connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID); - connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID); + connection.setReadTimeout((int) SendersConfig.READ_TIMEOUT.toMillis()); + connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, LoggingUtils.requestID()); + connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, LoggingUtils.invocationID(LOGGER)); connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME); connection.setRequestMethod(method); - return connection; } private void setMessageBody(HttpURLConnection connection, String body) throws IOException { - connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE); + connection.setRequestProperty("Content-Type", ContentType.TEXT_PLAIN.toString()); connection.setDoOutput(true); OutputStream outputStream = connection.getOutputStream(); outputStream.write(body.getBytes(StandardCharsets.UTF_8)); @@ -150,7 +139,7 @@ public class RequestSender { } private boolean retryLimitReached(final int retryCount) { - return retryCount >= MAX_RETRIES; + return retryCount >= SendersConfig.MAX_RETRIES; } private boolean isWithinErrorRange(final int responseCode) { @@ -158,18 +147,18 @@ public class RequestSender { } private String getResult(int attemptNumber, HttpURLConnection connection) throws IOException { - logger.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL()); + LOGGER.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL()); String result = ""; try (InputStream is = connection.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(is))) { result = reader.lines().collect(Collectors.joining("\n")); int responseCode = connection.getResponseCode(); if (!(isWithinErrorRange(responseCode))) { - logger.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result); + LOGGER.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result); } } catch (Exception e) { if (retryLimitReached(attemptNumber)) { - logger.unwrap().error("Execution error: {}", connection.getResponseMessage(), e); + LOGGER.unwrap().error("Execution error: {}", connection.getResponseMessage(), e); throw new ServerResponseException(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e); } } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java new file mode 100644 index 0000000..c0c972d --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import java.time.Duration; + +final class SendersConfig { + + private SendersConfig() { + throw new IllegalStateException("SendersConfig class;shouldn't be constructed"); + } + + static final int MAX_RETRIES = 4; + static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); + static final Duration READ_TIMEOUT = Duration.ofSeconds(5); +} diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java index e5c5af4..47e09e9 100644 --- a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java +++ b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,75 +19,122 @@ * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.pmmapper.messagerouter; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import reactor.test.StepVerifier; -import java.util.Arrays; -import java.util.List; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher; -import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; -import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; +import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender; +import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest(EnvironmentConfig.class) @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) public class VESPublisherTest { - private static String topicURL = "http://mr/topic"; - private static RequestSender sender; - private static MapperConfig config; + private static final String TOPIC_URL = "http://mr/topic"; + private static final String VES = "{}"; + private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder() + .username("") + .password("") + .build(); + + private static final MessageRouterPublishResponse SUCCESSFUL = + ImmutableMessageRouterPublishResponse.builder().build(); + private static final MessageRouterPublishResponse FAILED = + ImmutableMessageRouterPublishResponse.builder() + .failReason("failReason") + .build(); + + private DmaapRequestSender sender; private VESPublisher sut; - private String ves = "{}"; @Before - public void before() throws Exception { - config = mock(MapperConfig.class); - sender = mock(RequestSender.class); + public void before() { + MapperConfig config = mock(MapperConfig.class); + when(config.getPublisherTopicUrl()).thenReturn(TOPIC_URL); + when(config.getPublisherUserName()).thenReturn(""); + when(config.getPublisherPassword()).thenReturn(""); + sender = mock(DmaapRequestSender.class); sut = new VESPublisher(config, sender); - when(config.getPublisherTopicUrl()).thenReturn(topicURL); } @Test - public void publish_multiple_success() throws Exception { + public void publish_multiple_success() { + Event event = mock(Event.class); + List events = Arrays.asList(event, event, event); + when(event.getVes()).thenReturn(VES); + MessageRouterPublishResponse successfulResponse = ImmutableMessageRouterPublishResponse.builder().build(); + when(sender.send(any(), any(), any())).thenReturn(Flux.just(successfulResponse, successfulResponse)); + + Flux flux = sut.publish(events); + + verify(sender, times(1)).send(anyString(), any(), any()); + StepVerifier.create(flux) + .expectNextMatches(event::equals) + .verifyComplete(); + } + + @Test + public void publish_multiple_fail_sender_exceptions() { + Event event = mock(Event.class); + List events = Arrays.asList(event, event, event); + when(event.getVes()).thenReturn(VES); + when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS))) + .thenReturn(Flux.error(new RuntimeException())); + + Flux flux = sut.publish(events); + + StepVerifier.create(flux) + .verifyComplete(); + } + + @Test + public void publish_multiple_fail_mr_responses_failed() { Event event = mock(Event.class); - List events = Arrays.asList(event,event,event); - when(event.getVes()).thenReturn(ves); + List events = Arrays.asList(event, event, event); + when(event.getVes()).thenReturn(VES); + when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS))) + .thenReturn(Flux.just(FAILED, FAILED, FAILED)); Flux flux = sut.publish(events); - verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); StepVerifier.create(flux) - .expectNextMatches(event::equals) - .expectComplete() - .verify(); + .verifyComplete(); } @Test - public void publish_multiple_fail() throws Exception { + public void publish_multiple_fail_and_multiple_success() { Event event = mock(Event.class); - List events = Arrays.asList(event,event,event); - when(event.getVes()).thenReturn(ves); - when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(RequestFailure.class); + when(event.getVes()).thenReturn(VES); + List events = Arrays.asList(event, event, event, event); + when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS))) + .thenReturn(Flux.just(SUCCESSFUL, FAILED, SUCCESSFUL, FAILED)); Flux flux = sut.publish(events); StepVerifier.create(flux) - .expectNext(events.get(0)) - .verifyComplete(); + .verifyComplete(); } } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java index db45029..617cbd1 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java @@ -62,7 +62,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.mockserver.client.server.MockServerClient; +import org.mockserver.client.MockServerClient; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler; diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java index 7ddc929..26cb648 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +43,7 @@ import java.util.List; import org.everit.json.schema.Schema; import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONException; import org.json.JSONObject; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -79,7 +81,7 @@ class MapperTest { @BeforeAll - static void classSetup() throws IOException { + static void classSetup() throws IOException, JSONException { JSONObject ves = new JSONObject(new String(Files.readAllBytes(schema))); vesSchema = SchemaLoader.load(ves); @@ -97,7 +99,7 @@ class MapperTest { @ParameterizedTest @MethodSource("getValidEvents") - void testValidEvent(Event testEvent) { + void testValidEvent(Event testEvent) throws JSONException { vesSchema.validate(new JSONObject(objUnderTest.map(testEvent))); } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java index 0451543..1c6d850 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 - 2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +66,7 @@ public class DataRouterUtilsTest { private static MapperConfig validConfig; private SSLContextFactory sslContextFactory; private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json"); + private static final String DELETE = "DELETE"; @Test public void processEventSuccessful() throws Exception { @@ -94,7 +96,7 @@ public class DataRouterUtilsTest { Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); - verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE); + verify(mockConnection, times(1)).setRequestMethod(DELETE); } @Test @@ -117,7 +119,7 @@ public class DataRouterUtilsTest { Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); - verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE); + verify(mockConnection, times(1)).setRequestMethod(DELETE); } @Test @@ -148,7 +150,7 @@ public class DataRouterUtilsTest { PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); - verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE); + verify(mockConnection, times(5)).setRequestMethod(DELETE); } @Test diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java new file mode 100644 index 0000000..50abb5f --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpStatusCode; +import org.mockserver.verify.VerificationTimes; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.Collections; +import java.util.List; + +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class DmaapRequestSenderTest { + + private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder() + .username("") + .password("") + .build(); + private static final List SINGLE = Collections.singletonList("any"); + + private static ClientAndServer mockServer; + private final MockServerClient client = mockClient(); + + @BeforeClass + public static void setup() { + mockServer = startClientAndServer(35454); + } + + @AfterClass + public static void teardown() { + mockServer.stop(); + } + + @Before + public void setUp() { + client.reset(); + } + + @Test + public void send_success() { + client.when(request()).respond(response() + .withStatusCode(HttpStatusCode.OK_200.code()) + .withBody("ResponseBody")); + + Flux result = new DmaapRequestSender() + .send("http://127.0.0.1:35454/once", SINGLE, AAF_CREDENTIALS); + + StepVerifier.create(result) + .expectNextMatches(DmaapResponse::successful) + .verifyComplete(); + client.verify(request(), VerificationTimes.once()); + } + + @Test + public void host_unavailable_retry_mechanism() { + client.when(request()) + .respond(response().withStatusCode(HttpStatusCode.SERVICE_UNAVAILABLE_503.code())); + + Flux result = new DmaapRequestSender() + .send("http://127.0.0.1:35454/anypath", SINGLE, AAF_CREDENTIALS); + + StepVerifier.create(result) + .expectNextMatches(DmaapResponse::failed) + .verifyComplete(); + client.verify(request(), VerificationTimes.exactly(5)); + } + + @Test + public void host_unknown() { + Flux result = new DmaapRequestSender() + .send("http://unknown-host:35454/host-is-unknown", SINGLE, AAF_CREDENTIALS); + + StepVerifier.create(result) + .verifyError(); + client.verify(request(), VerificationTimes.exactly(0)); + } + + private MockServerClient mockClient() { + return new MockServerClient("127.0.0.1", 35454); + } +} diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java index 08faf4a..8541824 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,15 +31,15 @@ import java.net.URL; import java.net.UnknownHostException; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockserver.client.server.MockServerClient; +import org.mockserver.client.MockServerClient; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; import org.mockserver.model.HttpStatusCode; import org.mockserver.verify.VerificationTimes; -import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; import org.onap.logging.ref.slf4j.ONAPLogConstants; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -53,7 +54,7 @@ import utils.LoggingUtils; @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) public class RequestSenderTests { private static ClientAndServer mockServer; - private MockServerClient client = mockClient(); + private final MockServerClient client = mockClient(); @BeforeClass public static void setup() { @@ -65,6 +66,11 @@ public class RequestSenderTests { mockServer.stop(); } + @Before + public void setUp() { + client.reset(); + } + @Test public void send_success() throws Exception { String url = "http://127.0.0.1:35454/once"; @@ -84,11 +90,10 @@ public class RequestSenderTests { assertTrue(logAppender.list.get(1).getMessage().contains("Sending")); assertTrue(logAppender.list.get(2).getMessage().contains("Received")); logAppender.stop(); - client.clear(req); } @Test - public void host_unavailable_retry_mechanism() throws Exception { + public void host_unavailable_retry_mechanism() { PowerMockito.mockStatic(Thread.class); client.when(request()) @@ -99,7 +104,6 @@ public class RequestSenderTests { }); client.verify(request(), VerificationTimes.exactly(5)); - client.clear(request()); } @Test @@ -114,11 +118,10 @@ public class RequestSenderTests { }); client.verify(request(), VerificationTimes.exactly(0)); - client.clear(request()); } private MockServerClient mockClient() { return new MockServerClient("127.0.0.1", 35454); } -} \ No newline at end of file +} -- cgit 1.2.3-korg