diff options
author | Tony Hansen <tony@att.com> | 2019-03-18 13:45:01 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-03-18 13:45:01 +0000 |
commit | 6870154043d73d527cc42aca7ade7e49aa961476 (patch) | |
tree | 7682b329ef8bcf1f4e18c170dc9e91d1197d976a | |
parent | 84e13d376c82b96f5dad949b3155478f8a421545 (diff) | |
parent | ad4a3a514bd943df22a2e27d78f0706d412ebe9f (diff) |
Merge "Thread safety issues"
21 files changed, 436 insertions, 782 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 40de33dd..82c390f7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -2,39 +2,44 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ServiceLoader; + +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import java.io.*; -import java.util.ServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.stereotype.Component; -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -55,49 +60,53 @@ public class AppConfig { private static final String SECURITY = "security"; private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); - DmaapConsumerConfiguration dmaapConsumerConfiguration; - - DmaapPublisherConfiguration dmaapPublisherConfiguration; - - FtpesConfig ftpesConfig; + private DmaapConsumerConfiguration dmaapConsumerConfiguration; + private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private FtpesConfig ftpesConfiguration; @NotEmpty private String filepath; - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { return dmaapConsumerConfiguration; } - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return dmaapPublisherConfiguration; } - public FtpesConfig getFtpesConfiguration() { - return ftpesConfig; + public synchronized FtpesConfig getFtpesConfiguration() { + return ftpesConfiguration; } - public void initFileStreamReader() { + public void loadConfigurationFromFile() { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); JsonParser parser = new JsonParser(); JsonObject jsonObject; - try (InputStream inputStream = getInputStream(filepath)) { + try (InputStream inputStream = createInputStream(filepath)) { JsonElement rootElement = getJsonElement(parser, inputStream); if (rootElement.isJsonObject()) { jsonObject = rootElement.getAsJsonObject(); - ftpesConfig = deserializeType(gsonBuilder, + FtpesConfig ftpesConfig = deserializeType(gsonBuilder, jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), FtpesConfig.class); - dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), + DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder, + concatenateJsonObjects( + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) + .getAsJsonObject(DMAAP_CONSUMER), + rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), DmaapConsumerConfiguration.class); - dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), + DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder, + concatenateJsonObjects( + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) + .getAsJsonObject(DMAAP_PRODUCER), + rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), DmaapPublisherConfiguration.class); + + setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); } } catch (IOException e) { logger.error("Problem with file loading, file: {}", filepath, e); @@ -106,30 +115,36 @@ public class AppConfig { } } + synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration, + DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) { + this.dmaapConsumerConfiguration = consumerConfiguration; + this.dmaapPublisherConfiguration = publisherConfiguration; + this.ftpesConfiguration = ftpesConfig; + } + JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { return parser.parse(new InputStreamReader(inputStream)); } private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class<T> type) { + @NotNull Class<T> type) { return gsonBuilder.create().fromJson(jsonObject, type); } - InputStream getInputStream(@NotNull String filepath) throws IOException { + InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } - String getFilepath() { + synchronized String getFilepath() { return this.filepath; } - public void setFilepath(String filepath) { + public synchronized void setFilepath(String filepath) { this.filepath = filepath; } private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet() - .forEach(entry -> target.add(entry.getKey(), entry.getValue())); + source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue())); return target; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 208e8a43..0254597e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -56,7 +56,7 @@ public class CloudConfiguration extends AppConfig { private Properties systemEnvironment; @Autowired - public void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { + public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; } @@ -80,7 +80,7 @@ public class CloudConfiguration extends AppConfig { .subscribe(this::parseCloudConfig, this::cloudConfigError); } - private void parseCloudConfig(JsonObject jsonObject) { + private synchronized void parseCloudConfig(JsonObject jsonObject) { logger.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); @@ -89,17 +89,17 @@ public class CloudConfiguration extends AppConfig { } @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); } @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); } @Override - public FtpesConfig getFtpesConfiguration() { + public synchronized FtpesConfig getFtpesConfiguration() { return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index a0020318..af4670e3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.util.Map; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -40,22 +41,16 @@ import reactor.core.publisher.Mono; public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); - private AppConfig datafileAppConfig; - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; + private final AppConfig datafileAppConfig; - - public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) { + public FileCollector(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; - this.ftpsClient = ftpsClient; - this.sftpClient = sftpClient; } public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, Duration firstBackoffTimeout, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); - resolveKeyStore(); //@formatter:off return Mono.just(fileData) @@ -65,18 +60,6 @@ public class FileCollector { //@formatter:on } - private FtpesConfig resolveConfiguration() { - return datafileAppConfig.getFtpesConfiguration(); - } - - private void resolveKeyStore() { - FtpesConfig ftpesConfig = resolveConfiguration(); - ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); - ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); - ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); - ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); - } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); @@ -85,11 +68,8 @@ public class FileCollector { final String remoteFile = fileData.remoteFilePath(); final Path localFile = fileData.getLocalFileName(); - try { + try (FileCollectClient currentClient = createClient(fileData)) { localFile.getParent().toFile().mkdir(); // Create parent directories - - FileCollectClient currentClient = selectClient(fileData); - currentClient.collectFile(remoteFile, localFile); return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); } catch (Exception throwable) { @@ -98,12 +78,12 @@ public class FileCollector { } } - private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException { + private FileCollectClient createClient(FileData fileData) throws DatafileTaskException { switch (fileData.scheme()) { case SFTP: - return sftpClient; + return createSftpClient(fileData); case FTPS: - return ftpsClient; + return createFtpsClient(fileData); default: throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme()); } @@ -129,4 +109,17 @@ public class FileCollector { .build(); // @formatter:on } + + SftpClient createSftpClient(FileData fileData) throws DatafileTaskException { + SftpClient client = new SftpClient(fileData.fileServerData()); + client.open(); + return client; + } + + FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException { + FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); + FtpsClient client = new FtpsClient(fileData.fileServerData()); + client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword()); + return client; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 89ebde8f..28963377 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -57,12 +55,10 @@ public class ScheduledTasks { /** Data needed for fetching of one file */ private class FileCollectionData { final FileData fileData; - final FileCollector collectorTask; final MessageMetaData metaData; - FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { + FileCollectionData(FileData fd, MessageMetaData metaData) { this.fileData = fd; - this.collectorTask = collectorTask; this.metaData = metaData; } } @@ -90,7 +86,7 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Execution of tasks was registered"); - applicationConfiguration.initFileStreamReader(); + applicationConfiguration.loadConfigurationFromFile(); createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), () -> onComplete(contextMap)); } @@ -143,8 +139,7 @@ public class ScheduledTasks { List<FileCollectionData> fileCollects = new ArrayList<>(); for (FileData fileData : availableFiles.files()) { - fileCollects.add( - new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData())); + fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData())); } return Flux.fromIterable(fileCollects); } @@ -159,7 +154,7 @@ public class ScheduledTasks { final Duration initialRetryTimeout = Duration.ofSeconds(5); MdcVariables.setMdcContextMap(contextMap); - return fileCollect.collectorTask + return createFileCollector() .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, contextMap) .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); @@ -236,9 +231,8 @@ public class ScheduledTasks { return new DMaaPMessageConsumerTask(this.applicationConfiguration); } - FileCollector createFileCollector(FileData fileData) { - return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), - new SftpClient(fileData.fileServerData())); + FileCollector createFileCollector() { + return new FileCollector(applicationConfiguration); } DataRouterPublisher createDataRouterPublisher() { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java index 443ddae7..2c136304 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java @@ -47,16 +47,15 @@ import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoE */ @ExtendWith({MockitoExtension.class}) class AppConfigTest { - + private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json"; private static final boolean CORRECT_JSON = true; private static final boolean INCORRECT_JSON = false; private static AppConfig appConfigUnderTest; - - private static String filePath = Objects - .requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile(); + private static String filePath = + Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile(); @BeforeEach public void setUp() { @@ -70,7 +69,7 @@ class AppConfigTest { // Then verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(0)).initFileStreamReader(); + verify(appConfigUnderTest, times(0)).loadConfigurationFromFile(); Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath()); } @@ -82,15 +81,12 @@ class AppConfigTest { // When appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).getInputStream(any()); - appConfigUnderTest.initFileStreamReader(); - appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration(); - appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration(); - appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); + doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + appConfigUnderTest.loadConfigurationFromFile(); // Then verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(1)).initFileStreamReader(); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration()); Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(), @@ -98,7 +94,6 @@ class AppConfigTest { Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(), appConfigUnderTest.getDmaapConsumerConfiguration()); Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration()); - } @Test @@ -108,11 +103,11 @@ class AppConfigTest { appConfigUnderTest.setFilepath(filePath); // When - appConfigUnderTest.initFileStreamReader(); + appConfigUnderTest.loadConfigurationFromFile(); // Then verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(1)).initFileStreamReader(); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); @@ -127,15 +122,15 @@ class AppConfigTest { // When appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).getInputStream(any()); - appConfigUnderTest.initFileStreamReader(); + doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + appConfigUnderTest.loadConfigurationFromFile(); // Then verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(1)).initFileStreamReader(); - Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); + Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertNotNull(appConfigUnderTest.getFtpesConfiguration()); + Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } @@ -147,18 +142,15 @@ class AppConfigTest { new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); // When appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).getInputStream(any()); + doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); JsonElement jsonElement = mock(JsonElement.class); when(jsonElement.isJsonObject()).thenReturn(false); doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class)); - appConfigUnderTest.initFileStreamReader(); - appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration(); - appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration(); - appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); + appConfigUnderTest.loadConfigurationFromFile(); // Then verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(1)).initFileStreamReader(); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index b5d3c159..c266d50e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -16,16 +16,21 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; + import java.nio.file.Path; import java.time.Duration; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -40,13 +45,14 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; + import reactor.test.StepVerifier; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> * */ -public class XnfCollectorTaskImplTest { +public class FileCollectorTest { private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; private static final String LAST_EPOCH_MICROSEC = "8745745764578"; @@ -70,7 +76,7 @@ public class XnfCollectorTaskImplTest { private static final String FTPES_LOCATION_NO_PORT = FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION; private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; - private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION; + private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION; private static final String GZIP_COMPRESSION = "gzip"; private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; @@ -104,7 +110,7 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private FileData createFileData(String location) { + private FileData createFileData(String location, Scheme scheme) { // @formatter:off return ImmutableFileData.builder() .name(PM_FILE_NAME) @@ -112,7 +118,7 @@ public class XnfCollectorTaskImplTest { .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) - .scheme(Scheme.FTPS) + .scheme(scheme) .build(); // @formatter:on } @@ -147,10 +153,10 @@ public class XnfCollectorTaskImplTest { @Test public void whenFtpesFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = - new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); - FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS); ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); @@ -159,56 +165,43 @@ public class XnfCollectorTaskImplTest { .expectNext(expectedConsumerDmaapModel).verifyComplete(); verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH); - verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD); - verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH); - verify(ftpsClientMock).setTrustedCAPassword(TRUSTED_CA_PASSWORD); + verify(ftpsClientMock, times(1)).close(); + verifyNoMoreInteractions(ftpsClientMock); } @Test public void whenSftpFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = - new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(SFTP_LOCATION_NO_PORT) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .scheme(Scheme.SFTP) - .build(); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any()); - ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(SFTP_LOCATION_NO_PORT) - .internalLocation(LOCAL_FILE_LOCATION.toString()) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - // @formatter:on + FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT); Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) + .expectNext(expectedConsumerDmaapModel) // + .verifyComplete(); + + // The same again, but with port + fileData = createFileData(SFTP_LOCATION, Scheme.SFTP); + expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION); + StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) - .expectNext(expectedConsumerDmaapModel).verifyComplete(); + .expectNext(expectedConsumerDmaapModel) // + .verifyComplete(); - verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verify(sftpClientMock, times(2)).close(); verifyNoMoreInteractions(sftpClientMock); } @Test public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { - FileCollector collectorUndetTest = - new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(FTPES_LOCATION); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); + + FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -221,14 +214,14 @@ public class XnfCollectorTaskImplTest { @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = - new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS); Map<String, String> contextMap = new HashMap<>(); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 8a572be4..8c4b3891 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -82,7 +82,7 @@ public class ScheduledTasksTest { doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); doReturn(consumerMock).when(testedObject).createConsumerTask(); - doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull()); + doReturn(fileCollectorMock).when(testedObject).createFileCollector(); doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher(); } diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 442b766b..0eaa7a17 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -25,10 +25,6 @@ public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; - public DatafileTaskException(Exception e) { - super(e); - } - public DatafileTaskException(String message) { super(message); } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java index bedae43a..bca7dfd2 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java @@ -23,7 +23,6 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@FunctionalInterface -public interface FileCollectClient { +public interface FileCollectClient extends AutoCloseable { public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException; } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index c3b7990f..1bf3ec5a 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -22,26 +22,25 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; -import java.nio.file.Paths; +import java.security.GeneralSecurityException; +import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; import java.util.Optional; +import javax.net.ssl.KeyManager; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; +import org.apache.commons.net.util.KeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; -import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils; -import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException; -import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore; -import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory; -import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper; -import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper; -import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.io.FileSystemResource; /** * Gets file from xNF with FTPS protocol. @@ -50,139 +49,40 @@ import org.slf4j.LoggerFactory; */ public class FtpsClient implements FileCollectClient { private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); - private String keyCertPath; - private String keyCertPassword; - private Path trustedCAPath; - private String trustedCAPassword; - - private FTPSClient realFtpsClient = new FTPSClient(); - private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper(); - private IKeyStore keyStore; - private ITrustManagerFactory trustManagerFactory; - private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper(); - private boolean keyManagerSet = false; - private boolean trustManagerSet = false; + FTPSClient realFtpsClient = new FTPSClient(); private final FileServerData fileServerData; - + private static TrustManager theTrustManager = null; public FtpsClient(FileServerData fileServerData) { this.fileServerData = fileServerData; } - @Override - public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { - logger.trace("collectFile called"); - + public void open(String keyCertPath, String keyCertPassword, Path trustedCAPath, String trustedCAPassword) + throws DatafileTaskException { try { realFtpsClient.setNeedClientAuth(true); - setUpKeyManager(realFtpsClient); - setUpTrustedCA(realFtpsClient); - setUpConnection(realFtpsClient); - getFileFromxNF(realFtpsClient, remoteFile, localFile); - } catch (IOException e) { - logger.trace("", e); + realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword)); + realFtpsClient.setTrustManager(getTrustManager(trustedCAPath, trustedCAPassword)); + setUpConnection(); + } catch (DatafileTaskException e) { + throw e; + } catch (Exception e) { throw new DatafileTaskException("Could not open connection: ", e); - } catch (KeyManagerException e) { - logger.trace("", e); - throw new DatafileTaskException(e); - } finally { - closeDownConnection(realFtpsClient); - } - logger.trace("collectFile fetched: {}", localFile); - } - - private void setUpKeyManager(FTPSClient ftps) throws KeyManagerException { - if (keyManagerSet) { - logger.trace("keyManager already set!"); - } else { - keyManagerUtils.setCredentials(keyCertPath, keyCertPassword); - ftps.setKeyManager(keyManagerUtils.getClientKeyManager()); - keyManagerSet = true; - } - logger.trace("complete setUpKeyManager"); - } - - private void setUpTrustedCA(FTPSClient ftps) throws DatafileTaskException { - if (trustManagerSet) { - logger.trace("trustManager already set!"); - } else { - try { - fileSystemResource.setPath(trustedCAPath); - InputStream fis = fileSystemResource.getInputStream(); - IKeyStore ks = getKeyStore(); - ks.load(fis, trustedCAPassword.toCharArray()); - fis.close(); - ITrustManagerFactory tmf = getTrustManagerFactory(); - tmf.init(ks.getKeyStore()); - ftps.setTrustManager(tmf.getTrustManagers()[0]); - trustManagerSet = true; - } catch (Exception e) { - throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e); - } } - logger.trace("complete setUpTrustedCA"); } - private int getPort(Optional<Integer> port) { - final int FTPS_DEFAULT_PORT = 21; - return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; - } - - private void setUpConnection(FTPSClient ftps) throws DatafileTaskException, IOException { - if (!ftps.isConnected()) { - ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port())); - logger.trace("after ftp connect"); - - if (!ftps.login(fileServerData.userId(), fileServerData.password())) { - throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress()); - } - - if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) { - ftps.enterLocalPassiveMode(); - ftps.setFileType(FTP.BINARY_FILE_TYPE); - // Set protection buffer size - ftps.execPBSZ(0); - // Set data channel protection to private - ftps.execPROT("P"); - ftps.setBufferSize(1024 * 1024); - } else { - throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress() - + " xNF reply code: " + ftps.getReplyCode()); - } - } - logger.trace("setUpConnection successfully!"); - } - - private void getFileFromxNF(FTPSClient ftps, String remoteFileName, Path localFileName) - throws IOException { - logger.trace("starting to getFile"); - - File localFile = localFileName.toFile(); - if (localFile.createNewFile()) { - logger.warn("Local file {} already created", localFileName); - } - OutputStream output = new FileOutputStream(localFile); - logger.trace("begin to retrieve from xNF."); - if (!ftps.retrieveFile(remoteFileName, output)) { - throw new IOException("Could not retrieve file"); - } - logger.trace("end retrieve from xNF."); - output.close(); - logger.debug("File {} Download Successfull from xNF", localFileName); - } - - - private void closeDownConnection(FTPSClient ftps) { + @Override + public void close() { logger.trace("starting to closeDownConnection"); - if (ftps != null && ftps.isConnected()) { + if (realFtpsClient.isConnected()) { try { - boolean logOut = ftps.logout(); + boolean logOut = realFtpsClient.logout(); logger.trace("logOut: {}", logOut); } catch (Exception e) { logger.trace("Unable to logout connection.", e); } try { - ftps.disconnect(); + realFtpsClient.disconnect(); logger.trace("disconnected!"); } catch (Exception e) { logger.trace("Unable to disconnect connection.", e); @@ -190,54 +90,89 @@ public class FtpsClient implements FileCollectClient { } } - public void setKeyCertPath(String keyCertPath) { - this.keyCertPath = keyCertPath; - } + @Override + public void collectFile(String remoteFileName, Path localFileName) throws DatafileTaskException { + logger.trace("collectFile called"); - public void setKeyCertPassword(String keyCertPassword) { - this.keyCertPassword = keyCertPassword; + try (OutputStream output = createOutputStream(localFileName)) { + logger.trace("begin to retrieve from xNF."); + if (!realFtpsClient.retrieveFile(remoteFileName, output)) { + throw new DatafileTaskException("Could not retrieve file " + remoteFileName); + } + } catch (IOException e) { + throw new DatafileTaskException("Could not fetch file: ", e); + } + logger.trace("collectFile fetched: {}", localFileName); } - public void setTrustedCAPath(String trustedCAPath) { - this.trustedCAPath = Paths.get(trustedCAPath); + private int getPort(Optional<Integer> port) { + final int FTPS_DEFAULT_PORT = 21; + return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } - public void setTrustedCAPassword(String trustedCAPassword) { - this.trustedCAPassword = trustedCAPassword; - } + private void setUpConnection() throws DatafileTaskException, IOException { - private ITrustManagerFactory getTrustManagerFactory() throws NoSuchAlgorithmException { - if (trustManagerFactory == null) { - trustManagerFactory = new TrustManagerFactoryWrapper(); - } - return trustManagerFactory; - } + realFtpsClient.connect(fileServerData.serverAddress(), getPort(fileServerData.port())); + logger.trace("after ftp connect"); - private IKeyStore getKeyStore() throws KeyStoreException { - if (keyStore == null) { - keyStore = new KeyStoreWrapper(); + if (!realFtpsClient.login(fileServerData.userId(), fileServerData.password())) { + throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress()); } - return keyStore; - } + if (FTPReply.isPositiveCompletion(realFtpsClient.getReplyCode())) { + realFtpsClient.enterLocalPassiveMode(); + realFtpsClient.setFileType(FTP.BINARY_FILE_TYPE); + // Set protection buffer size + realFtpsClient.execPBSZ(0); + // Set data channel protection to private + realFtpsClient.execPROT("P"); + realFtpsClient.setBufferSize(1024 * 1024); + } else { + throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress() + + " xNF reply code: " + realFtpsClient.getReplyCode()); + } - void setFtpsClient(FTPSClient ftpsClient) { - this.realFtpsClient = ftpsClient; + logger.trace("setUpConnection successfully!"); } - void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) { - this.keyManagerUtils = keyManagerUtils; + InputStream createInputStream(Path localFileName) throws IOException { + FileSystemResource realResource = new FileSystemResource(localFileName); + return realResource.getInputStream(); } - void setKeyStore(IKeyStore keyStore) { - this.keyStore = keyStore; + OutputStream createOutputStream(Path localFileName) throws IOException { + File localFile = localFileName.toFile(); + if (localFile.createNewFile()) { + logger.warn("Local file {} already created", localFileName); + } + OutputStream output = new FileOutputStream(localFile); + logger.debug("File {} opened xNF", localFileName); + return output; + } + + private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword) + throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException { + try (InputStream fis = createInputStream(trustedCAPath)) { + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(fis, trustedCAPassword.toCharArray()); + TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509"); + factory.init(keyStore); + return factory.getTrustManagers()[0]; + } } - void setTrustManagerFactory(ITrustManagerFactory tmf) { - trustManagerFactory = tmf; + TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword) + throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException { + synchronized (FtpsClient.class) { + if (theTrustManager == null) { + theTrustManager = createTrustManager(trustedCAPath, trustedCAPassword); + } + return theTrustManager; + } } - void setFileSystemResource(IFileSystemResource fileSystemResource) { - this.fileSystemResource = fileSystemResource; + KeyManager createKeyManager(String keyCertPath, String keyCertPassword) + throws IOException, GeneralSecurityException { + return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword); } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 0c6491b8..2f489166 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; public class SftpClient implements FileCollectClient { private static final Logger logger = LoggerFactory.getLogger(SftpClient.class); private final FileServerData fileServerData; + private Session session = null; + private ChannelSftp sftpChannel = null; public SftpClient(FileServerData fileServerData) { this.fileServerData = fileServerData; @@ -48,18 +50,37 @@ public class SftpClient implements FileCollectClient { logger.trace("collectFile called"); try { - Session session = setUpSession(fileServerData); - ChannelSftp sftpChannel = getChannel(session); sftpChannel.get(remoteFile, localFile.toString()); logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); - sftpChannel.exit(); - session.disconnect(); } catch (Exception e) { - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e); + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); } logger.trace("collectFile OK"); + } + + @Override + public void close() { + logger.trace("close"); + if (sftpChannel != null) { + sftpChannel.exit(); + sftpChannel = null; + } + if (session != null) { + session.disconnect(); + session = null; + } + } + public void open() throws DatafileTaskException { + try { + if (session == null) { + session = setUpSession(fileServerData); + sftpChannel = getChannel(session); + } + } catch (JSchException e) { + throw new DatafileTaskException("Could not open Sftp client", e); + } } private int getPort(Optional<Integer> port) { @@ -70,12 +91,12 @@ public class SftpClient implements FileCollectClient { private Session setUpSession(FileServerData fileServerData) throws JSchException { JSch jsch = new JSch(); - Session session = - jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port())); - session.setConfig("StrictHostKeyChecking", "no"); - session.setPassword(fileServerData.password()); - session.connect(); - return session; + Session newSession = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), + getPort(fileServerData.port())); + newSession.setConfig("StrictHostKeyChecking", "no"); + newSession.setPassword(fileServerData.password()); + newSession.connect(); + return newSession; } private ChannelSftp getChannel(Session session) throws JSchException { diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index f80fcd0f..9304688f 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -19,8 +19,10 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; + import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -33,7 +35,9 @@ import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.UUID; import java.util.concurrent.Future; + import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; @@ -58,6 +62,7 @@ import org.slf4j.MarkerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; + import reactor.core.publisher.Mono; /** @@ -85,7 +90,6 @@ public class DmaapProducerReactiveHttpClient { private final String pwd; private IFileSystemResource fileResource = new FileSystemResourceWrapper(); - private CloseableHttpAsyncClient webClient; /** * Constructor DmaapProducerReactiveHttpClient. @@ -111,10 +115,7 @@ public class DmaapProducerReactiveHttpClient { public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); - try { - webClient = getWebClient(); - webClient.start(); + try (CloseableHttpAsyncClient webClient = createWebClient()) { HttpPut put = new HttpPut(); prepareHead(consumerDmaapModel, put); @@ -124,8 +125,7 @@ public class DmaapProducerReactiveHttpClient { logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation()); Future<HttpResponse> future = webClient.execute(put, null); HttpResponse response = future.get(); - logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString()); - webClient.close(); + logger.trace(INVOKE_RETURN, "Response from DR {}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); @@ -175,25 +175,20 @@ public class DmaapProducerReactiveHttpClient { fileResource = fileSystemResource; } - protected CloseableHttpAsyncClient getWebClient() + protected CloseableHttpAsyncClient createWebClient() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { - if (webClient != null) { - return webClient; - } - SSLContext sslContext = null; - sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); + SSLContext sslContext = new SSLContextBuilder() // + .loadTrustMaterial(null, (certificate, authType) -> true) // + .build(); - //@formatter:off - return HttpAsyncClients.custom() - .setSSLContext(sslContext) - .setSSLHostnameVerifier(new NoopHostnameVerifier()) - .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) + CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() // + .setSSLContext(sslContext) // + .setSSLHostnameVerifier(new NoopHostnameVerifier()) // + .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) // .build(); - //@formatter:on + webClient.start(); + return webClient; } - protected void setWebClient(CloseableHttpAsyncClient client) { - this.webClient = client; - } }
\ No newline at end of file diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/IKeyManagerUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/IKeyManagerUtils.java deleted file mode 100644 index 8c4525e7..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/IKeyManagerUtils.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ssl; - -import javax.net.ssl.KeyManager; - -public interface IKeyManagerUtils { - public void setCredentials(String keyStorePath, String keyStorePass) throws KeyManagerException; - - public KeyManager getClientKeyManager(); - - public static class KeyManagerException extends Exception { - private static final long serialVersionUID = 1L; - - public KeyManagerException(Exception e) { - super(e); - } - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/IKeyStore.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/IKeyStore.java deleted file mode 100644 index 0e54cecf..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/IKeyStore.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ssl; - -import java.io.InputStream; -import java.security.KeyStore; - -public interface IKeyStore { - public void load(InputStream arg0, char[] arg1) throws KeyStoreLoadException; - - public KeyStore getKeyStore(); - - public static class KeyStoreLoadException extends Exception { - private static final long serialVersionUID = 1L; - - public KeyStoreLoadException(Exception e) { - super(e); - } - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/ITrustManagerFactory.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/ITrustManagerFactory.java deleted file mode 100644 index 99e3de1f..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/ITrustManagerFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ssl; - -import java.security.KeyStore; -import java.security.KeyStoreException; - -import javax.net.ssl.TrustManager; - -public interface ITrustManagerFactory { - public void init(KeyStore ks) throws KeyStoreException; - - public TrustManager[] getTrustManagers(); -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/KeyManagerUtilsWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/KeyManagerUtilsWrapper.java deleted file mode 100644 index 93a7a2fb..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/KeyManagerUtilsWrapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ssl; - -import java.io.File; -import java.io.IOException; -import java.security.GeneralSecurityException; - -import javax.net.ssl.KeyManager; - -import org.apache.commons.net.util.KeyManagerUtils; - -public class KeyManagerUtilsWrapper implements IKeyManagerUtils { - private KeyManager keyManager; - - @Override - public void setCredentials(String keyStorePath, String keyStorePass) throws KeyManagerException { - try { - keyManager = KeyManagerUtils.createClientKeyManager(new File(keyStorePath), keyStorePass); - } catch (IOException | GeneralSecurityException e) { - throw new KeyManagerException(e); - } - } - - @Override - public KeyManager getClientKeyManager() { - return keyManager; - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/KeyStoreWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/KeyStoreWrapper.java deleted file mode 100644 index a8eebea7..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/KeyStoreWrapper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ssl; - -import java.io.IOException; -import java.io.InputStream; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; - -public class KeyStoreWrapper implements IKeyStore { - private KeyStore keyStore; - - public KeyStoreWrapper() throws KeyStoreException { - keyStore = KeyStore.getInstance("JKS"); - } - - @Override - public void load(InputStream stream, char[] password) - throws KeyStoreLoadException { - try { - keyStore.load(stream, password); - } catch (NoSuchAlgorithmException | CertificateException | IOException e) { - throw new KeyStoreLoadException(e); - } - } - - @Override - public KeyStore getKeyStore() { - return keyStore; - } - -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/TrustManagerFactoryWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/TrustManagerFactoryWrapper.java deleted file mode 100644 index f539634d..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ssl/TrustManagerFactoryWrapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ssl; - -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; - -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; - -public class TrustManagerFactoryWrapper implements ITrustManagerFactory { - TrustManagerFactory tmf; - - public TrustManagerFactoryWrapper() throws NoSuchAlgorithmException { - tmf = TrustManagerFactory.getInstance("SunX509"); - } - - @Override - public void init(KeyStore ks) throws KeyStoreException { - tmf.init(ks); - } - - @Override - public TrustManager[] getTrustManagers() { - return tmf.getTrustManagers(); - } - -} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index 670b1bdc..e2882606 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -17,8 +17,11 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -29,23 +32,15 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; import java.nio.file.Paths; -import java.security.GeneralSecurityException; -import java.security.KeyStore; -import java.security.KeyStoreException; import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; import org.apache.commons.net.ftp.FTP; -import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; -import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils; -import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore; -import org.onap.dcaegen2.collectors.datafile.ssl.ITrustManagerFactory; import org.springframework.http.HttpStatus; public class FtpsClientTest { @@ -62,18 +57,14 @@ public class FtpsClientTest { private static final String USERNAME = "bob"; private static final String PASSWORD = "123"; + private FTPSClient ftpsClientMock = mock(FTPSClient.class); - private IKeyManagerUtils keyManagerUtilsMock = mock(IKeyManagerUtils.class); private KeyManager keyManagerMock = mock(KeyManager.class); - private IKeyStore keyStoreWrapperMock = mock(IKeyStore.class); - private KeyStore keyStoreMock = mock(KeyStore.class); - private ITrustManagerFactory trustManagerFactoryMock = mock(ITrustManagerFactory.class); private TrustManager trustManagerMock = mock(TrustManager.class); - private IFileSystemResource fileResourceMock = mock(IFileSystemResource.class); private InputStream inputStreamMock = mock(InputStream.class); + private OutputStream outputStreamMock = mock(OutputStream.class); - FtpsClient clientUnderTest = new FtpsClient(createFileServerData()); - + FtpsClient clientUnderTestSpy; private ImmutableFileServerData createFileServerData() { return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD) @@ -82,40 +73,17 @@ public class FtpsClientTest { @BeforeEach protected void setUp() throws Exception { - clientUnderTest.setFtpsClient(ftpsClientMock); - clientUnderTest.setKeyManagerUtils(keyManagerUtilsMock); - clientUnderTest.setKeyStore(keyStoreWrapperMock); - clientUnderTest.setTrustManagerFactory(trustManagerFactoryMock); - clientUnderTest.setFileSystemResource(fileResourceMock); - - clientUnderTest.setKeyCertPath(FTP_KEY_PATH); - clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD); - clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH.toString()); - clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD); + clientUnderTestSpy = spy(new FtpsClient(createFileServerData())); + clientUnderTestSpy.realFtpsClient = ftpsClientMock; } - @Test - public void collectFile_allOk() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); - when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); - when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); - when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value()); - - when(ftpsClientMock.isConnected()).thenReturn(false, true); + private void verifyFtpsClientMock_openOK() throws Exception { + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + when(ftpsClientMock.retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), ArgumentMatchers.any(OutputStream.class))).thenReturn(true); - - clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH); - verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); verify(ftpsClientMock).setTrustManager(trustManagerMock); verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); verify(ftpsClientMock).login(USERNAME, PASSWORD); @@ -125,59 +93,85 @@ public class FtpsClientTest { verify(ftpsClientMock).execPROT("P"); verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); verify(ftpsClientMock).setBufferSize(1024 * 1024); - verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), - ArgumentMatchers.any(OutputStream.class)); + } + + @Test + public void collectFile_allOk() throws Exception { + + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD); + doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode(); + + clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); + + doReturn(true).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock); + clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH); + + doReturn(true).when(ftpsClientMock).isConnected(); + clientUnderTestSpy.close(); + + verifyFtpsClientMock_openOK(); + verify(ftpsClientMock, times(1)).isConnected(); verify(ftpsClientMock, times(1)).logout(); verify(ftpsClientMock, times(1)).disconnect(); - verify(ftpsClientMock, times(2)).isConnected(); + verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any()); verifyNoMoreInteractions(ftpsClientMock); } @Test public void collectFileFaultyOwnKey_shouldFail() throws Exception { - doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock) - .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); - when(ftpsClientMock.isConnected()).thenReturn(false); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)).hasMessage( - "org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException"); + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + assertThatThrownBy(() -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, + TRUSTED_CA_PASSWORD)).hasMessageContaining( + "Could not open connection: java.io.FileNotFoundException:"); verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); + + doReturn(false).when(ftpsClientMock).isConnected(); + clientUnderTestSpy.close(); verify(ftpsClientMock).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } @Test - public void collectFileFaultTrustedCA_shouldFail() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); + public void collectFileFaultTrustedCA_shouldFail_no_trustedCA_file() throws Exception { + + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doThrow(new IOException("problem")).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH); + + assertThatThrownBy( + () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD)) + .hasMessage("Could not open connection: java.io.IOException: problem"); + } + + @Test + public void collectFileFaultTrustedCA_shouldFail_empty_trustedCA_file() throws Exception { - doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock); + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doReturn(inputStreamMock).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException"); + assertThatThrownBy( + () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD)) + .hasMessage("Could not open connection: java.io.EOFException"); } @Test public void collectFileFaultyLogin_shouldFail() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); - when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); - when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(false); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Unable to log in to xNF. 127.0.0.1"); + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + doReturn(false).when(ftpsClientMock).login(USERNAME, PASSWORD); + + assertThatThrownBy( + () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD)) + .hasMessage("Unable to log in to xNF. 127.0.0.1"); verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); verify(ftpsClientMock).setTrustManager(trustManagerMock); verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); verify(ftpsClientMock).login(USERNAME, PASSWORD); @@ -185,105 +179,61 @@ public class FtpsClientTest { @Test public void collectFileBadRequestResponse_shouldFail() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); - when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); - when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); - when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE); + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD); + doReturn(503).when(ftpsClientMock).getReplyCode(); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503"); + assertThatThrownBy( + () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD)) + .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503"); verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); verify(ftpsClientMock).setTrustManager(trustManagerMock); verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); verify(ftpsClientMock).login(USERNAME, PASSWORD); verify(ftpsClientMock, times(2)).getReplyCode(); - verify(ftpsClientMock, times(2)).isConnected(); verifyNoMoreInteractions(ftpsClientMock); } @Test - public void collectFileFaultyConnection_shouldFail() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); - when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); + public void collectFile_shouldFail() throws Exception { + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD); + doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode(); + clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); - doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT); + doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not open connection: java.io.IOException"); + assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Could not retrieve file /dir/sample.txt"); - verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); - verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); - verify(ftpsClientMock).setTrustManager(trustManagerMock); - verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); - verify(ftpsClientMock, times(2)).isConnected(); + verifyFtpsClientMock_openOK(); + verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any()); verifyNoMoreInteractions(ftpsClientMock); } @Test - public void collectFileFailingFileCollect_shouldFail() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); - when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); - when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); - when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value()); - - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, Paths.get(""))) - .hasMessage("Could not open connection: java.io.IOException: No such file or directory"); - - } - - @Test - public void collectFileFailingFileRetrieve_shouldFail() throws Exception { - when(keyManagerUtilsMock.getClientKeyManager()).thenReturn(keyManagerMock); - when(fileResourceMock.getInputStream()).thenReturn(inputStreamMock); - when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock); - when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock}); - when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true); - when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.OK.value()); + public void collectFile_shouldFail_ioexception() throws Exception { + doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD); + doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); + doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH); + doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD); + doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode(); + clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD); when(ftpsClientMock.isConnected()).thenReturn(false); - doThrow(new IOException("problemas")).when(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), - ArgumentMatchers.any(OutputStream.class)); + doThrow(new IOException("problem")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock); - assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not open connection: java.io.IOException: problemas"); + assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) + .hasMessage("Could not fetch file: java.io.IOException: problem"); - verify(ftpsClientMock).setNeedClientAuth(true); - verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD); - verify(ftpsClientMock).setKeyManager(keyManagerMock); - verify(fileResourceMock).setPath(TRUSTED_CA_PATH); - verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray()); - verify(inputStreamMock, times(1)).close(); - verify(trustManagerFactoryMock).init(keyStoreMock); - verify(ftpsClientMock).setTrustManager(trustManagerMock); - verify(ftpsClientMock).connect(XNF_ADDRESS, PORT); - verify(ftpsClientMock).login(USERNAME, PASSWORD); - verify(ftpsClientMock).getReplyCode(); - verify(ftpsClientMock, times(1)).enterLocalPassiveMode(); - verify(ftpsClientMock).execPBSZ(0); - verify(ftpsClientMock).execPROT("P"); - verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE); - verify(ftpsClientMock).setBufferSize(1024 * 1024); - verify(ftpsClientMock).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), - ArgumentMatchers.any(OutputStream.class)); - verify(ftpsClientMock, times(2)).isConnected(); + verifyFtpsClientMock_openOK(); + verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any()); verifyNoMoreInteractions(ftpsClientMock); } } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index 90fb9336..85a0c090 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.io.IOUtils.toByteArray; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertTrue; import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; import com.jcraft.jsch.ChannelSftp; @@ -56,45 +55,45 @@ public class SftpClientTest { throws DatafileTaskException, IOException, JSchException, SftpException, Exception { FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build(); - SftpClient sftpClient = new SftpClient(expectedFileServerData); - sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); - byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE); - - sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE); - byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath()); - assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT); - assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT); + try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) { + sftpClient.open(); + sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); + byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE); + + sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE); + byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath()); + assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT); + assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT); + } } @Test - public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException { - FileServerData expectedFileServerData = - ImmutableFileServerData.builder().serverAddress("127.0.0.1").userId("Wrong").password(PASSWORD).build(); - SftpClient sftpClient = new SftpClient(expectedFileServerData); - sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); + public void collectFile_withWrongUserName_shouldFail() throws DatafileTaskException, IOException { + FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") + .userId("wrong").password(PASSWORD).port(sftpServer.getPort()).build(); + try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) { + + sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); - assertThatThrownBy(() -> sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE)) - .hasMessageContaining("Unable to get file from xNF"); + assertThatThrownBy(() -> sftpClient.open()) + .hasMessageContaining("Could not open Sftp clientcom.jcraft.jsch.JSchException: Auth fail"); + } } @Test - public void collectFile_withWrongFileName_shouldFail() throws IOException, JSchException, SftpException { + public void collectFile_withWrongFileName_shouldFail() + throws IOException, JSchException, SftpException, DatafileTaskException { FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1") .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build(); - SftpClient sftpClient = new SftpClient(expectedFileServerData); - sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); + try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) { + sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8); + sftpClient.open(); - String errorMessage = ""; - try { - sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE); - } catch (Exception e) { - errorMessage = e.getMessage(); + assertThatThrownBy(() -> sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE)).hasMessageStartingWith( + "Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, " + + "userId=bob, password=123, port="); } - - String expectedErrorMessage = "Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, " - + "userId=bob, password=123, port="; - assertTrue(errorMessage.startsWith(expectedErrorMessage)); } private static Session connectToServer(FakeSftpServerRule sftpServer) throws JSchException { diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index 2bbe8e1d..06ff570c 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -17,21 +17,30 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; @@ -50,6 +59,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; + import reactor.test.StepVerifier; /** @@ -80,12 +90,13 @@ class DmaapProducerReactiveHttpClientTest { private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class); private CloseableHttpAsyncClient clientMock; private HttpResponse responseMock = mock(HttpResponse.class); + @SuppressWarnings("unchecked") private Future<HttpResponse> futureMock = mock(Future.class); private StatusLine statusLine = mock(StatusLine.class); private InputStream fileStream; @BeforeEach - void setUp() { + void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST); when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); @@ -111,15 +122,15 @@ class DmaapProducerReactiveHttpClientTest { .build(); //formatter:on - dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock); + dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock)); dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock); clientMock = mock(CloseableHttpAsyncClient.class); - dmaapProducerReactiveHttpClient.setWebClient(clientMock); + doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient(); } @Test void getHttpResponse_Success() throws Exception { - mockWebClientDependantObject(true); + mockWebClientDependantObject(); HttpPut httpPut = new HttpPut(); httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); @@ -151,24 +162,22 @@ class DmaapProducerReactiveHttpClientTest { @Test void getHttpResponse_Fail() throws Exception { - mockWebClientDependantObject(false); - StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, any())) - .expectError() - .verify(); + Map<String, String> contextMap = new HashMap<>(); + doReturn(futureMock).when(clientMock).execute(any(), any()); + doThrow(new InterruptedException()).when(futureMock).get(); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) // + .expectError() // + .verify(); // } - private void mockWebClientDependantObject(boolean success) + private void mockWebClientDependantObject() throws IOException, InterruptedException, ExecutionException { fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream); - if (success) { - when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); - when(futureMock.get()).thenReturn(responseMock); - when(responseMock.getStatusLine()).thenReturn(statusLine); - when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK); - } else { - when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); - when(futureMock.get()).thenThrow(new InterruptedException()); - } + when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock); + when(futureMock.get()).thenReturn(responseMock); + when(responseMock.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK); + } } |