diff options
author | Tomasz Wrobel <tomasz.wrobel@nokia.com> | 2021-02-03 09:32:09 +0100 |
---|---|---|
committer | Tomasz Wrobel <tomasz.wrobel@nokia.com> | 2021-02-09 12:30:22 +0100 |
commit | 1b65e0e1fd3a9d1026db31c010d120e4400e378d (patch) | |
tree | 93722f15733762e340915c6490090a68918119d1 | |
parent | 4586462127dc2c024e9636ccafeb5a03aa319798 (diff) |
Add configuration number of threads in files processing.
Issue-ID: DCAEGEN2-2600
Signed-off-by: Tomasz Wrobel <tomasz.wrobel@nokia.com>
Change-Id: I04feb27698ef21196ab218de4bdd97be1fc85284
4 files changed, 184 insertions, 14 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 7aab08a..4583df3 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +31,8 @@ import lombok.Data; import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration; +import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader; +import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig; import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; @@ -65,6 +68,9 @@ import java.util.concurrent.TimeUnit; @Data public class App { + + private static final int PREFETCH_ONE_PER_THREAD = 1; + static { System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml"); } @@ -77,6 +83,8 @@ public class App { private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/"); private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/"); + private final FilesProcessingConfig processingConfig; + private MapperConfig mapperConfig; private MetadataFilter metadataFilter; private MeasConverter measConverter; @@ -105,13 +113,16 @@ public class App { * @param httpsPort https port to start https server on. * @param configHandler instance of the ConfigurationHandler used to acquire config. */ - public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler) { + public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig) + throws EnvironmentConfigException { try { this.mapperConfig = configHandler.getMapperConfig(); } catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) { logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e); throw new IllegalStateException("Config acquisition failed"); } + this.processingConfig = filesProcessingConfig; + this.httpPort = httpPort; this.httpsPort = httpsPort; this.metadataFilter = new MetadataFilter(mapperConfig); @@ -124,11 +135,13 @@ public class App { this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink); this.configScheduler = Schedulers.newSingle("Config"); + int processingThreads = processingConfig.getThreadsCount(); + this.flux.onBackpressureDrop(App::handleBackPressure) .doOnNext(App::receiveRequest) - .limitRate(1) - .parallel() - .runOn(Schedulers.newParallel(""), 1) + .limitRate(processingConfig.getLimitRate()) + .parallel(processingThreads) + .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD) .doOnNext(event -> MDC.setContextMap(event.getMdc())) .filter(this.metadataFilter::filter) .filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig)) @@ -191,8 +204,9 @@ public class App { } } - public static void main(String[] args) { - new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler()).start(); + public static void main(String[] args) throws EnvironmentConfigException { + FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader()); + new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start(); } public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) { diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java index ab549e4..c79f059 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java @@ -33,8 +33,13 @@ public class FilesProcessingConfig { private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE"; private static final int DEFAULT_LIMIT_RATE = 1; - private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(FilesProcessingConfig.class)); - private EnvironmentReader environmentReader; + private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER"; + private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT"; + private static final int DEFAULT_MULTIPLIER = 1; + + private static final ONAPLogAdapter logger = new ONAPLogAdapter( + LoggerFactory.getLogger(FilesProcessingConfig.class)); + private final EnvironmentReader environmentReader; /** * Creates a FilesProcessingConfig @@ -61,6 +66,26 @@ public class FilesProcessingConfig { } } + /** + * Provides reactor parallel threads count from environment variable. + * + * @throws EnvironmentConfigException + * @returns value of threads count + */ + public int getThreadsCount() throws EnvironmentConfigException { + logger.unwrap().info("Attempt to read threads configuration"); + int processingThreadsCount = getProcessingThreadsCount(); + int threadsMultiplier = getThreadsMultiplier(); + int processingThreadsAmount = processingThreadsCount * threadsMultiplier; + + logger.unwrap().info( + "Processing threads configuration: Processing threads count - {}, Processing threads multiplier - {} ", + processingThreadsCount, threadsMultiplier); + logger.unwrap().info("Amount of files processing threads: {} ", processingThreadsAmount); + + return processingThreadsAmount; + } + private Integer parseIntegerValue(String val) throws NumberFormatException { Integer value = Integer.valueOf(val); logger.unwrap().info(ENV_LIMIT_RATE + " value is: " + value); @@ -71,4 +96,39 @@ public class FilesProcessingConfig { logger.unwrap().info(ENV_LIMIT_RATE + " env not present. Setting limit rate to default value: " + DEFAULT_LIMIT_RATE); return DEFAULT_LIMIT_RATE; } + + private int getThreadsMultiplier() throws EnvironmentConfigException { + try { + return Optional.ofNullable(environmentReader.getVariable(ENV_THREADS_MULTIPLIER)) + .map(Integer::valueOf) + .orElseGet(this::getDefaultMultiplier); + } catch (NumberFormatException exception) { + throw new EnvironmentConfigException( + ENV_THREADS_MULTIPLIER + " environment variable has incorrect value.\n", exception); + } + } + + private int getDefaultMultiplier() { + logger.unwrap().info(ENV_THREADS_MULTIPLIER + + " env not present. Setting multiplier to default value: " + DEFAULT_MULTIPLIER); + return DEFAULT_MULTIPLIER; + } + + private int getProcessingThreadsCount() throws EnvironmentConfigException { + try { + return Optional.ofNullable(environmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)) + .map(Integer::valueOf) + .orElseGet(this::getDefaultThreadsCount); + } catch (NumberFormatException exception) { + throw new EnvironmentConfigException( + ENV_PROCESSING_THREADS_COUNT + " environment variable has incorrect value.\n", exception); + } + } + + private int getDefaultThreadsCount() { + int defaultThreadsCount = Runtime.getRuntime().availableProcessors(); + logger.unwrap().info(ENV_PROCESSING_THREADS_COUNT + + " env not present. Setting threads count to available cores: " + defaultThreadsCount); + return defaultThreadsCount; + } } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java index 0b8cdfc..db45029 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +52,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; +import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig; import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator; import reactor.core.publisher.Flux; @@ -76,6 +78,7 @@ import utils.EventUtils; @ExtendWith(MockitoExtension.class) class AppTest { + public static final int WANTED_NUMBER_OF_INVOCATIONS_1 = 1; static ClientAndServer mockServer; static MockServerClient client; @@ -91,6 +94,7 @@ class AppTest { private App objUnderTest; + private final FilesProcessingConfig processingConfig = mock(FilesProcessingConfig.class); @BeforeAll static void setup() { @@ -105,8 +109,10 @@ class AppTest { } @BeforeEach - void beforeEach() { + void beforeEach() throws EnvironmentConfigException { configHandler = mock(ConfigHandler.class); + when(this.processingConfig.getLimitRate()).thenReturn(1); + when(this.processingConfig.getThreadsCount()).thenReturn(1); } @Test @@ -115,7 +121,7 @@ class AppTest { MapperConfig mockConfig = Mockito.spy(mapperConfig); when(mockConfig.getEnableHttp()).thenReturn(false); when(configHandler.getMapperConfig()).thenReturn(mockConfig); - objUnderTest = new App(template, schema, 0, 0, configHandler); + objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig); objUnderTest.start(); assertEquals(1, objUnderTest.getApplicationServer().getListenerInfo().size()); assertEquals("https", objUnderTest.getApplicationServer().getListenerInfo().get(0).getProtcol()); @@ -127,7 +133,7 @@ class AppTest { MapperConfig mockConfig = Mockito.spy(mapperConfig); when(mockConfig.getEnableHttp()).thenReturn(true); when(configHandler.getMapperConfig()).thenReturn(mockConfig); - objUnderTest = new App(template, schema, 0, 0, configHandler); + objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig); objUnderTest.start(); assertEquals(2, objUnderTest.getApplicationServer().getListenerInfo().size()); assertEquals("http", objUnderTest.getApplicationServer().getListenerInfo().get(0).getProtcol()); @@ -137,7 +143,7 @@ class AppTest { @Test void testConfigFailure() throws EnvironmentConfigException, CBSServerError, MapperConfigException { when(configHandler.getMapperConfig()).thenThrow(MapperConfigException.class); - assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler)); + assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler, processingConfig)); } @@ -146,7 +152,7 @@ class AppTest { MapperConfig mockConfig = Mockito.spy(mapperConfig); when(mockConfig.getKeyStorePath()).thenReturn("not_a_file"); when(configHandler.getMapperConfig()).thenReturn(mockConfig); - assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler)); + assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler, processingConfig)); } @@ -319,4 +325,19 @@ class AppTest { Flux<List<Event>> mappingResult = App.map(new Mapper(mappingTemplate,mockMeasConverter), mockEvents, mockConfig); assertEquals(mappingResult, Flux.<List<Event>>empty()); } + + @Test + void filesProcessingConfiguration_IsReadInMainApp() throws Exception { + MapperConfig mockConfig = Mockito.spy(mapperConfig); + when(mockConfig.getEnableHttp()).thenReturn(true); + when(configHandler.getMapperConfig()).thenReturn(mockConfig); + objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig); + objUnderTest.start(); + + verify(processingConfig, times(WANTED_NUMBER_OF_INVOCATIONS_1)).getLimitRate(); + verify(processingConfig, times(WANTED_NUMBER_OF_INVOCATIONS_1)).getThreadsCount(); + + objUnderTest.stop(); + } + } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java index fd21a39..b8d0a1f 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java @@ -32,7 +32,15 @@ public class FilesProcessingConfigTest { private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE"; - private EnvironmentReader mockEnvironmentReader = mock(EnvironmentReader.class); + private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER"; + private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT"; + private static final String THREADS_4 = "4"; + private static final String MULTIPLIER_3 = "3"; + + private static final int EXPECTED_4 = 4; + private static final int EXPECTED_12 = 12; + + private final EnvironmentReader mockEnvironmentReader = mock(EnvironmentReader.class); private FilesProcessingConfig filesProcessingConfig; @Test @@ -63,4 +71,71 @@ public class FilesProcessingConfigTest { assertEquals(1, limitRate); } + + @Test + public void shouldReturnCorrectThreadsCount_whenVariableIsSet() throws EnvironmentConfigException { + when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn(THREADS_4); + when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn(MULTIPLIER_3); + + filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader); + int threadsCount = filesProcessingConfig.getThreadsCount(); + + assertEquals(EXPECTED_12, threadsCount); + } + + @Test + public void shouldReturnCorrectThreadsCount_whenVariableMultiplierIsNotSet() throws EnvironmentConfigException { + + when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn(THREADS_4); + + filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader); + int threadsCount = filesProcessingConfig.getThreadsCount(); + + assertEquals(EXPECTED_4, threadsCount); + } + + @Test + public void shouldReturnCorrectThreadsCount_whenVariableThreadsIsNotSet() throws EnvironmentConfigException { + when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn(MULTIPLIER_3); + + filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader); + int threadsCount = filesProcessingConfig.getThreadsCount(); + int expected = Runtime.getRuntime().availableProcessors() * Integer.parseInt(MULTIPLIER_3); + + assertEquals(expected, threadsCount); + } + + @Test + public void shouldReturnCorrectThreadsCount_whenVariableThreadsAndMultiplierIsNotSet() + throws EnvironmentConfigException { + filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader); + int threadsCount = filesProcessingConfig.getThreadsCount(); + int expected = Runtime.getRuntime().availableProcessors(); + + assertEquals(expected, threadsCount); + } + + @Test + public void shouldThrowEnvironmentConfigException_whenProcessingThreadsVariableHasWrongValue() { + when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn("not-an-int"); + filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader); + String expectedMessage = "PROCESSING_THREADS_COUNT environment variable has incorrect value.\n"; + String causeMessage = "For input string: \"not-an-int\""; + + Throwable exception = assertThrows(EnvironmentConfigException.class, () -> filesProcessingConfig.getThreadsCount()); + assertEquals(expectedMessage, exception.getMessage()); + assertEquals(causeMessage, exception.getCause().getMessage()); + } + + @Test + public void shouldThrowEnvironmentConfigException_whenThreadsMultiplierVariableHasWrongValue() { + when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn("not-an-int"); + filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader); + String expectedMessage = "THREADS_MULTIPLIER environment variable has incorrect value.\n"; + String causeMessage = "For input string: \"not-an-int\""; + + Throwable exception = assertThrows(EnvironmentConfigException.class, () -> filesProcessingConfig.getThreadsCount()); + assertEquals(expectedMessage, exception.getMessage()); + assertEquals(causeMessage, exception.getCause().getMessage()); + } } |