diff options
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot')
5 files changed, 1701 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java new file mode 100644 index 0000000..12815ee --- /dev/null +++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java @@ -0,0 +1,835 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.datasnapshot; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.*; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import java.util.concurrent.TimeUnit; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +import org.apache.commons.configuration.PropertiesConfiguration; + +import org.apache.tinkerpop.gremlin.structure.io.IoCore; +import org.apache.tinkerpop.gremlin.structure.io.graphson.LegacyGraphSONReader; +import org.onap.aai.dbmap.AAIGraph; +import org.onap.aai.dbmap.AAIGraphConfig; +import org.onap.aai.exceptions.AAIException; +import org.onap.aai.logging.ErrorLogHelper; +import org.onap.aai.util.AAIConfig; +import org.onap.aai.util.AAIConstants; +import org.onap.aai.util.AAISystemExitUtil; +import org.onap.aai.util.FormatDate; + +import com.att.eelf.configuration.Configuration; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.janusgraph.core.JanusGraph; +import org.janusgraph.core.JanusGraphFactory; +import org.janusgraph.core.util.JanusGraphCleanup; + +public class DataSnapshot { + + private static EELFLogger LOGGER; + + /* Using realtime d */ + private static final String REALTIME_DB = "realtime"; + + private static final Set<String> SNAPSHOT_RELOAD_COMMANDS = new HashSet<>(); + + static { + SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_LEGACY_DATA"); + SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA"); + SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA_MULTI"); + } + + + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { + + boolean success = true; + + // Set the logging file properties to be used by EELFManager + System.setProperty("aai.service.name", DataSnapshot.class.getSimpleName()); + Properties props = System.getProperties(); + props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS); + props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG); + LOGGER = EELFManager.getInstance().getLogger(DataSnapshot.class); + Boolean dbClearFlag = false; + JanusGraph graph = null; + String command = "JUST_TAKE_SNAPSHOT"; // This is the default + String oldSnapshotFileName = ""; + + Long vertAddDelayMs = 1L; // Default value + Long edgeAddDelayMs = 1L; // Default value + + Long failureDelayMs = 50L; // Default value + Long retryDelayMs = 1500L; // Default value + int maxErrorsPerThread = 25; // Default value + Long vertToEdgeProcDelay = 9000L; // Default value + Long staggerThreadDelay = 5000L; // Default value + + int threadCount = 0; + Boolean debugFlag = false; + int debugAddDelayTime = 1; // Default to 1 millisecond + + boolean isExistingTitan = false; + + if (args.length >= 1) { + command = args[0]; + } + + if( SNAPSHOT_RELOAD_COMMANDS.contains(command)){ + if (args.length == 2) { + // If re-loading, they need to also pass the snapshot file name to use. + // We expected the file to be found in our snapshot directory. + oldSnapshotFileName = args[1]; + } + } + else if( command.equals("THREADED_SNAPSHOT") ){ + if (args.length == 2) { + // If doing a "threaded" snapshot, they need to specify how many threads to use + try { + threadCount = Integer.parseInt(args[1]); + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + args[1] + "]"); + LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + args[1] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( threadCount < 1 || threadCount > 100 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + args[1] + "]"); + LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + args[1] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount ); + } + else if (args.length == 3) { + // If doing a "threaded" snapshot, they need to specify how many threads to use + // They can also use debug mode if they pass the word "DEBUG" to do the nodes one at a time to see where it breaks. + try { + threadCount = Integer.parseInt(args[1]); + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + args[1] + "]"); + LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + args[1] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( threadCount < 1 || threadCount > 100 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + args[1] + "]"); + LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + args[1] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( args[2].equals("DEBUG") ){ + debugFlag = true; + } + LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount + + ", and DEBUG mode set ON. "); + } + else if (args.length == 4) { + // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1) + // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2) + // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3) + try { + threadCount = Integer.parseInt(args[1]); + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + args[1] + "]"); + LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + args[1] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( threadCount < 1 || threadCount > 100 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + args[1] + "]"); + LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + args[1] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( args[2].equals("DEBUG") ){ + debugFlag = true; + } + try { + debugAddDelayTime = Integer.parseInt(args[3]); + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) debugAddDelayTime passed to DataSnapshot [" + args[3] + "]"); + LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot [" + args[3] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount + + ", DEBUG mode ON and addDelayTimer = " + debugAddDelayTime + " mSec. "); + } + else { + ErrorLogHelper.logError("AAI_6128", "Wrong param count (should be 2,3 or 4) when using THREADED_SNAPSHOT."); + LOGGER.debug("Wrong param count (should be 2,3 or 4) when using THREADED_SNAPSHOT."); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + } + else if( command.equals("MULTITHREAD_RELOAD") ){ + // Note - this will use as many threads as the snapshot file is + // broken up into. (up to a limit) + if (args.length == 2) { + // Since they are re-loading, they need to pass the snapshot file name to use. + // We expected the file to be found in our snapshot directory. Note - if + // it is a multi-part snapshot, then this should be the root of the name. + // We will be using the default delay timers. + oldSnapshotFileName = args[1]; + } + else if (args.length == 7) { + // Since they are re-loading, they need to pass the snapshot file name to use. + // We expected the file to be found in our snapshot directory. Note - if + // it is a multi-part snapshot, then this should be the root of the name. + oldSnapshotFileName = args[1]; + // They should be passing the timers in in this order: + // vertDelay, edgeDelay, failureDelay, retryDelay + vertAddDelayMs = Long.parseLong(args[2]); + edgeAddDelayMs = Long.parseLong(args[3]); + failureDelayMs = Long.parseLong(args[4]); + retryDelayMs = Long.parseLong(args[5]); + try { + maxErrorsPerThread = Integer.parseInt(args[6]); + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + args[6] + "]"); + LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + args[6] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( maxErrorsPerThread < 1 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + args[6] + "]"); + LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + args[6] + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + } + else { + ErrorLogHelper.logError("AAI_6128", "Wrong param count (should be either 2 or 7) when using MUTLITHREAD_RELOAD."); + LOGGER.debug("Wrong param count (should be 2 or 7) when using MUTLITHREAD_RELOAD."); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + } + else if (command.equals("CLEAR_ENTIRE_DATABASE")) { + if (args.length >= 2) { + oldSnapshotFileName = args[1]; + } + if (args.length == 3) { + String titanFlag = args[2]; + if ("titan".equalsIgnoreCase(titanFlag)) { + isExistingTitan = true; + } + } + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + + AAIConfig.init(); + ErrorLogHelper.loadProperties(); + LOGGER.debug("Command = " + command + ", oldSnapshotFileName = [" + oldSnapshotFileName + "]."); + String targetDir = AAIConstants.AAI_HOME + AAIConstants.AAI_FILESEP + "logs" + AAIConstants.AAI_FILESEP + "data" + AAIConstants.AAI_FILESEP + "dataSnapshots"; + + // Make sure the dataSnapshots directory is there + new File(targetDir).mkdirs(); + + LOGGER.debug(" ---- NOTE --- about to open graph (takes a little while) "); + + if (command.equals("JUST_TAKE_SNAPSHOT")) { + // ------------------------------------------ + // They just want to take a snapshot. + // ------------------------------------------ + verifyGraph(AAIGraph.getInstance().getGraph()); + FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT"); + String dteStr = fd.getDateTime(); + String newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr; + graph = AAIGraph.getInstance().getGraph(); + + graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname); + + LOGGER.debug("Snapshot written to " + newSnapshotOutFname); + + } + else if (command.equals("THREADED_SNAPSHOT")) { + // --------------------------------------------------------------------- + // They want the creation of the snapshot to be spread out via threads + // --------------------------------------------------------------------- + + FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT"); + String dteStr = fd.getDateTime(); + String newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr; + verifyGraph(AAIGraph.getInstance().getGraph()); + graph = AAIGraph.getInstance().getGraph(); + LOGGER.debug(" Successfully got the Graph instance. "); + long timeA = System.nanoTime(); + + LOGGER.debug(" Need to divide vertexIds across this many threads: " + threadCount ); + HashMap <String,ArrayList> vertListHash = new HashMap <String,ArrayList> (); + for( int t = 0; t < threadCount; t++ ){ + ArrayList <Vertex> vList = new ArrayList <Vertex> (); + String tk = "" + t; + vertListHash.put( tk, vList); + } + LOGGER.debug("Count how many nodes are in the db. "); + long totalVertCount = graph.traversal().V().count().next(); + LOGGER.debug(" Total Count of Nodes in DB = " + totalVertCount + "."); + long nodesPerFile = totalVertCount / threadCount; + LOGGER.debug(" Thread count = " + threadCount + ", each file will get (roughly): " + nodesPerFile + " nodes."); + long timeA2 = System.nanoTime(); + long diffTime = timeA2 - timeA; + long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To count all vertices in DB it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + long vtxIndex = 0; + int currentTNum = 0; + String currentTKey = "0"; + long thisThrIndex = 0; + Iterator <Vertex> vtxItr = graph.vertices(); + while( vtxItr.hasNext() ){ + // Divide up all the vertices so we can process them on different threads + vtxIndex++; + thisThrIndex++; + if( (thisThrIndex > nodesPerFile) && (currentTNum < threadCount -1) ){ + // We will need to start adding to the Hash for the next thread + currentTNum++; + currentTKey = "" + currentTNum; + thisThrIndex = 0; + } + (vertListHash.get(currentTKey)).add(vtxItr.next()); + } + + long timeB = System.nanoTime(); + diffTime = timeB - timeA2; + minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To Loop over all vertices, and put them into sub-Arrays it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + // Need to print out each set of vertices using it's own thread + ArrayList <Thread> threadArr = new ArrayList <Thread> (); + for( int thNum = 0; thNum < threadCount; thNum++ ){ + String thNumStr = "" + thNum; + String subFName = newSnapshotOutFname + ".P" + thNumStr; + Thread thr = new Thread(new PrintVertexDetails(graph, subFName, vertListHash.get(thNumStr), + debugFlag, debugAddDelayTime) ); + thr.start(); + threadArr.add(thr); + } + + // Make sure all the threads finish before moving on. + for( int thNum = 0; thNum < threadCount; thNum++ ){ + if( null != threadArr.get(thNum) ){ + (threadArr.get(thNum)).join(); + } + } + + long timeC = System.nanoTime(); + diffTime = timeC - timeB; + minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To write all the data out to snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + + } else if( command.equals("MULTITHREAD_RELOAD") ){ + // --------------------------------------------------------------------- + // They want the RELOAD of the snapshot to be spread out via threads + // NOTE - it will only use as many threads as the number of files the + // snapshot is written to. Ie. if you have a single-file snapshot, + // then this will be single-threaded. + // + ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false); + int fCount = snapFilesArr.size(); + Iterator <File> fItr = snapFilesArr.iterator(); + + JanusGraph graph1 = AAIGraph.getInstance().getGraph(); + long timeStart = System.nanoTime(); + + HashMap <String,String> old2NewVertIdMap = new <String,String> HashMap (); + + // We're going to try loading in the vertices - without edges or properties + // using Separate threads + + ExecutorService executor = Executors.newFixedThreadPool(fCount); + List<Future<HashMap<String,String>>> list = new ArrayList<Future<HashMap<String,String>>>(); + + for( int i=0; i < fCount; i++ ){ + File f = snapFilesArr.get(i); + String fname = f.getName(); + String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname; + Thread.sleep(staggerThreadDelay); // Stagger the threads a bit + LOGGER.debug(" -- Read file: [" + fullSnapName + "]"); + LOGGER.debug(" -- Call the PartialVertexLoader to just load vertices ----"); + LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs + + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs + + ", maxErrorsPerThread = " + maxErrorsPerThread ); + Callable <HashMap<String,String>> vLoader = new PartialVertexLoader(graph1, fullSnapName, + vertAddDelayMs, failureDelayMs, retryDelayMs, maxErrorsPerThread, LOGGER); + Future <HashMap<String,String>> future = (Future<HashMap<String, String>>) executor.submit(vLoader); + + // add Future to the list, we can get return value using Future + list.add(future); + LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY thread # "+ i ); + } + + threadCount = 0; + int threadFailCount = 0; + for(Future<HashMap<String,String>> fut : list){ + threadCount++; + try { + old2NewVertIdMap.putAll(fut.get()); + LOGGER.debug(" -- back from PartialVertexLoader. returned thread # " + threadCount + + ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() ); + } + catch (InterruptedException e) { + threadFailCount++; + e.printStackTrace(); + } + catch (ExecutionException e) { + threadFailCount++; + e.printStackTrace(); + } + } + + executor.shutdown(); + + if( threadFailCount > 0 ) { + String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. "; + LOGGER.debug(emsg); + throw new Exception( emsg ); + } + + long timeX = System.nanoTime(); + long diffTime = timeX - timeStart; + long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + // Give the DB a little time to chew on all those vertices + Thread.sleep(vertToEdgeProcDelay); + + // ---------------------------------------------------------------------------------------- + LOGGER.debug("\n\n\n -- Now do the edges/props ----------------------"); + // ---------------------------------------------------------------------------------------- + + + // We're going to try loading in the edges and missing properties + // Note - we're passing the whole oldVid2newVid mapping to the PartialPropAndEdgeLoader + // so that the String-updates to the GraphSON will happen in the threads instead of + // here in the un-threaded calling method. + executor = Executors.newFixedThreadPool(fCount); + ArrayList<Future<ArrayList<String>>> listEdg = new ArrayList<Future<ArrayList<String>>>(); + for( int i=0; i < fCount; i++ ){ + File f = snapFilesArr.get(i); + String fname = f.getName(); + String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname; + Thread.sleep(staggerThreadDelay); // Stagger the threads a bit + LOGGER.debug(" -- Read file: [" + fullSnapName + "]"); + LOGGER.debug(" -- Call the PartialPropAndEdgeLoader for Properties and EDGEs ----"); + LOGGER.debug(" -- edgeAddDelayMs = " + vertAddDelayMs + + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs + + ", maxErrorsPerThread = " + maxErrorsPerThread ); + + Callable eLoader = new PartialPropAndEdgeLoader(graph1, fullSnapName, + edgeAddDelayMs, failureDelayMs, retryDelayMs, + old2NewVertIdMap, maxErrorsPerThread, LOGGER); + Future <ArrayList<String>> future = (Future<ArrayList<String>>) executor.submit(eLoader); + + //add Future to the list, we can get return value using Future + listEdg.add(future); + LOGGER.debug(" -- Starting PartialPropAndEdge thread # "+ i ); + } + + threadCount = 0; + for(Future<ArrayList<String>> fut : listEdg){ + threadCount++; + try{ + fut.get(); // DEBUG -- should be doing something with the return value if it's not empty - ie. errors + LOGGER.debug(" -- back from PartialPropAndEdgeLoader. thread # " + threadCount ); + } + catch (InterruptedException e) { + threadFailCount++; + e.printStackTrace(); + } + catch (ExecutionException e) { + threadFailCount++; + e.printStackTrace(); + } + } + + executor.shutdown(); + + if( threadFailCount > 0 ) { + String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. "; + LOGGER.debug(emsg); + throw new Exception( emsg ); + } + + // This is needed so we can see the data committed by the called threads + graph1.tx().commit(); + + long timeEnd = System.nanoTime(); + diffTime = timeEnd - timeX; + minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + long totalDiffTime = timeEnd - timeStart; + long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime); + long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount); + LOGGER.debug(" -- TOTAL multi-threaded reload time: " + + totalMinCount + " minutes, " + totalSecCount + " seconds " ); + + } else if (command.equals("CLEAR_ENTIRE_DATABASE")) { + // ------------------------------------------------------------------ + // They are calling this to clear the db before re-loading it + // later + // ------------------------------------------------------------------ + + // First - make sure the backup file(s) they will be using can be + // found and has(have) data. + // getFilesToProcess makes sure the file(s) exist and have some data. + getFilesToProcess(targetDir, oldSnapshotFileName, true); + + LOGGER.debug("\n>>> WARNING <<<< "); + LOGGER.debug(">>> All data and schema in this database will be removed at this point. <<<"); + LOGGER.debug(">>> Processing will begin in 5 seconds. <<<"); + LOGGER.debug(">>> WARNING <<<< "); + + try { + // Give them a chance to back out of this + Thread.sleep(5000); + } catch (java.lang.InterruptedException ie) { + LOGGER.debug(" DB Clearing has been aborted. "); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + LOGGER.debug(" Begin clearing out old data. "); + String rtConfig = AAIConstants.REALTIME_DB_CONFIG; + String serviceName = System.getProperty("aai.service.name", "NA"); + LOGGER.debug("Getting new configs for clearig"); + PropertiesConfiguration propertiesConfiguration = new AAIGraphConfig.Builder(rtConfig).forService(serviceName).withGraphType(REALTIME_DB).buildConfiguration(); + if(isExistingTitan){ + LOGGER.debug("Existing DB is Titan"); + propertiesConfiguration.setProperty("graph.titan-version","1.0.0"); + } + LOGGER.debug("Open New Janus Graph"); + JanusGraph janusGraph = JanusGraphFactory.open(propertiesConfiguration); + verifyGraph(janusGraph); + + if(isExistingTitan){ + JanusGraphFactory.drop(janusGraph); + } else { + janusGraph.close(); + JanusGraphCleanup.clear(janusGraph); + } + LOGGER.debug(" Done clearing data. "); + LOGGER.debug(">>> IMPORTANT - NOTE >>> you need to run the SchemaGenerator (use GenTester) before "); + LOGGER.debug(" reloading data or the data will be put in without indexes. "); + dbClearFlag = true; + LOGGER.debug("All done clearing DB"); + + } else if (command.equals("RELOAD_LEGACY_DATA")) { + // ------------------------------------------------------------------- + // They want to restore the database from an old snapshot file + // ------------------------------------------------------------------- + verifyGraph(AAIGraph.getInstance().getGraph()); + graph = AAIGraph.getInstance().getGraph(); + if (oldSnapshotFileName.equals("")) { + String emsg = "No oldSnapshotFileName passed to DataSnapshot when RELOAD_LEGACY_DATA used."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + String oldSnapshotFullFname = targetDir + AAIConstants.AAI_FILESEP + oldSnapshotFileName; + File f = new File(oldSnapshotFullFname); + if (!f.exists()) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " could not be found."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } else if (!f.canRead()) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " could not be read."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } else if (f.length() == 0) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " had no data."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + LOGGER.debug("We will load data IN from the file = " + oldSnapshotFullFname); + LOGGER.debug(" Begin reloading JanusGraph 0.5 data. "); + + LegacyGraphSONReader lgr = LegacyGraphSONReader.build().create(); + InputStream is = new FileInputStream(oldSnapshotFullFname); + lgr.readGraph(is, graph); + + LOGGER.debug("Completed the inputGraph command, now try to commit()... "); + graph.tx().commit(); + LOGGER.debug("Completed reloading JanusGraph 0.5 data."); + + long vCount = graph.traversal().V().count().next(); + LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db."); + } else if (command.equals("RELOAD_DATA")) { + // ------------------------------------------------------------------- + // They want to restore the database from an old snapshot file + // ------------------------------------------------------------------- + verifyGraph(AAIGraph.getInstance().getGraph()); + graph = AAIGraph.getInstance().getGraph(); + if (oldSnapshotFileName.equals("")) { + String emsg = "No oldSnapshotFileName passed to DataSnapshot when RELOAD_DATA used."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + String oldSnapshotFullFname = targetDir + AAIConstants.AAI_FILESEP + oldSnapshotFileName; + File f = new File(oldSnapshotFullFname); + if (!f.exists()) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " could not be found."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } else if (!f.canRead()) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " could not be read."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } else if (f.length() == 0) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " had no data."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + LOGGER.debug("We will load data IN from the file = " + oldSnapshotFullFname); + LOGGER.debug(" Begin reloading data. "); + graph.io(IoCore.graphson()).readGraph(oldSnapshotFullFname); + LOGGER.debug("Completed the inputGraph command, now try to commit()... "); + graph.tx().commit(); + LOGGER.debug("Completed reloading data."); + + long vCount = graph.traversal().V().count().next(); + + LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db."); + + } else if (command.equals("RELOAD_DATA_MULTI")) { + // ------------------------------------------------------------------- + // They want to restore the database from a group of snapshot files + // Note - this uses multiple snapshot files, but runs single-threaded. + // ------------------------------------------------------------------- + verifyGraph(AAIGraph.getInstance().getGraph()); + graph = AAIGraph.getInstance().getGraph(); + + ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false); + + long timeA = System.nanoTime(); + + int fCount = snapFilesArr.size(); + Iterator <File> fItr = snapFilesArr.iterator(); + Vector<InputStream> inputStreamsV = new Vector<>(); + for( int i = 0; i < fCount; i++ ){ + File f = snapFilesArr.get(i); + String fname = f.getName(); + if (!f.canRead()) { + String emsg = "oldSnapshotFile " + fname + " could not be read."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } else if (f.length() == 0) { + String emsg = "oldSnapshotFile " + fname + " had no data."; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + String fullFName = targetDir + AAIConstants.AAI_FILESEP + fname; + InputStream fis = new FileInputStream(fullFName); + inputStreamsV.add(fis); + } + // Now add inputStreams.elements() to the Vector, + // inputStreams.elements() will return Enumerations + InputStream sis = new SequenceInputStream(inputStreamsV.elements()); + LOGGER.debug("Begin loading data from " + fCount + " files -----"); + graph.io(IoCore.graphson()).reader().create().readGraph(sis, graph); + LOGGER.debug("Completed the inputGraph command, now try to commit()... "); + graph.tx().commit(); + LOGGER.debug(" >> Completed reloading data."); + + long vCount = graph.traversal().V().count().next(); + LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db."); + + long timeB = System.nanoTime(); + long diffTime = timeB - timeA; + long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To Reload this snapshot, it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + + } else { + String emsg = "Bad command passed to DataSnapshot: [" + command + "]"; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + } catch (AAIException e) { + ErrorLogHelper.logError("AAI_6128", e.getMessage()); + LOGGER.error("Encountered an exception during the datasnapshot: ", e); + e.printStackTrace(); + success = false; + } catch (Exception ex) { + ErrorLogHelper.logError("AAI_6128", ex.getMessage()); + LOGGER.error("Encountered an exception during the datasnapshot: ", ex); + ex.printStackTrace(); + success = false; + } finally { + if (!dbClearFlag && graph != null) { + // Any changes that worked correctly should have already done + // thier commits. + if(!"true".equals(System.getProperty("org.onap.aai.graphadmin.started"))) { + if (graph.isOpen()) { + graph.tx().rollback(); + graph.close(); + } + } + } + try { + baos.close(); + } catch (IOException iox) { + } + } + + if(success){ + AAISystemExitUtil.systemExitCloseAAIGraph(0); + } else { + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + }// End of main() + + + private static ArrayList <File> getFilesToProcess(String targetDir, String oldSnapshotFileName, boolean doingClearDb) + throws Exception { + + if( oldSnapshotFileName == null || oldSnapshotFileName.equals("") ){ + String emsg = "No oldSnapshotFileName passed to DataSnapshot for Reload. "; + if( doingClearDb ) { + emsg = "No oldSnapshotFileName passed to DataSnapshot. Needed when Clearing the db in case we need a backup. "; + } + LOGGER.debug(emsg); + throw new Exception( emsg ); + } + + ArrayList <File> snapFilesArrList = new ArrayList <File> (); + + // First, we'll assume that this is a multi-file snapshot and + // look for names based on that. + String thisSnapPrefix = oldSnapshotFileName + ".P"; + File fDir = new File(targetDir); // Snapshot directory + File[] allFilesArr = fDir.listFiles(); + for (File snapFile : allFilesArr) { + String snapFName = snapFile.getName(); + if( snapFName.startsWith(thisSnapPrefix)){ + if (!snapFile.canRead()) { + String emsg = "oldSnapshotFile " + snapFName + " could not be read."; + LOGGER.debug(emsg); + throw new Exception (emsg); + } else if (snapFile.length() == 0) { + String emsg = "oldSnapshotFile " + snapFName + " had no data."; + LOGGER.debug(emsg); + throw new Exception (emsg); + } + snapFilesArrList.add(snapFile); + } + } + + if( snapFilesArrList.isEmpty() ){ + // Multi-file snapshot check did not find files, so this may + // be a single-file snapshot. + String oldSnapshotFullFname = targetDir + AAIConstants.AAI_FILESEP + oldSnapshotFileName; + File f = new File(oldSnapshotFullFname); + if (!f.exists()) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " could not be found."; + LOGGER.debug(emsg); + throw new Exception (emsg); + } else if (!f.canRead()) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " could not be read."; + LOGGER.debug(emsg); + throw new Exception (emsg); + } else if (f.length() == 0) { + String emsg = "oldSnapshotFile " + oldSnapshotFullFname + " had no data."; + LOGGER.debug(emsg); + throw new Exception (emsg); + } + snapFilesArrList.add(f); + } + + if( snapFilesArrList.isEmpty() ){ + // Still haven't found anything.. that was not a good file name. + String fullFName = targetDir + AAIConstants.AAI_FILESEP + thisSnapPrefix; + String emsg = "oldSnapshotFile " + fullFName + "* could not be found."; + LOGGER.debug(emsg); + throw new Exception(emsg); + } + + return snapFilesArrList; + } + + + public static void verifyGraph(JanusGraph graph) { + + if (graph == null) { + String emsg = "Not able to get a graph object in DataSnapshot.java\n"; + LOGGER.debug(emsg); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + } + + +}
\ No newline at end of file diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshotTasks.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshotTasks.java new file mode 100644 index 0000000..cc9ca97 --- /dev/null +++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshotTasks.java @@ -0,0 +1,115 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datasnapshot; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.text.SimpleDateFormat; +import java.util.*; + +import org.onap.aai.datagrooming.DataGrooming; +import org.onap.aai.datagrooming.DataGroomingTasks; +import org.onap.aai.exceptions.AAIException; +import org.onap.aai.logging.ErrorLogHelper; +import org.onap.aai.logging.LoggingContext; +import org.onap.aai.util.AAIConfig; +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +@Component +@PropertySource("file:${server.local.startpath}/etc/appprops/datatoolscrons.properties") +public class DataSnapshotTasks { + + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DataSnapshotTasks.class); + private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); + + @Scheduled(cron = "${datasnapshottasks.cron}" ) + public void snapshotScheduleTask() throws AAIException, Exception { + + LoggingContext.init(); + LoggingContext.requestId(UUID.randomUUID().toString()); + LoggingContext.partnerName("AAI"); + LoggingContext.targetEntity("CronApp"); + LoggingContext.component("dataSnapshot"); + LoggingContext.serviceName("snapshotScheduleTask"); + LoggingContext.targetServiceName("snapshotScheduleTask"); + LoggingContext.statusCode(LoggingContext.StatusCode.COMPLETE); + + if(!"true".equals(AAIConfig.get("aai.disable.check.snapshot.running", "false"))){ + if(checkIfDataSnapshotIsRunning()){ + LOGGER.info("Data Snapshot is already running on the system"); + return; + } + } + + LOGGER.info("Started cron job dataSnapshot @ " + dateFormat.format(new Date())); + try { + if (AAIConfig.get("aai.cron.enable.dataSnapshot").equals("true")) { + DataSnapshot dataSnapshot = new DataSnapshot(); + String [] dataSnapshotParms = AAIConfig.get("aai.datasnapshot.params", "JUST_TAKE_SNAPSHOT").split("\\s+"); + LOGGER.info("DataSnapshot Params {}", Arrays.toString(dataSnapshotParms)); + dataSnapshot.main(dataSnapshotParms); + } + } + catch (Exception e) { + ErrorLogHelper.logError("AAI_4000", "Exception running cron job for DataSnapshot"+e.toString()); + LOGGER.info("AAI_4000", "Exception running cron job for DataSnapshot"+e.toString()); + throw e; + } finally { + LOGGER.info("Ended cron job dataSnapshot @ " + dateFormat.format(new Date())); + LoggingContext.clear(); + } + + } + + private boolean checkIfDataSnapshotIsRunning(){ + + Process process = null; + + int count = 0; + try { + process = new ProcessBuilder().command("bash", "-c", "ps -ef | grep '[D]ataSnapshot'").start(); + InputStream is = process.getInputStream(); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + while (br.readLine() != null){ + count++; + } + + int exitVal = process.waitFor(); + LOGGER.info("Exit value of the dataSnapshot check process: " + exitVal); + } catch (Exception e) { + e.printStackTrace(); + } + + if(count > 0){ + return true; + } else { + return false; + } + } +} + +
\ No newline at end of file diff --git a/src/main/java/org/onap/aai/datasnapshot/PartialPropAndEdgeLoader.java b/src/main/java/org/onap/aai/datasnapshot/PartialPropAndEdgeLoader.java new file mode 100644 index 0000000..af858ae --- /dev/null +++ b/src/main/java/org/onap/aai/datasnapshot/PartialPropAndEdgeLoader.java @@ -0,0 +1,421 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datasnapshot; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.Callable; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.janusgraph.core.JanusGraph; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.json.JSONArray; +import org.json.JSONObject; + +import com.att.eelf.configuration.EELFLogger; + + + + +public class PartialPropAndEdgeLoader implements Callable <ArrayList<String>>{ + + private EELFLogger LOGGER; + + private JanusGraph jg; + private String fName; + private Long edgeAddDelayMs; + private Long retryDelayMs; + private Long failureDelayMs; + private HashMap<String,String> old2NewVidMap; + private int maxAllowedErrors; + + + + public PartialPropAndEdgeLoader (JanusGraph graph, String fn, Long edgeDelay, Long failureDelay, Long retryDelay, + HashMap<String,String> vidMap, int maxErrors, EELFLogger elfLog ){ + jg = graph; + fName = fn; + edgeAddDelayMs = edgeDelay; + failureDelayMs = failureDelay; + retryDelayMs = retryDelay; + old2NewVidMap = vidMap; + maxAllowedErrors = maxErrors; + LOGGER = elfLog; + } + + + public ArrayList<String> call() throws Exception { + + // This is a partner to the "PartialVertexLoader" code. + // That code loads in vertex-id's/vertex-label's for a + // multi-file data snapshot. + // This code assumes that the all vertex-id's are now in the target db. + // This code loads vertex properties and edges for a + // multi-file data snapshot (the same one that loaded + // the vertex-ids). + // + + + // NOTE - We will be loading parameters and edges for one node at a time so that problems can be + // identified or ignored or re-tried instead of causing the entire load to fail. + // + // Return an arrayList of Strings to give info on what nodes encountered problems + + int entryCount = 0; + int retryCount = 0; + int failureCount = 0; + int retryFailureCount = 0; + HashMap <String,String> failedAttemptHash = new HashMap <String,String> (); + ArrayList <String> failedAttemptInfo = new ArrayList <String> (); + + int passNum = 1; + try( BufferedReader br = new BufferedReader(new FileReader(fName))) { + // loop through the file lines and do PUT for each vertex or the edges depending on what the loadtype is + for(String origLine; (origLine = br.readLine()) != null; ) { + entryCount++; + Thread.sleep(edgeAddDelayMs); // Space the edge requests out a little + + String errInfoStr = processThisLine(origLine, passNum); + if( !errInfoStr.equals("") ){ + // There was a problem with this line + String vidStr = getTheVidForThisLine(origLine); + // We'll use the failedAttemptHash to reTry this item + failedAttemptHash.put(vidStr,origLine); + failedAttemptInfo.add(errInfoStr); + failureCount++; + if( failureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialPropAndEdgeLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + Thread.sleep(failureDelayMs); // take a little nap if it failed + } + } // End of looping over each line + if( br != null ){ + br.close(); + } + } + catch (Exception e) { + LOGGER.debug(" --- Failed in the main loop for Buffered-Reader item # " + entryCount + + ", fName = " + fName ); + LOGGER.debug(" --- msg = " + e.getMessage() ); + throw e; + } + + // --------------------------------------------------------------------------- + // Now Re-Try any failed requests that might have Failed on the first pass. + // --------------------------------------------------------------------------- + passNum++; + try { + for (String failedVidStr : failedAttemptHash.keySet()) { + // Take a little nap, and retry this failed attempt + LOGGER.debug("DEBUG >> We will sleep for " + retryDelayMs + " and then RETRY any failed edge/property ADDs. "); + Thread.sleep(retryDelayMs); + retryCount++; + Long failedVidL = Long.parseLong(failedVidStr); + // When an Edge/Property Add fails, we store the whole (translated) graphSON line as the data in the failedAttemptHash + // We're really just doing a GET of this one vertex here... + String jsonLineToRetry = failedAttemptHash.get(failedVidStr); + String errInfoStr = processThisLine(jsonLineToRetry, passNum); + if( !errInfoStr.equals("") ){ + // There was a problem with this line + String translatedVidStr = getTheVidForThisLine(jsonLineToRetry); + failedAttemptHash.put(translatedVidStr,jsonLineToRetry); + failedAttemptInfo.add(errInfoStr); + retryFailureCount++; + if( retryFailureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialPropAndEdgeLoader() because " + + "Max Allowed Error count was exceeded while doing retries for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + Thread.sleep(failureDelayMs); // take a little nap if it failed + } + } // End of looping over each failed line + } + catch (Exception e) { + LOGGER.debug(" -- error in RETRY block. ErrorMsg = [" + e.getMessage() + "]" ); + throw e; + } + + LOGGER.debug(">>> After Processing in PartialPropAndEdgeLoader() " + + entryCount + " records processed. " + failureCount + " records failed. " + + retryCount + " RETRYs processed. " + retryFailureCount + " RETRYs failed. "); + + return failedAttemptInfo; + + }// end of call() + + + + private String translateThisVid(String oldVid) throws Exception { + + if( old2NewVidMap == null ){ + throw new Exception(" ERROR - null old2NewVidMap found in translateThisVid. "); + } + + if( old2NewVidMap.containsKey(oldVid) ){ + return old2NewVidMap.get(oldVid); + } + else { + throw new Exception(" ERROR - could not find VID translation for original VID = " + oldVid ); + } + } + + + private String getTheVidForThisLine(String graphSonLine) throws Exception { + + if( graphSonLine == null ){ + throw new Exception(" ERROR - null graphSonLine passed to getTheVidForThisLine. "); + } + + // We are assuming that the graphSonLine has the vertexId as the first ID: + // {"id":100995128,"label":"vertex","inE":{"hasPinterface":[{"id":"7lgg0e-2... etc... + + // The vertexId for this line is the numeric part after the initial {"id":xxxxx up to the first comma + int x = graphSonLine.indexOf(':') + 1; + int y = graphSonLine.indexOf(','); + String initialVid = graphSonLine.substring(x,y); + if( initialVid != null && !initialVid.isEmpty() && initialVid.matches("^[0-9]+$") ){ + return initialVid; + } + else { + throw new Exception(" ERROR - could not determine initial VID for graphSonLine: " + graphSonLine ); + } + } + + + private String processThisLine(String graphSonLine, int passNum){ + + String passInfo = ""; + if( passNum > 1 ) { + passInfo = " >> RETRY << pass # " + passNum + " "; + } + + JSONObject jObj = new JSONObject(); + String originalVid = ""; + + try{ + jObj = new JSONObject(graphSonLine); + originalVid = jObj.get("id").toString(); + } + catch ( Exception e ){ + LOGGER.debug(" -- Could not convert line to JsonObject [ " + graphSonLine + "]" ); + LOGGER.debug(" -- ErrorMsg = [" +e.getMessage() + "]"); + + return(" DEBUG -a- JSON translation exception when processing this line ---"); + //xxxxxDEBUGxxxxx I think we put some info on the return String and then return? + } + + // ----------------------------------------------------------------------------------------- + // Note - this assumes that any vertices referred to by an edge will already be in the DB. + // ----------------------------------------------------------------------------------------- + Vertex dbVtx = null; + + String newVidStr = ""; + Long newVidL = 0L; + try { + newVidStr = translateThisVid(originalVid); + newVidL = Long.parseLong(newVidStr); + } + catch ( Exception e ){ + LOGGER.debug(" -- " + passInfo + " translate VertexId before adding edges failed for this: vtxId = " + + originalVid + ". ErrorMsg = [" +e.getMessage() + "]"); + + return(" DEBUG -b- there VID-translation error when processing this line ---"); + //xxxxxDEBUGxxxxx I think we put some info on the return String and then return? + } + + + try { + dbVtx = getVertexFromDbForVid(newVidStr); + } + catch ( Exception e ){ + LOGGER.debug(" -- " + passInfo + " READ Vertex from DB before adding edges failed for this: vtxId = " + originalVid + + ", newVidId = " + newVidL + ". ErrorMsg = [" +e.getMessage() + "]"); + + return(" -- there was an error processing this line --- Line = [" + graphSonLine + "]"); + //xxxxxxDEBUGxxxx I think we put some info on the return String and then return? + } + + + String edResStr = processEdgesForVtx( jObj, dbVtx, passInfo, originalVid ); + if( edResStr.equals("") ){ + // We will commit the edges by themselves in case the properties stuff below fails + try { + jg.tx().commit(); + } + catch ( Exception e ){ + LOGGER.debug(" -- " + passInfo + " COMMIT FAILED adding EDGES for this vertex: vtxId = " + + originalVid + ". ErrorMsg = [" +e.getMessage() + "]"); + //xxxxxxxxxx I think we put some info on the return String and then return? + return(" DEBUG -d- there was an error doing the commit while processing edges for this line ---"); + } + } + + // Add the properties that we didn't have when we added the 'bare-bones' vertex + String pResStr = processPropertiesForVtx( jObj, dbVtx, passInfo, originalVid ); + if( pResStr.equals("") ){ + try { + jg.tx().commit(); + return ""; + } + catch ( Exception e ){ + LOGGER.debug(" -- " + passInfo + " COMMIT FAILED adding Properties for this vertex: vtxId = " + + originalVid + ". ErrorMsg = [" +e.getMessage() + "]"); + //xxxxxxxxxx I think we put some info on the return String and then return? + return(" DEBUG -e- there was an error doing the commit while processing Properties for this line ---"); + } + } + else { + LOGGER.debug("DEBUG " + passInfo + " Error processing Properties for this vertex: vtxId = " + originalVid ); + + //xxxxxxxxxx I think we put some info on the return String and then return? + return(" DEBUG -f- there was an error while processing Properties for this line ---"); + } + } + + + private String processPropertiesForVtx( JSONObject jObj, Vertex dbVtx, String passInfo, String originalVid ){ + + try { + JSONObject propsOb = (JSONObject) jObj.get("properties"); + Iterator <String> propsItr = propsOb.keys(); + while( propsItr.hasNext() ){ + String pKey = propsItr.next(); + JSONArray propsDetArr = propsOb.getJSONArray(pKey); + for( int i=0; i< propsDetArr.length(); i++ ){ + JSONObject prop = propsDetArr.getJSONObject(i); + String val = prop.getString("value"); + dbVtx.property(pKey, val); //DEBUGjojo -- val is always String here.. which is not right -------------------DEBUG + } + } + + } + catch ( Exception e ){ + LOGGER.debug(" -- " + passInfo + " failure getting/setting properties for: vtxId = " + + originalVid + ". ErrorMsg = [" + e.getMessage() + "]"); + //xxxDEBUGxxxxxxx I think we put some info on the return String and then return? + return(" DEBUG -g- there was an error adding properties while processing this line ---"); + + } + + return ""; + } + + + private Vertex getVertexFromDbForVid( String vtxIdStr ) throws Exception { + Vertex thisVertex = null; + Long vtxIdL = 0L; + + try { + vtxIdL = Long.parseLong(vtxIdStr); + Iterator <Vertex> vItr = jg.vertices(vtxIdL); + // Note - we only expect to find one vertex found for this ID. + while( vItr.hasNext() ){ + thisVertex = vItr.next(); + } + } + catch ( Exception e ){ + String emsg = "Error finding vertex for vid = " + vtxIdStr + "[" + e.getMessage() + "]"; + throw new Exception ( emsg ); + } + + if( thisVertex == null ){ + String emsg = "Could not find vertex for passed vid = " + vtxIdStr; + throw new Exception ( emsg ); + } + + return thisVertex; + } + + + private String processEdgesForVtx( JSONObject jObj, Vertex dbVtx, String passInfo, String originalVid ){ + + // Process the edges for this vertex -- but, just the "OUT" ones so edges don't get added twice (once from + // each side of the edge). + JSONObject edOb = null; + try { + edOb = (JSONObject) jObj.get("outE"); + } + catch (Exception e){ + // There were no OUT edges. This is OK. + return ""; + } + + try { + if( edOb == null ){ + // There were no OUT edges. This is OK. Not all nodes have out edges. + return ""; + } + Iterator <String> edItr = edOb.keys(); + while( edItr.hasNext() ){ + String eLabel = edItr.next(); + String inVid = ""; // Note - this should really be a Long? + JSONArray edArr = edOb.getJSONArray(eLabel); + for( int i=0; i< edArr.length(); i++ ){ + JSONObject eObj = edArr.getJSONObject(i); + String inVidStr = eObj.get("inV").toString(); + String translatedInVidStr = translateThisVid(inVidStr); + Vertex newInVertex = getVertexFromDbForVid(translatedInVidStr); + + // Note - addEdge automatically adds the edge in the OUT direction from the + // 'anchor' node that the call is being made from. + Edge tmpE = dbVtx.addEdge(eLabel, newInVertex); + JSONObject ePropsOb = null; + try { + ePropsOb = (JSONObject) eObj.get("properties"); + } + catch (Exception e){ + // NOTE - model definition related edges do not have edge properties. That is OK. + // Ie. when a model-element node has an "isA" edge to a "model-ver" node, that edge does + // not have edge properties on it. + } + if( ePropsOb != null ){ + Iterator <String> ePropsItr = ePropsOb.keys(); + while( ePropsItr.hasNext() ){ + String pKey = ePropsItr.next(); + tmpE.property(pKey, ePropsOb.getString(pKey)); + } + } + } + } + + } + catch ( Exception e ){ + String msg = " -- " + passInfo + " failure adding edge for: original vtxId = " + + originalVid + ". ErrorMsg = [" +e.getMessage() + "]"; + LOGGER.debug( " -- " + msg ); + //xxxxxxDEBUGxxxx I think we might need some better info on the return String to return? + LOGGER.debug(" -- now going to return/bail out of processEdgesForVtx" ); + return(" >> " + msg ); + + } + + return ""; + } + + +} + + diff --git a/src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java b/src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java new file mode 100644 index 0000000..387f45e --- /dev/null +++ b/src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java @@ -0,0 +1,223 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datasnapshot; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.util.HashMap; +import java.util.concurrent.Callable; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader; +import org.janusgraph.core.JanusGraph; + +import com.att.eelf.configuration.EELFLogger; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + + + +public class PartialVertexLoader implements Callable<HashMap<String,String>>{ + + private EELFLogger LOGGER; + + private JanusGraph jg; + private String fName; + private Long vertAddDelayMs; + private Long failurePauseMs; + private Long retryDelayMs; + private int maxAllowedErrors; + + public PartialVertexLoader (JanusGraph graph, String fn, Long vertDelay, Long failurePause, + Long retryDelay, int maxErrors, EELFLogger elfLog ){ + jg = graph; + fName = fn; + vertAddDelayMs = vertDelay; + failurePauseMs = failurePause; + retryDelayMs = retryDelay; + maxAllowedErrors = maxErrors; + LOGGER = elfLog; + } + + public HashMap<String,String> call() throws Exception { + + // NOTE - we will be loading one node at a time so that bad nodes can be ignored instead of causing the + // entire load to fail. + // + int entryCount = 0; + int retryCount = 0; + int failureCount = 0; + int retryFailureCount = 0; + HashMap <String,String> failedAttemptHash = new HashMap <String,String> (); + HashMap <String,String> old2NewVtxIdHash = new HashMap <String,String> (); + GraphSONReader gsr = GraphSONReader.build().create(); + + + // Read this file into a JSON object + JsonParser parser = new JsonParser(); + + try( BufferedReader br = new BufferedReader(new FileReader(fName))) { + // loop through the file lines and do PUT for each vertex or the edges depending on what the loadtype is + for(String line; (line = br.readLine()) != null; ) { + entryCount++; + Object ob = parser.parse(line); + JsonObject jObj = (JsonObject) ob; + // NOTE - we will need to keep track of how the newly generated vid's map + // to the old ones so we can aim the edges correctly later. + + // ---- Note -- This ONLY loads the vertexId and the label for each vertex ------------- + Thread.sleep(vertAddDelayMs); + + String oldVtxIdStr = jObj.get("id").getAsString(); + String vtxLabelStr = jObj.get("label").getAsString(); + try { + Vertex tmpV = jg.addVertex(vtxLabelStr); + String newVtxIdStr = tmpV.id().toString(); + old2NewVtxIdHash.put(oldVtxIdStr, newVtxIdStr); + } + catch ( Exception e ){ + failureCount++; + Thread.sleep(failurePauseMs); // Slow down if things are failing + LOGGER.debug(" >> addVertex FAILED for vtxId = " + oldVtxIdStr + ", label = [" + + vtxLabelStr + "]. ErrorMsg = [" + e.getMessage() + "]" ); + //e.printStackTrace(); + failedAttemptHash.put(oldVtxIdStr, vtxLabelStr); + if( failureCount > maxAllowedErrors ) { + LOGGER.debug(" >>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + try { + jg.tx().commit(); + } + catch ( Exception e ){ + failureCount++; + Thread.sleep(failurePauseMs); // Slow down if things are failing + LOGGER.debug(" -- COMMIT FAILED for Vtx ADD for vtxId = " + oldVtxIdStr + ", label = [" + + vtxLabelStr + "]. ErrorMsg = [" +e.getMessage() + "]" ); + //e.printStackTrace(); + failedAttemptHash.put(oldVtxIdStr, vtxLabelStr); + if( failureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + + } // End of looping over each line + + if( br != null ){ + br.close(); + } + } + catch (Exception e) { + LOGGER.debug(" --- Failed in the main loop for Buffered-Reader item # " + entryCount + + ", fName = " + fName ); + LOGGER.debug(" --- msg = " + e.getMessage() ); + e.printStackTrace(); + throw e; + } + + // --------------------------------------------------------------------------- + // Now Re-Try any failed requests that might have Failed on the first pass. + // --------------------------------------------------------------------------- + try { + for (String failedVidStr : failedAttemptHash.keySet()) { + // Take a little nap, and retry this failed attempt + LOGGER.debug("DEBUG >> We will sleep for " + retryDelayMs + " and then RETRY any failed vertex ADDs. "); + Thread.sleep(retryDelayMs); + + retryCount++; + // When a vertex Add fails we store the label as the data in the failedAttemptHash. + String failedLabel = failedAttemptHash.get(failedVidStr); + LOGGER.debug("DEBUG >> RETRY << " + + failedVidStr + ", label = " + failedLabel ); + try { + Vertex tmpV = jg.addVertex(failedLabel); + String newVtxIdStr = tmpV.id().toString(); + old2NewVtxIdHash.put(failedVidStr, newVtxIdStr); + } + catch ( Exception e ){ + retryFailureCount++; + LOGGER.debug(" -- addVertex FAILED for RETRY for vtxId = " + + failedVidStr + ", label = [" + failedLabel + + "]. ErrorMsg = [" +e.getMessage() + "]" ); + e.printStackTrace(); + if( retryFailureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + try { + jg.tx().commit(); + // If this worked, we can take it off of the failed list + failedAttemptHash.remove(failedVidStr); + } + catch ( Exception e ){ + retryFailureCount++; + LOGGER.debug(" -- COMMIT FAILED for RETRY for vtxId = " + failedVidStr + + ", label = [" + failedLabel + "]. ErrorMsg = [" + e.getMessage() + "]" ); + e.printStackTrace(); + if( retryFailureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + } // End of looping over failed attempt hash and doing retries + + } + catch ( Exception e ){ + LOGGER.debug(" -- error in RETRY block. ErrorMsg = [" +e.getMessage() + "]" ); + e.printStackTrace(); + throw e; + } + + // This would need to be properly logged... + LOGGER.debug(">>> After Processing in PartialVertexLoader(): " + + entryCount + " records processed. " + failureCount + " records failed. " + + retryCount + " RETRYs processed. " + retryFailureCount + " RETRYs failed. "); + + return old2NewVtxIdHash; + + }// end of call() + + + +} + + diff --git a/src/main/java/org/onap/aai/datasnapshot/PrintVertexDetails.java b/src/main/java/org/onap/aai/datasnapshot/PrintVertexDetails.java new file mode 100644 index 0000000..791ae15 --- /dev/null +++ b/src/main/java/org/onap/aai/datasnapshot/PrintVertexDetails.java @@ -0,0 +1,107 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datasnapshot; + +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.IoCore; +import org.janusgraph.core.JanusGraph; + + +public class PrintVertexDetails implements Runnable{ + + //private static EELFLogger LOGGER; + + private JanusGraph jg; + private String fname; + private ArrayList<Vertex> vtxList; + private Boolean debugOn; + private int debugDelayMs; + + public PrintVertexDetails (JanusGraph graph, String fn, ArrayList<Vertex> vL, Boolean debugFlag, int debugDelay){ + jg = graph; + fname = fn; + vtxList = vL; + debugOn = debugFlag; + debugDelayMs = debugDelay; + } + + public void run(){ + if( debugOn ){ + // This is much slower, but sometimes we need to find out which single line is causing a failure + try{ + int okCount = 0; + int failCount = 0; + Long debugDelayMsL = new Long(debugDelayMs); + FileOutputStream subFileStr = new FileOutputStream(fname); + Iterator <Vertex> vSubItr = vtxList.iterator(); + while( vSubItr.hasNext() ){ + Long vertexIdL = 0L; + String aaiNodeType = ""; + String aaiUri = ""; + String aaiUuid = ""; + try { + Vertex tmpV = vSubItr.next(); + vertexIdL = (Long) tmpV.id(); + aaiNodeType = (String) tmpV.property("aai-node-type").orElse(null); + aaiUri = (String) tmpV.property("aai-uri").orElse(null); + aaiUuid = (String) tmpV.property("aai-uuid").orElse(null); + + Thread.sleep(debugDelayMsL); // Make sure it doesn't bump into itself + jg.io(IoCore.graphson()).writer().create().writeVertex(subFileStr, tmpV, Direction.BOTH); + okCount++; + } + catch(Exception e) { + failCount++; + System.out.println(" >> DEBUG MODE >> Failed at: VertexId = [" + vertexIdL + + "], aai-node-type = [" + aaiNodeType + + "], aai-uuid = [" + aaiUuid + + "], aai-uri = [" + aaiUri + "]. " ); + e.printStackTrace(); + } + } + System.out.println(" -- Printed " + okCount + " vertexes out to " + fname + + ", with " + failCount + " failed."); + subFileStr.close(); + } + catch(Exception e){ + e.printStackTrace(); + } + } + else { + // Not in DEBUG mode, so we'll do all the nodes in one group + try{ + int count = vtxList.size(); + Iterator <Vertex> vSubItr = vtxList.iterator(); + FileOutputStream subFileStr = new FileOutputStream(fname); + jg.io(IoCore.graphson()).writer().create().writeVertices(subFileStr, vSubItr, Direction.BOTH); + subFileStr.close(); + System.out.println(" -- Printed " + count + " vertexes out to " + fname); + } + catch(Exception e){ + e.printStackTrace(); + } + } + } + +}
\ No newline at end of file |