diff options
14 files changed, 508 insertions, 114 deletions
diff --git a/Changelog.md b/Changelog.md index 3b10d37..7990b6b 100644 --- a/Changelog.md +++ b/Changelog.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [1.6.0] - 25/03/2021 ### Changed +- Utilize DMaaP-Client in PM-Mapper - Switched Dockerfile to integration image (alpine-based) ## [1.5.2] - 18/03/2021 @@ -48,19 +48,22 @@ <logback.version>1.2.3</logback.version> <reactor.version>3.4.0</reactor.version> <undertow.version>2.2.3.Final</undertow.version> - <gson.version>2.8.5</gson.version> + <gson.version>2.8.6</gson.version> <freemarker.version>2.3.28</freemarker.version> <commons.io.version>2.8.0</commons.io.version> <xml.version>2.3.1</xml.version> <jaxb.version>2.3.0.1</jaxb.version> + <dmaap-clinet.version>1.8.2</dmaap-clinet.version> <!-- Testing Test Dependencies --> <junit.version>5.3.2</junit.version> <mockito.version>2.23.4</mockito.version> <mockito-ju5-ext.version>2.23.4</mockito-ju5-ext.version> <powermock.version>2.0.7</powermock.version> - <mockserver.version>3.10.8</mockserver.version> + <mockserver.version>5.11.2</mockserver.version> <junit4.version>4.12</junit4.version> - <jsonschema.version>1.3.0</jsonschema.version> + <jsonschema.version>1.5.1</jsonschema.version> + <json.version>20210307</json.version> + <jackson.version>2.12.2</jackson.version> <xerces.version>2.11.0</xerces.version> <reactor.test>3.4.0</reactor.test> <!-- Plugin Versions --> @@ -120,6 +123,11 @@ <version>${jaxb.version}</version> </dependency> <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>dmaap-client</artifactId> + <version>${dmaap-clinet.version}</version> + </dependency> + <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId> <version>${xml.version}</version> @@ -210,6 +218,18 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>${json.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-params</artifactId> <version>${junit.version}</version> diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java index 6b5c157..248debe 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java @@ -1,6 +1,7 @@ /*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +22,7 @@ package org.onap.dcaegen2.services.pmmapper.exceptions;
public class MRPublisherException extends RuntimeException{
- public MRPublisherException(String message, Throwable cause) {
- super(message, cause);
+ public MRPublisherException(String message) {
+ super(message);
}
-
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java index 6aaf1d6..9e0b87c 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java @@ -1,6 +1,7 @@ /*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,57 +21,74 @@ package org.onap.dcaegen2.services.pmmapper.messagerouter;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.List;
-
import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Flux;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class VESPublisher {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));
- private RequestSender sender;
- private MapperConfig config;
+ private final DmaapRequestSender sender;
+ private final MapperConfig config;
public VESPublisher(MapperConfig config) {
- this(config, new RequestSender());
+ this(config, new DmaapRequestSender());
}
- public VESPublisher(MapperConfig config, RequestSender sender) {
+ public VESPublisher(MapperConfig config, DmaapRequestSender sender) {
this.sender = sender;
this.config = config;
}
public Flux<Event> publish(List<Event> events) {
logger.unwrap().info("Publishing VES events to messagerouter.");
- Event event = events.get(0);
- try {
- events.forEach(e -> this.publish(e.getVes()));
- logger.unwrap().info("Successfully published VES events to messagerouter.");
- } catch (MRPublisherException e) {
- logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", e);
- return Flux.empty();
- }
- return Flux.just(event);
+ Event first = events.get(0);
+ List<String> vesEvents = minifiedVesEvents(events);
+ return publishEvents(vesEvents)
+ .filter(DmaapResponse::failed)
+ .takeLast(1)
+ .flatMap(this::toFluxError)
+ .defaultIfEmpty(first)
+ .doOnComplete(() -> logger.unwrap().info("Successfully published VES events to messagerouter."))
+ .onErrorResume(this::resume);
+ }
+
+ private List<String> minifiedVesEvents(List<Event> events) {
+ return events.stream()
+ .map(Event::getVes)
+ .map(vesEvent -> vesEvent.replace("\n", ""))
+ .collect(Collectors.toList());
+ }
+
+ private Flux<MessageRouterPublishResponse> publishEvents(List<String> vesEvents) {
+ String topicUrl = config.getPublisherTopicUrl();
+ AafCredentials credentials = aafCredentials();
+ return sender.send(topicUrl, vesEvents, credentials);
+ }
+
+ private Flux<Event> toFluxError(MessageRouterPublishResponse response) {
+ return Flux.error(new MRPublisherException(response.failReason()));
+ }
+
+ private Flux<Event> resume(Throwable t) {
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", t);
+ return Flux.empty();
}
- private void publish(String ves) {
- try {
- String topicUrl = config.getPublisherTopicUrl();
- ves = ves.replaceAll("\n", "");
- String userCredentials = Base64.getEncoder()
- .encodeToString((this.config.getPublisherUserName() + ":" +
- this.config.getPublisherPassword())
- .getBytes(StandardCharsets.UTF_8));
- sender.send("POST", topicUrl, ves, userCredentials);
- } catch (Exception e) {
- throw new MRPublisherException(e.getMessage(), e);
- }
+ private AafCredentials aafCredentials() {
+ return ImmutableAafCredentials.builder()
+ .username(config.getPublisherUserName())
+ .password(config.getPublisherPassword())
+ .build();
}
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java new file mode 100644 index 0000000..1a7c59e --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java @@ -0,0 +1,120 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import io.vavr.control.Try; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.MAX_RETRIES; +import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.RETRY_INTERVAL; + +public class DmaapRequestSender { + private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(DmaapRequestSender.class)); + + private static final DmaapRetryConfig RETRY_CONFIG = ImmutableDmaapRetryConfig.builder() + .retryCount(MAX_RETRIES) + .retryIntervalInSeconds((int) RETRY_INTERVAL.getSeconds()) + .build(); + private static final MessageRouterPublisherConfig CLIENT_CONFIGURATION = + ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(RETRY_CONFIG) + .build(); + private static final DmaapTimeoutConfig READ_TIMEOUT = ImmutableDmaapTimeoutConfig.builder() + .timeout(SendersConfig.READ_TIMEOUT) + .build(); + private static final MessageRouterPublisher PUBLISHER = + DmaapClientFactory.createMessageRouterPublisher(CLIENT_CONFIGURATION); + + /** + * Sends an http request to a given dmaap-mr topic. + * + * @param topicUrl representing given topic + * @param vesEvents of the requests as json + * @param credentials base64-encoded username password credentials + * @return dmaap-mr response + */ + public Flux<MessageRouterPublishResponse> send(final String topicUrl, final List<String> vesEvents, final AafCredentials credentials) { + MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() + .contentType(ContentType.TEXT_PLAIN) + .sinkDefinition(sink(topicUrl, credentials)) + .timeoutConfig(READ_TIMEOUT) + .diagnosticContext(diagnosticContext()) + .build(); + + return PUBLISHER.put(request, jsonBatch(vesEvents)); + } + + private static MessageRouterSink sink(String topicUrl, AafCredentials credentials) { + return ImmutableMessageRouterSink.builder() + .aafCredentials(credentials) + .topicUrl(topicUrl) + .build(); + } + + private static RequestDiagnosticContext diagnosticContext() { + UUID invocationId = uuid(LoggingUtils.invocationID(LOGGER)); + UUID requestId = uuid(LoggingUtils.requestID()); + return ImmutableRequestDiagnosticContext.builder() + .invocationId(invocationId) + .requestId(requestId) + .build(); + } + + private static Flux<JsonElement> jsonBatch(List<String> events) { + return Flux.fromIterable(getAsJsonElements(events)); + } + + private static List<JsonElement> getAsJsonElements(List<String> events) { + return events.stream() + .map(JsonParser::parseString) + .collect(Collectors.toList()); + } + + private static UUID uuid(String s) { + return Try.of(() -> UUID.fromString(s)) + .getOrElse(UUID::randomUUID); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java new file mode 100644 index 0000000..9e7fc8e --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import org.jboss.logging.MDC; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.onap.logging.ref.slf4j.ONAPLogConstants; + +import java.util.Optional; +import java.util.UUID; + +final class LoggingUtils { + + private LoggingUtils() { + throw new IllegalStateException("Utility class;shouldn't be constructed"); + } + + static String invocationID(ONAPLogAdapter logger) { + return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID)) + .orElseGet(()-> logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString()); + } + + static String requestID() { + return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.REQUEST_ID)) + .orElseGet(() -> UUID.randomUUID().toString()); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java index 4993a10..3ab8ab6 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java @@ -1,6 +1,7 @@ /*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,30 +29,23 @@ import java.net.HttpURLConnection; import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
-import java.util.Optional;
-import java.util.UUID;
import java.util.stream.Collectors;
import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;
import org.onap.dcaegen2.services.pmmapper.exceptions.ServerResponseException;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
-import org.jboss.logging.MDC;
public class RequestSender {
- private static final int MAX_RETRIES = 5;
- private static final int RETRY_INTERVAL = 1000;
private static final String SERVER_ERROR_MESSAGE = "Error on Server";
private static final int ERROR_START_RANGE = 300;
- private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
- public static final String DELETE = "DELETE";
- public static final String DEFAULT_CONTENT_TYPE = "text/plain";
- public static final int DEFAULT_READ_TIMEOUT = 5000;
+ private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
/**
* Works just like {@link RequestSender#send(method,urlString)}, except {@code method }
@@ -91,20 +85,16 @@ public class RequestSender { */
public String send(String method, final String urlString, final String body, final String encodedCredentials)
throws InterruptedException {
- String invocationID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))
- .orElse(logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());
- String requestID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))
- .orElse(UUID.randomUUID().toString());
String result = "";
boolean status = false;
- int attempts = 1;
+ int attempts = 0;
try {
- while (!status && attempts <= MAX_RETRIES) {
- if(attempts != 1) {
- Thread.sleep(RETRY_INTERVAL);
+ while (!status && attempts <= SendersConfig.MAX_RETRIES) {
+ if(attempts != 0) {
+ Thread.sleep(SendersConfig.RETRY_INTERVAL.toMillis());
}
final URL url = new URL(urlString);
- final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);
+ final HttpURLConnection connection = getHttpURLConnection(method, url);
if ("https".equalsIgnoreCase(url.getProtocol())) {
HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory());
@@ -122,26 +112,25 @@ public class RequestSender { attempts++;
}
} catch (IOException | NoSuchAlgorithmException ex) {
- logger.unwrap().warn("Request failure", ex);
+ LOGGER.unwrap().warn("Request failure", ex);
throw new RequestFailure(ex);
}
return result;
}
- private HttpURLConnection getHttpURLConnection(String method, URL url, String invocationID, String requestID)
+ private HttpURLConnection getHttpURLConnection(String method, URL url)
throws IOException {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setReadTimeout(DEFAULT_READ_TIMEOUT);
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID);
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID);
+ connection.setReadTimeout((int) SendersConfig.READ_TIMEOUT.toMillis());
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, LoggingUtils.requestID());
+ connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, LoggingUtils.invocationID(LOGGER));
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
connection.setRequestMethod(method);
-
return connection;
}
private void setMessageBody(HttpURLConnection connection, String body) throws IOException {
- connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);
+ connection.setRequestProperty("Content-Type", ContentType.TEXT_PLAIN.toString());
connection.setDoOutput(true);
OutputStream outputStream = connection.getOutputStream();
outputStream.write(body.getBytes(StandardCharsets.UTF_8));
@@ -150,7 +139,7 @@ public class RequestSender { }
private boolean retryLimitReached(final int retryCount) {
- return retryCount >= MAX_RETRIES;
+ return retryCount >= SendersConfig.MAX_RETRIES;
}
private boolean isWithinErrorRange(final int responseCode) {
@@ -158,18 +147,18 @@ public class RequestSender { }
private String getResult(int attemptNumber, HttpURLConnection connection) throws IOException {
- logger.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());
+ LOGGER.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());
String result = "";
try (InputStream is = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
result = reader.lines().collect(Collectors.joining("\n"));
int responseCode = connection.getResponseCode();
if (!(isWithinErrorRange(responseCode))) {
- logger.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);
+ LOGGER.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);
}
} catch (Exception e) {
if (retryLimitReached(attemptNumber)) {
- logger.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);
+ LOGGER.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);
throw new ServerResponseException(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);
}
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java new file mode 100644 index 0000000..c0c972d --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import java.time.Duration; + +final class SendersConfig { + + private SendersConfig() { + throw new IllegalStateException("SendersConfig class;shouldn't be constructed"); + } + + static final int MAX_RETRIES = 4; + static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); + static final Duration READ_TIMEOUT = Duration.ofSeconds(5); +} diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java index e5c5af4..47e09e9 100644 --- a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java +++ b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,75 +19,122 @@ * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.pmmapper.messagerouter; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import reactor.test.StepVerifier; -import java.util.Arrays; -import java.util.List; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher; -import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; -import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; +import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender; +import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest(EnvironmentConfig.class) @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) public class VESPublisherTest { - private static String topicURL = "http://mr/topic"; - private static RequestSender sender; - private static MapperConfig config; + private static final String TOPIC_URL = "http://mr/topic"; + private static final String VES = "{}"; + private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder() + .username("") + .password("") + .build(); + + private static final MessageRouterPublishResponse SUCCESSFUL = + ImmutableMessageRouterPublishResponse.builder().build(); + private static final MessageRouterPublishResponse FAILED = + ImmutableMessageRouterPublishResponse.builder() + .failReason("failReason") + .build(); + + private DmaapRequestSender sender; private VESPublisher sut; - private String ves = "{}"; @Before - public void before() throws Exception { - config = mock(MapperConfig.class); - sender = mock(RequestSender.class); + public void before() { + MapperConfig config = mock(MapperConfig.class); + when(config.getPublisherTopicUrl()).thenReturn(TOPIC_URL); + when(config.getPublisherUserName()).thenReturn(""); + when(config.getPublisherPassword()).thenReturn(""); + sender = mock(DmaapRequestSender.class); sut = new VESPublisher(config, sender); - when(config.getPublisherTopicUrl()).thenReturn(topicURL); } @Test - public void publish_multiple_success() throws Exception { + public void publish_multiple_success() { + Event event = mock(Event.class); + List<Event> events = Arrays.asList(event, event, event); + when(event.getVes()).thenReturn(VES); + MessageRouterPublishResponse successfulResponse = ImmutableMessageRouterPublishResponse.builder().build(); + when(sender.send(any(), any(), any())).thenReturn(Flux.just(successfulResponse, successfulResponse)); + + Flux<Event> flux = sut.publish(events); + + verify(sender, times(1)).send(anyString(), any(), any()); + StepVerifier.create(flux) + .expectNextMatches(event::equals) + .verifyComplete(); + } + + @Test + public void publish_multiple_fail_sender_exceptions() { + Event event = mock(Event.class); + List<Event> events = Arrays.asList(event, event, event); + when(event.getVes()).thenReturn(VES); + when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS))) + .thenReturn(Flux.error(new RuntimeException())); + + Flux<Event> flux = sut.publish(events); + + StepVerifier.create(flux) + .verifyComplete(); + } + + @Test + public void publish_multiple_fail_mr_responses_failed() { Event event = mock(Event.class); - List<Event> events = Arrays.asList(event,event,event); - when(event.getVes()).thenReturn(ves); + List<Event> events = Arrays.asList(event, event, event); + when(event.getVes()).thenReturn(VES); + when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS))) + .thenReturn(Flux.just(FAILED, FAILED, FAILED)); Flux<Event> flux = sut.publish(events); - verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); StepVerifier.create(flux) - .expectNextMatches(event::equals) - .expectComplete() - .verify(); + .verifyComplete(); } @Test - public void publish_multiple_fail() throws Exception { + public void publish_multiple_fail_and_multiple_success() { Event event = mock(Event.class); - List<Event> events = Arrays.asList(event,event,event); - when(event.getVes()).thenReturn(ves); - when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(RequestFailure.class); + when(event.getVes()).thenReturn(VES); + List<Event> events = Arrays.asList(event, event, event, event); + when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS))) + .thenReturn(Flux.just(SUCCESSFUL, FAILED, SUCCESSFUL, FAILED)); Flux<Event> flux = sut.publish(events); StepVerifier.create(flux) - .expectNext(events.get(0)) - .verifyComplete(); + .verifyComplete(); } } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java index db45029..617cbd1 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java @@ -62,7 +62,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.mockserver.client.server.MockServerClient; +import org.mockserver.client.MockServerClient; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler; diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java index 7ddc929..26cb648 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +43,7 @@ import java.util.List; import org.everit.json.schema.Schema; import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONException; import org.json.JSONObject; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -79,7 +81,7 @@ class MapperTest { @BeforeAll - static void classSetup() throws IOException { + static void classSetup() throws IOException, JSONException { JSONObject ves = new JSONObject(new String(Files.readAllBytes(schema))); vesSchema = SchemaLoader.load(ves); @@ -97,7 +99,7 @@ class MapperTest { @ParameterizedTest @MethodSource("getValidEvents") - void testValidEvent(Event testEvent) { + void testValidEvent(Event testEvent) throws JSONException { vesSchema.validate(new JSONObject(objUnderTest.map(testEvent))); } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java index 0451543..1c6d850 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 - 2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +66,7 @@ public class DataRouterUtilsTest { private static MapperConfig validConfig; private SSLContextFactory sslContextFactory; private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json"); + private static final String DELETE = "DELETE"; @Test public void processEventSuccessful() throws Exception { @@ -94,7 +96,7 @@ public class DataRouterUtilsTest { Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); - verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE); + verify(mockConnection, times(1)).setRequestMethod(DELETE); } @Test @@ -117,7 +119,7 @@ public class DataRouterUtilsTest { Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); - verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE); + verify(mockConnection, times(1)).setRequestMethod(DELETE); } @Test @@ -148,7 +150,7 @@ public class DataRouterUtilsTest { PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); - verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE); + verify(mockConnection, times(5)).setRequestMethod(DELETE); } @Test diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java new file mode 100644 index 0000000..50abb5f --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpStatusCode; +import org.mockserver.verify.VerificationTimes; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.Collections; +import java.util.List; + +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class DmaapRequestSenderTest { + + private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder() + .username("") + .password("") + .build(); + private static final List<String> SINGLE = Collections.singletonList("any"); + + private static ClientAndServer mockServer; + private final MockServerClient client = mockClient(); + + @BeforeClass + public static void setup() { + mockServer = startClientAndServer(35454); + } + + @AfterClass + public static void teardown() { + mockServer.stop(); + } + + @Before + public void setUp() { + client.reset(); + } + + @Test + public void send_success() { + client.when(request()).respond(response() + .withStatusCode(HttpStatusCode.OK_200.code()) + .withBody("ResponseBody")); + + Flux<MessageRouterPublishResponse> result = new DmaapRequestSender() + .send("http://127.0.0.1:35454/once", SINGLE, AAF_CREDENTIALS); + + StepVerifier.create(result) + .expectNextMatches(DmaapResponse::successful) + .verifyComplete(); + client.verify(request(), VerificationTimes.once()); + } + + @Test + public void host_unavailable_retry_mechanism() { + client.when(request()) + .respond(response().withStatusCode(HttpStatusCode.SERVICE_UNAVAILABLE_503.code())); + + Flux<MessageRouterPublishResponse> result = new DmaapRequestSender() + .send("http://127.0.0.1:35454/anypath", SINGLE, AAF_CREDENTIALS); + + StepVerifier.create(result) + .expectNextMatches(DmaapResponse::failed) + .verifyComplete(); + client.verify(request(), VerificationTimes.exactly(5)); + } + + @Test + public void host_unknown() { + Flux<MessageRouterPublishResponse> result = new DmaapRequestSender() + .send("http://unknown-host:35454/host-is-unknown", SINGLE, AAF_CREDENTIALS); + + StepVerifier.create(result) + .verifyError(); + client.verify(request(), VerificationTimes.exactly(0)); + } + + private MockServerClient mockClient() { + return new MockServerClient("127.0.0.1", 35454); + } +} diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java index 08faf4a..8541824 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,15 +31,15 @@ import java.net.URL; import java.net.UnknownHostException; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockserver.client.server.MockServerClient; +import org.mockserver.client.MockServerClient; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; import org.mockserver.model.HttpStatusCode; import org.mockserver.verify.VerificationTimes; -import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; import org.onap.logging.ref.slf4j.ONAPLogConstants; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -53,7 +54,7 @@ import utils.LoggingUtils; @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) public class RequestSenderTests { private static ClientAndServer mockServer; - private MockServerClient client = mockClient(); + private final MockServerClient client = mockClient(); @BeforeClass public static void setup() { @@ -65,6 +66,11 @@ public class RequestSenderTests { mockServer.stop(); } + @Before + public void setUp() { + client.reset(); + } + @Test public void send_success() throws Exception { String url = "http://127.0.0.1:35454/once"; @@ -84,11 +90,10 @@ public class RequestSenderTests { assertTrue(logAppender.list.get(1).getMessage().contains("Sending")); assertTrue(logAppender.list.get(2).getMessage().contains("Received")); logAppender.stop(); - client.clear(req); } @Test - public void host_unavailable_retry_mechanism() throws Exception { + public void host_unavailable_retry_mechanism() { PowerMockito.mockStatic(Thread.class); client.when(request()) @@ -99,7 +104,6 @@ public class RequestSenderTests { }); client.verify(request(), VerificationTimes.exactly(5)); - client.clear(request()); } @Test @@ -114,11 +118,10 @@ public class RequestSenderTests { }); client.verify(request(), VerificationTimes.exactly(0)); - client.clear(request()); } private MockServerClient mockClient() { return new MockServerClient("127.0.0.1", 35454); } -}
\ No newline at end of file +} |