aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java')
-rw-r--r--src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java270
1 files changed, 180 insertions, 90 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
index e7ae5ec..217d6c0 100644
--- a/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
+++ b/src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
@@ -25,7 +25,14 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -33,34 +40,33 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.janusgraph.core.JanusGraph;
+import org.janusgraph.core.JanusGraphFactory;
+import org.janusgraph.core.util.JanusGraphCleanup;
import org.onap.aai.dbmap.AAIGraph;
import org.onap.aai.dbmap.AAIGraphConfig;
import org.onap.aai.exceptions.AAIException;
import org.onap.aai.logging.ErrorLogHelper;
import org.onap.aai.util.AAIConfig;
import org.onap.aai.util.AAIConstants;
-import org.onap.aai.util.GraphAdminConstants;
import org.onap.aai.util.AAISystemExitUtil;
import org.onap.aai.util.FormatDate;
+import org.onap.aai.util.GraphAdminConstants;
import org.onap.aai.util.GraphAdminDBUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.att.eelf.configuration.Configuration;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
-import org.janusgraph.core.JanusGraph;
-import org.janusgraph.core.JanusGraphFactory;
-import org.janusgraph.core.util.JanusGraphCleanup;
-
public class DataSnapshot {
- private static EELFLogger LOGGER;
+ private static Logger LOGGER;
/* Using realtime d */
private static final String REALTIME_DB = "realtime";
@@ -108,17 +114,16 @@ public class DataSnapshot {
public boolean executeCommand(String[] args, boolean success,
Boolean dbClearFlag, JanusGraph graph, String command,
String oldSnapshotFileName) {
-
+
// Set the logging file properties to be used by EELFManager
System.setProperty("aai.service.name", DataSnapshot.class.getSimpleName());
Properties props = System.getProperties();
props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS);
props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
- LOGGER = EELFManager.getInstance().getLogger(DataSnapshot.class);
+ LOGGER = LoggerFactory.getLogger(DataSnapshot.class);
cArgs = new CommandLineArgs();
String itemName = "aai.datasnapshot.threads.for.create";
-
try {
String val = AAIConfig.get(itemName);
if( val != null && !val.equals("") ){
@@ -128,9 +133,19 @@ public class DataSnapshot {
LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
}
int threadCount4Create = cArgs.threadCount;
-
+
+ itemName = "aai.datasnapshot.max.nodes.per.file.for.create";
+ try {
+ String val = AAIConfig.get(itemName);
+ if( val != null && !val.equals("") ){
+ cArgs.maxNodesPerFile = Long.parseLong(val);
+ }
+ }catch ( Exception e ){
+ LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
+ }
+ long maxNodesPerFile4Create = cArgs.maxNodesPerFile;
+
cArgs.snapshotType = "graphson";
-
Long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS;
itemName = "aai.datasnapshot.vertex.add.delay.ms";
try {
@@ -215,10 +230,10 @@ public class DataSnapshot {
jCommander = new JCommander(cArgs, args);
jCommander.setProgramName(DataSnapshot.class.getSimpleName());
} catch (ParameterException e1) {
- LOGGER.error("Error - invalid value passed to list of args - "+args);
+ AAIException ae = new AAIException("AAI_6128", e1 , "Error - invalid value passed to list of args - "+args);
+ ErrorLogHelper.logException(ae);
AAISystemExitUtil.systemExitCloseAAIGraph(1);
}
-
if (args.length >= 1) {
command = cArgs.command;
@@ -253,6 +268,23 @@ public class DataSnapshot {
}
LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create );
+ try {
+ maxNodesPerFile4Create = cArgs.maxNodesPerFile;
+ }
+ catch ( NumberFormatException nfe ){
+ ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+
+ if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){
+ ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)");
+ AAISystemExitUtil.systemExitCloseAAIGraph(1);
+ }
+ LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create );
+
// If doing a "threaded" snapshot, they need to specify how many threads to use
// They can also use debug mode if they pass the word "DEBUG" to do the nodes one at a time to see where it breaks.
if( cArgs.debugFlag.equals("DEBUG") ){
@@ -326,42 +358,45 @@ public class DataSnapshot {
}
+ threadCount4Create = cArgs.threadCount;
+ maxNodesPerFile4Create = cArgs.maxNodesPerFile;
//Print Defaults
- LOGGER.info("DataSnapshot command is [" + cArgs.command + "]");
- LOGGER.info("File name to reload snapshot [" + cArgs.oldFileName + "]");
- LOGGER.info("snapshotType is [" + cArgs.snapshotType + "]");
- LOGGER.info("Thread count is [" + cArgs.threadCount + "]");
- LOGGER.info("Debug Flag is [" + cArgs.debugFlag + "]");
- LOGGER.info("DebugAddDelayTimer is [" + cArgs.debugAddDelayTime + "]");
- LOGGER.info("VertAddDelayMs is [" + cArgs.vertAddDelayMs + "]");
- LOGGER.info("FailureDelayMs is [" + cArgs.failureDelayMs + "]");
- LOGGER.info("RetryDelayMs is [" + cArgs.retryDelayMs + "]");
- LOGGER.info("MaxErrorsPerThread is [" + cArgs.maxErrorsPerThread + "]");
- LOGGER.info("VertToEdgeProcDelay is [" + cArgs.vertToEdgeProcDelay + "]");
- LOGGER.info("StaggerThreadDelay is [" + cArgs.staggerThreadDelay + "]");
- LOGGER.info("Caller process is ["+ cArgs.caller + "]");
+ LOGGER.debug("DataSnapshot command is [" + cArgs.command + "]");
+ LOGGER.debug("File name to reload snapshot [" + cArgs.oldFileName + "]");
+ LOGGER.debug("snapshotType is [" + cArgs.snapshotType + "]");
+ LOGGER.debug("Thread count is [" + cArgs.threadCount + "]");
+ LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]");
+ LOGGER.debug("Debug Flag is [" + cArgs.debugFlag + "]");
+ LOGGER.debug("DebugAddDelayTimer is [" + cArgs.debugAddDelayTime + "]");
+ LOGGER.debug("VertAddDelayMs is [" + cArgs.vertAddDelayMs + "]");
+ LOGGER.debug("FailureDelayMs is [" + cArgs.failureDelayMs + "]");
+ LOGGER.debug("RetryDelayMs is [" + cArgs.retryDelayMs + "]");
+ LOGGER.debug("MaxErrorsPerThread is [" + cArgs.maxErrorsPerThread + "]");
+ LOGGER.debug("VertToEdgeProcDelay is [" + cArgs.vertToEdgeProcDelay + "]");
+ LOGGER.debug("StaggerThreadDelay is [" + cArgs.staggerThreadDelay + "]");
+ LOGGER.debug("Caller process is ["+ cArgs.caller + "]");
+
//Print non-default values
if (!AAIConfig.isEmpty(cArgs.fileName)){
- LOGGER.info("Snapshot file name (if not default) to use is [" + cArgs.fileName + "]");
+ LOGGER.debug("Snapshot file name (if not default) to use is [" + cArgs.fileName + "]");
}
if (!AAIConfig.isEmpty(cArgs.snapshotDir)){
- LOGGER.info("Snapshot file Directory path (if not default) to use is [" + cArgs.snapshotDir + "]");
+ LOGGER.debug("Snapshot file Directory path (if not default) to use is [" + cArgs.snapshotDir + "]");
}
if (!AAIConfig.isEmpty(cArgs.oldFileDir)){
- LOGGER.info("Directory path (if not default) to load the old snapshot file from is [" + cArgs.oldFileDir + "]");
+ LOGGER.debug("Directory path (if not default) to load the old snapshot file from is [" + cArgs.oldFileDir + "]");
}
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
-
AAIConfig.init();
ErrorLogHelper.loadProperties();
LOGGER.debug("Command = " + command + ", oldSnapshotFileName = [" + oldSnapshotFileName + "].");
String targetDir = AAIConstants.AAI_HOME + AAIConstants.AAI_FILESEP + "logs" + AAIConstants.AAI_FILESEP + "data" + AAIConstants.AAI_FILESEP + "dataSnapshots";
// Make sure the dataSnapshots directory is there
new File(targetDir).mkdirs();
-
LOGGER.debug(" ---- NOTE --- about to open graph (takes a little while) ");
if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
@@ -370,7 +405,7 @@ public class DataSnapshot {
// They want to take a snapshot on a single thread and have it go in a single file
// NOTE - they can't use the DEBUG option in this case.
// -------------------------------------------------------------------------------
- LOGGER.debug("\n>>> Command = " + command );
+ LOGGER.debug(" Command = " + command );
verifyGraph(AAIGraph.getInstance().getGraph());
FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
String dteStr = fd.getDateTime();
@@ -390,19 +425,20 @@ public class DataSnapshot {
}
else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
- && threadCount4Create > 1 ){
+ && threadCount4Create > 1 ){
// ------------------------------------------------------------
// They want the creation of the snapshot to be spread out via
// threads and go to multiple files
// ------------------------------------------------------------
- LOGGER.debug("\n>>> Command = " + command );
+ LOGGER.debug(" Command = " + command );
String newSnapshotOutFname;
if (!AAIConfig.isEmpty(cArgs.fileName)){
newSnapshotOutFname = cArgs.fileName;
} else {
- FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
- String dteStr = fd.getDateTime();
- newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr;
+ FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
+ String dteStr = fd.getDateTime();
+ newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP
+ + "dataSnapshot.graphSON." + dteStr;
}
verifyGraph(AAIGraph.getInstance().getGraph());
graph = AAIGraph.getInstance().getGraph();
@@ -410,43 +446,57 @@ public class DataSnapshot {
GraphAdminDBUtils.logConfigs(graph.configuration());
long timeA = System.nanoTime();
- LOGGER.debug(" Need to divide vertexIds across this many threads: " + threadCount4Create );
- HashMap <String,ArrayList> vertListHash = new HashMap <String,ArrayList> ();
- for( int t = 0; t < threadCount4Create; t++ ){
- ArrayList <Vertex> vList = new ArrayList <Vertex> ();
- String tk = "" + t;
- vertListHash.put( tk, vList);
- }
LOGGER.debug("Count how many nodes are in the db. ");
long totalVertCount = graph.traversal().V().count().next();
- LOGGER.debug(" Total Count of Nodes in DB = " + totalVertCount + ".");
- long nodesPerFile = totalVertCount / threadCount4Create;
- LOGGER.debug(" Thread count = " + threadCount4Create + ", each file will get (roughly): " + nodesPerFile + " nodes.");
long timeA2 = System.nanoTime();
long diffTime = timeA2 - timeA;
long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
LOGGER.debug(" -- To count all vertices in DB it took: " +
minCount + " minutes, " + secCount + " seconds " );
+ LOGGER.debug(" Total Count of Nodes in DB = " + totalVertCount + ".");
- long vtxIndex = 0;
+ int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create,
+ maxNodesPerFile4Create );
+ int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);
+ long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);
+
+ LOGGER.debug(" We will run this many simultaneous threads: " + threadCount4Create );
+ LOGGER.debug(" Required number of passes: " + threadPassesNeeded );
+ LOGGER.debug(" Max Nodes per file: " + maxNodesPerFile4Create );
+ LOGGER.debug(" We will generate this many files: " + fileCount4Create );
+ LOGGER.debug(" Each file will have (roughly): " + nodesPerFile + " nodes.");
+ LOGGER.debug(" Now, divide vertexIds across this many files: " + fileCount4Create );
+
+ HashMap <String,ArrayList<Long>> vertIdListHash = new HashMap <String,ArrayList<Long>> ();
+ for( int t = 0; t < fileCount4Create; t++ ){
+ ArrayList <Long> vIdList = new ArrayList <Long> ();
+ String tk = "" + t;
+ vertIdListHash.put( tk, vIdList);
+ }
+
int currentTNum = 0;
String currentTKey = "0";
long thisThrIndex = 0;
- Iterator <Vertex> vtxItr = graph.vertices();
+ Iterator <Vertex> vtxItr = graph.vertices(); // Getting ALL vertices!
while( vtxItr.hasNext() ){
- // Divide up all the vertices so we can process them on different threads
- vtxIndex++;
+ // Divide up ALL the vertices so we can process them on different threads
thisThrIndex++;
- if( (thisThrIndex > nodesPerFile) && (currentTNum < threadCount4Create -1) ){
- // We will need to start adding to the Hash for the next thread
+ if( (thisThrIndex >= nodesPerFile) && (currentTNum < (fileCount4Create - 1)) ){
+ // We will need to start adding to the Hash for the next file
currentTNum++;
currentTKey = "" + currentTNum;
thisThrIndex = 0;
}
- (vertListHash.get(currentTKey)).add(vtxItr.next());
+ long vid = (long)(vtxItr.next()).id();
+ (vertIdListHash.get(currentTKey)).add(vid);
}
+ // close this graph instance thing here since we have all the ids
+ graph.tx().rollback();
+ graph.tx().close();
+
+
long timeB = System.nanoTime();
diffTime = timeB - timeA2;
minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -455,21 +505,38 @@ public class DataSnapshot {
minCount + " minutes, " + secCount + " seconds " );
// Need to print out each set of vertices using it's own thread
- ArrayList <Thread> threadArr = new ArrayList <Thread> ();
- for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
- String thNumStr = "" + thNum;
- String subFName = newSnapshotOutFname + ".P" + thNumStr;
- Thread thr = new Thread(new PrintVertexDetails(graph, subFName, vertListHash.get(thNumStr),
- debug4Create, debugAddDelayTime, snapshotType) );
- thr.start();
- threadArr.add(thr);
- }
+ // NOTE - we may have more files to generate than number of threads - which
+ // just means that ALL the files won't necessarily be generated in parallel.
- // Make sure all the threads finish before moving on.
- for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
- if( null != threadArr.get(thNum) ){
- (threadArr.get(thNum)).join();
+ int fileNo = 0;
+ for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+ ArrayList <Thread> threadArr = new ArrayList <Thread> ();
+ // For each Pass, kick off all the threads and wait until they finish
+ long timeP1 = System.nanoTime();
+ for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
+ String fileNoStr = "" + fileNo;
+ String subFName = newSnapshotOutFname + ".P" + fileNoStr;
+ LOGGER.debug(" DEBUG >>> kick off pass # " + passNo + ", thread # " + thNum);
+ Thread thr = new Thread(new PrintVertexDetails(graph, subFName,
+ vertIdListHash.get(fileNoStr),
+ debug4Create, debugAddDelayTime,
+ snapshotType, LOGGER) );
+ thr.start();
+ threadArr.add(thr);
+ fileNo++;
+ }
+ // Make sure all the threads finish before considering this Pass finished.
+ for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
+ if( null != threadArr.get(thNum) ){
+ (threadArr.get(thNum)).join();
+ }
}
+ long timeP2 = System.nanoTime();
+ diffTime = timeP2 - timeP1;
+ minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+ secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+ LOGGER.debug(" Pass number " + passNo + " (out of " + threadPassesNeeded +
+ ") took " + minCount + " minutes, " + secCount + " seconds ");
}
long timeC = System.nanoTime();
@@ -487,7 +554,7 @@ public class DataSnapshot {
// snapshot is written to. Ie. if you have a single-file snapshot,
// then this will be single-threaded.
//
- LOGGER.debug("\n>>> Command = " + command );
+ LOGGER.debug(" Command = " + command );
if (cArgs.oldFileDir != null && cArgs.oldFileDir != ""){
targetDir = cArgs.oldFileDir;
@@ -497,7 +564,7 @@ public class DataSnapshot {
JanusGraph graph1 = AAIGraph.getInstance().getGraph();
long timeStart = System.nanoTime();
- HashMap <String,String> old2NewVertIdMap = new <String,String> HashMap ();
+ HashMap <String,String> old2NewVertIdMap = new HashMap <String,String> ();
// We're going to try loading in the vertices - without edges or properties
// using Separate threads
@@ -535,11 +602,13 @@ public class DataSnapshot {
}
catch (InterruptedException e) {
threadFailCount++;
- e.printStackTrace();
+ AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
+ ErrorLogHelper.logException(ae);
}
catch (ExecutionException e) {
threadFailCount++;
- e.printStackTrace();
+ AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
+ ErrorLogHelper.logException(ae);
}
}
@@ -602,11 +671,13 @@ public class DataSnapshot {
}
catch (InterruptedException e) {
threadFailCount++;
- e.printStackTrace();
+ AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
+ ErrorLogHelper.logException(ae);
}
catch (ExecutionException e) {
threadFailCount++;
- e.printStackTrace();
+ AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
+ ErrorLogHelper.logException(ae);
}
}
@@ -639,13 +710,13 @@ public class DataSnapshot {
// They are calling this to clear the db before re-loading it
// later
// ------------------------------------------------------------------
- LOGGER.debug("\n>>> Command = " + command );
+ LOGGER.debug(" Command = " + command );
// First - make sure the backup file(s) they will be using can be
// found and has(have) data.
// getFilesToProcess makes sure the file(s) exist and have some data.
getFilesToProcess(targetDir, oldSnapshotFileName, true);
- LOGGER.debug("\n>>> WARNING <<<< ");
+ LOGGER.debug(" WARNING <<<< ");
LOGGER.debug(">>> All data and schema in this database will be removed at this point. <<<");
LOGGER.debug(">>> Processing will begin in 5 seconds. <<<");
LOGGER.debug(">>> WARNING <<<< ");
@@ -660,7 +731,7 @@ public class DataSnapshot {
LOGGER.debug(" Begin clearing out old data. ");
String rtConfig = AAIConstants.REALTIME_DB_CONFIG;
- String serviceName = System.getProperty("aai.service.name", "NA");
+ String serviceName = System.getProperty("aai.service.name", DataSnapshot.class.getSimpleName());
LOGGER.debug("Getting new configs for clearig");
PropertiesConfiguration propertiesConfiguration = new AAIGraphConfig.Builder(rtConfig).forService(serviceName).withGraphType(REALTIME_DB).buildConfiguration();
LOGGER.debug("Open New Janus Graph");
@@ -681,7 +752,7 @@ public class DataSnapshot {
// of snapshot files. Either way, this command will restore via single
// threaded processing.
// ---------------------------------------------------------------------------
- LOGGER.debug("\n>>> Command = " + command );
+ LOGGER.debug(" Command = " + command );
verifyGraph(AAIGraph.getInstance().getGraph());
graph = AAIGraph.getInstance().getGraph();
GraphAdminDBUtils.logConfigs(graph.configuration());
@@ -771,14 +842,11 @@ public class DataSnapshot {
}
} catch (AAIException e) {
- ErrorLogHelper.logError("AAI_6128", e.getMessage());
- LOGGER.error("Encountered an exception during the datasnapshot: ", e);
- e.printStackTrace();
+ ErrorLogHelper.logException(e);
success = false;
} catch (Exception ex) {
- ErrorLogHelper.logError("AAI_6128", ex.getMessage());
- LOGGER.error("Encountered an exception during the datasnapshot: ", ex);
- ex.printStackTrace();
+ AAIException ae = new AAIException("AAI_6128", ex , "Encountered an exception during the datasnapshot");
+ ErrorLogHelper.logException(ae);
success = false;
} finally {
if (!dbClearFlag && graph != null && !MIGRATION_PROCESS_NAME.equalsIgnoreCase(source)) {
@@ -879,9 +947,28 @@ public class DataSnapshot {
}
- class CommandLineArgs {
-
+
+ public static int figureOutFileCount( long totalVertCount, int threadCount4Create,
+ long maxNodesPerFile ) {
+ // Returns the number of snapshot files to write: threadCount4Create * (number of passes).
+ // NOTE - we would always like to use all of our threads on every pass. That is, if
+ // we could process all the data with 16 threads, but our threadCount4Create is
+ // only 15, we will do two passes and use all 15 threads each pass, which will
+ // create a total of 30 files. Each file will be a bit smaller, so the overall time
+ // for the two passes should be faster. Any non-positive argument yields 1 file.
+ if( totalVertCount <= 0 || threadCount4Create <= 0 || maxNodesPerFile <= 0) {
+ return 1;
+ }
+ long maxNodesPerPass = threadCount4Create * maxNodesPerFile; // most nodes one full pass can hold
+ int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass); // round up: a partial pass still counts
+ int fileCt = threadCount4Create * numberOfPasses; // every pass writes one file per thread
+
+ return fileCt;
+ }
+
+
+ class CommandLineArgs {
@Parameter(names = "--help", help = true)
public boolean help;
@@ -897,6 +984,9 @@ public class DataSnapshot {
@Parameter(names = "-threadCount", description = "thread count for create")
public int threadCount = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_THREADS_FOR_CREATE;
+
+ @Parameter(names = "-maxNodesPerFile", description = "Max nodes per file")
+ public long maxNodesPerFile = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_NODES_PER_FILE_FOR_CREATE;
@Parameter(names = "-debugFlag", description = "DEBUG flag")
public String debugFlag = "";
@@ -924,7 +1014,7 @@ public class DataSnapshot {
@Parameter(names = "-staggerThreadDelay", description = "thread delay stagger time in ms")
public long staggerThreadDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_STAGGER_THREAD_DELAY_MS;
-
+
@Parameter(names = "-fileName", description = "file name for generating snapshot ")
public String fileName = "";