diff options
author | Fiete Ostkamp <Fiete.Ostkamp@telekom.de> | 2024-06-12 16:53:33 +0200 |
---|---|---|
committer | Fiete Ostkamp <Fiete.Ostkamp@telekom.de> | 2024-06-13 15:28:10 +0200 |
commit | 023e0b5018247f02e05dbed3f91c495161fde814 (patch) | |
tree | 9fed7ae6e82ea9389c27896a0b66962aa1a242f7 /src | |
parent | 1647fc01e2dd11a56770cf6b1dd6d2c1f11392f5 (diff) |
Support exporting to the kryo format in the dataSnapshot script
- add support for the kryo format when creating a database snapshot
Issue-ID: AAI-3872
Change-Id: Iba085f7227bbfcb928370443c2e0eac95e49bb9c
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java | 396 | ||||
-rw-r--r-- | src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java | 62 |
2 files changed, 266 insertions, 192 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java index a9312b2..cdb858e 100644 --- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java +++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java @@ -67,7 +67,7 @@ import com.beust.jcommander.ParameterException; public class DataSnapshot { private static Logger LOGGER; - + /* Using realtime d */ private static final String REALTIME_DB = "realtime"; @@ -79,10 +79,10 @@ public class DataSnapshot { SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA"); SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA_MULTI"); } - + private CommandLineArgs cArgs; - - + + /** * The main method. * @@ -101,7 +101,7 @@ public class DataSnapshot { DataSnapshot dataSnapshot = new DataSnapshot(); success = dataSnapshot.executeCommand(args, success, dbClearFlag, graph, command, oldSnapshotFileName); - + if(success){ AAISystemExitUtil.systemExitCloseAAIGraph(0); } else { @@ -114,7 +114,7 @@ public class DataSnapshot { public boolean executeCommand(String[] args, boolean success, Boolean dbClearFlag, JanusGraph graph, String command, String oldSnapshotFileName) { - + // Set the logging file properties to be used by EELFManager System.setProperty("aai.service.name", DataSnapshot.class.getSimpleName()); Properties props = System.getProperties(); @@ -122,7 +122,7 @@ public class DataSnapshot { props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG); LOGGER = LoggerFactory.getLogger(DataSnapshot.class); cArgs = new CommandLineArgs(); - + String itemName = "aai.datasnapshot.threads.for.create"; try { String val = AAIConfig.get(itemName); @@ -144,8 +144,8 @@ public class DataSnapshot { LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } long maxNodesPerFile4Create = cArgs.maxNodesPerFile; - - cArgs.snapshotType = "graphson"; + + // cArgs.snapshotType = "graphson"; Long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS; itemName = "aai.datasnapshot.vertex.add.delay.ms"; try { @@ -156,7 +156,7 @@ public class DataSnapshot { }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } - + Long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS; itemName = "aai.datasnapshot.edge.add.delay.ms"; try { @@ -167,7 +167,7 @@ public class DataSnapshot { }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } - + Long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS; itemName = "aai.datasnapshot.failure.delay.ms"; try { @@ -178,7 +178,7 @@ public class DataSnapshot { }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } - + Long retryDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_RETRY_DELAY_MS; itemName = "aai.datasnapshot.retry.delay.ms"; try { @@ -189,7 +189,7 @@ public class DataSnapshot { }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } - + int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD; itemName = "aai.datasnapshot.max.errors.per.thread"; try { @@ -200,7 +200,7 @@ public class DataSnapshot { }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } - + Long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS; itemName = "aai.datasnapshot.vertex.to.edge.proc.delay.ms"; try { @@ -211,7 +211,7 @@ public class DataSnapshot { }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); } - + itemName = "aai.datasnapshot.stagger.thread.delay.ms"; try { String val = AAIConfig.get(itemName); @@ -220,28 +220,29 @@ public class DataSnapshot { } }catch ( Exception e ){ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage()); - } - + } + long debugAddDelayTime = 1; // Default to 1 millisecond Boolean debug4Create = false; // By default we do not use debugging for snapshot creation - + JCommander jCommander; try { jCommander = new JCommander(cArgs, args); jCommander.setProgramName(DataSnapshot.class.getSimpleName()); } catch (ParameterException e1) { AAIException ae = new AAIException("AAI_6128", e1 , "Error - invalid value passed to list of args - "+args); - ErrorLogHelper.logException(ae); + ErrorLogHelper.logException(ae); AAISystemExitUtil.systemExitCloseAAIGraph(1); } - + if (args.length >= 1) { command = cArgs.command; } - + String source = cArgs.caller; - String snapshotType = "graphson"; + // String snapshotType = "graphson"; + String snapshotType = cArgs.snapshotType; if( SNAPSHOT_RELOAD_COMMANDS.contains(cArgs.command)){ if (args.length >= 2) { // If re-loading, they need to also pass the snapshot file name to use. @@ -253,38 +254,10 @@ public class DataSnapshot { else if( command.equals("THREADED_SNAPSHOT") ){ if (args.length >= 2) { // If doing a "threaded" snapshot, they need to specify how many threads to use - try { - threadCount4Create = cArgs.threadCount; - } - catch ( NumberFormatException nfe ){ - ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); - LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } - if( threadCount4Create < 1 || threadCount4Create > 100 ){ - ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); - LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } - LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create ); - - try { - maxNodesPerFile4Create = cArgs.maxNodesPerFile; - } - catch ( NumberFormatException nfe ){ - ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); - LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } - - if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){ - ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); - LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); - LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } - LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create ); - + threadCount4Create = validateThreadCount(cArgs); + + validateMaxNodesPerFile(cArgs); + // If doing a "threaded" snapshot, they need to specify how many threads to use // They can also use debug mode if they pass the word "DEBUG" to do the nodes one at a time to see where it breaks. if( cArgs.debugFlag.equals("DEBUG") ){ @@ -292,21 +265,9 @@ public class DataSnapshot { } LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create + ", and DEBUG-flag set to: " + debug4Create ); - + if (debug4Create) { - // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1) - // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2) - // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3) - try { - debugAddDelayTime = cArgs.debugAddDelayTime; - } catch (NumberFormatException nfe) { - ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) debugAddDelayTime passed to DataSnapshot [" - + cArgs.debugAddDelayTime + "]"); - LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["+ cArgs.debugAddDelayTime + "]"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } - LOGGER.debug(" Will do Threaded Snapshot with threadCount = "+ threadCount4Create + ", DEBUG-flag set to: " - + debug4Create + ", and addDelayTimer = " + debugAddDelayTime + " mSec. "); + debugAddDelayTime = validateDebugAddDelayTime(cArgs, threadCount4Create, debug4Create); } } else { @@ -324,26 +285,14 @@ public class DataSnapshot { // it is a multi-part snapshot, then this should be the root of the name. // We will be using the default delay timers. oldSnapshotFileName = cArgs.oldFileName; - + // They should be passing the timers in in this order: // vertDelay, edgeDelay, failureDelay, retryDelay vertAddDelayMs = cArgs.vertAddDelayMs; edgeAddDelayMs = cArgs.edgeAddDelayMs; failureDelayMs = cArgs.failureDelayMs; retryDelayMs = cArgs.retryDelayMs; - try { - maxErrorsPerThread = cArgs.maxErrorsPerThread; - } - catch ( NumberFormatException nfe ){ - ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); - LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } - if( maxErrorsPerThread < 1 ){ - ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); - LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); - AAISystemExitUtil.systemExitCloseAAIGraph(1); - } + maxErrorsPerThread = validateMaxErrorsPerThread(cArgs); } else { ErrorLogHelper.logError("AAI_6128", "Wrong param count (should be either 2 or 7) when using MUTLITHREAD_RELOAD."); @@ -371,7 +320,7 @@ public class DataSnapshot { LOGGER.debug("File name to reload snapshot [" + cArgs.oldFileName + "]"); LOGGER.debug("snapshotType is [" + cArgs.snapshotType + "]"); LOGGER.debug("Thread count is [" + cArgs.threadCount + "]"); - LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]"); + LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]"); LOGGER.debug("Debug Flag is [" + cArgs.debugFlag + "]"); LOGGER.debug("DebugAddDelayTimer is [" + cArgs.debugAddDelayTime + "]"); LOGGER.debug("VertAddDelayMs is [" + cArgs.vertAddDelayMs + "]"); @@ -392,8 +341,8 @@ public class DataSnapshot { if (!AAIConfig.isEmpty(cArgs.oldFileDir)){ LOGGER.debug("Directory path (if not default) to load the old snapshot file from is [" + cArgs.oldFileDir + "]"); } - - + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { AAIConfig.init(); @@ -403,36 +352,16 @@ public class DataSnapshot { // Make sure the dataSnapshots directory is there new File(targetDir).mkdirs(); LOGGER.debug(" ---- NOTE --- about to open graph (takes a little while) "); - + if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT")) && threadCount4Create == 1 ){ - // ------------------------------------------------------------------------------- - // They want to take a snapshot on a single thread and have it go in a single file - // NOTE - they can't use the DEBUG option in this case. - // ------------------------------------------------------------------------------- - LOGGER.debug(" Command = " + command ); - verifyGraph(AAIGraph.getInstance().getGraph()); - FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT"); - String dteStr = fd.getDateTime(); - graph = AAIGraph.getInstance().getGraph(); - GraphAdminDBUtils.logConfigs(graph.configuration()); - String newSnapshotOutFname = null; - long timeA = System.nanoTime(); - newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr; - graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname); - LOGGER.debug("Snapshot written to " + newSnapshotOutFname); - long timeB = System.nanoTime(); - long diffTime = timeB - timeA; - long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); - long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); - LOGGER.debug(" -- Single-Thread dataSnapshot took: " + - minCount + " minutes, " + secCount + " seconds " ); - - } - else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT")) - && threadCount4Create > 1 ){ + graph = writeSnapshotFile(command, targetDir, snapshotType); + + } + else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT")) + && threadCount4Create > 1 ){ // ------------------------------------------------------------ - // They want the creation of the snapshot to be spread out via + // They want the creation of the snapshot to be spread out via // threads and go to multiple files // ------------------------------------------------------------ LOGGER.debug(" Command = " + command ); @@ -442,7 +371,7 @@ public class DataSnapshot { } else { FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT"); String dteStr = fd.getDateTime(); - newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr; } verifyGraph(AAIGraph.getInstance().getGraph()); @@ -460,12 +389,12 @@ public class DataSnapshot { LOGGER.debug(" -- To count all vertices in DB it took: " + minCount + " minutes, " + secCount + " seconds " ); LOGGER.debug(" Total Count of Nodes in DB = " + totalVertCount + "."); - - int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create, + + int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create, maxNodesPerFile4Create ); - int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create); - long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create); - + int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create); + long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create); + LOGGER.debug(" We will run this many simultaneous threads: " + threadCount4Create ); LOGGER.debug(" Required number of passes: " + threadPassesNeeded ); LOGGER.debug(" Max Nodes per file: " + maxNodesPerFile4Create ); @@ -479,8 +408,8 @@ public class DataSnapshot { String tk = "" + t; vertIdListHash.put( tk, vIdList); } - - int currentTNum = 0; + + int currentTNum = 0; String currentTKey = "0"; long thisThrIndex = 0; Iterator <Vertex> vtxItr = graph.vertices(); // Getting ALL vertices! @@ -496,12 +425,12 @@ public class DataSnapshot { long vid = (long)(vtxItr.next()).id(); (vertIdListHash.get(currentTKey)).add(vid); } - + // close this graph instance thing here since we have all the ids graph.tx().rollback(); graph.tx().close(); - - + + long timeB = System.nanoTime(); diffTime = timeB - timeA2; minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); @@ -512,9 +441,9 @@ public class DataSnapshot { // Need to print out each set of vertices using it's own thread // NOTE - we may have more files to generate than number of threads - which // just means that ALL the files won't necessarily be generated in parallel. - + int fileNo = 0; - for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){ + for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){ ArrayList <Thread> threadArr = new ArrayList <Thread> (); // For each Pass, kick off all the threads and wait until they finish long timeP1 = System.nanoTime(); @@ -522,14 +451,14 @@ public class DataSnapshot { String fileNoStr = "" + fileNo; String subFName = newSnapshotOutFname + ".P" + fileNoStr; LOGGER.debug(" DEBUG >>> kick off pass # " + passNo + ", thread # " + thNum); - Thread thr = new Thread(new PrintVertexDetails(graph, subFName, + Thread thr = new Thread(new PrintVertexDetails(graph, subFName, vertIdListHash.get(fileNoStr), - debug4Create, debugAddDelayTime, + debug4Create, debugAddDelayTime, snapshotType, LOGGER) ); thr.start(); threadArr.add(thr); fileNo++; - } + } // Make sure all the threads finish before considering this Pass finished. for( int thNum = 0; thNum < threadCount4Create; thNum++ ){ if( null != threadArr.get(thNum) ){ @@ -543,7 +472,7 @@ public class DataSnapshot { LOGGER.debug(" Pass number " + passNo + " (out of " + threadPassesNeeded + ") took " + minCount + " minutes, " + secCount + " seconds "); } - + long timeC = System.nanoTime(); diffTime = timeC - timeB; minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); @@ -551,7 +480,7 @@ public class DataSnapshot { LOGGER.debug(" -- To write all the data out to snapshot files, it took: " + minCount + " minutes, " + secCount + " seconds " ); - + } else if( command.equals("MULTITHREAD_RELOAD") ){ // --------------------------------------------------------------------- // They want the RELOAD of the snapshot to be spread out via threads @@ -559,11 +488,11 @@ public class DataSnapshot { // snapshot is written to. Ie. if you have a single-file snapshot, // then this will be single-threaded. // If the number of files is greater than the 'threadCount' parameter, - // then we will use more than one pass to keep the number of simultaneous + // then we will use more than one pass to keep the number of simultaneous // threads below the threadCount param. // LOGGER.debug(" Command = " + command ); - + if (cArgs.oldFileDir != null && !cArgs.oldFileDir.isEmpty()){ targetDir = cArgs.oldFileDir; } @@ -579,11 +508,11 @@ public class DataSnapshot { ExecutorService executor = Executors.newFixedThreadPool(fCount); int threadFailCount = 0; - + LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs + ", maxErrorsPerThread = " + maxErrorsPerThread ); - + // -------------------------------------- // Step 1 -- Load empty vertices // -------------------------------------- @@ -606,18 +535,18 @@ public class DataSnapshot { listFutV.add(future); LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY file # "+ fileNo + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")"); - + thisPassCount++; fileNo++; } - + int threadCount4Reload = 0; for(Future<HashMap<String,String>> fut : listFutV){ threadCount4Reload++; try { old2NewVertIdMap.putAll(fut.get()); - LOGGER.debug(" -- back from PartialVertexLoader. returned pass # " - + passNo + ", thread # " + LOGGER.debug(" -- back from PartialVertexLoader. returned pass # " + + passNo + ", thread # " + threadCount4Reload + ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() ); } @@ -633,7 +562,7 @@ public class DataSnapshot { } } } // end of passes for loading empty vertices - + executor.shutdown(); if( threadFailCount > 0 ) { String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. "; @@ -651,7 +580,7 @@ public class DataSnapshot { // Give the DB a little time to chew on all those new vertices Thread.sleep(vertToEdgeProcDelay); - + // ------------------------------------------------------------- // Step 2 -- Load Edges and properties onto the empty vertices // ------------------------------------------------------------- @@ -677,9 +606,9 @@ public class DataSnapshot { // add future to the list, we can wait for it below listFutEdg.add(future); LOGGER.debug(" -- Starting PartialPropAndEdge file # " - + fileNo + " (pass # " + passNo + ", passIndex " + + fileNo + " (pass # " + passNo + ", passIndex " + thisPassCount + ")" ); - + thisPassCount++; fileNo++; } @@ -688,7 +617,7 @@ public class DataSnapshot { for( Future<ArrayList<String>> fut : listFutEdg ){ threadCount4Reload++; try{ - fut.get(); + fut.get(); LOGGER.debug(" -- back from PartialPropAndEdgeLoader. pass # " + passNo + ", thread # " + threadCount4Reload ); } @@ -703,8 +632,8 @@ public class DataSnapshot { ErrorLogHelper.logException(ae); } } - - } // end of passes for reloading edges and properties + + } // end of passes for reloading edges and properties executor.shutdown(); @@ -770,7 +699,7 @@ public class DataSnapshot { LOGGER.debug(" reloading data or the data will be put in without indexes. "); dbClearFlag = true; LOGGER.debug("All done clearing DB"); - + } else if (command.equals("RELOAD_DATA")) { // --------------------------------------------------------------------------- // They want to restore the database from either a single file, or a group @@ -786,7 +715,7 @@ public class DataSnapshot { LOGGER.debug(emsg); AAISystemExitUtil.systemExitCloseAAIGraph(1); } - + long timeA = System.nanoTime(); ArrayList <File> snapFilesArr = new ArrayList <File> (); @@ -809,15 +738,15 @@ public class DataSnapshot { } } } - + if( snapFilesArr.isEmpty() ){ String emsg = "oldSnapshotFile " + onePieceSnapshotFname + "(with or without .P0) could not be found."; LOGGER.debug(emsg); AAISystemExitUtil.systemExitCloseAAIGraph(1); } - + int fCount = snapFilesArr.size(); - Vector<InputStream> inputStreamsV = new Vector<>(); + Vector<InputStream> inputStreamsV = new Vector<>(); for( int i = 0; i < fCount; i++ ){ File f = snapFilesArr.get(i); String fname = f.getName(); @@ -850,14 +779,14 @@ public class DataSnapshot { long vCount = graph.traversal().V().count().next(); LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db."); - + long timeB = System.nanoTime(); long diffTime = timeB - timeA; long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); LOGGER.debug(" -- To Reload this snapshot, it took: " + minCount + " minutes, " + secCount + " seconds " ); - + LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db."); } else { @@ -892,7 +821,122 @@ public class DataSnapshot { return success; } - + + + private JanusGraph writeSnapshotFile(String command, String targetDir, String format) throws IOException { + JanusGraph graph; + // ------------------------------------------------------------------------------- + // They want to take a snapshot on a single thread and have it go in a single file + // NOTE - they can't use the DEBUG option in this case. + // ------------------------------------------------------------------------------- + if (format != "graphson" && format != "gryo") { + format = "graphson"; + } + LOGGER.debug(" Command = " + command ); + verifyGraph(AAIGraph.getInstance().getGraph()); + FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT"); + String dteStr = fd.getDateTime(); + graph = AAIGraph.getInstance().getGraph(); + GraphAdminDBUtils.logConfigs(graph.configuration()); + String newSnapshotOutFname = null; + long timeA = System.nanoTime(); + + if(format == "gryo") { + newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "snapshot_" + dteStr + ".gryo"; + graph.io(IoCore.gryo()).writeGraph(newSnapshotOutFname); + } else { + newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr; + graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname); + } + LOGGER.debug("Snapshot written to " + newSnapshotOutFname); + long timeB = System.nanoTime(); + long diffTime = timeB - timeA; + long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- Single-Thread dataSnapshot took: " + + minCount + " minutes, " + secCount + " seconds " ); + return graph; + } + + + private int validateMaxErrorsPerThread(CommandLineArgs cArgs) { + int maxErrorsPerThread = 0; + try { + maxErrorsPerThread = cArgs.maxErrorsPerThread; + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); + LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( maxErrorsPerThread < 1 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); + LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + return maxErrorsPerThread; + } + + + private long validateDebugAddDelayTime(CommandLineArgs cArgs, int threadCount4Create, Boolean debug4Create) { + // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1) + // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2) + // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3) + long debugAddDelayTime = 0; + try { + debugAddDelayTime = cArgs.debugAddDelayTime; + } catch (NumberFormatException nfe) { + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) debugAddDelayTime passed to DataSnapshot [" + + cArgs.debugAddDelayTime + "]"); + LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["+ cArgs.debugAddDelayTime + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + LOGGER.debug(" Will do Threaded Snapshot with threadCount = "+ threadCount4Create + ", DEBUG-flag set to: " + + debug4Create + ", and addDelayTimer = " + debugAddDelayTime + " mSec. "); + return debugAddDelayTime; + } + + + private void validateMaxNodesPerFile(CommandLineArgs cArgs) { + long maxNodesPerFile4Create = 0; + try { + maxNodesPerFile4Create = cArgs.maxNodesPerFile; + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); + LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + + if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); + LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]"); + LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create ); + } + + + private int validateThreadCount(CommandLineArgs cArgs) { + int threadCount4Create = 0; + try { + threadCount4Create = cArgs.threadCount; + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); + LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + if( threadCount4Create < 1 || threadCount4Create > 100 ){ + ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); + LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } + LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create ); + return threadCount4Create; + } + private static ArrayList <File> getFilesToProcess(String targetDir, String oldSnapshotFileName, boolean doingClearDb) throws Exception { @@ -972,26 +1016,26 @@ public class DataSnapshot { } - - public static int figureOutFileCount( long totalVertCount, int threadCount4Create, + + public static int figureOutFileCount( long totalVertCount, int threadCount4Create, long maxNodesPerFile ) { - + // NOTE - we would always like to use all of our threads. That is, if // we could process all the data with 16 threads, but our threadCount4Create is - // only 15, we will do two passes and use all 15 threads each pass which will + // only 15, we will do two passes and use all 15 threads each pass which will // create a total of 30 files. Each file will be a bit smaller so the overall // time for the two passes should be faster. if( totalVertCount <= 0 || threadCount4Create <= 0 || maxNodesPerFile <= 0) { return 1; } - - long maxNodesPerPass = threadCount4Create * maxNodesPerFile; - int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass); + + long maxNodesPerPass = threadCount4Create * maxNodesPerFile; + int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass); int fileCt = threadCount4Create * numberOfPasses; - - return fileCt; + + return fileCt; } - + class CommandLineArgs { @@ -1009,7 +1053,7 @@ public class DataSnapshot { @Parameter(names = "-threadCount", description = "thread count for create") public int threadCount = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_THREADS_FOR_CREATE; - + @Parameter(names = "-maxNodesPerFile", description = "Max nodes per file") public long maxNodesPerFile = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_NODES_PER_FILE_FOR_CREATE; @@ -1018,13 +1062,13 @@ public class DataSnapshot { @Parameter(names = "-debugAddDelayTime", description = "delay in ms between each Add for debug mode") public long debugAddDelayTime = 1L; - + @Parameter(names = "-vertAddDelayMs", description = "delay in ms while adding each vertex") public long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS.longValue(); - + @Parameter(names = "-edgeAddDelayMs", description = "delay in ms while adding each edge") public long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS.longValue(); - + @Parameter(names = "-failureDelayMs", description = "delay in ms when failure to load vertex or edge in snapshot") public long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS.longValue(); @@ -1033,25 +1077,25 @@ public class DataSnapshot { @Parameter(names = "-maxErrorsPerThread", description = "max errors allowed per thread") public int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD; - + @Parameter(names = "-vertToEdgeProcDelay", description = "vertex to edge processing delay in ms") public long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS.longValue(); - + @Parameter(names = "-staggerThreadDelay", description = "thread delay stagger time in ms") public long staggerThreadDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_STAGGER_THREAD_DELAY_MS; - + @Parameter(names = "-fileName", description = "file name for generating snapshot ") public String fileName = ""; - + @Parameter(names = "-snapshotDir", description = "file path for generating snapshot ") public String snapshotDir = ""; - + @Parameter(names = "-oldFileDir", description = "directory containing the old snapshot file for reloading") public String oldFileDir = ""; - + @Parameter(names = "-caller", description = "process invoking the dataSnapshot") public String caller = ""; - + } - -}
\ No newline at end of file + +} diff --git a/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java b/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java index 9d74651..cc61e99 100644 --- a/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java +++ b/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java @@ -56,7 +56,7 @@ public class DataSnapshotTest extends AAISetup { private JanusGraphTransaction currentTransaction; private List<Vertex> vertexes; - + private static final int DELAYSINGLETHREADTEST = 90; @Rule @@ -126,15 +126,13 @@ public class DataSnapshotTest extends AAISetup { assertThat(outputCapture.toString(), containsString("graphson had no data.")); } - - @Ignore("Unit test failing temporarily ignore") @Test public void testTakeSnapshotAndItShouldCreateASnapshotFileWithOneVertex() throws IOException, InterruptedException { String logsFolder = System.getProperty("AJSC_HOME") + "/logs/data/dataSnapshots/"; Set<Path> preSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet()); - + // previous test may have the same generated file name, this wait will ensure a new name is used for this test System.out.println("delay generation, seconds " + DELAYSINGLETHREADTEST); Thread.sleep(DELAYSINGLETHREADTEST*1000); @@ -149,7 +147,7 @@ public class DataSnapshotTest extends AAISetup { Set<Path> postSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet()); - assertThat(postSnapshotFiles.size(), is(37)); + assertThat(postSnapshotFiles.size(), is(preSnapshotFiles.size() +1)); postSnapshotFiles.removeAll(preSnapshotFiles); List<Path> snapshotPathList = postSnapshotFiles.stream().collect(Collectors.toList()); @@ -158,7 +156,39 @@ public class DataSnapshotTest extends AAISetup { List<String> fileContents = Files.readAllLines(snapshotPathList.get(0)); assertThat(fileContents.get(0), containsString("id")); } - + + @Test + public void testTakeKryoSnapshotAndItShouldCreateASnapshotFileWithOneVertex() throws IOException, InterruptedException { + + String logsFolder = System.getProperty("AJSC_HOME") + "/logs/data/dataSnapshots/"; + + Set<Path> preSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet()); + + // previous test may have the same generated file name, this wait will ensure a new name is used for this test + System.out.println("delay generation, seconds " + DELAYSINGLETHREADTEST); + Thread.sleep(DELAYSINGLETHREADTEST*100); + // Run the clear dataSnapshot and this time it should fail + //String [] args = {"JUST_TAKE_SNAPSHOT"}; >> default behavior is now to use 15 threads + // To just get one file, you have to tell it to just use one. + String [] args = {"-c","THREADED_SNAPSHOT", "-threadCount" ,"1", "-snapshotType" , "gryo"}; + + DataSnapshot.main(args); + + // Add sleep so the file actually gets created with the data + + Set<Path> postSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet()); + + assertThat(postSnapshotFiles.size(), is(preSnapshotFiles.size() +1)); + boolean gryoSnapshotExists = postSnapshotFiles.stream() + .map(Path::toString) + .anyMatch(name -> name.endsWith("gryo")); + assertTrue(gryoSnapshotExists); + postSnapshotFiles.removeAll(preSnapshotFiles); + List<Path> snapshotPathList = postSnapshotFiles.stream().collect(Collectors.toList()); + + assertThat(snapshotPathList.size(), is(1)); + } + @Test public void testTakeSnapshotMultiAndItShouldCreateMultipleSnapshotFiles() throws IOException { @@ -180,27 +210,27 @@ public class DataSnapshotTest extends AAISetup { long totalVerts = 5000; int threadCt = 15; long maxNodesPerFile = 120000; - - int fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, + + int fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, maxNodesPerFile ); assertThat( fileCt, is(15)); - + totalVerts = 5000; threadCt = 15; maxNodesPerFile = 100; - fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, + fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, maxNodesPerFile ); assertThat( fileCt, is(60)); - + totalVerts = 1500; threadCt = 15; maxNodesPerFile = 100; - fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, + fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, maxNodesPerFile ); - assertThat( fileCt, is(15)); - + assertThat( fileCt, is(15)); + } - + @Test public void testTakeSnapshotMultiWithDebugAndItShouldCreateMultipleSnapshotFiles() throws IOException { @@ -224,7 +254,7 @@ public class DataSnapshotTest extends AAISetup { // Run the clear dataSnapshot and this time it should fail String [] args = {"-c","THREADED_SNAPSHOT", "-threadCount","foo","-debugFlag", "DEBUG"}; - + DataSnapshot.main(args); // For this test if there is only one vertex in the graph, not sure if it will create multiple files |