summaryrefslogtreecommitdiffstats
path: root/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
diff options
context:
space:
mode:
Diffstat (limited to 'sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java')
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java41
1 files changed, 5 insertions, 36 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
index b4c254a..67015c5 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
@@ -232,46 +233,14 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
AggregationEntity ae = rsc.getAggregationEntity();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, ae);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, ae);
+ performRetrySync(ae.getId(), networkTransactionConsumer, txn);
}
}
}
-
+
+
/**
* Perform document upsert.
*