summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/sparky/synchronizer/AbstractEntitySynchronizer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/sparky/synchronizer/AbstractEntitySynchronizer.java')
-rw-r--r--src/main/java/org/openecomp/sparky/synchronizer/AbstractEntitySynchronizer.java559
1 files changed, 559 insertions, 0 deletions
diff --git a/src/main/java/org/openecomp/sparky/synchronizer/AbstractEntitySynchronizer.java b/src/main/java/org/openecomp/sparky/synchronizer/AbstractEntitySynchronizer.java
new file mode 100644
index 0000000..14ea149
--- /dev/null
+++ b/src/main/java/org/openecomp/sparky/synchronizer/AbstractEntitySynchronizer.java
@@ -0,0 +1,559 @@
+/**
+ * ============LICENSE_START===================================================
+ * SPARKY (AAI UI service)
+ * ============================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ *
+ * ECOMP and OpenECOMP are trademarks
+ * and service marks of AT&T Intellectual Property.
+ */
+
+package org.openecomp.sparky.synchronizer;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.openecomp.cl.api.Logger;
+import org.openecomp.sparky.config.oxm.OxmModelLoader;
+import org.openecomp.sparky.dal.NetworkTransaction;
+import org.openecomp.sparky.dal.aai.ActiveInventoryDataProvider;
+import org.openecomp.sparky.dal.aai.ActiveInventoryEntityStatistics;
+import org.openecomp.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
+import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
+import org.openecomp.sparky.dal.elasticsearch.ElasticSearchDataProvider;
+import org.openecomp.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
+import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
+import org.openecomp.sparky.dal.rest.HttpMethod;
+import org.openecomp.sparky.dal.rest.OperationResult;
+import org.openecomp.sparky.dal.rest.RestOperationalStatistics;
+import org.openecomp.sparky.logging.AaiUiMsgs;
+import org.openecomp.sparky.util.NodeUtils;
+
+import org.openecomp.cl.mdc.MdcContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * The Class AbstractEntitySynchronizer.
+ *
+ * @author davea.
+ */
+public abstract class AbstractEntitySynchronizer {
+
+ protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
+ protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
+
+ protected final Logger logger;
+ protected ObjectMapper mapper;
+ protected OxmModelLoader oxmModelLoader;
+
+ /**
+ * The Enum StatFlag.
+ */
+ protected enum StatFlag {
+ AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
+ AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
+ }
+
+ protected EnumSet<StatFlag> enabledStatFlags;
+
+ protected ActiveInventoryDataProvider aaiDataProvider;
+ protected ElasticSearchDataProvider esDataProvider;
+
+ protected ExecutorService synchronizerExecutor;
+ protected ExecutorService aaiExecutor;
+ protected ExecutorService esExecutor;
+
+ private RestOperationalStatistics esRestStats;
+ protected ElasticSearchEntityStatistics esEntityStats;
+
+ private RestOperationalStatistics aaiRestStats;
+ protected ActiveInventoryEntityStatistics aaiEntityStats;
+ private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
+
+ private TaskProcessingStats aaiTaskProcessingStats;
+ private TaskProcessingStats esTaskProcessingStats;
+
+ private TransactionRateController aaiTransactionRateController;
+ private TransactionRateController esTransactionRateController;
+
+ protected AtomicInteger aaiWorkOnHand;
+ protected AtomicInteger esWorkOnHand;
+ protected String synchronizerName;
+
+ protected abstract boolean isSyncDone();
+
+ public String getActiveInventoryStatisticsReport() {
+
+ StringBuilder sb = new StringBuilder(128);
+
+ if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
+ sb.append("\n\n ").append("REST Operational Stats:");
+ sb.append(aaiRestStats.getStatisticsReport());
+ }
+
+ if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
+ sb.append("\n\n ").append("Entity Stats:");
+ sb.append(aaiEntityStats.getStatisticsReport());
+ }
+
+ if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
+ sb.append("\n\n ").append("Processing Exception Stats:");
+ sb.append(aaiProcessingExceptionStats.getStatisticsReport());
+ }
+
+ return sb.toString();
+
+ }
+
+ public String getElasticSearchStatisticsReport() {
+
+ StringBuilder sb = new StringBuilder(128);
+
+ if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
+ sb.append("\n\n ").append("REST Operational Stats:");
+ sb.append(esRestStats.getStatisticsReport());
+ }
+
+ if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
+ sb.append("\n\n ").append("Entity Stats:");
+ sb.append(esEntityStats.getStatisticsReport());
+ }
+
+ return sb.toString();
+
+ }
+
+ /**
+ * Adds the active inventory stat report.
+ *
+ * @param sb the sb
+ */
+ private void addActiveInventoryStatReport(StringBuilder sb) {
+
+ if (sb == null) {
+ return;
+ }
+
+ sb.append("\n\n AAI");
+ sb.append(getActiveInventoryStatisticsReport());
+
+ double currentTps = 0;
+ if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
+ sb.append("\n\n ").append("Task Processor Stats:");
+ sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
+
+ currentTps = aaiTransactionRateController.getCurrentTps();
+
+ sb.append("\n ").append("Current TPS: ").append(currentTps);
+ }
+
+ sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
+
+ if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
+ if (currentTps > 0) {
+ double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
+ sb.append("\n ").append("SyncDurationRemaining=")
+ .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
+ }
+ }
+
+ }
+
+ /**
+ * Adds the elastic stat report.
+ *
+ * @param sb the sb
+ */
+ private void addElasticStatReport(StringBuilder sb) {
+
+ if (sb == null) {
+ return;
+ }
+
+ sb.append("\n\n ELASTIC");
+ sb.append(getElasticSearchStatisticsReport());
+
+ double currentTps = 0;
+
+ if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
+ sb.append("\n\n ").append("Task Processor Stats:");
+ sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
+
+ currentTps = esTransactionRateController.getCurrentTps();
+
+ sb.append("\n ").append("Current TPS: ").append(currentTps);
+ }
+
+ sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
+
+ if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
+ if (currentTps > 0) {
+ double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
+ sb.append("\n ").append("SyncDurationRemaining=")
+ .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
+ }
+ }
+
+
+ }
+
+ /**
+ * Gets the stat report.
+ *
+ * @param syncOpTimeInMs the sync op time in ms
+ * @param showFinalReport the show final report
+ * @return the stat report
+ */
+ protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
+
+ StringBuilder sb = new StringBuilder(128);
+
+ sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
+ + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
+
+ addActiveInventoryStatReport(sb);
+ addElasticStatReport(sb);
+
+ if (showFinalReport) {
+ sb.append("\n\n ").append("Sync Completed!\n");
+ } else {
+ sb.append("\n\n ").append("Sync in Progress...\n");
+ }
+
+ return sb.toString();
+
+ }
+
+ protected String indexName;
+ protected long syncStartedTimeStampInMs;
+
+ /**
+ * Instantiates a new abstract entity synchronizer.
+ *
+ * @param logger the logger
+ * @param syncName the sync name
+ * @param numSyncWorkers the num sync workers
+ * @param numActiveInventoryWorkers the num active inventory workers
+ * @param numElasticsearchWorkers the num elasticsearch workers
+ * @param indexName the index name
+ * @throws Exception the exception
+ */
+ protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
+ int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
+ throws Exception {
+ this.logger = logger;
+ this.synchronizerExecutor =
+ NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
+ this.aaiExecutor =
+ NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
+ this.esExecutor =
+ NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
+ this.mapper = new ObjectMapper();
+ this.oxmModelLoader = OxmModelLoader.getInstance();
+ this.indexName = indexName;
+ this.esRestStats = new RestOperationalStatistics();
+ this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
+ this.aaiRestStats = new RestOperationalStatistics();
+ this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
+ this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
+ this.aaiTaskProcessingStats =
+ new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
+ this.esTaskProcessingStats =
+ new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
+
+ this.aaiTransactionRateController =
+ new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
+ this.esTransactionRateController =
+ new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
+
+ this.aaiWorkOnHand = new AtomicInteger(0);
+ this.esWorkOnHand = new AtomicInteger(0);
+
+ enabledStatFlags = EnumSet.allOf(StatFlag.class);
+
+ this.synchronizerName = "Abstact Entity Synchronizer";
+
+ String txnID = NodeUtils.getRandomTxnId();
+ MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
+
+ }
+
+ /**
+ * Inc active inventory work on hand counter.
+ */
+ protected void incActiveInventoryWorkOnHandCounter() {
+ aaiWorkOnHand.incrementAndGet();
+ }
+
+ /**
+ * Dec active inventory work on hand counter.
+ */
+ protected void decActiveInventoryWorkOnHandCounter() {
+ aaiWorkOnHand.decrementAndGet();
+ }
+
+ /**
+ * Inc elastic search work on hand counter.
+ */
+ protected void incElasticSearchWorkOnHandCounter() {
+ esWorkOnHand.incrementAndGet();
+ }
+
+ /**
+ * Dec elastic search work on hand counter.
+ */
+ protected void decElasticSearchWorkOnHandCounter() {
+ esWorkOnHand.decrementAndGet();
+ }
+
+ /**
+ * Shutdown executors.
+ */
+ protected void shutdownExecutors() {
+ try {
+ synchronizerExecutor.shutdown();
+ aaiExecutor.shutdown();
+ esExecutor.shutdown();
+ aaiDataProvider.shutdown();
+ esDataProvider.shutdown();
+ } catch (Exception exc) {
+ logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
+ }
+ }
+
+ /**
+ * Clear cache.
+ */
+ public void clearCache() {
+ if (aaiDataProvider != null) {
+ aaiDataProvider.clearCache();
+ }
+ }
+
+ protected ActiveInventoryDataProvider getAaiDataProvider() {
+ return aaiDataProvider;
+ }
+
+ public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
+ this.aaiDataProvider = aaiDataProvider;
+ }
+
+ protected ElasticSearchDataProvider getEsDataProvider() {
+ return esDataProvider;
+ }
+
+ public void setEsDataProvider(ElasticSearchDataProvider provider) {
+ this.esDataProvider = provider;
+ }
+
+ /**
+ * Gets the elastic full url.
+ *
+ * @param resourceUrl the resource url
+ * @param indexName the index name
+ * @param indexType the index type
+ * @return the elastic full url
+ * @throws Exception the exception
+ */
+ protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
+ throws Exception {
+ return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
+ }
+
+ /**
+ * Gets the elastic full url.
+ *
+ * @param resourceUrl the resource url
+ * @param indexName the index name
+ * @return the elastic full url
+ * @throws Exception the exception
+ */
+ protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
+ return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
+
+
+ /**
+ * Gets the response length.
+ *
+ * @param txn the txn
+ * @return the response length
+ */
+ private long getResponseLength(NetworkTransaction txn) {
+
+ if (txn == null) {
+ return -1;
+ }
+
+ OperationResult result = txn.getOperationResult();
+
+ if (result == null) {
+ return -1;
+ }
+
+ if (result.getResult() != null) {
+ return result.getResult().length();
+ }
+
+ return -1;
+ }
+
+ /**
+ * Update elastic search counters.
+ *
+ * @param method the method
+ * @param or the or
+ */
+ protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
+ updateElasticSearchCounters(new NetworkTransaction(method, null, or));
+ }
+
+ /**
+ * Update elastic search counters.
+ *
+ * @param method the method
+ * @param entityType the entity type
+ * @param or the or
+ */
+ protected void updateElasticSearchCounters(HttpMethod method, String entityType,
+ OperationResult or) {
+ updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
+ }
+
+ /**
+ * Update elastic search counters.
+ *
+ * @param txn the txn
+ */
+ protected void updateElasticSearchCounters(NetworkTransaction txn) {
+
+ if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
+ esRestStats.updateCounters(txn);
+ }
+
+ if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
+ esEntityStats.updateCounters(txn);
+ }
+
+ if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
+
+ esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
+
+ esTaskProcessingStats
+ .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
+ esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
+
+ // don't know the cost of the lengh calc, we'll see if it causes a
+ // problem
+
+ long responsePayloadSizeInBytes = getResponseLength(txn);
+ if (responsePayloadSizeInBytes >= 0) {
+ esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
+ }
+
+ esTaskProcessingStats
+ .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
+ }
+ }
+
+ /**
+ * Update active inventory counters.
+ *
+ * @param method the method
+ * @param or the or
+ */
+ protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
+ updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
+ }
+
+ /**
+ * Update active inventory counters.
+ *
+ * @param method the method
+ * @param entityType the entity type
+ * @param or the or
+ */
+ protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
+ OperationResult or) {
+ updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
+ }
+
+ /**
+ * Update active inventory counters.
+ *
+ * @param txn the txn
+ */
+ protected void updateActiveInventoryCounters(NetworkTransaction txn) {
+
+ if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
+ aaiRestStats.updateCounters(txn);
+ }
+
+ if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
+ aaiEntityStats.updateCounters(txn);
+ }
+
+ if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
+ aaiProcessingExceptionStats.updateCounters(txn);
+ }
+
+ if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
+ aaiTransactionRateController
+ .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
+
+ aaiTaskProcessingStats
+ .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
+ aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
+
+ // don't know the cost of the lengh calc, we'll see if it causes a
+ // problem
+
+ long responsePayloadSizeInBytes = getResponseLength(txn);
+ if (responsePayloadSizeInBytes >= 0) {
+ aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
+ }
+
+ aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
+ (long) aaiTransactionRateController.getCurrentTps());
+ }
+ }
+
+ /**
+ * Reset counters.
+ */
+ protected void resetCounters() {
+ aaiRestStats.reset();
+ aaiEntityStats.reset();
+ aaiProcessingExceptionStats.reset();
+
+ esRestStats.reset();
+ esEntityStats.reset();
+ }
+
+}