summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/RestConfCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/RestConfCollector.java')
-rw-r--r--src/main/java/org/onap/dcae/RestConfCollector.java189
1 files changed, 189 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java
new file mode 100644
index 0000000..1812d23
--- /dev/null
+++ b/src/main/java/org/onap/dcae/RestConfCollector.java
@@ -0,0 +1,189 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae;
+
+import io.vavr.collection.Map;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.common.EventData;
+import org.onap.dcae.common.EventProcessor;
+import org.onap.dcae.common.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.common.publishing.EventPublisher;
+import org.onap.dcae.common.publishing.PublisherConfig;
+import org.onap.dcae.controller.AccessController;
+import org.onap.dcae.controller.ConfigLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
+import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Lazy;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.concurrent.*;
+
+@SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
+public class RestConfCollector {
+ private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output");
+ private static final int MAX_THREADS = 20;
+ private static Logger log = LoggerFactory.getLogger(RestConfCollector.class);
+ public static LinkedBlockingQueue<EventData> fProcessingInputQueue;
+ private static ApplicationSettings properties;
+ private static ConfigurableApplicationContext context;
+ private static ConfigLoader configLoader;
+ private static SpringApplication app;
+ private static ScheduledFuture<?> scheduleFeatures;
+ private static ExecutorService executor;
+ private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+ private static EventPublisher eventPublisher;
+ private static EventProcessor eventProcessor;
+
+ /* List of Controllers */
+ private static java.util.Map<String, AccessController> controllerStore = new ConcurrentHashMap<>();
+
+
+ public static void main(String[] args) {
+ oplog.info("RestconfController starting");
+ app = new SpringApplication(RestConfCollector.class);
+ properties = new ApplicationSettings(args, CLIUtils::processCmdLine);
+ scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+ init();
+ app.setAddCommandLineProperties(true);
+ context = app.run();
+ configLoader.updateConfig();
+ controllerConfig(properties);
+ oplog.info("RestConfController running .....");
+ }
+
+
+ public static void restartApplication() {
+ Thread thread = new Thread(() -> {
+ controllerConfigCleanup();
+ context.close();
+ properties.reloadProperties();
+ scheduleFeatures.cancel(true);
+ init();
+ controllerConfig(properties);
+ context = SpringApplication.run(RestConfCollector.class);
+ });
+ thread.setDaemon(false);
+ thread.start();
+ }
+
+
+ private static void init() {
+ fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents());
+ createConfigLoader();
+ createSchedulePoolExecutor();
+ createExecutors();
+ }
+
+ private static Map<String, PublisherConfig> getDmapConfig() {
+ return DMaaPConfigurationParser.
+ parseToDomainMapping(Paths.get(properties.dMaaPConfigurationFileLocation())).get();
+ }
+
+ @Bean
+ @Lazy
+ public ApplicationSettings applicationSettings() {
+ return properties;
+ }
+
+ @Bean
+ public LinkedBlockingQueue<EventData> inputQueue() {
+ return fProcessingInputQueue;
+ }
+
+ public static java.util.Map<String, String[]> parseStreamIdToStreamHashMapping(String streamId) {
+
+ java.util.Map<String, String[]> streamidHash = new HashMap<>();
+ String[] list = streamId.split("\\|");
+ for (String aList : list) {
+ String domain = aList.split("=")[0];
+ String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
+ streamidHash.put(domain, streamIdList);
+ oplog.info("adding domain " + domain + " count" + streamIdList.length);
+ }
+ return streamidHash;
+ }
+
+ private static void controllerConfig(ApplicationSettings properties) {
+ oplog.info("Policy received " + properties.rcc_policy());
+ if (!properties.rcc_policy().equals("")) {
+ JSONArray contollers = new JSONArray(properties.rcc_policy());
+ for (int i = 0; i < contollers.length(); i++) {
+ JSONObject controller = contollers.getJSONObject(i);
+ oplog.info(" object " + controller.toString());
+ AccessController acClr = new AccessController(controller,
+ properties);
+ controllerStore.put(controller.get("controller_name").toString(), acClr);
+ oplog.info("Activating controller " + acClr.getCfgInfo().getController_name());
+ acClr.activate();
+ }
+ }
+ }
+
+ private static void controllerConfigCleanup() {
+ controllerStore.clear();
+ }
+
+ public static void handleEvents(EventData ev) throws Exception {
+ if (!fProcessingInputQueue.offer(ev)) {
+ throw new Exception();
+ }
+ log.info("RestConfCollector.handleEvents:EVENTS has been published successfully!");
+ }
+
+ private static void createConfigLoader() {
+ log.info("dMaaPConfigurationFileLocation " + properties.dMaaPConfigurationFileLocation() + " " + properties.configurationFileLocation());
+
+ configLoader = ConfigLoader.create(getEventPublisher()::reconfigure,
+ Paths.get(properties.dMaaPConfigurationFileLocation()),
+ properties.configurationFileLocation());
+ }
+
+ private static EventPublisher getEventPublisher() {
+ return EventPublisher.createPublisher(oplog, getDmapConfig());
+ }
+
+ private static void createSchedulePoolExecutor() {
+ scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig,
+ 10,
+ 10,
+ TimeUnit.MINUTES);
+ }
+
+ private static void createExecutors() {
+ eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig());
+ eventProcessor = new EventProcessor(eventPublisher,
+ parseStreamIdToStreamHashMapping(properties.dMaaPStreamsMapping()));
+
+ executor = Executors.newFixedThreadPool(MAX_THREADS);
+ for (int i = 0; i < MAX_THREADS; ++i) {
+ executor.execute(eventProcessor);
+ }
+ }
+}