aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/historytruncate/HistoryTruncate.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/historytruncate/HistoryTruncate.java')
-rw-r--r--src/main/java/org/onap/aai/historytruncate/HistoryTruncate.java518
1 files changed, 518 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/historytruncate/HistoryTruncate.java b/src/main/java/org/onap/aai/historytruncate/HistoryTruncate.java
new file mode 100644
index 0000000..45b5d04
--- /dev/null
+++ b/src/main/java/org/onap/aai/historytruncate/HistoryTruncate.java
@@ -0,0 +1,518 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.historytruncate;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.aai.config.PropertyPasswordConfiguration;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.onap.aai.dbmap.AAIGraph;
+import org.onap.aai.exceptions.AAIException;
+import org.onap.aai.logging.ErrorLogHelper;
+import org.onap.aai.logging.LogFormatTools;
+import org.onap.aai.util.AAIConfig;
+import org.onap.aai.util.AAIConstants;
+import org.onap.aai.util.AAISystemExitUtil;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+
+import com.att.eelf.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.janusgraph.core.JanusGraph;
+
+public class HistoryTruncate {
+
+ private static Logger LOGGER = LoggerFactory.getLogger(HistoryTruncate.class);
+
+ /* Using realtime d */
+ private static final String REALTIME_DB = "realtime";
+
+ private static final String LOG_ONLY_MODE = "LOG_ONLY";
+ private static final String DELETE_AND_LOG_MODE = "DELETE_AND_LOG";
+ private static final String SILENT_DELETE_MODE = "SILENT_DELETE";
+ static ArrayList <String> VALIDMODES = new <String> ArrayList ();
+ static {
+ VALIDMODES.add(LOG_ONLY_MODE);
+ VALIDMODES.add(DELETE_AND_LOG_MODE);
+ VALIDMODES.add(SILENT_DELETE_MODE);
+ }
+
+ private static final int batchCommitSize = 500;
+
+ private static boolean historyEnabled;
+ private static String defaultTruncateMode;
+ private static Integer defaultTruncateWindowDays;
+
+
+ /**
+ * The main method.
+ *
+ */
+ public static void main(String[] args) {
+
+ // Set the logging file properties to be used by EELFManager
+ System.setProperty("aai.service.name", HistoryTruncate.class.getSimpleName());
+ Properties props = System.getProperties();
+ props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS);
+ props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
+
+ AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
+ PropertyPasswordConfiguration initializer = new PropertyPasswordConfiguration();
+ initializer.initialize(ctx);
+ try {
+ ctx.scan(
+ "org.onap.aai.config",
+ "org.onap.aai.setup"
+ );
+ ctx.refresh();
+ } catch (Exception e) {
+ LOGGER.error("Error - Could not initialize context beans for HistoryTruncate. ");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+
+ historyEnabled = Boolean.parseBoolean(ctx.getEnvironment().getProperty("history.enabled","false"));
+ if( !historyEnabled ) {
+ String emsg = "Error - HistoryTruncate may only be used when history.enabled=true. ";
+ System.out.println(emsg);
+ LOGGER.error(emsg);
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+
+ defaultTruncateWindowDays = Integer.parseInt(ctx.getEnvironment().getProperty("history.truncate.window.days","999"));
+ defaultTruncateMode = ctx.getEnvironment().getProperty("history.truncate.mode",LOG_ONLY_MODE);
+
+ HistoryTruncate histTrunc = new HistoryTruncate();
+ boolean success = histTrunc.executeCommand(args);
+ if(success){
+ AAISystemExitUtil.systemExitCloseAAIGraph(0);
+ } else {
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+
+ }// End of main()
+
+
+ public boolean executeCommand(String[] args) {
+ boolean successStatus = true;
+ // If they passed in args on the command line, then we should
+ // use those in place of the default ones we got from environment variables.
+ // "-truncateMode","LOG_ONLY","-truncateWindow","999"
+ String truncateMode = defaultTruncateMode;
+ int truncateWindowDays = defaultTruncateWindowDays;
+
+ if (args != null && args.length > 0) {
+ // They passed some arguments in that will affect processing
+ for (int i = 0; i < args.length; i++) {
+ String thisArg = args[i];
+ if (thisArg.equals("-truncateMode")) {
+ i++;
+ if (i >= args.length) {
+ LOGGER.error(" No value passed with -truncateMode option. ");
+ return false;
+ }
+ if( !VALIDMODES.contains(args[i]) ) {
+ LOGGER.error(" Unrecognized -truncateMode value passed: [" +
+ args[i] + "]. Valid values = " + VALIDMODES.toString() );
+ return false;
+ }
+ truncateMode = args[i];
+ } else if (thisArg.equals("-truncateWindowDays")) {
+ i++;
+ if (i >= args.length) {
+ LOGGER.error("No value passed with -truncateWindowDays option.");
+ return false;
+ }
+ String nextArg = args[i];
+ try {
+ truncateWindowDays = Integer.parseInt(nextArg);
+ } catch (Exception e) {
+ LOGGER.error("Bad value passed with -truncateWindowDays option: ["
+ + nextArg + "]");
+ return false;
+ }
+ } else {
+ LOGGER.error(" Unrecognized argument passed to HistoryTruncate: ["
+ + thisArg + "]. ");
+ LOGGER.error(" Valid values are: -truncateMode -truncateWindowDays ");
+ return false;
+ }
+ }
+ }
+
+ LOGGER.debug(" Running HistoryTruncate with: truncateMode = " + truncateMode +
+ ", truncateWindowDays = " + truncateWindowDays );
+
+ Long truncateEndTs = calculateTruncWindowEndTimeStamp(truncateWindowDays);
+ JanusGraph jgraph = null;
+ long scriptStartTime = System.currentTimeMillis();
+ Boolean doLogging = doLoggingOrNot( truncateMode );
+ Boolean doDelete = doDeleteOrNot( truncateMode );
+
+ try {
+ AAIConfig.init();
+ ErrorLogHelper.loadProperties();
+
+ LOGGER.debug(" ---- NOTE --- about to open graph (takes a little while) ");
+ verifyGraph(AAIGraph.getInstance().getGraph());
+ jgraph = AAIGraph.getInstance().getGraph();
+ LOGGER.debug(" ---- got the new graph instance. ");
+
+ // Note - process edges first so they get logged as they are deleted since
+ // edges connected to vertices being deleted would get auto-deleted by the db.
+ long timeA = System.nanoTime();
+ processEdges(jgraph, truncateEndTs, doLogging, doDelete);
+ long timeB = System.nanoTime();
+ long diffTime = timeB - timeA;
+ long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" Took this long to process the Edges: " +
+ minCount + " minutes, " + secCount + " seconds " );
+
+ processVerts(jgraph, truncateEndTs, doLogging, doDelete);
+ long timeC = System.nanoTime();
+ diffTime = timeC - timeB;
+ minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" Took this long to process the Vertices: " +
+ minCount + " minutes, " + secCount + " seconds " );
+
+ } catch (AAIException e) {
+ ErrorLogHelper.logError("AAI_6128", e.getMessage());
+ LOGGER.error("Encountered an exception during the historyTruncate: ", e);
+ e.printStackTrace();
+ successStatus = false;
+ } catch (Exception ex) {
+ ErrorLogHelper.logError("AAI_6128", ex.getMessage());
+ LOGGER.error("Encountered an exception during the historyTruncate: ", ex);
+ ex.printStackTrace();
+ successStatus = false;
+ } finally {
+ if (jgraph != null ) {
+ // Any changes that worked correctly should have already done
+ // their commits.
+ if(!"true".equals(System.getProperty("org.onap.aai.graphadmin.started"))) {
+ if (jgraph.isOpen()) {
+ jgraph.tx().rollback();
+ jgraph.close();
+ }
+ }
+ }
+ }
+
+ return successStatus;
+ }
+
+
+ public void processVerts(JanusGraph jgraph, Long truncBeforeTs,
+ Boolean doLogging, Boolean doDelete ) {
+
+ Graph g = jgraph.newTransaction();
+ GraphTraversalSource gts = g.traversal();
+ //Iterator <Vertex> vertItr = gts.V().has(AAIProperties.END_TS, P.lt(truncBeforeTs));
+ Iterator <Vertex> vertItr = gts.V().has("end-ts", P.lt(truncBeforeTs));
+ ArrayList <Long> vidList = new ArrayList <Long> ();
+ while( vertItr.hasNext() ) {
+ Vertex tmpV = vertItr.next();
+ Long tmpVid = Long.valueOf(tmpV.id().toString());
+ vidList.add(tmpVid);
+ }
+
+ int vTotalCount = vidList.size();
+ int batchCount = vTotalCount / batchCommitSize;
+ if((batchCount * batchCommitSize) < vTotalCount){
+ batchCount++;
+ }
+
+ LOGGER.info( " Vertex TotalCount = " + vTotalCount +
+ ", we get batchCount = " + batchCount +
+ ", using commit size = " + batchCommitSize );
+
+ int vIndex = 0;
+ for(int batchNo=1; batchNo<=batchCount; batchNo++){
+ ArrayList <Long> batchVids = new ArrayList <Long> ();
+ int thisBVCount = 0;
+ while( (thisBVCount < batchCommitSize) && (vIndex < vTotalCount) ) {
+ batchVids.add(vidList.get(vIndex));
+ thisBVCount++;
+ vIndex++;
+ }
+ // now process this batch
+ LOGGER.info( "Process vertex batch # " + batchNo +
+ ", which contains " + batchVids.size() + " ids. ");
+ processVertBatch(jgraph, doLogging, doDelete, batchVids);
+ }
+ }
+
+
+ private void processVertBatch(JanusGraph jgraph, Boolean doLogging,
+ Boolean doDelete, ArrayList <Long> vidList ) {
+
+ Graph g = jgraph.newTransaction();
+ GraphTraversalSource gts = g.traversal();
+ int delFailCount = 0;
+ int vCount = 0;
+ int delCount = 0;
+
+ Iterator <Vertex> vertItr = gts.V(vidList);
+ while( vertItr.hasNext() ) {
+ vCount++;
+ Vertex tmpV = vertItr.next();
+ String tmpVid = tmpV.id().toString();
+ String tmpPropsStr = "";
+ if( doLogging ) {
+ Iterator<VertexProperty<Object>> pI = tmpV.properties();
+ while( pI.hasNext() ){
+ VertexProperty<Object> tp = pI.next();
+ Object val = tp.value();
+ tmpPropsStr = tmpPropsStr + "[" + tp.key() + "=" + val + "]";
+ }
+ LOGGER.info(" vid = " + tmpVid + ", props: (" + tmpPropsStr + ") " );
+ }
+
+ if( doDelete ) {
+ LOGGER.info("Removing vid = " + tmpVid );
+ try {
+ tmpV.remove();
+ delCount++;
+ } catch ( Exception e ) {
+ // figure out what to do
+ delFailCount++;
+ LOGGER.error("ERROR trying to delete Candidate VID = " + tmpVid + " " + LogFormatTools.getStackTop(e));
+ }
+ }
+ }
+
+ if( doDelete ) {
+ LOGGER.info("Calling commit on delete of Vertices." );
+ try {
+ g.tx().commit();
+ } catch ( Exception e ) {
+ LOGGER.error("ERROR trying to commit Vertex Deletes for this batch. " +
+ LogFormatTools.getStackTop(e) );
+ LOGGER.info( vCount + " candidate vertices processed. "
+ + " vertex deletes - COMMIT FAILED. ");
+ return;
+ }
+ }
+
+ if( doDelete ) {
+ LOGGER.info( vCount + " candidate vertices processed. " +
+ delFailCount + " delete attempts failed, " +
+ delCount + " deletes successful. ");
+ }
+ else {
+ LOGGER.info( vCount + " candidate vertices processed in this batch. " );
+ }
+ }
+
+
+ public void processEdges(JanusGraph jgraph, Long truncBeforeTs,
+ Boolean doLogging, Boolean doDelete ) {
+
+ Graph g = jgraph.newTransaction();
+ GraphTraversalSource gts = g.traversal();
+ //Iterator <Edge> edgeItr = gts.E().has(AAIProperties.END_TS, P.lt(truncBeforeTs));
+ Iterator <Edge> edgeItr = gts.E().has("end-ts", P.lt(truncBeforeTs));
+ ArrayList <String> eidList = new ArrayList <String> ();
+ while( edgeItr.hasNext() ) {
+ Edge tmpE = edgeItr.next();
+ String tmpEid = tmpE.id().toString();
+ eidList.add(tmpEid);
+ }
+
+ int eTotalCount = eidList.size();
+ int batchCount = eTotalCount / batchCommitSize;
+ if((batchCount * batchCommitSize) < eTotalCount){
+ batchCount++;
+ }
+
+ LOGGER.info( " Edge TotalCount = " + eTotalCount +
+ ", we get batchCount = " + batchCount +
+ ", using commit size = " + batchCommitSize );
+
+ int eIndex = 0;
+ for(int batchNo=1; batchNo<=batchCount; batchNo++){
+ ArrayList <String> batchEids = new ArrayList <String> ();
+ int thisBECount = 0;
+ while( (thisBECount < batchCommitSize) && (eIndex < eTotalCount) ) {
+ batchEids.add(eidList.get(eIndex));
+ thisBECount++;
+ eIndex++;
+ }
+ // now process this batch
+ LOGGER.info( "Process edge batch # " + batchNo +
+ ", which contains " + batchEids.size() + " ids. ");
+ processEdgeBatch(jgraph, doLogging, doDelete, batchEids);
+ }
+ }
+
+
+ private void processEdgeBatch(JanusGraph jgraph, Boolean doLogging,
+ Boolean doDelete, ArrayList <String> eidList ) {
+
+ Graph g = jgraph.newTransaction();
+ GraphTraversalSource gts = g.traversal();
+ int delFailCount = 0;
+ int eCount = 0;
+ int delCount = 0;
+
+ Iterator <Edge> edgeItr = gts.E(eidList);
+ while( edgeItr.hasNext() ) {
+ eCount++;
+ Edge tmpE = edgeItr.next();
+ String tmpEid = tmpE.id().toString();
+ if( doLogging ) {
+ String tmpEProps = "";
+ Iterator<Property<Object>> epI = tmpE.properties();
+ while( epI.hasNext() ){
+ Property<Object> ep = epI.next();
+ Object val = ep.value();
+ tmpEProps = tmpEProps + "[" + ep.key() + "=" + val + "]";
+ }
+ Iterator <Vertex> conVtxs = tmpE.bothVertices();
+ String tmpConVs = "";
+ while( conVtxs.hasNext() ) {
+ Vertex conV = conVtxs.next();
+ tmpConVs = tmpConVs + "[" + conV.id().toString() + "] ";
+ }
+ LOGGER.info(" eid = " + tmpEid
+ + ", Connecting vids = " + tmpConVs
+ + ", props: (" + tmpEProps + "). " );
+ }
+
+ if( doDelete ) {
+ LOGGER.info("Removing Edge eid = " + tmpEid );
+ try {
+ tmpE.remove();
+ delCount++;
+ } catch ( Exception e ) {
+ delFailCount++;
+ LOGGER.error("ERROR trying to delete Candidate Edge with eid = " + tmpEid + " " + LogFormatTools.getStackTop(e));
+ }
+ }
+ }
+
+ if( doDelete ) {
+ LOGGER.info("Calling commit on delete of Edges." );
+ try {
+ g.tx().commit();
+ } catch ( Exception e ) {
+ LOGGER.error("ERROR trying to commit Edge Deletes for this batch. " +
+ LogFormatTools.getStackTop(e) );
+ LOGGER.info( eCount + " candidate edges processed. "
+ + " edge deletes - COMMIT FAILED. ");
+ return;
+ }
+ }
+
+ if( doDelete ) {
+ LOGGER.info( eCount + " candidate edges processed. " +
+ delFailCount + " delete attempts failed, " +
+ delCount + " deletes successful. ");
+ }
+ else {
+ LOGGER.info( eCount + " candidate edges processed in this batch. " );
+ }
+ }
+
+
+ public int getCandidateVertexCount(JanusGraph jgraph, int windowDaysVal) {
+ Graph g = jgraph.newTransaction();
+ GraphTraversalSource gts = g.traversal();
+ Long truncTs = calculateTruncWindowEndTimeStamp(windowDaysVal);
+ //int candVCount = gts.V().has(AAIProperties.END_TS, P.lt(truncTs)).count().next().intValue();
+ int candVCount = gts.V().has("end-ts", P.lt(truncTs)).count().next().intValue();
+ LOGGER.info( " for the timeStamp = " + truncTs
+ + ", which corresponds to the passed truncateWindowDays = "
+ + windowDaysVal
+ + ", found " + candVCount
+ + " candidate vertices. ");
+ return candVCount;
+ }
+
+
+ public int getCandidateEdgeCount(JanusGraph jgraph, int windowDaysVal) {
+ Graph g = jgraph.newTransaction();
+ GraphTraversalSource gts = g.traversal();
+ Long truncTs = calculateTruncWindowEndTimeStamp(windowDaysVal);
+ //int candECount = gts.E().has(AAIProperties.END_TS, P.lt(truncTs)).count().next().intValue();
+ int candECount = gts.E().has("end-ts", P.lt(truncTs)).count().next().intValue();
+ LOGGER.info( " for the timeStamp = " + truncTs
+ + ", which corresponds to the passed truncateWindowDays = "
+ + windowDaysVal
+ + ", found " + candECount
+ + " candidate Edges. ");
+ return candECount;
+ }
+
+
+ public static void verifyGraph(JanusGraph graph) {
+
+ if (graph == null) {
+ String emsg = "Not able to get a graph object in DataSnapshot.java\n";
+ LOGGER.debug(emsg);
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+
+ }
+
+ public long calculateTruncWindowEndTimeStamp( int timeWindowDays ){
+ // Given a window size in days, calculate the timestamp that
+ // represents the early-edge of that window.
+
+ long unixTimeNow = System.currentTimeMillis();
+ if( timeWindowDays <= 0 ){
+ // This just means that they want to truncate all the way up to the current time
+ return unixTimeNow;
+ }
+
+ long windowInMillis = timeWindowDays * 24 * 60 * 60L * 1000;
+ long windowEdgeTimeStampInMs = unixTimeNow - windowInMillis;
+ return windowEdgeTimeStampInMs;
+
+ }
+
+ private Boolean doLoggingOrNot( String truncMode ){
+ if( truncMode.equals(SILENT_DELETE_MODE) ){
+ return false;
+ }
+ else {
+ return true;
+ }
+ }
+
+ private Boolean doDeleteOrNot( String truncMode ){
+ if( truncMode.equals(LOG_ONLY_MODE) ){
+ return false;
+ }
+ else {
+ return true;
+ }
+ }
+
+
+} \ No newline at end of file