diff options
Diffstat (limited to 'src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java')
-rw-r--r-- | src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java b/src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java new file mode 100644 index 0000000..387f45e --- /dev/null +++ b/src/main/java/org/onap/aai/datasnapshot/PartialVertexLoader.java @@ -0,0 +1,223 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datasnapshot; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.util.HashMap; +import java.util.concurrent.Callable; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader; +import org.janusgraph.core.JanusGraph; + +import com.att.eelf.configuration.EELFLogger; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + + + +public class PartialVertexLoader implements Callable<HashMap<String,String>>{ + + private EELFLogger LOGGER; + + private JanusGraph jg; + private String fName; + private Long vertAddDelayMs; + private Long failurePauseMs; + private Long retryDelayMs; + private int maxAllowedErrors; + + public PartialVertexLoader (JanusGraph graph, String fn, Long vertDelay, Long failurePause, + Long retryDelay, int maxErrors, EELFLogger elfLog ){ + jg = graph; + fName = fn; + vertAddDelayMs = vertDelay; + failurePauseMs = failurePause; + retryDelayMs = retryDelay; + maxAllowedErrors = maxErrors; + LOGGER = elfLog; + } + + public HashMap<String,String> call() throws Exception { + + // NOTE - we will be loading one node at a time so that bad nodes can be ignored instead of causing the + // entire load to fail. + // + int entryCount = 0; + int retryCount = 0; + int failureCount = 0; + int retryFailureCount = 0; + HashMap <String,String> failedAttemptHash = new HashMap <String,String> (); + HashMap <String,String> old2NewVtxIdHash = new HashMap <String,String> (); + GraphSONReader gsr = GraphSONReader.build().create(); + + + // Read this file into a JSON object + JsonParser parser = new JsonParser(); + + try( BufferedReader br = new BufferedReader(new FileReader(fName))) { + // loop through the file lines and do PUT for each vertex or the edges depending on what the loadtype is + for(String line; (line = br.readLine()) != null; ) { + entryCount++; + Object ob = parser.parse(line); + JsonObject jObj = (JsonObject) ob; + // NOTE - we will need to keep track of how the newly generated vid's map + // to the old ones so we can aim the edges correctly later. + + // ---- Note -- This ONLY loads the vertexId and the label for each vertex ------------- + Thread.sleep(vertAddDelayMs); + + String oldVtxIdStr = jObj.get("id").getAsString(); + String vtxLabelStr = jObj.get("label").getAsString(); + try { + Vertex tmpV = jg.addVertex(vtxLabelStr); + String newVtxIdStr = tmpV.id().toString(); + old2NewVtxIdHash.put(oldVtxIdStr, newVtxIdStr); + } + catch ( Exception e ){ + failureCount++; + Thread.sleep(failurePauseMs); // Slow down if things are failing + LOGGER.debug(" >> addVertex FAILED for vtxId = " + oldVtxIdStr + ", label = [" + + vtxLabelStr + "]. ErrorMsg = [" + e.getMessage() + "]" ); + //e.printStackTrace(); + failedAttemptHash.put(oldVtxIdStr, vtxLabelStr); + if( failureCount > maxAllowedErrors ) { + LOGGER.debug(" >>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + try { + jg.tx().commit(); + } + catch ( Exception e ){ + failureCount++; + Thread.sleep(failurePauseMs); // Slow down if things are failing + LOGGER.debug(" -- COMMIT FAILED for Vtx ADD for vtxId = " + oldVtxIdStr + ", label = [" + + vtxLabelStr + "]. ErrorMsg = [" +e.getMessage() + "]" ); + //e.printStackTrace(); + failedAttemptHash.put(oldVtxIdStr, vtxLabelStr); + if( failureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + + } // End of looping over each line + + if( br != null ){ + br.close(); + } + } + catch (Exception e) { + LOGGER.debug(" --- Failed in the main loop for Buffered-Reader item # " + entryCount + + ", fName = " + fName ); + LOGGER.debug(" --- msg = " + e.getMessage() ); + e.printStackTrace(); + throw e; + } + + // --------------------------------------------------------------------------- + // Now Re-Try any failed requests that might have Failed on the first pass. + // --------------------------------------------------------------------------- + try { + for (String failedVidStr : failedAttemptHash.keySet()) { + // Take a little nap, and retry this failed attempt + LOGGER.debug("DEBUG >> We will sleep for " + retryDelayMs + " and then RETRY any failed vertex ADDs. "); + Thread.sleep(retryDelayMs); + + retryCount++; + // When a vertex Add fails we store the label as the data in the failedAttemptHash. + String failedLabel = failedAttemptHash.get(failedVidStr); + LOGGER.debug("DEBUG >> RETRY << " + + failedVidStr + ", label = " + failedLabel ); + try { + Vertex tmpV = jg.addVertex(failedLabel); + String newVtxIdStr = tmpV.id().toString(); + old2NewVtxIdHash.put(failedVidStr, newVtxIdStr); + } + catch ( Exception e ){ + retryFailureCount++; + LOGGER.debug(" -- addVertex FAILED for RETRY for vtxId = " + + failedVidStr + ", label = [" + failedLabel + + "]. ErrorMsg = [" +e.getMessage() + "]" ); + e.printStackTrace(); + if( retryFailureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + try { + jg.tx().commit(); + // If this worked, we can take it off of the failed list + failedAttemptHash.remove(failedVidStr); + } + catch ( Exception e ){ + retryFailureCount++; + LOGGER.debug(" -- COMMIT FAILED for RETRY for vtxId = " + failedVidStr + + ", label = [" + failedLabel + "]. ErrorMsg = [" + e.getMessage() + "]" ); + e.printStackTrace(); + if( retryFailureCount > maxAllowedErrors ) { + LOGGER.debug(">>> Abandoning PartialVertexLoader() because " + + "Max Allowed Error count was exceeded for this thread. (max = " + + maxAllowedErrors + ". "); + throw new Exception(" ERROR - Max Allowed Error count exceeded for this thread. (max = " + maxAllowedErrors + ". "); + } + else { + continue; + } + } + } // End of looping over failed attempt hash and doing retries + + } + catch ( Exception e ){ + LOGGER.debug(" -- error in RETRY block. ErrorMsg = [" +e.getMessage() + "]" ); + e.printStackTrace(); + throw e; + } + + // This would need to be properly logged... + LOGGER.debug(">>> After Processing in PartialVertexLoader(): " + + entryCount + " records processed. " + failureCount + " records failed. " + + retryCount + " RETRYs processed. " + retryFailureCount + " RETRYs failed. "); + + return old2NewVtxIdHash; + + }// end of call() + + + +} + + |