diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/RestConfCollector.java')
-rw-r--r-- | src/main/java/org/onap/dcae/RestConfCollector.java | 113 |
1 files changed, 60 insertions, 53 deletions
diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java index 545bfd4..ff749f3 100644 --- a/src/main/java/org/onap/dcae/RestConfCollector.java +++ b/src/main/java/org/onap/dcae/RestConfCollector.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,11 @@ import org.json.JSONArray; import org.json.JSONObject; import org.onap.dcae.common.ControllerActivationState; import org.onap.dcae.common.EventData; -import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; +import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.onap.dcae.common.publishing.EventPublisher; import org.onap.dcae.common.publishing.PublisherConfig; import org.onap.dcae.controller.AccessController; -import org.onap.dcae.controller.ConfigLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; @@ -39,61 +38,98 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.context.ConfigurableApplicationContext; +import org.onap.dcae.configuration.ConfigurationHandler; +import org.onap.dcae.configuration.ConfigUpdater; +import org.onap.dcae.configuration.ConfigUpdaterFactory; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; -import java.util.concurrent.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class RestConfCollector { private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); - private static final int MAX_THREADS = 20; + private static final int DEFAULT_CONFIGURATION_FETCH_PERIOD = 5; private static Logger log = LoggerFactory.getLogger(RestConfCollector.class); public static LinkedBlockingQueue<EventData> fProcessingInputQueue; private static ApplicationSettings properties; private static ConfigurableApplicationContext context; - private static ConfigLoader configLoader; private static SpringApplication app; - private static ScheduledFuture<?> scheduleFeatures; - private static ScheduledFuture<?> scheduleCtrlActivation; - private static ExecutorService executor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static ScheduledThreadPoolExecutor scheduledThExController; private static EventPublisher eventPublisher; - private static EventProcessor eventProcessor; + private static ConfigUpdater configUpdater; + private static ApplicationConfigurationListener applicationConfigurationListener; + private static ReentrantLock applicationLock = new ReentrantLock(); /* List of Controllers */ private static java.util.Map<String, AccessController> controllerStore = new ConcurrentHashMap<>(); public static void main(String[] args) { + applicationLock.lock(); + try { + startApplication(args); + startListeningForApplicationConfigurationStoredInConsul(); + } finally { + applicationLock.unlock(); + } + } + + private static void startApplication(String[] args) { oplog.info("RestconfController starting"); app = new SpringApplication(RestConfCollector.class); properties = new ApplicationSettings(args, CLIUtils::processCmdLine); + configUpdater = ConfigUpdaterFactory.create( + properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); scheduledThExController = new ScheduledThreadPoolExecutor(1); init(); + applicationConfigurationListener = startListeningForApplicationConfigurationStoredInConsul(); app.setAddCommandLineProperties(true); context = app.run(); controllerConfig(properties); - configLoader.updateConfig(); oplog.info("RestConfController running ....."); } + private static ApplicationConfigurationListener startListeningForApplicationConfigurationStoredInConsul() { + ConfigurationHandler cbsHandler = new ConfigurationHandler(new CbsClientConfigurationProvider(), configUpdater); + ApplicationConfigurationListener applicationConfigProvider = new ApplicationConfigurationListener(Duration.ofMinutes(DEFAULT_CONFIGURATION_FETCH_PERIOD), cbsHandler); + + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + scheduledThreadPoolExecutor.execute(applicationConfigProvider); + + return applicationConfigProvider; + } public static void restartApplication() { Thread thread = new Thread(() -> { - controllerConfigCleanup(); - context.close(); - properties.reloadProperties(); - scheduleFeatures.cancel(true); - scheduleCtrlActivation.cancel(true); - init(); - controllerConfig(properties); - context = SpringApplication.run(RestConfCollector.class); + try { + applicationLock.lock(); + controllerConfigCleanup(); + if (context != null) { + context.close(); + } + properties.reloadProperties(); + init(); + controllerConfig(properties); + context = SpringApplication.run(RestConfCollector.class); + + configUpdater.setPaths(properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); + applicationConfigurationListener.reload(Duration.ofMinutes(properties.configurationUpdateFrequency())); + } finally { + applicationLock.unlock(); + } }); thread.setDaemon(false); thread.start(); @@ -102,9 +138,11 @@ public class RestConfCollector { private static void init() { fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); - createConfigLoader(); - createSchedulePoolExecutor(); - createExecutors(); + + configUpdater = ConfigUpdaterFactory.create( + properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); + eventPublisher = new DMaaPEventPublisher(getDmapConfig()); } private static Map<String, PublisherConfig> getDmapConfig() { @@ -170,41 +208,10 @@ public class RestConfCollector { log.info("RestConfCollector.handleEvents:EVENTS has been published successfully!"); } - private static void createConfigLoader() { - log.info("dMaaPConfigurationFileLocation " + properties.dMaaPConfigurationFileLocation() + " " + properties.configurationFileLocation()); - - configLoader = ConfigLoader.create(getEventPublisher()::reconfigure, - Paths.get(properties.dMaaPConfigurationFileLocation()), - properties.configurationFileLocation()); - } - private static EventPublisher getEventPublisher() { return EventPublisher.createPublisher(oplog, getDmapConfig()); } - private static void createSchedulePoolExecutor() { - scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig, - 10, - 10, - TimeUnit.MINUTES); - ControllerActivationTask task = new ControllerActivationTask(); - scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task, - 10, - 10, - TimeUnit.SECONDS); - } - - private static void createExecutors() { - eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); - eventProcessor = new EventProcessor(eventPublisher, - parseStreamIdToStreamHashMapping(properties.dMaaPStreamsMapping())); - - executor = Executors.newFixedThreadPool(MAX_THREADS); - for (int i = 0; i < MAX_THREADS; ++i) { - executor.execute(eventProcessor); - } - } - private static class ControllerActivationTask implements Runnable { public ControllerActivationTask() { |