summaryrefslogtreecommitdiffstats
path: root/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
diff options
context:
space:
mode:
Diffstat (limited to 'sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java')
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java46
1 files changed, 46 insertions, 0 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
index 52de8e4..444eafb 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
@@ -23,8 +23,10 @@ package org.onap.aai.sparky.sync;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
@@ -37,10 +39,13 @@ import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
import org.onap.aai.sparky.util.NodeUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+
/**
* The Class AbstractEntitySynchronizer.
*
@@ -48,6 +53,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
*/
public abstract class AbstractEntitySynchronizer {
+ protected static final Logger LOG =
+ LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class);
protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
@@ -517,4 +524,43 @@ public abstract class AbstractEntitySynchronizer {
esEntityStats.reset();
}
+
+ protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) {
+ String link = null;
+ try {
+ /*
+ * In this retry flow the se object has already derived its fields
+ */
+ link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id);
+ } catch (Exception exc) {
+ LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
+ }
+
+ if (link != null) {
+ NetworkTransaction retryTransaction = new NetworkTransaction();
+ retryTransaction.setLink(link);
+ retryTransaction.setEntityType(txn.getEntityType());
+ retryTransaction.setDescriptor(txn.getDescriptor());
+ retryTransaction.setOperationType(HttpMethod.GET);
+
+ /*
+ * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
+ * called incrementAndGet when queuing the failed PUT!
+ */
+
+ supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
+ esExecutor).whenComplete((result, error) -> {
+
+ esWorkOnHand.decrementAndGet();
+
+ if (error != null) {
+ LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
+ } else {
+ updateElasticSearchCounters(result);
+ networkTransactionConsumer.accept(result);
+ }
+ });
+ }
+ }
+
}