diff options
author | Tomasz Wrobel <tomasz.wrobel@nokia.com> | 2021-02-03 09:32:09 +0100 |
---|---|---|
committer | Tomasz Wrobel <tomasz.wrobel@nokia.com> | 2021-02-09 12:30:22 +0100 |
commit | 1b65e0e1fd3a9d1026db31c010d120e4400e378d (patch) | |
tree | 93722f15733762e340915c6490090a68918119d1 /src/main/java | |
parent | 4586462127dc2c024e9636ccafeb5a03aa319798 (diff) |
Add configuration number of threads in files processing.
Issue-ID: DCAEGEN2-2600
Signed-off-by: Tomasz Wrobel <tomasz.wrobel@nokia.com>
Change-Id: I04feb27698ef21196ab218de4bdd97be1fc85284
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/pmmapper/App.java | 26 | ||||
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java | 64 |
2 files changed, 82 insertions, 8 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 7aab08a..4583df3 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2021 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +31,8 @@ import lombok.Data; import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration; +import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader; +import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig; import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; @@ -65,6 +68,9 @@ import java.util.concurrent.TimeUnit; @Data public class App { + + private static final int PREFETCH_ONE_PER_THREAD = 1; + static { System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml"); } @@ -77,6 +83,8 @@ public class App { private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/"); private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/"); + private final FilesProcessingConfig processingConfig; + private MapperConfig mapperConfig; private MetadataFilter metadataFilter; private MeasConverter measConverter; @@ -105,13 +113,16 @@ public class App { * @param httpsPort https port to start https server on. * @param configHandler instance of the ConfigurationHandler used to acquire config. */ - public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler) { + public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig) + throws EnvironmentConfigException { try { this.mapperConfig = configHandler.getMapperConfig(); } catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) { logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e); throw new IllegalStateException("Config acquisition failed"); } + this.processingConfig = filesProcessingConfig; + this.httpPort = httpPort; this.httpsPort = httpsPort; this.metadataFilter = new MetadataFilter(mapperConfig); @@ -124,11 +135,13 @@ public class App { this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink); this.configScheduler = Schedulers.newSingle("Config"); + int processingThreads = processingConfig.getThreadsCount(); + this.flux.onBackpressureDrop(App::handleBackPressure) .doOnNext(App::receiveRequest) - .limitRate(1) - .parallel() - .runOn(Schedulers.newParallel(""), 1) + .limitRate(processingConfig.getLimitRate()) + .parallel(processingThreads) + .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD) .doOnNext(event -> MDC.setContextMap(event.getMdc())) .filter(this.metadataFilter::filter) .filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig)) @@ -191,8 +204,9 @@ public class App { } } - public static void main(String[] args) { - new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler()).start(); + public static void main(String[] args) throws EnvironmentConfigException { + FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader()); + new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start(); } public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) { diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java index ab549e4..c79f059 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java @@ -33,8 +33,13 @@ public class FilesProcessingConfig { private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE"; private static final int DEFAULT_LIMIT_RATE = 1; - private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(FilesProcessingConfig.class)); - private EnvironmentReader environmentReader; + private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER"; + private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT"; + private static final int DEFAULT_MULTIPLIER = 1; + + private static final ONAPLogAdapter logger = new ONAPLogAdapter( + LoggerFactory.getLogger(FilesProcessingConfig.class)); + private final EnvironmentReader environmentReader; /** * Creates a FilesProcessingConfig @@ -61,6 +66,26 @@ public class FilesProcessingConfig { } } + /** + * Provides reactor parallel threads count from environment variable. + * + * @throws EnvironmentConfigException + * @returns value of threads count + */ + public int getThreadsCount() throws EnvironmentConfigException { + logger.unwrap().info("Attempt to read threads configuration"); + int processingThreadsCount = getProcessingThreadsCount(); + int threadsMultiplier = getThreadsMultiplier(); + int processingThreadsAmount = processingThreadsCount * threadsMultiplier; + + logger.unwrap().info( + "Processing threads configuration: Processing threads count - {}, Processing threads multiplier - {} ", + processingThreadsCount, threadsMultiplier); + logger.unwrap().info("Amount of files processing threads: {} ", processingThreadsAmount); + + return processingThreadsAmount; + } + private Integer parseIntegerValue(String val) throws NumberFormatException { Integer value = Integer.valueOf(val); logger.unwrap().info(ENV_LIMIT_RATE + " value is: " + value); @@ -71,4 +96,39 @@ public class FilesProcessingConfig { logger.unwrap().info(ENV_LIMIT_RATE + " env not present. Setting limit rate to default value: " + DEFAULT_LIMIT_RATE); return DEFAULT_LIMIT_RATE; } + + private int getThreadsMultiplier() throws EnvironmentConfigException { + try { + return Optional.ofNullable(environmentReader.getVariable(ENV_THREADS_MULTIPLIER)) + .map(Integer::valueOf) + .orElseGet(this::getDefaultMultiplier); + } catch (NumberFormatException exception) { + throw new EnvironmentConfigException( + ENV_THREADS_MULTIPLIER + " environment variable has incorrect value.\n", exception); + } + } + + private int getDefaultMultiplier() { + logger.unwrap().info(ENV_THREADS_MULTIPLIER + + " env not present. Setting multiplier to default value: " + DEFAULT_MULTIPLIER); + return DEFAULT_MULTIPLIER; + } + + private int getProcessingThreadsCount() throws EnvironmentConfigException { + try { + return Optional.ofNullable(environmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)) + .map(Integer::valueOf) + .orElseGet(this::getDefaultThreadsCount); + } catch (NumberFormatException exception) { + throw new EnvironmentConfigException( + ENV_PROCESSING_THREADS_COUNT + " environment variable has incorrect value.\n", exception); + } + } + + private int getDefaultThreadsCount() { + int defaultThreadsCount = Runtime.getRuntime().availableProcessors(); + logger.unwrap().info(ENV_PROCESSING_THREADS_COUNT + + " env not present. Setting threads count to available cores: " + defaultThreadsCount); + return defaultThreadsCount; + } } |