summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java41
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java38
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java38
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java46
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java38
5 files changed, 60 insertions, 141 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
index b4c254a..67015c5 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
@@ -232,46 +233,14 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
AggregationEntity ae = rsc.getAggregationEntity();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, ae);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, ae);
+ performRetrySync(ae.getId(), networkTransactionConsumer, txn);
}
}
}
-
+
+
/**
* Perform document upsert.
*
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java
index 6f8299c..74ee4ea 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
@@ -615,41 +616,8 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
NetworkTransaction txn = susc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, sus);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, sus);
+ performRetrySync(sus.getId(), networkTransactionConsumer, txn);
}
}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java
index a817eb9..2087fa3 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
@@ -44,7 +45,6 @@ import org.onap.aai.sparky.config.oxm.CrossEntityReferenceLookup;
import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
-import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.onap.aai.sparky.dal.rest.HttpMethod;
@@ -824,40 +824,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- // In this retry flow the icer object has already
- // derived its fields
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
- * that for this request already when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, icer);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result,icer);
+ performRetrySync(icer.getId(), networkTransactionConsumer, txn);
}
}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
index 52de8e4..444eafb 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
@@ -23,8 +23,10 @@ package org.onap.aai.sparky.sync;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
@@ -37,10 +39,13 @@ import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
import org.onap.aai.sparky.util.NodeUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+
/**
* The Class AbstractEntitySynchronizer.
*
@@ -48,6 +53,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
*/
public abstract class AbstractEntitySynchronizer {
+ protected static final Logger LOG =
+ LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class);
protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
@@ -517,4 +524,43 @@ public abstract class AbstractEntitySynchronizer {
esEntityStats.reset();
}
+
+ protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) {
+ String link = null;
+ try {
+ /*
+ * In this retry flow the se object has already derived its fields
+ */
+ link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id);
+ } catch (Exception exc) {
+ LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
+ }
+
+ if (link != null) {
+ NetworkTransaction retryTransaction = new NetworkTransaction();
+ retryTransaction.setLink(link);
+ retryTransaction.setEntityType(txn.getEntityType());
+ retryTransaction.setDescriptor(txn.getDescriptor());
+ retryTransaction.setOperationType(HttpMethod.GET);
+
+ /*
+ * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
+ * called incrementAndGet when queuing the failed PUT!
+ */
+
+ supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
+ esExecutor).whenComplete((result, error) -> {
+
+ esWorkOnHand.decrementAndGet();
+
+ if (error != null) {
+ LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
+ } else {
+ updateElasticSearchCounters(result);
+ networkTransactionConsumer.accept(result);
+ }
+ });
+ }
+ }
+
}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java
index 285a76b..8365237 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
@@ -661,41 +662,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
SearchableEntity se = rsc.getSearchableEntity();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, se);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, se);
+ performRetrySync(se.getId(), networkTransactionConsumer, txn);
}
}