aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java67
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java43
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java37
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java28
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java164
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java215
8 files changed, 292 insertions, 268 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 1bf3ec5a..4283debf 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -67,7 +67,7 @@ public class FtpsClient implements FileCollectClient {
} catch (DatafileTaskException e) {
throw e;
} catch (Exception e) {
- throw new DatafileTaskException("Could not open connection: ", e);
+ throw new DatafileTaskException("Could not open connection: " + e, e);
}
}
@@ -100,7 +100,7 @@ public class FtpsClient implements FileCollectClient {
throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
}
} catch (IOException e) {
- throw new DatafileTaskException("Could not fetch file: ", e);
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
}
logger.trace("collectFile fetched: {}", localFileName);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 2f489166..dec8af42 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -79,7 +79,7 @@ public class SftpClient implements FileCollectClient {
sftpChannel = getChannel(session);
}
} catch (JSchException e) {
- throw new DatafileTaskException("Could not open Sftp client", e);
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..e01a941b
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ @Override
+ public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ @Override
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
new file mode 100644
index 00000000..e0a51a80
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface IHttpAsyncClientBuilder {
+ public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy);
+
+ public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext);
+
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier);
+
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config);
+
+ public CloseableHttpAsyncClient build();
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
deleted file mode 100644
index 5295b124..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import org.springframework.core.io.FileSystemResource;
-
-public class FileSystemResourceWrapper implements IFileSystemResource {
- private FileSystemResource realResource;
-
- @Override
- public void setPath(Path path) {
- realResource = new FileSystemResource(path);
- }
- @Override
- public InputStream getInputStream() throws IOException {
- return realResource.getInputStream();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
deleted file mode 100644
index 23f14a33..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-
-public interface IFileSystemResource {
-
- public void setPath(Path filePath);
-
- public InputStream getInputStream() throws IOException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9304688f..9bd5d57f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,54 +16,34 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.UriBuilder;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -71,11 +51,7 @@ import reactor.core.publisher.Mono;
*/
public class DmaapProducerReactiveHttpClient {
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
- private static final String NAME_JSON_TAG = "name";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String URI_SEPARATOR = "/";
- private static final String DEFAULT_FEED_ID = "1";
+ private static final int NO_REQUEST_TIMEOUT = -1;
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
@@ -83,14 +59,10 @@ public class DmaapProducerReactiveHttpClient {
private final String dmaapHostName;
private final Integer dmaapPortNumber;
- private final String dmaapTopicName;
private final String dmaapProtocol;
- private final String dmaapContentType;
private final String user;
private final String pwd;
- private IFileSystemResource fileResource = new FileSystemResourceWrapper();
-
/**
* Constructor DmaapProducerReactiveHttpClient.
*
@@ -99,96 +71,86 @@ public class DmaapProducerReactiveHttpClient {
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
this.user = dmaapPublisherConfiguration.dmaapUserName();
this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
}
- /**
- * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter.
- *
- * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
- * @return status code of operation
- */
- public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
- Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- try (CloseableHttpAsyncClient webClient = createWebClient()) {
-
- HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
- addUserCredentialsToHead(put);
-
- logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
- Future<HttpResponse> future = webClient.execute(put, null);
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response);
- return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ return response;
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
- return Mono.error(e);
+ throw new DatafileTaskException("Unable to create web client.", e);
}
}
- private void addUserCredentialsToHead(HttpPut put) {
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ public void addUserCredentialsToHead(HttpUriRequest request) {
String plainCreds = user + ":" + pwd;
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
logger.trace("base64Creds...: {}", base64Creds);
- put.addHeader("Authorization", "Basic " + base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
- put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getUri(name));
-
- String requestID = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestID);
- String invocationID = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationID);
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(dmaapProtocol) //
+ .host(dmaapHostName) //
+ .port(dmaapPortNumber);
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
- Path fileLocation = Paths.get(model.getInternalLocation());
- this.fileResource.setPath(fileLocation);
- InputStream fileInputStream = fileResource.getInputStream();
-
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- }
+ IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSSLContext(sslContext) //
+ .setSSLHostnameVerifier(new NoopHostnameVerifier());
- private URI getUri(String fileName) {
- String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName;
- return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
- .path(path).build();
- }
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
- void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
- }
+ if (requestTimeout > NO_REQUEST_TIMEOUT) {
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(requestTimeout) //
+ .setConnectTimeout(requestTimeout) //
+ .setConnectionRequestTimeout(requestTimeout) //
+ .build();
- protected CloseableHttpAsyncClient createWebClient()
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ }
- SSLContext sslContext = new SSLContextBuilder() //
- .loadTrustMaterial(null, (certificate, authType) -> true) //
- .build();
-
- CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() //
- .setSSLContext(sslContext) //
- .setSSLHostnameVerifier(new NoopHostnameVerifier()) //
- .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) //
- .build();
- webClient.start();
- return webClient;
+ return clientBuilder.build();
}
+ IHttpAsyncClientBuilder getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index 06ff570c..91c4c334 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -16,51 +16,44 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -68,32 +61,24 @@ import reactor.test.StepVerifier;
*/
class DmaapProducerReactiveHttpClientTest {
- private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String NAME_JSON_TAG = "name";
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-
private static final String HOST = "54.45.33.2";
private static final String HTTPS_SCHEME = "https";
private static final int PORT = 1234;
- private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
- private static final String URI_SEPARATOR = "/";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
- private static final String FILE_CONTENT = "Just a string.";
+ private static final String USER_NAME = "dradmin";
+ private static final int TWO_SECOND_TIMEOUT = 2000;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+
+ private DmaapProducerReactiveHttpClient producerClientUnderTestSpy;
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
- private ConsumerDmaapModel consumerDmaapModel;
- private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
+ private IHttpAsyncClientBuilder clientBuilderMock;
+
private CloseableHttpAsyncClient clientMock;
- private HttpResponse responseMock = mock(HttpResponse.class);
@SuppressWarnings("unchecked")
private Future<HttpResponse> futureMock = mock(Future.class);
- private StatusLine statusLine = mock(StatusLine.class);
- private InputStream fileStream;
@BeforeEach
void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
@@ -102,82 +87,114 @@ class DmaapProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
- when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
-
- // @formatter:off
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
- //formatter:on
-
- dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
- dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
+
+ producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
+
+ clientBuilderMock = mock(IHttpAsyncClientBuilder.class);
clientMock = mock(CloseableHttpAsyncClient.class);
- doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient();
}
@Test
- void getHttpResponse_Success() throws Exception {
- mockWebClientDependantObject();
+ void getHttpResponseWithRederict_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- HttpPut httpPut = new HttpPut();
- httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP);
- URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
- .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
- httpPut.setURI(expectedUri);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
- metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
+ verify(clientMock).start();
+ verify(clientMock).close();
- String plainCreds = "dradmin" + ":" + "dradmin";
- byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
- byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
- String base64Creds = new String(base64CredsBytes);
- httpPut.addHeader("Authorization", "Basic " + base64Creds);
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
+ }
- fileStream.reset();
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
- .expectNext(HttpStatus.OK).verifyComplete();
+ @Test
+ void getHttpResponseWithCustomTimeout_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
- InputStream fileInputStream = fileSystemResourceMock.getInputStream();
- httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP);
+
+ ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
+ RequestConfig requestConfig = requestConfigCaptor.getValue();
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout());
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
}
@Test
- void getHttpResponse_Fail() throws Exception {
- Map<String, String> contextMap = new HashMap<>();
- doReturn(futureMock).when(clientMock).execute(any(), any());
- doThrow(new InterruptedException()).when(futureMock).get();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) //
- .expectError() //
- .verify(); //
+ public void getResponseWithException_throwsException() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ HttpPut request = new HttpPut();
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+
+ try {
+ when(futureMock.get()).thenThrow(new InterruptedException("Interrupted"));
+
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT,
+ CONTEXT_MAP);
+
+ fail("Should have got an exception.");
+ } catch (DatafileTaskException e) {
+ assertTrue(e.getCause() instanceof InterruptedException);
+ assertEquals("Interrupted", e.getCause().getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception");
+ }
+
+ verify(clientMock).start();
+ verify(clientMock).close();
}
- private void mockWebClientDependantObject()
- throws IOException, InterruptedException, ExecutionException {
- fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
- when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
- when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
- when(futureMock.get()).thenReturn(responseMock);
- when(responseMock.getStatusLine()).thenReturn(statusLine);
- when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
+ @Test
+ public void addCredentialsToHead_success() {
+ HttpPut request = new HttpPut();
+
+ producerClientUnderTestSpy.addUserCredentialsToHead(request);
+ String plainCreds = USER_NAME + ":" + USER_NAME;
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = "Basic " + new String(base64CredsBytes);
+ Header[] authorizationHeaders = request.getHeaders("Authorization");
+ assertEquals(base64Creds, authorizationHeaders[0].getValue());
+ }
+
+ @Test
+ public void getBaseUri_success() {
+ URI uri = producerClientUnderTestSpy.getBaseUri().build();
+ assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
}
}