summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Wrobel <tomasz.wrobel@nokia.com>2021-02-03 09:32:09 +0100
committerTomasz Wrobel <tomasz.wrobel@nokia.com>2021-02-09 12:30:22 +0100
commit1b65e0e1fd3a9d1026db31c010d120e4400e378d (patch)
tree93722f15733762e340915c6490090a68918119d1
parent4586462127dc2c024e9636ccafeb5a03aa319798 (diff)
Add configuration number of threads in files processing.
Issue-ID: DCAEGEN2-2600 Signed-off-by: Tomasz Wrobel <tomasz.wrobel@nokia.com> Change-Id: I04feb27698ef21196ab218de4bdd97be1fc85284
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java26
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java64
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java31
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java77
4 files changed, 184 insertions, 14 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 7aab08a..4583df3 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +31,8 @@ import lombok.Data;
import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
+import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader;
+import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
@@ -65,6 +68,9 @@ import java.util.concurrent.TimeUnit;
@Data
public class App {
+
+ private static final int PREFETCH_ONE_PER_THREAD = 1;
+
static {
System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml");
}
@@ -77,6 +83,8 @@ public class App {
private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
+ private final FilesProcessingConfig processingConfig;
+
private MapperConfig mapperConfig;
private MetadataFilter metadataFilter;
private MeasConverter measConverter;
@@ -105,13 +113,16 @@ public class App {
* @param httpsPort https port to start https server on.
* @param configHandler instance of the ConfigurationHandler used to acquire config.
*/
- public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler) {
+ public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig)
+ throws EnvironmentConfigException {
try {
this.mapperConfig = configHandler.getMapperConfig();
} catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) {
logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e);
throw new IllegalStateException("Config acquisition failed");
}
+ this.processingConfig = filesProcessingConfig;
+
this.httpPort = httpPort;
this.httpsPort = httpsPort;
this.metadataFilter = new MetadataFilter(mapperConfig);
@@ -124,11 +135,13 @@ public class App {
this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink);
this.configScheduler = Schedulers.newSingle("Config");
+ int processingThreads = processingConfig.getThreadsCount();
+
this.flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
- .limitRate(1)
- .parallel()
- .runOn(Schedulers.newParallel(""), 1)
+ .limitRate(processingConfig.getLimitRate())
+ .parallel(processingThreads)
+ .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
.filter(this.metadataFilter::filter)
.filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig))
@@ -191,8 +204,9 @@ public class App {
}
}
- public static void main(String[] args) {
- new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler()).start();
+ public static void main(String[] args) throws EnvironmentConfigException {
+ FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
+ new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
}
public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
index ab549e4..c79f059 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
@@ -33,8 +33,13 @@ public class FilesProcessingConfig {
private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE";
private static final int DEFAULT_LIMIT_RATE = 1;
- private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(FilesProcessingConfig.class));
- private EnvironmentReader environmentReader;
+ private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER";
+ private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT";
+ private static final int DEFAULT_MULTIPLIER = 1;
+
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(
+ LoggerFactory.getLogger(FilesProcessingConfig.class));
+ private final EnvironmentReader environmentReader;
/**
* Creates a FilesProcessingConfig
@@ -61,6 +66,26 @@ public class FilesProcessingConfig {
}
}
+ /**
+ * Provides reactor parallel threads count from environment variable.
+ *
+ * @throws EnvironmentConfigException
+ * @returns value of threads count
+ */
+ public int getThreadsCount() throws EnvironmentConfigException {
+ logger.unwrap().info("Attempt to read threads configuration");
+ int processingThreadsCount = getProcessingThreadsCount();
+ int threadsMultiplier = getThreadsMultiplier();
+ int processingThreadsAmount = processingThreadsCount * threadsMultiplier;
+
+ logger.unwrap().info(
+ "Processing threads configuration: Processing threads count - {}, Processing threads multiplier - {} ",
+ processingThreadsCount, threadsMultiplier);
+ logger.unwrap().info("Amount of files processing threads: {} ", processingThreadsAmount);
+
+ return processingThreadsAmount;
+ }
+
private Integer parseIntegerValue(String val) throws NumberFormatException {
Integer value = Integer.valueOf(val);
logger.unwrap().info(ENV_LIMIT_RATE + " value is: " + value);
@@ -71,4 +96,39 @@ public class FilesProcessingConfig {
logger.unwrap().info(ENV_LIMIT_RATE + " env not present. Setting limit rate to default value: " + DEFAULT_LIMIT_RATE);
return DEFAULT_LIMIT_RATE;
}
+
+ private int getThreadsMultiplier() throws EnvironmentConfigException {
+ try {
+ return Optional.ofNullable(environmentReader.getVariable(ENV_THREADS_MULTIPLIER))
+ .map(Integer::valueOf)
+ .orElseGet(this::getDefaultMultiplier);
+ } catch (NumberFormatException exception) {
+ throw new EnvironmentConfigException(
+ ENV_THREADS_MULTIPLIER + " environment variable has incorrect value.\n", exception);
+ }
+ }
+
+ private int getDefaultMultiplier() {
+ logger.unwrap().info(ENV_THREADS_MULTIPLIER +
+ " env not present. Setting multiplier to default value: " + DEFAULT_MULTIPLIER);
+ return DEFAULT_MULTIPLIER;
+ }
+
+ private int getProcessingThreadsCount() throws EnvironmentConfigException {
+ try {
+ return Optional.ofNullable(environmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT))
+ .map(Integer::valueOf)
+ .orElseGet(this::getDefaultThreadsCount);
+ } catch (NumberFormatException exception) {
+ throw new EnvironmentConfigException(
+ ENV_PROCESSING_THREADS_COUNT + " environment variable has incorrect value.\n", exception);
+ }
+ }
+
+ private int getDefaultThreadsCount() {
+ int defaultThreadsCount = Runtime.getRuntime().availableProcessors();
+ logger.unwrap().info(ENV_PROCESSING_THREADS_COUNT +
+ " env not present. Setting threads count to available cores: " + defaultThreadsCount);
+ return defaultThreadsCount;
+ }
}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
index 0b8cdfc..db45029 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,6 +52,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
+import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
import reactor.core.publisher.Flux;
@@ -76,6 +78,7 @@ import utils.EventUtils;
@ExtendWith(MockitoExtension.class)
class AppTest {
+ public static final int WANTED_NUMBER_OF_INVOCATIONS_1 = 1;
static ClientAndServer mockServer;
static MockServerClient client;
@@ -91,6 +94,7 @@ class AppTest {
private App objUnderTest;
+ private final FilesProcessingConfig processingConfig = mock(FilesProcessingConfig.class);
@BeforeAll
static void setup() {
@@ -105,8 +109,10 @@ class AppTest {
}
@BeforeEach
- void beforeEach() {
+ void beforeEach() throws EnvironmentConfigException {
configHandler = mock(ConfigHandler.class);
+ when(this.processingConfig.getLimitRate()).thenReturn(1);
+ when(this.processingConfig.getThreadsCount()).thenReturn(1);
}
@Test
@@ -115,7 +121,7 @@ class AppTest {
MapperConfig mockConfig = Mockito.spy(mapperConfig);
when(mockConfig.getEnableHttp()).thenReturn(false);
when(configHandler.getMapperConfig()).thenReturn(mockConfig);
- objUnderTest = new App(template, schema, 0, 0, configHandler);
+ objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig);
objUnderTest.start();
assertEquals(1, objUnderTest.getApplicationServer().getListenerInfo().size());
assertEquals("https", objUnderTest.getApplicationServer().getListenerInfo().get(0).getProtcol());
@@ -127,7 +133,7 @@ class AppTest {
MapperConfig mockConfig = Mockito.spy(mapperConfig);
when(mockConfig.getEnableHttp()).thenReturn(true);
when(configHandler.getMapperConfig()).thenReturn(mockConfig);
- objUnderTest = new App(template, schema, 0, 0, configHandler);
+ objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig);
objUnderTest.start();
assertEquals(2, objUnderTest.getApplicationServer().getListenerInfo().size());
assertEquals("http", objUnderTest.getApplicationServer().getListenerInfo().get(0).getProtcol());
@@ -137,7 +143,7 @@ class AppTest {
@Test
void testConfigFailure() throws EnvironmentConfigException, CBSServerError, MapperConfigException {
when(configHandler.getMapperConfig()).thenThrow(MapperConfigException.class);
- assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler));
+ assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler, processingConfig));
}
@@ -146,7 +152,7 @@ class AppTest {
MapperConfig mockConfig = Mockito.spy(mapperConfig);
when(mockConfig.getKeyStorePath()).thenReturn("not_a_file");
when(configHandler.getMapperConfig()).thenReturn(mockConfig);
- assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler));
+ assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler, processingConfig));
}
@@ -319,4 +325,19 @@ class AppTest {
Flux<List<Event>> mappingResult = App.map(new Mapper(mappingTemplate,mockMeasConverter), mockEvents, mockConfig);
assertEquals(mappingResult, Flux.<List<Event>>empty());
}
+
+ @Test
+ void filesProcessingConfiguration_IsReadInMainApp() throws Exception {
+ MapperConfig mockConfig = Mockito.spy(mapperConfig);
+ when(mockConfig.getEnableHttp()).thenReturn(true);
+ when(configHandler.getMapperConfig()).thenReturn(mockConfig);
+ objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig);
+ objUnderTest.start();
+
+ verify(processingConfig, times(WANTED_NUMBER_OF_INVOCATIONS_1)).getLimitRate();
+ verify(processingConfig, times(WANTED_NUMBER_OF_INVOCATIONS_1)).getThreadsCount();
+
+ objUnderTest.stop();
+ }
+
}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java
index fd21a39..b8d0a1f 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java
@@ -32,7 +32,15 @@ public class FilesProcessingConfigTest {
private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE";
- private EnvironmentReader mockEnvironmentReader = mock(EnvironmentReader.class);
+ private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER";
+ private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT";
+ private static final String THREADS_4 = "4";
+ private static final String MULTIPLIER_3 = "3";
+
+ private static final int EXPECTED_4 = 4;
+ private static final int EXPECTED_12 = 12;
+
+ private final EnvironmentReader mockEnvironmentReader = mock(EnvironmentReader.class);
private FilesProcessingConfig filesProcessingConfig;
@Test
@@ -63,4 +71,71 @@ public class FilesProcessingConfigTest {
assertEquals(1, limitRate);
}
+
+ @Test
+ public void shouldReturnCorrectThreadsCount_whenVariableIsSet() throws EnvironmentConfigException {
+ when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn(THREADS_4);
+ when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn(MULTIPLIER_3);
+
+ filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+ int threadsCount = filesProcessingConfig.getThreadsCount();
+
+ assertEquals(EXPECTED_12, threadsCount);
+ }
+
+ @Test
+ public void shouldReturnCorrectThreadsCount_whenVariableMultiplierIsNotSet() throws EnvironmentConfigException {
+
+ when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn(THREADS_4);
+
+ filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+ int threadsCount = filesProcessingConfig.getThreadsCount();
+
+ assertEquals(EXPECTED_4, threadsCount);
+ }
+
+ @Test
+ public void shouldReturnCorrectThreadsCount_whenVariableThreadsIsNotSet() throws EnvironmentConfigException {
+ when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn(MULTIPLIER_3);
+
+ filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+ int threadsCount = filesProcessingConfig.getThreadsCount();
+ int expected = Runtime.getRuntime().availableProcessors() * Integer.parseInt(MULTIPLIER_3);
+
+ assertEquals(expected, threadsCount);
+ }
+
+ @Test
+ public void shouldReturnCorrectThreadsCount_whenVariableThreadsAndMultiplierIsNotSet()
+ throws EnvironmentConfigException {
+ filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+ int threadsCount = filesProcessingConfig.getThreadsCount();
+ int expected = Runtime.getRuntime().availableProcessors();
+
+ assertEquals(expected, threadsCount);
+ }
+
+ @Test
+ public void shouldThrowEnvironmentConfigException_whenProcessingThreadsVariableHasWrongValue() {
+ when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn("not-an-int");
+ filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+ String expectedMessage = "PROCESSING_THREADS_COUNT environment variable has incorrect value.\n";
+ String causeMessage = "For input string: \"not-an-int\"";
+
+ Throwable exception = assertThrows(EnvironmentConfigException.class, () -> filesProcessingConfig.getThreadsCount());
+ assertEquals(expectedMessage, exception.getMessage());
+ assertEquals(causeMessage, exception.getCause().getMessage());
+ }
+
+ @Test
+ public void shouldThrowEnvironmentConfigException_whenThreadsMultiplierVariableHasWrongValue() {
+ when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn("not-an-int");
+ filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+ String expectedMessage = "THREADS_MULTIPLIER environment variable has incorrect value.\n";
+ String causeMessage = "For input string: \"not-an-int\"";
+
+ Throwable exception = assertThrows(EnvironmentConfigException.class, () -> filesProcessingConfig.getThreadsCount());
+ assertEquals(expectedMessage, exception.getMessage());
+ assertEquals(causeMessage, exception.getCause().getMessage());
+ }
}