summaryrefslogtreecommitdiffstats
path: root/plugins/grToolkit/provider/src/main/java/org/onap/ccsdk/sli/plugins/grtoolkit/GrToolkitProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/grToolkit/provider/src/main/java/org/onap/ccsdk/sli/plugins/grtoolkit/GrToolkitProvider.java')
-rwxr-xr-xplugins/grToolkit/provider/src/main/java/org/onap/ccsdk/sli/plugins/grtoolkit/GrToolkitProvider.java765
1 files changed, 765 insertions, 0 deletions
diff --git a/plugins/grToolkit/provider/src/main/java/org/onap/ccsdk/sli/plugins/grtoolkit/GrToolkitProvider.java b/plugins/grToolkit/provider/src/main/java/org/onap/ccsdk/sli/plugins/grtoolkit/GrToolkitProvider.java
new file mode 100755
index 000000000..7a8b3deb0
--- /dev/null
+++ b/plugins/grToolkit/provider/src/main/java/org/onap/ccsdk/sli/plugins/grtoolkit/GrToolkitProvider.java
@@ -0,0 +1,765 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.plugins.grtoolkit;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nonnull;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.onap.ccsdk.sli.core.dblib.DbLibService;
+import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionManager;
+import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionResponse;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.AdminHealth;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.ClusterActor;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.DatabaseHealth;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.FailoverStatus;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.Health;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.MemberBuilder;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.PropertyKeys;
+import org.onap.ccsdk.sli.plugins.grtoolkit.data.SiteHealth;
+import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.HealthResolver;
+import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SingleNodeHealthResolver;
+import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SixNodeHealthResolver;
+import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.ThreeNodeHealthResolver;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
+import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * API implementation of the {@code GrToolkitService} interface generated from
+ * the gr-toolkit.yang model. The RPCs contained within this class are meant to
+ * run in an architecture agnostic fashion, where the response is repeatable
+ * and predictable across any given node configuration. To facilitate this,
+ * health checking and failover logic has been abstracted into the
+ * {@code HealthResolver} classes.
+ * <p>
+ * Anyone who wishes to write a custom resolver for use with GR Toolkit should
+ * extend the {@code HealthResolver} class. The currently provided resolvers
+ * are useful references for further implementation.
+ *
+ * @author Anthony Haddox
+ * @see GrToolkitService
+ * @see HealthResolver
+ * @see SingleNodeHealthResolver
+ * @see ThreeNodeHealthResolver
+ * @see SixNodeHealthResolver
+ */
+public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
+ private static final String APP_NAME = "gr-toolkit";
+ private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
+ private String akkaConfig;
+ private String httpProtocol;
+ private String siteIdentifier = System.getenv("SITE_NAME");
+ private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
+ private final ExecutorService executor;
+ protected DataBroker dataBroker;
+ protected NotificationPublishService notificationService;
+ protected RpcProviderRegistry rpcRegistry;
+ protected BindingAwareBroker.RpcRegistration<GrToolkitService> rpcRegistration;
+ protected DbLibService dbLib;
+ private String member;
+ private ClusterActor self;
+ private HashMap<String, ClusterActor> memberMap;
+ private Properties properties;
+ private DistributedDataStoreInterface configDatastore;
+ private HealthResolver resolver;
+
+ /**
+ * Constructs the provider for the GR Toolkit API. Dependencies are
+ * injected using the GrToolkit.xml blueprint.
+ *
+ * @param dataBroker The Data Broker
+ * @param notificationProviderService The Notification Service
+ * @param rpcProviderRegistry The RPC Registry
+ * @param configDatastore The Configuration Data Store provided by the controller
+ * @param dbLibService Reference to the controller provided DbLibService
+ */
+ public GrToolkitProvider(DataBroker dataBroker,
+ NotificationPublishService notificationProviderService,
+ RpcProviderRegistry rpcProviderRegistry,
+ DistributedDataStoreInterface configDatastore,
+ DbLibService dbLibService) {
+ log.info("Creating provider for {}", APP_NAME);
+ this.executor = Executors.newFixedThreadPool(1);
+ this.dataBroker = dataBroker;
+ this.notificationService = notificationProviderService;
+ this.rpcRegistry = rpcProviderRegistry;
+ this.configDatastore = configDatastore;
+ this.dbLib = dbLibService;
+ initialize();
+ }
+
+ /**
+ * Initializes some structures necessary to hold health check information
+ * and perform failovers.
+ */
+ private void initialize() {
+ log.info("Initializing provider for {}", APP_NAME);
+ createContainers();
+ setProperties();
+ defineMembers();
+ rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
+ log.info("Initialization complete for {}", APP_NAME);
+ }
+
+ /**
+ * Creates the {@code Properties} object with the contents of
+ * gr-toolkit.properties, found at the {@code SDNC_CONFIG_DIR} directory,
+ * which should be set as an environment variable. If the properties file
+ * is not found, GR Toolkit will not function.
+ */
+ private void setProperties() {
+ log.info("Loading properties from {}", PROPERTIES_FILE);
+ properties = new Properties();
+ File propertiesFile = new File(PROPERTIES_FILE);
+ if(!propertiesFile.exists()) {
+ log.warn("setProperties(): Properties file not found.");
+ } else {
+ try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
+ properties.load(fileInputStream);
+ if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
+ properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
+ }
+ httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
+ akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
+ if(StringUtils.isEmpty(siteIdentifier)) {
+ siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
+ }
+ log.info("setProperties(): Loaded properties.");
+ } catch(IOException e) {
+ log.error("setProperties(): Error loading properties.", e);
+ }
+ }
+ }
+
+ /**
+ * Parses the akka.conf file used by the controller to define an akka
+ * cluster. This method requires the <i>seed-nodes</i> definition to exist
+ * on a single line.
+ */
+ private void defineMembers() {
+ member = configDatastore.getActorUtils().getCurrentMemberName().getName();
+ log.info("defineMembers(): Cluster member: {}", member);
+
+ log.info("defineMembers(): Parsing akka.conf for cluster memberMap...");
+ try {
+ File akkaConfigFile = new File(this.akkaConfig);
+ try(FileReader fileReader = new FileReader(akkaConfigFile);
+ BufferedReader bufferedReader = new BufferedReader(fileReader)) {
+ String line;
+ while((line = bufferedReader.readLine()) != null) {
+ if(line.contains("seed-nodes =")) {
+ parseSeedNodes(line);
+ break;
+ }
+ }
+ }
+ } catch(IOException e) {
+ log.error("defineMembers(): Couldn't load akka", e);
+ } catch(NullPointerException e) {
+ log.error("defineMembers(): akkaConfig is null. Check properties file and restart {} bundle.", APP_NAME);
+ log.error("defineMembers(): NullPointerException", e);
+ }
+ log.info("self:\n{}", self);
+ }
+
+ /**
+ * Sets up the {@code InstanceIdentifier}s for Data Store transactions.
+ */
+ private void createContainers() {
+ // Replace with MD-SAL write for FailoverStatus
+ }
+
+ /**
+ * Shuts down the {@code ExecutorService} and closes the RPC Provider Registry.
+ */
+ @Override
+ public void close() throws Exception {
+ log.info("Closing provider for {}", APP_NAME);
+ executor.shutdown();
+ rpcRegistration.close();
+ log.info("close(): Successfully closed provider for {}", APP_NAME);
+ }
+
+ /**
+ * Listens for changes to the Data tree.
+ *
+ * @param changes Data tree changes.
+ */
+ @Override
+ public void onDataTreeChanged(@Nonnull Collection changes) {
+ log.info("onDataTreeChanged(): No changes.");
+ }
+
+ /**
+ * Makes a call to {@code resolver.getClusterHealth()} to determine the
+ * health of the akka clustered controllers.
+ *
+ * @param input request body adhering to the model for
+ * {@code ClusterHealthInput}
+ * @return response adhering to the model for {@code ClusterHealthOutput}
+ * @see HealthResolver
+ * @see ClusterHealthInput
+ * @see ClusterHealthOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
+ log.info("{}:cluster-health invoked.", APP_NAME);
+ resolver.getClusterHealth();
+ return buildClusterHealthOutput();
+ }
+
+ /**
+ * Makes a call to {@code resolver.getSiteHealth()} to determine the health
+ * of all of the application components of a site. In a multi-site config,
+ * this will gather the health of all sites.
+ *
+ * @param input request body adhering to the model for
+ * {@code SiteHealthInput}
+ * @return response adhering to the model for {@code SiteHealthOutput}
+ * @see HealthResolver
+ * @see SiteHealthInput
+ * @see SiteHealthOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
+ log.info("{}:site-health invoked.", APP_NAME);
+ List<SiteHealth> sites = resolver.getSiteHealth();
+ return buildSiteHealthOutput(sites);
+ }
+
+ /**
+ * Makes a call to {@code resolver.getDatabaseHealth()} to determine the
+ * health of the database(s) used by the controller.
+ *
+ * @param input request body adhering to the model for
+ * {@code DatabaseHealthInput}
+ * @return response adhering to the model for {@code DatabaseHealthOutput}
+ * @see HealthResolver
+ * @see DatabaseHealthInput
+ * @see DatabaseHealthOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
+ log.info("{}:database-health invoked.", APP_NAME);
+ DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
+ DatabaseHealth health = resolver.getDatabaseHealth();
+ outputBuilder.setStatus(health.getHealth().equals(Health.HEALTHY) ? "200" : "500");
+ outputBuilder.setHealth(health.getHealth().toString());
+ outputBuilder.setServedBy(member);
+ log.info("databaseHealth(): Health: {}", health.getHealth());
+ return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
+ }
+
+ /**
+ * Makes a call to {@code resolver.getAdminHealth()} to determine the
+ * health of the administrative portal(s) used by the controller.
+ *
+ * @param input request body adhering to the model for
+ * {@code AdminHealthInput}
+ * @return response adhering to the model for {@code AdminHealthOutput}
+ * @see HealthResolver
+ * @see AdminHealthInput
+ * @see AdminHealthOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
+ log.info("{}:admin-health invoked.", APP_NAME);
+ AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
+ AdminHealth adminHealth = resolver.getAdminHealth();
+ outputBuilder.setStatus(Integer.toString(adminHealth.getStatusCode()));
+ outputBuilder.setHealth(adminHealth.getHealth().toString());
+ outputBuilder.setServedBy(member);
+ log.info("adminHealth(): Status: {} | Health: {}", adminHealth.getStatusCode(), adminHealth.getHealth());
+ return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
+ }
+
+ /**
+ * Places IP Tables rules in place to drop akka communications traffic with
+ * one or mode nodes. This method does not not perform any checks to see if
+ * rules currently exist, and assumes success.
+ *
+ * @param input request body adhering to the model for
+ * {@code HaltAkkaTrafficInput}
+ * @return response adhering to the model for {@code HaltAkkaTrafficOutput}
+ * @see HaltAkkaTrafficInput
+ * @see HaltAkkaTrafficOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
+ log.info("{}:halt-akka-traffic invoked.", APP_NAME);
+ HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
+ outputBuilder.setStatus("200");
+ modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
+ outputBuilder.setServedBy(member);
+
+ return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
+ }
+
+ /**
+ * Removes IP Tables rules in place to permit akka communications traffic
+ * with one or mode nodes. This method does not not perform any checks to
+ * see if rules currently exist, and assumes success.
+ *
+ * @param input request body adhering to the model for
+ * {@code ResumeAkkaTrafficInput}
+ * @return response adhering to the model for {@code ResumeAkkaTrafficOutput}
+ * @see ResumeAkkaTrafficInput
+ * @see ResumeAkkaTrafficOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
+ log.info("{}:resume-akka-traffic invoked.", APP_NAME);
+ ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
+ outputBuilder.setStatus("200");
+ modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
+ outputBuilder.setServedBy(member);
+
+ return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
+ }
+
+ /**
+ * Returns a canned response containing the identifier for this
+ * controller's site.
+ *
+ * @param input request body adhering to the model for
+ * {@code SiteIdentifierInput}
+ * @return response adhering to the model for {@code SiteIdentifierOutput}
+ * @see SiteIdentifierInput
+ * @see SiteIdentifierOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
+ log.info("{}:site-identifier invoked.", APP_NAME);
+ SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
+ outputBuilder.setStatus("200");
+ outputBuilder.setId(siteIdentifier);
+ outputBuilder.setServedBy(member);
+ return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
+ }
+
+ /**
+ * Makes a call to {@code resolver.tryFailover()} to try a failover defined
+ * by the active {@code HealthResolver}.
+ *
+ * @param input request body adhering to the model for
+ * {@code FailoverInput}
+ * @return response adhering to the model for {@code FailoverOutput}
+ * @see HealthResolver
+ * @see FailoverInput
+ * @see FailoverOutput
+ */
+ @Override
+ public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
+ log.info("{}:failover invoked.", APP_NAME);
+ FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
+ FailoverStatus failoverStatus = resolver.tryFailover(input);
+ outputBuilder.setServedBy(member);
+ outputBuilder.setMessage(failoverStatus.getMessage());
+ outputBuilder.setStatus(Integer.toString(failoverStatus.getStatusCode()));
+ log.info("{}:{}.", APP_NAME, failoverStatus.getMessage());
+ return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
+ }
+
+ /**
+ * Performs an akka traffic isolation of the active site from the standby
+ * site in an Active/Standby architecture. Invokes the
+ * {@code halt-akka-traffic} RPC against the standby site nodes using the
+ * information of the active site nodes.
+ *
+ * @param activeSite list of nodes in the active site
+ * @param standbySite list of nodes in the standby site
+ * @param port http or https port of the controller
+ * @deprecated No longer used since the refactor to use the HealthResolver
+ * pattern. Retained so the logic can be replicated later.
+ */
+ @Deprecated
+ private void isolateSiteFromCluster(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
+ log.info("isolateSiteFromCluster(): Halting Akka traffic...");
+ for(ClusterActor actor : standbySite) {
+ try {
+ log.info("Halting Akka traffic for: {}", actor.getNode());
+ // Build JSON with activeSite actor Node and actor AkkaPort
+ JSONObject akkaInput = new JSONObject();
+ JSONObject inputBlock = new JSONObject();
+ JSONArray votingStateArray = new JSONArray();
+ JSONObject nodeInfo;
+ for(ClusterActor node : activeSite) {
+ nodeInfo = new JSONObject();
+ nodeInfo.put("node", node.getNode());
+ nodeInfo.put("port", node.getAkkaPort());
+ votingStateArray.put(nodeInfo);
+ }
+ inputBlock.put("node-info", votingStateArray);
+ akkaInput.put("input", inputBlock);
+ ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", ConnectionManager.HttpMethod.POST, akkaInput.toString(), "");
+ } catch(IOException e) {
+ log.error("isolateSiteFromCluster(): Could not halt Akka traffic for: " + actor.getNode(), e);
+ }
+ }
+ }
+
+ /**
+ * Invokes the down unreachable action through the Jolokia mbean API.
+ *
+ * @param activeSite list of nodes in the active site
+ * @param standbySite list of nodes in the standby site
+ * @param port http or https port of the controller
+ * @deprecated No longer used since the refactor to use the HealthResolver
+ * pattern. Retained so the logic can be replicated later.
+ */
+ @Deprecated
+ private void downUnreachableNodes(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
+ log.info("downUnreachableNodes(): Setting site unreachable...");
+ JSONObject jolokiaInput = new JSONObject();
+ jolokiaInput.put("type", "EXEC");
+ jolokiaInput.put("mbean", "akka:type=Cluster");
+ jolokiaInput.put("operation", "down");
+ JSONArray arguments = new JSONArray();
+ for(ClusterActor actor : activeSite) {
+ // Build Jolokia input
+ // May need to change from akka port to actor.getAkkaPort()
+ arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
+ }
+ jolokiaInput.put("arguments", arguments);
+ log.debug("downUnreachableNodes(): {}", jolokiaInput);
+ try {
+ log.info("downUnreachableNodes(): Setting nodes unreachable");
+ ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", ConnectionManager.HttpMethod.POST, jolokiaInput.toString(), "");
+ } catch(IOException e) {
+ log.error("downUnreachableNodes(): Error setting nodes unreachable", e);
+ }
+ }
+
+ /**
+ * Triggers a data backup and export sequence of MD-SAL data. Invokes the
+ * {@code data-export-import:schedule-export} RPC to schedule a data export
+ * and subsequently the {@code daexim-offsite-backup:backup-data} RPC
+ * against the active site to export and backup the data. Assumes the
+ * controllers have the org.onap.ccsdk.sli.northbound.daeximoffsitebackup
+ * bundle installed.
+ *
+ * @param activeSite list of nodes in the active site
+ * @param port http or https port of the controller
+ * @deprecated No longer used since the refactor to use the HealthResolver
+ * pattern. Retained so the logic can be replicated later.
+ */
+ @Deprecated
+ private void backupMdSal(ArrayList<ClusterActor> activeSite, String port) {
+ log.info("backupMdSal(): Backing up data...");
+ try {
+ log.info("backupMdSal(): Scheduling backup for: {}", activeSite.get(0).getNode());
+ ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", ConnectionManager.HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }", "");
+ } catch(IOException e) {
+ log.error("backupMdSal(): Error backing up MD-SAL", e);
+ }
+ for(ClusterActor actor : activeSite) {
+ try {
+ // Move data offsite
+ log.info("backupMdSal(): Backing up data for: {}", actor.getNode());
+ ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", ConnectionManager.HttpMethod.POST, null, "");
+ } catch(IOException e) {
+ log.error("backupMdSal(): Error backing up data.", e);
+ }
+ }
+ }
+
+ /**
+ * Builds a response object for {@code clusterHealth()}. Sorts and iterates
+ * over the contents of the {@code memberMap}, which contains the health
+ * information of the cluster, and adds them to the {@code outputBuilder}.
+ * If the ClusterActor is healthy, according to
+ * {@code resolver.isControllerHealthy()}, the {@code ClusterHealthOutput}
+ * status has a {@code 0} appended, otherwise a {@code 1} is appended. A
+ * status of all zeroes denotes a healthy cluster. This status should be
+ * easily decoded by tools which use the output.
+ *
+ * @return future containing a completed {@code ClusterHealthOutput}
+ * @see ClusterActor
+ * @see ClusterHealthOutput
+ * @see HealthResolver
+ */
+ @SuppressWarnings("unchecked")
+ private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput() {
+ ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
+ outputBuilder.setServedBy(member);
+ List memberList = new ArrayList<Member>();
+ StringBuilder stat = new StringBuilder();
+ memberMap.values()
+ .stream()
+ .sorted(Comparator.comparingInt(member -> Integer.parseInt(member.getMember().split("-")[1])))
+ .forEach(member -> {
+ memberList.add(new MemberBuilder(member).build());
+ // 0 is a healthy controller, 1 is unhealthy.
+ // The list is sorted so users can decode to find unhealthy nodes
+ // This will also let them figure out health on a per-site basis
+ // Depending on any tools they use with this API
+ if(resolver.isControllerHealthy(member)) {
+ stat.append("0");
+ } else {
+ stat.append("1");
+ }
+ });
+ outputBuilder.setStatus(stat.toString());
+ outputBuilder.setMembers(memberList);
+ RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
+ return Futures.immediateFuture(rpcResult);
+ }
+
+ /**
+ * Builds a response object for {@code siteHealth()}. Iterates over a list
+ * of {@code SiteHealth} objects and populates the {@code SiteHealthOutput}
+ * with the information.
+ *
+ * @param sites list of sites
+ * @return future containing a completed {@code SiteHealthOutput}
+ * @see SiteHealth
+ * @see HealthResolver
+ */
+ @SuppressWarnings("unchecked")
+ private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(List<SiteHealth> sites) {
+ SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
+ SitesBuilder siteBuilder = new SitesBuilder();
+ outputBuilder.setStatus("200");
+ outputBuilder.setSites((List) new ArrayList<Site>());
+
+ for(SiteHealth site : sites) {
+ siteBuilder.setHealth(site.getHealth().toString());
+ siteBuilder.setRole(site.getRole());
+ siteBuilder.setId(site.getId());
+ outputBuilder.getSites().add(siteBuilder.build());
+ log.info("buildSiteHealthOutput(): Health for {}: {}", site.getId(), site.getHealth().getHealth());
+ }
+
+ outputBuilder.setServedBy(member);
+ RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
+ return Futures.immediateFuture(rpcResult);
+ }
+
+ /**
+ * Parses a line containing the akka networking information of the akka
+ * controller cluster. Assumes entries of the format:
+ * <p>
+ * akka.tcp://opendaylight-cluster-data@<FQDN>:<AKKA_PORT>
+ * <p>
+ * The information is stored in a {@code ClusterActor} object, and then
+ * added to the memberMap HashMap, with the {@code FQDN} as the key. The
+ * final step is a call to {@code createHealthResolver} to create the
+ * health resolver for the provider.
+ *
+ * @param line the line containing all of the seed nodes
+ * @see ClusterActor
+ * @see HealthResolver
+ */
+ private void parseSeedNodes(String line) {
+ memberMap = new HashMap<>();
+ line = line.substring(line.indexOf("[\""), line.indexOf(']'));
+ String[] splits = line.split(",");
+
+ for(int ndx = 0; ndx < splits.length; ndx++) {
+ String nodeName = splits[ndx];
+ int delimLocation = nodeName.indexOf('@');
+ String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
+ splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
+ log.info("parseSeedNodes(): Adding node: {}:{}", splits[ndx], port);
+ ClusterActor clusterActor = new ClusterActor();
+ clusterActor.setNode(splits[ndx]);
+ clusterActor.setAkkaPort(port);
+ clusterActor.setMember("member-" + (ndx + 1));
+ if(member.equals(clusterActor.getMember())) {
+ self = clusterActor;
+ }
+ memberMap.put(clusterActor.getNode(), clusterActor);
+ log.info("parseSeedNodes(): {}", clusterActor);
+ }
+
+ createHealthResolver();
+ }
+
+ /**
+ * Creates the specific health resolver requested by the user, as specified
+ * in the gr-toolkit.properties file. If a resolver is not specified, or
+ * there is an issue creating the resolver, it will use a fallback resolver
+ * based on how many nodes are added to the memberMap HashMap.
+ *
+ * @see HealthResolver
+ * @see SingleNodeHealthResolver
+ * @see ThreeNodeHealthResolver
+ * @see SixNodeHealthResolver
+ */
+ private void createHealthResolver() {
+ log.info("createHealthResolver(): Creating health resolver...");
+ try {
+ Class resolverClass = null;
+ String userDefinedResolver = properties.getProperty(PropertyKeys.RESOLVER);
+ if(StringUtils.isEmpty(userDefinedResolver)) {
+ throw new InstantiationException();
+ }
+ resolverClass = Class.forName(userDefinedResolver);
+ Class[] types = { Map.class , properties.getClass(), DbLibService.class };
+ Constructor<HealthResolver> constructor = resolverClass.getConstructor(types);
+ Object[] parameters = { memberMap, properties, dbLib };
+ resolver = constructor.newInstance(parameters);
+ log.info("createHealthResolver(): Created resolver from name {}", resolver.toString());
+ } catch(ClassNotFoundException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
+ log.warn("createHealthResolver(): Could not create user defined resolver", e);
+ if(memberMap.size() == 1) {
+ log.info("createHealthResolver(): FALLBACK: Initializing SingleNodeHealthResolver...");
+ resolver = new SingleNodeHealthResolver(memberMap, properties, dbLib);
+ } else if(memberMap.size() == 3) {
+ log.info("createHealthResolver(): FALLBACK: Initializing ThreeNodeHealthResolver...");
+ resolver = new ThreeNodeHealthResolver(memberMap, properties, dbLib);
+ } else if(memberMap.size() == 6) {
+ log.info("createHealthResolver(): FALLBACK: Initializing SixNodeHealthResolver...");
+ resolver = new SixNodeHealthResolver(memberMap, properties, dbLib);
+ }
+ }
+ }
+
+ /**
+ * Adds or drops IPTables rules to block or resume akka traffic for a node
+ * in the akka cluster. Assumes that the user or group that the controller
+ * is run as has the ability to run sudo /sbin/iptables without requiring a
+ * password. This method will run indefinitely if that assumption is not
+ * correct. This method does not check to see if any rules around the node
+ * are preexisting, so multiple uses will result in multiple additions and
+ * removals from IPTables.
+ *
+ * @param task the operation to be performed against IPTables
+ * @param nodeInfo array containing the nodes to be added or dropped from
+ * IPTables
+ */
+ private void modifyIpTables(IpTables task, Object[] nodeInfo) {
+ log.info("modifyIpTables(): Modifying IPTables rules...");
+ if(task == IpTables.ADD) {
+ for(Object node : nodeInfo) {
+ org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
+ (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
+ log.info("modifyIpTables(): Isolating {}", n.getNode());
+ executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
+ executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
+ }
+ } else if(task == IpTables.DELETE) {
+ for(Object node : nodeInfo) {
+ org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
+ (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
+ log.info("modifyIpTables(): De-isolating {}", n.getNode());
+ executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
+ executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
+ }
+ }
+ if(nodeInfo.length > 0) {
+ executeCommand("sudo /sbin/iptables -L");
+ }
+ }
+
+ /**
+ * Opens a shell session and executes a command.
+ *
+ * @param command the shell command to execute
+ */
+ private void executeCommand(String command) {
+ log.info("executeCommand(): Executing command: {}", command);
+ String[] cmd = command.split(" ");
+ try {
+ Process p = Runtime.getRuntime().exec(cmd);
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ String inputLine;
+ StringBuilder content = new StringBuilder();
+ while((inputLine = bufferedReader.readLine()) != null) {
+ content.append(inputLine);
+ }
+ bufferedReader.close();
+ log.info("executeCommand(): {}", content);
+ } catch(IOException e) {
+ log.error("executeCommand(): Error executing command", e);
+ }
+ }
+
+ /**
+ * The IPTables operations this module can perform.
+ */
+ enum IpTables {
+ ADD,
+ DELETE
+ }
+} \ No newline at end of file