aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
authorTony Hansen <tony@att.com>2019-03-18 13:45:01 +0000
committerGerrit Code Review <gerrit@onap.org>2019-03-18 13:45:01 +0000
commit6870154043d73d527cc42aca7ade7e49aa961476 (patch)
tree7682b329ef8bcf1f4e18c170dc9e91d1197d976a /datafile-app-server
parent84e13d376c82b96f5dad949b3155478f8a421545 (diff)
parentad4a3a514bd943df22a2e27d78f0706d412ebe9f (diff)
Merge "Thread safety issues"
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java101
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java18
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java42
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java)83
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java2
7 files changed, 145 insertions, 158 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 40de33dd..82c390f7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -2,39 +2,44 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ServiceLoader;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import java.io.*;
-import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -55,49 +60,53 @@ public class AppConfig {
private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
-
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
-
- FtpesConfig ftpesConfig;
+ private DmaapConsumerConfiguration dmaapConsumerConfiguration;
+ private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private FtpesConfig ftpesConfiguration;
@NotEmpty
private String filepath;
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
- public FtpesConfig getFtpesConfiguration() {
- return ftpesConfig;
+ public synchronized FtpesConfig getFtpesConfiguration() {
+ return ftpesConfiguration;
}
- public void initFileStreamReader() {
+ public void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
JsonObject jsonObject;
- try (InputStream inputStream = getInputStream(filepath)) {
+ try (InputStream inputStream = createInputStream(filepath)) {
JsonElement rootElement = getJsonElement(parser, inputStream);
if (rootElement.isJsonObject()) {
jsonObject = rootElement.getAsJsonObject();
- ftpesConfig = deserializeType(gsonBuilder,
+ FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
FtpesConfig.class);
- dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_CONSUMER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapConsumerConfiguration.class);
- dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_PRODUCER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapPublisherConfiguration.class);
+
+ setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
} catch (IOException e) {
logger.error("Problem with file loading, file: {}", filepath, e);
@@ -106,30 +115,36 @@ public class AppConfig {
}
}
+ synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
+ DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.dmaapPublisherConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
+
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
+ @NotNull Class<T> type) {
return gsonBuilder.create().fromJson(jsonObject, type);
}
- InputStream getInputStream(@NotNull String filepath) throws IOException {
+ InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- String getFilepath() {
+ synchronized String getFilepath() {
return this.filepath;
}
- public void setFilepath(String filepath) {
+ public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet()
- .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
+ source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
return target;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 208e8a43..0254597e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -56,7 +56,7 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
@@ -80,7 +80,7 @@ public class CloudConfiguration extends AppConfig {
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
- private void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized void parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
@@ -89,17 +89,17 @@ public class CloudConfiguration extends AppConfig {
}
@Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
}
@Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
}
@Override
- public FtpesConfig getFtpesConfiguration() {
+ public synchronized FtpesConfig getFtpesConfiguration() {
return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index a0020318..af4670e3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,22 +41,16 @@ import reactor.core.publisher.Mono;
public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
- private AppConfig datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
+ private final AppConfig datafileAppConfig;
-
- public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsClient;
- this.sftpClient = sftpClient;
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
Duration firstBackoffTimeout, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
@@ -65,18 +60,6 @@ public class FileCollector {
//@formatter:on
}
- private FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
@@ -85,11 +68,8 @@ public class FileCollector {
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
- try {
+ try (FileCollectClient currentClient = createClient(fileData)) {
localFile.getParent().toFile().mkdir(); // Create parent directories
-
- FileCollectClient currentClient = selectClient(fileData);
-
currentClient.collectFile(remoteFile, localFile);
return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
} catch (Exception throwable) {
@@ -98,12 +78,12 @@ public class FileCollector {
}
}
- private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
switch (fileData.scheme()) {
case SFTP:
- return sftpClient;
+ return createSftpClient(fileData);
case FTPS:
- return ftpsClient;
+ return createFtpsClient(fileData);
default:
throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
}
@@ -129,4 +109,17 @@ public class FileCollector {
.build();
// @formatter:on
}
+
+ SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
+ SftpClient client = new SftpClient(fileData.fileServerData());
+ client.open();
+ return client;
+ }
+
+ FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
+ FtpsClient client = new FtpsClient(fileData.fileServerData());
+ client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
+ return client;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 89ebde8f..28963377 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -57,12 +55,10 @@ public class ScheduledTasks {
/** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask;
final MessageMetaData metaData;
- FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ FileCollectionData(FileData fd, MessageMetaData metaData) {
this.fileData = fd;
- this.collectorTask = collectorTask;
this.metaData = metaData;
}
}
@@ -90,7 +86,7 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
- applicationConfiguration.initFileStreamReader();
+ applicationConfiguration.loadConfigurationFromFile();
createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
() -> onComplete(contextMap));
}
@@ -143,8 +139,7 @@ public class ScheduledTasks {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- fileCollects.add(
- new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
+ fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -159,7 +154,7 @@ public class ScheduledTasks {
final Duration initialRetryTimeout = Duration.ofSeconds(5);
MdcVariables.setMdcContextMap(contextMap);
- return fileCollect.collectorTask
+ return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
@@ -236,9 +231,8 @@ public class ScheduledTasks {
return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- FileCollector createFileCollector(FileData fileData) {
- return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
+ FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
}
DataRouterPublisher createDataRouterPublisher() {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 443ddae7..2c136304 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -47,16 +47,15 @@ import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoE
*/
@ExtendWith({MockitoExtension.class})
class AppConfigTest {
-
+
private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
private static final boolean CORRECT_JSON = true;
private static final boolean INCORRECT_JSON = false;
private static AppConfig appConfigUnderTest;
-
- private static String filePath = Objects
- .requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
+ private static String filePath =
+ Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
@@ -70,7 +69,7 @@ class AppConfigTest {
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(0)).initFileStreamReader();
+ verify(appConfigUnderTest, times(0)).loadConfigurationFromFile();
Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
}
@@ -82,15 +81,12 @@ class AppConfigTest {
// When
appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
- appConfigUnderTest.initFileStreamReader();
- appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration();
- appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration();
- appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
@@ -98,7 +94,6 @@ class AppConfigTest {
Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
-
}
@Test
@@ -108,11 +103,11 @@ class AppConfigTest {
appConfigUnderTest.setFilepath(filePath);
// When
- appConfigUnderTest.initFileStreamReader();
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
@@ -127,15 +122,15 @@ class AppConfigTest {
// When
appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
- appConfigUnderTest.initFileStreamReader();
+ doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
- Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+ Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertNotNull(appConfigUnderTest.getFtpesConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@@ -147,18 +142,15 @@ class AppConfigTest {
new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
// When
appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
+ doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
- appConfigUnderTest.initFileStreamReader();
- appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration();
- appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration();
- appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index b5d3c159..c266d50e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -16,16 +16,21 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,13 +45,14 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+
import reactor.test.StepVerifier;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*
*/
-public class XnfCollectorTaskImplTest {
+public class FileCollectorTest {
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -70,7 +76,7 @@ public class XnfCollectorTaskImplTest {
private static final String FTPES_LOCATION_NO_PORT =
FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
- private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -104,7 +110,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private FileData createFileData(String location) {
+ private FileData createFileData(String location, Scheme scheme) {
// @formatter:off
return ImmutableFileData.builder()
.name(PM_FILE_NAME)
@@ -112,7 +118,7 @@ public class XnfCollectorTaskImplTest {
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
- .scheme(Scheme.FTPS)
+ .scheme(scheme)
.build();
// @formatter:on
}
@@ -147,10 +153,10 @@ public class XnfCollectorTaskImplTest {
@Test
public void whenFtpesFile_returnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
- FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
@@ -159,56 +165,43 @@ public class XnfCollectorTaskImplTest {
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
- verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
- verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
- verify(ftpsClientMock).setTrustedCAPassword(TRUSTED_CA_PASSWORD);
+ verify(ftpsClientMock, times(1)).close();
+
verifyNoMoreInteractions(ftpsClientMock);
}
@Test
public void whenSftpFile_returnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION_NO_PORT)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .scheme(Scheme.SFTP)
- .build();
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
- ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION_NO_PORT)
- .internalLocation(LOCAL_FILE_LOCATION.toString())
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- // @formatter:on
+ FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
+ StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ .expectNext(expectedConsumerDmaapModel) //
+ .verifyComplete();
+
+ // The same again, but with port
+ fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
+ expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
+
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
- .expectNext(expectedConsumerDmaapModel).verifyComplete();
+ .expectNext(expectedConsumerDmaapModel) //
+ .verifyComplete();
- verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
}
@Test
public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData(FTPES_LOCATION);
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
+
+ FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -221,14 +214,14 @@ public class XnfCollectorTaskImplTest {
@Test
public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest =
- new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8a572be4..8c4b3891 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -82,7 +82,7 @@ public class ScheduledTasksTest {
doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
doReturn(consumerMock).when(testedObject).createConsumerTask();
- doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector();
doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}