aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/migration/v12/MigrateINVPhysicalInventory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/migration/v12/MigrateINVPhysicalInventory.java')
-rw-r--r--src/main/java/org/onap/aai/migration/v12/MigrateINVPhysicalInventory.java361
1 files changed, 361 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/migration/v12/MigrateINVPhysicalInventory.java b/src/main/java/org/onap/aai/migration/v12/MigrateINVPhysicalInventory.java
new file mode 100644
index 0000000..0c85481
--- /dev/null
+++ b/src/main/java/org/onap/aai/migration/v12/MigrateINVPhysicalInventory.java
@@ -0,0 +1,361 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.migration.v12;
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.javatuples.Pair;
+import org.onap.aai.db.props.AAIProperties;
+import org.onap.aai.edges.EdgeIngestor;
+import org.onap.aai.introspection.Introspector;
+import org.onap.aai.introspection.LoaderFactory;
+import org.onap.aai.migration.*;
+import org.onap.aai.serialization.db.EdgeSerializer;
+import org.onap.aai.serialization.engines.TransactionalGraphEngine;
+import org.onap.aai.setup.SchemaVersions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+@MigrationPriority(25)
+@MigrationDangerRating(100)
+public class MigrateINVPhysicalInventory extends Migrator {
+
+ private static final String NODE_TYPE_PNF = "pnf";
+ private static final String NODE_TYPE_PINTERFACE = "p-interface";
+ private static final String NODE_TYPE_PINTERFACES = "p-interfaces";
+ private static final String PROPERTY_PNF_NAME = "pnf-name";
+ private static final String PROPERTY_INTERFACE_NAME = "interface-name";
+ protected final AtomicInteger skippedRowsCount = new AtomicInteger(0);
+ protected final AtomicInteger processedRowsCount = new AtomicInteger(0);
+
+ private boolean success = true;
+ private boolean checkLog = false;
+ private GraphTraversalSource g = null;
+ protected int headerLength;
+
+ protected final AtomicInteger falloutRowsCount = new AtomicInteger(0);
+ private static final String homeDir = System.getProperty("AJSC_HOME");
+ private static List<String> dmaapMsgList = new ArrayList<String>();
+
+ public MigrateINVPhysicalInventory(TransactionalGraphEngine engine, LoaderFactory loaderFactory, EdgeIngestor edgeIngestor, EdgeSerializer edgeSerializer, SchemaVersions schemaVersions) {
+ super(engine, loaderFactory, edgeIngestor, edgeSerializer, schemaVersions);
+ this.g = this.engine.asAdmin().getTraversalSource();
+ }
+
+ @Override
+ public void run() {
+ logger.info("---------- Start migration of INV File Physical Inventory ----------");
+ String configDir = System.getProperty("BUNDLECONFIG_DIR");
+ if (homeDir == null) {
+ logger.info("ERROR: Could not find sys prop AJSC_HOME");
+ success = false;
+ return;
+ }
+ if (configDir == null) {
+ success = false;
+ return;
+ }
+
+ String feedDir = homeDir + "/" + configDir + "/" + "migration-input-files/sarea-inventory/";
+ String fileName = feedDir+ "inv.csv";
+ logger.info(fileName);
+ logger.info("---------- Processing INV Entries from file ----------");
+
+
+ try {
+ Map<String, Set<String>> data = loadFile(fileName);
+ this.processData(data);
+
+ logger.info("\n ******* Summary Report for Inv File Physical Migration *******");
+ logger.info("Number of distinct pnfs processed: "+data.keySet().size());
+ logger.info("Rows processed: " + processedRowsCount);
+ logger.info("Rows skipped: "+ skippedRowsCount);
+ logger.info("Fallout Rows count: " + falloutRowsCount);
+
+ } catch (FileNotFoundException e) {
+ logger.info("ERROR: Could not file file " + fileName, e.getMessage());
+ success = false;
+ checkLog = true;
+ } catch (IOException e) {
+ logger.info("ERROR: Issue reading file " + fileName, e);
+ success = false;
+ } catch (Exception e) {
+ logger.info("encountered exception", e);
+ e.printStackTrace();
+ success = false;
+ }
+ }
+
+ protected void processData(Map<String, Set<String>> data) throws Exception{
+
+ for (Map.Entry<String, Set<String>> entry : data.entrySet()) {
+ String pnfName = entry.getKey();
+ final Set<String> newPInterfaces = entry.getValue();
+ Introspector pnf;
+ Vertex pnfVertex;
+ EventAction eventAction = EventAction.UPDATE;
+ boolean pnfChangesMade = false;
+
+ if (pnfExists(pnfName)) {
+ pnf = serializer.getLatestVersionView(getPnf(pnfName));
+ pnfVertex = getPnf(pnfName);
+ } else {
+ pnf = loader.introspectorFromName(NODE_TYPE_PNF);
+ pnf.setValue(PROPERTY_PNF_NAME, pnfName);
+ pnfVertex = serializer.createNewVertex(pnf);
+ eventAction = EventAction.CREATE;
+ pnfChangesMade = true;
+ }
+
+ if (pnfChangesMade) {
+ serializer.serializeSingleVertex(pnfVertex, pnf, getMigrationName());
+ logger.info ("\t Pnf [" + pnfName +"] created with vertex id "+pnfVertex);
+// pnf = serializer.getLatestVersionView(pnfVertex);
+// this.notificationHelper.addEvent(pnfVertex, serializer.getLatestVersionView(pnfVertex), eventAction, this.serializer.getURIForVertex(pnfVertex, false));
+// logger.info("\t Dmaap notification sent for creation of pnf ");
+ String dmaapMsg = System.nanoTime() + "_" + pnfVertex.id().toString() + "_" + pnfVertex.value("resource-version").toString();
+ dmaapMsgList.add(dmaapMsg);
+ } else {
+ logger.info("\t Pnf ["+ pnfName +"] already exists ");
+ }
+
+ if (!newPInterfaces.isEmpty()) {
+ Introspector pInterfacesIntrospector = pnf.getWrappedValue(NODE_TYPE_PINTERFACES);
+ if ( pInterfacesIntrospector == null) {
+ pInterfacesIntrospector = pnf.newIntrospectorInstanceOfProperty(NODE_TYPE_PINTERFACES);
+ pnf.setValue(NODE_TYPE_PINTERFACES, pInterfacesIntrospector.getUnderlyingObject());
+ }
+
+ for (Introspector introspector : pInterfacesIntrospector.getWrappedListValue(NODE_TYPE_PINTERFACE)) {
+ String interfaceName = introspector.getValue(PROPERTY_INTERFACE_NAME).toString();
+ if (newPInterfaces.contains(interfaceName)) {
+ newPInterfaces.remove(interfaceName);
+ }
+ }
+
+ for (String pInterfaceName : newPInterfaces) {
+ Introspector pInterface = loader.introspectorFromName(NODE_TYPE_PINTERFACE);
+ pInterface.setValue(PROPERTY_INTERFACE_NAME, pInterfaceName);
+ Vertex pInterfaceVertex = serializer.createNewVertex(pInterface);
+ pInterfaceVertex.property(AAIProperties.AAI_URI, pnfVertex.property(AAIProperties.AAI_URI).value() + "/p-interfaces/p-interface/" + pInterfaceName);
+ edgeSerializer.addTreeEdge(g, pnfVertex, pInterfaceVertex);
+ eventAction = EventAction.CREATE;
+ serializer.serializeSingleVertex(pInterfaceVertex, pInterface, getMigrationName());
+ logger.info ("\t p-interface [" + pInterfaceName +"] created with vertex id "+ pInterfaceVertex + " on pnf ["+pnfName+"]");
+// pInterface = serializer.getLatestVersionView(pInterfaceVertex);
+// this.notificationHelper.addEvent(pInterfaceVertex, pInterface, eventAction, this.serializer.getURIForVertex(pInterfaceVertex, false));
+// logger.info("\t Dmaap notification sent for creation of p-interface ");
+ String dmaapMsg = System.nanoTime() + "_" + pInterfaceVertex.id().toString() + "_" + pInterfaceVertex.value("resource-version").toString();
+ dmaapMsgList.add(dmaapMsg);
+ }
+ }
+ }
+ }
+
+ protected boolean pnfExists(String pnfName) {
+ return g.V().has(PROPERTY_PNF_NAME, pnfName).has(AAIProperties.NODE_TYPE, NODE_TYPE_PNF).hasNext();
+ }
+
+ protected Vertex getPnf(String pnfName) {
+ return g.V().has(PROPERTY_PNF_NAME, pnfName).has(AAIProperties.NODE_TYPE, NODE_TYPE_PNF).next();
+ }
+
+ /**
+ * Load file to the map for processing
+ * @param fileName
+ * @return
+ * @throws Exception
+ */
+ protected Map<String,Set<String>> loadFile(String fileName) throws Exception {
+ List<String> lines = Files.readAllLines(Paths.get(fileName));
+ return this.getFileContents(lines);
+ }
+
+ /**
+ * Get lines from file.
+ * @param lines
+ * @return
+ * @throws Exception
+ */
+ protected Map<String,Set<String>> getFileContents(List<String> lines) throws Exception {
+
+ final Map<String,Set<String>> fileContents = new ConcurrentHashMap<>();
+
+ processAndRemoveHeader(lines);
+
+ logger.info("Total rows count excluding header: "+ lines.size());
+
+ lines.stream()
+ .filter(line -> !line.isEmpty())
+ .map(line -> Arrays.asList(line.split("\\s*,\\s*", -1)))
+// .filter(this::verifyLine)
+ .map(this::processLine)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .forEach(p -> {
+ processedRowsCount.getAndIncrement();
+ String pnfName = p.getValue0();
+ if (!fileContents.containsKey(pnfName)) {
+ Set<String> s = new HashSet<>();
+ fileContents.put(p.getValue0(), s);
+ }
+ if (p.getValue1() != null) {
+ fileContents.get(p.getValue0()).add(p.getValue1());
+ }
+ })
+ ;
+
+ return fileContents;
+
+
+ }
+
+ /**
+ * Verify line has the necessary details.
+ * @param line
+ * @return
+ */
+ protected boolean verifyLine(List<String> line) {
+ if (line.size() != headerLength) {
+ logger.info("ERROR: INV line should contain " + headerLength + " columns, contains " + line.size() + " instead.");
+ this.skippedRowsCount.getAndIncrement();
+ return false;
+ }
+ return true;
+ }
+
+ /**
+* * Get the pnf name and interface name from the line.
+ * @param line
+ * @return
+ */
+ protected Optional<Pair<String,String>> processLine(List<String> line) {
+ logger.info("Processing line... " + line.toString());
+ int lineSize = line.size();
+ if (lineSize < 11){
+ logger.info("Skipping line, does not contain pnf and/or port columns");
+ skippedRowsCount.getAndIncrement();
+ return Optional.empty();
+ }
+
+ String pnfName = line.get(0);
+ String portAid = line.get(11).replaceAll("^\"|\"$", "").replaceAll("\\s+","");
+
+ if (pnfName.isEmpty() && portAid.isEmpty()) {
+ logger.info("Line missing pnf name and port " + line);
+ falloutRowsCount.getAndIncrement();
+ return Optional.empty();
+ } else if (pnfName.isEmpty()) {
+ logger.info("Line missing pnf name" + line);
+ falloutRowsCount.getAndIncrement();
+ return Optional.empty();
+ } else if (portAid.isEmpty()) {
+ logger.info("Line missing port " + line);
+ return Optional.of(Pair.with(pnfName, null));
+ }
+ return Optional.of(Pair.with(pnfName, portAid));
+ }
+
+ /**
+ * Verify header of the csv and remove it from the list.
+ * @param lines
+ * @throws Exception
+ */
+ protected String processAndRemoveHeader(List<String> lines) throws Exception {
+ String firstLine;
+ if (lines.isEmpty()) {
+ String msg = "ERROR: Missing Header in file";
+ success = false;
+ logger.error(msg);
+ throw new Exception(msg);
+ } else {
+ firstLine = lines.get(0);
+ }
+
+ this.headerLength = firstLine.split("\\s*,\\s*", -1).length;
+ logger.info("headerLength: " + headerLength);
+ if (this.headerLength < 21){
+ String msg = "ERROR: Input file should have 21 columns";
+ success = false;
+ logger.error(msg);
+ throw new Exception(msg);
+ }
+
+ return lines.remove(0);
+ }
+
+
+ @Override
+ public Status getStatus() {
+ if (checkLog) {
+ return Status.CHECK_LOGS;
+ }
+ else if (success) {
+ return Status.SUCCESS;
+ }
+ else {
+ return Status.FAILURE;
+ }
+ }
+
+ @Override
+ public void commit() {
+ engine.commit();
+ createDmaapFiles(dmaapMsgList);
+ }
+
+ @Override
+ public Optional<String[]> getAffectedNodeTypes() {
+ return Optional.of(new String[]{NODE_TYPE_PNF});
+ }
+
+ @Override
+ public String getMigrationName() {
+ return "MigrateINVPhysicalInventory";
+ }
+
+}