summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/controller
diff options
context:
space:
mode:
authors00370346 <swarup.nayak1@huawei.com>2019-03-14 15:06:40 +0530
committers00370346 <swarup.nayak1@huawei.com>2019-03-18 15:25:23 +0530
commitbe11dee889f5a740d584458b62804e5fd4296e53 (patch)
tree2cc5e14c29df1a40f8e7bbe3bb08fffa38e023b8 /src/main/java/org/onap/dcae/controller
parentd76c2d0f61bfb4373b13fcdb6fc3317467dd19b4 (diff)
Issue-ID: DCAEGEN2-1055 Generic RestConfCollector
Change-Id: I1800affa2b34cbb7487c0d8411e078adec5a0c48 Signed-off-by: s00370346 <swarup.nayak1@huawei.com>
Diffstat (limited to 'src/main/java/org/onap/dcae/controller')
-rw-r--r--src/main/java/org/onap/dcae/controller/AccessController.java243
-rw-r--r--src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java131
-rw-r--r--src/main/java/org/onap/dcae/controller/ConfigLoader.java147
-rw-r--r--src/main/java/org/onap/dcae/controller/ConfigParsing.java59
-rw-r--r--src/main/java/org/onap/dcae/controller/ConfigSource.java90
-rw-r--r--src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java88
-rw-r--r--src/main/java/org/onap/dcae/controller/Conversions.java54
-rw-r--r--src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java95
-rw-r--r--src/main/java/org/onap/dcae/controller/EnvProps.java74
-rw-r--r--src/main/java/org/onap/dcae/controller/PersistentEventConnection.java207
10 files changed, 1188 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/controller/AccessController.java b/src/main/java/org/onap/dcae/controller/AccessController.java
new file mode 100644
index 0000000..83673b8
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/AccessController.java
@@ -0,0 +1,243 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationException;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.common.Constants;
+import org.onap.dcae.common.RestConfContext;
+import org.onap.dcae.common.RestapiCallNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.nio.file.Files.readAllBytes;
+import static org.onap.dcae.common.RestapiCallNodeUtil.getUriMethod;
+
+public class AccessController {
+ private static final Logger log = LoggerFactory.getLogger(AccessController.class);
+ /* Collector properties */
+ private ApplicationSettings properties;
+
+ /* Controller specific information */
+ private ControllerConfigInfo cfgInfo;
+ private RestConfContext ctx;
+ RestapiCallNode restApiCallNode;
+
+ /* Maps of Events */
+ private Map<String, PersistentEventConnection> eventList = new ConcurrentHashMap<>();
+ private ExecutorService executor = Executors.newCachedThreadPool();
+ private Map<String, String> paraMap;
+
+ public AccessController(JSONObject controller,
+ ApplicationSettings properties) {
+ this.cfgInfo = new ControllerConfigInfo(controller.get("controller_name").toString(),
+ controller.get("controller_restapiUrl").toString(),
+ controller.get("controller_restapiUser").toString(),
+ controller.get("controller_restapiPassword").toString(),
+ controller.get("controller_accessTokenUrl").toString(),
+ controller.get("controller_accessTokenFile").toString(),
+ controller.get("controller_subscriptionUrl").toString(),
+ controller.get("controller_accessTokenMethod").toString(),
+ controller.get("controller_subsMethod").toString());
+ this.properties = properties;
+ this.ctx = new RestConfContext();
+ this.restApiCallNode = new RestapiCallNode();
+ this.paraMap = new HashMap<>();
+
+ prepareControllerParamMap();
+
+ log.info("AccesController Created {} {} {} {} {} {}",
+ this.cfgInfo.getController_name(),
+ this.cfgInfo.getController_restapiUrl(),
+ this.cfgInfo.getController_restapiPassword(),
+ this.cfgInfo.getController_restapiUser(),
+ this.cfgInfo.getController_accessTokenUrl(),
+ this.cfgInfo.getController_accessTokenFile());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof AccessController)) return false;
+ AccessController that = (AccessController) o;
+ return that.cfgInfo.getController_name().equals(that.cfgInfo.getController_name());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.cfgInfo.getController_name());
+ }
+
+ public RestapiCallNode getRestApiCallNode() {
+ return restApiCallNode;
+ }
+
+ private void fetch_TokenId() {
+
+
+ modifyControllerParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(this.properties.authorizationEnabled()) + cfgInfo.getController_restapiUrl() + cfgInfo.getController_accessTokenUrl());
+ modifyControllerParamMap(Constants.KDEFAULT_TEMP_FILENAME, cfgInfo.getController_accessTokenFile());
+ modifyControllerParamMap(Constants.KSETTING_REST_UNAME, cfgInfo.getController_restapiUser());
+ modifyControllerParamMap(Constants.KSETTING_REST_PASSWORD, cfgInfo.getController_restapiPassword());
+ modifyControllerParamMap(Constants.KSETTING_HTTP_METHOD, cfgInfo.getController_accessTokenMethod());
+
+ String httpResponse = null;
+ try {
+
+ restApiCallNode.sendRequest(this.paraMap, ctx, null);
+ String key = getControllerParamMapValue(Constants.KSETTING_RESP_PREFIX).concat(".").concat("httpResponse");
+ httpResponse = ctx.getAttribute(key);
+ log.info("httpResponse ", httpResponse + " key " + key);
+ JSONObject jsonObj = new JSONObject(httpResponse);
+ log.info("jsonObj ", jsonObj.toString());
+ //JSONObject data = jsonObj.getJSONObject("data");
+ //String tokenId = data.get("accessSession").toString();
+ //@TODO: Make return field dynamic
+ String tokenId = jsonObj.get("accessSession").toString();
+ log.info("token 1" + tokenId);
+ modifyControllerParamMap(Constants.KSETTING_TOKENID, tokenId);
+ modifyControllerParamMap(Constants.KSETTING_CUSTOMHTTP_HEADER, "X-ACCESS-TOKEN=" + tokenId);
+ } catch (Exception e) {
+ log.info("Access token is not supported" + e.getMessage());
+ log.info("http response " + httpResponse);
+ }
+ }
+
+ public void activate() {
+ fetch_TokenId();
+ printControllerParamMap();
+ /* Create eventlist from properties */
+ JSONArray contollers = new JSONArray(properties.rcc_policy());
+ for (int i = 0; i < contollers.length(); i++) {
+ JSONObject controller = contollers.getJSONObject(i);
+ if (controller.get("controller_name").equals(this.getCfgInfo().getController_name())) {
+ JSONArray eventlists = controller.getJSONArray("event_details");
+ for (int j = 0; j < eventlists.length(); j++) {
+ JSONObject event = eventlists.getJSONObject(j);
+ String name = event.get("event_name").toString();
+ PersistentEventConnection conn = new PersistentEventConnection(name,
+ event.get("event_description").toString(),
+ Boolean.parseBoolean(event.get("event_sseventUrlEmbed").toString()),
+ event.get("event_sseventsField").toString(),
+ event.get("event_sseventsUrl").toString(),
+ event.get("event_subscriptionTemplate").toString(),
+ event.get("event_unSubscriptionTemplate").toString(),
+ event.get("event_ruleId").toString(),
+ this);
+
+ eventList.put(name, conn);
+ executor.execute(conn);
+ }
+ }
+ }
+ }
+
+ public RestConfContext getCtx() {
+ return ctx;
+ }
+
+ public ApplicationSettings getProperties() {
+ return properties;
+ }
+
+ public ControllerConfigInfo getCfgInfo() {
+ return cfgInfo;
+ }
+
+ public Map<String, String> getParaMap() {
+ return this.paraMap;
+ }
+
+ private void prepareControllerParamMap() {
+ /* Adding the fields in ParaMap */
+ paraMap.put(Constants.KDEFAULT_TEMP_FILENAME, null);
+ paraMap.put(Constants.KSETTING_REST_API_URL, null);
+ paraMap.put(Constants.KSETTING_HTTP_METHOD, "post");
+ paraMap.put(Constants.KSETTING_RESP_PREFIX, "responsePrefix");
+ paraMap.put(Constants.KSETTING_SKIP_SENDING, "false");
+ paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, null);
+ paraMap.put(Constants.KSETTING_FORMAT, "json");
+
+ paraMap.put(Constants.KSETTING_REST_UNAME, null);
+ paraMap.put(Constants.KSETTING_REST_PASSWORD, null);
+ paraMap.put(Constants.KDEFAULT_REQUESTBODY, null);
+
+ paraMap.put(Constants.KSETTING_AUTH_TYPE, "unspecified");
+ paraMap.put(Constants.KSETTING_CONTENT_TYPE, "application/json");
+ paraMap.put(Constants.KSETTING_OAUTH_CONSUMER_KEY, null);
+ paraMap.put(Constants.KSETTING_OAUTH_CONSUMER_SECRET, null);
+ paraMap.put(Constants.KSETTING_OAUTH_SIGNATURE_METHOD, null);
+ paraMap.put(Constants.KSETTING_OAUTH_VERSION, null);
+
+ paraMap.put(Constants.KSETTING_CUSTOMHTTP_HEADER, null);
+ paraMap.put(Constants.KSETTING_TOKENID, null);
+ paraMap.put(Constants.KSETTING_DUMP_HEADER, "false");
+ paraMap.put(Constants.KSETTING_RETURN_REQUEST_PAYLOAD, "false");
+
+ paraMap.put(Constants.KSETTING_TRUST_STORE_FILENAME, this.getProperties().truststoreFileLocation());
+ String trustPassword = getKeyStorePassword(toAbsolutePath(this.getProperties().truststorePasswordFileLocation()));
+ paraMap.put(Constants.KSETTING_TRUST_STORE_PASSWORD, trustPassword);
+ paraMap.put(Constants.KSETTING_KEY_STORE_FILENAME, this.getProperties().keystoreFileLocation());
+ String KeyPassword = getKeyStorePassword(toAbsolutePath(this.getProperties().keystorePasswordFileLocation()));
+ paraMap.put(Constants.KSETTING_KEY_STORE_PASSWORD, KeyPassword);
+
+ }
+
+ private Path toAbsolutePath(final String path) {
+ return Paths.get(path).toAbsolutePath();
+ }
+
+ private String getKeyStorePassword(final Path location) {
+ try {
+ return new String(readAllBytes(location));
+ } catch (Exception e) {
+ log.error("Could not read password from: '" + location + "'.", e);
+ throw new ApplicationException(e);
+ }
+ }
+
+ public void modifyControllerParamMap(String fieldName, String value) {
+ paraMap.put(fieldName, value);
+ }
+
+ public String getControllerParamMapValue(String fieldName) {
+ return paraMap.get(fieldName);
+ }
+
+ public void printControllerParamMap() {
+ log.info("----------------Controller Param Map-------------------");
+ for (String name : paraMap.keySet()) {
+ String value = paraMap.get(name);
+ log.info(name + " : " + value);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java b/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java
new file mode 100644
index 0000000..4542fb5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java
@@ -0,0 +1,131 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import io.vavr.CheckedRunnable;
+import io.vavr.Tuple2;
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.common.publishing.VavrUtils.*;
+import static org.onap.dcae.controller.Conversions.toList;
+
+class ConfigFilesFacade {
+
+ private static Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class);
+
+ private final Path dMaaPConfigPath;
+ private final Path propertiesPath;
+
+ public ConfigFilesFacade(Path dMaaPConfigPath, Path propertiesPath) {
+ this.dMaaPConfigPath = dMaaPConfigPath;
+ this.propertiesPath = propertiesPath;
+ }
+
+ Try<Map<String, String>> readCollectorProperties() {
+ log.info(f("Reading collector properties from path: '%s'", propertiesPath));
+ return Try(this::readProperties)
+ .map(prop -> toList(prop.getKeys()).toMap(k -> k, k -> (String) prop.getProperty(k)))
+ .mapFailure(enhanceError("Unable to read properties configuration from path '%s'", propertiesPath))
+ .onFailure(logError(log))
+ .peek(props -> log.info(f("Read following collector properties: '%s'", props)));
+ }
+
+ Try<JSONObject> readDMaaPConfiguration() {
+ log.info(f("Reading DMaaP configuration from file: '%s'", dMaaPConfigPath));
+ return readFile(dMaaPConfigPath)
+ .recover(FileNotFoundException.class, __ -> "{}")
+ .mapFailure(enhanceError("Unable to read DMaaP configuration from file '%s'", dMaaPConfigPath))
+ .flatMap(Conversions::toJson)
+ .onFailure(logError(log))
+ .peek(props -> log.info(f("Read following DMaaP properties: '%s'", props)));
+ }
+
+ Try<Void> writeDMaaPConfiguration(JSONObject dMaaPConfiguration) {
+ log.info(f("Writing DMaaP configuration '%s' into file '%s'", dMaaPConfiguration, dMaaPConfigPath));
+ return writeFile(dMaaPConfigPath, indentConfiguration(dMaaPConfiguration.toString()))
+ .mapFailure(enhanceError("Could not save new DMaaP configuration to path '%s'", dMaaPConfigPath))
+ .onFailure(logError(log))
+ .peek(__ -> log.info("Written successfully"));
+ }
+
+
+ Try<Void> writeProperties(Map<String, String> properties) {
+ log.info(f("Writing properties configuration '%s' into file '%s'", properties, propertiesPath));
+ return Try.run(saveProperties(properties))
+ .mapFailure(enhanceError("Could not save properties to path '%s'", properties))
+ .onFailure(logError(log))
+ .peek(__ -> log.info("Written successfully"));
+ }
+
+ private Try<String> readFile(Path path) {
+ return Try(() -> new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
+ .mapFailure(enhanceError("Could not read content from path: '%s'", path));
+ }
+
+ private Try<Void> writeFile(Path path, String content) {
+ return Try.run(() -> Files.write(path, content.getBytes()))
+ .mapFailure(enhanceError("Could not write content to path: '%s'", path));
+ }
+
+ private PropertiesConfiguration readProperties() throws ConfigurationException {
+ PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration();
+ propertiesConfiguration.setDelimiterParsingDisabled(true);
+ propertiesConfiguration.load(propertiesPath.toFile());
+ return propertiesConfiguration;
+ }
+
+ private CheckedRunnable saveProperties(Map<String, String> properties) {
+ return () -> {
+ PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(propertiesPath.toFile());
+ propertiesConfiguration.setEncoding(null);
+ for (Tuple2<String, String> property : properties) {
+ updateProperty(propertiesConfiguration, property);
+ }
+ propertiesConfiguration.save();
+ };
+ }
+
+ private void updateProperty(PropertiesConfiguration propertiesConfiguration, Tuple2<String, String> property) {
+ if (propertiesConfiguration.containsKey(property._1)) {
+ propertiesConfiguration.setProperty(property._1, property._2);
+ } else {
+ propertiesConfiguration.addProperty(property._1, property._2);
+ }
+ }
+
+ private String indentConfiguration(String configuration) {
+ return new JSONObject(configuration).toString(4);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/ConfigLoader.java b/src/main/java/org/onap/dcae/controller/ConfigLoader.java
new file mode 100644
index 0000000..1b7e60b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/ConfigLoader.java
@@ -0,0 +1,147 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import io.vavr.Function0;
+import io.vavr.Function1;
+import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import org.json.JSONObject;
+import org.onap.dcae.RestConfCollector;
+import org.onap.dcae.common.publishing.PublisherConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.function.Consumer;
+
+import static org.onap.dcae.common.publishing.DMaaPConfigurationParser.parseToDomainMapping;
+import static org.onap.dcae.controller.ConfigParsing.getDMaaPConfig;
+import static org.onap.dcae.controller.ConfigParsing.getProperties;
+import static org.onap.dcae.controller.EnvPropertiesReader.readEnvProps;
+
+public class ConfigLoader {
+
+ private static final String SKIP_MSG = "Skipping dynamic configuration update";
+ private static Logger log = LoggerFactory.getLogger(ConfigLoader.class);
+ private final Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer;
+ private final ConfigFilesFacade configFilesFacade;
+ private final Function1<EnvProps, Try<JSONObject>> configurationSource;
+ private final Function0<Map<String, String>> envVariablesSupplier;
+ private boolean toRestart = false;
+
+ ConfigLoader(Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer,
+ ConfigFilesFacade configFilesFacade,
+ Function1<EnvProps, Try<JSONObject>> configurationSource,
+ Function0<Map<String, String>> envVariablesSupplier) {
+ this.eventPublisherReconfigurer = eventPublisherReconfigurer;
+ this.configFilesFacade = configFilesFacade;
+ this.configurationSource = configurationSource;
+ this.envVariablesSupplier = envVariablesSupplier;
+ }
+
+ public static ConfigLoader create(Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer,
+ Path dMaaPConfigFile, Path propertiesConfigFile) {
+ log.info("ConfigLoader create ....");
+ return new ConfigLoader(eventPublisherReconfigurer,
+ new ConfigFilesFacade(dMaaPConfigFile, propertiesConfigFile),
+ ConfigSource::getAppConfig,
+ () -> HashMap.ofAll(System.getenv()));
+ }
+
+ public void updateConfig() {
+ log.info("Trying to dynamically update config from Config Binding Service");
+ readEnvProps(envVariablesSupplier.get())
+ .onEmpty(() -> log.warn(SKIP_MSG))
+ .forEach(this::updateConfig);
+ }
+
+ private void updateConfig(EnvProps props) {
+ configurationSource.apply(props)
+ .onFailure(logSkip())
+ .onSuccess(newConf -> {
+ updateConfigurationProperties(newConf);
+ updateDMaaPProperties(newConf);
+ reloadApplication();
+ }
+ );
+ }
+
+ private void reloadApplication() {
+ if (toRestart) {
+ log.info("New app config - Application will be restarted");
+ RestConfCollector.restartApplication();
+ }
+ }
+
+ private void updateDMaaPProperties(JSONObject newConf) {
+ configFilesFacade.readDMaaPConfiguration()
+ .onFailure(logSkip())
+ .onSuccess(oldDMaaPConf -> getDMaaPConfig(newConf)
+ .onEmpty(() -> log.warn(SKIP_MSG))
+ .forEach(newDMaaPConf -> compareAndOverwriteDMaaPConfig(oldDMaaPConf, newDMaaPConf)));
+ }
+
+
+ private void updateConfigurationProperties(JSONObject newConf) {
+ configFilesFacade.readCollectorProperties()
+ .onFailure(logSkip())
+ .onSuccess(oldProps -> compareAndOverwritePropertiesConfig(newConf, oldProps));
+ }
+
+ private void compareAndOverwritePropertiesConfig(JSONObject newConf, Map<String, String> oldProps) {
+ Map<String, String> newProperties = getProperties(newConf);
+ Map<String, String> result = oldProps.filterKeys((s) -> newProperties.keySet().contains(s));
+ if (!result.equals(newProperties)) {
+ configFilesFacade.writeProperties(newProperties)
+ .onSuccess(__ -> {
+ toRestart = true;
+ log.info("New properties configuration written to file");
+ })
+ .onFailure(logSkip());
+ } else {
+ log.info("Collector properties from CBS are the same as currently used ones. " + SKIP_MSG);
+ }
+ }
+
+ private void compareAndOverwriteDMaaPConfig(JSONObject oldDMaaPConf, JSONObject newDMaaPConf) {
+ if (!oldDMaaPConf.toString().equals(newDMaaPConf.toString())) {
+ parseToDomainMapping(newDMaaPConf)
+ .onFailure(exc -> log.error(SKIP_MSG, exc))
+ .onSuccess(eventPublisherReconfigurer)
+ .onSuccess(parsedConfig ->
+ configFilesFacade.writeDMaaPConfiguration(newDMaaPConf)
+ .onFailure(logSkip())
+ .onSuccess(__ -> {
+ toRestart = true;
+ log.info("New dMaaP configuration written to file");
+ }));
+ } else {
+ log.info("DMaaP config from CBS is the same as currently used one. " + SKIP_MSG);
+ }
+ }
+
+ private Consumer<Throwable> logSkip() {
+ return __ -> log.error(SKIP_MSG);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/controller/ConfigParsing.java b/src/main/java/org/onap/dcae/controller/ConfigParsing.java
new file mode 100644
index 0000000..fea3451
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/ConfigParsing.java
@@ -0,0 +1,59 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.vavr.API.Try;
+import static io.vavr.API.Tuple;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+import static org.onap.dcae.controller.Conversions.toList;
+
+interface ConfigParsing {
+
+ Logger log = LoggerFactory.getLogger(ConfigParsing.class);
+
+ static Option<JSONObject> getDMaaPConfig(JSONObject configuration) {
+ log.info(f("Getting DMaaP configuration from app configuration: '%s'", configuration));
+ return toList(configuration.toMap().entrySet().iterator())
+ .filter(t -> t.getKey().startsWith("streams_publishes"))
+ .headOption()
+ .flatMap(e -> Try(() -> configuration.getJSONObject(e.getKey())).toOption())
+ .onEmpty(() -> log.warn(f("App configuration '%s' is missing DMaaP configuration ('streams_publishes' key) "
+ + "or DMaaP configuration is not a valid json document", configuration)))
+ .peek(dMaaPConf -> log.info(f("Found following DMaaP configuration: '%s'", dMaaPConf)));
+ }
+
+ static Map<String, String> getProperties(JSONObject configuration) {
+ log.info(f("Getting properties configuration from app configuration: '%s'", configuration));
+ Map<String, String> confEntries = toList(configuration.toMap().entrySet().iterator())
+ .toMap(e -> Tuple(e.getKey(), String.valueOf(e.getValue())))
+ .filterKeys(e -> !e.startsWith("streams_publishes"));
+ log.info(f("Found following app properties: '%s'", confEntries));
+ return confEntries;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/ConfigSource.java b/src/main/java/org/onap/dcae/controller/ConfigSource.java
new file mode 100644
index 0000000..78cb147
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/ConfigSource.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import io.vavr.control.Try;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+
+final class ConfigSource {
+
+ private static final Logger log = LoggerFactory.getLogger(ConfigSource.class);
+
+ static Try<JSONObject> getAppConfig(EnvProps envProps) {
+ log.info("Fetching app configuration from CBS");
+ return callConsulForCBSConfiguration(envProps)
+ .peek(strBody -> log.info(f("Received following CBS configuration from Consul '%s'", strBody)))
+ .flatMap(Conversions::toJsonArray)
+ .flatMap(ConfigSource::withdrawCatalog)
+ .flatMap(json -> constructFullCBSUrl(envProps, json))
+ .flatMap(cbsUrl -> callCBSForAppConfig(envProps, cbsUrl))
+ .flatMap(Conversions::toJson)
+ .peek(jsonNode -> log.info(f("Received app configuration: '%s'", jsonNode)))
+ .onFailure(exc -> log.error("Could not fetch application config", exc));
+ }
+
+ private static Try<String> callConsulForCBSConfiguration(EnvProps envProps) {
+ return executeGet(envProps.consulProtocol + "://" + envProps.consulHost + ":" +
+ envProps.consulPort + "/v1/catalog/service/" + envProps.cbsName)
+ .mapFailure(enhanceError("Unable to retrieve CBS configuration from Consul"));
+ }
+
+ private static Try<String> constructFullCBSUrl(EnvProps envProps, JSONObject json) {
+ return Try(() -> envProps.cbsProtocol + "://" + json.get("ServiceAddress").toString() + ":" +
+ json.get("ServicePort").toString())
+ .mapFailure(enhanceError("ServiceAddress / ServicePort missing from CBS conf: '%s'", json));
+ }
+
+ private static Try<JSONObject> withdrawCatalog(JSONArray json) {
+ return Try(() -> new JSONObject(json.get(0).toString()))
+ .mapFailure(enhanceError("CBS response '%s' is in invalid format,"
+ + " most probably is it not a list of configuration objects", json));
+ }
+
+ private static Try<String> callCBSForAppConfig(EnvProps envProps, String cbsUrl) {
+ log.info("Calling CBS for application config");
+ return executeGet(cbsUrl + "/service_component/" + envProps.appName)
+ .mapFailure(enhanceError("Unable to fetch configuration from CBS"));
+ }
+
+
+ private static Try<String> executeGet(String url) {
+ log.info(f("Calling HTTP GET on url: '%s'", url));
+ return Try(() -> Unirest.get(url).asString())
+ .mapFailure(enhanceError("Http call (GET '%s') failed.", url))
+ .filter(
+ res -> res.getStatus() == 200,
+ res -> new RuntimeException(f("HTTP call (GET '%s') failed with status %s and body '%s'",
+ url, res.getStatus(), res.getBody())))
+ .map(HttpResponse::getBody)
+ .peek(body -> log.info(f("HTTP GET on '%s' returned body '%s'", url, body)));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java b/src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java
new file mode 100644
index 0000000..2941b9b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java
@@ -0,0 +1,88 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+public class ControllerConfigInfo {
+ private String controller_name;
+ private String controller_restapiUrl;
+ private String controller_restapiUser;
+ private String controller_restapiPassword;
+ private String controller_accessTokenUrl;
+ private String controller_accessTokenFile;
+ private String controller_subscriptionUrl;
+ private String controller_accessTokenMethod;
+ private String controller_subsMethod;
+
+ public ControllerConfigInfo(String controller_name,
+ String controller_restapiUrl,
+ String controller_restapiUser,
+ String controller_restapiPassword,
+ String controller_accessTokenUrl,
+ String controller_accessTokenFile,
+ String controller_subscriptionUrl,
+ String controller_accessTokenMethod,
+ String controller_subsMethod) {
+ this.controller_name = controller_name;
+ this.controller_restapiUrl = controller_restapiUrl;
+ this.controller_restapiUser = controller_restapiUser;
+ this.controller_restapiPassword = controller_restapiPassword;
+ this.controller_accessTokenUrl = controller_accessTokenUrl;
+ this.controller_accessTokenFile = controller_accessTokenFile;
+ this.controller_subscriptionUrl = controller_subscriptionUrl;
+ this.controller_accessTokenMethod = controller_accessTokenMethod;
+ this.controller_subsMethod = controller_subsMethod;
+ }
+
+ public String getController_name() {
+ return controller_name;
+ }
+
+ public String getController_restapiUrl() {
+ return controller_restapiUrl;
+ }
+
+ public String getController_restapiUser() {
+ return controller_restapiUser;
+ }
+
+ public String getController_restapiPassword() {
+ return controller_restapiPassword;
+ }
+
+ public String getController_accessTokenUrl() {
+ return controller_accessTokenUrl;
+ }
+
+ public String getController_accessTokenFile() {
+ return controller_accessTokenFile;
+ }
+
+ public String getController_accessTokenMethod() {
+ return controller_accessTokenMethod;
+ }
+
+ public String getController_subsMethod() {
+ return controller_subsMethod;
+ }
+
+ public String getController_subscriptionUrl() {
+ return controller_subscriptionUrl;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/controller/Conversions.java b/src/main/java/org/onap/dcae/controller/Conversions.java
new file mode 100644
index 0000000..0a3e7c7
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/Conversions.java
@@ -0,0 +1,54 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
+
+import io.vavr.API;
+import io.vavr.collection.List;
+import io.vavr.control.Try;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.StreamSupport;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+interface Conversions {
+
+ static Try<JSONObject> toJson(String strBody) {
+ return API.Try(() -> new JSONObject(strBody))
+ .mapFailure(enhanceError("Value '%s' is not a valid JSON document", strBody));
+ }
+
+ static Try<JSONArray> toJsonArray(String strBody) {
+ return API.Try(() -> new JSONArray(strBody))
+ .mapFailure(enhanceError("Value '%s' is not a valid JSON array", strBody));
+ }
+
+ static <T> List<T> toList(Iterator<T> iterator) {
+ return List.ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java b/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java
new file mode 100644
index 0000000..3feeacc
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Try;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+
+final class EnvPropertiesReader {
+
+ private final static Logger log = LoggerFactory.getLogger(EnvPropertiesReader.class);
+
+ static Option<EnvProps> readEnvProps(Map<String, String> environmentVariables) {
+ log.info("Loading necessary environment variables for dynamic configuration update");
+ int consulPort = getConsulPort(environmentVariables);
+ String consulProtocol = getConsulProtocol(environmentVariables);
+ String cbsProtocol = getCbsProtocol(environmentVariables);
+ Option<String> consulHost = getConsulHost(environmentVariables);
+ Option<String> cbsServiceName = getCBSName(environmentVariables);
+ Option<String> collectorAppName = getAppName(environmentVariables);
+ return Option.sequence(List(consulHost, cbsServiceName, collectorAppName))
+ .map(e -> new EnvProps(consulProtocol, e.get(0), consulPort, cbsProtocol, e.get(1), e.get(2)))
+ .onEmpty(() -> log.warn("Some required environment variables are missing"))
+ .peek(props -> log.info(f("Discovered following environment variables: '%s'", props)));
+ }
+
+ private static Option<String> getAppName(Map<String, String> environmentVariables) {
+ return environmentVariables.get("HOSTNAME")
+ .orElse(environmentVariables.get("SERVICE_NAME"))
+ .onEmpty(() -> log.warn("App service name (as registered in CBS) (env var: 'HOSTNAME' / 'SERVICE_NAME') "
+ + "is missing error environment variables."));
+ }
+
+ private static Option<String> getCBSName(Map<String, String> environmentVariables) {
+ return environmentVariables.get("CONFIG_BINDING_SERVICE")
+ .onEmpty(() -> log.warn("Name of CBS Service (as registered in Consul) (env var: 'CONFIG_BINDING_SERVICE') "
+ + "is missing from environment variables."));
+ }
+
+ private static Integer getConsulPort(Map<String, String> environmentVariables) {
+ return environmentVariables.get("CONSUL_PORT")
+ .flatMap(str -> Try(() -> Integer.valueOf(str))
+ .onFailure(exc -> log.warn("Consul port is not an integer value", exc))
+ .toOption())
+ .onEmpty(() -> log.warn("Consul port (env var: 'CONSUL_PORT') is missing from environment variables. "
+ + "Using default value of 8500"))
+ .getOrElse(8500);
+ }
+
+ private static Option<String> getConsulHost(Map<String, String> environmentVariables) {
+ return environmentVariables.get("CONSUL_HOST")
+ .onEmpty(() -> log.warn("Consul host (env var: 'CONSUL_HOST') (without port) "
+ + "is missing from environment variables."));
+ }
+
+ private static String getConsulProtocol(Map<String, String> environmentVariables) {
+ return getProtocolFrom("CONSUL_PROTOCOL", environmentVariables);
+ }
+
+ private static String getCbsProtocol(Map<String, String> environmentVariables) {
+ return getProtocolFrom("CBS_PROTOCOL", environmentVariables);
+ }
+
+ private static String getProtocolFrom(String variableName, Map<String, String> environmentVariables) {
+ return environmentVariables.get(variableName)
+ .onEmpty(() -> log.warn("Consul protocol (env var: '" + variableName + "') is missing "
+ + "from environment variables."))
+ .getOrElse("http");
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/EnvProps.java b/src/main/java/org/onap/dcae/controller/EnvProps.java
new file mode 100644
index 0000000..a2d381d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/EnvProps.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.controller;
+
+import java.util.Objects;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class EnvProps {
+
+ final String consulProtocol;
+ final String consulHost;
+ final int consulPort;
+ final String cbsName;
+ final String cbsProtocol;
+ final String appName;
+
+ EnvProps(String consulProtocol, String consulHost, int consulPort, String cbsProtocol, String cbsName, String appName) {
+ this.consulProtocol = consulProtocol;
+ this.consulHost = consulHost;
+ this.consulPort = consulPort;
+ this.cbsProtocol = cbsProtocol;
+ this.cbsName = cbsName;
+ this.appName = appName;
+ }
+
+ @Override
+ public String toString() {
+ return "EnvProps{" +
+ "consulProtocol='" + consulProtocol + '\'' +
+ ", consulHost='" + consulHost + '\'' +
+ ", consulPort=" + consulPort +
+ ", cbsProtocol='" + cbsProtocol + '\'' +
+ ", cbsName='" + cbsName + '\'' +
+ ", appName='" + appName + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EnvProps envProps = (EnvProps) o;
+ return consulPort == envProps.consulPort &&
+ Objects.equals(consulProtocol, envProps.consulProtocol) &&
+ Objects.equals(consulHost, envProps.consulHost) &&
+ Objects.equals(cbsProtocol, envProps.cbsProtocol) &&
+ Objects.equals(cbsName, envProps.cbsName) &&
+ Objects.equals(appName, envProps.appName);
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
new file mode 100644
index 0000000..1c0d85b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
@@ -0,0 +1,207 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.controller;
+
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.onap.dcae.common.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.HttpHeaders;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.onap.dcae.common.RestapiCallNodeUtil.*;
+
+public class PersistentEventConnection implements Runnable {
+ public String event_name;
+ private String event_description;
+ private boolean event_sseventUrlEmbed;
+ private String event_sseventsField;
+ private String event_sseventsUrl;
+ private String event_subscriptionTemplate;
+ private String event_unSubscriptionTemplate;
+ private String event_ruleId;
+ private EventConnectionState state;
+ private volatile boolean running = true;
+ private static final Logger log = LoggerFactory.getLogger(PersistentEventConnection.class);
+
+
+ private RestConfContext ctx;
+ private AccessController parentCtrllr;
+ private Map<String, String> eventParaMap;
+
+ public PersistentEventConnection(String event_name,
+ String event_description,
+ boolean event_sseventUrlEmbed,
+ String event_sseventsField,
+ String event_sseventsUrl,
+ String event_subscriptionTemplate,
+ String event_unSubscriptionTemplate,
+ String event_ruleId,
+ AccessController parentCtrllr) {
+ this.event_name = event_name;
+ this.event_description = event_description;
+ this.event_sseventUrlEmbed = event_sseventUrlEmbed;
+ this.event_sseventsField = event_sseventsField;
+ this.event_sseventsUrl = event_sseventsUrl;
+ this.event_subscriptionTemplate = event_subscriptionTemplate;
+ this.event_unSubscriptionTemplate = event_unSubscriptionTemplate;
+ this.event_ruleId = event_ruleId;
+ this.state = EventConnectionState.INIT;
+
+ this.ctx = new RestConfContext();
+ for (String s : parentCtrllr.getCtx().getAttributeKeySet()) {
+ this.ctx.setAttribute(s, ctx.getAttribute(s));
+ }
+ this.parentCtrllr = parentCtrllr;
+ this.eventParaMap = new HashMap<>();
+ this.eventParaMap.putAll(parentCtrllr.getParaMap());
+ printEventParamMap();
+ log.info("New persistent connection created " + event_name);
+ }
+
+ @Override
+ public void run() {
+ Parameters p = null;
+ try {
+ modifyEventParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(parentCtrllr.getProperties().authorizationEnabled())
+ + parentCtrllr.getCfgInfo().getController_restapiUrl()
+ + parentCtrllr.getCfgInfo().getController_subscriptionUrl());
+ modifyEventParamMap(Constants.KDEFAULT_TEMP_FILENAME, event_subscriptionTemplate);
+ modifyEventParamMap(Constants.KSETTING_REST_UNAME, parentCtrllr.getCfgInfo().getController_restapiUser());
+ modifyEventParamMap(Constants.KSETTING_REST_PASSWORD, parentCtrllr.getCfgInfo().getController_restapiPassword());
+ modifyEventParamMap(Constants.KSETTING_HTTP_METHOD, parentCtrllr.getCfgInfo().getController_subsMethod());
+
+ parentCtrllr.getRestApiCallNode().sendRequest(eventParaMap, ctx, null);
+ } catch (Exception e) {
+ log.error("Exception occured!", e);
+ Thread.currentThread().interrupt();
+ }
+
+ /* Retrieve url from result and construct SSE url */
+ if (event_sseventUrlEmbed) {
+ String key = getEventParamMapValue(Constants.KSETTING_RESP_PREFIX).concat(".").concat(event_sseventsField);
+ log.info("key " + key);
+ this.event_sseventsUrl = ctx.getAttribute(key);
+ }
+
+ log.info("SSE received url " + event_sseventsUrl);
+ try {
+ p = getParameters(eventParaMap);
+ } catch (Exception e) {
+ log.error("Exception occured!", e);
+ Thread.currentThread().interrupt();
+ }
+ printEventParamMap();
+ String url = getUriMethod(parentCtrllr.getProperties().authorizationEnabled()) +
+ parentCtrllr.getCfgInfo().getController_restapiUrl() + event_sseventsUrl;
+ Client client = ignoreSslClient().register(SseFeature.class);
+ WebTarget target = addAuthType(client, p).target(url);
+ String tokenId = getEventParamMapValue(Constants.KSETTING_TOKENID);
+ String headerName = "X-ACCESS-TOKEN";
+ if (tokenId == null) {
+ headerName = HttpHeaders.AUTHORIZATION;
+ tokenId = getAuthorizationToken(parentCtrllr.getCfgInfo().getController_restapiUser(),
+ parentCtrllr.getCfgInfo().getController_restapiPassword());
+ }
+ AdditionalHeaderWebTarget newTarget = new AdditionalHeaderWebTarget(target, tokenId, headerName);
+ EventSource eventSource = EventSource.target(newTarget).build();
+ eventSource.register(new DataChangeEventListener(this));
+ eventSource.open();
+ log.info("Connected to SSE source");
+ while (running) {
+ try {
+ log.info("SSE state " + eventSource.isOpen());
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ log.info("Exception: " + ie.getMessage());
+ Thread.currentThread().interrupt();
+ running = false;
+ }
+ }
+ eventSource.close();
+ log.info("Closed connection to SSE source");
+ }
+
+ private String getAuthorizationToken(String userName, String password) {
+ return "Basic " + Base64.getEncoder().encodeToString((
+ userName + ":" + password).getBytes());
+ }
+
+ private Client ignoreSslClient() {
+ SSLContext sslcontext = null;
+
+ try {
+ sslcontext = SSLContext.getInstance("TLS");
+ sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
+ @Override
+ public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ }}, new java.security.SecureRandom());
+ } catch (NoSuchAlgorithmException | KeyManagementException e) {
+ throw new IllegalStateException(e);
+ }
+
+ return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build();
+ }
+
+ public String getEvent_ruleId() {
+ return event_ruleId;
+ }
+
+ public void modifyEventParamMap(String fieldName, String value) {
+ eventParaMap.put(fieldName, value);
+ }
+
+ public String getEventParamMapValue(String fieldName) {
+ return eventParaMap.get(fieldName);
+ }
+
+ public void printEventParamMap() {
+ log.info("----------------Event Param Map-------------------");
+ for (String name : eventParaMap.keySet()) {
+ String value = eventParaMap.get(name);
+ log.info(name + " : " + value);
+ }
+ }
+}