aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/janusgraph/JanusGraphClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-dao/src/main/java/org/openecomp/sdc/be/dao/janusgraph/JanusGraphClient.java')
-rw-r--r--catalog-dao/src/main/java/org/openecomp/sdc/be/dao/janusgraph/JanusGraphClient.java412
1 files changed, 412 insertions, 0 deletions
diff --git a/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/janusgraph/JanusGraphClient.java b/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/janusgraph/JanusGraphClient.java
new file mode 100644
index 0000000000..5be907cc86
--- /dev/null
+++ b/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/janusgraph/JanusGraphClient.java
@@ -0,0 +1,412 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.dao.janusgraph;
+
+import org.janusgraph.core.*;
+import org.janusgraph.core.schema.ConsistencyModifier;
+import org.janusgraph.core.schema.JanusGraphIndex;
+import org.janusgraph.core.schema.JanusGraphManagement;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.ResourceUnavailableException;
+import org.janusgraph.diskstorage.locking.PermanentLockingException;
+import org.janusgraph.graphdb.database.idassigner.IDPoolExhaustedException;
+import fj.data.Either;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.dao.DAOJanusGraphStrategy;
+import org.openecomp.sdc.be.dao.JanusGraphClientStrategy;
+import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.*;
+
+
+@Component("janusgraph-client")
+public class JanusGraphClient {
+ private static Logger logger = LoggerFactory.getLogger(JanusGraphClient.class.getName());
+ private static Logger healthLogger = LoggerFactory.getLogger("janusgraph.healthcheck");
+
+ private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
+ private static final String OK = "GOOD";
+
+ public JanusGraphClient() {
+ }
+
+ private class HealthCheckTask implements Callable<Vertex> {
+ @Override
+ public Vertex call() {
+
+ JanusGraphVertex v = (JanusGraphVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
+ JanusGraphVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
+ healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
+ graph.tx().commit();
+
+ return v;
+ }
+ }
+
+ private class HealthCheckScheduledTask implements Runnable {
+ @Override
+ public void run() {
+ healthLogger.trace("Executing janusGraph Health Check Task - Start");
+ boolean healthStatus = isGraphOpen();
+ healthLogger.trace("Executing janusGraph Health Check Task - Status = {}", healthStatus);
+ if (healthStatus != lastHealthState) {
+ logger.trace("janusGraph Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
+ lastHealthState = healthStatus;
+ logAlarm();
+ }
+ }
+ }
+
+ private class ReconnectTask implements Runnable {
+ @Override
+ public void run() {
+ logger.trace("Trying to reconnect to JanusGraph...");
+ if (graph == null) {
+ createGraph(janusGraphCfgFile);
+ }
+ }
+ }
+
+ private JanusGraph graph;
+
+ // Health Check Variables
+
+ /**
+ * This executor will execute the health check task on a callable task that can be executed with a timeout.
+ */
+ ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "JanusGraph-Health-Check-Thread");
+ }
+ });
+ private long healthCheckReadTimeout = 2;
+ HealthCheckTask healthCallableTask = new HealthCheckTask();
+ HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
+ boolean lastHealthState = false;
+
+ // Reconnection variables
+ private ScheduledExecutorService reconnectScheduler = null;
+ private ScheduledExecutorService healthCheckScheduler = null;
+ private Runnable reconnectTask = null;
+ private long reconnectInterval = 3;
+ @SuppressWarnings("rawtypes")
+ private Future reconnectFuture;
+
+ private String janusGraphCfgFile = null;
+ JanusGraphClientStrategy janusGraphClientStrategy;
+
+ public JanusGraphClient(JanusGraphClientStrategy janusGraphClientStrategy) {
+ super();
+ this.janusGraphClientStrategy = janusGraphClientStrategy;
+
+ // Initialize a single threaded scheduler for health-check
+ this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "JanusGraph-Health-Check-Task");
+ }
+ });
+
+ healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphHealthCheckReadTimeout(2);
+ reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphReconnectIntervalInSeconds(3);
+
+ logger.info("** JanusGraphClient created");
+ }
+
+ @PostConstruct
+ public JanusGraphOperationStatus createGraph() {
+
+ logger.info("** createGraph started **");
+
+ if (ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphInMemoryGraph()) {
+ BaseConfiguration conf = new BaseConfiguration();
+ conf.setProperty("storage.backend", "inmemory");
+ graph = JanusGraphFactory.open(conf);
+ createJanusGraphSchema();
+ logger.info("** in memory graph created");
+ return JanusGraphOperationStatus.OK;
+ } else {
+ this.janusGraphCfgFile = janusGraphClientStrategy.getConfigFile();
+ if (janusGraphCfgFile == null || janusGraphCfgFile.isEmpty()) {
+ janusGraphCfgFile = "config/janusgraph.properties";
+ }
+
+ // yavivi
+ // In case connection failed on init time, schedule a reconnect task
+ // in the BG
+ JanusGraphOperationStatus status = createGraph(janusGraphCfgFile);
+ logger.debug("Create JanusGraph graph status {}", status);
+ if (status != JanusGraphOperationStatus.OK) {
+ this.startReconnectTask();
+ }
+
+ return status;
+ }
+ }
+
+ private void startHealthCheckTask() {
+ this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
+ }
+
+ /**
+ * This method will be invoked ONLY on init time in case JanusGraph storage is down.
+ */
+ private void startReconnectTask() {
+ this.reconnectTask = new ReconnectTask();
+ // Initialize a single threaded scheduler
+ this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "JanusGraph-Reconnect-Task");
+ }
+ });
+
+ logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
+ reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
+ }
+
+ public void cleanupGraph() {
+ if (graph != null) {
+ // graph.shutdown();
+ graph.close();
+ try {
+ JanusGraphFactory.drop(graph);
+ } catch (BackendException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private boolean graphInitialized(){
+ JanusGraphManagement graphMgmt = graph.openManagement();
+ return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
+ }
+
+
+ public JanusGraphOperationStatus createGraph(String janusGraphCfgFile) {
+ logger.info("** open graph with {} started", janusGraphCfgFile);
+ try {
+ logger.info("openGraph : try to load file {}", janusGraphCfgFile);
+ graph = JanusGraphFactory.open(janusGraphCfgFile);
+ if (graph.isClosed() || !graphInitialized()) {
+ logger.error("janusgraph graph was not initialized");
+ return JanusGraphOperationStatus.NOT_CREATED;
+ }
+
+ } catch (Exception e) {
+ this.graph = null;
+ logger.info("createGraph : failed to open JanusGraph graph with configuration file: {}", janusGraphCfgFile);
+ logger.debug("createGraph : failed with exception.", e);
+ return JanusGraphOperationStatus.NOT_CONNECTED;
+ }
+
+ logger.info("** JanusGraph graph created ");
+
+ // Do some post creation actions
+ this.onGraphOpened();
+
+ return JanusGraphOperationStatus.OK;
+ }
+
+ private void onGraphOpened() {
+ // if a reconnect task is running, cancel it.
+ if (this.reconnectFuture != null) {
+ logger.info("** Cancelling JanusGraph reconnect task");
+ reconnectFuture.cancel(true);
+ }
+
+ // create health-check node
+ if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
+ logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
+ Vertex healthCheckNode = graph.addVertex();
+ healthCheckNode.property(HEALTH_CHECK, OK);
+ logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
+ graph.tx().commit();
+ } else {
+ logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
+ }
+ this.startHealthCheckTask();
+ }
+
+
+ public Either<JanusGraph, JanusGraphOperationStatus> getGraph() {
+ if (graph != null) {
+ return Either.left(graph);
+ } else {
+ return Either.right(JanusGraphOperationStatus.NOT_CREATED);
+ }
+ }
+
+ public JanusGraphOperationStatus commit() {
+ if (graph != null) {
+ try {
+ graph.tx().commit();
+ return JanusGraphOperationStatus.OK;
+ } catch (Exception e) {
+ return handleJanusGraphException(e);
+ }
+ } else {
+ return JanusGraphOperationStatus.NOT_CREATED;
+ }
+ }
+
+ public JanusGraphOperationStatus rollback() {
+ if (graph != null) {
+ try {
+ // graph.rollback();
+ graph.tx().rollback();
+ return JanusGraphOperationStatus.OK;
+ } catch (Exception e) {
+ return handleJanusGraphException(e);
+ }
+ } else {
+ return JanusGraphOperationStatus.NOT_CREATED;
+ }
+ }
+
+ public static JanusGraphOperationStatus handleJanusGraphException(Exception e) {
+ if (e instanceof JanusGraphConfigurationException) {
+ return JanusGraphOperationStatus.JANUSGRAPH_CONFIGURATION;
+ }
+ if (e instanceof SchemaViolationException) {
+ return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
+ }
+ if (e instanceof PermanentLockingException) {
+ return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
+ }
+ if (e instanceof IDPoolExhaustedException) {
+ return JanusGraphOperationStatus.GENERAL_ERROR;
+ }
+ if (e instanceof InvalidElementException) {
+ return JanusGraphOperationStatus.INVALID_ELEMENT;
+ }
+ if (e instanceof InvalidIDException) {
+ return JanusGraphOperationStatus.INVALID_ID;
+ }
+ if (e instanceof QueryException) {
+ return JanusGraphOperationStatus.INVALID_QUERY;
+ }
+ if (e instanceof ResourceUnavailableException) {
+ return JanusGraphOperationStatus.RESOURCE_UNAVAILABLE;
+ }
+ if (e instanceof IllegalArgumentException) {
+ // TODO check the error message??
+ return JanusGraphOperationStatus.ILLEGAL_ARGUMENT;
+ }
+
+ return JanusGraphOperationStatus.GENERAL_ERROR;
+ }
+
+ public boolean getHealth() {
+ return this.lastHealthState;
+ }
+
+ private boolean isGraphOpen() {
+ healthLogger.trace("Invoking JanusGraph health check ...");
+ Vertex v = null;
+ if (graph != null) {
+ try {
+ Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
+ v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
+ healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
+ graph.tx().commit();
+ } catch (Exception e) {
+ String message = e.getMessage();
+ if (message == null) {
+ message = e.getClass().getName();
+ }
+ logger.error("JanusGraph Health Check Failed. {}", message);
+ return false;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+
+ public static void main(String[] args) throws InterruptedException {
+ JanusGraphClient client = new JanusGraphClient(new DAOJanusGraphStrategy());
+ client.createGraph();
+
+ while (true) {
+ boolean health = client.isGraphOpen();
+ System.err.println("health=" + health);
+ Thread.sleep(2000);
+ }
+
+ }
+
+
+ private static final String JANUSGRAPH_HEALTH_CHECK = "janusgraphHealthCheck";
+
+ private void logAlarm() {
+ if (lastHealthState) {
+ BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphRecovery(JANUSGRAPH_HEALTH_CHECK);
+ } else {
+ BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphError(JANUSGRAPH_HEALTH_CHECK);
+ }
+ }
+
+ private void createJanusGraphSchema() {
+
+ JanusGraphManagement graphMgt = graph.openManagement();
+ JanusGraphIndex index = null;
+ for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
+ PropertyKey propKey = null;
+ if (!graphMgt.containsPropertyKey(prop.getProperty())) {
+ Class<?> clazz = prop.getClazz();
+ if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
+ propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
+ }
+ } else {
+ propKey = graphMgt.getPropertyKey(prop.getProperty());
+ }
+ if (prop.isIndexed()) {
+ if (!graphMgt.containsGraphIndex(prop.getProperty())) {
+ if (prop.isUnique()) {
+ index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
+ // Ensures only one name per vertex
+ graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
+ // Ensures name uniqueness in the graph
+ graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
+
+ } else {
+ graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();
+ }
+ }
+ }
+ }
+ graphMgt.commit();
+ }
+
+}