summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test/java
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-02-20 08:35:16 +0100
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-06 10:24:33 +0100
commitf41226f4c3f6675019f6b60508d211a0df102e8a (patch)
tree1a6dbc43050bd61b8400d429131119683c9c31f4 /datafile-app-server/src/test/java
parent2bf13db00a2733d6c56cd8f0c0d26905586bd39f (diff)
Improvement of the parallelism
The reactive framework Scedulers uses to few threads. (the same number as the number of processors). That is too few for an io-intense application like this where CPU is not the limiting factor. Change-Id: Ia5f41e75716d309f47dce5f5273b739f7e6d136a Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/test/java')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java59
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java285
2 files changed, 315 insertions, 29 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d2240f18..24b82fe6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -61,37 +61,38 @@ class DataRouterPublisherTest {
@BeforeAll
public static void setUp() {
- //@formatter:off
+
dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234)
- .dmaapProtocol("https")
- .dmaapUserName("DFC")
- .dmaapUserPassword("DFC")
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
- .build();
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME)
- .internalLocation("target/" + PM_FILE_NAME)
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
+ .internalLocation("target/" + PM_FILE_NAME) //
+ .compression("gzip") //
+ .fileFormatType("org.3GPP.32.435#measCollec") //
+ .fileFormatVersion("V10") //
+ .build(); //
appConfig = mock(AppConfig.class);
- //@formatter:on
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
}
@Test
@@ -132,7 +133,7 @@ class DataRouterPublisherTest {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
nextHttpResponses);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+
dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
new file mode 100644
index 00000000..0662216b
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -0,0 +1,285 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ScheduledTasksTest {
+
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+
+ private AppConfig appConfig = mock(AppConfig.class);
+ private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
+
+ private int uniqueValue = 0;
+ private DMaaPMessageConsumerTask consumerMock;
+ private FileCollector fileCollectorMock;
+ private DataRouterPublisher dataRouterMock;
+
+ @BeforeEach
+ private void setUp() {
+ DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
+
+ consumerMock = mock(DMaaPMessageConsumerTask.class);
+ fileCollectorMock = mock(FileCollector.class);
+ dataRouterMock = mock(DataRouterPublisher.class);
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+ doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
+ }
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder() //
+ .productName("productName") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .changeIdentifier("") //
+ .changeType("") //
+ .build();
+ }
+
+ private FileData fileData(int instanceNumber) {
+ return ImmutableFileData.builder() //
+ .name("name" + instanceNumber) //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
+ .scheme(Scheme.FTPS) //
+ .compression("") //
+ .build();
+ }
+
+ private List<FileData> files(int size, boolean uniqueNames) {
+ List<FileData> list = new LinkedList<FileData>();
+ for (int i = 0; i < size; ++i) {
+ if (uniqueNames) {
+ ++uniqueValue;
+ }
+ list.add(fileData(uniqueValue));
+ }
+ return list;
+ }
+
+ private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
+ MessageMetaData md = messageMetaData();
+ return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
+ .files(files(numberOfFiles, uniqueNames)).build();
+ }
+
+ private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
+ List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
+ for (int i = 0; i < numberOfEvents; ++i) {
+ list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
+ }
+ return Flux.fromIterable(list);
+ }
+
+ private ConsumerDmaapModel consumerData() {
+ return ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation("internalLocation") //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ }
+
+ @Test
+ public void notingToConsume() {
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(Flux.empty()).when(consumerMock).execute();
+
+ testedObject.scheduleMainDatafileEventTask();
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase() {
+ final int noOfEvents = 200;
+ final int noOfFilesPerEvent = 200;
+ final int noOfFiles = noOfEvents * noOfFilesPerEvent;
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(noOfFiles) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_fetchFailedOnce() {
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ Mono<Object> error = Mono.error(new Exception("problem"));
+
+ // First file collect will fail, 3 will succeed
+ doReturn(error, collectedFile, collectedFile, collectedFile) //
+ .when(fileCollectorMock) //
+ .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_publishFailedOnce() {
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+
+ Mono<Object> error = Mono.error(new Exception("problem"));
+ // One publish will fail, the rest will succeed
+ doReturn(collectedFile, error, collectedFile, collectedFile) //
+ .when(dataRouterMock) //
+ .execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) // 3 completed files
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase_sameFileNames() {
+ final int noOfEvents = 1;
+ final int noOfFilesPerEvent = 100;
+
+ // 100 files with the same name
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(1) // 99 is skipped
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+
+}