summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/RestConfCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/RestConfCollector.java')
-rw-r--r--src/main/java/org/onap/dcae/RestConfCollector.java113
1 files changed, 60 insertions, 53 deletions
diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java
index 545bfd4..ff749f3 100644
--- a/src/main/java/org/onap/dcae/RestConfCollector.java
+++ b/src/main/java/org/onap/dcae/RestConfCollector.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,12 +26,11 @@ import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dcae.common.ControllerActivationState;
import org.onap.dcae.common.EventData;
-import org.onap.dcae.common.EventProcessor;
import org.onap.dcae.common.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.common.publishing.DMaaPEventPublisher;
import org.onap.dcae.common.publishing.EventPublisher;
import org.onap.dcae.common.publishing.PublisherConfig;
import org.onap.dcae.controller.AccessController;
-import org.onap.dcae.controller.ConfigLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -39,61 +38,98 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
+import org.onap.dcae.configuration.ConfigurationHandler;
+import org.onap.dcae.configuration.ConfigUpdater;
+import org.onap.dcae.configuration.ConfigUpdaterFactory;
+import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
@SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
public class RestConfCollector {
private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output");
- private static final int MAX_THREADS = 20;
+ private static final int DEFAULT_CONFIGURATION_FETCH_PERIOD = 5;
private static Logger log = LoggerFactory.getLogger(RestConfCollector.class);
public static LinkedBlockingQueue<EventData> fProcessingInputQueue;
private static ApplicationSettings properties;
private static ConfigurableApplicationContext context;
- private static ConfigLoader configLoader;
private static SpringApplication app;
- private static ScheduledFuture<?> scheduleFeatures;
- private static ScheduledFuture<?> scheduleCtrlActivation;
- private static ExecutorService executor;
private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private static ScheduledThreadPoolExecutor scheduledThExController;
private static EventPublisher eventPublisher;
- private static EventProcessor eventProcessor;
+ private static ConfigUpdater configUpdater;
+ private static ApplicationConfigurationListener applicationConfigurationListener;
+ private static ReentrantLock applicationLock = new ReentrantLock();
/* List of Controllers */
private static java.util.Map<String, AccessController> controllerStore = new ConcurrentHashMap<>();
public static void main(String[] args) {
+ applicationLock.lock();
+ try {
+ startApplication(args);
+ startListeningForApplicationConfigurationStoredInConsul();
+ } finally {
+ applicationLock.unlock();
+ }
+ }
+
+ private static void startApplication(String[] args) {
oplog.info("RestconfController starting");
app = new SpringApplication(RestConfCollector.class);
properties = new ApplicationSettings(args, CLIUtils::processCmdLine);
+ configUpdater = ConfigUpdaterFactory.create(
+ properties.configurationFileLocation(),
+ Paths.get(properties.dMaaPConfigurationFileLocation()));
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThExController = new ScheduledThreadPoolExecutor(1);
init();
+ applicationConfigurationListener = startListeningForApplicationConfigurationStoredInConsul();
app.setAddCommandLineProperties(true);
context = app.run();
controllerConfig(properties);
- configLoader.updateConfig();
oplog.info("RestConfController running .....");
}
+ private static ApplicationConfigurationListener startListeningForApplicationConfigurationStoredInConsul() {
+ ConfigurationHandler cbsHandler = new ConfigurationHandler(new CbsClientConfigurationProvider(), configUpdater);
+ ApplicationConfigurationListener applicationConfigProvider = new ApplicationConfigurationListener(Duration.ofMinutes(DEFAULT_CONFIGURATION_FETCH_PERIOD), cbsHandler);
+
+ ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+ scheduledThreadPoolExecutor.execute(applicationConfigProvider);
+
+ return applicationConfigProvider;
+ }
public static void restartApplication() {
Thread thread = new Thread(() -> {
- controllerConfigCleanup();
- context.close();
- properties.reloadProperties();
- scheduleFeatures.cancel(true);
- scheduleCtrlActivation.cancel(true);
- init();
- controllerConfig(properties);
- context = SpringApplication.run(RestConfCollector.class);
+ try {
+ applicationLock.lock();
+ controllerConfigCleanup();
+ if (context != null) {
+ context.close();
+ }
+ properties.reloadProperties();
+ init();
+ controllerConfig(properties);
+ context = SpringApplication.run(RestConfCollector.class);
+
+ configUpdater.setPaths(properties.configurationFileLocation(),
+ Paths.get(properties.dMaaPConfigurationFileLocation()));
+ applicationConfigurationListener.reload(Duration.ofMinutes(properties.configurationUpdateFrequency()));
+ } finally {
+ applicationLock.unlock();
+ }
});
thread.setDaemon(false);
thread.start();
@@ -102,9 +138,11 @@ public class RestConfCollector {
private static void init() {
fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents());
- createConfigLoader();
- createSchedulePoolExecutor();
- createExecutors();
+
+ configUpdater = ConfigUpdaterFactory.create(
+ properties.configurationFileLocation(),
+ Paths.get(properties.dMaaPConfigurationFileLocation()));
+ eventPublisher = new DMaaPEventPublisher(getDmapConfig());
}
private static Map<String, PublisherConfig> getDmapConfig() {
@@ -170,41 +208,10 @@ public class RestConfCollector {
log.info("RestConfCollector.handleEvents:EVENTS has been published successfully!");
}
- private static void createConfigLoader() {
- log.info("dMaaPConfigurationFileLocation " + properties.dMaaPConfigurationFileLocation() + " " + properties.configurationFileLocation());
-
- configLoader = ConfigLoader.create(getEventPublisher()::reconfigure,
- Paths.get(properties.dMaaPConfigurationFileLocation()),
- properties.configurationFileLocation());
- }
-
private static EventPublisher getEventPublisher() {
return EventPublisher.createPublisher(oplog, getDmapConfig());
}
- private static void createSchedulePoolExecutor() {
- scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig,
- 10,
- 10,
- TimeUnit.MINUTES);
- ControllerActivationTask task = new ControllerActivationTask();
- scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task,
- 10,
- 10,
- TimeUnit.SECONDS);
- }
-
- private static void createExecutors() {
- eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig());
- eventProcessor = new EventProcessor(eventPublisher,
- parseStreamIdToStreamHashMapping(properties.dMaaPStreamsMapping()));
-
- executor = Executors.newFixedThreadPool(MAX_THREADS);
- for (int i = 0; i < MAX_THREADS; ++i) {
- executor.execute(eventProcessor);
- }
- }
-
private static class ControllerActivationTask implements Runnable
{
public ControllerActivationTask() {