From f41226f4c3f6675019f6b60508d211a0df102e8a Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 20 Feb 2019 08:35:16 +0100 Subject: Improvement of the parallelism The reactive framework Scedulers uses to few threads. (the same number as the number of processors). That is too few for an io-intense application like this where CPU is not the limiting factor. Change-Id: Ia5f41e75716d309f47dce5f5273b739f7e6d136a Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr --- .../datafile/tasks/DataRouterPublisherTest.java | 59 ++--- .../datafile/tasks/ScheduledTasksTest.java | 285 +++++++++++++++++++++ 2 files changed, 315 insertions(+), 29 deletions(-) create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java (limited to 'datafile-app-server/src/test') diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index d2240f18..24b82fe6 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -61,37 +61,38 @@ class DataRouterPublisherTest { @BeforeAll public static void setUp() { - //@formatter:off + dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234) - .dmaapProtocol("https") - .dmaapUserName("DFC") - .dmaapUserPassword("DFC") - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) - .build(); + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234) // + .dmaapProtocol("https") // + .dmaapUserName("DFC") // + .dmaapUserPassword("DFC") // + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); // consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) - .internalLocation("target/" + PM_FILE_NAME) - .compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec") - .fileFormatVersion("V10") - .build(); + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .name(PM_FILE_NAME) // + .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) // + .internalLocation("target/" + PM_FILE_NAME) // + .compression("gzip") // + .fileFormatType("org.3GPP.32.435#measCollec") // + .fileFormatVersion("V10") // + .build(); // appConfig = mock(AppConfig.class); - //@formatter:on + + doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); } @Test @@ -132,7 +133,7 @@ class DataRouterPublisherTest { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, nextHttpResponses); - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java new file mode 100644 index 00000000..0662216b --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -0,0 +1,285 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class ScheduledTasksTest { + + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + + private AppConfig appConfig = mock(AppConfig.class); + private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig)); + + private int uniqueValue = 0; + private DMaaPMessageConsumerTask consumerMock; + private FileCollector fileCollectorMock; + private DataRouterPublisher dataRouterMock; + + @BeforeEach + private void setUp() { + DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234) // + .dmaapProtocol("https") // + .dmaapUserName("DFC") // + .dmaapUserPassword("DFC") // + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); // + + consumerMock = mock(DMaaPMessageConsumerTask.class); + fileCollectorMock = mock(FileCollector.class); + dataRouterMock = mock(DataRouterPublisher.class); + + doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); + doReturn(consumerMock).when(testedObject).createConsumerTask(); + doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull()); + doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher(); + } + + private MessageMetaData messageMetaData() { + return ImmutableMessageMetaData.builder() // + .productName("productName") // + .vendorName("") // + .lastEpochMicrosec("") // + .sourceName("") // + .startEpochMicrosec("") // + .timeZoneOffset("") // + .changeIdentifier("") // + .changeType("") // + .build(); + } + + private FileData fileData(int instanceNumber) { + return ImmutableFileData.builder() // + .name("name" + instanceNumber) // + .fileFormatType("") // + .fileFormatVersion("") // + .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) // + .scheme(Scheme.FTPS) // + .compression("") // + .build(); + } + + private List files(int size, boolean uniqueNames) { + List list = new LinkedList(); + for (int i = 0; i < size; ++i) { + if (uniqueNames) { + ++uniqueValue; + } + list.add(fileData(uniqueValue)); + } + return list; + } + + private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) { + MessageMetaData md = messageMetaData(); + return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md) + .files(files(numberOfFiles, uniqueNames)).build(); + } + + private Flux fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) { + List list = new LinkedList(); + for (int i = 0; i < numberOfEvents; ++i) { + list.add(createFileReadyMessage(filesPerEvent, uniqueNames)); + } + return Flux.fromIterable(list); + } + + private ConsumerDmaapModel consumerData() { + return ImmutableConsumerDmaapModel // + .builder() // + .productName("") // + .vendorName("") // + .lastEpochMicrosec("") // + .sourceName("") // + .startEpochMicrosec("") // + .timeZoneOffset("") // + .name("") // + .location("") // + .internalLocation("internalLocation") // + .compression("") // + .fileFormatType("") // + .fileFormatVersion("") // + .build(); + } + + @Test + public void notingToConsume() { + doReturn(consumerMock).when(testedObject).createConsumerTask(); + doReturn(Flux.empty()).when(consumerMock).execute(); + + testedObject.scheduleMainDatafileEventTask(); + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_successfulCase() { + final int noOfEvents = 200; + final int noOfFilesPerEvent = 200; + final int noOfFiles = noOfEvents * noOfFilesPerEvent; + + Flux fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true); + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(noOfFiles) // + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_fetchFailedOnce() { + Flux fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono collectedFile = Mono.just(consumerData()); + Mono error = Mono.error(new Exception("problem")); + + // First file collect will fail, 3 will succeed + doReturn(error, collectedFile, collectedFile, collectedFile) // + .when(fileCollectorMock) // + .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class)); + + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(3) // + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_publishFailedOnce() { + + Flux fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + + Mono error = Mono.error(new Exception("problem")); + // One publish will fail, the rest will succeed + doReturn(collectedFile, error, collectedFile, collectedFile) // + .when(dataRouterMock) // + .execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(3) // 3 completed files + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_successfulCase_sameFileNames() { + final int noOfEvents = 1; + final int noOfFilesPerEvent = 100; + + // 100 files with the same name + Flux fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false); + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(1) // 99 is skipped + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + +} -- cgit 1.2.3-korg