diff options
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java')
-rw-r--r-- | src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java | 227 |
1 files changed, 126 insertions, 101 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java index 217d6c0..946489b 100644 --- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java +++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java @@ -317,7 +317,7 @@ public class DataSnapshot { } else if( command.equals("MULTITHREAD_RELOAD") ){ // Note - this will use as many threads as the snapshot file is - // broken up into. (up to a limit) + // broken up into. (up to a limit - whatever the 'threadCount' variable is set to) if (args.length >= 2) { // Since they are re-loading, they need to pass the snapshot file name to use. // We expected the file to be found in our snapshot directory. Note - if @@ -357,8 +357,14 @@ public class DataSnapshot { } } - - threadCount4Create = cArgs.threadCount; + try { + threadCount4Create = cArgs.threadCount; + } + catch ( NumberFormatException nfe ){ + ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount found for DataSnapshot [" + cArgs.threadCount + "]"); + LOGGER.debug("Bad (non-integer) threadCount found by DataSnapshot [" + cArgs.threadCount + "]"); + AAISystemExitUtil.systemExitCloseAAIGraph(1); + } maxNodesPerFile4Create = cArgs.maxNodesPerFile; //Print Defaults LOGGER.debug("DataSnapshot command is [" + cArgs.command + "]"); @@ -375,8 +381,7 @@ public class DataSnapshot { LOGGER.debug("VertToEdgeProcDelay is [" + cArgs.vertToEdgeProcDelay + "]"); LOGGER.debug("StaggerThreadDelay is [" + cArgs.staggerThreadDelay + "]"); LOGGER.debug("Caller process is ["+ cArgs.caller + "]"); - - + //Print non-default values if (!AAIConfig.isEmpty(cArgs.fileName)){ LOGGER.debug("Snapshot file name (if not default) to use is [" + cArgs.fileName + "]"); @@ -553,6 +558,9 @@ public class DataSnapshot { // NOTE - it will only use as many threads as the number of files the // snapshot is written to. Ie. if you have a single-file snapshot, // then this will be single-threaded. + // If the number of files is greater than the 'threadCount' parameter, + // then we will use more than one pass to keep the number of simultaneous + // threads below the threadCount param. // LOGGER.debug(" Command = " + command ); @@ -561,114 +569,129 @@ public class DataSnapshot { } ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false); int fCount = snapFilesArr.size(); + int threadPassesNeeded = (int) Math.ceil((double)fCount / (double)threadCount4Create); + int filesPerPass = (int) Math.ceil((double)fCount / (double)threadPassesNeeded); + JanusGraph graph1 = AAIGraph.getInstance().getGraph(); long timeStart = System.nanoTime(); HashMap <String,String> old2NewVertIdMap = new HashMap <String,String> (); - // We're going to try loading in the vertices - without edges or properties - // using Separate threads - - ExecutorService executor = Executors.newFixedThreadPool(fCount); - List<Future<HashMap<String,String>>> list = new ArrayList<Future<HashMap<String,String>>>(); - - for( int i=0; i < fCount; i++ ){ - File f = snapFilesArr.get(i); + ExecutorService executor = Executors.newFixedThreadPool(fCount); + int threadFailCount = 0; + + LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs + + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs + + ", maxErrorsPerThread = " + maxErrorsPerThread ); + + // -------------------------------------- + // Step 1 -- Load empty vertices + // -------------------------------------- + int fileNo = 0; + for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){ + List<Future<HashMap<String,String>>> listFutV = new ArrayList<Future<HashMap<String,String>>>(); + + int thisPassCount = 0; + while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){ + File f = snapFilesArr.get(fileNo); String fname = f.getName(); String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname; Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit LOGGER.debug(" -- Read file: [" + fullSnapName + "]"); - LOGGER.debug(" -- Call the PartialVertexLoader to just load vertices ----"); - LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs - + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs - + ", maxErrorsPerThread = " + maxErrorsPerThread ); Callable <HashMap<String,String>> vLoader = new PartialVertexLoader(graph1, fullSnapName, vertAddDelayMs, failureDelayMs, retryDelayMs, maxErrorsPerThread, LOGGER); Future <HashMap<String,String>> future = (Future<HashMap<String, String>>) executor.submit(vLoader); - // add Future to the list, we can get return value using Future - list.add(future); - LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY thread # "+ i ); + // add future to the list, we can get return value later + listFutV.add(future); + LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY file # "+ fileNo + + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")"); + + thisPassCount++; + fileNo++; } - + int threadCount4Reload = 0; - int threadFailCount = 0; - for(Future<HashMap<String,String>> fut : list){ - threadCount4Reload++; - try { - old2NewVertIdMap.putAll(fut.get()); - LOGGER.debug(" -- back from PartialVertexLoader. returned thread # " + threadCount4Reload + - ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() ); - } - catch (InterruptedException e) { - threadFailCount++; - AAIException ae = new AAIException("AAI_6128", e , "InterruptedException"); + for(Future<HashMap<String,String>> fut : listFutV){ + threadCount4Reload++; + try { + old2NewVertIdMap.putAll(fut.get()); + LOGGER.debug(" -- back from PartialVertexLoader. returned pass # " + + passNo + ", thread # " + + threadCount4Reload + + ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() ); + } + catch (InterruptedException e) { + threadFailCount++; + AAIException ae = new AAIException("AAI_6128", e , "InterruptedException"); ErrorLogHelper.logException(ae); - } - catch (ExecutionException e) { - threadFailCount++; - AAIException ae = new AAIException("AAI_6128", e , "ExecutionException"); + } + catch (ExecutionException e) { + threadFailCount++; + AAIException ae = new AAIException("AAI_6128", e , "ExecutionException"); ErrorLogHelper.logException(ae); - } - } - - executor.shutdown(); - - if( threadFailCount > 0 ) { - String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. "; - LOGGER.debug(emsg); - throw new Exception( emsg ); + } } + } // end of passes for loading empty vertices + + executor.shutdown(); + if( threadFailCount > 0 ) { + String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. "; + LOGGER.debug(emsg); + throw new Exception( emsg ); + } - long timeX = System.nanoTime(); - long diffTime = timeX - timeStart; - long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); - long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); - LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " + - minCount + " minutes, " + secCount + " seconds " ); - - // Give the DB a little time to chew on all those vertices - Thread.sleep(vertToEdgeProcDelay); - - // ---------------------------------------------------------------------------------------- - LOGGER.debug("\n\n\n -- Now do the edges/props ----------------------"); - // ---------------------------------------------------------------------------------------- + long timeX = System.nanoTime(); + long diffTime = timeX - timeStart; + long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); + // Give the DB a little time to chew on all those new vertices + Thread.sleep(vertToEdgeProcDelay); - // We're going to try loading in the edges and missing properties - // Note - we're passing the whole oldVid2newVid mapping to the PartialPropAndEdgeLoader - // so that the String-updates to the GraphSON will happen in the threads instead of - // here in the un-threaded calling method. - executor = Executors.newFixedThreadPool(fCount); - ArrayList<Future<ArrayList<String>>> listEdg = new ArrayList<Future<ArrayList<String>>>(); - for( int i=0; i < fCount; i++ ){ - File f = snapFilesArr.get(i); + + // ------------------------------------------------------------- + // Step 2 -- Load Edges and properties onto the empty vertices + // ------------------------------------------------------------- + LOGGER.debug("\n\n\n -- Now load the edges/properties ----------------------"); + executor = Executors.newFixedThreadPool(fCount); + + fileNo = 0; + for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){ + ArrayList<Future<ArrayList<String>>> listFutEdg = new ArrayList<Future<ArrayList<String>>>(); + + int thisPassCount = 0; + while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){ + File f = snapFilesArr.get(fileNo); String fname = f.getName(); String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname; Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit LOGGER.debug(" -- Read file: [" + fullSnapName + "]"); - LOGGER.debug(" -- Call the PartialPropAndEdgeLoader for Properties and EDGEs ----"); - LOGGER.debug(" -- edgeAddDelayMs = " + vertAddDelayMs - + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs - + ", maxErrorsPerThread = " + maxErrorsPerThread ); - - Callable eLoader = new PartialPropAndEdgeLoader(graph1, fullSnapName, + Callable eLoader = new PartialPropAndEdgeLoader(graph1, fullSnapName, edgeAddDelayMs, failureDelayMs, retryDelayMs, old2NewVertIdMap, maxErrorsPerThread, LOGGER); Future <ArrayList<String>> future = (Future<ArrayList<String>>) executor.submit(eLoader); - //add Future to the list, we can get return value using Future - listEdg.add(future); - LOGGER.debug(" -- Starting PartialPropAndEdge thread # "+ i ); + // add future to the list, we can wait for it below + listFutEdg.add(future); + LOGGER.debug(" -- Starting PartialPropAndEdge file # " + + fileNo + " (pass # " + passNo + ", passIndex " + + thisPassCount + ")" ); + + thisPassCount++; + fileNo++; } - threadCount4Reload = 0; - for(Future<ArrayList<String>> fut : listEdg){ - threadCount4Reload++; - try{ - fut.get(); // DEBUG -- should be doing something with the return value if it's not empty - ie. errors - LOGGER.debug(" -- back from PartialPropAndEdgeLoader. thread # " + threadCount4Reload ); - } + int threadCount4Reload = 0; + for( Future<ArrayList<String>> fut : listFutEdg ){ + threadCount4Reload++; + try{ + fut.get(); + LOGGER.debug(" -- back from PartialPropAndEdgeLoader. pass # " + + passNo + ", thread # " + threadCount4Reload ); + } catch (InterruptedException e) { threadFailCount++; AAIException ae = new AAIException("AAI_6128", e , "InterruptedException"); @@ -680,30 +703,32 @@ public class DataSnapshot { ErrorLogHelper.logException(ae); } } + + } // end of passes for reloading edges and properties - executor.shutdown(); + executor.shutdown(); - if( threadFailCount > 0 ) { - String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. "; - LOGGER.debug(emsg); - throw new Exception( emsg ); - } + if( threadFailCount > 0 ) { + String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. "; + LOGGER.debug(emsg); + throw new Exception( emsg ); + } - // This is needed so we can see the data committed by the called threads - graph1.tx().commit(); + // This is needed so we can see the data committed by the called threads + graph1.tx().commit(); - long timeEnd = System.nanoTime(); - diffTime = timeEnd - timeX; - minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); - secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); - LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " + - minCount + " minutes, " + secCount + " seconds " ); + long timeEnd = System.nanoTime(); + diffTime = timeEnd - timeX; + minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); - long totalDiffTime = timeEnd - timeStart; - long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime); - long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount); - LOGGER.debug(" -- TOTAL multi-threaded reload time: " + - totalMinCount + " minutes, " + totalSecCount + " seconds " ); + long totalDiffTime = timeEnd - timeStart; + long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime); + long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount); + LOGGER.debug(" -- TOTAL multi-threaded reload time: " + + totalMinCount + " minutes, " + totalSecCount + " seconds " ); } else if (command.equals("CLEAR_ENTIRE_DATABASE")) { // ------------------------------------------------------------------ |