aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java')
-rw-r--r--src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java248
1 files changed, 137 insertions, 111 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java
index 8d250d7..9aba8cf 100644
--- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java
+++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot4HistInit.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.onap.aai.config.PropertyPasswordConfiguration;
+import org.onap.aai.restclient.PropertyPasswordConfiguration;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.structure.io.IoCore;
@@ -75,8 +75,8 @@ import com.beust.jcommander.ParameterException;
public class DataSnapshot4HistInit {
- private static Logger LOGGER;
-
+ private static Logger LOGGER = LoggerFactory.getLogger(DataSnapshot4HistInit.class);
+
/* Using realtime d */
private static final String REALTIME_DB = "realtime";
@@ -109,6 +109,13 @@ public class DataSnapshot4HistInit {
*/
public static void main(String[] args) {
+ // Set the logging file properties to be used by EELFManager
+ System.setProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName());
+ Properties props = System.getProperties();
+ props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS);
+ props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
+
+
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
PropertyPasswordConfiguration initializer = new PropertyPasswordConfiguration();
initializer.initialize(ctx);
@@ -149,13 +156,6 @@ public class DataSnapshot4HistInit {
public boolean executeCommand(String[] args) {
- // Set the logging file properties to be used by EELFManager
- System.setProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName());
- Properties props = System.getProperties();
- props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS);
- props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
- LOGGER = LoggerFactory.getLogger(DataSnapshot4HistInit.class);
-
Boolean dbClearFlag = false;
JanusGraph graph = null;
String command = "UNKNOWN";
@@ -576,19 +576,25 @@ public class DataSnapshot4HistInit {
// NOTE - it will only use as many threads as the number of files the
// snapshot is written to. Ie. if you have a single-file snapshot,
// then this will be single-threaded.
+ // If the number of files is greater than the 'threadCount' parameter,
+ // then we will use more than one pass to keep the number of simultaneous
+ // threads below the threadCount param.
//
LOGGER.debug(" Command = " + command );
+
if (cArgs.oldFileDir != null && cArgs.oldFileDir != ""){
targetDir = cArgs.oldFileDir;
}
ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false);
int fCount = snapFilesArr.size();
+ int threadPassesNeeded = (int) Math.ceil((double)fCount / (double)threadCount4Create);
+ int filesPerPass = (int) Math.ceil((double)fCount / (double)threadPassesNeeded);
+
JanusGraph graph1 = AAIGraph.getInstance().getGraph();
- GraphAdminDBUtils.logConfigs(graph1.configuration());
long timeStart = System.nanoTime();
- HashMap <String,String> old2NewVertIdMap = new <String,String> HashMap ();
- HashMap <String,ArrayList<String>> nodeKeyNames = new <String,ArrayList<String>> HashMap ();
-
+ GraphAdminDBUtils.logConfigs(graph1.configuration());
+ HashMap <String,String> old2NewVertIdMap = new HashMap <String,String> ();
+ HashMap <String,ArrayList<String>> nodeKeyNames = new HashMap <String,ArrayList<String>> ();
try {
LOGGER.debug("call getNodeKeyNames ()" );
nodeKeyNames = getNodeKeyNames();
@@ -597,142 +603,162 @@ public class DataSnapshot4HistInit {
ErrorLogHelper.logException(ae);
AAISystemExitUtil.systemExitCloseAAIGraph(1);
}
+
+ ExecutorService executor = Executors.newFixedThreadPool(fCount);
+ int threadFailCount = 0;
- // We're going to try loading in the vertices - without edges or properties
- // using Separate threads
+ LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
+ + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
+ + ", maxErrorsPerThread = " + maxErrorsPerThread );
+
+ // --------------------------------------
+ // Step 1 -- Load empty vertices
+ // --------------------------------------
+ int fileNo = 0;
+ for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+ List<Future<HashMap<String,String>>> listFutV = new ArrayList<Future<HashMap<String,String>>>();
- ExecutorService executor = Executors.newFixedThreadPool(fCount);
- List<Future<HashMap<String,String>>> list = new ArrayList<Future<HashMap<String,String>>>();
- for( int i=0; i < fCount; i++ ){
- File f = snapFilesArr.get(i);
+ int thisPassCount = 0;
+ while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){
+ File f = snapFilesArr.get(fileNo);
String fname = f.getName();
String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname;
Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit
LOGGER.debug(" -- Read file: [" + fullSnapName + "]");
- LOGGER.debug(" -- Call the PartialVertexLoader to just load vertices ----");
- LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
- + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
- + ", maxErrorsPerThread = " + maxErrorsPerThread );
Callable <HashMap<String,String>> vLoader = new PartialVertexLoader(graph1, fullSnapName,
vertAddDelayMs, failureDelayMs, retryDelayMs, maxErrorsPerThread, LOGGER);
Future <HashMap<String,String>> future = (Future<HashMap<String, String>>) executor.submit(vLoader);
- // add Future to the list, we can get return value using Future
- list.add(future);
- LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY thread # "+ i );
+ // add future to the list, we can get return value later
+ listFutV.add(future);
+ LOGGER.debug(" -- Starting PartialDbLoad VERT_ONLY file # "+ fileNo
+ + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")");
+
+ thisPassCount++;
+ fileNo++;
}
-
+
int threadCount4Reload = 0;
- int threadFailCount = 0;
- for(Future<HashMap<String,String>> fut : list){
- threadCount4Reload++;
- try {
- old2NewVertIdMap.putAll(fut.get());
- LOGGER.debug(" -- back from PartialVertexLoader. returned thread # " + threadCount4Reload +
- ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
- }
- catch (InterruptedException e) {
- threadFailCount++;
- AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
- ErrorLogHelper.logException(ae);
- }
- catch (ExecutionException e) {
- threadFailCount++;
- AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
- ErrorLogHelper.logException(ae);
- }
- }
- executor.shutdown();
-
- if( threadFailCount > 0 ) {
- String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. ";
- LOGGER.debug(emsg);
- throw new Exception( emsg );
+ for(Future<HashMap<String,String>> fut : listFutV){
+ threadCount4Reload++;
+ try {
+ old2NewVertIdMap.putAll(fut.get());
+ LOGGER.debug(" -- back from PartialVertexLoader. returned pass # "
+ + passNo + ", thread # "
+ + threadCount4Reload +
+ ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
+ }
+ catch (InterruptedException e) {
+ threadFailCount++;
+ AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
+ ErrorLogHelper.logException(ae);
+ }
+ catch (ExecutionException e) {
+ threadFailCount++;
+ AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
+ ErrorLogHelper.logException(ae);
+ }
}
+ } // end of passes for loading empty vertices
+
+ executor.shutdown();
+ if( threadFailCount > 0 ) {
+ String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully. ";
+ LOGGER.debug(emsg);
+ throw new Exception( emsg );
+ }
- long timeX = System.nanoTime();
- long diffTime = timeX - timeStart;
- long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
- long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
- LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " +
- minCount + " minutes, " + secCount + " seconds " );
+ long timeX = System.nanoTime();
+ long diffTime = timeX - timeStart;
+ long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" -- To reload just the vertex ids from the snapshot files, it took: " +
+ minCount + " minutes, " + secCount + " seconds " );
- // Give the DB a little time to chew on all those vertices
- Thread.sleep(vertToEdgeProcDelay);
+ // Give the DB a little time to chew on all those new vertices
+ Thread.sleep(vertToEdgeProcDelay);
- // ----------------------------------------------------------------------------------------
- LOGGER.debug("\n\n\n -- Now do the edges/props ----------------------");
- // ----------------------------------------------------------------------------------------
- // We're going to try loading in the edges and missing properties
- // Note - we're passing the whole oldVid2newVid mapping to the PartialPropAndEdgeLoader
- // so that the String-updates to the GraphSON will happen in the threads instead of
- // here in the un-threaded calling method.
- executor = Executors.newFixedThreadPool(fCount);
- ArrayList<Future<ArrayList<String>>> listEdg = new ArrayList<Future<ArrayList<String>>>();
- for( int i=0; i < fCount; i++ ){
- File f = snapFilesArr.get(i);
+ // -------------------------------------------------------------
+ // Step 2 -- Load Edges and properties onto the empty vertices
+ // -------------------------------------------------------------
+ LOGGER.debug("\n\n\n -- Now load the edges/properties ----------------------");
+ executor = Executors.newFixedThreadPool(fCount);
+
+ fileNo = 0;
+ for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+ ArrayList<Future<ArrayList<String>>> listFutEdg = new ArrayList<Future<ArrayList<String>>>();
+
+ int thisPassCount = 0;
+ while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){
+ File f = snapFilesArr.get(fileNo);
String fname = f.getName();
String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname;
Thread.sleep(cArgs.staggerThreadDelay); // Stagger the threads a bit
LOGGER.debug(" -- Read file: [" + fullSnapName + "]");
- LOGGER.debug(" -- Call the PartialPropAndEdgeLoader4HistInit for Properties and EDGEs ----");
- LOGGER.debug(" -- edgeAddDelayMs = " + vertAddDelayMs
- + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
- + ", maxErrorsPerThread = " + maxErrorsPerThread );
-
-
- Callable eLoader = new PartialPropAndEdgeLoader4HistInit(graph1, fullSnapName,
+ Callable eLoader = new PartialPropAndEdgeLoader4HistInit(graph1, fullSnapName,
edgeAddDelayMs, failureDelayMs, retryDelayMs,
old2NewVertIdMap, maxErrorsPerThread, LOGGER,
scriptStartTime, nodeKeyNames);
+
Future <ArrayList<String>> future = (Future<ArrayList<String>>) executor.submit(eLoader);
- //add Future to the list, we can get return value using Future
- listEdg.add(future);
- LOGGER.debug(" -- Starting PartialPropAndEdge thread # "+ i );
+ // add future to the list, we can wait for it below
+ listFutEdg.add(future);
+ LOGGER.debug(" -- Starting PartialPropAndEdgeLoader4HistInit file # "
+ + fileNo + " (pass # " + passNo + ", passIndex "
+ + thisPassCount + ")" );
+
+ thisPassCount++;
+ fileNo++;
}
- threadCount4Reload = 0;
- for(Future<ArrayList<String>> fut : listEdg){
- threadCount4Reload++;
- try{
- fut.get(); // DEBUG -- should be doing something with the return value if it's not empty - ie. errors
- LOGGER.debug(" -- back from PartialPropAndEdgeLoader. thread # " + threadCount4Reload );
- }
+
+ int threadCount4Reload = 0;
+ for( Future<ArrayList<String>> fut : listFutEdg ){
+ threadCount4Reload++;
+ try{
+ fut.get();
+ LOGGER.debug(" -- back from PartialPropAndEdgeLoader4HistInit. pass # "
+ + passNo + ", thread # " + threadCount4Reload );
+ }
catch (InterruptedException e) {
threadFailCount++;
AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
- ErrorLogHelper.logException(ae);
+ ErrorLogHelper.logException(ae);
}
catch (ExecutionException e) {
threadFailCount++;
AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
- ErrorLogHelper.logException(ae);
+ ErrorLogHelper.logException(ae);
}
}
+
+ } // end of passes for reloading edges and properties
- executor.shutdown();
- if( threadFailCount > 0 ) {
- String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. ";
- LOGGER.debug(emsg);
- throw new Exception( emsg );
- }
+ executor.shutdown();
- // This is needed so we can see the data committed by the called threads
- graph1.tx().commit();
+ if( threadFailCount > 0 ) {
+ String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully. ";
+ LOGGER.debug(emsg);
+ throw new Exception( emsg );
+ }
- long timeEnd = System.nanoTime();
- diffTime = timeEnd - timeX;
- minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
- secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
- LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " +
- minCount + " minutes, " + secCount + " seconds " );
+ // This is needed so we can see the data committed by the called threads
+ graph1.tx().commit();
- long totalDiffTime = timeEnd - timeStart;
- long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime);
- long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount);
- LOGGER.debug(" -- TOTAL multi-threaded reload time: " +
- totalMinCount + " minutes, " + totalSecCount + " seconds " );
+ long timeEnd = System.nanoTime();
+ diffTime = timeEnd - timeX;
+ minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" -- To reload the edges and properties from snapshot files, it took: " +
+ minCount + " minutes, " + secCount + " seconds " );
+
+ long totalDiffTime = timeEnd - timeStart;
+ long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime);
+ long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount);
+ LOGGER.debug(" -- TOTAL multi-threaded reload time: " +
+ totalMinCount + " minutes, " + totalSecCount + " seconds " );
+
} else if (command.equals("CLEAR_ENTIRE_DATABASE")) {
// ------------------------------------------------------------------
// They are calling this to clear the db before re-loading it
@@ -1075,4 +1101,4 @@ public class DataSnapshot4HistInit {
}
-}
+} \ No newline at end of file