summaryrefslogtreecommitdiffstats
path: root/prh-dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'prh-dmaap-client/src/main')
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java38
1 files changed, 23 insertions, 15 deletions
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
index a99833dc..cb7d5af2 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -17,14 +17,12 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.onap.dcaegen2.services.prh.service.consumer;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Optional;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.slf4j.Logger;
@@ -49,6 +47,9 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String dmaapContentType;
+ private final String dmaapUserName;
+ private final String dmaapUserPassword;
public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
this.dmaapHostName = consumerConfiguration.dmaapHostName();
@@ -57,17 +58,21 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
- String dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.dmaapUserName = consumerConfiguration.dmaapUserName();
+ this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword();
+ }
+
+ public void initWebClient() {
this.webClient = WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
- .filter(
- basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword()))
+ .filter(basicAuthentication(dmaapUserName, dmaapUserPassword))
.filter(logRequest())
.filter(logResponse())
.build();
}
- public Mono<Optional<String>> getDmaaPConsumerResponse() {
+ public Mono<String> getDmaaPConsumerResponse() {
try {
return webClient
.get()
@@ -78,31 +83,34 @@ public class DmaapConsumerReactiveHttpClient {
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
Mono.error(new Exception("HTTP 500")))
- .bodyToMono(String.class)
- .map(Optional::of);
+ .bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while executing HTTP request: ", e);
return Mono.error(e);
}
}
- private URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
- }
-
private String createRequestPath() {
return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
}
- private ExchangeFilterFunction logResponse() {
+ void initWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ }
+
+ ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
logger.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
- private ExchangeFilterFunction logRequest() {
+ URI getUri() throws URISyntaxException {
+ return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+ .setPath(createRequestPath()).build();
+ }
+
+ ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()