diff options
Diffstat (limited to 'sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java')
-rw-r--r-- | sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java | 41 |
1 files changed, 5 insertions, 36 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java index b4c254a..67015c5 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; import org.onap.aai.cl.api.Logger; @@ -232,46 +233,14 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer AggregationEntity ae = rsc.getAggregationEntity(); NetworkTransaction txn = rsc.getNetworkTransaction(); - String link = null; - try { - /* - * In this retry flow the se object has already derived its fields - */ - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); - } - - if (link != null) { - NetworkTransaction retryTransaction = new NetworkTransaction(); - retryTransaction.setLink(link); - retryTransaction.setEntityType(txn.getEntityType()); - retryTransaction.setDescriptor(txn.getDescriptor()); - retryTransaction.setOperationType(HttpMethod.GET); - - /* - * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already - * called incrementAndGet when queuing the failed PUT! - */ - - supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), - esExecutor).whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); - } else { - updateElasticSearchCounters(result); - performDocumentUpsert(result, ae); - } - }); - } + final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, ae); + performRetrySync(ae.getId(), networkTransactionConsumer, txn); } } } - + + /** * Perform document upsert. * |