diff options
Diffstat (limited to 'sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java')
-rw-r--r-- | sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java index 52de8e4..444eafb 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java @@ -23,8 +23,10 @@ package org.onap.aai.sparky.sync; import java.util.EnumSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.restclient.client.OperationResult; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; @@ -37,10 +39,13 @@ import org.onap.aai.sparky.dal.rest.HttpMethod; import org.onap.aai.sparky.dal.rest.RestOperationalStatistics; import org.onap.aai.sparky.logging.AaiUiMsgs; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; +import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; import org.onap.aai.sparky.util.NodeUtils; import com.fasterxml.jackson.databind.ObjectMapper; +import static java.util.concurrent.CompletableFuture.supplyAsync; + /** * The Class AbstractEntitySynchronizer. * @@ -48,6 +53,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; */ public abstract class AbstractEntitySynchronizer { + protected static final Logger LOG = + LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class); protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409; protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3); @@ -517,4 +524,43 @@ public abstract class AbstractEntitySynchronizer { esEntityStats.reset(); } + + protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) { + String link = null; + try { + /* + * In this retry flow the se object has already derived its fields + */ + link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id); + } catch (Exception exc) { + LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); + } + + if (link != null) { + NetworkTransaction retryTransaction = new NetworkTransaction(); + retryTransaction.setLink(link); + retryTransaction.setEntityType(txn.getEntityType()); + retryTransaction.setDescriptor(txn.getDescriptor()); + retryTransaction.setOperationType(HttpMethod.GET); + + /* + * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already + * called incrementAndGet when queuing the failed PUT! + */ + + supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), + esExecutor).whenComplete((result, error) -> { + + esWorkOnHand.decrementAndGet(); + + if (error != null) { + LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); + } else { + updateElasticSearchCounters(result); + networkTransactionConsumer.accept(result); + } + }); + } + } + } |