aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java')
-rw-r--r--src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java227
1 files changed, 126 insertions, 101 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
index 217d6c0..946489b 100644
--- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
+++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
@@ -317,7 +317,7 @@ public class DataSnapshot {
}
else if( command.equals("MULTITHREAD_RELOAD") ){
// Note - this will use as many threads as the snapshot file is
- // broken up into. (up to a limit)
+ // broken up into. (up to a limit - whatever the 'threadCount' variable is set to)
if (args.length >= 2) {
// Since they are re-loading, they need to pass the snapshot file name to use.
// We expected the file to be found in our snapshot directory. Note - if
@@ -357,8 +357,14 @@ public class DataSnapshot {
}
}
-
- threadCount4Create = cArgs.threadCount;
+ try {
+ threadCount4Create = cArgs.threadCount;
+ }
+ catch ( NumberFormatException nfe ){
+ ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount found for DataSnapshot [" + cArgs.threadCount + "]");
+ LOGGER.debug("Bad (non-integer) threadCount found by DataSnapshot [" + cArgs.threadCount + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
maxNodesPerFile4Create = cArgs.maxNodesPerFile;
//Print Defaults
LOGGER.debug("DataSnapshot command is [" + cArgs.command + "]");
@@ -375,8 +381,7 @@ public class DataSnapshot {
LOGGER.debug("VertToEdgeProcDelay is [" + cArgs.vertToEdgeProcDelay + "]");
LOGGER.debug("StaggerThreadDelay is [" + cArgs.staggerThreadDelay + "]");
LOGGER.debug("Caller process is ["+ cArgs.caller + "]");
-
-
+
//Print non-default values
if (!AAIConfig.isEmpty(cArgs.fileName)){
LOGGER.debug("Snapshot file name (if not default) to use is [" + cArgs.fileName + "]");
@@ -553,6 +558,9 @@ public class DataSnapshot {
// NOTE - it will only use as many threads as the number of files the
// snapshot is written to. Ie. if you have a single-file snapshot,
// then this will be single-threaded.
+ // If the number of files is greater than the 'threadCount' parameter,
+ // then we will use more than one pass to keep the number of simultaneous
+ // threads below the threadCount param.
//
LOGGER.debug(" Command = " + command );
@@ -561,114 +569,129 @@ public class DataSnapshot {
}
ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false);
int fCount = snapFilesArr.size();
+ int threadPassesNeeded = (int) Math.ceil((double)fCount / (double)threadCount4Create);
+ int filesPerPass = (int) Math.ceil((double)fCount / (double)threadPassesNeeded);
+
JanusGraph graph1 = AAIGraph.getInstance().getGraph();
long timeStart = System.nanoTime();
HashMap <String,String> old2NewVertIdMap = new HashMap <String,String> ();
- // We're going to try loading in the vertices - without edges or properties
- // using Separate threads
-
- ExecutorService executor = Executors.newFixedThreadPool(fCount);
- List<Future<HashMap<String,String>>> list = new ArrayList<Future<HashMap<String,String>>>();
-
- for( int i=0; i < fCount; i++ ){
- File f = snapFilesArr.get(i);
+ ExecutorService executor = Executors.newFixedThreadPool(fCount);
+ int threadFailCount = 0;
+
+ LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
+ + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
+ + ", maxErrorsPerThread = " + maxErrorsPerThread );
+
+ // --------------------------------------
+ // Step 1 -- Load empty vertices
+ // --------------------------------------
+ int fileNo = 0;
+ for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+ List<Future<HashMap<String,String>>> listFutV = new ArrayList<Future<HashMap<String,String>>>();
+
+ int thisPassCount = 0;
+ while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){
+ File f = snapFilesArr.get(fileNo);
String fname = f.getName();
String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname;
Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit
LOGGER.debug(" -- Read file: [" + fullSnapName + "]");
- LOGGER.debug(" -- Call the PartialVertexLoader to just load vertices ----");
- LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
- + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
- + ", maxErrorsPerThread = " + maxErrorsPerThread );
Callable <HashMap<String,String>> vLoader = new PartialVertexLoader(graph1, fullSnapName,
vertAddDelayMs, failureDelayMs, retryDelayMs, maxErrorsPerThread, LOGGER);
Future <HashMap<String,String>> future = (Future<HashMap<String, String>>) executor.submit(vLoader);
- // add Future to the list, we can get return value using Future
- list.add(future);
- LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY thread # "+ i );
+ // add future to the list, we can get return value later
+ listFutV.add(future);
+ LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY file # "+ fileNo
+ + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")");
+
+ thisPassCount++;
+ fileNo++;
}
-
+
int threadCount4Reload = 0;
- int threadFailCount = 0;
- for(Future<HashMap<String,String>> fut : list){
- threadCount4Reload++;
- try {
- old2NewVertIdMap.putAll(fut.get());
- LOGGER.debug(" -- back from PartialVertexLoader. returned thread # " + threadCount4Reload +
- ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
- }
- catch (InterruptedException e) {
- threadFailCount++;
- AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
+ for(Future<HashMap<String,String>> fut : listFutV){
+ threadCount4Reload++;
+ try {
+ old2NewVertIdMap.putAll(fut.get());
+ LOGGER.debug(" -- back from PartialVertexLoader. returned pass # "
+ + passNo + ", thread # "
+ + threadCount4Reload +
+ ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
+ }
+ catch (InterruptedException e) {
+ threadFailCount++;
+ AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
ErrorLogHelper.logException(ae);
- }
- catch (ExecutionException e) {
- threadFailCount++;
- AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
+ }
+ catch (ExecutionException e) {
+ threadFailCount++;
+ AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
ErrorLogHelper.logException(ae);
- }
- }
-
- executor.shutdown();
-
- if( threadFailCount > 0 ) {
- String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. ";
- LOGGER.debug(emsg);
- throw new Exception( emsg );
+ }
}
+ } // end of passes for loading empty vertices
+
+ executor.shutdown();
+ if( threadFailCount > 0 ) {
+ String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. ";
+ LOGGER.debug(emsg);
+ throw new Exception( emsg );
+ }
- long timeX = System.nanoTime();
- long diffTime = timeX - timeStart;
- long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
- long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
- LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " +
- minCount + " minutes, " + secCount + " seconds " );
-
- // Give the DB a little time to chew on all those vertices
- Thread.sleep(vertToEdgeProcDelay);
-
- // ----------------------------------------------------------------------------------------
- LOGGER.debug("\n\n\n -- Now do the edges/props ----------------------");
- // ----------------------------------------------------------------------------------------
+ long timeX = System.nanoTime();
+ long diffTime = timeX - timeStart;
+ long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " +
+ minCount + " minutes, " + secCount + " seconds " );
+ // Give the DB a little time to chew on all those new vertices
+ Thread.sleep(vertToEdgeProcDelay);
- // We're going to try loading in the edges and missing properties
- // Note - we're passing the whole oldVid2newVid mapping to the PartialPropAndEdgeLoader
- // so that the String-updates to the GraphSON will happen in the threads instead of
- // here in the un-threaded calling method.
- executor = Executors.newFixedThreadPool(fCount);
- ArrayList<Future<ArrayList<String>>> listEdg = new ArrayList<Future<ArrayList<String>>>();
- for( int i=0; i < fCount; i++ ){
- File f = snapFilesArr.get(i);
+
+ // -------------------------------------------------------------
+ // Step 2 -- Load Edges and properties onto the empty vertices
+ // -------------------------------------------------------------
+ LOGGER.debug("\n\n\n -- Now load the edges/properties ----------------------");
+ executor = Executors.newFixedThreadPool(fCount);
+
+ fileNo = 0;
+ for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+ ArrayList<Future<ArrayList<String>>> listFutEdg = new ArrayList<Future<ArrayList<String>>>();
+
+ int thisPassCount = 0;
+ while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){
+ File f = snapFilesArr.get(fileNo);
String fname = f.getName();
String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname;
Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit
LOGGER.debug(" -- Read file: [" + fullSnapName + "]");
- LOGGER.debug(" -- Call the PartialPropAndEdgeLoader for Properties and EDGEs ----");
- LOGGER.debug(" -- edgeAddDelayMs = " + vertAddDelayMs
- + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
- + ", maxErrorsPerThread = " + maxErrorsPerThread );
-
- Callable eLoader = new PartialPropAndEdgeLoader(graph1, fullSnapName,
+ Callable eLoader = new PartialPropAndEdgeLoader(graph1, fullSnapName,
edgeAddDelayMs, failureDelayMs, retryDelayMs,
old2NewVertIdMap, maxErrorsPerThread, LOGGER);
Future <ArrayList<String>> future = (Future<ArrayList<String>>) executor.submit(eLoader);
- //add Future to the list, we can get return value using Future
- listEdg.add(future);
- LOGGER.debug(" -- Starting PartialPropAndEdge thread # "+ i );
+ // add future to the list, we can wait for it below
+ listFutEdg.add(future);
+ LOGGER.debug(" -- Starting PartialPropAndEdge file # "
+ + fileNo + " (pass # " + passNo + ", passIndex "
+ + thisPassCount + ")" );
+
+ thisPassCount++;
+ fileNo++;
}
- threadCount4Reload = 0;
- for(Future<ArrayList<String>> fut : listEdg){
- threadCount4Reload++;
- try{
- fut.get(); // DEBUG -- should be doing something with the return value if it's not empty - ie. errors
- LOGGER.debug(" -- back from PartialPropAndEdgeLoader. thread # " + threadCount4Reload );
- }
+ int threadCount4Reload = 0;
+ for( Future<ArrayList<String>> fut : listFutEdg ){
+ threadCount4Reload++;
+ try{
+ fut.get();
+ LOGGER.debug(" -- back from PartialPropAndEdgeLoader. pass # "
+ + passNo + ", thread # " + threadCount4Reload );
+ }
catch (InterruptedException e) {
threadFailCount++;
AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
@@ -680,30 +703,32 @@ public class DataSnapshot {
ErrorLogHelper.logException(ae);
}
}
+
+ } // end of passes for reloading edges and properties
- executor.shutdown();
+ executor.shutdown();
- if( threadFailCount > 0 ) {
- String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. ";
- LOGGER.debug(emsg);
- throw new Exception( emsg );
- }
+ if( threadFailCount > 0 ) {
+ String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. ";
+ LOGGER.debug(emsg);
+ throw new Exception( emsg );
+ }
- // This is needed so we can see the data committed by the called threads
- graph1.tx().commit();
+ // This is needed so we can see the data committed by the called threads
+ graph1.tx().commit();
- long timeEnd = System.nanoTime();
- diffTime = timeEnd - timeX;
- minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
- secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
- LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " +
- minCount + " minutes, " + secCount + " seconds " );
+ long timeEnd = System.nanoTime();
+ diffTime = timeEnd - timeX;
+ minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " +
+ minCount + " minutes, " + secCount + " seconds " );
- long totalDiffTime = timeEnd - timeStart;
- long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime);
- long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount);
- LOGGER.debug(" -- TOTAL multi-threaded reload time: " +
- totalMinCount + " minutes, " + totalSecCount + " seconds " );
+ long totalDiffTime = timeEnd - timeStart;
+ long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime);
+ long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount);
+ LOGGER.debug(" -- TOTAL multi-threaded reload time: " +
+ totalMinCount + " minutes, " + totalSecCount + " seconds " );
} else if (command.equals("CLEAR_ENTIRE_DATABASE")) {
// ------------------------------------------------------------------