diff options
5 files changed, 60 insertions, 141 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java index b4c254a..67015c5 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; import org.onap.aai.cl.api.Logger; @@ -232,46 +233,14 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer AggregationEntity ae = rsc.getAggregationEntity(); NetworkTransaction txn = rsc.getNetworkTransaction(); - String link = null; - try { - /* - * In this retry flow the se object has already derived its fields - */ - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); - } - - if (link != null) { - NetworkTransaction retryTransaction = new NetworkTransaction(); - retryTransaction.setLink(link); - retryTransaction.setEntityType(txn.getEntityType()); - retryTransaction.setDescriptor(txn.getDescriptor()); - retryTransaction.setOperationType(HttpMethod.GET); - - /* - * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already - * called incrementAndGet when queuing the failed PUT! - */ - - supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), - esExecutor).whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); - } else { - updateElasticSearchCounters(result); - performDocumentUpsert(result, ae); - } - }); - } + final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, ae); + performRetrySync(ae.getId(), networkTransactionConsumer, txn); } } } - + + /** * Perform document upsert. * diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java index 6f8299c..74ee4ea 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; import org.onap.aai.cl.api.Logger; @@ -615,41 +616,8 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer SuggestionSearchEntity sus = susc.getSuggestionSearchEntity(); NetworkTransaction txn = susc.getNetworkTransaction(); - String link = null; - try { - /* - * In this retry flow the se object has already derived its fields - */ - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId()); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); - } - - if (link != null) { - NetworkTransaction retryTransaction = new NetworkTransaction(); - retryTransaction.setLink(link); - retryTransaction.setEntityType(txn.getEntityType()); - retryTransaction.setDescriptor(txn.getDescriptor()); - retryTransaction.setOperationType(HttpMethod.GET); - - /* - * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already - * called incrementAndGet when queuing the failed PUT! - */ - - supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), - esExecutor).whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); - } else { - updateElasticSearchCounters(result); - performDocumentUpsert(result, sus); - } - }); - } + final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, sus); + performRetrySync(sus.getId(), networkTransactionConsumer, txn); } } diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java index a817eb9..2087fa3 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import java.util.function.Supplier; import org.onap.aai.cl.api.Logger; @@ -44,7 +45,6 @@ import org.onap.aai.sparky.config.oxm.CrossEntityReferenceLookup; import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor; import org.onap.aai.sparky.config.oxm.OxmEntityLookup; import org.onap.aai.sparky.config.oxm.SearchableEntityLookup; -import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; import org.onap.aai.sparky.dal.NetworkTransaction; import org.onap.aai.sparky.dal.rest.HttpMethod; @@ -824,40 +824,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference(); NetworkTransaction txn = rsc.getNetworkTransaction(); - String link = null; - try { - // In this retry flow the icer object has already - // derived its fields - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId()); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); - } - - if (link != null) { - NetworkTransaction retryTransaction = new NetworkTransaction(); - retryTransaction.setLink(link); - retryTransaction.setEntityType(txn.getEntityType()); - retryTransaction.setDescriptor(txn.getDescriptor()); - retryTransaction.setOperationType(HttpMethod.GET); - - /* - * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did - * that for this request already when queuing the failed PUT! - */ - - supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), - esExecutor).whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); - } else { - updateElasticSearchCounters(result); - performDocumentUpsert(result, icer); - } - }); - } + final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result,icer); + performRetrySync(icer.getId(), networkTransactionConsumer, txn); } } diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java index 52de8e4..444eafb 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java @@ -23,8 +23,10 @@ package org.onap.aai.sparky.sync; import java.util.EnumSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.restclient.client.OperationResult; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; @@ -37,10 +39,13 @@ import org.onap.aai.sparky.dal.rest.HttpMethod; import org.onap.aai.sparky.dal.rest.RestOperationalStatistics; import org.onap.aai.sparky.logging.AaiUiMsgs; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; +import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; import org.onap.aai.sparky.util.NodeUtils; import com.fasterxml.jackson.databind.ObjectMapper; +import static java.util.concurrent.CompletableFuture.supplyAsync; + /** * The Class AbstractEntitySynchronizer. * @@ -48,6 +53,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; */ public abstract class AbstractEntitySynchronizer { + protected static final Logger LOG = + LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class); protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409; protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3); @@ -517,4 +524,43 @@ public abstract class AbstractEntitySynchronizer { esEntityStats.reset(); } + + protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) { + String link = null; + try { + /* + * In this retry flow the se object has already derived its fields + */ + link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id); + } catch (Exception exc) { + LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); + } + + if (link != null) { + NetworkTransaction retryTransaction = new NetworkTransaction(); + retryTransaction.setLink(link); + retryTransaction.setEntityType(txn.getEntityType()); + retryTransaction.setDescriptor(txn.getDescriptor()); + retryTransaction.setOperationType(HttpMethod.GET); + + /* + * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already + * called incrementAndGet when queuing the failed PUT! + */ + + supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), + esExecutor).whenComplete((result, error) -> { + + esWorkOnHand.decrementAndGet(); + + if (error != null) { + LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); + } else { + updateElasticSearchCounters(result); + networkTransactionConsumer.accept(result); + } + }); + } + } + } diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java index 285a76b..8365237 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import java.util.function.Supplier; import org.onap.aai.cl.api.Logger; @@ -661,41 +662,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer SearchableEntity se = rsc.getSearchableEntity(); NetworkTransaction txn = rsc.getNetworkTransaction(); - String link = null; - try { - /* - * In this retry flow the se object has already derived its fields - */ - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId()); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); - } - - if (link != null) { - NetworkTransaction retryTransaction = new NetworkTransaction(); - retryTransaction.setLink(link); - retryTransaction.setEntityType(txn.getEntityType()); - retryTransaction.setDescriptor(txn.getDescriptor()); - retryTransaction.setOperationType(HttpMethod.GET); - - /* - * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already - * called incrementAndGet when queuing the failed PUT! - */ - - supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), - esExecutor).whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage()); - } else { - updateElasticSearchCounters(result); - performDocumentUpsert(result, se); - } - }); - } + final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, se); + performRetrySync(se.getId(), networkTransactionConsumer, txn); } } |