summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-06-12 16:53:33 +0200
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-06-13 15:28:10 +0200
commit023e0b5018247f02e05dbed3f91c495161fde814 (patch)
tree9fed7ae6e82ea9389c27896a0b66962aa1a242f7
parent1647fc01e2dd11a56770cf6b1dd6d2c1f11392f5 (diff)
Support exporting to the kryo format in the dataSnapshot script
- add support for the kryo format when creating a database snapshot Issue-ID: AAI-3872 Change-Id: Iba085f7227bbfcb928370443c2e0eac95e49bb9c Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
-rw-r--r--src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java396
-rw-r--r--src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java62
2 files changed, 266 insertions, 192 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
index a9312b2..cdb858e 100644
--- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
+++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
@@ -67,7 +67,7 @@ import com.beust.jcommander.ParameterException;
public class DataSnapshot {
private static Logger LOGGER;
-
+
/* Using realtime d */
private static final String REALTIME_DB = "realtime";
@@ -79,10 +79,10 @@ public class DataSnapshot {
SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA");
SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA_MULTI");
}
-
+
private CommandLineArgs cArgs;
-
-
+
+
/**
* The main method.
*
@@ -101,7 +101,7 @@ public class DataSnapshot {
DataSnapshot dataSnapshot = new DataSnapshot();
success = dataSnapshot.executeCommand(args, success, dbClearFlag, graph, command,
oldSnapshotFileName);
-
+
if(success){
AAISystemExitUtil.systemExitCloseAAIGraph(0);
} else {
@@ -114,7 +114,7 @@ public class DataSnapshot {
public boolean executeCommand(String[] args, boolean success,
Boolean dbClearFlag, JanusGraph graph, String command,
String oldSnapshotFileName) {
-
+
// Set the logging file properties to be used by EELFManager
System.setProperty("aai.service.name", DataSnapshot.class.getSimpleName());
Properties props = System.getProperties();
@@ -122,7 +122,7 @@ public class DataSnapshot {
props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
LOGGER = LoggerFactory.getLogger(DataSnapshot.class);
cArgs = new CommandLineArgs();
-
+
String itemName = "aai.datasnapshot.threads.for.create";
try {
String val = AAIConfig.get(itemName);
@@ -144,8 +144,8 @@ public class DataSnapshot {
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
long maxNodesPerFile4Create = cArgs.maxNodesPerFile;
-
- cArgs.snapshotType = "graphson";
+
+ // cArgs.snapshotType = "graphson";
Long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS;
itemName = "aai.datasnapshot.vertex.add.delay.ms";
try {
@@ -156,7 +156,7 @@ public class DataSnapshot {
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
-
+
Long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS;
itemName = "aai.datasnapshot.edge.add.delay.ms";
try {
@@ -167,7 +167,7 @@ public class DataSnapshot {
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
-
+
Long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS;
itemName = "aai.datasnapshot.failure.delay.ms";
try {
@@ -178,7 +178,7 @@ public class DataSnapshot {
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
-
+
Long retryDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_RETRY_DELAY_MS;
itemName = "aai.datasnapshot.retry.delay.ms";
try {
@@ -189,7 +189,7 @@ public class DataSnapshot {
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
-
+
int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD;
itemName = "aai.datasnapshot.max.errors.per.thread";
try {
@@ -200,7 +200,7 @@ public class DataSnapshot {
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
-
+
Long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS;
itemName = "aai.datasnapshot.vertex.to.edge.proc.delay.ms";
try {
@@ -211,7 +211,7 @@ public class DataSnapshot {
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
-
+
itemName = "aai.datasnapshot.stagger.thread.delay.ms";
try {
String val = AAIConfig.get(itemName);
@@ -220,28 +220,29 @@ public class DataSnapshot {
}
}catch ( Exception e ){
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
- }
-
+ }
+
long debugAddDelayTime = 1; // Default to 1 millisecond
Boolean debug4Create = false; // By default we do not use debugging for snapshot creation
-
+
JCommander jCommander;
try {
jCommander = new JCommander(cArgs, args);
jCommander.setProgramName(DataSnapshot.class.getSimpleName());
} catch (ParameterException e1) {
AAIException ae = new AAIException("AAI_6128", e1 , "Error - invalid value passed to list of args - "+args);
- ErrorLogHelper.logException(ae);
+ ErrorLogHelper.logException(ae);
AAISystemExitUtil.systemExitCloseAAIGraph(1);
}
-
+
if (args.length >= 1) {
command = cArgs.command;
}
-
+
String source = cArgs.caller;
- String snapshotType = "graphson";
+ // String snapshotType = "graphson";
+ String snapshotType = cArgs.snapshotType;
if( SNAPSHOT_RELOAD_COMMANDS.contains(cArgs.command)){
if (args.length >= 2) {
// If re-loading, they need to also pass the snapshot file name to use.
@@ -253,38 +254,10 @@ public class DataSnapshot {
else if( command.equals("THREADED_SNAPSHOT") ){
if (args.length >= 2) {
// If doing a "threaded" snapshot, they need to specify how many threads to use
- try {
- threadCount4Create = cArgs.threadCount;
- }
- catch ( NumberFormatException nfe ){
- ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
- LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
- if( threadCount4Create < 1 || threadCount4Create > 100 ){
- ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
- LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
- LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create );
-
- try {
- maxNodesPerFile4Create = cArgs.maxNodesPerFile;
- }
- catch ( NumberFormatException nfe ){
- ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
- LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
-
- if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){
- ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
- LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
- LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
- LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create );
-
+ threadCount4Create = validateThreadCount(cArgs);
+
+ validateMaxNodesPerFile(cArgs);
+
// If doing a "threaded" snapshot, they need to specify how many threads to use
// They can also use debug mode if they pass the word "DEBUG" to do the nodes one at a time to see where it breaks.
if( cArgs.debugFlag.equals("DEBUG") ){
@@ -292,21 +265,9 @@ public class DataSnapshot {
}
LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create +
", and DEBUG-flag set to: " + debug4Create );
-
+
if (debug4Create) {
- // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1)
- // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2)
- // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3)
- try {
- debugAddDelayTime = cArgs.debugAddDelayTime;
- } catch (NumberFormatException nfe) {
- ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["
- + cArgs.debugAddDelayTime + "]");
- LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["+ cArgs.debugAddDelayTime + "]");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
- LOGGER.debug(" Will do Threaded Snapshot with threadCount = "+ threadCount4Create + ", DEBUG-flag set to: "
- + debug4Create + ", and addDelayTimer = " + debugAddDelayTime + " mSec. ");
+ debugAddDelayTime = validateDebugAddDelayTime(cArgs, threadCount4Create, debug4Create);
}
}
else {
@@ -324,26 +285,14 @@ public class DataSnapshot {
// it is a multi-part snapshot, then this should be the root of the name.
// We will be using the default delay timers.
oldSnapshotFileName = cArgs.oldFileName;
-
+
// They should be passing the timers in in this order:
// vertDelay, edgeDelay, failureDelay, retryDelay
vertAddDelayMs = cArgs.vertAddDelayMs;
edgeAddDelayMs = cArgs.edgeAddDelayMs;
failureDelayMs = cArgs.failureDelayMs;
retryDelayMs = cArgs.retryDelayMs;
- try {
- maxErrorsPerThread = cArgs.maxErrorsPerThread;
- }
- catch ( NumberFormatException nfe ){
- ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
- LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
- if( maxErrorsPerThread < 1 ){
- ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
- LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
- AAISystemExitUtil.systemExitCloseAAIGraph(1);
- }
+ maxErrorsPerThread = validateMaxErrorsPerThread(cArgs);
}
else {
ErrorLogHelper.logError("AAI_6128", "Wrong param count (should be either 2 or 7) when using MUTLITHREAD_RELOAD.");
@@ -371,7 +320,7 @@ public class DataSnapshot {
LOGGER.debug("File name to reload snapshot [" + cArgs.oldFileName + "]");
LOGGER.debug("snapshotType is [" + cArgs.snapshotType + "]");
LOGGER.debug("Thread count is [" + cArgs.threadCount + "]");
- LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]");
LOGGER.debug("Debug Flag is [" + cArgs.debugFlag + "]");
LOGGER.debug("DebugAddDelayTimer is [" + cArgs.debugAddDelayTime + "]");
LOGGER.debug("VertAddDelayMs is [" + cArgs.vertAddDelayMs + "]");
@@ -392,8 +341,8 @@ public class DataSnapshot {
if (!AAIConfig.isEmpty(cArgs.oldFileDir)){
LOGGER.debug("Directory path (if not default) to load the old snapshot file from is [" + cArgs.oldFileDir + "]");
}
-
-
+
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
AAIConfig.init();
@@ -403,36 +352,16 @@ public class DataSnapshot {
// Make sure the dataSnapshots directory is there
new File(targetDir).mkdirs();
LOGGER.debug(" ---- NOTE --- about to open graph (takes a little while) ");
-
+
if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
&& threadCount4Create == 1 ){
- // -------------------------------------------------------------------------------
- // They want to take a snapshot on a single thread and have it go in a single file
- // NOTE - they can't use the DEBUG option in this case.
- // -------------------------------------------------------------------------------
- LOGGER.debug(" Command = " + command );
- verifyGraph(AAIGraph.getInstance().getGraph());
- FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
- String dteStr = fd.getDateTime();
- graph = AAIGraph.getInstance().getGraph();
- GraphAdminDBUtils.logConfigs(graph.configuration());
- String newSnapshotOutFname = null;
- long timeA = System.nanoTime();
- newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr;
- graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname);
- LOGGER.debug("Snapshot written to " + newSnapshotOutFname);
- long timeB = System.nanoTime();
- long diffTime = timeB - timeA;
- long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
- long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
- LOGGER.debug(" -- Single-Thread dataSnapshot took: " +
- minCount + " minutes, " + secCount + " seconds " );
-
- }
- else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
- && threadCount4Create > 1 ){
+ graph = writeSnapshotFile(command, targetDir, snapshotType);
+
+ }
+ else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
+ && threadCount4Create > 1 ){
// ------------------------------------------------------------
- // They want the creation of the snapshot to be spread out via
+ // They want the creation of the snapshot to be spread out via
// threads and go to multiple files
// ------------------------------------------------------------
LOGGER.debug(" Command = " + command );
@@ -442,7 +371,7 @@ public class DataSnapshot {
} else {
FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
String dteStr = fd.getDateTime();
- newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP
+ newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP
+ "dataSnapshot.graphSON." + dteStr;
}
verifyGraph(AAIGraph.getInstance().getGraph());
@@ -460,12 +389,12 @@ public class DataSnapshot {
LOGGER.debug(" -- To count all vertices in DB it took: " +
minCount + " minutes, " + secCount + " seconds " );
LOGGER.debug(" Total Count of Nodes in DB = " + totalVertCount + ".");
-
- int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create,
+
+ int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create,
maxNodesPerFile4Create );
- int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);
- long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);
-
+ int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);
+ long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);
+
LOGGER.debug(" We will run this many simultaneous threads: " + threadCount4Create );
LOGGER.debug(" Required number of passes: " + threadPassesNeeded );
LOGGER.debug(" Max Nodes per file: " + maxNodesPerFile4Create );
@@ -479,8 +408,8 @@ public class DataSnapshot {
String tk = "" + t;
vertIdListHash.put( tk, vIdList);
}
-
- int currentTNum = 0;
+
+ int currentTNum = 0;
String currentTKey = "0";
long thisThrIndex = 0;
Iterator <Vertex> vtxItr = graph.vertices(); // Getting ALL vertices!
@@ -496,12 +425,12 @@ public class DataSnapshot {
long vid = (long)(vtxItr.next()).id();
(vertIdListHash.get(currentTKey)).add(vid);
}
-
+
// close this graph instance thing here since we have all the ids
graph.tx().rollback();
graph.tx().close();
-
-
+
+
long timeB = System.nanoTime();
diffTime = timeB - timeA2;
minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -512,9 +441,9 @@ public class DataSnapshot {
// Need to print out each set of vertices using it's own thread
// NOTE - we may have more files to generate than number of threads - which
// just means that ALL the files won't necessarily be generated in parallel.
-
+
int fileNo = 0;
- for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+ for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
ArrayList <Thread> threadArr = new ArrayList <Thread> ();
// For each Pass, kick off all the threads and wait until they finish
long timeP1 = System.nanoTime();
@@ -522,14 +451,14 @@ public class DataSnapshot {
String fileNoStr = "" + fileNo;
String subFName = newSnapshotOutFname + ".P" + fileNoStr;
LOGGER.debug(" DEBUG >>> kick off pass # " + passNo + ", thread # " + thNum);
- Thread thr = new Thread(new PrintVertexDetails(graph, subFName,
+ Thread thr = new Thread(new PrintVertexDetails(graph, subFName,
vertIdListHash.get(fileNoStr),
- debug4Create, debugAddDelayTime,
+ debug4Create, debugAddDelayTime,
snapshotType, LOGGER) );
thr.start();
threadArr.add(thr);
fileNo++;
- }
+ }
// Make sure all the threads finish before considering this Pass finished.
for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
if( null != threadArr.get(thNum) ){
@@ -543,7 +472,7 @@ public class DataSnapshot {
LOGGER.debug(" Pass number " + passNo + " (out of " + threadPassesNeeded +
") took " + minCount + " minutes, " + secCount + " seconds ");
}
-
+
long timeC = System.nanoTime();
diffTime = timeC - timeB;
minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -551,7 +480,7 @@ public class DataSnapshot {
LOGGER.debug(" -- To write all the data out to snapshot files, it took: " +
minCount + " minutes, " + secCount + " seconds " );
-
+
} else if( command.equals("MULTITHREAD_RELOAD") ){
// ---------------------------------------------------------------------
// They want the RELOAD of the snapshot to be spread out via threads
@@ -559,11 +488,11 @@ public class DataSnapshot {
// snapshot is written to. Ie. if you have a single-file snapshot,
// then this will be single-threaded.
// If the number of files is greater than the 'threadCount' parameter,
- // then we will use more than one pass to keep the number of simultaneous
+ // then we will use more than one pass to keep the number of simultaneous
// threads below the threadCount param.
//
LOGGER.debug(" Command = " + command );
-
+
if (cArgs.oldFileDir != null && !cArgs.oldFileDir.isEmpty()){
targetDir = cArgs.oldFileDir;
}
@@ -579,11 +508,11 @@ public class DataSnapshot {
ExecutorService executor = Executors.newFixedThreadPool(fCount);
int threadFailCount = 0;
-
+
LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
+ ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
+ ", maxErrorsPerThread = " + maxErrorsPerThread );
-
+
// --------------------------------------
// Step 1 -- Load empty vertices
// --------------------------------------
@@ -606,18 +535,18 @@ public class DataSnapshot {
listFutV.add(future);
LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY file # "+ fileNo
+ "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")");
-
+
thisPassCount++;
fileNo++;
}
-
+
int threadCount4Reload = 0;
for(Future<HashMap<String,String>> fut : listFutV){
threadCount4Reload++;
try {
old2NewVertIdMap.putAll(fut.get());
- LOGGER.debug(" -- back from PartialVertexLoader. returned pass # "
- + passNo + ", thread # "
+ LOGGER.debug(" -- back from PartialVertexLoader. returned pass # "
+ + passNo + ", thread # "
+ threadCount4Reload +
", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
}
@@ -633,7 +562,7 @@ public class DataSnapshot {
}
}
} // end of passes for loading empty vertices
-
+
executor.shutdown();
if( threadFailCount > 0 ) {
String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. ";
@@ -651,7 +580,7 @@ public class DataSnapshot {
// Give the DB a little time to chew on all those new vertices
Thread.sleep(vertToEdgeProcDelay);
-
+
// -------------------------------------------------------------
// Step 2 -- Load Edges and properties onto the empty vertices
// -------------------------------------------------------------
@@ -677,9 +606,9 @@ public class DataSnapshot {
// add future to the list, we can wait for it below
listFutEdg.add(future);
LOGGER.debug(" -- Starting PartialPropAndEdge file # "
- + fileNo + " (pass # " + passNo + ", passIndex "
+ + fileNo + " (pass # " + passNo + ", passIndex "
+ thisPassCount + ")" );
-
+
thisPassCount++;
fileNo++;
}
@@ -688,7 +617,7 @@ public class DataSnapshot {
for( Future<ArrayList<String>> fut : listFutEdg ){
threadCount4Reload++;
try{
- fut.get();
+ fut.get();
LOGGER.debug(" -- back from PartialPropAndEdgeLoader. pass # "
+ passNo + ", thread # " + threadCount4Reload );
}
@@ -703,8 +632,8 @@ public class DataSnapshot {
ErrorLogHelper.logException(ae);
}
}
-
- } // end of passes for reloading edges and properties
+
+ } // end of passes for reloading edges and properties
executor.shutdown();
@@ -770,7 +699,7 @@ public class DataSnapshot {
LOGGER.debug(" reloading data or the data will be put in without indexes. ");
dbClearFlag = true;
LOGGER.debug("All done clearing DB");
-
+
} else if (command.equals("RELOAD_DATA")) {
// ---------------------------------------------------------------------------
// They want to restore the database from either a single file, or a group
@@ -786,7 +715,7 @@ public class DataSnapshot {
LOGGER.debug(emsg);
AAISystemExitUtil.systemExitCloseAAIGraph(1);
}
-
+
long timeA = System.nanoTime();
ArrayList <File> snapFilesArr = new ArrayList <File> ();
@@ -809,15 +738,15 @@ public class DataSnapshot {
}
}
}
-
+
if( snapFilesArr.isEmpty() ){
String emsg = "oldSnapshotFile " + onePieceSnapshotFname + "(with or without .P0) could not be found.";
LOGGER.debug(emsg);
AAISystemExitUtil.systemExitCloseAAIGraph(1);
}
-
+
int fCount = snapFilesArr.size();
- Vector<InputStream> inputStreamsV = new Vector<>();
+ Vector<InputStream> inputStreamsV = new Vector<>();
for( int i = 0; i < fCount; i++ ){
File f = snapFilesArr.get(i);
String fname = f.getName();
@@ -850,14 +779,14 @@ public class DataSnapshot {
long vCount = graph.traversal().V().count().next();
LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db.");
-
+
long timeB = System.nanoTime();
long diffTime = timeB - timeA;
long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
LOGGER.debug(" -- To Reload this snapshot, it took: " +
minCount + " minutes, " + secCount + " seconds " );
-
+
LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db.");
} else {
@@ -892,7 +821,122 @@ public class DataSnapshot {
return success;
}
-
+
+
+ private JanusGraph writeSnapshotFile(String command, String targetDir, String format) throws IOException {
+ JanusGraph graph;
+ // -------------------------------------------------------------------------------
+ // They want to take a snapshot on a single thread and have it go in a single file
+ // NOTE - they can't use the DEBUG option in this case.
+ // -------------------------------------------------------------------------------
+ if (format != "graphson" && format != "gryo") {
+ format = "graphson";
+ }
+ LOGGER.debug(" Command = " + command );
+ verifyGraph(AAIGraph.getInstance().getGraph());
+ FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
+ String dteStr = fd.getDateTime();
+ graph = AAIGraph.getInstance().getGraph();
+ GraphAdminDBUtils.logConfigs(graph.configuration());
+ String newSnapshotOutFname = null;
+ long timeA = System.nanoTime();
+
+ if(format == "gryo") {
+ newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "snapshot_" + dteStr + ".gryo";
+ graph.io(IoCore.gryo()).writeGraph(newSnapshotOutFname);
+ } else {
+ newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr;
+ graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname);
+ }
+ LOGGER.debug("Snapshot written to " + newSnapshotOutFname);
+ long timeB = System.nanoTime();
+ long diffTime = timeB - timeA;
+ long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" -- Single-Thread dataSnapshot took: " +
+ minCount + " minutes, " + secCount + " seconds " );
+ return graph;
+ }
+
+
+ private int validateMaxErrorsPerThread(CommandLineArgs cArgs) {
+ int maxErrorsPerThread = 0;
+ try {
+ maxErrorsPerThread = cArgs.maxErrorsPerThread;
+ }
+ catch ( NumberFormatException nfe ){
+ ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+ LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ if( maxErrorsPerThread < 1 ){
+ ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+ LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ return maxErrorsPerThread;
+ }
+
+
+ private long validateDebugAddDelayTime(CommandLineArgs cArgs, int threadCount4Create, Boolean debug4Create) {
+ // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1)
+ // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2)
+ // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3)
+ long debugAddDelayTime = 0;
+ try {
+ debugAddDelayTime = cArgs.debugAddDelayTime;
+ } catch (NumberFormatException nfe) {
+ ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["
+ + cArgs.debugAddDelayTime + "]");
+ LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["+ cArgs.debugAddDelayTime + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ LOGGER.debug(" Will do Threaded Snapshot with threadCount = "+ threadCount4Create + ", DEBUG-flag set to: "
+ + debug4Create + ", and addDelayTimer = " + debugAddDelayTime + " mSec. ");
+ return debugAddDelayTime;
+ }
+
+
+ private void validateMaxNodesPerFile(CommandLineArgs cArgs) {
+ long maxNodesPerFile4Create = 0;
+ try {
+ maxNodesPerFile4Create = cArgs.maxNodesPerFile;
+ }
+ catch ( NumberFormatException nfe ){
+ ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+
+ if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){
+ ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create );
+ }
+
+
+ private int validateThreadCount(CommandLineArgs cArgs) {
+ int threadCount4Create = 0;
+ try {
+ threadCount4Create = cArgs.threadCount;
+ }
+ catch ( NumberFormatException nfe ){
+ ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+ LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ if( threadCount4Create < 1 || threadCount4Create > 100 ){
+ ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+ LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create );
+ return threadCount4Create;
+ }
+
private static ArrayList <File> getFilesToProcess(String targetDir, String oldSnapshotFileName, boolean doingClearDb)
throws Exception {
@@ -972,26 +1016,26 @@ public class DataSnapshot {
}
-
- public static int figureOutFileCount( long totalVertCount, int threadCount4Create,
+
+ public static int figureOutFileCount( long totalVertCount, int threadCount4Create,
long maxNodesPerFile ) {
-
+
// NOTE - we would always like to use all of our threads. That is, if
// we could process all the data with 16 threads, but our threadCount4Create is
- // only 15, we will do two passes and use all 15 threads each pass which will
+ // only 15, we will do two passes and use all 15 threads each pass which will
// create a total of 30 files. Each file will be a bit smaller so the overall
// time for the two passes should be faster.
if( totalVertCount <= 0 || threadCount4Create <= 0 || maxNodesPerFile <= 0) {
return 1;
}
-
- long maxNodesPerPass = threadCount4Create * maxNodesPerFile;
- int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass);
+
+ long maxNodesPerPass = threadCount4Create * maxNodesPerFile;
+ int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass);
int fileCt = threadCount4Create * numberOfPasses;
-
- return fileCt;
+
+ return fileCt;
}
-
+
class CommandLineArgs {
@@ -1009,7 +1053,7 @@ public class DataSnapshot {
@Parameter(names = "-threadCount", description = "thread count for create")
public int threadCount = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_THREADS_FOR_CREATE;
-
+
@Parameter(names = "-maxNodesPerFile", description = "Max nodes per file")
public long maxNodesPerFile = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_NODES_PER_FILE_FOR_CREATE;
@@ -1018,13 +1062,13 @@ public class DataSnapshot {
@Parameter(names = "-debugAddDelayTime", description = "delay in ms between each Add for debug mode")
public long debugAddDelayTime = 1L;
-
+
@Parameter(names = "-vertAddDelayMs", description = "delay in ms while adding each vertex")
public long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS.longValue();
-
+
@Parameter(names = "-edgeAddDelayMs", description = "delay in ms while adding each edge")
public long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS.longValue();
-
+
@Parameter(names = "-failureDelayMs", description = "delay in ms when failure to load vertex or edge in snapshot")
public long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS.longValue();
@@ -1033,25 +1077,25 @@ public class DataSnapshot {
@Parameter(names = "-maxErrorsPerThread", description = "max errors allowed per thread")
public int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD;
-
+
@Parameter(names = "-vertToEdgeProcDelay", description = "vertex to edge processing delay in ms")
public long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS.longValue();
-
+
@Parameter(names = "-staggerThreadDelay", description = "thread delay stagger time in ms")
public long staggerThreadDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_STAGGER_THREAD_DELAY_MS;
-
+
@Parameter(names = "-fileName", description = "file name for generating snapshot ")
public String fileName = "";
-
+
@Parameter(names = "-snapshotDir", description = "file path for generating snapshot ")
public String snapshotDir = "";
-
+
@Parameter(names = "-oldFileDir", description = "directory containing the old snapshot file for reloading")
public String oldFileDir = "";
-
+
@Parameter(names = "-caller", description = "process invoking the dataSnapshot")
public String caller = "";
-
+
}
-
-} \ No newline at end of file
+
+}
diff --git a/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java b/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java
index 9d74651..cc61e99 100644
--- a/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java
+++ b/src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java
@@ -56,7 +56,7 @@ public class DataSnapshotTest extends AAISetup {
private JanusGraphTransaction currentTransaction;
private List<Vertex> vertexes;
-
+
private static final int DELAYSINGLETHREADTEST = 90;
@Rule
@@ -126,15 +126,13 @@ public class DataSnapshotTest extends AAISetup {
assertThat(outputCapture.toString(), containsString("graphson had no data."));
}
-
- @Ignore("Unit test failing temporarily ignore")
@Test
public void testTakeSnapshotAndItShouldCreateASnapshotFileWithOneVertex() throws IOException, InterruptedException {
String logsFolder = System.getProperty("AJSC_HOME") + "/logs/data/dataSnapshots/";
Set<Path> preSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
-
+
// previous test may have the same generated file name, this wait will ensure a new name is used for this test
System.out.println("delay generation, seconds " + DELAYSINGLETHREADTEST);
Thread.sleep(DELAYSINGLETHREADTEST*1000);
@@ -149,7 +147,7 @@ public class DataSnapshotTest extends AAISetup {
Set<Path> postSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
- assertThat(postSnapshotFiles.size(), is(37));
+ assertThat(postSnapshotFiles.size(), is(preSnapshotFiles.size() +1));
postSnapshotFiles.removeAll(preSnapshotFiles);
List<Path> snapshotPathList = postSnapshotFiles.stream().collect(Collectors.toList());
@@ -158,7 +156,39 @@ public class DataSnapshotTest extends AAISetup {
List<String> fileContents = Files.readAllLines(snapshotPathList.get(0));
assertThat(fileContents.get(0), containsString("id"));
}
-
+
+ @Test
+ public void testTakeKryoSnapshotAndItShouldCreateASnapshotFileWithOneVertex() throws IOException, InterruptedException {
+
+ String logsFolder = System.getProperty("AJSC_HOME") + "/logs/data/dataSnapshots/";
+
+ Set<Path> preSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
+
+ // previous test may have the same generated file name, this wait will ensure a new name is used for this test
+ System.out.println("delay generation, seconds " + DELAYSINGLETHREADTEST);
+ Thread.sleep(DELAYSINGLETHREADTEST*100);
+ // Run the clear dataSnapshot and this time it should fail
+ //String [] args = {"JUST_TAKE_SNAPSHOT"}; >> default behavior is now to use 15 threads
+ // To just get one file, you have to tell it to just use one.
+ String [] args = {"-c","THREADED_SNAPSHOT", "-threadCount" ,"1", "-snapshotType" , "gryo"};
+
+ DataSnapshot.main(args);
+
+ // Add sleep so the file actually gets created with the data
+
+ Set<Path> postSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
+
+ assertThat(postSnapshotFiles.size(), is(preSnapshotFiles.size() +1));
+ boolean gryoSnapshotExists = postSnapshotFiles.stream()
+ .map(Path::toString)
+ .anyMatch(name -> name.endsWith("gryo"));
+ assertTrue(gryoSnapshotExists);
+ postSnapshotFiles.removeAll(preSnapshotFiles);
+ List<Path> snapshotPathList = postSnapshotFiles.stream().collect(Collectors.toList());
+
+ assertThat(snapshotPathList.size(), is(1));
+ }
+
@Test
public void testTakeSnapshotMultiAndItShouldCreateMultipleSnapshotFiles() throws IOException {
@@ -180,27 +210,27 @@ public class DataSnapshotTest extends AAISetup {
long totalVerts = 5000;
int threadCt = 15;
long maxNodesPerFile = 120000;
-
- int fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
+
+ int fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
maxNodesPerFile );
assertThat( fileCt, is(15));
-
+
totalVerts = 5000;
threadCt = 15;
maxNodesPerFile = 100;
- fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
+ fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
maxNodesPerFile );
assertThat( fileCt, is(60));
-
+
totalVerts = 1500;
threadCt = 15;
maxNodesPerFile = 100;
- fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
+ fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
maxNodesPerFile );
- assertThat( fileCt, is(15));
-
+ assertThat( fileCt, is(15));
+
}
-
+
@Test
public void testTakeSnapshotMultiWithDebugAndItShouldCreateMultipleSnapshotFiles() throws IOException {
@@ -224,7 +254,7 @@ public class DataSnapshotTest extends AAISetup {
// Run the clear dataSnapshot and this time it should fail
String [] args = {"-c","THREADED_SNAPSHOT", "-threadCount","foo","-debugFlag", "DEBUG"};
-
+
DataSnapshot.main(args);
// For this test if there is only one vertex in the graph, not sure if it will create multiple files