diff options
author | Chengkai Yan <martin.c.yan@est.tech> | 2018-11-23 16:23:56 +0100 |
---|---|---|
committer | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-12-03 13:20:42 +0100 |
commit | 693026c6b973c44ade969516be496f966e4fae86 (patch) | |
tree | 8398e3e98d79303d5d53c51eeccf3db018e2865b | |
parent | a3c452af58c12283d76019509dd605f67f14532c (diff) |
fix bugs in jira DCAEGEN2-940 and DCAEGEN2-941
Change-Id: Id0b3e295cab0e085746b034caccbf82aca2e0d7b
Signed-off-by: Chengkai Yan <martin.c.yan@est.tech>
Issue-ID: DCAEGEN2-940
Issue-ID: DCAEGEN2-941
17 files changed, 330 insertions, 372 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml index cef185c6..b66f7b6e 100644 --- a/datafile-app-server/config/application.yaml +++ b/datafile-app-server/config/application.yaml @@ -14,6 +14,7 @@ logging: ROOT: ERROR org.springframework: ERROR org.springframework.data: ERROR + org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR org.onap.dcaegen2.collectors.datafile: ERROR file: opt/log/application.log app: diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 171dd024..c465fe94 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -59,11 +60,18 @@ public class ScheduledTasks { */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); - + //@formatter:off consumeFromDmaapMessage() + .publishOn(Schedulers.parallel()) + .cache() .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP")) - .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration) + .flatMap(this::collectFilesFromXnf) + .retry(3) + .cache() + .flatMap(this::publishToDmaapConfiguration) + .retry(3) .subscribe(this::onSuccess, this::onError, this::onComplete); + //@formatter:on } private void onComplete() { diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml index 9f1d21de..084220fd 100644 --- a/datafile-dmaap-client/pom.xml +++ b/datafile-dmaap-client/pom.xml @@ -37,9 +37,16 @@ </properties> <dependencies> - <!-- DEVELOPMENT DEPENDENCIES --> <dependency> + <groupId>org.asynchttpclient</groupId> + <artifactId>async-http-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + </dependency> + <dependency> <groupId>org.onap.dcaegen2.collectors.datafile</groupId> <artifactId>datafile-commons</artifactId> <version>${project.parent.version}</version> diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java index 2ccf1bab..4b7cc01a 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java @@ -2,17 +2,15 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ @@ -75,6 +73,11 @@ public class FTPSClientWrapper implements IFTPSClient { } @Override + public void setFileType(int fileType) throws IOException { + ftpsClient.setFileType(fileType); + } + + @Override public void execPBSZ(int psbz) throws IOException { ftpsClient.execPBSZ(psbz); } @@ -93,4 +96,14 @@ public class FTPSClientWrapper implements IFTPSClient { public void setTimeout(Integer t) { this.ftpsClient.setDefaultTimeout(t); } + + @Override + public boolean isConnected() { + return ftpsClient.isConnected(); + } + + @Override + public void setBufferSize(int bufSize) { + ftpsClient.setBufferSize(bufSize); + } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java index 9b6eacb0..fa1d4310 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java @@ -2,17 +2,15 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ @@ -44,6 +42,7 @@ public class FileCollectResult { @Override public String toString() { - return "FileCollectResult: " + result + " Error data: " + getErrorData(); + return "FileCollectResult: " + + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData()); } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index 8247bccb..0d055fc1 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; import org.onap.dcaegen2.collectors.datafile.io.FileWrapper; @@ -72,7 +73,6 @@ public class FtpsClient extends FileCollectClient { if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) { if (getFileFromxNF(ftps)) { - closeDownConnection(ftps); fileCollectResult = new FileCollectResult(); } else { fileCollectResult = new FileCollectResult(errorData); @@ -80,6 +80,7 @@ public class FtpsClient extends FileCollectClient { } else { fileCollectResult = new FileCollectResult(errorData); } + closeDownConnection(ftps); logger.trace("retryCollectFile left with result: {}", fileCollectResult); return fileCollectResult; } @@ -87,6 +88,7 @@ public class FtpsClient extends FileCollectClient { private boolean setUpKeyManager(IFTPSClient ftps) { boolean result = true; if (keyManagerSet) { + logger.trace("keyManager already set!"); return result; } try { @@ -105,6 +107,7 @@ public class FtpsClient extends FileCollectClient { private boolean setUpTrustedCA(IFTPSClient ftps) { boolean result = true; if (trustManagerSet) { + logger.trace("trustManager already set!"); return result; } try { @@ -130,32 +133,42 @@ public class FtpsClient extends FileCollectClient { private boolean setUpConnection(IFTPSClient ftps) { boolean result = true; try { + if (ftps.isConnected()) { + addError( + "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData, + null); + return false; + } ftps.connect(fileServerData.serverAddress(), fileServerData.port()); logger.trace("after ftp connect"); boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password()); if (!loginSuccesful) { - ftps.logout(); + closeDownConnection(ftps); addError("Unable to log in to xNF. " + fileServerData, null); - result = false; + return false; } if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) { ftps.enterLocalPassiveMode(); + ftps.setFileType(FTP.BINARY_FILE_TYPE); // Set protection buffer size ftps.execPBSZ(0); // Set data channel protection to private ftps.execPROT("P"); + ftps.setBufferSize(1024 * 1024); } else { - ftps.disconnect(); + closeDownConnection(ftps); addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(), null); - result = false; + return false; } - } catch (Exception ex) { - addError("Unable to connect to xNF. Data: " + fileServerData, ex); - result = false; + } catch (Exception e) { + logger.trace("connect to ftp server failed.", e); + addError("Unable to connect to xNF. Data: " + fileServerData, e); + closeDownConnection(ftps); + return false; } - logger.trace("setUpConnection return value: {}", result); + logger.trace("setUpConnection successfully!"); return result; } @@ -169,8 +182,9 @@ public class FtpsClient extends FileCollectClient { IOutputStream outputStream = getOutputStream(); OutputStream output = outputStream.getOutputStream(outfile.getFile()); - + logger.trace("begin to retrieve from xNF."); result = ftps.retrieveFile(remoteFile, output); + logger.trace("end retrieve from xNF."); if (!result) { output.close(); logger.debug("Unable to retrieve file from xNF. Cause unknown!"); @@ -186,20 +200,26 @@ public class FtpsClient extends FileCollectClient { } catch (Exception e) { logger.trace("Unable to delete file {}.", localFile, e); } - result = false; + return false; } return result; } private void closeDownConnection(IFTPSClient ftps) { logger.trace("starting to closeDownConnection"); - try { - if (ftps != null) { - ftps.logout(); + if (ftps != null && ftps.isConnected()) { + try { + boolean logOut = ftps.logout(); + logger.trace("logOut: {}", logOut); + } catch (Exception e) { + logger.trace("Unable to logout connection.", e); + } + try { ftps.disconnect(); + logger.trace("disconnected!"); + } catch (Exception e) { + logger.trace("Unable to disconnect connection.", e); } - } catch (Exception e) { - logger.trace("Unable to logout and close connection.", e); } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java index b1472026..1a581636 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java @@ -2,17 +2,15 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ @@ -39,10 +37,16 @@ public interface IFTPSClient { public int getReplyCode(); + public void setBufferSize(int bufSize); + + public boolean isConnected(); + public void disconnect() throws IOException; public void enterLocalPassiveMode(); + public void setFileType(int fileType) throws IOException; + public void execPBSZ(int newParam) throws IOException; public void execPROT(String prot) throws IOException; diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 380eac88..0e95b0b0 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -23,26 +23,29 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; +import java.util.concurrent.Future; + +import javax.net.ssl.SSLContext; import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.ssl.SSLContextBuilder; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate; -import org.onap.dcaegen2.collectors.datafile.web.RestTemplateWrapper; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; +import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Flux; @@ -70,7 +73,7 @@ public class DmaapProducerReactiveHttpClient { private final String pwd; private IFileSystemResource fileResource; - private IRestTemplate restTemplate; + private CloseableHttpAsyncClient webClient; /** * Constructor DmaapProducerReactiveHttpClient. @@ -78,7 +81,6 @@ public class DmaapProducerReactiveHttpClient { * @param dmaapPublisherConfiguration - DMaaP producer configuration object */ public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { - this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); @@ -97,54 +99,70 @@ public class DmaapProducerReactiveHttpClient { public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); try { - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.parseMediaType(dmaapContentType)); - addMetaDataToHead(consumerDmaapModel, headers); - - addUserCredentialsToHead(headers); - - IFileSystemResource fileSystemResource = getFileSystemResource(); - fileSystemResource.setPath(consumerDmaapModel.getInternalLocation()); - InputStream fileInputStream = fileSystemResource.getInputStream(); - HttpEntity<byte[]> request = addFileToRequest(fileInputStream, headers); + logger.trace("Starting to publish to DR"); + webClient = getWebClient(); + webClient.start(); - logger.trace("Starting to publish to DR"); - ResponseEntity<String> responseEntity = getRestTemplate().exchange(getUri(consumerDmaapModel.getName()), - HttpMethod.PUT, request, String.class); + HttpPut put = new HttpPut(); + prepareHead(consumerDmaapModel, put); + prepareBody(consumerDmaapModel, put); + addUserCredentialsToHead(put); - return Flux.just(responseEntity.getStatusCode().toString()); + Future<HttpResponse> future = webClient.execute(put, null); + HttpResponse response = future.get(); + logger.trace(response.toString()); + webClient.close(); + handleHttpResponse(response); + return Flux.just(response.toString()); } catch (Exception e) { logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e); return Flux.empty(); } } - private void addUserCredentialsToHead(HttpHeaders headers) { + private void handleHttpResponse(HttpResponse response) { + int statusCode = response.getStatusLine().getStatusCode(); + if (HttpUtils.isSuccessfulResponseCode(statusCode)) { + logger.trace("Publish to DR successful!"); + } else { + logger.error("Publish to DR unsuccessful, response code: " + statusCode); + } + } + + private void addUserCredentialsToHead(HttpPut put) { String plainCreds = user + ":" + pwd; byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); String base64Creds = new String(base64CredsBytes); logger.trace("base64Creds...: {}", base64Creds); - headers.add("Authorization", "Basic " + base64Creds); + put.addHeader("Authorization", "Basic " + base64Creds); } - private void addMetaDataToHead(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) { - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); - metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + private void prepareHead(ConsumerDmaapModel model, HttpPut put) { + put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); - headers.set(X_ATT_DR_META, metaData.toString()); - } - private HttpEntity<byte[]> addFileToRequest(InputStream inputStream, HttpHeaders headers) - throws IOException { - return new HttpEntity<>(IOUtils.toByteArray(inputStream), headers); + put.addHeader(X_ATT_DR_META, metaData.toString()); + put.setURI(getUri(name)); } - private IRestTemplate getRestTemplate() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException { - if (restTemplate == null) { - restTemplate = new RestTemplateWrapper(); + private void prepareBody(ConsumerDmaapModel model, HttpPut put) { + String fileLocation = model.getInternalLocation(); + IFileSystemResource fileSystemResource = getFileSystemResource(); + fileSystemResource.setPath(fileLocation); + InputStream fileInputStream = null; + try { + fileInputStream = fileSystemResource.getInputStream(); + } catch (IOException e) { + logger.error("Unable to get stream from filesystem.", e); + } + try { + put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + } catch (IOException e) { + logger.error("Unable to set put request body from ByteArray.", e); } - return restTemplate; } private URI getUri(String fileName) { @@ -164,7 +182,26 @@ public class DmaapProducerReactiveHttpClient { fileResource = fileSystemResource; } - protected void setRestTemplate(IRestTemplate restTemplate) { - this.restTemplate = restTemplate; + protected CloseableHttpAsyncClient getWebClient() { + if (webClient != null) { + return webClient; + } + SSLContext sslContext = null; + try { + sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); + } catch (Exception e) { + logger.trace("Unable to get sslContext.", e); + } + //@formatter:off + return HttpAsyncClients.custom() + .setSSLContext(sslContext) + .setSSLHostnameVerifier(new NoopHostnameVerifier()) + .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) + .build(); + //@formatter:on + } + + protected void setWebClient(CloseableHttpAsyncClient client) { + this.webClient = client; } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/IRestTemplate.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/IRestTemplate.java deleted file mode 100644 index 07e7563d..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/IRestTemplate.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.web; - -import java.net.URI; - -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpMethod; -import org.springframework.http.ResponseEntity; - -@FunctionalInterface -public interface IRestTemplate { - public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity, - Class<String> responseType); -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java deleted file mode 100644 index 15d459f8..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ -package org.onap.dcaegen2.collectors.datafile.web; -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.HttpRequest; -import org.springframework.http.client.ClientHttpRequestExecution; -import org.springframework.http.client.ClientHttpRequestInterceptor; -import org.springframework.http.client.ClientHttpResponse; - -public class RequestResponseLoggingInterceptor implements ClientHttpRequestInterceptor { - - private final Logger log = LoggerFactory.getLogger(this.getClass()); - - @Override - public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { - logRequest(request, body); - ClientHttpResponse response = execution.execute(request, body); - logResponse(response); - return response; - } - - private void logRequest(HttpRequest request, byte[] body) throws IOException { - if (log.isDebugEnabled()) { - log.debug("===========================request begin================================================"); - log.debug("URI : {}", request.getURI()); - log.debug("Method : {}", request.getMethod()); - log.debug("Headers : {}", request.getHeaders()); - log.debug("Request body: {}", new String(body, "UTF-8")); - log.debug("==========================request end================================================"); - } - } - - private void logResponse(ClientHttpResponse response) throws IOException { - if (log.isDebugEnabled()) { - log.debug("============================response begin=========================================="); - log.debug("Status code : {}", response.getStatusCode()); - log.debug("Status text : {}", response.getStatusText()); - log.debug("Headers : {}", response.getHeaders()); - log.debug("=======================response end================================================="); - } - } -}
\ No newline at end of file diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java deleted file mode 100644 index 99ead846..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.web; - -import java.net.URI; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.util.Collections; - -import javax.net.ssl.SSLContext; - -import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.ssl.SSLContextBuilder; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpMethod; -import org.springframework.http.ResponseEntity; -import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; -import org.springframework.web.client.RestTemplate; - -public class RestTemplateWrapper implements IRestTemplate { - private RestTemplate restTemplate; - - public RestTemplateWrapper() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { - SSLContext sslContext = - new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); - CloseableHttpClient httpClient = - HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier()) - .setRedirectStrategy(new PublishRedirectStrategy()).build(); - - HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(); - requestFactory.setHttpClient(httpClient); - - restTemplate = new RestTemplate(requestFactory); - restTemplate.setInterceptors(Collections.singletonList(new RequestResponseLoggingInterceptor())); - - } - - @Override - public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity, - Class<String> responseType) { - return restTemplate.exchange(url, method, requestEntity, responseType); - } - -} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java index 083727e4..38d24233 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java @@ -2,17 +2,15 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ @@ -30,7 +28,7 @@ public class FileCollectResultTest { public void successfulResult() { FileCollectResult resultUnderTest = new FileCollectResult(); assertTrue(resultUnderTest.downloadSuccessful()); - assertEquals("FileCollectResult: true Error data: ", resultUnderTest.toString()); + assertEquals("FileCollectResult: successful!", resultUnderTest.toString()); } @Test @@ -40,6 +38,7 @@ public class FileCollectResultTest { errorData.addError("Null", new NullPointerException()); FileCollectResult resultUnderTest = new FileCollectResult(errorData); assertFalse(resultUnderTest.downloadSuccessful()); - assertEquals("FileCollectResult: false Error data: " + errorData.toString(), resultUnderTest.toString()); + assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(), + resultUnderTest.toString()); } } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index 2157e176..c134b79c 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -36,6 +36,8 @@ import java.security.KeyStoreException; import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPReply; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.io.IFile; @@ -88,7 +90,7 @@ public class FtpsClientTest { clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD); clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH); clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD); -} + } @Test public void collectFile_allOk() throws Exception { @@ -103,6 +105,7 @@ public class FtpsClientTest { OutputStream osMock = mock(OutputStream.class); when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock); when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true); + when(ftpsClientMock.isConnected()).thenReturn(false, true); ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) .userId(USERNAME).password(PASSWORD).port(PORT).build(); @@ -124,19 +127,22 @@ public class FtpsClientTest { verify(ftpsClientMock, times(1)).enterLocalPassiveMode(); verify(ftpsClientMock).execPBSZ(0); verify(ftpsClientMock).execPROT("P"); + verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); + verify(ftpsClientMock).setBufferSize(1024 * 1024); verify(localFileMock).setPath(LOCAL_FILE_PATH); verify(localFileMock, times(1)).createNewFile(); verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock); verify(osMock, times(1)).close(); verify(ftpsClientMock, times(1)).logout(); verify(ftpsClientMock, times(1)).disconnect(); + verify(ftpsClientMock, times(2)).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } @Test public void collectFileFaultyOwnKey_shouldFail() throws Exception { - doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())) - .when(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock) + .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) .userId(USERNAME).password(PASSWORD).port(PORT).build(); @@ -144,6 +150,10 @@ public class FtpsClientTest { FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); assertFalse(result.downloadSuccessful()); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock, times(1)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -160,6 +170,15 @@ public class FtpsClientTest { FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); assertFalse(result.downloadSuccessful()); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock).setKeyManager(keyManagerMock); + verify(fileResourceMock).setPath(TRUSTED_CA_PATH); + verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); + verify(inputStreamMock, times(1)).close(); + verify(trustManagerFactoryMock).init(keyStoreMock); + verify(ftpsClientMock, times(1)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -175,8 +194,19 @@ public class FtpsClientTest { FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); - verify(ftpsClientMock, times(1)).logout(); assertFalse(result.downloadSuccessful()); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock).setKeyManager(keyManagerMock); + verify(fileResourceMock).setPath(TRUSTED_CA_PATH); + verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); + verify(inputStreamMock, times(1)).close(); + verify(trustManagerFactoryMock).init(keyStoreMock); + verify(ftpsClientMock).setTrustManager(trustManagerMock); + verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); + verify(ftpsClientMock).login(USERNAME, PASSWORD); + verify(ftpsClientMock, times(3)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -186,15 +216,27 @@ public class FtpsClientTest { when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); - when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.BAD_REQUEST.value()); + when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE); ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS) .userId(USERNAME).password(PASSWORD).port(PORT).build(); FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); - verify(ftpsClientMock, times(1)).disconnect(); assertFalse(result.downloadSuccessful()); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock).setKeyManager(keyManagerMock); + verify(fileResourceMock).setPath(TRUSTED_CA_PATH); + verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); + verify(inputStreamMock, times(1)).close(); + verify(trustManagerFactoryMock).init(keyStoreMock); + verify(ftpsClientMock).setTrustManager(trustManagerMock); + verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); + verify(ftpsClientMock).login(USERNAME, PASSWORD); + verify(ftpsClientMock, times(2)).getReplyCode(); + verify(ftpsClientMock, times(3)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -212,6 +254,17 @@ public class FtpsClientTest { FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); assertFalse(result.downloadSuccessful()); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock).setKeyManager(keyManagerMock); + verify(fileResourceMock).setPath(TRUSTED_CA_PATH); + verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); + verify(inputStreamMock, times(1)).close(); + verify(trustManagerFactoryMock).init(keyStoreMock); + verify(ftpsClientMock).setTrustManager(trustManagerMock); + verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); + verify(ftpsClientMock, times(3)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -232,6 +285,26 @@ public class FtpsClientTest { assertFalse(result.downloadSuccessful()); verify(localFileMock, times(1)).delete(); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock).setKeyManager(keyManagerMock); + verify(fileResourceMock).setPath(TRUSTED_CA_PATH); + verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); + verify(inputStreamMock, times(1)).close(); + verify(trustManagerFactoryMock).init(keyStoreMock); + verify(ftpsClientMock).setTrustManager(trustManagerMock); + verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); + verify(ftpsClientMock).login(USERNAME, PASSWORD); + verify(ftpsClientMock).getReplyCode(); + verify(ftpsClientMock, times(1)).enterLocalPassiveMode(); + verify(ftpsClientMock).execPBSZ(0); + verify(ftpsClientMock).execPROT("P"); + verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); + verify(ftpsClientMock).setBufferSize(1024 * 1024); + verify(localFileMock).setPath(LOCAL_FILE_PATH); + verify(localFileMock, times(1)).createNewFile(); + verify(ftpsClientMock, times(2)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } @Test @@ -254,5 +327,26 @@ public class FtpsClientTest { FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH); assertFalse(result.downloadSuccessful()); + verify(ftpsClientMock).setNeedClientAuth(true); + verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + verify(ftpsClientMock).setKeyManager(keyManagerMock); + verify(fileResourceMock).setPath(TRUSTED_CA_PATH); + verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); + verify(inputStreamMock, times(1)).close(); + verify(trustManagerFactoryMock).init(keyStoreMock); + verify(ftpsClientMock).setTrustManager(trustManagerMock); + verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); + verify(ftpsClientMock).login(USERNAME, PASSWORD); + verify(ftpsClientMock).getReplyCode(); + verify(ftpsClientMock, times(1)).enterLocalPassiveMode(); + verify(ftpsClientMock).execPBSZ(0); + verify(ftpsClientMock).execPROT("P"); + verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); + verify(ftpsClientMock).setBufferSize(1024 * 1024); + verify(localFileMock).setPath(LOCAL_FILE_PATH); + verify(localFileMock, times(1)).createNewFile(); + verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock); + verify(ftpsClientMock, times(2)).isConnected(); + verifyNoMoreInteractions(ftpsClientMock); } -}
\ No newline at end of file +} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index d8296e1c..bf2f73d6 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -19,7 +19,6 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.gson.JsonElement; @@ -30,9 +29,16 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; @@ -40,13 +46,8 @@ import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate; -import org.springframework.http.HttpEntity; +import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.test.StepVerifier; @@ -77,11 +78,12 @@ class DmaapProducerReactiveHttpClientTest { private ConsumerDmaapModel consumerDmaapModel; private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class); - private IRestTemplate restTemplateMock = mock(IRestTemplate.class); - private ResponseEntity<String> responseEntityMock = mock(ResponseEntity.class); + private CloseableHttpAsyncClient clientMock; + private HttpResponse responseMock = mock(HttpResponse.class); + private Future<HttpResponse> futureMock = mock(Future.class); + private StatusLine statusLine = mock(StatusLine.class); private InputStream fileStream; - @BeforeEach void setUp() { when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST); @@ -111,59 +113,60 @@ class DmaapProducerReactiveHttpClientTest { dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock); dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock); - dmaapProducerReactiveHttpClient.setRestTemplate(restTemplateMock); + clientMock = mock(CloseableHttpAsyncClient.class); + dmaapProducerReactiveHttpClient.setWebClient(clientMock); } @Test void getHttpResponse_Success() throws Exception { mockWebClientDependantObject(true); - StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) - .expectNext(HttpStatus.OK.toString()).verifyComplete(); - URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT) .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build(); - HttpHeaders headers = new HttpHeaders(); - - headers.setContentType(MediaType.parseMediaType(APPLICATION_OCTET_STREAM_CONTENT_TYPE)); + HttpPut httpPut = new HttpPut(); + httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); - headers.set(X_ATT_DR_META, metaData.toString()); + httpPut.addHeader(X_ATT_DR_META, metaData.toString()); + httpPut.setURI(expectedUri); String plainCreds = "dradmin" + ":" + "dradmin"; byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); String base64Creds = new String(base64CredsBytes); - headers.add("Authorization", "Basic " + base64Creds); + httpPut.addHeader("Authorization", "Basic " + base64Creds); fileStream.reset(); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) + .expectNext(responseMock.toString()).verifyComplete(); - HttpEntity<byte[]> requestEntity = new HttpEntity<>(IOUtils.toByteArray(fileStream), headers); verify(fileSystemResourceMock).setPath("target/" + FILE_NAME); - verify(restTemplateMock).exchange(expectedUri, HttpMethod.PUT, requestEntity, String.class); - verifyNoMoreInteractions(restTemplateMock); + InputStream fileInputStream = fileSystemResourceMock.getInputStream(); + httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); } @Test void getHttpResponse_Fail() throws Exception { mockWebClientDependantObject(false); - StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) .verifyComplete(); } - private void mockWebClientDependantObject(boolean success) throws IOException { + private void mockWebClientDependantObject(boolean success) + throws IOException, InterruptedException, ExecutionException { fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream); - if (success) { - when(restTemplateMock.exchange(any(), any(), any(), any())).thenReturn(responseEntityMock); - when(responseEntityMock.getStatusCode()).thenReturn(HttpStatus.OK); + when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); + when(futureMock.get()).thenReturn(responseMock); + when(responseMock.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK); } else { - when(restTemplateMock.exchange(any(), any(), any(), any())).thenThrow(new RuntimeException()); + when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); + when(futureMock.get()).thenThrow(new InterruptedException()); } } } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java deleted file mode 100644 index b0f5c931..00000000 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.web; - - -import org.junit.jupiter.api.Test; -import org.springframework.http.HttpRequest; -import org.springframework.http.client.ClientHttpRequestExecution; -import org.springframework.http.client.ClientHttpResponse; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -class RequestResponseLoggingInterceptorTest { - - @Test - void intercept_shouldReturnObject() throws URISyntaxException, IOException { - - //given - RequestResponseLoggingInterceptor requestResponseLoggingInterceptor = new RequestResponseLoggingInterceptor(); - - ClientHttpRequestExecution execution = mock(ClientHttpRequestExecution.class); - HttpRequest request = mock(HttpRequest.class); - ClientHttpResponse response = mock(ClientHttpResponse.class); - - byte[] BODY = new byte[] { (byte)0xe0, 0x4f, (byte)0xd0, 0x20, (byte)0xa2 }; - URI uri = new URI("www.someuri.com"); - - //when - when(execution.execute(request, BODY)).thenReturn(response); - - //then - assertNotNull(requestResponseLoggingInterceptor.intercept(request, BODY, execution)); - } -} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java deleted file mode 100644 index 3a0701ff..00000000 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.web; - -import org.junit.jupiter.api.Test; - -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; - -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class RestTemplateWrapperTest { - - @Test - void constructor_shouldReturnNotNullObject() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { - RestTemplateWrapper restTemplateWrapper = new RestTemplateWrapper(); - assertNotNull(restTemplateWrapper); - } -} @@ -46,7 +46,7 @@ <properties> <java.version>8</java.version> <immutable.version>2.7.1</immutable.version> - <spring.version>5.1.0.RELEASE</spring.version> + <spring.version>5.1.2.RELEASE</spring.version> <spring-boot.version>2.1.0.M4</spring-boot.version> <tomcat.version>8.5.34</tomcat.version> <docker.maven.version>1.0.0</docker.maven.version> @@ -136,6 +136,16 @@ <dependencyManagement> <dependencies> <dependency> + <groupId>org.asynchttpclient</groupId> + <artifactId>async-http-client</artifactId> + <version>2.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + <version>4.1.4</version> + </dependency> + <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-SR10</version> |