aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java27
1 files changed, 20 insertions, 7 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 5f72808e..6e4679c3 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonElement;
import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
+import io.vavr.collection.Map;
import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
@@ -43,6 +44,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErr
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,16 +112,14 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
}
private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
- ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
+ return ImmutableHttpRequest.builder()
.method(HttpMethod.POST)
.url(request.sinkDefinition().topicUrl())
.diagnosticContext(request.diagnosticContext().withNewInvocationId())
- .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
- .body(body);
-
- return Option.of(request.timeoutConfig())
- .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
- .getOrElse(requestBuilder::build);
+ .customHeaders(headers(request))
+ .body(body)
+ .timeout(timeout(request).getOrNull())
+ .build();
}
private MessageRouterPublishResponse buildResponse(
@@ -138,4 +138,17 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
.failReason(failReason)
.build());
}
+
+ private Option<Duration> timeout(MessageRouterPublishRequest request) {
+ return Option.of(request.timeoutConfig())
+ .map(DmaapTimeoutConfig::getTimeout);
+ }
+
+ private Map<String, String> headers(MessageRouterPublishRequest request) {
+ Map<String, String> headers = Option.of(request.sinkDefinition().aafCredentials())
+ .map(Commons::basicAuthHeader)
+ .map(HashMap::of)
+ .getOrElse(HashMap.empty());
+ return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString());
+ }
}