summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorTomasz Wrobel <tomasz.wrobel@nokia.com>2021-02-03 09:32:09 +0100
committerTomasz Wrobel <tomasz.wrobel@nokia.com>2021-02-09 12:30:22 +0100
commit1b65e0e1fd3a9d1026db31c010d120e4400e378d (patch)
tree93722f15733762e340915c6490090a68918119d1 /src/main/java
parent4586462127dc2c024e9636ccafeb5a03aa319798 (diff)
Add configuration number of threads in files processing.
Issue-ID: DCAEGEN2-2600 Signed-off-by: Tomasz Wrobel <tomasz.wrobel@nokia.com> Change-Id: I04feb27698ef21196ab218de4bdd97be1fc85284
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java26
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java64
2 files changed, 82 insertions, 8 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 7aab08a..4583df3 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2020 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +31,8 @@ import lombok.Data;
import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
+import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader;
+import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
@@ -65,6 +68,9 @@ import java.util.concurrent.TimeUnit;
@Data
public class App {
+
+ private static final int PREFETCH_ONE_PER_THREAD = 1;
+
static {
System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml");
}
@@ -77,6 +83,8 @@ public class App {
private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
+ private final FilesProcessingConfig processingConfig;
+
private MapperConfig mapperConfig;
private MetadataFilter metadataFilter;
private MeasConverter measConverter;
@@ -105,13 +113,16 @@ public class App {
* @param httpsPort https port to start https server on.
* @param configHandler instance of the ConfigurationHandler used to acquire config.
*/
- public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler) {
+ public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig)
+ throws EnvironmentConfigException {
try {
this.mapperConfig = configHandler.getMapperConfig();
} catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) {
logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e);
throw new IllegalStateException("Config acquisition failed");
}
+ this.processingConfig = filesProcessingConfig;
+
this.httpPort = httpPort;
this.httpsPort = httpsPort;
this.metadataFilter = new MetadataFilter(mapperConfig);
@@ -124,11 +135,13 @@ public class App {
this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink);
this.configScheduler = Schedulers.newSingle("Config");
+ int processingThreads = processingConfig.getThreadsCount();
+
this.flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
- .limitRate(1)
- .parallel()
- .runOn(Schedulers.newParallel(""), 1)
+ .limitRate(processingConfig.getLimitRate())
+ .parallel(processingThreads)
+ .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
.filter(this.metadataFilter::filter)
.filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig))
@@ -191,8 +204,9 @@ public class App {
}
}
- public static void main(String[] args) {
- new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler()).start();
+ public static void main(String[] args) throws EnvironmentConfigException {
+ FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
+ new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
}
public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
index ab549e4..c79f059 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
@@ -33,8 +33,13 @@ public class FilesProcessingConfig {
private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE";
private static final int DEFAULT_LIMIT_RATE = 1;
- private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(FilesProcessingConfig.class));
- private EnvironmentReader environmentReader;
+ private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER";
+ private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT";
+ private static final int DEFAULT_MULTIPLIER = 1;
+
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(
+ LoggerFactory.getLogger(FilesProcessingConfig.class));
+ private final EnvironmentReader environmentReader;
/**
* Creates a FilesProcessingConfig
@@ -61,6 +66,26 @@ public class FilesProcessingConfig {
}
}
+ /**
+ * Provides reactor parallel threads count from environment variable.
+ *
+ * @throws EnvironmentConfigException
+ * @returns value of threads count
+ */
+ public int getThreadsCount() throws EnvironmentConfigException {
+ logger.unwrap().info("Attempt to read threads configuration");
+ int processingThreadsCount = getProcessingThreadsCount();
+ int threadsMultiplier = getThreadsMultiplier();
+ int processingThreadsAmount = processingThreadsCount * threadsMultiplier;
+
+ logger.unwrap().info(
+ "Processing threads configuration: Processing threads count - {}, Processing threads multiplier - {} ",
+ processingThreadsCount, threadsMultiplier);
+ logger.unwrap().info("Amount of files processing threads: {} ", processingThreadsAmount);
+
+ return processingThreadsAmount;
+ }
+
private Integer parseIntegerValue(String val) throws NumberFormatException {
Integer value = Integer.valueOf(val);
logger.unwrap().info(ENV_LIMIT_RATE + " value is: " + value);
@@ -71,4 +96,39 @@ public class FilesProcessingConfig {
logger.unwrap().info(ENV_LIMIT_RATE + " env not present. Setting limit rate to default value: " + DEFAULT_LIMIT_RATE);
return DEFAULT_LIMIT_RATE;
}
+
+ private int getThreadsMultiplier() throws EnvironmentConfigException {
+ try {
+ return Optional.ofNullable(environmentReader.getVariable(ENV_THREADS_MULTIPLIER))
+ .map(Integer::valueOf)
+ .orElseGet(this::getDefaultMultiplier);
+ } catch (NumberFormatException exception) {
+ throw new EnvironmentConfigException(
+ ENV_THREADS_MULTIPLIER + " environment variable has incorrect value.\n", exception);
+ }
+ }
+
+ private int getDefaultMultiplier() {
+ logger.unwrap().info(ENV_THREADS_MULTIPLIER +
+ " env not present. Setting multiplier to default value: " + DEFAULT_MULTIPLIER);
+ return DEFAULT_MULTIPLIER;
+ }
+
+ private int getProcessingThreadsCount() throws EnvironmentConfigException {
+ try {
+ return Optional.ofNullable(environmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT))
+ .map(Integer::valueOf)
+ .orElseGet(this::getDefaultThreadsCount);
+ } catch (NumberFormatException exception) {
+ throw new EnvironmentConfigException(
+ ENV_PROCESSING_THREADS_COUNT + " environment variable has incorrect value.\n", exception);
+ }
+ }
+
+ private int getDefaultThreadsCount() {
+ int defaultThreadsCount = Runtime.getRuntime().availableProcessors();
+ logger.unwrap().info(ENV_PROCESSING_THREADS_COUNT +
+ " env not present. Setting threads count to available cores: " + defaultThreadsCount);
+ return defaultThreadsCount;
+ }
}