diff options
Diffstat (limited to 'src/main/java/org/openecomp/sparky/synchronizer/AggregationSuggestionSynchronizer.java')
-rw-r--r-- | src/main/java/org/openecomp/sparky/synchronizer/AggregationSuggestionSynchronizer.java | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/main/java/org/openecomp/sparky/synchronizer/AggregationSuggestionSynchronizer.java b/src/main/java/org/openecomp/sparky/synchronizer/AggregationSuggestionSynchronizer.java new file mode 100644 index 0000000..0337f6a --- /dev/null +++ b/src/main/java/org/openecomp/sparky/synchronizer/AggregationSuggestionSynchronizer.java @@ -0,0 +1,187 @@ +/** + * ============LICENSE_START=================================================== + * SPARKY (AAI UI service) + * ============================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ + +package org.openecomp.sparky.synchronizer; + +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import org.openecomp.cl.mdc.MdcContext; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.openecomp.cl.api.Logger; +import org.openecomp.cl.eelf.LoggerFactory; +import org.openecomp.sparky.dal.NetworkTransaction; +import org.openecomp.sparky.dal.rest.HttpMethod; +import org.openecomp.sparky.dal.rest.OperationResult; +import org.openecomp.sparky.logging.AaiUiMsgs; +import org.openecomp.sparky.synchronizer.entity.AggregationSuggestionEntity; +import org.openecomp.sparky.synchronizer.enumeration.OperationState; +import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState; +import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut; +import org.openecomp.sparky.util.NodeUtils; +import org.slf4j.MDC; + +public class AggregationSuggestionSynchronizer extends AbstractEntitySynchronizer + implements IndexSynchronizer { + + private static final Logger LOG = + LoggerFactory.getInstance().getLogger(AggregationSuggestionSynchronizer.class); + + private boolean isSyncInProgress; + private boolean shouldPerformRetry; + private Map<String, String> contextMap; + protected ExecutorService esPutExecutor; + + public AggregationSuggestionSynchronizer(String indexName) throws Exception { + super(LOG, "ASS-" + indexName.toUpperCase(), 2, 5, 5, indexName); + + this.isSyncInProgress = false; + this.shouldPerformRetry = false; + this.synchronizerName = "Aggregation Suggestion Synchronizer"; + this.contextMap = MDC.getCopyOfContextMap(); + this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG); + } + + @Override + protected boolean isSyncDone() { + int totalWorkOnHand = esWorkOnHand.get(); + + if (LOG.isDebugEnabled()) { + LOG.debug(AaiUiMsgs.DEBUG_GENERIC, + indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand); + } + + if (totalWorkOnHand > 0 || !isSyncInProgress) { + return false; + } + + return true; + } + + @Override + public OperationState doSync() { + isSyncInProgress = true; + + syncEntity(); + + while (!isSyncDone()) { + try { + if (shouldPerformRetry) { + syncEntity(); + } + Thread.sleep(1000); + } catch (Exception exc) { + // We don't care about this exception + } + } + + return OperationState.OK; + } + + private void syncEntity() { + String txnId = NodeUtils.getRandomTxnId(); + MdcContext.initialize(txnId, "AggregationSuggestionSynchronizer", "", "Sync", ""); + + AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(); + syncEntity.deriveFields(); + + String link = null; + try { + link = getElasticFullUrl("/" + syncEntity.getId(), getIndexName()); + } catch (Exception exc) { + LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); + } + + try { + String jsonPayload = null; + jsonPayload = syncEntity.getIndexDocumentJson(); + if (link != null && jsonPayload != null) { + + NetworkTransaction elasticPutTxn = new NetworkTransaction(); + elasticPutTxn.setLink(link); + elasticPutTxn.setOperationType(HttpMethod.PUT); + + esWorkOnHand.incrementAndGet(); + final Map<String, String> contextMap = MDC.getCopyOfContextMap(); + supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn, + esDataProvider, contextMap), esPutExecutor).whenComplete((result, error) -> { + + esWorkOnHand.decrementAndGet(); + + if (error != null) { + String message = "Aggregation suggestion entity sync UPDATE PUT error - " + + error.getLocalizedMessage(); + LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message); + } else { + updateElasticSearchCounters(result); + wasEsOperationSuccessful(result); + } + }); + } + } catch (Exception exc) { + String message = + "Exception caught during aggregation suggestion entity sync PUT operation. Message - " + + exc.getLocalizedMessage(); + LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message); + } + } + + private void wasEsOperationSuccessful(NetworkTransaction result) { + if (result != null) { + OperationResult opResult = result.getOperationResult(); + + if (!opResult.wasSuccessful()) { + shouldPerformRetry = true; + } else { + isSyncInProgress = false; + shouldPerformRetry = false; + } + } + } + + @Override + public SynchronizerState getState() { + if (!isSyncDone()) { + return SynchronizerState.PERFORMING_SYNCHRONIZATION; + } + + return SynchronizerState.IDLE; + } + + @Override + public String getStatReport(boolean shouldDisplayFinalReport) { + return getStatReport(System.currentTimeMillis() - this.syncStartedTimeStampInMs, + shouldDisplayFinalReport); + } + + @Override + public void shutdown() { + this.shutdownExecutors(); + } +} |