diff options
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java')
-rw-r--r-- | src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java | 248 |
1 files changed, 137 insertions, 111 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java index 8d250d7..9aba8cf 100644 --- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java +++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java @@ -40,7 +40,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.onap.aai.config.PropertyPasswordConfiguration; +import org.onap.aai.restclient.PropertyPasswordConfiguration; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.tinkerpop.gremlin.structure.io.IoCore; @@ -75,8 +75,8 @@ import com.beust.jcommander.ParameterException; public class DataSnapshot4HistInit { - private static Logger LOGGER; - + private static Logger LOGGER = LoggerFactory.getLogger(DataSnapshot4HistInit.class); + /* Using realtime d */ private static final String REALTIME_DB = "realtime"; @@ -109,6 +109,13 @@ public class DataSnapshot4HistInit { */ public static void main(String[] args) { + // Set the logging file properties to be used by EELFManager + System.setProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName()); + Properties props = System.getProperties(); + props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS); + props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG); + + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); PropertyPasswordConfiguration initializer = new PropertyPasswordConfiguration(); initializer.initialize(ctx); @@ -149,13 +156,6 @@ public class DataSnapshot4HistInit { public boolean executeCommand(String[] args) { - // Set the logging file properties to be used by EELFManager - System.setProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName()); - Properties props = System.getProperties(); - props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS); - props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG); - LOGGER = LoggerFactory.getLogger(DataSnapshot4HistInit.class); - Boolean dbClearFlag = false; JanusGraph graph = null; String command = "UNKNOWN"; @@ -576,19 +576,25 @@ public class DataSnapshot4HistInit { // NOTE - it will only use as many threads as the number of files the // snapshot is written to. Ie. if you have a single-file snapshot, // then this will be single-threaded. + // If the number of files is greater than the 'threadCount' parameter, + // then we will use more than one pass to keep the number of simultaneous + // threads below the threadCount param. // LOGGER.debug(" Command = " + command ); + if (cArgs.oldFileDir != null && cArgs.oldFileDir != ""){ targetDir = cArgs.oldFileDir; } ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false); int fCount = snapFilesArr.size(); + int threadPassesNeeded = (int) Math.ceil((double)fCount / (double)threadCount4Create); + int filesPerPass = (int) Math.ceil((double)fCount / (double)threadPassesNeeded); + JanusGraph graph1 = AAIGraph.getInstance().getGraph(); - GraphAdminDBUtils.logConfigs(graph1.configuration()); long timeStart = System.nanoTime(); - HashMap <String,String> old2NewVertIdMap = new <String,String> HashMap (); - HashMap <String,ArrayList<String>> nodeKeyNames = new <String,ArrayList<String>> HashMap (); - + GraphAdminDBUtils.logConfigs(graph1.configuration()); + HashMap <String,String> old2NewVertIdMap = new HashMap <String,String> (); + HashMap <String,ArrayList<String>> nodeKeyNames = new HashMap <String,ArrayList<String>> (); try { LOGGER.debug("call getNodeKeyNames ()" ); nodeKeyNames = getNodeKeyNames(); @@ -597,142 +603,162 @@ public class DataSnapshot4HistInit { ErrorLogHelper.logException(ae); AAISystemExitUtil.systemExitCloseAAIGraph(1); } + + ExecutorService executor = Executors.newFixedThreadPool(fCount); + int threadFailCount = 0; - // We're going to try loading in the vertices - without edges or properties - // using Separate threads + LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs + + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs + + ", maxErrorsPerThread = " + maxErrorsPerThread ); + + // -------------------------------------- + // Step 1 -- Load empty vertices + // -------------------------------------- + int fileNo = 0; + for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){ + List<Future<HashMap<String,String>>> listFutV = new ArrayList<Future<HashMap<String,String>>>(); - ExecutorService executor = Executors.newFixedThreadPool(fCount); - List<Future<HashMap<String,String>>> list = new ArrayList<Future<HashMap<String,String>>>(); - for( int i=0; i < fCount; i++ ){ - File f = snapFilesArr.get(i); + int thisPassCount = 0; + while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){ + File f = snapFilesArr.get(fileNo); String fname = f.getName(); String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname; Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit LOGGER.debug(" -- Read file: [" + fullSnapName + "]"); - LOGGER.debug(" -- Call the PartialVertexLoader to just load vertices ----"); - LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs - + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs - + ", maxErrorsPerThread = " + maxErrorsPerThread ); Callable <HashMap<String,String>> vLoader = new PartialVertexLoader(graph1, fullSnapName, vertAddDelayMs, failureDelayMs, retryDelayMs, maxErrorsPerThread, LOGGER); Future <HashMap<String,String>> future = (Future<HashMap<String, String>>) executor.submit(vLoader); - // add Future to the list, we can get return value using Future - list.add(future); - LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY thread # "+ i ); + // add future to the list, we can get return value later + listFutV.add(future); + LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY file # "+ fileNo + + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")"); + + thisPassCount++; + fileNo++; } - + int threadCount4Reload = 0; - int threadFailCount = 0; - for(Future<HashMap<String,String>> fut : list){ - threadCount4Reload++; - try { - old2NewVertIdMap.putAll(fut.get()); - LOGGER.debug(" -- back from PartialVertexLoader. returned thread # " + threadCount4Reload + - ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() ); - } - catch (InterruptedException e) { - threadFailCount++; - AAIException ae = new AAIException("AAI_6128", e , "InterruptedException"); - ErrorLogHelper.logException(ae); - } - catch (ExecutionException e) { - threadFailCount++; - AAIException ae = new AAIException("AAI_6128", e , "ExecutionException"); - ErrorLogHelper.logException(ae); - } - } - executor.shutdown(); - - if( threadFailCount > 0 ) { - String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. "; - LOGGER.debug(emsg); - throw new Exception( emsg ); + for(Future<HashMap<String,String>> fut : listFutV){ + threadCount4Reload++; + try { + old2NewVertIdMap.putAll(fut.get()); + LOGGER.debug(" -- back from PartialVertexLoader. returned pass # " + + passNo + ", thread # " + + threadCount4Reload + + ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() ); + } + catch (InterruptedException e) { + threadFailCount++; + AAIException ae = new AAIException("AAI_6128", e , "InterruptedException"); + ErrorLogHelper.logException(ae); + } + catch (ExecutionException e) { + threadFailCount++; + AAIException ae = new AAIException("AAI_6128", e , "ExecutionException"); + ErrorLogHelper.logException(ae); + } } + } // end of passes for loading empty vertices + + executor.shutdown(); + if( threadFailCount > 0 ) { + String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. "; + LOGGER.debug(emsg); + throw new Exception( emsg ); + } - long timeX = System.nanoTime(); - long diffTime = timeX - timeStart; - long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); - long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); - LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " + - minCount + " minutes, " + secCount + " seconds " ); + long timeX = System.nanoTime(); + long diffTime = timeX - timeStart; + long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); - // Give the DB a little time to chew on all those vertices - Thread.sleep(vertToEdgeProcDelay); + // Give the DB a little time to chew on all those new vertices + Thread.sleep(vertToEdgeProcDelay); - // ---------------------------------------------------------------------------------------- - LOGGER.debug("\n\n\n -- Now do the edges/props ----------------------"); - // ---------------------------------------------------------------------------------------- - // We're going to try loading in the edges and missing properties - // Note - we're passing the whole oldVid2newVid mapping to the PartialPropAndEdgeLoader - // so that the String-updates to the GraphSON will happen in the threads instead of - // here in the un-threaded calling method. - executor = Executors.newFixedThreadPool(fCount); - ArrayList<Future<ArrayList<String>>> listEdg = new ArrayList<Future<ArrayList<String>>>(); - for( int i=0; i < fCount; i++ ){ - File f = snapFilesArr.get(i); + // ------------------------------------------------------------- + // Step 2 -- Load Edges and properties onto the empty vertices + // ------------------------------------------------------------- + LOGGER.debug("\n\n\n -- Now load the edges/properties ----------------------"); + executor = Executors.newFixedThreadPool(fCount); + + fileNo = 0; + for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){ + ArrayList<Future<ArrayList<String>>> listFutEdg = new ArrayList<Future<ArrayList<String>>>(); + + int thisPassCount = 0; + while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){ + File f = snapFilesArr.get(fileNo); String fname = f.getName(); String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname; Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit LOGGER.debug(" -- Read file: [" + fullSnapName + "]"); - LOGGER.debug(" -- Call the PartialPropAndEdgeLoader4HistInit for Properties and EDGEs ----"); - LOGGER.debug(" -- edgeAddDelayMs = " + vertAddDelayMs - + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs - + ", maxErrorsPerThread = " + maxErrorsPerThread ); - - - Callable eLoader = new PartialPropAndEdgeLoader4HistInit(graph1, fullSnapName, + Callable eLoader = new PartialPropAndEdgeLoader4HistInit(graph1, fullSnapName, edgeAddDelayMs, failureDelayMs, retryDelayMs, old2NewVertIdMap, maxErrorsPerThread, LOGGER, scriptStartTime, nodeKeyNames); + Future <ArrayList<String>> future = (Future<ArrayList<String>>) executor.submit(eLoader); - //add Future to the list, we can get return value using Future - listEdg.add(future); - LOGGER.debug(" -- Starting PartialPropAndEdge thread # "+ i ); + // add future to the list, we can wait for it below + listFutEdg.add(future); + LOGGER.debug(" -- Starting PartialPropAndEdgeLoader4HistInit file # " + + fileNo + " (pass # " + passNo + ", passIndex " + + thisPassCount + ")" ); + + thisPassCount++; + fileNo++; } - threadCount4Reload = 0; - for(Future<ArrayList<String>> fut : listEdg){ - threadCount4Reload++; - try{ - fut.get(); // DEBUG -- should be doing something with the return value if it's not empty - ie. errors - LOGGER.debug(" -- back from PartialPropAndEdgeLoader. thread # " + threadCount4Reload ); - } + + int threadCount4Reload = 0; + for( Future<ArrayList<String>> fut : listFutEdg ){ + threadCount4Reload++; + try{ + fut.get(); + LOGGER.debug(" -- back from PartialPropAndEdgeLoader4HistInit. pass # " + + passNo + ", thread # " + threadCount4Reload ); + } catch (InterruptedException e) { threadFailCount++; AAIException ae = new AAIException("AAI_6128", e , "InterruptedException"); - ErrorLogHelper.logException(ae); + ErrorLogHelper.logException(ae); } catch (ExecutionException e) { threadFailCount++; AAIException ae = new AAIException("AAI_6128", e , "ExecutionException"); - ErrorLogHelper.logException(ae); + ErrorLogHelper.logException(ae); } } + + } // end of passes for reloading edges and properties - executor.shutdown(); - if( threadFailCount > 0 ) { - String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. "; - LOGGER.debug(emsg); - throw new Exception( emsg ); - } + executor.shutdown(); - // This is needed so we can see the data committed by the called threads - graph1.tx().commit(); + if( threadFailCount > 0 ) { + String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. "; + LOGGER.debug(emsg); + throw new Exception( emsg ); + } - long timeEnd = System.nanoTime(); - diffTime = timeEnd - timeX; - minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); - secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); - LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " + - minCount + " minutes, " + secCount + " seconds " ); + // This is needed so we can see the data committed by the called threads + graph1.tx().commit(); - long totalDiffTime = timeEnd - timeStart; - long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime); - long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount); - LOGGER.debug(" -- TOTAL multi-threaded reload time: " + - totalMinCount + " minutes, " + totalSecCount + " seconds " ); + long timeEnd = System.nanoTime(); + diffTime = timeEnd - timeX; + minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime); + secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount); + LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " + + minCount + " minutes, " + secCount + " seconds " ); + + long totalDiffTime = timeEnd - timeStart; + long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime); + long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount); + LOGGER.debug(" -- TOTAL multi-threaded reload time: " + + totalMinCount + " minutes, " + totalSecCount + " seconds " ); + } else if (command.equals("CLEAR_ENTIRE_DATABASE")) { // ------------------------------------------------------------------ // They are calling this to clear the db before re-loading it @@ -1075,4 +1101,4 @@ public class DataSnapshot4HistInit { } -} +}
\ No newline at end of file |