diff options
Diffstat (limited to 'datafile-dmaap-client')
8 files changed, 292 insertions, 268 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index 1bf3ec5a..4283debf 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -67,7 +67,7 @@ public class FtpsClient implements FileCollectClient { } catch (DatafileTaskException e) { throw e; } catch (Exception e) { - throw new DatafileTaskException("Could not open connection: ", e); + throw new DatafileTaskException("Could not open connection: " + e, e); } } @@ -100,7 +100,7 @@ public class FtpsClient implements FileCollectClient { throw new DatafileTaskException("Could not retrieve file " + remoteFileName); } } catch (IOException e) { - throw new DatafileTaskException("Could not fetch file: ", e); + throw new DatafileTaskException("Could not fetch file: " + e, e); } logger.trace("collectFile fetched: {}", localFileName); } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 2f489166..dec8af42 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -79,7 +79,7 @@ public class SftpClient implements FileCollectClient { sftpChannel = getChannel(session); } } catch (JSchException e) { - throw new DatafileTaskException("Could not open Sftp client", e); + throw new DatafileTaskException("Could not open Sftp client" + e, e); } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java new file mode 100644 index 00000000..e01a941b --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.http; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import org.apache.http.client.RedirectStrategy; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.client.HttpAsyncClients; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder { + HttpAsyncClientBuilder builder = HttpAsyncClients.custom(); + + @Override + public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) { + builder.setRedirectStrategy(redirectStrategy); + return this; + } + + @Override + public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) { + builder.setSSLContext(sslcontext); + return this; + } + + @Override + public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) { + builder.setSSLHostnameVerifier(hostnameVerifier); + return this; + } + + @Override + public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) { + builder.setDefaultRequestConfig(config); + return this; + } + + @Override + public CloseableHttpAsyncClient build() { + return builder.build(); + } + +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java new file mode 100644 index 00000000..e0a51a80 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.http; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import org.apache.http.client.RedirectStrategy; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public interface IHttpAsyncClientBuilder { + public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy); + + public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext); + + public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier); + + public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config); + + public CloseableHttpAsyncClient build(); +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java deleted file mode 100644 index 5295b124..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; -import org.springframework.core.io.FileSystemResource; - -public class FileSystemResourceWrapper implements IFileSystemResource { - private FileSystemResource realResource; - - @Override - public void setPath(Path path) { - realResource = new FileSystemResource(path); - } - @Override - public InputStream getInputStream() throws IOException { - return realResource.getInputStream(); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java deleted file mode 100644 index 23f14a33..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; - -public interface IFileSystemResource { - - public void setPath(Path filePath); - - public InputStream getInputStream() throws IOException; -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 9304688f..9bd5d57f 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -16,54 +16,34 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; - -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Future; import javax.net.ssl.SSLContext; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.ssl.SSLContextBuilder; -import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; -import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper; +import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; - -import reactor.core.publisher.Mono; +import org.springframework.web.util.UriBuilder; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -71,11 +51,7 @@ import reactor.core.publisher.Mono; */ public class DmaapProducerReactiveHttpClient { - private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; - private static final String NAME_JSON_TAG = "name"; - private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation"; - private static final String URI_SEPARATOR = "/"; - private static final String DEFAULT_FEED_ID = "1"; + private static final int NO_REQUEST_TIMEOUT = -1; private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN"); @@ -83,14 +59,10 @@ public class DmaapProducerReactiveHttpClient { private final String dmaapHostName; private final Integer dmaapPortNumber; - private final String dmaapTopicName; private final String dmaapProtocol; - private final String dmaapContentType; private final String user; private final String pwd; - private IFileSystemResource fileResource = new FileSystemResourceWrapper(); - /** * Constructor DmaapProducerReactiveHttpClient. * @@ -99,96 +71,86 @@ public class DmaapProducerReactiveHttpClient { public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); - this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); - this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); this.user = dmaapPublisherConfiguration.dmaapUserName(); this.pwd = dmaapPublisherConfiguration.dmaapUserPassword(); } - /** - * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter. - * - * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter - * @return status code of operation - */ - public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel, - Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - try (CloseableHttpAsyncClient webClient = createWebClient()) { - - HttpPut put = new HttpPut(); - prepareHead(consumerDmaapModel, put); - prepareBody(consumerDmaapModel, put); - addUserCredentialsToHead(put); - - logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation()); - Future<HttpResponse> future = webClient.execute(put, null); + public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap) + throws DatafileTaskException { + try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) { + MdcVariables.setMdcContextMap(contextMap); + webClient.start(); + + logger.trace(INVOKE, "Starting to produce to DR {}", request); + Future<HttpResponse> future = webClient.execute(request, null); HttpResponse response = future.get(); - logger.trace(INVOKE_RETURN, "Response from DR {}", response); - return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); + logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString()); + return response; } catch (Exception e) { - logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); - return Mono.error(e); + throw new DatafileTaskException("Unable to create web client.", e); } } - private void addUserCredentialsToHead(HttpPut put) { + public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout, + Map<String, String> contextMap) throws DatafileTaskException { + try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) { + MdcVariables.setMdcContextMap(contextMap); + webClient.start(); + + logger.trace(INVOKE, "Starting to produce to DR {}", request); + Future<HttpResponse> future = webClient.execute(request, null); + HttpResponse response = future.get(); + logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString()); + return response; + } catch (Exception e) { + throw new DatafileTaskException("Unable to create web client.", e); + } + } + + public void addUserCredentialsToHead(HttpUriRequest request) { String plainCreds = user + ":" + pwd; byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); String base64Creds = new String(base64CredsBytes); logger.trace("base64Creds...: {}", base64Creds); - put.addHeader("Authorization", "Basic " + base64Creds); + request.addHeader("Authorization", "Basic " + base64Creds); } - private void prepareHead(ConsumerDmaapModel model, HttpPut put) { - put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); - metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); - put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getUri(name)); - - String requestID = MDC.get(REQUEST_ID); - put.addHeader(X_ONAP_REQUEST_ID, requestID); - String invocationID = UUID.randomUUID().toString(); - put.addHeader(X_INVOCATION_ID, invocationID); + public UriBuilder getBaseUri() { + return new DefaultUriBuilderFactory().builder() // + .scheme(dmaapProtocol) // + .host(dmaapHostName) // + .port(dmaapPortNumber); } - private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { - Path fileLocation = Paths.get(model.getInternalLocation()); - this.fileResource.setPath(fileLocation); - InputStream fileInputStream = fileResource.getInputStream(); - - put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout) + throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + SSLContext sslContext = + new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); - } + IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder(); + clientBuilder.setSSLContext(sslContext) // + .setSSLHostnameVerifier(new NoopHostnameVerifier()); - private URI getUri(String fileName) { - String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName; - return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) - .path(path).build(); - } + if (expectRedirect) { + clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE); + } - void setFileSystemResource(IFileSystemResource fileSystemResource) { - fileResource = fileSystemResource; - } + if (requestTimeout > NO_REQUEST_TIMEOUT) { + RequestConfig requestConfig = RequestConfig.custom() // + .setSocketTimeout(requestTimeout) // + .setConnectTimeout(requestTimeout) // + .setConnectionRequestTimeout(requestTimeout) // + .build(); - protected CloseableHttpAsyncClient createWebClient() - throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + clientBuilder.setDefaultRequestConfig(requestConfig); + } - SSLContext sslContext = new SSLContextBuilder() // - .loadTrustMaterial(null, (certificate, authType) -> true) // - .build(); - - CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() // - .setSSLContext(sslContext) // - .setSSLHostnameVerifier(new NoopHostnameVerifier()) // - .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) // - .build(); - webClient.start(); - return webClient; + return clientBuilder.build(); } + IHttpAsyncClientBuilder getHttpClientBuilder() { + return new HttpAsyncClientBuilderWrapper(); + } }
\ No newline at end of file diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index 06ff570c..91c4c334 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -16,51 +16,44 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; -import java.nio.file.Paths; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; +import org.apache.http.Header; import org.apache.http.HttpResponse; -import org.apache.http.StatusLine; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; +import org.mockito.ArgumentCaptor; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder; +import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.web.util.DefaultUriBuilderFactory; - -import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -68,32 +61,24 @@ import reactor.test.StepVerifier; */ class DmaapProducerReactiveHttpClientTest { - private static final String FILE_NAME = "A20161224.1030-1045.bin.gz"; - private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation"; - private static final String NAME_JSON_TAG = "name"; - private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; - private static final String HOST = "54.45.33.2"; private static final String HTTPS_SCHEME = "https"; private static final int PORT = 1234; - private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; - private static final String URI_SEPARATOR = "/"; - private static final String PUBLISH_TOPIC = "publish"; - private static final String DEFAULT_FEED_ID = "1"; - private static final String FILE_CONTENT = "Just a string."; + private static final String USER_NAME = "dradmin"; + private static final int TWO_SECOND_TIMEOUT = 2000; - private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; + private static final Map<String, String> CONTEXT_MAP = new HashMap<>(); + + + private DmaapProducerReactiveHttpClient producerClientUnderTestSpy; private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class); - private ConsumerDmaapModel consumerDmaapModel; - private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class); + private IHttpAsyncClientBuilder clientBuilderMock; + private CloseableHttpAsyncClient clientMock; - private HttpResponse responseMock = mock(HttpResponse.class); @SuppressWarnings("unchecked") private Future<HttpResponse> futureMock = mock(Future.class); - private StatusLine statusLine = mock(StatusLine.class); - private InputStream fileStream; @BeforeEach void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { @@ -102,82 +87,114 @@ class DmaapProducerReactiveHttpClientTest { when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin"); when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin"); - when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE); - when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC); - - // @formatter:off - consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName("NrRadio") - .vendorName("Ericsson") - .lastEpochMicrosec("8745745764578") - .sourceName("oteNB5309") - .startEpochMicrosec("8745745764578") - .timeZoneOffset("UTC+05:00") - .name("A20161224.1030-1045.bin.gz") - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz") - .internalLocation("target/A20161224.1030-1045.bin.gz") - .compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec") - .fileFormatVersion("V10") - .build(); - //formatter:on - - dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock)); - dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock); + + producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock)); + + clientBuilderMock = mock(IHttpAsyncClientBuilder.class); clientMock = mock(CloseableHttpAsyncClient.class); - doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient(); } @Test - void getHttpResponse_Success() throws Exception { - mockWebClientDependantObject(); + void getHttpResponseWithRederict_Success() throws Exception { + doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder(); + when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock); + when(clientBuilderMock.setSSLHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock); + when(clientBuilderMock.build()).thenReturn(clientMock); + when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock); + HttpResponse responseMock = mock(HttpResponse.class); + when(futureMock.get()).thenReturn(responseMock); - HttpPut httpPut = new HttpPut(); - httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); + HttpGet request = new HttpGet(); + producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP); - URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT) - .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build(); - httpPut.setURI(expectedUri); + verify(clientBuilderMock).setSSLContext(any(SSLContext.class)); + verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class)); + verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE); + verify(clientBuilderMock).build(); + verifyNoMoreInteractions(clientBuilderMock); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); - metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); - metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); - httpPut.addHeader(X_DMAAP_DR_META, metaData.toString()); + verify(clientMock).start(); + verify(clientMock).close(); - String plainCreds = "dradmin" + ":" + "dradmin"; - byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); - byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); - String base64Creds = new String(base64CredsBytes); - httpPut.addHeader("Authorization", "Basic " + base64Creds); + verify(futureMock).get(); + verifyNoMoreInteractions(futureMock); + } - fileStream.reset(); - Map<String, String> contextMap = new HashMap<>(); - StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) - .expectNext(HttpStatus.OK).verifyComplete(); + @Test + void getHttpResponseWithCustomTimeout_Success() throws Exception { + doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder(); + when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock); + when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock); + when(clientBuilderMock.build()).thenReturn(clientMock); + when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock); + HttpResponse responseMock = mock(HttpResponse.class); + when(futureMock.get()).thenReturn(responseMock); - verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME)); - InputStream fileInputStream = fileSystemResourceMock.getInputStream(); - httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + HttpGet request = new HttpGet(); + producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP); + + ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class); + verify(clientBuilderMock).setSSLContext(any(SSLContext.class)); + verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class)); + verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture()); + RequestConfig requestConfig = requestConfigCaptor.getValue(); + assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout()); + assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout()); + assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout()); + verify(clientBuilderMock).build(); + verifyNoMoreInteractions(clientBuilderMock); + + verify(clientMock).start(); + verify(clientMock).close(); + + verify(futureMock).get(); + verifyNoMoreInteractions(futureMock); } @Test - void getHttpResponse_Fail() throws Exception { - Map<String, String> contextMap = new HashMap<>(); - doReturn(futureMock).when(clientMock).execute(any(), any()); - doThrow(new InterruptedException()).when(futureMock).get(); - StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) // - .expectError() // - .verify(); // + public void getResponseWithException_throwsException() throws Exception { + doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder(); + when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock); + when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock); + when(clientBuilderMock.build()).thenReturn(clientMock); + HttpPut request = new HttpPut(); + when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); + + try { + when(futureMock.get()).thenThrow(new InterruptedException("Interrupted")); + + producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, + CONTEXT_MAP); + + fail("Should have got an exception."); + } catch (DatafileTaskException e) { + assertTrue(e.getCause() instanceof InterruptedException); + assertEquals("Interrupted", e.getCause().getMessage()); + } catch (Exception e) { + fail("Wrong exception"); + } + + verify(clientMock).start(); + verify(clientMock).close(); } - private void mockWebClientDependantObject() - throws IOException, InterruptedException, ExecutionException { - fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); - when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream); - when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); - when(futureMock.get()).thenReturn(responseMock); - when(responseMock.getStatusLine()).thenReturn(statusLine); - when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK); + @Test + public void addCredentialsToHead_success() { + HttpPut request = new HttpPut(); + + producerClientUnderTestSpy.addUserCredentialsToHead(request); + String plainCreds = USER_NAME + ":" + USER_NAME; + byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); + byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); + String base64Creds = "Basic " + new String(base64CredsBytes); + Header[] authorizationHeaders = request.getHeaders("Authorization"); + assertEquals(base64Creds, authorizationHeaders[0].getValue()); + } + + @Test + public void getBaseUri_success() { + URI uri = producerClientUnderTestSpy.getBaseUri().build(); + assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString()); } } |