summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Changelog.md1
-rw-r--r--pom.xml26
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java6
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java80
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java120
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java45
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java47
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java34
-rw-r--r--src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java116
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java2
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java6
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java8
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java112
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java19
14 files changed, 508 insertions, 114 deletions
diff --git a/Changelog.md b/Changelog.md
index 3b10d37..7990b6b 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [1.6.0] - 25/03/2021
### Changed
+- Utilize DMaaP-Client in PM-Mapper
- Switched Dockerfile to integration image (alpine-based)
## [1.5.2] - 18/03/2021
diff --git a/pom.xml b/pom.xml
index e978515..3b82491 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,19 +48,22 @@
<logback.version>1.2.3</logback.version>
<reactor.version>3.4.0</reactor.version>
<undertow.version>2.2.3.Final</undertow.version>
- <gson.version>2.8.5</gson.version>
+ <gson.version>2.8.6</gson.version>
<freemarker.version>2.3.28</freemarker.version>
<commons.io.version>2.8.0</commons.io.version>
<xml.version>2.3.1</xml.version>
<jaxb.version>2.3.0.1</jaxb.version>
+ <dmaap-clinet.version>1.8.2</dmaap-clinet.version>
<!-- Testing Test Dependencies -->
<junit.version>5.3.2</junit.version>
<mockito.version>2.23.4</mockito.version>
<mockito-ju5-ext.version>2.23.4</mockito-ju5-ext.version>
<powermock.version>2.0.7</powermock.version>
- <mockserver.version>3.10.8</mockserver.version>
+ <mockserver.version>5.11.2</mockserver.version>
<junit4.version>4.12</junit4.version>
- <jsonschema.version>1.3.0</jsonschema.version>
+ <jsonschema.version>1.5.1</jsonschema.version>
+ <json.version>20210307</json.version>
+ <jackson.version>2.12.2</jackson.version>
<xerces.version>2.11.0</xerces.version>
<reactor.test>3.4.0</reactor.test>
<!-- Plugin Versions -->
@@ -120,6 +123,11 @@
<version>${jaxb.version}</version>
</dependency>
<dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>dmaap-client</artifactId>
+ <version>${dmaap-clinet.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>${xml.version}</version>
@@ -210,6 +218,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>${json.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
index 6b5c157..248debe 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +22,7 @@
package org.onap.dcaegen2.services.pmmapper.exceptions;
public class MRPublisherException extends RuntimeException{
- public MRPublisherException(String message, Throwable cause) {
- super(message, cause);
+ public MRPublisherException(String message) {
+ super(message);
}
-
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
index 6aaf1d6..9e0b87c 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,57 +21,74 @@
package org.onap.dcaegen2.services.pmmapper.messagerouter;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.List;
-
import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Flux;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class VESPublisher {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));
- private RequestSender sender;
- private MapperConfig config;
+ private final DmaapRequestSender sender;
+ private final MapperConfig config;
public VESPublisher(MapperConfig config) {
- this(config, new RequestSender());
+ this(config, new DmaapRequestSender());
}
- public VESPublisher(MapperConfig config, RequestSender sender) {
+ public VESPublisher(MapperConfig config, DmaapRequestSender sender) {
this.sender = sender;
this.config = config;
}
public Flux<Event> publish(List<Event> events) {
logger.unwrap().info("Publishing VES events to messagerouter.");
- Event event = events.get(0);
- try {
- events.forEach(e -> this.publish(e.getVes()));
- logger.unwrap().info("Successfully published VES events to messagerouter.");
- } catch (MRPublisherException e) {
- logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", e);
- return Flux.empty();
- }
- return Flux.just(event);
+ Event first = events.get(0);
+ List<String> vesEvents = minifiedVesEvents(events);
+ return publishEvents(vesEvents)
+ .filter(DmaapResponse::failed)
+ .takeLast(1)
+ .flatMap(this::toFluxError)
+ .defaultIfEmpty(first)
+ .doOnComplete(() -> logger.unwrap().info("Successfully published VES events to messagerouter."))
+ .onErrorResume(this::resume);
+ }
+
+ private List<String> minifiedVesEvents(List<Event> events) {
+ return events.stream()
+ .map(Event::getVes)
+ .map(vesEvent -> vesEvent.replace("\n", ""))
+ .collect(Collectors.toList());
+ }
+
+ private Flux<MessageRouterPublishResponse> publishEvents(List<String> vesEvents) {
+ String topicUrl = config.getPublisherTopicUrl();
+ AafCredentials credentials = aafCredentials();
+ return sender.send(topicUrl, vesEvents, credentials);
+ }
+
+ private Flux<Event> toFluxError(MessageRouterPublishResponse response) {
+ return Flux.error(new MRPublisherException(response.failReason()));
+ }
+
+ private Flux<Event> resume(Throwable t) {
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", t);
+ return Flux.empty();
}
- private void publish(String ves) {
- try {
- String topicUrl = config.getPublisherTopicUrl();
- ves = ves.replaceAll("\n", "");
- String userCredentials = Base64.getEncoder()
- .encodeToString((this.config.getPublisherUserName() + ":" +
- this.config.getPublisherPassword())
- .getBytes(StandardCharsets.UTF_8));
- sender.send("POST", topicUrl, ves, userCredentials);
- } catch (Exception e) {
- throw new MRPublisherException(e.getMessage(), e);
- }
+ private AafCredentials aafCredentials() {
+ return ImmutableAafCredentials.builder()
+ .username(config.getPublisherUserName())
+ .password(config.getPublisherPassword())
+ .build();
}
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java
new file mode 100644
index 0000000..1a7c59e
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java
@@ -0,0 +1,120 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import io.vavr.control.Try;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.MAX_RETRIES;
+import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.RETRY_INTERVAL;
+
+public class DmaapRequestSender {
+ private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(DmaapRequestSender.class));
+
+ private static final DmaapRetryConfig RETRY_CONFIG = ImmutableDmaapRetryConfig.builder()
+ .retryCount(MAX_RETRIES)
+ .retryIntervalInSeconds((int) RETRY_INTERVAL.getSeconds())
+ .build();
+ private static final MessageRouterPublisherConfig CLIENT_CONFIGURATION =
+ ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(RETRY_CONFIG)
+ .build();
+ private static final DmaapTimeoutConfig READ_TIMEOUT = ImmutableDmaapTimeoutConfig.builder()
+ .timeout(SendersConfig.READ_TIMEOUT)
+ .build();
+ private static final MessageRouterPublisher PUBLISHER =
+ DmaapClientFactory.createMessageRouterPublisher(CLIENT_CONFIGURATION);
+
+ /**
+ * Sends an http request to a given dmaap-mr topic.
+ *
+ * @param topicUrl representing given topic
+ * @param vesEvents of the requests as json
+ * @param credentials base64-encoded username password credentials
+ * @return dmaap-mr response
+ */
+ public Flux<MessageRouterPublishResponse> send(final String topicUrl, final List<String> vesEvents, final AafCredentials credentials) {
+ MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+ .contentType(ContentType.TEXT_PLAIN)
+ .sinkDefinition(sink(topicUrl, credentials))
+ .timeoutConfig(READ_TIMEOUT)
+ .diagnosticContext(diagnosticContext())
+ .build();
+
+ return PUBLISHER.put(request, jsonBatch(vesEvents));
+ }
+
+ private static MessageRouterSink sink(String topicUrl, AafCredentials credentials) {
+ return ImmutableMessageRouterSink.builder()
+ .aafCredentials(credentials)
+ .topicUrl(topicUrl)
+ .build();
+ }
+
+ private static RequestDiagnosticContext diagnosticContext() {
+ UUID invocationId = uuid(LoggingUtils.invocationID(LOGGER));
+ UUID requestId = uuid(LoggingUtils.requestID());
+ return ImmutableRequestDiagnosticContext.builder()
+ .invocationId(invocationId)
+ .requestId(requestId)
+ .build();
+ }
+
+ private static Flux<JsonElement> jsonBatch(List<String> events) {
+ return Flux.fromIterable(getAsJsonElements(events));
+ }
+
+ private static List<JsonElement> getAsJsonElements(List<String> events) {
+ return events.stream()
+ .map(JsonParser::parseString)
+ .collect(Collectors.toList());
+ }
+
+ private static UUID uuid(String s) {
+ return Try.of(() -> UUID.fromString(s))
+ .getOrElse(UUID::randomUUID);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java
new file mode 100644
index 0000000..9e7fc8e
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.jboss.logging.MDC;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.onap.logging.ref.slf4j.ONAPLogConstants;
+
+import java.util.Optional;
+import java.util.UUID;
+
+final class LoggingUtils {
+
+ private LoggingUtils() {
+ throw new IllegalStateException("Utility class;shouldn't be constructed");
+ }
+
+ static String invocationID(ONAPLogAdapter logger) {
+ return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))
+ .orElseGet(()-> logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());
+ }
+
+ static String requestID() {
+ return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))
+ .orElseGet(() -> UUID.randomUUID().toString());
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
index 4993a10..3ab8ab6 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,30 +29,23 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
-import java.util.Optional;
-import java.util.UUID;
import java.util.stream.Collectors;
import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;
import org.onap.dcaegen2.services.pmmapper.exceptions.ServerResponseException;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
-import org.jboss.logging.MDC;
public class RequestSender {
- private static final int MAX_RETRIES = 5;
- private static final int RETRY_INTERVAL = 1000;
private static final String SERVER_ERROR_MESSAGE = "Error on Server";
private static final int ERROR_START_RANGE = 300;
- private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
- public static final String DELETE = "DELETE";
- public static final String DEFAULT_CONTENT_TYPE = "text/plain";
- public static final int DEFAULT_READ_TIMEOUT = 5000;
+ private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
/**
* Works just like {@link RequestSender#send(method,urlString)}, except {@code method }
@@ -91,20 +85,16 @@ public class RequestSender {
*/
public String send(String method, final String urlString, final String body, final String encodedCredentials)
throws InterruptedException {
- String invocationID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))
- .orElse(logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());
- String requestID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))
- .orElse(UUID.randomUUID().toString());
String result = "";
boolean status = false;
- int attempts = 1;
+ int attempts = 0;
try {
- while (!status && attempts <= MAX_RETRIES) {
- if(attempts != 1) {
- Thread.sleep(RETRY_INTERVAL);
+ while (!status && attempts <= SendersConfig.MAX_RETRIES) {
+ if(attempts != 0) {
+ Thread.sleep(SendersConfig.RETRY_INTERVAL.toMillis());
}
final URL url = new URL(urlString);
- final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);
+ final HttpURLConnection connection = getHttpURLConnection(method, url);
if ("https".equalsIgnoreCase(url.getProtocol())) {
HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory());
@@ -122,26 +112,25 @@ public class RequestSender {
attempts++;
}
} catch (IOException | NoSuchAlgorithmException ex) {
- logger.unwrap().warn("Request failure", ex);
+ LOGGER.unwrap().warn("Request failure", ex);
throw new RequestFailure(ex);
}
return result;
}
- private HttpURLConnection getHttpURLConnection(String method, URL url, String invocationID, String requestID)
+ private HttpURLConnection getHttpURLConnection(String method, URL url)
throws IOException {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setReadTimeout(DEFAULT_READ_TIMEOUT);
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID);
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID);
+ connection.setReadTimeout((int) SendersConfig.READ_TIMEOUT.toMillis());
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, LoggingUtils.requestID());
+ connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, LoggingUtils.invocationID(LOGGER));
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
connection.setRequestMethod(method);
-
return connection;
}
private void setMessageBody(HttpURLConnection connection, String body) throws IOException {
- connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);
+ connection.setRequestProperty("Content-Type", ContentType.TEXT_PLAIN.toString());
connection.setDoOutput(true);
OutputStream outputStream = connection.getOutputStream();
outputStream.write(body.getBytes(StandardCharsets.UTF_8));
@@ -150,7 +139,7 @@ public class RequestSender {
}
private boolean retryLimitReached(final int retryCount) {
- return retryCount >= MAX_RETRIES;
+ return retryCount >= SendersConfig.MAX_RETRIES;
}
private boolean isWithinErrorRange(final int responseCode) {
@@ -158,18 +147,18 @@ public class RequestSender {
}
private String getResult(int attemptNumber, HttpURLConnection connection) throws IOException {
- logger.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());
+ LOGGER.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());
String result = "";
try (InputStream is = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
result = reader.lines().collect(Collectors.joining("\n"));
int responseCode = connection.getResponseCode();
if (!(isWithinErrorRange(responseCode))) {
- logger.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);
+ LOGGER.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);
}
} catch (Exception e) {
if (retryLimitReached(attemptNumber)) {
- logger.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);
+ LOGGER.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);
throw new ServerResponseException(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);
}
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java
new file mode 100644
index 0000000..c0c972d
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.time.Duration;
+
+final class SendersConfig {
+
+ private SendersConfig() {
+ throw new IllegalStateException("SendersConfig class;shouldn't be constructed");
+ }
+
+ static final int MAX_RETRIES = 4;
+ static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);
+ static final Duration READ_TIMEOUT = Duration.ofSeconds(5);
+}
diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
index e5c5af4..47e09e9 100644
--- a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
+++ b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,75 +19,122 @@
* ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.pmmapper.messagerouter;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import reactor.test.StepVerifier;
-import java.util.Arrays;
-import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mockito;
import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
-import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(EnvironmentConfig.class)
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
public class VESPublisherTest {
- private static String topicURL = "http://mr/topic";
- private static RequestSender sender;
- private static MapperConfig config;
+ private static final String TOPIC_URL = "http://mr/topic";
+ private static final String VES = "{}";
+ private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder()
+ .username("")
+ .password("")
+ .build();
+
+ private static final MessageRouterPublishResponse SUCCESSFUL =
+ ImmutableMessageRouterPublishResponse.builder().build();
+ private static final MessageRouterPublishResponse FAILED =
+ ImmutableMessageRouterPublishResponse.builder()
+ .failReason("failReason")
+ .build();
+
+ private DmaapRequestSender sender;
private VESPublisher sut;
- private String ves = "{}";
@Before
- public void before() throws Exception {
- config = mock(MapperConfig.class);
- sender = mock(RequestSender.class);
+ public void before() {
+ MapperConfig config = mock(MapperConfig.class);
+ when(config.getPublisherTopicUrl()).thenReturn(TOPIC_URL);
+ when(config.getPublisherUserName()).thenReturn("");
+ when(config.getPublisherPassword()).thenReturn("");
+ sender = mock(DmaapRequestSender.class);
sut = new VESPublisher(config, sender);
- when(config.getPublisherTopicUrl()).thenReturn(topicURL);
}
@Test
- public void publish_multiple_success() throws Exception {
+ public void publish_multiple_success() {
+ Event event = mock(Event.class);
+ List<Event> events = Arrays.asList(event, event, event);
+ when(event.getVes()).thenReturn(VES);
+ MessageRouterPublishResponse successfulResponse = ImmutableMessageRouterPublishResponse.builder().build();
+ when(sender.send(any(), any(), any())).thenReturn(Flux.just(successfulResponse, successfulResponse));
+
+ Flux<Event> flux = sut.publish(events);
+
+ verify(sender, times(1)).send(anyString(), any(), any());
+ StepVerifier.create(flux)
+ .expectNextMatches(event::equals)
+ .verifyComplete();
+ }
+
+ @Test
+ public void publish_multiple_fail_sender_exceptions() {
+ Event event = mock(Event.class);
+ List<Event> events = Arrays.asList(event, event, event);
+ when(event.getVes()).thenReturn(VES);
+ when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+ .thenReturn(Flux.error(new RuntimeException()));
+
+ Flux<Event> flux = sut.publish(events);
+
+ StepVerifier.create(flux)
+ .verifyComplete();
+ }
+
+ @Test
+ public void publish_multiple_fail_mr_responses_failed() {
Event event = mock(Event.class);
- List<Event> events = Arrays.asList(event,event,event);
- when(event.getVes()).thenReturn(ves);
+ List<Event> events = Arrays.asList(event, event, event);
+ when(event.getVes()).thenReturn(VES);
+ when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+ .thenReturn(Flux.just(FAILED, FAILED, FAILED));
Flux<Event> flux = sut.publish(events);
- verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
StepVerifier.create(flux)
- .expectNextMatches(event::equals)
- .expectComplete()
- .verify();
+ .verifyComplete();
}
@Test
- public void publish_multiple_fail() throws Exception {
+ public void publish_multiple_fail_and_multiple_success() {
Event event = mock(Event.class);
- List<Event> events = Arrays.asList(event,event,event);
- when(event.getVes()).thenReturn(ves);
- when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(RequestFailure.class);
+ when(event.getVes()).thenReturn(VES);
+ List<Event> events = Arrays.asList(event, event, event, event);
+ when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+ .thenReturn(Flux.just(SUCCESSFUL, FAILED, SUCCESSFUL, FAILED));
Flux<Event> flux = sut.publish(events);
StepVerifier.create(flux)
- .expectNext(events.get(0))
- .verifyComplete();
+ .verifyComplete();
}
}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
index db45029..617cbd1 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
@@ -62,7 +62,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockserver.client.server.MockServerClient;
+import org.mockserver.client.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java
index 7ddc929..26cb648 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,6 +43,7 @@ import java.util.List;
import org.everit.json.schema.Schema;
import org.everit.json.schema.loader.SchemaLoader;
+import org.json.JSONException;
import org.json.JSONObject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -79,7 +81,7 @@ class MapperTest {
@BeforeAll
- static void classSetup() throws IOException {
+ static void classSetup() throws IOException, JSONException {
JSONObject ves = new JSONObject(new String(Files.readAllBytes(schema)));
vesSchema = SchemaLoader.load(ves);
@@ -97,7 +99,7 @@ class MapperTest {
@ParameterizedTest
@MethodSource("getValidEvents")
- void testValidEvent(Event testEvent) {
+ void testValidEvent(Event testEvent) throws JSONException {
vesSchema.validate(new JSONObject(objUnderTest.map(testEvent)));
}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
index 0451543..1c6d850 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 - 2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -65,6 +66,7 @@ public class DataRouterUtilsTest {
private static MapperConfig validConfig;
private SSLContextFactory sslContextFactory;
private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json");
+ private static final String DELETE = "DELETE";
@Test
public void processEventSuccessful() throws Exception {
@@ -94,7 +96,7 @@ public class DataRouterUtilsTest {
Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
- verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+ verify(mockConnection, times(1)).setRequestMethod(DELETE);
}
@Test
@@ -117,7 +119,7 @@ public class DataRouterUtilsTest {
Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
- verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+ verify(mockConnection, times(1)).setRequestMethod(DELETE);
}
@Test
@@ -148,7 +150,7 @@ public class DataRouterUtilsTest {
PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
- verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE);
+ verify(mockConnection, times(5)).setRequestMethod(DELETE);
}
@Test
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java
new file mode 100644
index 0000000..50abb5f
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpStatusCode;
+import org.mockserver.verify.VerificationTimes;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class DmaapRequestSenderTest {
+
+ private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder()
+ .username("")
+ .password("")
+ .build();
+ private static final List<String> SINGLE = Collections.singletonList("any");
+
+ private static ClientAndServer mockServer;
+ private final MockServerClient client = mockClient();
+
+ @BeforeClass
+ public static void setup() {
+ mockServer = startClientAndServer(35454);
+ }
+
+ @AfterClass
+ public static void teardown() {
+ mockServer.stop();
+ }
+
+ @Before
+ public void setUp() {
+ client.reset();
+ }
+
+ @Test
+ public void send_success() {
+ client.when(request()).respond(response()
+ .withStatusCode(HttpStatusCode.OK_200.code())
+ .withBody("ResponseBody"));
+
+ Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+ .send("http://127.0.0.1:35454/once", SINGLE, AAF_CREDENTIALS);
+
+ StepVerifier.create(result)
+ .expectNextMatches(DmaapResponse::successful)
+ .verifyComplete();
+ client.verify(request(), VerificationTimes.once());
+ }
+
+ @Test
+ public void host_unavailable_retry_mechanism() {
+ client.when(request())
+ .respond(response().withStatusCode(HttpStatusCode.SERVICE_UNAVAILABLE_503.code()));
+
+ Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+ .send("http://127.0.0.1:35454/anypath", SINGLE, AAF_CREDENTIALS);
+
+ StepVerifier.create(result)
+ .expectNextMatches(DmaapResponse::failed)
+ .verifyComplete();
+ client.verify(request(), VerificationTimes.exactly(5));
+ }
+
+ @Test
+ public void host_unknown() {
+ Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+ .send("http://unknown-host:35454/host-is-unknown", SINGLE, AAF_CREDENTIALS);
+
+ StepVerifier.create(result)
+ .verifyError();
+ client.verify(request(), VerificationTimes.exactly(0));
+ }
+
+ private MockServerClient mockClient() {
+ return new MockServerClient("127.0.0.1", 35454);
+ }
+}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java
index 08faf4a..8541824 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,15 +31,15 @@ import java.net.URL;
import java.net.UnknownHostException;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockserver.client.server.MockServerClient;
+import org.mockserver.client.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpStatusCode;
import org.mockserver.verify.VerificationTimes;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -53,7 +54,7 @@ import utils.LoggingUtils;
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
public class RequestSenderTests {
private static ClientAndServer mockServer;
- private MockServerClient client = mockClient();
+ private final MockServerClient client = mockClient();
@BeforeClass
public static void setup() {
@@ -65,6 +66,11 @@ public class RequestSenderTests {
mockServer.stop();
}
+ @Before
+ public void setUp() {
+ client.reset();
+ }
+
@Test
public void send_success() throws Exception {
String url = "http://127.0.0.1:35454/once";
@@ -84,11 +90,10 @@ public class RequestSenderTests {
assertTrue(logAppender.list.get(1).getMessage().contains("Sending"));
assertTrue(logAppender.list.get(2).getMessage().contains("Received"));
logAppender.stop();
- client.clear(req);
}
@Test
- public void host_unavailable_retry_mechanism() throws Exception {
+ public void host_unavailable_retry_mechanism() {
PowerMockito.mockStatic(Thread.class);
client.when(request())
@@ -99,7 +104,6 @@ public class RequestSenderTests {
});
client.verify(request(), VerificationTimes.exactly(5));
- client.clear(request());
}
@Test
@@ -114,11 +118,10 @@ public class RequestSenderTests {
});
client.verify(request(), VerificationTimes.exactly(0));
- client.clear(request());
}
private MockServerClient mockClient() {
return new MockServerClient("127.0.0.1", 35454);
}
-} \ No newline at end of file
+}