aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java158
1 files changed, 142 insertions, 16 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index a1021868..3df2edae 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -20,7 +20,10 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -32,13 +35,16 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
import java.nio.file.Paths;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-
+import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -56,7 +62,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformati
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.MDC;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -67,7 +75,7 @@ public class ScheduledTasksTest {
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private AppConfig appConfig = mock(AppConfig.class);
- private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
+ private ScheduledTasks testedObject;
private int uniqueValue = 0;
private DMaaPMessageConsumer consumerMock;
@@ -80,6 +88,20 @@ public class ScheduledTasksTest {
@BeforeEach
private void setUp() throws DatafileTaskException {
+ testedObject = spy(new ScheduledTasks(appConfig));
+
+ consumerMock = mock(DMaaPMessageConsumer.class);
+ publishedCheckerMock = mock(PublishedChecker.class);
+ fileCollectorMock = mock(FileCollector.class);
+ dataRouterMock = mock(DataRouterPublisher.class);
+
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector();
+ doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
+ }
+
+ private void setUpConfiguration() throws DatafileTaskException {
final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
.publishUrl(publishUrl) //
.logUrl("") //
@@ -103,16 +125,6 @@ public class ScheduledTasksTest {
doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
-
- consumerMock = mock(DMaaPMessageConsumer.class);
- publishedCheckerMock = mock(PublishedChecker.class);
- fileCollectorMock = mock(FileCollector.class);
- dataRouterMock = mock(DataRouterPublisher.class);
-
- doReturn(consumerMock).when(testedObject).createConsumerTask();
- doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
- doReturn(fileCollectorMock).when(testedObject).createFileCollector();
- doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}
private MessageMetaData messageMetaData() {
@@ -130,7 +142,7 @@ public class ScheduledTasksTest {
private FileData fileData(int instanceNumber) {
return ImmutableFileData.builder() //
- .name("name" + instanceNumber) //
+ .name(PM_FILE_NAME + instanceNumber) //
.fileFormatType("") //
.fileFormatVersion("") //
.location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
@@ -183,7 +195,18 @@ public class ScheduledTasksTest {
}
@Test
- public void notingToConsume() throws DatafileTaskException {
+ public void purgeFileCache() {
+ testedObject.publishedFilesCache.put(Paths.get("file.xml"));
+
+ testedObject.purgeCachedInformation(Instant.MAX);
+
+ assertEquals(0, testedObject.publishedFilesCacheSize());
+ }
+
+ @Test
+ public void nothingToConsume() throws DatafileTaskException {
+ setUpConfiguration();
+
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
@@ -195,7 +218,102 @@ public class ScheduledTasksTest {
}
@Test
+ public void skippingConsumeDueToCurrentNumberOfTasksGreaterThan50() {
+ doReturn(51).when(testedObject).getCurrentNumberOfTasks();
+
+ testedObject.executeDatafileMainTask();
+
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void executeDatafileMainTask_successfulCase() throws DatafileTaskException {
+ setUpConfiguration();
+
+ final int noOfEvents = 1;
+ final int noOfFilesPerEvent = 1;
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+ doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
+
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
+
+ Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
+ doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
+
+ testedObject.executeDatafileMainTask();
+
+ await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
+
+ assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
+
+ verify(appConfig).getDmaapConsumerConfiguration();
+ verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
+ verifyNoMoreInteractions(appConfig);
+ }
+
+ @Test
+ public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
+ final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(publishUrl) //
+ .logUrl("") //
+ .userName("userName") //
+ .passWord("passWord") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .changeIdentifier("Different changeIdentifier") //
+ .build(); //
+ final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
+ .topicUrl("topicUrl").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
+ doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
+ doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
+ final int noOfEvents = 1;
+ final int noOfFilesPerEvent = 1;
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+ doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
+
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
+ testedObject.executeDatafileMainTask();
+
+ await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
+
+ assertTrue("Error missing in log", logAppender.list.toString().contains(
+ "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
+ }
+
+ @Test
+ public void createMainTask_consumeFail() {
+ MDC.setContextMap(contextMap);
+ doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
+
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
+ StepVerifier //
+ .create(testedObject.createMainTask(contextMap)) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .expectComplete() //
+ .verify(); //
+
+ assertTrue("Error missing in log", logAppender.list.toString().contains(
+ "[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"));
+ }
+
+ @Test
public void consume_successfulCase() throws DatafileTaskException {
+ setUpConfiguration();
+
final int noOfEvents = 200;
final int noOfFilesPerEvent = 200;
final int noOfFiles = noOfEvents * noOfFilesPerEvent;
@@ -228,6 +346,8 @@ public class ScheduledTasksTest {
@Test
public void consume_fetchFailedOnce() throws DatafileTaskException {
+ setUpConfiguration();
+
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
@@ -262,6 +382,7 @@ public class ScheduledTasksTest {
@Test
public void consume_publishFailedOnce() throws DatafileTaskException {
+ setUpConfiguration();
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
@@ -277,6 +398,7 @@ public class ScheduledTasksTest {
.when(dataRouterMock) //
.publishFile(notNull(), anyLong(), notNull());
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
StepVerifier //
.create(testedObject.createMainTask(contextMap)) //
.expectSubscription() //
@@ -284,6 +406,8 @@ public class ScheduledTasksTest {
.expectComplete() //
.verify(); //
+ assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
+
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -295,6 +419,8 @@ public class ScheduledTasksTest {
@Test
public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
+ setUpConfiguration();
+
final int noOfEvents = 1;
final int noOfFilesPerEvent = 100;