diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/controller')
10 files changed, 1188 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/controller/AccessController.java b/src/main/java/org/onap/dcae/controller/AccessController.java new file mode 100644 index 0000000..83673b8 --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/AccessController.java @@ -0,0 +1,243 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.Constants; +import org.onap.dcae.common.RestConfContext; +import org.onap.dcae.common.RestapiCallNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static java.nio.file.Files.readAllBytes; +import static org.onap.dcae.common.RestapiCallNodeUtil.getUriMethod; + +public class AccessController { + private static final Logger log = LoggerFactory.getLogger(AccessController.class); + /* Collector properties */ + private ApplicationSettings properties; + + /* Controller specific information */ + private ControllerConfigInfo cfgInfo; + private RestConfContext ctx; + RestapiCallNode restApiCallNode; + + /* Maps of Events */ + private Map<String, PersistentEventConnection> eventList = new ConcurrentHashMap<>(); + private ExecutorService executor = Executors.newCachedThreadPool(); + private Map<String, String> paraMap; + + public AccessController(JSONObject controller, + ApplicationSettings properties) { + this.cfgInfo = new ControllerConfigInfo(controller.get("controller_name").toString(), + controller.get("controller_restapiUrl").toString(), + controller.get("controller_restapiUser").toString(), + controller.get("controller_restapiPassword").toString(), + controller.get("controller_accessTokenUrl").toString(), + controller.get("controller_accessTokenFile").toString(), + controller.get("controller_subscriptionUrl").toString(), + controller.get("controller_accessTokenMethod").toString(), + controller.get("controller_subsMethod").toString()); + this.properties = properties; + this.ctx = new RestConfContext(); + this.restApiCallNode = new RestapiCallNode(); + this.paraMap = new HashMap<>(); + + prepareControllerParamMap(); + + log.info("AccesController Created {} {} {} {} {} {}", + this.cfgInfo.getController_name(), + this.cfgInfo.getController_restapiUrl(), + this.cfgInfo.getController_restapiPassword(), + this.cfgInfo.getController_restapiUser(), + this.cfgInfo.getController_accessTokenUrl(), + this.cfgInfo.getController_accessTokenFile()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof AccessController)) return false; + AccessController that = (AccessController) o; + return that.cfgInfo.getController_name().equals(that.cfgInfo.getController_name()); + } + + @Override + public int hashCode() { + return Objects.hash(this.cfgInfo.getController_name()); + } + + public RestapiCallNode getRestApiCallNode() { + return restApiCallNode; + } + + private void fetch_TokenId() { + + + modifyControllerParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(this.properties.authorizationEnabled()) + cfgInfo.getController_restapiUrl() + cfgInfo.getController_accessTokenUrl()); + modifyControllerParamMap(Constants.KDEFAULT_TEMP_FILENAME, cfgInfo.getController_accessTokenFile()); + modifyControllerParamMap(Constants.KSETTING_REST_UNAME, cfgInfo.getController_restapiUser()); + modifyControllerParamMap(Constants.KSETTING_REST_PASSWORD, cfgInfo.getController_restapiPassword()); + modifyControllerParamMap(Constants.KSETTING_HTTP_METHOD, cfgInfo.getController_accessTokenMethod()); + + String httpResponse = null; + try { + + restApiCallNode.sendRequest(this.paraMap, ctx, null); + String key = getControllerParamMapValue(Constants.KSETTING_RESP_PREFIX).concat(".").concat("httpResponse"); + httpResponse = ctx.getAttribute(key); + log.info("httpResponse ", httpResponse + " key " + key); + JSONObject jsonObj = new JSONObject(httpResponse); + log.info("jsonObj ", jsonObj.toString()); + //JSONObject data = jsonObj.getJSONObject("data"); + //String tokenId = data.get("accessSession").toString(); + //@TODO: Make return field dynamic + String tokenId = jsonObj.get("accessSession").toString(); + log.info("token 1" + tokenId); + modifyControllerParamMap(Constants.KSETTING_TOKENID, tokenId); + modifyControllerParamMap(Constants.KSETTING_CUSTOMHTTP_HEADER, "X-ACCESS-TOKEN=" + tokenId); + } catch (Exception e) { + log.info("Access token is not supported" + e.getMessage()); + log.info("http response " + httpResponse); + } + } + + public void activate() { + fetch_TokenId(); + printControllerParamMap(); + /* Create eventlist from properties */ + JSONArray contollers = new JSONArray(properties.rcc_policy()); + for (int i = 0; i < contollers.length(); i++) { + JSONObject controller = contollers.getJSONObject(i); + if (controller.get("controller_name").equals(this.getCfgInfo().getController_name())) { + JSONArray eventlists = controller.getJSONArray("event_details"); + for (int j = 0; j < eventlists.length(); j++) { + JSONObject event = eventlists.getJSONObject(j); + String name = event.get("event_name").toString(); + PersistentEventConnection conn = new PersistentEventConnection(name, + event.get("event_description").toString(), + Boolean.parseBoolean(event.get("event_sseventUrlEmbed").toString()), + event.get("event_sseventsField").toString(), + event.get("event_sseventsUrl").toString(), + event.get("event_subscriptionTemplate").toString(), + event.get("event_unSubscriptionTemplate").toString(), + event.get("event_ruleId").toString(), + this); + + eventList.put(name, conn); + executor.execute(conn); + } + } + } + } + + public RestConfContext getCtx() { + return ctx; + } + + public ApplicationSettings getProperties() { + return properties; + } + + public ControllerConfigInfo getCfgInfo() { + return cfgInfo; + } + + public Map<String, String> getParaMap() { + return this.paraMap; + } + + private void prepareControllerParamMap() { + /* Adding the fields in ParaMap */ + paraMap.put(Constants.KDEFAULT_TEMP_FILENAME, null); + paraMap.put(Constants.KSETTING_REST_API_URL, null); + paraMap.put(Constants.KSETTING_HTTP_METHOD, "post"); + paraMap.put(Constants.KSETTING_RESP_PREFIX, "responsePrefix"); + paraMap.put(Constants.KSETTING_SKIP_SENDING, "false"); + paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, null); + paraMap.put(Constants.KSETTING_FORMAT, "json"); + + paraMap.put(Constants.KSETTING_REST_UNAME, null); + paraMap.put(Constants.KSETTING_REST_PASSWORD, null); + paraMap.put(Constants.KDEFAULT_REQUESTBODY, null); + + paraMap.put(Constants.KSETTING_AUTH_TYPE, "unspecified"); + paraMap.put(Constants.KSETTING_CONTENT_TYPE, "application/json"); + paraMap.put(Constants.KSETTING_OAUTH_CONSUMER_KEY, null); + paraMap.put(Constants.KSETTING_OAUTH_CONSUMER_SECRET, null); + paraMap.put(Constants.KSETTING_OAUTH_SIGNATURE_METHOD, null); + paraMap.put(Constants.KSETTING_OAUTH_VERSION, null); + + paraMap.put(Constants.KSETTING_CUSTOMHTTP_HEADER, null); + paraMap.put(Constants.KSETTING_TOKENID, null); + paraMap.put(Constants.KSETTING_DUMP_HEADER, "false"); + paraMap.put(Constants.KSETTING_RETURN_REQUEST_PAYLOAD, "false"); + + paraMap.put(Constants.KSETTING_TRUST_STORE_FILENAME, this.getProperties().truststoreFileLocation()); + String trustPassword = getKeyStorePassword(toAbsolutePath(this.getProperties().truststorePasswordFileLocation())); + paraMap.put(Constants.KSETTING_TRUST_STORE_PASSWORD, trustPassword); + paraMap.put(Constants.KSETTING_KEY_STORE_FILENAME, this.getProperties().keystoreFileLocation()); + String KeyPassword = getKeyStorePassword(toAbsolutePath(this.getProperties().keystorePasswordFileLocation())); + paraMap.put(Constants.KSETTING_KEY_STORE_PASSWORD, KeyPassword); + + } + + private Path toAbsolutePath(final String path) { + return Paths.get(path).toAbsolutePath(); + } + + private String getKeyStorePassword(final Path location) { + try { + return new String(readAllBytes(location)); + } catch (Exception e) { + log.error("Could not read password from: '" + location + "'.", e); + throw new ApplicationException(e); + } + } + + public void modifyControllerParamMap(String fieldName, String value) { + paraMap.put(fieldName, value); + } + + public String getControllerParamMapValue(String fieldName) { + return paraMap.get(fieldName); + } + + public void printControllerParamMap() { + log.info("----------------Controller Param Map-------------------"); + for (String name : paraMap.keySet()) { + String value = paraMap.get(name); + log.info(name + " : " + value); + } + } +} diff --git a/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java b/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java new file mode 100644 index 0000000..4542fb5 --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java @@ -0,0 +1,131 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import io.vavr.CheckedRunnable; +import io.vavr.Tuple2; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.*; +import static org.onap.dcae.controller.Conversions.toList; + +class ConfigFilesFacade { + + private static Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class); + + private final Path dMaaPConfigPath; + private final Path propertiesPath; + + public ConfigFilesFacade(Path dMaaPConfigPath, Path propertiesPath) { + this.dMaaPConfigPath = dMaaPConfigPath; + this.propertiesPath = propertiesPath; + } + + Try<Map<String, String>> readCollectorProperties() { + log.info(f("Reading collector properties from path: '%s'", propertiesPath)); + return Try(this::readProperties) + .map(prop -> toList(prop.getKeys()).toMap(k -> k, k -> (String) prop.getProperty(k))) + .mapFailure(enhanceError("Unable to read properties configuration from path '%s'", propertiesPath)) + .onFailure(logError(log)) + .peek(props -> log.info(f("Read following collector properties: '%s'", props))); + } + + Try<JSONObject> readDMaaPConfiguration() { + log.info(f("Reading DMaaP configuration from file: '%s'", dMaaPConfigPath)); + return readFile(dMaaPConfigPath) + .recover(FileNotFoundException.class, __ -> "{}") + .mapFailure(enhanceError("Unable to read DMaaP configuration from file '%s'", dMaaPConfigPath)) + .flatMap(Conversions::toJson) + .onFailure(logError(log)) + .peek(props -> log.info(f("Read following DMaaP properties: '%s'", props))); + } + + Try<Void> writeDMaaPConfiguration(JSONObject dMaaPConfiguration) { + log.info(f("Writing DMaaP configuration '%s' into file '%s'", dMaaPConfiguration, dMaaPConfigPath)); + return writeFile(dMaaPConfigPath, indentConfiguration(dMaaPConfiguration.toString())) + .mapFailure(enhanceError("Could not save new DMaaP configuration to path '%s'", dMaaPConfigPath)) + .onFailure(logError(log)) + .peek(__ -> log.info("Written successfully")); + } + + + Try<Void> writeProperties(Map<String, String> properties) { + log.info(f("Writing properties configuration '%s' into file '%s'", properties, propertiesPath)); + return Try.run(saveProperties(properties)) + .mapFailure(enhanceError("Could not save properties to path '%s'", properties)) + .onFailure(logError(log)) + .peek(__ -> log.info("Written successfully")); + } + + private Try<String> readFile(Path path) { + return Try(() -> new String(Files.readAllBytes(path), StandardCharsets.UTF_8)) + .mapFailure(enhanceError("Could not read content from path: '%s'", path)); + } + + private Try<Void> writeFile(Path path, String content) { + return Try.run(() -> Files.write(path, content.getBytes())) + .mapFailure(enhanceError("Could not write content to path: '%s'", path)); + } + + private PropertiesConfiguration readProperties() throws ConfigurationException { + PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(); + propertiesConfiguration.setDelimiterParsingDisabled(true); + propertiesConfiguration.load(propertiesPath.toFile()); + return propertiesConfiguration; + } + + private CheckedRunnable saveProperties(Map<String, String> properties) { + return () -> { + PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(propertiesPath.toFile()); + propertiesConfiguration.setEncoding(null); + for (Tuple2<String, String> property : properties) { + updateProperty(propertiesConfiguration, property); + } + propertiesConfiguration.save(); + }; + } + + private void updateProperty(PropertiesConfiguration propertiesConfiguration, Tuple2<String, String> property) { + if (propertiesConfiguration.containsKey(property._1)) { + propertiesConfiguration.setProperty(property._1, property._2); + } else { + propertiesConfiguration.addProperty(property._1, property._2); + } + } + + private String indentConfiguration(String configuration) { + return new JSONObject(configuration).toString(4); + } + +} diff --git a/src/main/java/org/onap/dcae/controller/ConfigLoader.java b/src/main/java/org/onap/dcae/controller/ConfigLoader.java new file mode 100644 index 0000000..1b7e60b --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/ConfigLoader.java @@ -0,0 +1,147 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import io.vavr.Function0; +import io.vavr.Function1; +import io.vavr.collection.HashMap; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import org.json.JSONObject; +import org.onap.dcae.RestConfCollector; +import org.onap.dcae.common.publishing.PublisherConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.util.function.Consumer; + +import static org.onap.dcae.common.publishing.DMaaPConfigurationParser.parseToDomainMapping; +import static org.onap.dcae.controller.ConfigParsing.getDMaaPConfig; +import static org.onap.dcae.controller.ConfigParsing.getProperties; +import static org.onap.dcae.controller.EnvPropertiesReader.readEnvProps; + +public class ConfigLoader { + + private static final String SKIP_MSG = "Skipping dynamic configuration update"; + private static Logger log = LoggerFactory.getLogger(ConfigLoader.class); + private final Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer; + private final ConfigFilesFacade configFilesFacade; + private final Function1<EnvProps, Try<JSONObject>> configurationSource; + private final Function0<Map<String, String>> envVariablesSupplier; + private boolean toRestart = false; + + ConfigLoader(Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer, + ConfigFilesFacade configFilesFacade, + Function1<EnvProps, Try<JSONObject>> configurationSource, + Function0<Map<String, String>> envVariablesSupplier) { + this.eventPublisherReconfigurer = eventPublisherReconfigurer; + this.configFilesFacade = configFilesFacade; + this.configurationSource = configurationSource; + this.envVariablesSupplier = envVariablesSupplier; + } + + public static ConfigLoader create(Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer, + Path dMaaPConfigFile, Path propertiesConfigFile) { + log.info("ConfigLoader create ...."); + return new ConfigLoader(eventPublisherReconfigurer, + new ConfigFilesFacade(dMaaPConfigFile, propertiesConfigFile), + ConfigSource::getAppConfig, + () -> HashMap.ofAll(System.getenv())); + } + + public void updateConfig() { + log.info("Trying to dynamically update config from Config Binding Service"); + readEnvProps(envVariablesSupplier.get()) + .onEmpty(() -> log.warn(SKIP_MSG)) + .forEach(this::updateConfig); + } + + private void updateConfig(EnvProps props) { + configurationSource.apply(props) + .onFailure(logSkip()) + .onSuccess(newConf -> { + updateConfigurationProperties(newConf); + updateDMaaPProperties(newConf); + reloadApplication(); + } + ); + } + + private void reloadApplication() { + if (toRestart) { + log.info("New app config - Application will be restarted"); + RestConfCollector.restartApplication(); + } + } + + private void updateDMaaPProperties(JSONObject newConf) { + configFilesFacade.readDMaaPConfiguration() + .onFailure(logSkip()) + .onSuccess(oldDMaaPConf -> getDMaaPConfig(newConf) + .onEmpty(() -> log.warn(SKIP_MSG)) + .forEach(newDMaaPConf -> compareAndOverwriteDMaaPConfig(oldDMaaPConf, newDMaaPConf))); + } + + + private void updateConfigurationProperties(JSONObject newConf) { + configFilesFacade.readCollectorProperties() + .onFailure(logSkip()) + .onSuccess(oldProps -> compareAndOverwritePropertiesConfig(newConf, oldProps)); + } + + private void compareAndOverwritePropertiesConfig(JSONObject newConf, Map<String, String> oldProps) { + Map<String, String> newProperties = getProperties(newConf); + Map<String, String> result = oldProps.filterKeys((s) -> newProperties.keySet().contains(s)); + if (!result.equals(newProperties)) { + configFilesFacade.writeProperties(newProperties) + .onSuccess(__ -> { + toRestart = true; + log.info("New properties configuration written to file"); + }) + .onFailure(logSkip()); + } else { + log.info("Collector properties from CBS are the same as currently used ones. " + SKIP_MSG); + } + } + + private void compareAndOverwriteDMaaPConfig(JSONObject oldDMaaPConf, JSONObject newDMaaPConf) { + if (!oldDMaaPConf.toString().equals(newDMaaPConf.toString())) { + parseToDomainMapping(newDMaaPConf) + .onFailure(exc -> log.error(SKIP_MSG, exc)) + .onSuccess(eventPublisherReconfigurer) + .onSuccess(parsedConfig -> + configFilesFacade.writeDMaaPConfiguration(newDMaaPConf) + .onFailure(logSkip()) + .onSuccess(__ -> { + toRestart = true; + log.info("New dMaaP configuration written to file"); + })); + } else { + log.info("DMaaP config from CBS is the same as currently used one. " + SKIP_MSG); + } + } + + private Consumer<Throwable> logSkip() { + return __ -> log.error(SKIP_MSG); + } +} diff --git a/src/main/java/org/onap/dcae/controller/ConfigParsing.java b/src/main/java/org/onap/dcae/controller/ConfigParsing.java new file mode 100644 index 0000000..fea3451 --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/ConfigParsing.java @@ -0,0 +1,59 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.vavr.API.Try; +import static io.vavr.API.Tuple; +import static org.onap.dcae.common.publishing.VavrUtils.f; +import static org.onap.dcae.controller.Conversions.toList; + +interface ConfigParsing { + + Logger log = LoggerFactory.getLogger(ConfigParsing.class); + + static Option<JSONObject> getDMaaPConfig(JSONObject configuration) { + log.info(f("Getting DMaaP configuration from app configuration: '%s'", configuration)); + return toList(configuration.toMap().entrySet().iterator()) + .filter(t -> t.getKey().startsWith("streams_publishes")) + .headOption() + .flatMap(e -> Try(() -> configuration.getJSONObject(e.getKey())).toOption()) + .onEmpty(() -> log.warn(f("App configuration '%s' is missing DMaaP configuration ('streams_publishes' key) " + + "or DMaaP configuration is not a valid json document", configuration))) + .peek(dMaaPConf -> log.info(f("Found following DMaaP configuration: '%s'", dMaaPConf))); + } + + static Map<String, String> getProperties(JSONObject configuration) { + log.info(f("Getting properties configuration from app configuration: '%s'", configuration)); + Map<String, String> confEntries = toList(configuration.toMap().entrySet().iterator()) + .toMap(e -> Tuple(e.getKey(), String.valueOf(e.getValue()))) + .filterKeys(e -> !e.startsWith("streams_publishes")); + log.info(f("Found following app properties: '%s'", confEntries)); + return confEntries; + } + +} diff --git a/src/main/java/org/onap/dcae/controller/ConfigSource.java b/src/main/java/org/onap/dcae/controller/ConfigSource.java new file mode 100644 index 0000000..78cb147 --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/ConfigSource.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import com.mashape.unirest.http.HttpResponse; +import com.mashape.unirest.http.Unirest; +import io.vavr.control.Try; +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.common.publishing.VavrUtils.f; + +final class ConfigSource { + + private static final Logger log = LoggerFactory.getLogger(ConfigSource.class); + + static Try<JSONObject> getAppConfig(EnvProps envProps) { + log.info("Fetching app configuration from CBS"); + return callConsulForCBSConfiguration(envProps) + .peek(strBody -> log.info(f("Received following CBS configuration from Consul '%s'", strBody))) + .flatMap(Conversions::toJsonArray) + .flatMap(ConfigSource::withdrawCatalog) + .flatMap(json -> constructFullCBSUrl(envProps, json)) + .flatMap(cbsUrl -> callCBSForAppConfig(envProps, cbsUrl)) + .flatMap(Conversions::toJson) + .peek(jsonNode -> log.info(f("Received app configuration: '%s'", jsonNode))) + .onFailure(exc -> log.error("Could not fetch application config", exc)); + } + + private static Try<String> callConsulForCBSConfiguration(EnvProps envProps) { + return executeGet(envProps.consulProtocol + "://" + envProps.consulHost + ":" + + envProps.consulPort + "/v1/catalog/service/" + envProps.cbsName) + .mapFailure(enhanceError("Unable to retrieve CBS configuration from Consul")); + } + + private static Try<String> constructFullCBSUrl(EnvProps envProps, JSONObject json) { + return Try(() -> envProps.cbsProtocol + "://" + json.get("ServiceAddress").toString() + ":" + + json.get("ServicePort").toString()) + .mapFailure(enhanceError("ServiceAddress / ServicePort missing from CBS conf: '%s'", json)); + } + + private static Try<JSONObject> withdrawCatalog(JSONArray json) { + return Try(() -> new JSONObject(json.get(0).toString())) + .mapFailure(enhanceError("CBS response '%s' is in invalid format," + + " most probably is it not a list of configuration objects", json)); + } + + private static Try<String> callCBSForAppConfig(EnvProps envProps, String cbsUrl) { + log.info("Calling CBS for application config"); + return executeGet(cbsUrl + "/service_component/" + envProps.appName) + .mapFailure(enhanceError("Unable to fetch configuration from CBS")); + } + + + private static Try<String> executeGet(String url) { + log.info(f("Calling HTTP GET on url: '%s'", url)); + return Try(() -> Unirest.get(url).asString()) + .mapFailure(enhanceError("Http call (GET '%s') failed.", url)) + .filter( + res -> res.getStatus() == 200, + res -> new RuntimeException(f("HTTP call (GET '%s') failed with status %s and body '%s'", + url, res.getStatus(), res.getBody()))) + .map(HttpResponse::getBody) + .peek(body -> log.info(f("HTTP GET on '%s' returned body '%s'", url, body))); + } + +} diff --git a/src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java b/src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java new file mode 100644 index 0000000..2941b9b --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/ControllerConfigInfo.java @@ -0,0 +1,88 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +public class ControllerConfigInfo { + private String controller_name; + private String controller_restapiUrl; + private String controller_restapiUser; + private String controller_restapiPassword; + private String controller_accessTokenUrl; + private String controller_accessTokenFile; + private String controller_subscriptionUrl; + private String controller_accessTokenMethod; + private String controller_subsMethod; + + public ControllerConfigInfo(String controller_name, + String controller_restapiUrl, + String controller_restapiUser, + String controller_restapiPassword, + String controller_accessTokenUrl, + String controller_accessTokenFile, + String controller_subscriptionUrl, + String controller_accessTokenMethod, + String controller_subsMethod) { + this.controller_name = controller_name; + this.controller_restapiUrl = controller_restapiUrl; + this.controller_restapiUser = controller_restapiUser; + this.controller_restapiPassword = controller_restapiPassword; + this.controller_accessTokenUrl = controller_accessTokenUrl; + this.controller_accessTokenFile = controller_accessTokenFile; + this.controller_subscriptionUrl = controller_subscriptionUrl; + this.controller_accessTokenMethod = controller_accessTokenMethod; + this.controller_subsMethod = controller_subsMethod; + } + + public String getController_name() { + return controller_name; + } + + public String getController_restapiUrl() { + return controller_restapiUrl; + } + + public String getController_restapiUser() { + return controller_restapiUser; + } + + public String getController_restapiPassword() { + return controller_restapiPassword; + } + + public String getController_accessTokenUrl() { + return controller_accessTokenUrl; + } + + public String getController_accessTokenFile() { + return controller_accessTokenFile; + } + + public String getController_accessTokenMethod() { + return controller_accessTokenMethod; + } + + public String getController_subsMethod() { + return controller_subsMethod; + } + + public String getController_subscriptionUrl() { + return controller_subscriptionUrl; + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/controller/Conversions.java b/src/main/java/org/onap/dcae/controller/Conversions.java new file mode 100644 index 0000000..0a3e7c7 --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/Conversions.java @@ -0,0 +1,54 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; + +import io.vavr.API; +import io.vavr.collection.List; +import io.vavr.control.Try; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; +import org.json.JSONArray; +import org.json.JSONObject; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +interface Conversions { + + static Try<JSONObject> toJson(String strBody) { + return API.Try(() -> new JSONObject(strBody)) + .mapFailure(enhanceError("Value '%s' is not a valid JSON document", strBody)); + } + + static Try<JSONArray> toJsonArray(String strBody) { + return API.Try(() -> new JSONArray(strBody)) + .mapFailure(enhanceError("Value '%s' is not a valid JSON array", strBody)); + } + + static <T> List<T> toList(Iterator<T> iterator) { + return List.ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)); + } + +} diff --git a/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java b/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java new file mode 100644 index 0000000..3feeacc --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.vavr.API.List; +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.f; + +final class EnvPropertiesReader { + + private final static Logger log = LoggerFactory.getLogger(EnvPropertiesReader.class); + + static Option<EnvProps> readEnvProps(Map<String, String> environmentVariables) { + log.info("Loading necessary environment variables for dynamic configuration update"); + int consulPort = getConsulPort(environmentVariables); + String consulProtocol = getConsulProtocol(environmentVariables); + String cbsProtocol = getCbsProtocol(environmentVariables); + Option<String> consulHost = getConsulHost(environmentVariables); + Option<String> cbsServiceName = getCBSName(environmentVariables); + Option<String> collectorAppName = getAppName(environmentVariables); + return Option.sequence(List(consulHost, cbsServiceName, collectorAppName)) + .map(e -> new EnvProps(consulProtocol, e.get(0), consulPort, cbsProtocol, e.get(1), e.get(2))) + .onEmpty(() -> log.warn("Some required environment variables are missing")) + .peek(props -> log.info(f("Discovered following environment variables: '%s'", props))); + } + + private static Option<String> getAppName(Map<String, String> environmentVariables) { + return environmentVariables.get("HOSTNAME") + .orElse(environmentVariables.get("SERVICE_NAME")) + .onEmpty(() -> log.warn("App service name (as registered in CBS) (env var: 'HOSTNAME' / 'SERVICE_NAME') " + + "is missing error environment variables.")); + } + + private static Option<String> getCBSName(Map<String, String> environmentVariables) { + return environmentVariables.get("CONFIG_BINDING_SERVICE") + .onEmpty(() -> log.warn("Name of CBS Service (as registered in Consul) (env var: 'CONFIG_BINDING_SERVICE') " + + "is missing from environment variables.")); + } + + private static Integer getConsulPort(Map<String, String> environmentVariables) { + return environmentVariables.get("CONSUL_PORT") + .flatMap(str -> Try(() -> Integer.valueOf(str)) + .onFailure(exc -> log.warn("Consul port is not an integer value", exc)) + .toOption()) + .onEmpty(() -> log.warn("Consul port (env var: 'CONSUL_PORT') is missing from environment variables. " + + "Using default value of 8500")) + .getOrElse(8500); + } + + private static Option<String> getConsulHost(Map<String, String> environmentVariables) { + return environmentVariables.get("CONSUL_HOST") + .onEmpty(() -> log.warn("Consul host (env var: 'CONSUL_HOST') (without port) " + + "is missing from environment variables.")); + } + + private static String getConsulProtocol(Map<String, String> environmentVariables) { + return getProtocolFrom("CONSUL_PROTOCOL", environmentVariables); + } + + private static String getCbsProtocol(Map<String, String> environmentVariables) { + return getProtocolFrom("CBS_PROTOCOL", environmentVariables); + } + + private static String getProtocolFrom(String variableName, Map<String, String> environmentVariables) { + return environmentVariables.get(variableName) + .onEmpty(() -> log.warn("Consul protocol (env var: '" + variableName + "') is missing " + + "from environment variables.")) + .getOrElse("http"); + } + +} diff --git a/src/main/java/org/onap/dcae/controller/EnvProps.java b/src/main/java/org/onap/dcae/controller/EnvProps.java new file mode 100644 index 0000000..a2d381d --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/EnvProps.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.controller; + +import java.util.Objects; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class EnvProps { + + final String consulProtocol; + final String consulHost; + final int consulPort; + final String cbsName; + final String cbsProtocol; + final String appName; + + EnvProps(String consulProtocol, String consulHost, int consulPort, String cbsProtocol, String cbsName, String appName) { + this.consulProtocol = consulProtocol; + this.consulHost = consulHost; + this.consulPort = consulPort; + this.cbsProtocol = cbsProtocol; + this.cbsName = cbsName; + this.appName = appName; + } + + @Override + public String toString() { + return "EnvProps{" + + "consulProtocol='" + consulProtocol + '\'' + + ", consulHost='" + consulHost + '\'' + + ", consulPort=" + consulPort + + ", cbsProtocol='" + cbsProtocol + '\'' + + ", cbsName='" + cbsName + '\'' + + ", appName='" + appName + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EnvProps envProps = (EnvProps) o; + return consulPort == envProps.consulPort && + Objects.equals(consulProtocol, envProps.consulProtocol) && + Objects.equals(consulHost, envProps.consulHost) && + Objects.equals(cbsProtocol, envProps.cbsProtocol) && + Objects.equals(cbsName, envProps.cbsName) && + Objects.equals(appName, envProps.appName); + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java new file mode 100644 index 0000000..1c0d85b --- /dev/null +++ b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java @@ -0,0 +1,207 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.controller; + +import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.SseFeature; +import org.onap.dcae.common.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +import static org.onap.dcae.common.RestapiCallNodeUtil.*; + +public class PersistentEventConnection implements Runnable { + public String event_name; + private String event_description; + private boolean event_sseventUrlEmbed; + private String event_sseventsField; + private String event_sseventsUrl; + private String event_subscriptionTemplate; + private String event_unSubscriptionTemplate; + private String event_ruleId; + private EventConnectionState state; + private volatile boolean running = true; + private static final Logger log = LoggerFactory.getLogger(PersistentEventConnection.class); + + + private RestConfContext ctx; + private AccessController parentCtrllr; + private Map<String, String> eventParaMap; + + public PersistentEventConnection(String event_name, + String event_description, + boolean event_sseventUrlEmbed, + String event_sseventsField, + String event_sseventsUrl, + String event_subscriptionTemplate, + String event_unSubscriptionTemplate, + String event_ruleId, + AccessController parentCtrllr) { + this.event_name = event_name; + this.event_description = event_description; + this.event_sseventUrlEmbed = event_sseventUrlEmbed; + this.event_sseventsField = event_sseventsField; + this.event_sseventsUrl = event_sseventsUrl; + this.event_subscriptionTemplate = event_subscriptionTemplate; + this.event_unSubscriptionTemplate = event_unSubscriptionTemplate; + this.event_ruleId = event_ruleId; + this.state = EventConnectionState.INIT; + + this.ctx = new RestConfContext(); + for (String s : parentCtrllr.getCtx().getAttributeKeySet()) { + this.ctx.setAttribute(s, ctx.getAttribute(s)); + } + this.parentCtrllr = parentCtrllr; + this.eventParaMap = new HashMap<>(); + this.eventParaMap.putAll(parentCtrllr.getParaMap()); + printEventParamMap(); + log.info("New persistent connection created " + event_name); + } + + @Override + public void run() { + Parameters p = null; + try { + modifyEventParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(parentCtrllr.getProperties().authorizationEnabled()) + + parentCtrllr.getCfgInfo().getController_restapiUrl() + + parentCtrllr.getCfgInfo().getController_subscriptionUrl()); + modifyEventParamMap(Constants.KDEFAULT_TEMP_FILENAME, event_subscriptionTemplate); + modifyEventParamMap(Constants.KSETTING_REST_UNAME, parentCtrllr.getCfgInfo().getController_restapiUser()); + modifyEventParamMap(Constants.KSETTING_REST_PASSWORD, parentCtrllr.getCfgInfo().getController_restapiPassword()); + modifyEventParamMap(Constants.KSETTING_HTTP_METHOD, parentCtrllr.getCfgInfo().getController_subsMethod()); + + parentCtrllr.getRestApiCallNode().sendRequest(eventParaMap, ctx, null); + } catch (Exception e) { + log.error("Exception occured!", e); + Thread.currentThread().interrupt(); + } + + /* Retrieve url from result and construct SSE url */ + if (event_sseventUrlEmbed) { + String key = getEventParamMapValue(Constants.KSETTING_RESP_PREFIX).concat(".").concat(event_sseventsField); + log.info("key " + key); + this.event_sseventsUrl = ctx.getAttribute(key); + } + + log.info("SSE received url " + event_sseventsUrl); + try { + p = getParameters(eventParaMap); + } catch (Exception e) { + log.error("Exception occured!", e); + Thread.currentThread().interrupt(); + } + printEventParamMap(); + String url = getUriMethod(parentCtrllr.getProperties().authorizationEnabled()) + + parentCtrllr.getCfgInfo().getController_restapiUrl() + event_sseventsUrl; + Client client = ignoreSslClient().register(SseFeature.class); + WebTarget target = addAuthType(client, p).target(url); + String tokenId = getEventParamMapValue(Constants.KSETTING_TOKENID); + String headerName = "X-ACCESS-TOKEN"; + if (tokenId == null) { + headerName = HttpHeaders.AUTHORIZATION; + tokenId = getAuthorizationToken(parentCtrllr.getCfgInfo().getController_restapiUser(), + parentCtrllr.getCfgInfo().getController_restapiPassword()); + } + AdditionalHeaderWebTarget newTarget = new AdditionalHeaderWebTarget(target, tokenId, headerName); + EventSource eventSource = EventSource.target(newTarget).build(); + eventSource.register(new DataChangeEventListener(this)); + eventSource.open(); + log.info("Connected to SSE source"); + while (running) { + try { + log.info("SSE state " + eventSource.isOpen()); + Thread.sleep(5000); + } catch (InterruptedException ie) { + log.info("Exception: " + ie.getMessage()); + Thread.currentThread().interrupt(); + running = false; + } + } + eventSource.close(); + log.info("Closed connection to SSE source"); + } + + private String getAuthorizationToken(String userName, String password) { + return "Basic " + Base64.getEncoder().encodeToString(( + userName + ":" + password).getBytes()); + } + + private Client ignoreSslClient() { + SSLContext sslcontext = null; + + try { + sslcontext = SSLContext.getInstance("TLS"); + sslcontext.init(null, new TrustManager[]{new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }}, new java.security.SecureRandom()); + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new IllegalStateException(e); + } + + return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build(); + } + + public String getEvent_ruleId() { + return event_ruleId; + } + + public void modifyEventParamMap(String fieldName, String value) { + eventParaMap.put(fieldName, value); + } + + public String getEventParamMapValue(String fieldName) { + return eventParaMap.get(fieldName); + } + + public void printEventParamMap() { + log.info("----------------Event Param Map-------------------"); + for (String name : eventParaMap.keySet()) { + String value = eventParaMap.get(name); + log.info(name + " : " + value); + } + } +} |