aboutsummaryrefslogtreecommitdiffstats
path: root/asdctool/src
diff options
context:
space:
mode:
Diffstat (limited to 'asdctool/src')
-rw-r--r--asdctool/src/main/java/org/openecomp/sdc/asdctool/impl/DataMigration.java1511
-rw-r--r--asdctool/src/main/java/org/openecomp/sdc/asdctool/main/EsToCassandraDataMigrationMenu.java6
2 files changed, 743 insertions, 774 deletions
diff --git a/asdctool/src/main/java/org/openecomp/sdc/asdctool/impl/DataMigration.java b/asdctool/src/main/java/org/openecomp/sdc/asdctool/impl/DataMigration.java
index 9b1e3dc227..c399e58276 100644
--- a/asdctool/src/main/java/org/openecomp/sdc/asdctool/impl/DataMigration.java
+++ b/asdctool/src/main/java/org/openecomp/sdc/asdctool/impl/DataMigration.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,11 @@
package org.openecomp.sdc.asdctool.impl;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import fj.data.Either;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -38,8 +43,8 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.EnumMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TimeZone;
-
import org.apache.commons.lang.SystemUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -77,778 +82,742 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import com.carrotsearch.hppc.cursors.ObjectCursor;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import fj.data.Either;
-
/**
* Created by mlando on 5/16/2016.
*/
public class DataMigration {
- private Gson gson = new Gson();
-
- private ObjectMapper jsonMapper = new ObjectMapper();
-
- private static Logger log = LoggerFactory.getLogger(DataMigration.class.getName());
-
- protected ElasticSearchClient elasticSearchClient;
- @Autowired
- protected AuditCassandraDao auditCassandraDao;
- @Autowired
- protected ArtifactCassandraDao artifactCassandraDao;
- private static final String dateFormatPattern = "yyyy-MM-dd HH:mm:ss.SSS z";
- private static SimpleDateFormat simpleDateFormat;
-
- /**
- * the method exports and imports the records from ES to cassandra the flow
- * will check to see if the files are not empty if the files are not empty
- * the export will be skiped and the flow will use the existing files. the
- * flow will check if the tables in cassandra are empty, if the tables are
- * not empty the proces will stop and exit. if the tables are empty the
- * method will import the records from the files. in case of a fail the flow
- * will exit and clear all the Cassandra tables.
- *
- * @param appConfigDir
- * the location of the dir in wich the output files will be
- * stored
- * @param exportFromEs
- * should the es be exported again and overwrite the old export
- * @param importToCassandra
- * should we import the data into cassandra
- * @return true in case the operation was successful.
- */
- public boolean migrateDataESToCassndra(String appConfigDir, boolean exportFromEs, boolean importToCassandra) {
- initFormater();
- if (!initEsClient())
- return false;
- Map<Table, File> files = createOutPutFiles(appConfigDir, exportFromEs);
- if (files == null) {
- return false;
- }
- if (exportFromEs && filesEmpty(files)) {
- Map<Table, PrintWriter> printerWritersMap = createWriters(files);
- if (printerWritersMap == null) {
- return false;
- }
- try {
- ImmutableOpenMap<String, IndexMetaData> indexData = getIndexData();
- for (ObjectCursor<String> key : indexData.keys()) {
- if ("resources".equalsIgnoreCase(key.value)) {
- if (!exportArtifacts(key.value, printerWritersMap)) {
- return false;
- }
- } else if (key.value.startsWith("auditingevents")) {
- if (!exportAudit(key.value, printerWritersMap)) {
- return false;
- }
- }
- }
- } finally {
- if (elasticSearchClient != null) {
- elasticSearchClient.close();
- }
- for (PrintWriter writer : printerWritersMap.values()) {
- writer.close();
- }
- }
- }
- if (importToCassandra && !importToCassndra(files)) {
- return false;
- }
-
- return true;
- }
-
- private void initFormater() {
- simpleDateFormat = new SimpleDateFormat(dateFormatPattern);
- simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
-
- private boolean initEsClient() {
- String configHome = System.getProperty("config.home");
- URL url = null;
- Settings settings = null;
- try {
- if (SystemUtils.IS_OS_WINDOWS) {
- url = new URL("file:///" + configHome + "/elasticsearch.yml");
- } else {
- url = new URL("file:" + configHome + "/elasticsearch.yml");
- }
- log.debug("URL {}", url);
- settings = Settings.settingsBuilder().loadFromPath(Paths.get(url.toURI())).build();
- } catch (MalformedURLException | URISyntaxException e1) {
- log.error("Failed to create URL in order to load elasticsearch yml", e1);
- return true;
- }
-
- this.elasticSearchClient = new ElasticSearchClient();
- this.elasticSearchClient.setClusterName(settings.get("cluster.name"));
- this.elasticSearchClient.setLocal(settings.get("elasticSearch.local"));
- this.elasticSearchClient.setTransportClient(settings.get("elasticSearch.transportclient"));
- try {
- elasticSearchClient.initialize();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
-
- /**
- * the method clears all the cassandra tables
- */
- private void truncateCassandraTable() {
- log.info("import failed. truncating Cassandra tables.");
- artifactCassandraDao.deleteAllArtifacts();
- auditCassandraDao.deleteAllAudit();
- }
-
- /**
- * the method imports the records from the files into cassandra
- *
- * @param files
- * a map of files holding
- * @return true if the operation was successful
- */
- private boolean importToCassndra(Map<Table, File> files) {
- log.info("starting to import date into Cassandra.");
- if (!validtaTablsNotEmpty(files))
- return true;
- for (Table table : files.keySet()) {
- log.info("importing recordes into {}", table.getTableDescription().getTableName());
- if (!handleImport(files, table)) {
- truncateCassandraTable();
- return false;
- }
- }
- log.info("finished to import date into Cassandra.");
- return true;
- }
-
- private boolean validtaTablsNotEmpty(Map<Table, File> files) {
- for (Table table : files.keySet()) {
- Either<Boolean, CassandraOperationStatus> isTableEmptyRes = checkIfTableIsEmpty(table);
- if (isTableEmptyRes.isRight() || !isTableEmptyRes.left().value()) {
- log.error("Cassandra table {} is not empty operation aborted.",
- table.getTableDescription().getTableName());
- return false;
- }
- }
- return true;
- }
-
- /**
- * the method retrieves the fields from the given map and praprs them for
- * storage as an audit according to the table name
- *
- * @param map
- * the map from which we will retrive the fields enum values
- * @param table
- * the table we are going to store the record in.
- * @return a enummap representing the audit record that is going to be
- * created.
- */
- private EnumMap<AuditingFieldsKeysEnum, Object> createAuditMap(Map<String, String> map, Table table) {
- EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = new EnumMap<>(AuditingFieldsKeysEnum.class);
- switch (table) {
- case USER_ADMIN_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_AFTER, map.get("USER_AFTER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_BEFORE, map.get("USER_BEFORE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- break;
- case USER_ACCESS_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_UID, map.get("USER"));
- break;
- case RESOURCE_ADMIN_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_INVARIANT_UUID, map.get("INVARIANT_UUID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_VERSION, map.get("PREV_VERSION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_STATE, map.get("PREV_STATE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DPREV_STATUS, map.get("DPREV_STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DCURR_STATUS, map.get("DCURR_STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TOSCA_NODE_TYPE, map.get("TOSCA_NODE_TYPE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_COMMENT, map.get("COMMENT"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ARTIFACT_DATA, map.get("ARTIFACT_DATA"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_PREV_ARTIFACT_UUID, map.get("PREV_ARTIFACT_UUID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CURR_ARTIFACT_UUID, map.get("CURR_ARTIFACT_UUID"));
- break;
- case DISTRIBUTION_DOWNLOAD_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
- break;
- case DISTRIBUTION_ENGINE_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- if (map.get("TOPIC_NAME") != null) {
- if (map.get("TOPIC_NAME").contains("-STATUS-")) {
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
- map.get("TOPIC_NAME"));
- } else {
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
- map.get("TOPIC_NAME"));
- }
- } else {
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
- map.get("DSTATUS_TOPIC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
- map.get("DNOTIF_TOPIC"));
- }
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ROLE, map.get("ROLE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_API_KEY, map.get("API_KEY"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ENVRIONMENT_NAME, map.get("D_ENV"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
- break;
- case DISTRIBUTION_NOTIFICATION_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
- break;
- case DISTRIBUTION_STATUS_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TIME, map.get("STATUS_TIME"));
- break;
- case DISTRIBUTION_DEPLOY_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
- break;
- case DISTRIBUTION_GET_UEB_CLUSTER_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- if (map.get("STATUS_DESC") != null) {
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("STATUS_DESC"));
- } else {
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- }
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
- break;
- case AUTH_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_USER, map.get("USER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_URL, map.get("URL"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_STATUS, map.get("AUTH_STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_REALM, map.get("REALM"));
- break;
- case CONSUMER_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ECOMP_USER, map.get("ECOMP_USER"));
- break;
- case CATEGORY_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CATEGORY_NAME, map.get("CATEGORY_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SUB_CATEGORY_NAME, map.get("SUB_CATEGORY_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_GROUPING_NAME, map.get("GROUPING_NAME"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
- break;
- case GET_USERS_LIST_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
- break;
- case GET_CATEGORY_HIERARCHY_EVENT:
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
- auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
- break;
- default:
- auditingFields = null;
- break;
- }
- return auditingFields;
- }
-
- /**
- * the method reads the content of the file intended for a given table, and
- * sores them in cassandra
- *
- * @param files
- * a map of files from which the recordes will be retrieved.
- * @param table
- * the name of the table we want to look up in the files and sore
- * in Cassandra // * @param store the function to call when
- * storing recordes in cassndra
- * @return true if the operation was successful
- */
- private boolean handleImport(Map<Table, File> files, Table table) {
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader(files.get(table)));
- String line = null;
- while ((line = br.readLine()) != null) {
- CassandraOperationStatus res = null;
- if (Table.ARTIFACT.equals(table)) {
- res = artifactCassandraDao.saveArtifact(jsonMapper.readValue(line, ESArtifactData.class));
- } else {
- Type type = new TypeToken<Map<String, String>>() {
- }.getType();
- Map<String, String> map = gson.fromJson(line, type);
- EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = createAuditMap(map, table);
- AuditingGenericEvent recordForCassandra = null;
- try {
- recordForCassandra = createAuditRecord(auditingFields);
- } catch (ParseException e) {
- log.error("filed to parse time stemp in recored {}", auditingFields);
- return false;
- }
-
- res = auditCassandraDao.saveRecord(recordForCassandra);
- }
- if (!res.equals(CassandraOperationStatus.OK)) {
- log.error("save recored to cassndra {} failed with status {} aborting.",
- table.getTableDescription().getTableName(), res);
- return false;
- }
- }
- return true;
- } catch (IOException e) {
- log.error("failed to read file", e);
- return false;
- } finally {
- if (br != null) {
- try {
- br.close();
- } catch (IOException e) {
- log.error("failed to close file reader", e);
- }
- }
- }
- }
-
- /**
- * the method checks if the given table is empty
- *
- * @param table
- * the name of the table we want to check
- * @return true if the table is empty
- */
- private Either<Boolean, CassandraOperationStatus> checkIfTableIsEmpty(Table table) {
- if (Table.ARTIFACT.equals(table)) {
- return artifactCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
- } else {
- return auditCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
- }
- }
-
- private boolean filesEmpty(Map<Table, File> files) {
- for (Table table : files.keySet()) {
- File file = files.get(table);
- if (file.length() != 0) {
- log.info("file:{} is not empty skipping export", table.getTableDescription().getTableName());
- return false;
- }
- }
- return true;
- }
-
- /**
- * the method reads the records from es index of audit's into a file as
- * json's.
- *
- * @param value
- * the name of the index we want
- * @param printerWritersMap
- * a map of the writers we use to write to a file.
- * @return true in case the export was successful.
- */
- private boolean exportAudit(String value, Map<Table, PrintWriter> printerWritersMap) {
- log.info("stratng to export audit data from es index{} to file.", value);
- QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
- SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(value).setScroll(new TimeValue(60000))
- .setQuery(queryBuilder).setSize(100).execute().actionGet();
- while (true) {
- for (SearchHit hit : scrollResp.getHits().getHits()) {
- PrintWriter out = printerWritersMap.get(TypeToTableMapping.getTableByType(hit.getType()));
- out.println(hit.getSourceAsString());
- }
- scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
- .setScroll(new TimeValue(60000)).execute().actionGet();
- if (scrollResp.getHits().getHits().length == 0) {
- break;
-
- }
- }
-
- log.info("export audit data from es to file. finished succsesfully");
- return true;
- }
-
- /**
- * the method reads the records from es index of resources into a file as
- * json's.
- *
- * @param index
- * the name of the index we want to read
- * @param printerWritersMap
- * a map of the writers we use to write to a file.
- * @return true in case the export was successful.
- */
- private boolean exportArtifacts(String index, Map<Table, PrintWriter> printerWritersMap) {
- log.info("stratng to export artifact data from es to file.");
- PrintWriter out = printerWritersMap.get(Table.ARTIFACT);
- QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
- SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(index).setScroll(new TimeValue(60000))
- .setQuery(queryBuilder).setSize(100).execute().actionGet();
- while (true) {
- for (SearchHit hit : scrollResp.getHits().getHits()) {
- ;
- out.println(hit.getSourceAsString());
- }
- scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
- .setScroll(new TimeValue(60000)).execute().actionGet();
- if (scrollResp.getHits().getHits().length == 0) {
- break;
-
- }
- }
-
- log.info("export artifact data from es to file. finished succsesfully");
- return true;
- }
-
- /**
- * the method retrieves all the indexes from elasticsearch
- *
- * @return a map of indexes and there metadata
- */
- private ImmutableOpenMap<String, IndexMetaData> getIndexData() {
- return elasticSearchClient.getClient().admin().cluster().prepareState().get().getState().getMetaData()
- .getIndices();
- }
-
- /**
- * the method creates all the files and dir which holds them. in case the
- * files exist they will not be created again.
- *
- * @param appConfigDir
- * the base path under which the output dir will be created and
- * the export result files the created filesa are named according
- * to the name of the table into which it will be imported.
- * @param exportToEs
- * if true all the export files will be recreated
- * @returnthe returns a map of tables and the files representing them them
- */
- private Map<Table, File> createOutPutFiles(String appConfigDir, boolean exportToEs) {
- Map<Table, File> result = new EnumMap<Table, File>(Table.class);
- File outputDir = new File(appConfigDir + "/output/");
- if (!createOutPutFolder(outputDir)) {
- return null;
- }
- for (Table table : Table.values()) {
- File file = new File(outputDir + "/" + table.getTableDescription().getTableName());
- if (exportToEs) {
- try {
- if (file.exists()) {
- Files.delete(file.toPath());
- }
- } catch (IOException e) {
- log.error("failed to delete output file {}", file.getAbsolutePath(), e);
- return null;
- }
- file = new File(outputDir + "/" + table.getTableDescription().getTableName());
- }
- if (!file.exists()) {
- try {
- file.createNewFile();
- } catch (IOException e) {
- log.error("failed to create output file {}", file.getAbsolutePath(), e);
- return null;
- }
- }
- result.put(table, file);
-
- }
- return result;
- }
-
- /**
- * the method create the writers to each file
- *
- * @param files
- * a map of the files according to table
- * @return returns a map of writers according to table.
- */
- private Map<Table, PrintWriter> createWriters(Map<Table, File> files) {
- Map<Table, PrintWriter> printerWritersMap = new EnumMap<>(Table.class);
- try {
- for (Table table : files.keySet()) {
- log.info("creating writer for {}", table);
- File file = files.get(table);
- FileWriter fw = new FileWriter(file, true);
- BufferedWriter bw = new BufferedWriter(fw);
- PrintWriter out = new PrintWriter(bw);
- printerWritersMap.put(table, out);
- log.info("creating writer for {} was successful", table);
- }
- } catch (IOException e) {
- log.error("create writer to file failed", e);
- return null;
- }
- return printerWritersMap;
- }
-
- /**
- * the method creates the output dir in case it does not exist
- *
- * @param outputDir
- * the path under wich the directory will be created.
- * @return true in case the create was succsesful or the dir already exists
- */
- private boolean createOutPutFolder(File outputDir) {
- if (!outputDir.exists()) {
- log.info("creating output dir {}", outputDir.getAbsolutePath());
- try {
- Files.createDirectories(outputDir.toPath());
- } catch (IOException e) {
- log.error("failed to create output dir {}", outputDir.getAbsolutePath(), e);
- return false;
- }
- }
- return true;
- }
-
- public enum TypeToTableMapping {
- USER_ADMIN_EVENT_TYPE(AuditingTypesConstants.USER_ADMIN_EVENT_TYPE,
- Table.USER_ADMIN_EVENT), USER_ACCESS_EVENT_TYPE(AuditingTypesConstants.USER_ACCESS_EVENT_TYPE,
- Table.USER_ACCESS_EVENT), RESOURCE_ADMIN_EVENT_TYPE(
- AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE,
- Table.RESOURCE_ADMIN_EVENT), DISTRIBUTION_DOWNLOAD_EVENT_TYPE(
- AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE,
- Table.DISTRIBUTION_DOWNLOAD_EVENT), DISTRIBUTION_ENGINE_EVENT_TYPE(
- AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE,
- Table.DISTRIBUTION_ENGINE_EVENT), DISTRIBUTION_NOTIFICATION_EVENT_TYPE(
- AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE,
- Table.DISTRIBUTION_NOTIFICATION_EVENT), DISTRIBUTION_STATUS_EVENT_TYPE(
- AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE,
- Table.DISTRIBUTION_STATUS_EVENT), DISTRIBUTION_DEPLOY_EVENT_TYPE(
- AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE,
- Table.DISTRIBUTION_DEPLOY_EVENT), DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE(
- AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE,
- Table.DISTRIBUTION_GET_UEB_CLUSTER_EVENT), AUTH_EVENT_TYPE(
- AuditingTypesConstants.AUTH_EVENT_TYPE,
- Table.AUTH_EVENT), CONSUMER_EVENT_TYPE(
- AuditingTypesConstants.CONSUMER_EVENT_TYPE,
- Table.CONSUMER_EVENT), CATEGORY_EVENT_TYPE(
- AuditingTypesConstants.CATEGORY_EVENT_TYPE,
- Table.CATEGORY_EVENT), GET_USERS_LIST_EVENT_TYPE(
- AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE,
- Table.GET_USERS_LIST_EVENT), GET_CATEGORY_HIERARCHY_EVENT_TYPE(
- AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE,
- Table.GET_CATEGORY_HIERARCHY_EVENT);
-
- String typeName;
- Table table;
-
- TypeToTableMapping(String typeName, Table table) {
- this.typeName = typeName;
- this.table = table;
- }
-
- public String getTypeName() {
- return typeName;
- }
-
- public Table getTable() {
- return table;
- }
-
- public static Table getTableByType(String type) {
- for (TypeToTableMapping mapping : TypeToTableMapping.values()) {
- if (mapping.getTypeName().equalsIgnoreCase(type)) {
- return mapping.getTable();
- }
- }
- return null;
- }
- }
-
- public static AuditingGenericEvent createAuditRecord(EnumMap<AuditingFieldsKeysEnum, Object> auditingFields)
- throws ParseException {
- AuditingActionEnum actionEnum = AuditingActionEnum
- .getActionByName((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_ACTION));
- String tableName = actionEnum.getAuditingEsType();
- AuditingGenericEvent event = null;
- Date date = null;
- switch (tableName) {
- case AuditingTypesConstants.USER_ADMIN_EVENT_TYPE:
- UserAdminEvent userAdminEvent = new UserAdminEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- userAdminEvent.setTimestamp1(date);
- event = userAdminEvent;
- break;
- case AuditingTypesConstants.AUTH_EVENT_TYPE:
- AuthEvent authEvent = new AuthEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- authEvent.setTimestamp1(date);
- event = authEvent;
- break;
- case AuditingTypesConstants.CATEGORY_EVENT_TYPE:
- CategoryEvent categoryEvent = new CategoryEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- categoryEvent.setTimestamp1(date);
- event = categoryEvent;
- break;
- case AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE:
- ResourceAdminEvent resourceAdminEvent = new ResourceAdminEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- resourceAdminEvent.setTimestamp1(date);
- event = resourceAdminEvent;
- break;
- case AuditingTypesConstants.USER_ACCESS_EVENT_TYPE:
- event = new UserAccessEvent(auditingFields);
- UserAccessEvent userAccessEvent = new UserAccessEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- userAccessEvent.setTimestamp1(date);
- event = userAccessEvent;
- break;
- case AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE:
- DistributionStatusEvent distributionStatusEvent = new DistributionStatusEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- distributionStatusEvent.setTimestamp1(date);
- event = distributionStatusEvent;
- break;
- case AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE:
- DistributionDownloadEvent distributionDownloadEvent = new DistributionDownloadEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- distributionDownloadEvent.setTimestamp1(date);
- event = distributionDownloadEvent;
- break;
- case AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE:
- DistributionEngineEvent distributionEngineEvent = new DistributionEngineEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- distributionEngineEvent.setTimestamp1(date);
- event = distributionEngineEvent;
- break;
- case AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE:
- DistributionNotificationEvent distributionNotificationEvent = new DistributionNotificationEvent(
- auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- distributionNotificationEvent.setTimestamp1(date);
- event = distributionNotificationEvent;
- break;
- case AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE:
- DistributionDeployEvent distributionDeployEvent = new DistributionDeployEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- distributionDeployEvent.setTimestamp1(date);
- event = distributionDeployEvent;
- break;
- case AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE:
- AuditingGetUebClusterEvent auditingGetUebClusterEvent = new AuditingGetUebClusterEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- auditingGetUebClusterEvent.setTimestamp1(date);
- event = auditingGetUebClusterEvent;
- break;
- case AuditingTypesConstants.CONSUMER_EVENT_TYPE:
- ConsumerEvent consumerEvent = new ConsumerEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- consumerEvent.setTimestamp1(date);
- event = consumerEvent;
- break;
- case AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE:
- GetUsersListEvent getUsersListEvent = new GetUsersListEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- getUsersListEvent.setTimestamp1(date);
- event = getUsersListEvent;
- break;
- case AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE:
- GetCategoryHierarchyEvent getCategoryHierarchyEvent = new GetCategoryHierarchyEvent(auditingFields);
- date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
- getCategoryHierarchyEvent.setTimestamp1(date);
- event = getCategoryHierarchyEvent;
- break;
-
- }
- return event;
- }
+ private Gson gson = new Gson();
+
+ private ObjectMapper jsonMapper = new ObjectMapper();
+
+ private static Logger log = LoggerFactory.getLogger(DataMigration.class.getName());
+
+ private ElasticSearchClient elasticSearchClient;
+
+ @Autowired
+ protected AuditCassandraDao auditCassandraDao;
+ @Autowired
+ protected ArtifactCassandraDao artifactCassandraDao;
+
+ private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS z";
+ private SimpleDateFormat simpleDateFormat;
+
+ /**
+ * the method exports and imports the records from ES to cassandra the flow
+ * will check to see if the files are not empty if the files are not empty
+ * the export will be skiped and the flow will use the existing files. the
+ * flow will check if the tables in cassandra are empty, if the tables are
+ * not empty the proces will stop and exit. if the tables are empty the
+ * method will import the records from the files. in case of a fail the flow
+ * will exit and clear all the Cassandra tables.
+ *
+ * @param appConfigDir
+ * the location of the dir in wich the output files will be
+ * stored
+ * @param exportFromEs
+ * should the es be exported again and overwrite the old export
+ * @param importToCassandra
+ * should we import the data into cassandra
+ * @return true in case the operation was successful.
+ */
+ public boolean migrateDataEsToCassandra(String appConfigDir, boolean exportFromEs, boolean importToCassandra) {
+ initFormater();
+ if (!initEsClient()) {
+ return false;
+ }
+ Map<Table, File> files = createOutPutFiles(appConfigDir, exportFromEs);
+ if (files == null) {
+ return false;
+ }
+ if (exportFromEs && filesEmpty(files)) {
+ Map<Table, PrintWriter> printerWritersMap = createWriters(files);
+ if (printerWritersMap == null) {
+ return false;
+ }
+ try {
+ ImmutableOpenMap<String, IndexMetaData> indexData = getIndexData();
+ for (ObjectCursor<String> key : indexData.keys()) {
+ if (("resources".equalsIgnoreCase(key.value) && !exportArtifacts(key.value, printerWritersMap))
+ || (key.value.startsWith("auditingevents") && !exportAudit(key.value, printerWritersMap))) {
+ return false;
+ }
+ }
+ } finally {
+ if (elasticSearchClient != null) {
+ elasticSearchClient.close();
+ }
+ for (PrintWriter writer : printerWritersMap.values()) {
+ writer.close();
+ }
+ }
+ }
+ return !importToCassandra || importToCassndra(files);
+ }
+
+ private void initFormater() {
+ simpleDateFormat = new SimpleDateFormat(DATE_FORMAT_PATTERN);
+ simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+
+ private boolean initEsClient() {
+ String configHome = System.getProperty("config.home");
+ URL url;
+ Settings settings;
+ try {
+ if (SystemUtils.IS_OS_WINDOWS) {
+ url = new URL("file:///" + configHome + "/elasticsearch.yml");
+ } else {
+ url = new URL("file:" + configHome + "/elasticsearch.yml");
+ }
+ log.debug("URL {}", url);
+ settings = Settings.settingsBuilder().loadFromPath(Paths.get(url.toURI())).build();
+ } catch (MalformedURLException | URISyntaxException e1) {
+ log.error("Failed to create URL in order to load elasticsearch yml", e1);
+ return true;
+ }
+
+ this.elasticSearchClient = new ElasticSearchClient();
+ this.elasticSearchClient.setClusterName(settings.get("cluster.name"));
+ this.elasticSearchClient.setLocal(settings.get("elasticSearch.local"));
+ this.elasticSearchClient.setTransportClient(settings.get("elasticSearch.transportclient"));
+ try {
+ elasticSearchClient.initialize();
+ } catch (URISyntaxException e) {
+ log.error("Failed to initialize elasticSearchClient", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * the method clears all the cassandra tables.
+ */
+ private void truncateCassandraTable() {
+ log.info("import failed. truncating Cassandra tables.");
+ artifactCassandraDao.deleteAllArtifacts();
+ auditCassandraDao.deleteAllAudit();
+ }
+
+ /**
+ * the method imports the records from the files into cassandra.
+ *
+ * @param files
+ * a map of files holding
+ * @return true if the operation was successful
+ */
+ private boolean importToCassndra(Map<Table, File> files) {
+ log.info("starting to import date into Cassandra.");
+ if (!validtaTablsNotEmpty(files)) {
+ return true;
+ }
+ for (Table table : files.keySet()) {
+ log.info("importing recordes into {}", table.getTableDescription().getTableName());
+ if (!handleImport(files, table)) {
+ truncateCassandraTable();
+ return false;
+ }
+ }
+ log.info("finished to import date into Cassandra.");
+ return true;
+ }
+
+ private boolean validtaTablsNotEmpty(Map<Table, File> files) {
+ for (Table table : files.keySet()) {
+ Either<Boolean, CassandraOperationStatus> isTableEmptyRes = checkIfTableIsEmpty(table);
+ if (isTableEmptyRes.isRight() || !isTableEmptyRes.left().value()) {
+ log.error("Cassandra table {} is not empty operation aborted.",
+ table.getTableDescription().getTableName());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * the method retrieves the fields from the given map and praprs them for
+ * storage as an audit according to the table name
+ *
+ * @param map
+ * the map from which we will retrive the fields enum values
+ * @param table
+ * the table we are going to store the record in.
+ * @return a enummap representing the audit record that is going to be
+ * created.
+ */
+ private EnumMap<AuditingFieldsKeysEnum, Object> createAuditMap(Map<String, String> map, Table table) {
+ EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = new EnumMap<>(AuditingFieldsKeysEnum.class);
+ switch (table) {
+ case USER_ADMIN_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_AFTER, map.get("USER_AFTER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_BEFORE, map.get("USER_BEFORE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ break;
+ case USER_ACCESS_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_UID, map.get("USER"));
+ break;
+ case RESOURCE_ADMIN_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_INVARIANT_UUID, map.get("INVARIANT_UUID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_VERSION, map.get("PREV_VERSION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_STATE, map.get("PREV_STATE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DPREV_STATUS, map.get("DPREV_STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DCURR_STATUS, map.get("DCURR_STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TOSCA_NODE_TYPE, map.get("TOSCA_NODE_TYPE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_COMMENT, map.get("COMMENT"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ARTIFACT_DATA, map.get("ARTIFACT_DATA"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_PREV_ARTIFACT_UUID, map.get("PREV_ARTIFACT_UUID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CURR_ARTIFACT_UUID, map.get("CURR_ARTIFACT_UUID"));
+ break;
+ case DISTRIBUTION_DOWNLOAD_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+ break;
+ case DISTRIBUTION_ENGINE_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ if (map.get("TOPIC_NAME") != null) {
+ if (map.get("TOPIC_NAME").contains("-STATUS-")) {
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
+ map.get("TOPIC_NAME"));
+ } else {
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
+ map.get("TOPIC_NAME"));
+ }
+ } else {
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
+ map.get("DSTATUS_TOPIC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
+ map.get("DNOTIF_TOPIC"));
+ }
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ROLE, map.get("ROLE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_API_KEY, map.get("API_KEY"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ENVRIONMENT_NAME, map.get("D_ENV"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+ break;
+ case DISTRIBUTION_NOTIFICATION_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
+ break;
+ case DISTRIBUTION_STATUS_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TIME, map.get("STATUS_TIME"));
+ break;
+ case DISTRIBUTION_DEPLOY_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
+ break;
+ case DISTRIBUTION_GET_UEB_CLUSTER_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ if (map.get("STATUS_DESC") != null) {
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("STATUS_DESC"));
+ } else {
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ }
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+ break;
+ case AUTH_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_USER, map.get("USER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_URL, map.get("URL"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_STATUS, map.get("AUTH_STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_REALM, map.get("REALM"));
+ break;
+ case CONSUMER_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ECOMP_USER, map.get("ECOMP_USER"));
+ break;
+ case CATEGORY_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CATEGORY_NAME, map.get("CATEGORY_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SUB_CATEGORY_NAME, map.get("SUB_CATEGORY_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_GROUPING_NAME, map.get("GROUPING_NAME"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+ break;
+ case GET_USERS_LIST_EVENT:
+ case GET_CATEGORY_HIERARCHY_EVENT:
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+ auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
+ break;
+ default:
+ auditingFields = null;
+ break;
+ }
+ return auditingFields;
+ }
+
+ /**
+ * the method reads the content of the file intended for a given table, and
+ * sores them in cassandra
+ *
+ * @param files
+ * a map of files from which the recordes will be retrieved.
+ * @param table
+ * the name of the table we want to look up in the files and sore
+ * in Cassandra // * @param store the function to call when
+ * storing recordes in cassndra
+ * @return true if the operation was successful
+ */
+ private boolean handleImport(Map<Table, File> files, Table table) {
+ try (BufferedReader br = new BufferedReader(new FileReader(files.get(table)))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ CassandraOperationStatus res;
+ if (Table.ARTIFACT.equals(table)) {
+ res = artifactCassandraDao.saveArtifact(jsonMapper.readValue(line, ESArtifactData.class));
+ } else {
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ Map<String, String> map = gson.fromJson(line, type);
+ EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = createAuditMap(map, table);
+ AuditingGenericEvent recordForCassandra;
+ try {
+ recordForCassandra = createAuditRecord(auditingFields);
+ } catch (ParseException e) {
+ log.error("filed to parse time stemp in recored {}", auditingFields);
+ return false;
+ }
+ res = auditCassandraDao.saveRecord(recordForCassandra);
+ }
+ if (!res.equals(CassandraOperationStatus.OK)) {
+ log.error("save recored to cassndra {} failed with status {} aborting.",
+ table.getTableDescription().getTableName(), res);
+ return false;
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ log.error("failed to read file", e);
+ return false;
+ }
+ }
+
+ /**
+ * the method checks if the given table is empty.
+ *
+ * @param table
+ * the name of the table we want to check
+ * @return true if the table is empty
+ */
+ private Either<Boolean, CassandraOperationStatus> checkIfTableIsEmpty(Table table) {
+ if (Table.ARTIFACT.equals(table)) {
+ return artifactCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
+ } else {
+ return auditCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
+ }
+ }
+
+ private boolean filesEmpty(Map<Table, File> files) {
+ for (Entry<Table, File> entry : files.entrySet()) {
+ File file = entry.getValue();
+ if (file.length() != 0) {
+ log.info("file:{} is not empty skipping export", entry.getKey().getTableDescription().getTableName());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * the method reads the records from es index of audit's into a file as
+ * json's.
+ *
+ * @param value
+ * the name of the index we want
+ * @param printerWritersMap
+ * a map of the writers we use to write to a file.
+ * @return true in case the export was successful.
+ */
+ private boolean exportAudit(String value, Map<Table, PrintWriter> printerWritersMap) {
+ log.info("stratng to export audit data from es index{} to file.", value);
+ QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
+ SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(value).setScroll(new TimeValue(60000))
+ .setQuery(queryBuilder).setSize(100).execute().actionGet();
+ while (true) {
+ for (SearchHit hit : scrollResp.getHits().getHits()) {
+ PrintWriter out = printerWritersMap.get(TypeToTableMapping.getTableByType(hit.getType()));
+ out.println(hit.getSourceAsString());
+ }
+ scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
+ .setScroll(new TimeValue(60000)).execute().actionGet();
+ if (scrollResp.getHits().getHits().length == 0) {
+ break;
+
+ }
+ }
+ log.info("export audit data from es to file. finished succsesfully");
+ return true;
+ }
+
+ /**
+ * the method reads the records from es index of resources into a file as
+ * json's.
+ *
+ * @param index
+ * the name of the index we want to read
+ * @param printerWritersMap
+ * a map of the writers we use to write to a file.
+ * @return true in case the export was successful.
+ */
+ private boolean exportArtifacts(String index, Map<Table, PrintWriter> printerWritersMap) {
+ log.info("stratng to export artifact data from es to file.");
+ PrintWriter out = printerWritersMap.get(Table.ARTIFACT);
+ QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
+ SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(index).setScroll(new TimeValue(60000))
+ .setQuery(queryBuilder).setSize(100).execute().actionGet();
+ while (true) {
+ for (SearchHit hit : scrollResp.getHits().getHits()) {
+ out.println(hit.getSourceAsString());
+ }
+ scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
+ .setScroll(new TimeValue(60000)).execute().actionGet();
+ if (scrollResp.getHits().getHits().length == 0) {
+ break;
+ }
+ }
+ log.info("export artifact data from es to file. finished succsesfully");
+ return true;
+ }
+
+ /**
+ * the method retrieves all the indexes from elasticsearch.
+ *
+ * @return a map of indexes and there metadata
+ */
+ private ImmutableOpenMap<String, IndexMetaData> getIndexData() {
+ return elasticSearchClient.getClient().admin().cluster().prepareState().get().getState().getMetaData()
+ .getIndices();
+ }
+
+ /**
+ * the method creates all the files and dir which holds them. in case the
+ * files exist they will not be created again.
+ *
+ * @param appConfigDir
+ * the base path under which the output dir will be created and
+ * the export result files the created filesa are named according
+ * to the name of the table into which it will be imported.
+ * @param exportToEs
+ * if true all the export files will be recreated
+ * @return returns a map of tables and the files representing them them
+ */
+ private Map<Table, File> createOutPutFiles(String appConfigDir, boolean exportToEs) {
+ Map<Table, File> result = new EnumMap<>(Table.class);
+ File outputDir = new File(appConfigDir + "/output/");
+ if (!createOutPutFolder(outputDir)) {
+ return null;
+ }
+ for (Table table : Table.values()) {
+ File file = new File(outputDir + "/" + table.getTableDescription().getTableName());
+ if (exportToEs) {
+ try {
+ if (file.exists()) {
+ Files.delete(file.toPath());
+ }
+ } catch (IOException e) {
+ log.error("failed to delete output file {}", file.getAbsolutePath(), e);
+ return null;
+ }
+ file = new File(outputDir + "/" + table.getTableDescription().getTableName());
+ }
+ if (!file.exists()) {
+ try {
+ file.createNewFile();
+ } catch (IOException e) {
+ log.error("failed to create output file {}", file.getAbsolutePath(), e);
+ return null;
+ }
+ }
+ result.put(table, file);
+ }
+ return result;
+ }
+
+ /**
+ * the method create the writers to each file
+ *
+ * @param files
+ * a map of the files according to table
+ * @return returns a map of writers according to table.
+ */
+ private Map<Table, PrintWriter> createWriters(Map<Table, File> files) {
+ Map<Table, PrintWriter> printerWritersMap = new EnumMap<>(Table.class);
+ try {
+ for (Entry<Table, File> entry : files.entrySet()) {
+ log.info("creating writer for {}", entry.getKey());
+ File file = entry.getValue();
+ FileWriter fw = new FileWriter(file, true);
+ BufferedWriter bw = new BufferedWriter(fw);
+ PrintWriter out = new PrintWriter(bw);
+ printerWritersMap.put(entry.getKey(), out);
+ log.info("creating writer for {} was successful", entry.getKey());
+ }
+ } catch (IOException e) {
+ log.error("create writer to file failed", e);
+ return null;
+ }
+ return printerWritersMap;
+ }
+
+ /**
+ * the method creates the output dir in case it does not exist
+ *
+ * @param outputDir
+ * the path under wich the directory will be created.
+ * @return true in case the create was succsesful or the dir already exists
+ */
+ private boolean createOutPutFolder(File outputDir) {
+ if (!outputDir.exists()) {
+ log.info("creating output dir {}", outputDir.getAbsolutePath());
+ try {
+ Files.createDirectories(outputDir.toPath());
+ } catch (IOException e) {
+ log.error("failed to create output dir {}", outputDir.getAbsolutePath(), e);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public enum TypeToTableMapping {
+ USER_ADMIN_EVENT_TYPE(AuditingTypesConstants.USER_ADMIN_EVENT_TYPE,
+ Table.USER_ADMIN_EVENT), USER_ACCESS_EVENT_TYPE(AuditingTypesConstants.USER_ACCESS_EVENT_TYPE,
+ Table.USER_ACCESS_EVENT), RESOURCE_ADMIN_EVENT_TYPE(
+ AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE,
+ Table.RESOURCE_ADMIN_EVENT), DISTRIBUTION_DOWNLOAD_EVENT_TYPE(
+ AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE,
+ Table.DISTRIBUTION_DOWNLOAD_EVENT), DISTRIBUTION_ENGINE_EVENT_TYPE(
+ AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE,
+ Table.DISTRIBUTION_ENGINE_EVENT), DISTRIBUTION_NOTIFICATION_EVENT_TYPE(
+ AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE,
+ Table.DISTRIBUTION_NOTIFICATION_EVENT), DISTRIBUTION_STATUS_EVENT_TYPE(
+ AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE,
+ Table.DISTRIBUTION_STATUS_EVENT), DISTRIBUTION_DEPLOY_EVENT_TYPE(
+ AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE,
+ Table.DISTRIBUTION_DEPLOY_EVENT), DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE(
+ AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE,
+ Table.DISTRIBUTION_GET_UEB_CLUSTER_EVENT), AUTH_EVENT_TYPE(
+ AuditingTypesConstants.AUTH_EVENT_TYPE,
+ Table.AUTH_EVENT), CONSUMER_EVENT_TYPE(
+ AuditingTypesConstants.CONSUMER_EVENT_TYPE,
+ Table.CONSUMER_EVENT), CATEGORY_EVENT_TYPE(
+ AuditingTypesConstants.CATEGORY_EVENT_TYPE,
+ Table.CATEGORY_EVENT), GET_USERS_LIST_EVENT_TYPE(
+ AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE,
+ Table.GET_USERS_LIST_EVENT), GET_CATEGORY_HIERARCHY_EVENT_TYPE(
+ AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE,
+ Table.GET_CATEGORY_HIERARCHY_EVENT);
+
+ String typeName;
+ Table table;
+
+ TypeToTableMapping(String typeName, Table table) {
+ this.typeName = typeName;
+ this.table = table;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public static Table getTableByType(String type) {
+ for (TypeToTableMapping mapping : TypeToTableMapping.values()) {
+ if (mapping.getTypeName().equalsIgnoreCase(type)) {
+ return mapping.getTable();
+ }
+ }
+ return null;
+ }
+ }
+
+ public AuditingGenericEvent createAuditRecord(EnumMap<AuditingFieldsKeysEnum, Object> auditingFields)
+ throws ParseException {
+ AuditingActionEnum actionEnum = AuditingActionEnum
+ .getActionByName((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_ACTION));
+ String tableName = actionEnum.getAuditingEsType();
+ AuditingGenericEvent event;
+ Date date;
+ switch (tableName) {
+ case AuditingTypesConstants.USER_ADMIN_EVENT_TYPE:
+ UserAdminEvent userAdminEvent = new UserAdminEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ userAdminEvent.setTimestamp1(date);
+ event = userAdminEvent;
+ break;
+ case AuditingTypesConstants.AUTH_EVENT_TYPE:
+ AuthEvent authEvent = new AuthEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ authEvent.setTimestamp1(date);
+ event = authEvent;
+ break;
+ case AuditingTypesConstants.CATEGORY_EVENT_TYPE:
+ CategoryEvent categoryEvent = new CategoryEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ categoryEvent.setTimestamp1(date);
+ event = categoryEvent;
+ break;
+ case AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE:
+ ResourceAdminEvent resourceAdminEvent = new ResourceAdminEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ resourceAdminEvent.setTimestamp1(date);
+ event = resourceAdminEvent;
+ break;
+ case AuditingTypesConstants.USER_ACCESS_EVENT_TYPE:
+ UserAccessEvent userAccessEvent = new UserAccessEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ userAccessEvent.setTimestamp1(date);
+ event = userAccessEvent;
+ break;
+ case AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE:
+ DistributionStatusEvent distributionStatusEvent = new DistributionStatusEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ distributionStatusEvent.setTimestamp1(date);
+ event = distributionStatusEvent;
+ break;
+ case AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE:
+ DistributionDownloadEvent distributionDownloadEvent = new DistributionDownloadEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ distributionDownloadEvent.setTimestamp1(date);
+ event = distributionDownloadEvent;
+ break;
+ case AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE:
+ DistributionEngineEvent distributionEngineEvent = new DistributionEngineEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ distributionEngineEvent.setTimestamp1(date);
+ event = distributionEngineEvent;
+ break;
+ case AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE:
+ DistributionNotificationEvent distributionNotificationEvent = new DistributionNotificationEvent(
+ auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ distributionNotificationEvent.setTimestamp1(date);
+ event = distributionNotificationEvent;
+ break;
+ case AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE:
+ DistributionDeployEvent distributionDeployEvent = new DistributionDeployEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ distributionDeployEvent.setTimestamp1(date);
+ event = distributionDeployEvent;
+ break;
+ case AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE:
+ AuditingGetUebClusterEvent auditingGetUebClusterEvent = new AuditingGetUebClusterEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ auditingGetUebClusterEvent.setTimestamp1(date);
+ event = auditingGetUebClusterEvent;
+ break;
+ case AuditingTypesConstants.CONSUMER_EVENT_TYPE:
+ ConsumerEvent consumerEvent = new ConsumerEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ consumerEvent.setTimestamp1(date);
+ event = consumerEvent;
+ break;
+ case AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE:
+ GetUsersListEvent getUsersListEvent = new GetUsersListEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ getUsersListEvent.setTimestamp1(date);
+ event = getUsersListEvent;
+ break;
+ case AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE:
+ GetCategoryHierarchyEvent getCategoryHierarchyEvent = new GetCategoryHierarchyEvent(auditingFields);
+ date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+ getCategoryHierarchyEvent.setTimestamp1(date);
+ event = getCategoryHierarchyEvent;
+ break;
+ default:
+ event = null;
+ break;
+ }
+ return event;
+ }
}
diff --git a/asdctool/src/main/java/org/openecomp/sdc/asdctool/main/EsToCassandraDataMigrationMenu.java b/asdctool/src/main/java/org/openecomp/sdc/asdctool/main/EsToCassandraDataMigrationMenu.java
index d563f2cf33..0bac1df353 100644
--- a/asdctool/src/main/java/org/openecomp/sdc/asdctool/main/EsToCassandraDataMigrationMenu.java
+++ b/asdctool/src/main/java/org/openecomp/sdc/asdctool/main/EsToCassandraDataMigrationMenu.java
@@ -55,7 +55,7 @@ public class EsToCassandraDataMigrationMenu {
case "es-to-cassndra-migration":
dataMigration = (DataMigration) context.getBean("DataMigrationBean");
log.debug("Start migration from ES to C* ");
- if (dataMigration.migrateDataESToCassndra(appConfigDir, true, true)) {
+ if (dataMigration.migrateDataEsToCassandra(appConfigDir, true, true)) {
log.debug("migration from ES to C* was finished successfull");
System.exit(0);
} else {
@@ -66,7 +66,7 @@ public class EsToCassandraDataMigrationMenu {
case "es-to-cassndra-migration-export-only":
dataMigration = (DataMigration) context.getBean("DataMigrationBean");
log.debug("Start migration export only from ES to C* ");
- if (dataMigration.migrateDataESToCassndra(appConfigDir, true, false)) {
+ if (dataMigration.migrateDataEsToCassandra(appConfigDir, true, false)) {
log.debug("migration export only from ES to C* was finished successfull");
System.exit(0);
} else {
@@ -77,7 +77,7 @@ public class EsToCassandraDataMigrationMenu {
case "es-to-cassndra-migration-import-only":
dataMigration = (DataMigration) context.getBean("DataMigrationBean");
log.debug("Start migration import only from ES to C* ");
- if (dataMigration.migrateDataESToCassndra(appConfigDir, false, true)) {
+ if (dataMigration.migrateDataEsToCassandra(appConfigDir, false, true)) {
log.debug("migration import only from ES to C* was finished successfull");
System.exit(0);
} else {