aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-03-14 14:27:30 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-14 14:27:30 +0000
commitad4a3a514bd943df22a2e27d78f0706d412ebe9f (patch)
treee13648b6efbb335bf8e4f5f7a853615f8626d30e /datafile-app-server/src
parenta89e09eecabd035f1c227b1ae3f5fa59eff36be4 (diff)
Thread safety issues
The TrustManager is now loaded and initialized once in a thread safe way (instead of each time it is used). Removed some unneeded wrappers. Using AutoCloseable for FTP clients to make sure they are closed in case of exceptions. Made AppConfig thread safe. Change-Id: Ia6a2c8a76bf960013180fdd7c53ae0ff17b26505 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java101
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java18
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java42
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java)83
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java2
7 files changed, 145 insertions, 158 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 40de33dd..82c390f7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -2,39 +2,44 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ServiceLoader;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import java.io.*;
-import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -55,49 +60,53 @@ public class AppConfig {
private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
-
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
-
- FtpesConfig ftpesConfig;
+ private DmaapConsumerConfiguration dmaapConsumerConfiguration;
+ private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private FtpesConfig ftpesConfiguration;
@NotEmpty
private String filepath;
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
- public FtpesConfig getFtpesConfiguration() {
- return ftpesConfig;
+ public synchronized FtpesConfig getFtpesConfiguration() {
+ return ftpesConfiguration;
}
- public void initFileStreamReader() {
+ public void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
JsonObject jsonObject;
- try (InputStream inputStream = getInputStream(filepath)) {
+ try (InputStream inputStream = createInputStream(filepath)) {
JsonElement rootElement = getJsonElement(parser, inputStream);
if (rootElement.isJsonObject()) {
jsonObject = rootElement.getAsJsonObject();
- ftpesConfig = deserializeType(gsonBuilder,
+ FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
FtpesConfig.class);
- dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_CONSUMER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapConsumerConfiguration.class);
- dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_PRODUCER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapPublisherConfiguration.class);
+
+ setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
} catch (IOException e) {
logger.error("Problem with file loading, file: {}", filepath, e);
@@ -106,30 +115,36 @@ public class AppConfig {
}
}
+ synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
+ DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.dmaapPublisherConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
+
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
+ @NotNull Class<T> type) {
return gsonBuilder.create().fromJson(jsonObject, type);
}
- InputStream getInputStream(@NotNull String filepath) throws IOException {
+ InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- String getFilepath() {
+ synchronized String getFilepath() {
return this.filepath;
}
- public void setFilepath(String filepath) {
+ public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet()
- .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
+ source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
return target;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 208e8a43..0254597e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -56,7 +56,7 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
@@ -80,7 +80,7 @@ public class CloudConfiguration extends AppConfig {
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
- private void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized void parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
@@ -89,17 +89,17 @@ public class CloudConfiguration extends AppConfig {
}
@Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
}
@Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
}
@Override
- public FtpesConfig getFtpesConfiguration() {
+ public synchronized FtpesConfig getFtpesConfiguration() {
return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index a0020318..af4670e3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,22 +41,16 @@ import reactor.core.publisher.Mono;
public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
- private AppConfig datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
+ private final AppConfig datafileAppConfig;
-
- public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsClient;
- this.sftpClient = sftpClient;
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
Duration firstBackoffTimeout, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
@@ -65,18 +60,6 @@ public class FileCollector {
//@formatter:on
}
- private FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
@@ -85,11 +68,8 @@ public class FileCollector {
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
- try {
+ try (FileCollectClient currentClient = createClient(fileData)) {
localFile.getParent().toFile().mkdir(); // Create parent directories
-
- FileCollectClient currentClient = selectClient(fileData);
-
currentClient.collectFile(remoteFile, localFile);
return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
} catch (Exception throwable) {
@@ -98,12 +78,12 @@ public class FileCollector {
}
}
- private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
switch (fileData.scheme()) {
case SFTP:
- return sftpClient;
+ return createSftpClient(fileData);
case FTPS:
- return ftpsClient;
+ return createFtpsClient(fileData);
default:
throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
}
@@ -129,4 +109,17 @@ public class FileCollector {
.build();
// @formatter:on
}
+
+ SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
+ SftpClient client = new SftpClient(fileData.fileServerData());
+ client.open();
+ return client;
+ }
+
+ FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
+ FtpsClient client = new FtpsClient(fileData.fileServerData());
+ client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
+ return client;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 89ebde8f..28963377 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -57,12 +55,10 @@ public class ScheduledTasks {
/** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask;
final MessageMetaData metaData;
- FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ FileCollectionData(FileData fd, MessageMetaData metaData) {
this.fileData = fd;
- this.collectorTask = collectorTask;
this.metaData = metaData;
}
}
@@ -90,7 +86,7 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
- applicationConfiguration.initFileStreamReader();
+ applicationConfiguration.loadConfigurationFromFile();
createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
() -> onComplete(contextMap));
}
@@ -143,8 +139,7 @@ public class ScheduledTasks {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- fileCollects.add(
- new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
+ fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -159,7 +154,7 @@ public class ScheduledTasks {
final Duration initialRetryTimeout = Duration.ofSeconds(5);
MdcVariables.setMdcContextMap(contextMap);
- return fileCollect.collectorTask
+ return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
@@ -236,9 +231,8 @@ public class ScheduledTasks {
return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- FileCollector createFileCollector(FileData fileData) {
- return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
+ FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
}
DataRouterPublisher createDataRouterPublisher() {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 443ddae7..2c136304 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -47,16 +47,15 @@ import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoE
*/
@ExtendWith({MockitoExtension.class})
class AppConfigTest {
-
+
private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
private static final boolean CORRECT_JSON = true;
private static final boolean INCORRECT_JSON = false;
private static AppConfig appConfigUnderTest;
-
- private static String filePath = Objects
- .requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
+ private static String filePath =
+ Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
@@ -70,7 +69,7 @@ class AppConfigTest {
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(0)).initFileStreamReader();
+ verify(appConfigUnderTest, times(0)).loadConfigurationFromFile();
Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
}
@@ -82,15 +81,12 @@ class AppConfigTest {
// When
appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
- appConfigUnderTest.initFileStreamReader();
- appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration();
- appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration();
- appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
@@ -98,7 +94,6 @@ class AppConfigTest {
Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
-
}
@Test
@@ -108,11 +103,11 @@ class AppConfigTest {
appConfigUnderTest.setFilepath(filePath);
// When
- appConfigUnderTest.initFileStreamReader();
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
@@ -127,15 +122,15 @@ class AppConfigTest {
// When
appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
- appConfigUnderTest.initFileStreamReader();
+ doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
- Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+ Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertNotNull(appConfigUnderTest.getFtpesConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@@ -147,18 +142,15 @@ class AppConfigTest {
new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
// When
appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
+ doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
- appConfigUnderTest.initFileStreamReader();
- appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration();
- appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration();
- appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index b5d3c159..c266d50e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -16,16 +16,21 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,13 +45,14 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+
import reactor.test.StepVerifier;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*
*/
-public class XnfCollectorTaskImplTest {
+public class FileCollectorTest {
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -70,7 +76,7 @@ public class XnfCollectorTaskImplTest {
private static final String FTPES_LOCATION_NO_PORT =
FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
- private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -104,7 +110,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private FileData createFileData(String location) {
+ private FileData createFileData(String location, Scheme scheme) {
// @formatter:off
return ImmutableFileData.builder()
.name(PM_FILE_NAME)
@@ -112,7 +118,7 @@ public class XnfCollectorTaskImplTest {
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
- .scheme(Scheme.FTPS)
+ .scheme(scheme)
.build();
// @formatter:on
}
@@ -147,10 +153,10 @@ public class XnfCollectorTaskImplTest {
@Test
public void whenFtpesFile_returnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
- FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
@@ -159,56 +165,43 @@ public class XnfCollectorTaskImplTest {
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
- verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
- verify(ftpsClientMock).setTrustedCAPassword(TRUSTED_CA_PASSWORD);
+ verify(ftpsClientMock, times(1)).close();
+
verifyNoMoreInteractions(ftpsClientMock);
}
@Test
public void whenSftpFile_returnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION_NO_PORT)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .scheme(Scheme.SFTP)
- .build();
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION_NO_PORT)
- .internalLocation(LOCAL_FILE_LOCATION.toString())
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
+ FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ .expectNext(expectedConsumerDmaapModel) //
+ .verifyComplete();
+
+ // The same again, but with port
+ fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
+ expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
+
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
- .expectNext(expectedConsumerDmaapModel).verifyComplete();
+ .expectNext(expectedConsumerDmaapModel) //
+ .verifyComplete();
- verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
}
@Test
public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData(FTPES_LOCATION);
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
+
+ FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -221,14 +214,14 @@ public class XnfCollectorTaskImplTest {
@Test
public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8a572be4..8c4b3891 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -82,7 +82,7 @@ public class ScheduledTasksTest {
doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
doReturn(consumerMock).when(testedObject).createConsumerTask();
- doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector();
doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}