summaryrefslogtreecommitdiffstats
path: root/sparkybe-onap-service/src/main
diff options
context:
space:
mode:
authorrenealr <reneal.rogers@amdocs.com>2018-08-24 09:20:37 -0400
committerrenealr <reneal.rogers@amdocs.com>2018-08-24 11:25:20 -0400
commit11b1339ca114835bf60356b4061f1ff3f3fd3e8b (patch)
tree6965f83294a0bd4d7a01935dbacf3bc3e61df147 /sparkybe-onap-service/src/main
parent3f9bce9e9d5c7f779a20d289811a88babfbda3cf (diff)
update sync queries to use searh data service
Issue-ID: AAI-1540 Change-Id: I6a00b0e12830e1220070ae60ba9b2c42e67dfbfc Signed-off-by: renealr <reneal.rogers@amdocs.com>
Diffstat (limited to 'sparkybe-onap-service/src/main')
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java43
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java30
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java380
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java96
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java19
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java16
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java8
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java20
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java30
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java153
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java139
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java176
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java285
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java215
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java403
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java61
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java16
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java18
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java600
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java36
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java68
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServicePut.java (renamed from sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchPut.java)22
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceRetrieval.java (renamed from sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchRetrieval.java)16
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceUpdate.java (renamed from sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchUpdate.java)14
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java86
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java97
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java477
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java38
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java28
-rw-r--r--sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java26
30 files changed, 241 insertions, 3375 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java
index 9c3c1d9..7fd7e9c 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java
@@ -30,17 +30,15 @@ import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner;
import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory;
-import org.onap.aai.sparky.sync.IndexCleaner;
import org.onap.aai.sparky.sync.IndexIntegrityValidator;
import org.onap.aai.sparky.sync.SyncController;
import org.onap.aai.sparky.sync.SyncControllerImpl;
import org.onap.aai.sparky.sync.SyncControllerRegistrar;
import org.onap.aai.sparky.sync.SyncControllerRegistry;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.config.SyncControllerConfig;
@@ -51,13 +49,13 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar
LoggerFactory.getInstance().getLogger(AggregationSyncControllerFactory.class);
private ActiveInventoryAdapter aaiAdapter;
- private ElasticSearchAdapter esAdapter;
+ private SearchServiceAdapter searchServiceAdapter;
private SuggestionEntityLookup suggestionEntityLookup;
private Map<String, String> aggregationEntityToIndexMap;
private Map<String, ElasticSearchSchemaConfig> indexNameToSchemaConfigMap;
- private ElasticSearchEndpointConfig elasticSearchEndpointConfig;
+ private RestEndpointConfig endpointConfig;
private SyncControllerConfig syncControllerConfig;
private SyncControllerRegistry syncControllerRegistry;
private NetworkStatisticsConfig aaiStatConfig;
@@ -67,14 +65,14 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar
private List<SyncController> syncControllers;
- public AggregationSyncControllerFactory(ElasticSearchEndpointConfig esEndpointConfig,
+ public AggregationSyncControllerFactory(RestEndpointConfig endpointConfig,
SyncControllerConfig syncControllerConfig, SyncControllerRegistry syncControllerRegistry,
SuggestionEntityLookup suggestionEntityLookup,
OxmEntityLookup oxmEntityLookup,
ElasticSearchSchemaFactory elasticSearchSchemaFactory) {
this.elasticSearchSchemaFactory = elasticSearchSchemaFactory;
this.syncControllers = new ArrayList<SyncController>();
- this.elasticSearchEndpointConfig = esEndpointConfig;
+ this.endpointConfig = endpointConfig;
this.syncControllerConfig = syncControllerConfig;
this.syncControllerRegistry = syncControllerRegistry;
this.suggestionEntityLookup = suggestionEntityLookup;
@@ -106,13 +104,13 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar
this.indexNameToSchemaConfigMap = indexNameToSchemaConfigMap;
}
- public ElasticSearchEndpointConfig getElasticSearchEndpointConfig() {
- return elasticSearchEndpointConfig;
+ public RestEndpointConfig getEndpointConfig() {
+ return endpointConfig;
}
- public void setElasticSearchEndpointConfig(
- ElasticSearchEndpointConfig elasticSearchEndpointConfig) {
- this.elasticSearchEndpointConfig = elasticSearchEndpointConfig;
+ public void setEndpointConfig(
+ RestEndpointConfig endpointConfig) {
+ this.endpointConfig = endpointConfig;
}
public SyncControllerConfig getSyncControllerConfig() {
@@ -131,12 +129,12 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar
this.aaiAdapter = aaiAdapter;
}
- public ElasticSearchAdapter getEsAdapter() {
- return esAdapter;
+ public SearchServiceAdapter getSearchServiceAdapter() {
+ return searchServiceAdapter;
}
- public void setEsAdapter(ElasticSearchAdapter esAdapter) {
- this.esAdapter = esAdapter;
+ public void setSearchServiceAdapter(SearchServiceAdapter searchServiceAdapter) {
+ this.searchServiceAdapter = searchServiceAdapter;
}
public SuggestionEntityLookup getSuggestionEntityLookup() {
@@ -185,8 +183,8 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar
continue;
}
- IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(esAdapter,
- schemaConfig, elasticSearchEndpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
+ IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(searchServiceAdapter,
+ schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
aggregationSyncController.registerIndexValidator(aggregationIndexValidator);
@@ -197,15 +195,10 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar
oxmEntityLookup);
aggSynchronizer.setAaiAdapter(aaiAdapter);
- aggSynchronizer.setElasticSearchAdapter(esAdapter);
+ aggSynchronizer.setSearchServiceAdapter(searchServiceAdapter);
aggregationSyncController.registerEntitySynchronizer(aggSynchronizer);
- IndexCleaner entityDataIndexCleaner =
- new ElasticSearchIndexCleaner(esAdapter, elasticSearchEndpointConfig, schemaConfig);
-
- aggregationSyncController.registerIndexCleaner(entityDataIndexCleaner);
-
syncControllers.add(aggregationSyncController);
} catch (Exception exc) {
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
index 67015c5..0b9733f 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java
@@ -56,9 +56,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;
@@ -263,7 +263,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
*/
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), ae.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
@@ -326,9 +326,9 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
if (wasEntryDiscovered) {
if (versionNumber != null && jsonPayload != null) {
- String requestPayload =
- elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
- schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
+ String requestPayload =
+ searchServiceAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
+ ae.getId(),jsonPayload);
NetworkTransaction transactionTracker = new NetworkTransaction();
transactionTracker.setEntityType(esGetTxn.getEntityType());
@@ -336,9 +336,9 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
transactionTracker.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
- requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
+ requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -363,8 +363,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
- esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+ esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -525,7 +525,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), ae.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
}
@@ -539,8 +539,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java
deleted file mode 100644
index c6cd3b1..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.aggregation.sync;
-
-import static java.util.concurrent.CompletableFuture.supplyAsync;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
-import javax.json.Json;
-import javax.ws.rs.core.MediaType;
-
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
-import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
-import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
-import org.onap.aai.sparky.dal.rest.HttpMethod;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
-import org.onap.aai.sparky.sync.IndexSynchronizer;
-import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
-import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
-import org.onap.aai.sparky.sync.enumeration.OperationState;
-import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
-import org.onap.aai.sparky.util.NodeUtils;
-import org.slf4j.MDC;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-/**
- * The Class HistoricalEntitySummarizer.
- */
-public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
- implements IndexSynchronizer {
-
- private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
- private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
-
- private boolean allWorkEnumerated;
- private ConcurrentHashMap<String, AtomicInteger> entityCounters;
- private boolean syncInProgress;
- private Map<String, String> contextMap;
- private ElasticSearchSchemaConfig schemaConfig;
- private SearchableEntityLookup searchableEntityLookup;
-
- /**
- * Instantiates a new historical entity summarizer.
- *
- * @throws Exception the exception
- */
- public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
- int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
- NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
- throws Exception {
- super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
-
- this.schemaConfig = schemaConfig;
- this.allWorkEnumerated = false;
- this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
- this.synchronizerName = "Historical Entity Summarizer";
- this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
- this.syncInProgress = false;
- this.contextMap = MDC.getCopyOfContextMap();
- this.syncDurationInMs = -1;
- this.searchableEntityLookup = searchableEntityLookup;
- }
-
- /**
- * Collect all the work.
- *
- * @return the operation state
- */
- private OperationState collectAllTheWork() {
-
- Map<String, SearchableOxmEntityDescriptor> descriptorMap =
- searchableEntityLookup.getSearchableEntityDescriptors();
-
- if (descriptorMap.isEmpty()) {
- LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
-
- return OperationState.ERROR;
- }
-
- Collection<String> entityTypes = descriptorMap.keySet();
-
- AtomicInteger asyncWoH = new AtomicInteger(0);
-
- asyncWoH.set(entityTypes.size());
-
- try {
- for (String entityType : entityTypes) {
-
- supplyAsync(new Supplier<Void>() {
-
- @Override
- public Void get() {
- MDC.setContextMap(contextMap);
- try {
- OperationResult typeLinksResult =
- aaiAdapter.getSelfLinksByEntityType(entityType);
- updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
- processEntityTypeSelfLinks(entityType, typeLinksResult);
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
-
- }
-
- return null;
- }
-
- }, aaiExecutor).whenComplete((result, error) -> {
-
- asyncWoH.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
- }
-
- });
-
- }
-
-
- while (asyncWoH.get() > 0) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
- }
-
- Thread.sleep(250);
- }
-
- esWorkOnHand.set(entityCounters.size());
-
- // start doing the real work
- allWorkEnumerated = true;
-
- insertEntityTypeCounters();
-
- if (LOG.isDebugEnabled()) {
-
- StringBuilder sb = new StringBuilder(128);
-
- sb.append("\n\nHistorical Entity Counters:");
-
- for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
- sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
- }
-
- LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
-
- }
-
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
-
-
- esWorkOnHand.set(0);
- allWorkEnumerated = true;
-
- return OperationState.ERROR;
- }
-
- return OperationState.OK;
-
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
- */
- @Override
- public OperationState doSync() {
- this.syncDurationInMs = -1;
- String txnID = NodeUtils.getRandomTxnId();
- MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
-
- if (syncInProgress) {
- LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
- return OperationState.PENDING;
- }
-
- clearCache();
-
- syncInProgress = true;
- this.syncStartedTimeStampInMs = System.currentTimeMillis();
- allWorkEnumerated = false;
-
- return collectAllTheWork();
- }
-
- /**
- * Process entity type self links.
- *
- * @param entityType the entity type
- * @param operationResult the operation result
- */
- private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
-
- JsonNode rootNode = null;
-
- final String jsonResult = operationResult.getResult();
-
- if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
-
- try {
- rootNode = mapper.readTree(jsonResult);
- } catch (IOException exc) {
- LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
- return;
- }
-
- JsonNode resultData = rootNode.get("result-data");
- ArrayNode resultDataArrayNode = null;
-
- if (resultData != null && resultData.isArray()) {
- resultDataArrayNode = (ArrayNode) resultData;
- entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
- }
- }
-
- }
-
- /**
- * Insert entity type counters.
- */
- private void insertEntityTypeCounters() {
-
- if (esWorkOnHand.get() <= 0) {
- return;
- }
-
- SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
- Timestamp timestamp = new Timestamp(System.currentTimeMillis());
- String currentFormattedTimeStamp = dateFormat.format(timestamp);
-
- Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
-
- for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
-
- supplyAsync(new Supplier<Void>() {
-
- @Override
- public Void get() {
- MDC.setContextMap(contextMap);
- String jsonString = Json.createObjectBuilder().add(
- "count", entityCounterEntry.getValue().get())
- .add("entityType", entityCounterEntry.getKey())
- .add("timestamp", currentFormattedTimeStamp).build().toString();
-
- String link = null;
- try {
- link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
- OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
- updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
- }
-
- return null;
- }
-
- }, esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- });
-
- }
-
- while (esWorkOnHand.get() > 0) {
-
- try {
- Thread.sleep(500);
- } catch (InterruptedException exc) {
- LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
- Thread.currentThread().interrupt();
- }
- }
-
- }
-
- @Override
- public SynchronizerState getState() {
-
- if (!isSyncDone()) {
- return SynchronizerState.PERFORMING_SYNCHRONIZATION;
- }
-
- return SynchronizerState.IDLE;
-
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
- */
- @Override
- public String getStatReport(boolean showFinalReport) {
- syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
- return this.getStatReport(syncDurationInMs, showFinalReport);
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
- */
- @Override
- public void shutdown() {
- this.shutdownExecutors();
- }
-
- @Override
- protected boolean isSyncDone() {
-
- int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
- + " all work enumerated = " + allWorkEnumerated);
- }
-
- if (totalWorkOnHand > 0 || !allWorkEnumerated) {
- return false;
- }
-
- this.syncInProgress = false;
-
- return true;
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
- */
- @Override
- public void clearCache() {
-
- if (syncInProgress) {
- LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
- return;
- }
-
- super.clearCache();
- this.resetCounters();
- if (entityCounters != null) {
- entityCounters.clear();
- }
-
- allWorkEnumerated = false;
-
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java
deleted file mode 100644
index da1f290..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.aggregation.sync;
-
-import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
-import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory;
-import org.onap.aai.sparky.sync.IndexIntegrityValidator;
-import org.onap.aai.sparky.sync.SyncControllerImpl;
-import org.onap.aai.sparky.sync.SyncControllerRegistrar;
-import org.onap.aai.sparky.sync.SyncControllerRegistry;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
-import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
-import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
-import org.onap.aai.sparky.sync.config.SyncControllerConfig;
-
-import java.util.concurrent.TimeUnit;
-
-public class HistoricalEntitySyncController extends SyncControllerImpl
- implements SyncControllerRegistrar {
-
- private SyncControllerRegistry syncControllerRegistry;
-
- public HistoricalEntitySyncController(SyncControllerConfig syncControllerConfig,
- ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
- ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig,
- int syncFrequencyInMinutes, NetworkStatisticsConfig aaiStatConfig,
- NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup,
- ElasticSearchSchemaFactory elasticSearchSchemaFactory) throws Exception {
- super(syncControllerConfig);
-
- // final String controllerName = "Historical Entity Count Synchronizer";
-
- long taskFrequencyInMs = getTaskFrequencyInMs(syncFrequencyInMinutes);
-
- setDelayInMs(taskFrequencyInMs);
- setSyncFrequencyInMs(taskFrequencyInMs);
-
- IndexIntegrityValidator entityCounterHistoryValidator = new IndexIntegrityValidator(esAdapter,
- schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
-
- registerIndexValidator(entityCounterHistoryValidator);
-
- HistoricalEntitySummarizer historicalSummarizer = new HistoricalEntitySummarizer(schemaConfig,
- syncControllerConfig.getNumInternalSyncWorkers(),
- syncControllerConfig.getNumSyncActiveInventoryWorkers(),
- syncControllerConfig.getNumSyncElasticWorkers(),aaiStatConfig, esStatConfig,searchableEntityLookup);
-
- historicalSummarizer.setAaiAdapter(aaiAdapter);
- historicalSummarizer.setElasticSearchAdapter(esAdapter);
-
- registerEntitySynchronizer(historicalSummarizer);
-
- }
-
- static long getTaskFrequencyInMs(int syncFrequencyInMinutes) {
- return TimeUnit.MINUTES.toMillis(Integer.toUnsignedLong(syncFrequencyInMinutes));
- }
-
- public SyncControllerRegistry getSyncControllerRegistry() {
- return syncControllerRegistry;
- }
-
- public void setSyncControllerRegistry(SyncControllerRegistry syncControllerRegistry) {
- this.syncControllerRegistry = syncControllerRegistry;
- }
-
- @Override
- public void registerController() {
- if ( syncControllerRegistry != null ) {
- if ( syncControllerConfig.isEnabled()) {
- syncControllerRegistry.registerSyncController(this);
- }
- }
-
- }
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java
index 54b82fb..6596a9d 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java
@@ -23,16 +23,14 @@ package org.onap.aai.sparky.autosuggestion.sync;
import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.search.filters.config.FiltersConfig;
-import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner;
import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory;
-import org.onap.aai.sparky.sync.IndexCleaner;
import org.onap.aai.sparky.sync.IndexIntegrityValidator;
import org.onap.aai.sparky.sync.SyncControllerImpl;
import org.onap.aai.sparky.sync.SyncControllerRegistrar;
import org.onap.aai.sparky.sync.SyncControllerRegistry;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.config.SyncControllerConfig;
@@ -43,8 +41,8 @@ public class AutoSuggestionSyncController extends SyncControllerImpl implements
private SyncControllerRegistry syncControllerRegistry;
public AutoSuggestionSyncController(SyncControllerConfig syncControllerConfig,
- ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
- ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig,
+ ActiveInventoryAdapter aaiAdapter, SearchServiceAdapter searchServiceAdapter,
+ ElasticSearchSchemaConfig schemaConfig, RestEndpointConfig endpointConfig,
NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
OxmEntityLookup oxmEntityLookup, SuggestionEntityLookup suggestionEntityLookup,
FiltersConfig filtersConfig,
@@ -53,7 +51,7 @@ public class AutoSuggestionSyncController extends SyncControllerImpl implements
// final String controllerName = "Auto Suggestion Synchronizer";
- IndexIntegrityValidator autoSuggestionIndexValidator = new IndexIntegrityValidator(esAdapter,
+ IndexIntegrityValidator autoSuggestionIndexValidator = new IndexIntegrityValidator(searchServiceAdapter,
schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
registerIndexValidator(autoSuggestionIndexValidator);
@@ -65,15 +63,10 @@ public class AutoSuggestionSyncController extends SyncControllerImpl implements
oxmEntityLookup, suggestionEntityLookup, filtersConfig);
suggestionSynchronizer.setAaiAdapter(aaiAdapter);
- suggestionSynchronizer.setElasticSearchAdapter(esAdapter);
+ suggestionSynchronizer.setSearchServiceAdapter(searchServiceAdapter);
registerEntitySynchronizer(suggestionSynchronizer);
- IndexCleaner autosuggestIndexCleaner =
- new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig);
-
- registerIndexCleaner(autosuggestIndexCleaner);
-
}
public SyncControllerRegistry getSyncControllerRegistry() {
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java
index 74ee4ea..583f260 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java
@@ -62,8 +62,8 @@ import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
import org.onap.aai.sparky.util.NodeUtils;
import org.onap.aai.sparky.util.SuggestionsPermutation;
import org.slf4j.MDC;
@@ -434,7 +434,7 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
if (sse.isSuggestableDoc()) {
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), sse.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
}
@@ -448,8 +448,8 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -515,7 +515,7 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
*/
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), sse.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
@@ -552,8 +552,8 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
- esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+ esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java
index fe3ecd0..7501e5d 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java
@@ -40,7 +40,7 @@ import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;
@@ -119,7 +119,7 @@ public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), syncEntity.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), syncEntity.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
}
@@ -135,8 +135,8 @@ public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
esWorkOnHand.incrementAndGet();
final Map<String, String> contextMap = MDC.getCopyOfContextMap();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn,
- elasticSearchAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, elasticPutTxn,
+ searchServiceAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java
index c48ee5a..62183fc 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java
@@ -21,16 +21,14 @@
package org.onap.aai.sparky.autosuggestion.sync;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.search.filters.config.FiltersConfig;
-import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner;
import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory;
-import org.onap.aai.sparky.sync.IndexCleaner;
import org.onap.aai.sparky.sync.IndexIntegrityValidator;
import org.onap.aai.sparky.sync.SyncControllerImpl;
import org.onap.aai.sparky.sync.SyncControllerRegistrar;
import org.onap.aai.sparky.sync.SyncControllerRegistry;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.config.SyncControllerConfig;
@@ -40,8 +38,8 @@ public class VnfAliasSyncController extends SyncControllerImpl implements SyncCo
private SyncControllerRegistry syncControllerRegistry;
public VnfAliasSyncController(SyncControllerConfig syncControllerConfig,
- ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
- ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig,
+ ActiveInventoryAdapter aaiAdapter, SearchServiceAdapter searchServiceAdapter,
+ ElasticSearchSchemaConfig schemaConfig, RestEndpointConfig endpointConfig,
NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
FiltersConfig filtersConfig,
ElasticSearchSchemaFactory elasticSearchSchemaFactory) throws Exception {
@@ -49,7 +47,7 @@ public class VnfAliasSyncController extends SyncControllerImpl implements SyncCo
// final String controllerName = "VNFs Alias Suggestion Synchronizer";
- IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(esAdapter, schemaConfig,
+ IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(searchServiceAdapter, schemaConfig,
endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
registerIndexValidator(indexValidator);
@@ -60,16 +58,10 @@ public class VnfAliasSyncController extends SyncControllerImpl implements SyncCo
syncControllerConfig.getNumSyncElasticWorkers(), aaiStatConfig, esStatConfig, filtersConfig);
synchronizer.setAaiAdapter(aaiAdapter);
- synchronizer.setElasticSearchAdapter(esAdapter);
+ synchronizer.setSearchServiceAdapter(searchServiceAdapter);
registerEntitySynchronizer(synchronizer);
-
- IndexCleaner indexCleaner =
- new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig);
-
- registerIndexCleaner(indexCleaner);
-
}
public SyncControllerRegistry getSyncControllerRegistry() {
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java
index 2087fa3..cf7908b 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java
@@ -60,9 +60,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;
@@ -575,8 +575,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
String link = null;
try {
- link = elasticSearchAdapter
- .buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
+ link = searchServiceAdapter
+ .buildSearchServiceDocUrl(getIndexName(), icer.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
exc.getLocalizedMessage());
@@ -592,8 +592,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
esWorkOnHand.incrementAndGet();
supplyAsync(
- new PerformElasticSearchRetrieval(n2, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
+ new PerformSearchServiceRetrieval(n2, searchServiceAdapter),
+ esExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -669,7 +669,7 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
*/
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), icer.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
@@ -728,8 +728,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
if (wasEntryDiscovered) {
if (versionNumber != null && jsonPayload != null) {
- String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
- "default", icer.getId(), versionNumber, jsonPayload);
+ String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
+ icer.getId(), jsonPayload);
NetworkTransaction transactionTracker = new NetworkTransaction();
transactionTracker.setEntityType(esGetResult.getEntityType());
@@ -737,9 +737,9 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
transactionTracker.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
- requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
+ requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -762,8 +762,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
- esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+ esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java
deleted file mode 100644
index 3072b91..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.dal;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.ws.rs.core.MediaType;
-
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.restclient.client.RestClient;
-import org.onap.aai.sparky.dal.rest.RestClientConstructionException;
-import org.onap.aai.sparky.dal.rest.RestClientFactory;
-import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
-
-/**
- * The Class ElasticSearchAdapter.
-
- */
-public class ElasticSearchAdapter {
-
- private static final String BULK_IMPORT_INDEX_TEMPLATE =
- "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
-
- private static final String BULK_API = "_bulk";
-
- private static final String DEFAULT_TYPE = "default";
-
- private RestClient restClient;
- private RestEndpointConfig endpointConfig;
-
- /**
- * Instantiates a new elastic search adapter.
- * @throws RestClientConstructionException
- */
- public ElasticSearchAdapter(RestEndpointConfig endpointConfig) throws RestClientConstructionException {
-
- this.restClient = RestClientFactory.buildClient(endpointConfig);
- this.endpointConfig = endpointConfig;
-
- }
-
- protected Map<String, List<String>> getMessageHeaders() {
- Map<String, List<String>> headers = new HashMap<String, List<String>>();
- // insert mandatory headers if there are any
- return headers;
- }
-
- public OperationResult doGet(String url, MediaType acceptContentType) {
- return restClient.get(url, getMessageHeaders(), acceptContentType);
- }
-
- public OperationResult doDelete(String url, MediaType acceptContentType) {
- return restClient.delete(url, getMessageHeaders(), acceptContentType);
- }
-
- public OperationResult doPost(String url, String jsonPayload, MediaType acceptContentType) {
- return restClient.post(url, jsonPayload, getMessageHeaders(), MediaType.APPLICATION_JSON_TYPE,
- acceptContentType);
- }
-
- public OperationResult doPut(String url, String jsonPayload, MediaType acceptContentType) {
- return restClient.put(url, jsonPayload, getMessageHeaders(), MediaType.APPLICATION_JSON_TYPE,
- acceptContentType);
- }
-
- public OperationResult doPatch(String url, String jsonPayload, MediaType acceptContentType) {
-
- Map<String,List<String>> headers = getMessageHeaders();
- headers.putIfAbsent("X-HTTP-Method-Override", new ArrayList<String>());
- headers.get("X-HTTP-Method-Override").add("PATCH");
-
- return restClient.post(url, jsonPayload, headers, MediaType.APPLICATION_JSON_TYPE, acceptContentType);
- }
-
- public OperationResult doHead(String url, MediaType acceptContentType) {
- return restClient.head(url, getMessageHeaders(), acceptContentType);
- }
-
- public OperationResult doBulkOperation(String url, String payload) {
- return restClient.put(url, payload, getMessageHeaders(),
- MediaType.APPLICATION_FORM_URLENCODED_TYPE, MediaType.APPLICATION_JSON_TYPE);
- }
-
- public String buildBulkImportOperationRequest(String index, String type, String id,
- String version, String payload) {
-
- StringBuilder requestPayload = new StringBuilder(128);
-
- requestPayload.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, index, type, id, version));
- requestPayload.append(payload).append("\n");
-
- return requestPayload.toString();
-
- }
-
- public OperationResult retrieveEntityById(String host, String port, String indexName,
- String docType, String resourceUrl) {
- String esUrl =
- String.format("http://%s:%s/%s/%s/%s", host, port, indexName, docType, resourceUrl);
- return doGet(esUrl, MediaType.APPLICATION_JSON_TYPE);
- }
-
- public String buildElasticSearchUrlForApi(String indexName, String api) {
- return String.format("http://%s:%s/%s/%s", endpointConfig.getEndpointIpAddress(),
- endpointConfig.getEndpointServerPort(), indexName, api);
- }
-
- public String buildElasticSearchUrl(String indexName, String docType) {
- return String.format("http://%s:%s/%s/%s", endpointConfig.getEndpointIpAddress(),
- endpointConfig.getEndpointServerPort(), indexName, docType);
- }
-
- public String buildElasticSearchGetDocUrl(String indexName, String docType, String docId) {
- return String.format("http://%s:%s/%s/%s/%s", endpointConfig.getEndpointIpAddress(),
- endpointConfig.getEndpointServerPort(), indexName, docType, docId);
- }
-
- public String buildElasticSearchGetDocUrl(String indexName, String docId) {
- return buildElasticSearchGetDocUrl(indexName, DEFAULT_TYPE, docId);
- }
-
- public String buildElasticSearchPostUrl(String indexName) {
- return String.format("http://%s:%s/%s/%s", endpointConfig.getEndpointIpAddress(),
- endpointConfig.getEndpointServerPort(), indexName, DEFAULT_TYPE);
- }
-
- public String getBulkUrl() {
- return String.format("http://%s:%s/%s", endpointConfig.getEndpointIpAddress(),
- endpointConfig.getEndpointServerPort(), BULK_API);
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java
deleted file mode 100644
index a7f372a..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.inventory;
-
-import javax.json.Json;
-import javax.json.JsonArray;
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
-
-/**
- * The Class EntityHistoryQueryBuilder.
- */
-public class EntityHistoryQueryBuilder {
-
- private static final String TABLE = "table";
- private static final String GRAPH = "graph";
-
- /**
- * Gets the query.
- *
- * @param type the type
- * @return the query
- */
- public static JsonObject getQuery(String type) {
- if (type.equalsIgnoreCase(TABLE)) {
- return createTableQuery();
- } else if (type.equalsIgnoreCase(GRAPH)) {
- return createGraphQuery();
- } else {
- return null;
- }
- }
-
- /**
- * Creates the graph query.
- *
- * @return the json object
- */
- public static JsonObject createGraphQuery() {
- JsonObjectBuilder jsonBuilder = Json.createObjectBuilder();
-
- jsonBuilder.add("aggs",
- Json.createObjectBuilder().add("group_by_entityType",
- Json.createObjectBuilder()
- .add("terms", Json.createObjectBuilder().add("field", "entityType").add("size", 0))
- .add("aggs", Json.createObjectBuilder().add("group_by_date",
- Json.createObjectBuilder().add("date_histogram", createDateHistogram())
- .add("aggs", Json.createObjectBuilder().add("sort_by_date",
- Json.createObjectBuilder().add("top_hits", createTopHitsBlob())))))));
- jsonBuilder.add("size", 0);
-
- return jsonBuilder.build();
- }
-
- /**
- * Creates the table query.
- *
- * @return the json object
- */
- public static JsonObject createTableQuery() {
- JsonObjectBuilder jsonBuilder = Json.createObjectBuilder();
-
- jsonBuilder.add("aggs",
- Json.createObjectBuilder().add("group_by_entityType",
- Json.createObjectBuilder()
- .add("terms", Json.createObjectBuilder().add("field", "entityType").add("size", 0))
- .add("aggs", Json.createObjectBuilder().add("sort_by_date",
- Json.createObjectBuilder().add("top_hits", createTopHitsBlob())))));
- jsonBuilder.add("size", 0);
-
- return jsonBuilder.build();
- }
-
- /**
- * Creates the date histogram.
- *
- * @return the json object
- */
- private static JsonObject createDateHistogram() {
- JsonObjectBuilder jsonBuilder = Json.createObjectBuilder();
-
- jsonBuilder.add("field", "timestamp");
- jsonBuilder.add("min_doc_count", 1);
- jsonBuilder.add("interval", "day");
- jsonBuilder.add("format", "epoch_millis");
-
- return jsonBuilder.build();
- }
-
- /**
- * Creates the top hits blob.
- *
- * @return the json object
- */
- private static JsonObject createTopHitsBlob() {
- JsonObjectBuilder builder = Json.createObjectBuilder();
- builder.add("size", 1);
- builder.add("sort", getSortCriteria());
- return builder.build();
- }
-
- public static JsonArray getSortCriteria() {
- JsonArrayBuilder jsonBuilder = Json.createArrayBuilder();
- jsonBuilder.add(Json.createObjectBuilder().add("timestamp",
- Json.createObjectBuilder().add("order", "desc")));
-
- return jsonBuilder.build();
- }
-
- /**
- * The main method.
- *
- * @param args the arguments
- */
- public static void main(String[] args) {
- System.out.println("TABLE-QUERY: " + createTableQuery().toString());
- System.out.println("GRAPH_QUERY: " + createGraphQuery().toString());
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java
deleted file mode 100644
index 9c1f226..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.inventory;
-
-import java.io.IOException;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.restlet.RestletConstants;
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
-import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.util.NodeUtils;
-import org.restlet.Request;
-import org.restlet.Response;
-import org.restlet.data.ClientInfo;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Parameter;
-import org.restlet.data.Status;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * The Class GeoVisualizationServlet.
- */
-public class GeoVisualizationProcessor {
-
- private static final Logger LOG =
- LoggerFactory.getInstance().getLogger(GeoVisualizationProcessor.class);
-
- private ObjectMapper mapper;
- private ElasticSearchAdapter elasticSearchAdapter = null;
- private String topographicalSearchIndexName;
-
- private static final String SEARCH_STRING = "_search";
- private static final String SEARCH_PARAMETER = "?filter_path=hits.hits._source&_source=location&size=5000&q=entityType:";
- private static final String PARAMETER_KEY = "entity";
-
- /**
- * Instantiates a new geo visualization processor
- */
- public GeoVisualizationProcessor(ElasticSearchAdapter elasticSearchAdapter, String topographicalSearchIndexName) {
- this.mapper = new ObjectMapper();
- this.elasticSearchAdapter = elasticSearchAdapter;
- this.topographicalSearchIndexName = topographicalSearchIndexName;
- }
-
- /**
- * Gets the geo visualization results.
- *
- * @param response the response
- * @param entityType the entity type
- * @return the geo visualization results
- * @throws Exception the exception
- */
- protected OperationResult getGeoVisualizationResults(Exchange exchange) throws Exception {
- OperationResult operationResult = new OperationResult();
-
-
- Object xTransactionId = exchange.getIn().getHeader("X-TransactionId");
- if (xTransactionId == null) {
- xTransactionId = NodeUtils.getRandomTxnId();
- }
-
- Object partnerName = exchange.getIn().getHeader("X-FromAppId");
- if (partnerName == null) {
- partnerName = "Browser";
- }
-
- Request request = exchange.getIn().getHeader(RestletConstants.RESTLET_REQUEST, Request.class);
-
- /* Disables automatic Apache Camel Restlet component logging which prints out an undesirable log entry
- which includes client (e.g. browser) information */
- request.setLoggable(false);
-
- ClientInfo clientInfo = request.getClientInfo();
- MdcContext.initialize((String) xTransactionId, "AAI-UI", "", (String) partnerName, clientInfo.getAddress() + ":" + clientInfo.getPort());
-
- String entityType = "";
-
- Form form = request.getResourceRef().getQueryAsForm();
- for (Parameter parameter : form) {
- if(PARAMETER_KEY.equals(parameter.getName())) {
- entityType = parameter.getName();
- }
- }
-
- String api = SEARCH_STRING + SEARCH_PARAMETER + entityType;
-
- final String requestUrl = elasticSearchAdapter.buildElasticSearchUrlForApi(topographicalSearchIndexName, api);
-
- try {
-
- OperationResult opResult =
- elasticSearchAdapter.doGet(requestUrl, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE);
-
- JSONObject finalOutputJson = formatOutput(opResult.getResult());
-
- Response response = exchange.getIn().getHeader(RestletConstants.RESTLET_RESPONSE, Response.class);
- response.setStatus(Status.SUCCESS_OK);
- response.setEntity(String.valueOf(finalOutputJson), MediaType.APPLICATION_JSON);
- exchange.getOut().setBody(response);
-
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ERROR_GENERIC, "Error processing Geo Visualization request");
- }
-
- return operationResult;
- }
-
- /**
- * Format output.
- *
- * @param results the results
- * @return the JSON object
- */
- private JSONObject formatOutput(String results) {
- JsonNode resultNode = null;
- JSONObject finalResult = new JSONObject();
- JSONArray entitiesArr = new JSONArray();
-
- try {
- resultNode = mapper.readTree(results);
-
- final JsonNode hitsNode = resultNode.get("hits").get("hits");
- if (hitsNode.isArray()) {
-
- for (final JsonNode arrayNode : hitsNode) {
- JsonNode sourceNode = arrayNode.get("_source");
- if (sourceNode.get("location") != null) {
- JsonNode locationNode = sourceNode.get("location");
- if (NodeUtils.isNumeric(locationNode.get("lon").asText())
- && NodeUtils.isNumeric(locationNode.get("lat").asText())) {
- JSONObject location = new JSONObject();
- location.put("longitude", locationNode.get("lon").asText());
- location.put("latitude", locationNode.get("lat").asText());
-
- entitiesArr.put(location);
- }
-
- }
- }
- }
- finalResult.put("plotPoints", entitiesArr);
-
- } catch (IOException exc) {
- LOG.warn(AaiUiMsgs.ERROR_BUILDING_SEARCH_RESPONSE, exc.getLocalizedMessage());
- }
-
- return finalResult;
- }
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java
deleted file mode 100644
index 9f775e5..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.inventory.entity;
-
-import java.io.Serializable;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-
-import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
-import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
-import org.onap.aai.sparky.sync.entity.IndexDocument;
-import org.onap.aai.sparky.util.NodeUtils;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * The Class GeoIndexDocument.
- */
-public class GeoIndexDocument implements Serializable, IndexDocument {
-
- @JsonIgnore
- private static final long serialVersionUID = -5188479658230319058L;
-
- protected String entityType;
- protected String entityPrimaryKeyValue;
- protected String entityPrimaryKeyName;
- protected String latitude;
- protected String longitude;
- protected String selfLink;
-
- @JsonIgnore
- protected OxmEntityLookup oxmEntityLookup;
-
- @JsonIgnore
- protected ObjectMapper mapper = new ObjectMapper();
- // generated, SHA-256 digest
- @JsonIgnore
- protected String id;
-
- /**
- * Convert bytes to hex string.
- *
- * @param bytesToConvert the bytes to convert
- * @return the string
- */
- private static String convertBytesToHexString(byte[] bytesToConvert) {
- StringBuffer hexString = new StringBuffer();
- for (int i = 0; i < bytesToConvert.length; i++) {
- hexString.append(Integer.toHexString(0xFF & bytesToConvert[i]));
- }
- return hexString.toString();
- }
-
-
- @JsonIgnore
- public boolean isValidGeoDocument() {
-
- boolean isValid = true;
-
- isValid &= (this.getEntityType() != null);
- isValid &= (this.getLatitude() != null);
- isValid &= (this.getLongitude() != null);
- isValid &= (this.getId() != null);
- isValid &= (this.getSelfLink() != null);
-
- isValid &= NodeUtils.isNumeric(this.getLatitude());
- isValid &= NodeUtils.isNumeric(this.getLongitude());
-
- return isValid;
- }
-
- /**
- * Concat array.
- *
- * @param list the list
- * @param delimiter the delimiter
- * @return the string
- */
- private static String concatArray(List<String> list, char delimiter) {
-
- if (list == null || list.size() == 0) {
- return "";
- }
-
- StringBuilder result = new StringBuilder(64);
-
- int listSize = list.size();
- boolean firstValue = true;
-
- for (String item : list) {
-
- if (firstValue) {
- result.append(item);
- firstValue = false;
- } else {
- result.append(delimiter).append(item);
- }
-
- }
-
- return result.toString();
-
- }
-
- /*
- * We'll try and create a unique identity key that we can use for differencing the previously
- * imported record sets as we won't have granular control of what is created/removed and when. The
- * best we can hope for is identification of resources by generated Id until the Identity-Service
- * UUID is tagged against all resources, then we can use that instead.
- */
-
- /**
- * Generate unique sha digest.
- *
- * @param entityType the entity type
- * @param fieldName the field name
- * @param fieldValue the field value
- * @return the string
- * @throws NoSuchAlgorithmException the no such algorithm exception
- */
- public static String generateUniqueShaDigest(String entityType, String fieldName,
- String fieldValue) throws NoSuchAlgorithmException {
-
- /*
- * Basically SHA-256 will result in an identity with a guaranteed uniqueness compared to just a
- * java hashcode value.
- */
- MessageDigest digest = MessageDigest.getInstance("SHA-256");
- digest.update(String.format("%s.%s.%s", entityType, fieldName, fieldValue).getBytes());
- return convertBytesToHexString(digest.digest());
- }
-
- /**
- * Instantiates a new geo index document.
- */
- public GeoIndexDocument() {}
-
- /*
- * (non-Javadoc)
- *
- */
-
- @Override
- @JsonIgnore
- public String getAsJson() throws JsonProcessingException {
-
- if (latitude != null && longitude != null) {
-
- /**
- * A valid entry from this class is one that has both lat and long. If one or both is missing
- * we shouldn't be indexing anything.
- */
-
- return NodeUtils.convertObjectToJson(this, true);
-
- }
-
- return null;
-
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.entity.IndexDocument#deriveFields()
- */
- @Override
- public void deriveFields() {
-
- /*
- * We'll try and create a unique identity key that we can use for differencing the previously
- * imported record sets as we won't have granular control of what is created/removed and when.
- * The best we can hope for is identification of resources by generated Id until the
- * Identity-Service UUID is tagged against all resources, then we can use that instead.
- */
-
- OxmEntityDescriptor descriptor = oxmEntityLookup.getEntityDescriptors().get(entityType);
- String entityPrimaryKeyName = NodeUtils.concatArray(
- descriptor.getPrimaryKeyAttributeNames(), "/");
-
- this.id =
- NodeUtils.generateUniqueShaDigest(entityType, entityPrimaryKeyName, entityPrimaryKeyValue);
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString() {
- return "TopographicalEntity [" + ("entityType=" + entityType + ", ")
- + ("entityPrimaryKeyValue=" + entityPrimaryKeyValue + ", ")
- + ("latitude=" + latitude + ", ") + ("longitude=" + longitude + ", ") + ("ID=" + id + ", ")
- + ("selfLink=" + selfLink) + "]";
- }
-
- @Override
- @JsonIgnore
- public String getId() {
- return this.id;
- }
-
- @JsonProperty("entityType")
- public String getEntityType() {
- return entityType;
- }
-
- public void setEntityType(String entityType) {
- this.entityType = entityType;
- }
-
- @JsonProperty("entityPrimaryKeyValue")
- public String getEntityPrimaryKeyValue() {
- return entityPrimaryKeyValue;
- }
-
- public void setEntityPrimaryKeyValue(String entityPrimaryKeyValue) {
- this.entityPrimaryKeyValue = entityPrimaryKeyValue;
- }
-
- @JsonProperty("entityPrimaryKeyName")
- public String getEntityPrimaryKeyName() {
- return entityPrimaryKeyName;
- }
-
- public void setEntityPrimaryKeyName(String entityPrimaryKeyName) {
- this.entityPrimaryKeyName = entityPrimaryKeyName;
- }
-
- @JsonProperty("lat")
- public String getLatitude() {
- return latitude;
- }
-
- public void setLatitude(String latitude) {
- this.latitude = latitude;
- }
-
- @JsonProperty("long")
- public String getLongitude() {
- return longitude;
- }
-
- public void setLongitude(String longitude) {
- this.longitude = longitude;
- }
-
- @JsonProperty("link")
- public String getSelfLink() {
- return selfLink;
- }
-
- public void setSelfLink(String selfLink) {
- this.selfLink = selfLink;
- }
-
- @JsonIgnore
- public static long getSerialversionuid() {
- return serialVersionUID;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java
deleted file mode 100644
index 88d47ba..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.inventory.entity;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-
-import javax.json.Json;
-import javax.json.JsonObject;
-
-/**
- * The Class TopographicalEntity.
- */
-public class TopographicalEntity implements Serializable {
-
- private static final long serialVersionUID = -5188479658230319058L;
-
- protected String entityType;
- protected String entityPrimaryKeyValue;
- protected String entityPrimaryKeyName;
- protected String latitude;
- protected String longitude;
- protected String selfLink;
-
- // generated, SHA-256 digest
- protected String id;
-
- /**
- * Convert bytes to hex string.
- *
- * @param bytesToConvert the bytes to convert
- * @return the string
- */
- private static String convertBytesToHexString(byte[] bytesToConvert) {
- StringBuffer hexString = new StringBuffer();
- for (int i = 0; i < bytesToConvert.length; i++) {
- hexString.append(Integer.toHexString(0xFF & bytesToConvert[i]));
- }
- return hexString.toString();
- }
-
- /**
- * Concat array.
- *
- * @param list the list
- * @param delimiter the delimiter
- * @return the string
- */
- private static String concatArray(List<String> list, char delimiter) {
-
- if (list == null || list.size() == 0) {
- return "";
- }
-
- StringBuilder result = new StringBuilder(64);
-
- int listSize = list.size();
- boolean firstValue = true;
-
- for (String item : list) {
-
- if (firstValue) {
- result.append(item);
- firstValue = false;
- } else {
- result.append(delimiter).append(item);
- }
-
- }
-
- return result.toString();
-
- }
-
- /*
- * We'll try and create a unique identity key that we can use for differencing the previously
- * imported record sets as we won't have granular control of what is created/removed and when. The
- * best we can hope for is identification of resources by generated Id until the Identity-Service
- * UUID is tagged against all resources, then we can use that instead.
- */
-
- /**
- * Generate unique sha digest.
- *
- * @param entityType the entity type
- * @param fieldName the field name
- * @param fieldValue the field value
- * @return the string
- * @throws NoSuchAlgorithmException the no such algorithm exception
- */
- public static String generateUniqueShaDigest(String entityType, String fieldName,
- String fieldValue) throws NoSuchAlgorithmException {
-
- /*
- * Basically SHA-256 will result in an identity with a guaranteed uniqueness compared to just a
- * java hashcode value.
- */
- MessageDigest digest = MessageDigest.getInstance("SHA-256");
- digest.update(String.format("%s.%s.%s", entityType, fieldName, fieldValue).getBytes());
- return convertBytesToHexString(digest.digest());
- }
-
- /**
- * Instantiates a new topographical entity.
- */
- public TopographicalEntity() {}
-
- /*
- * (non-Javadoc)
- *
- */
- public String getAsJson() throws IOException {
-
- JsonObject obj =
- Json.createObjectBuilder().add("entityType", entityType).add("pkey", entityPrimaryKeyValue)
- .add("location", Json.createObjectBuilder().add("lat", latitude).add("lon", longitude))
- .add("selfLink", selfLink).build();
-
- return obj.toString();
- }
-
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString() {
- return "TopographicalEntity [" + ("entityType=" + entityType + ", ")
- + ("entityPrimaryKeyValue=" + entityPrimaryKeyValue + ", ")
- + ("latitude=" + latitude + ", ") + ("longitude=" + longitude + ", ") + ("ID=" + id + ", ")
- + ("selfLink=" + selfLink) + "]";
- }
-
- public String getId() {
- return this.id;
- }
-
- public String getEntityType() {
- return entityType;
- }
-
- public void setEntityType(String entityType) {
- this.entityType = entityType;
- }
-
- public String getEntityPrimaryKeyValue() {
- return entityPrimaryKeyValue;
- }
-
- public void setEntityPrimaryKeyValue(String entityPrimaryKeyValue) {
- this.entityPrimaryKeyValue = entityPrimaryKeyValue;
- }
-
- public String getEntityPrimaryKeyName() {
- return entityPrimaryKeyName;
- }
-
- public void setEntityPrimaryKeyName(String entityPrimaryKeyName) {
- this.entityPrimaryKeyName = entityPrimaryKeyName;
- }
-
- public String getLatitude() {
- return latitude;
- }
-
- public void setLatitude(String latitude) {
- this.latitude = latitude;
- }
-
- public String getLongitude() {
- return longitude;
- }
-
- public void setLongitude(String longitude) {
- this.longitude = longitude;
- }
-
- public String getSelfLink() {
- return selfLink;
- }
-
- public void setSelfLink(String selfLink) {
- this.selfLink = selfLink;
- }
-
- public static long getSerialversionuid() {
- return serialVersionUID;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java
deleted file mode 100644
index ad651c7..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.search;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.component.restlet.RestletConstants;
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
-import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.inventory.EntityHistoryQueryBuilder;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.util.NodeUtils;
-import org.onap.aai.sparky.util.RestletUtils;
-import org.restlet.Request;
-import org.restlet.Response;
-import org.restlet.data.ClientInfo;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
-/**
- * Receives and processes Entity Count History requests
- */
-public class EntityCountHistoryProcessor implements Processor {
-
- private static final Logger LOG =
- LoggerFactory.getInstance().getLogger(EntityCountHistoryProcessor.class);
-
- private static final long serialVersionUID = 1L;
-
- private ElasticSearchAdapter elasticSearchAdapter = null;
- private ObjectMapper mapper;
-
- private static final String SEARCH_PRETTY_STRING = "_search?pretty";
- private static final String TYPE = "type";
- private static final String TABLE = "table";
- private static final String GRAPH = "graph";
-
- private List<String> entityTypesToSummarize;
- private List<String> vnfEntityTypes;
-
- private String entityCountHistoryIndexName;
-
- private boolean summarizeVnfs = false;
-
- private RestletUtils restletUtils = new RestletUtils();
-
- /**
- * Instantiates a new Entity Count History
- */
-
- public EntityCountHistoryProcessor(ElasticSearchAdapter elasticSearchAdapter,
- String entityTypesToSummarizeDelimitedList, String vnfEntityTypesDelimitedList, String entityCountHistoryIndexName) {
-
- this.elasticSearchAdapter = elasticSearchAdapter;
- this.entityCountHistoryIndexName = entityCountHistoryIndexName;
-
- entityTypesToSummarize =
- Arrays.asList(entityTypesToSummarizeDelimitedList.toLowerCase().split("[\\s,]+"));
-
- vnfEntityTypes =
- Arrays.asList(vnfEntityTypesDelimitedList.toLowerCase().split("[\\s,]+"));
-
- summarizeVnfs = vnfEntityTypesDelimitedList.toLowerCase().contains("vnf");
-
- this.mapper = new ObjectMapper();
- this.mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
- }
-
- /**
- * Processes a entity count history search request
- *
- * @param exchange The Exchange object generated by Apache Camel for the incoming request
- */
-
- @Override
- public void process(Exchange exchange) throws Exception {
-
- Request request = exchange.getIn().getHeader(RestletConstants.RESTLET_REQUEST, Request.class);
- Response restletResponse =
- exchange.getIn().getHeader(RestletConstants.RESTLET_RESPONSE, Response.class);
-
- Object xTransactionId = exchange.getIn().getHeader("X-TransactionId");
- if (xTransactionId == null) {
- xTransactionId = NodeUtils.getRandomTxnId();
- }
-
- Object partnerName = exchange.getIn().getHeader("X-FromAppId");
- if (partnerName == null) {
- partnerName = "Browser";
- }
-
- /*
- * Disables automatic Apache Camel Restlet component logging which prints out an undesirable log
- * entry which includes client (e.g. browser) information
- */
- request.setLoggable(false);
-
- ClientInfo clientInfo = request.getClientInfo();
- MdcContext.initialize((String) xTransactionId, "AAI-UI", "", (String) partnerName,
- clientInfo.getAddress() + ":" + clientInfo.getPort());
-
- String typeParameter = getTypeParameter(exchange);
-
- if (null != typeParameter && !typeParameter.isEmpty()) {
- OperationResult operationResult = null;
-
- try {
- operationResult = getResults(restletResponse, typeParameter);
- restletResponse.setEntity(operationResult.getResult(), MediaType.APPLICATION_JSON);
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, exc.getLocalizedMessage());
- }
- } else {
- LOG.error(AaiUiMsgs.RESOURCE_NOT_FOUND, request.getOriginalRef().toString());
- String errorMessage =
- restletUtils.generateJsonErrorResponse("Unsupported request. Resource not found.");
- restletResponse.setEntity(errorMessage, MediaType.APPLICATION_JSON);
- restletResponse.setStatus(Status.CLIENT_ERROR_NOT_FOUND);
- }
-
- exchange.getOut().setBody(restletResponse);
- }
-
-
- /**
- * Format line graph output
- *
- * @param results The results
- * @return The JSON object
- * @throws JsonProcessingException The JSON processing exception
- */
- public JSONObject formatLineGraphOutput(String results) throws JsonProcessingException {
- Map<Long, Long> countByDateMap = new HashMap<Long, Long>();
-
- JsonNode resultNode = null;
-
- JSONObject finalResult = new JSONObject();
- JSONArray finalResultArr = new JSONArray();
-
- try {
- resultNode = mapper.readTree(results);
-
- final JsonNode bucketsNode = getBucketsNode(resultNode);
-
- if (bucketsNode.isArray()) {
-
- for (final JsonNode entityNode : bucketsNode) {
- final JsonNode dateBucketNode = entityNode.get("group_by_date").get("buckets");
- if (dateBucketNode.isArray()) {
- for (final JsonNode dateBucket : dateBucketNode) {
- Long date = dateBucket.get("key").asLong();
- final JsonNode countBucketNode =
- dateBucket.get("sort_by_date").get("hits").get("hits");
-
- if (countBucketNode.isArray()) {
- final JsonNode latestEntityNode = countBucketNode.get(0);
-
- long currentCount = latestEntityNode.get("_source").get("count").asLong();
- if (countByDateMap.containsKey(date)) {
- // add to the value if map already contains this date
- currentCount += countByDateMap.get(date);
- }
-
- countByDateMap.put(date, currentCount);
- }
- }
-
- }
- }
- }
-
- /*
- * Sort the map by epoch timestamp
- */
- Map<Long, Long> sortedMap = new TreeMap<Long, Long>(countByDateMap);
- for (Entry<Long, Long> entry : sortedMap.entrySet()) {
- JSONObject dateEntry = new JSONObject();
- dateEntry.put("date", entry.getKey());
- dateEntry.put("count", entry.getValue());
- finalResultArr.put(dateEntry);
- }
-
- } catch (Exception exc) {
- LOG.warn(AaiUiMsgs.ERROR_BUILDING_SEARCH_RESPONSE, exc.getLocalizedMessage());
- }
-
- return finalResult.put("result", finalResultArr);
- }
-
- /**
- * Format table output
- *
- * @param results The results
- * @return The JSON object
- * @throws JsonProcessingException The JSON processing exception
- */
- public JSONObject formatTableOutput(String results) throws JsonProcessingException {
- JsonNode resultNode = null;
-
- JSONObject finalResult = new JSONObject();
- JSONArray entitiesArr = new JSONArray();
-
- Map<String, Long> entityCountInTable = initializeEntityMap();
-
- long vnfCount = 0;
-
- try {
- resultNode = mapper.readTree(results);
-
- final JsonNode bucketsNode = getBucketsNode(resultNode);
- if (bucketsNode.isArray()) {
-
- for (final JsonNode entityNode : bucketsNode) {
- String entityType = entityNode.get("key").asText();
- boolean isAVnf = vnfEntityTypes.contains(entityType);
- long countValue = 0;
-
- if (isAVnf || entityCountInTable.get(entityType) != null) {
- final JsonNode hitsBucketNode = entityNode.get("sort_by_date").get("hits").get("hits");
- if (hitsBucketNode.isArray()) {
- // the first bucket will be the latest
- final JsonNode hitNode = hitsBucketNode.get(0);
-
- countValue = hitNode.get("_source").get("count").asLong();
-
- /*
- * Special case: Add all the VNF types together to get aggregate count
- */
- if (summarizeVnfs && isAVnf) {
- vnfCount += countValue;
- countValue = vnfCount;
- entityType = "vnf";
- }
-
- entityCountInTable.replace(entityType, countValue);
- }
- }
-
- }
- }
- for (Entry<String, Long> entry : entityCountInTable.entrySet()) {
- JSONObject entityType = new JSONObject();
- entityType.put("key", entry.getKey());
- entityType.put("doc_count", entry.getValue());
- entitiesArr.put(entityType);
- }
-
- finalResult.put("result", entitiesArr);
-
- } catch (Exception exc) {
- LOG.warn(AaiUiMsgs.ERROR_BUILDING_RESPONSE_FOR_TABLE_QUERY, exc.getLocalizedMessage());
- }
-
- return finalResult;
- }
-
- /**
- * Gets the results
- *
- * @param response The response
- * @param type The type
- * @return The results
- */
- public OperationResult getResults(Response response, String type) {
- OperationResult operationResult = new OperationResult();
-
- String reqPayload = EntityHistoryQueryBuilder.getQuery(type).toString();
-
- try {
- final String fullUrlStr = elasticSearchAdapter
- .buildElasticSearchUrlForApi(entityCountHistoryIndexName, SEARCH_PRETTY_STRING);
-
- OperationResult opResult = elasticSearchAdapter.doPost(fullUrlStr, reqPayload,
- javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE);
-
- JSONObject finalOutput = null;
- if (type.equalsIgnoreCase(TABLE)) {
- finalOutput = formatTableOutput(opResult.getResult());
- } else if (type.equalsIgnoreCase(GRAPH)) {
- finalOutput = formatLineGraphOutput(opResult.getResult());
- }
-
- if (finalOutput != null) {
- response.setEntity(finalOutput.toString(), MediaType.APPLICATION_JSON);
- operationResult.setResult(finalOutput.toString());
- }
- } catch (JsonProcessingException exc) {
- restletUtils.handleRestletErrors(LOG, "Unable to map JSONpayload", exc, response);
- }
-
- return operationResult;
- }
-
- /**
- * Gets the buckets node
- *
- * @param node The node
- * @return The buckets node
- * @throws Exception The exception
- */
- public JsonNode getBucketsNode(JsonNode node) throws Exception {
- if (node.get("aggregations").get("group_by_entityType").get("buckets") != null) {
- return node.get("aggregations").get("group_by_entityType").get("buckets");
- } else {
- throw new Exception("Failed to map JSON response");
- }
- }
-
- /**
- * Initialize entity map
- *
- * @return the map
- */
- private Map<String, Long> initializeEntityMap() {
- Map<String, Long> entityMap = new HashMap<String, Long>();
- for (String entity : entityTypesToSummarize) {
- entityMap.put(entity, (long) 0);
- }
-
- return entityMap;
- }
-
- /**
- * Extracts the "type" query parameter from the request URI
- *
- * @param exchange
- * @return String containing the value of the "type" query parameter of the request. Returns null
- * if no "type" parameter found
- */
- public String getTypeParameter(Exchange exchange) {
- String typeParameter = null;
-
- String requestUriParameterString = exchange.getIn().getHeader("CamelHttpQuery", String.class);
-
- if (null != requestUriParameterString) {
- String[] requestParameterParts = requestUriParameterString.split("&");
-
- String[] parameter = requestParameterParts[0].split("=");
- String currentParameterKey = parameter[0];
-
- if (null != currentParameterKey && !currentParameterKey.isEmpty()) {
- // Check if we're looking at the "type" parameter key
- if (currentParameterKey.equals(TYPE)) {
- boolean uriIncludesTypeParameterValue =
- (parameter.length >= 2) && !parameter[1].isEmpty();
-
- if (uriIncludesTypeParameterValue) {
- String typeParameterValue = parameter[1];
-
- // Is the parameter value one that we return data for?
- if (typeParameterValue.equalsIgnoreCase(TABLE)
- || typeParameterValue.equalsIgnoreCase(GRAPH)) {
- typeParameter = typeParameterValue;
- }
- }
- }
- }
- }
-
- return typeParameter;
- }
-
-
- public void setRestletUtils(RestletUtils restletUtils) {
- this.restletUtils = restletUtils;
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java
index cb3e5e4..a22170a 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java
@@ -43,6 +43,13 @@ public class SearchServiceAdapter {
private static final String VALUE_QUERY = "query";
private static final String SUGGEST_QUERY = "suggest";
+ private static final String BULK_API = "bulk";
+ private static final String DOCUMENT_EDNPOINT = "documents";
+ private static final String SEARH_SERVICE_BULK_TEMPLATE =
+ "{\"create\":{\"metaData\":{\"url\":\"%s\"},\"document\":\"%s\"}}\n";
+
+ private static final String SEARH_SERVICE_SINGLE_ENTITY_TEMPLATE =
+ "{\"queries\":[{\"must\":{\"match\":{\"field\":\"_id\",\"value\":\"%s\"}}}]}\n";
private RestClient client;
private RestEndpointConfig endpointConfig;
@@ -113,6 +120,13 @@ public class SearchServiceAdapter {
OperationResult or = client.delete(url, getTxnHeader(), MediaType.APPLICATION_JSON_TYPE);
return new OperationResult(or.getResultCode(), or.getResult());
}
+
+ public OperationResult doBulkOperation(String url, String jsonPayload) {
+
+ OperationResult or = client.post(url, jsonPayload, getTxnHeader(),
+ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ return new OperationResult(or.getResultCode(), or.getResult());
+ }
public Map<String, List<String>> getTxnHeader() {
HashMap<String, List<String>> headers = new HashMap<String, List<String>>();
@@ -121,6 +135,15 @@ public class SearchServiceAdapter {
headers.put("X-FromAppId", Arrays.asList(MDC.get(MdcContext.MDC_PARTNER_NAME)));
return headers;
}
+
+public String buildBulkImportOperationRequest(String indexName, String id, String payload){
+
+ StringBuilder requestPayload = new StringBuilder(128);
+ String SearchTarget = buildSearchServiceDocUrl(indexName,id);
+
+ requestPayload.append(String.format(SEARH_SERVICE_BULK_TEMPLATE,SearchTarget,payload));
+ return requestPayload.toString();
+ }
/**
* Get Full URL for search
@@ -143,12 +166,50 @@ public class SearchServiceAdapter {
public String buildSuggestServiceQueryUrl(String indexName) {
return buildSearchServiceUrlForApi(indexName, SUGGEST_QUERY);
}
+
+ public String buildSearchServiceDocUrl(String indexName,String api) {
+
+ return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/%s/%s/%s",
+ endpointConfig.getEndpointIpAddress(), endpointConfig.getEndpointServerPort(),
+ serviceApiVersion, indexName,DOCUMENT_EDNPOINT, api);
+ }
+
+
+ public String buildSearchServiceCreateDocApi(String indexName){
+
+ return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/%s/%s",
+ endpointConfig.getEndpointIpAddress(), endpointConfig.getEndpointServerPort(),
+ serviceApiVersion, indexName,DOCUMENT_EDNPOINT );
+ }
public String buildSearchServiceUrlForApi(String indexName, String api) {
+
return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/%s/%s",
endpointConfig.getEndpointIpAddress(), endpointConfig.getEndpointServerPort(),
serviceApiVersion, indexName, api);
}
+
+ public String buildSearchServiceBulkUrl() {
+
+ return String.format("https://%s:%s/services/search-data-service/%s/search/%s", endpointConfig.getEndpointIpAddress(),
+ endpointConfig.getEndpointServerPort(),serviceApiVersion,BULK_API);
+ }
+
+ public OperationResult retrieveEntityById(String entityId,String indexName) {
+
+ StringBuilder requestPayload = new StringBuilder(128);
+ requestPayload.append(String.format(SEARH_SERVICE_SINGLE_ENTITY_TEMPLATE,entityId));
+ String payload = requestPayload.toString();
+ String searchServiceUrl = buildSearchServiceQueryUrl(indexName);
+
+ return this.doPost(searchServiceUrl,payload);
+ }
+
+public String buildSearchServiceCreateIndexUrl(String indexName) {
+
+ return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/dynamic/%s", endpointConfig.getEndpointIpAddress(),
+ endpointConfig.getEndpointServerPort(),serviceApiVersion,indexName);
+ }
}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java
index 707f907..1eb1823 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java
@@ -31,11 +31,10 @@ import org.json.JSONObject;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.search.filters.config.UiFilterDataSourceConfig;
import org.onap.aai.sparky.search.filters.entity.UiFilterEntity;
-import org.onap.aai.sparky.viewandinspect.config.SparkyConstants;
/**
@@ -51,10 +50,10 @@ public class FilterElasticSearchAdapter {
private static final String CONTAINER = "default";
private static final String BUCKETS = "buckets";
private static final String FILTER_VALUE_KEY = "key";
- private ElasticSearchAdapter elasticSearchAdapter;
+ private SearchServiceAdapter searchServiceAdapter;
- public FilterElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
- this.elasticSearchAdapter = elasticSearchAdapter;
+ public FilterElasticSearchAdapter(SearchServiceAdapter searchServiceAdapter) {
+ this.searchServiceAdapter = searchServiceAdapter;
}
/**
@@ -76,10 +75,9 @@ public class FilterElasticSearchAdapter {
filterValueQuery = FilterQueryBuilder.createFilterValueQueryObject(dataSourceConfig.getFieldName());
}
- OperationResult opResult = elasticSearchAdapter.doPost(
- elasticSearchAdapter.buildElasticSearchUrlForApi(dataSourceConfig.getIndexName(),
- SparkyConstants.ES_SEARCH_API),
- filterValueQuery.toString(), MediaType.APPLICATION_JSON_TYPE);
+ OperationResult opResult = searchServiceAdapter.doPost(
+ searchServiceAdapter.buildSearchServiceQueryUrl(dataSourceConfig.getIndexName()),
+ filterValueQuery.toString(), "application/json");
String result = opResult.getResult();
if(opResult.wasSuccessful() && result != null) {
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
index 444eafb..a85e9f2 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java
@@ -30,7 +30,7 @@ import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
@@ -39,7 +39,7 @@ import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
import org.onap.aai.sparky.util.NodeUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -72,7 +72,7 @@ public abstract class AbstractEntitySynchronizer {
protected EnumSet<StatFlag> enabledStatFlags;
- protected ElasticSearchAdapter elasticSearchAdapter;
+ protected SearchServiceAdapter searchServiceAdapter;
protected ActiveInventoryAdapter aaiAdapter;
protected ExecutorService synchronizerExecutor;
@@ -362,12 +362,12 @@ public abstract class AbstractEntitySynchronizer {
*/
public void clearCache() {}
- public ElasticSearchAdapter getElasticSearchAdapter() {
- return elasticSearchAdapter;
+ public SearchServiceAdapter getSearchServiceAdapter() {
+ return searchServiceAdapter;
}
- public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
- this.elasticSearchAdapter = elasticSearchAdapter;
+ public void setSearchServiceAdapter(SearchServiceAdapter searchServiceAdapter) {
+ this.searchServiceAdapter = searchServiceAdapter;
}
public ActiveInventoryAdapter getAaiAdapter() {
@@ -531,7 +531,7 @@ public abstract class AbstractEntitySynchronizer {
/*
* In this retry flow the se object has already derived its fields
*/
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id);
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), id);
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
}
@@ -548,7 +548,7 @@ public abstract class AbstractEntitySynchronizer {
* called incrementAndGet when queuing the failed PUT!
*/
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
+ supplyAsync(new PerformSearchServiceRetrieval(retryTransaction, searchServiceAdapter),
esExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java
deleted file mode 100644
index a397d91..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.sync;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.ws.rs.core.MediaType;
-
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
-import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
-import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
-import org.onap.aai.sparky.sync.entity.SearchableEntity;
-import org.onap.aai.sparky.sync.enumeration.OperationState;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * The Class ElasticSearchIndexCleaner.
- */
-public class ElasticSearchIndexCleaner implements IndexCleaner {
-
- private static final Logger LOG =
- LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
-
- private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
- private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
-
- private ObjectIdCollection before;
- private ObjectIdCollection after;
-
- private ObjectMapper mapper;
- private ElasticSearchAdapter esAdapter;
- private ElasticSearchEndpointConfig endpointConfig;
- private ElasticSearchSchemaConfig schemaConfig;
-
- /**
- * Instantiates a new elastic search index cleaner.
- *
- * @param restDataProvider the rest data provider
- * @param indexName the index name
- * @param indexType the index type
- * @param host the host
- * @param port the port
- * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
- * @param numItemsToGetBulkRequest the num items to get bulk request
- */
- public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
- this.esAdapter = esAdapter;
- this.before = null;
- this.after = null;
- this.endpointConfig = endpointConfig;
- this.schemaConfig = schemaConfig;
- this.mapper = new ObjectMapper();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
- */
- @Override
- public OperationState populatePreOperationCollection() {
-
- try {
- before = retrieveAllDocumentIdentifiers();
- return OperationState.OK;
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
- return OperationState.ERROR;
- }
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
- */
- @Override
- public OperationState populatePostOperationCollection() {
- try {
- after = retrieveAllDocumentIdentifiers();
- return OperationState.OK;
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
- return OperationState.ERROR;
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
- */
- @Override
- public OperationState performCleanup() {
- // TODO Auto-generated method stub
- LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
-
- int sizeBefore = before.getSize();
- int sizeAfter = after.getSize();
-
- LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
- String.valueOf(sizeAfter));
-
- /*
- * If the processedImportIds size <= 0, then something has failed in the sync operation and we
- * shouldn't do the selective delete right now.
- */
-
- if (sizeAfter > 0) {
-
- Collection<String> presyncIds = before.getImportedObjectIds();
- presyncIds.removeAll(after.getImportedObjectIds());
-
- try {
- LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), schemaConfig.getIndexDocType(),
- String.valueOf(presyncIds.size()));
-
- ObjectIdCollection bulkIds = new ObjectIdCollection();
-
- Iterator<String> it = presyncIds.iterator();
- int numItemsInBulkRequest = 0;
- int numItemsRemainingToBeDeleted = presyncIds.size();
-
- while (it.hasNext()) {
-
- bulkIds.addObjectId(it.next());
- numItemsInBulkRequest++;
-
- if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
- LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
- bulkDelete(bulkIds.getImportedObjectIds());
- numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
- numItemsInBulkRequest = 0;
- bulkIds.clear();
- }
- }
-
- if (numItemsRemainingToBeDeleted > 0) {
- LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
- bulkDelete(bulkIds.getImportedObjectIds());
- }
-
-
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), exc.getLocalizedMessage());
-
- }
- }
-
- return OperationState.OK;
- }
-
- @Override
- public String getIndexName() {
- return schemaConfig.getIndexName();
- }
-
- /**
- * Builds the initial scroll request payload.
- *
- * @param numItemsToGetPerRequest the num items to get per request
- * @param fieldList the field list
- * @return the string
- * @throws JsonProcessingException the json processing exception
- */
- protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
- List<String> fieldList) throws JsonProcessingException {
-
- ObjectNode rootNode = mapper.createObjectNode();
- rootNode.put("size", numItemsToGetPerRequest);
-
- ArrayNode fields = mapper.createArrayNode();
-
- for (String f : fieldList) {
- fields.add(f);
- }
-
- rootNode.set("fields", fields);
-
- ObjectNode queryNode = mapper.createObjectNode();
- queryNode.set("match_all", mapper.createObjectNode());
-
- rootNode.set("query", queryNode);
-
- return mapper.writeValueAsString(rootNode);
-
- }
-
- /**
- * Builds the subsequent scroll context request payload.
- *
- * @param scrollId the scroll id
- * @param contextTimeToLiveInMinutes the context time to live in minutes
- * @return the string
- * @throws JsonProcessingException the json processing exception
- */
- protected String buildSubsequentScrollContextRequestPayload(String scrollId,
- int contextTimeToLiveInMinutes) throws JsonProcessingException {
-
- ObjectNode rootNode = mapper.createObjectNode();
-
- rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
- rootNode.put("scroll_id", scrollId);
-
- return mapper.writeValueAsString(rootNode);
-
- }
-
- /**
- * Parses the elastic search result.
- *
- * @param jsonResult the json result
- * @return the json node
- * @throws JsonProcessingException the json processing exception
- * @throws IOException Signals that an I/O exception has occurred.
- */
- protected JsonNode parseElasticSearchResult(String jsonResult)
- throws JsonProcessingException, IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readTree(jsonResult);
- }
-
- /**
- * Lookup index doc.
- *
- * @param ids the ids
- * @param docs the docs
- * @return the array list
- */
- protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
- List<SearchableEntity> docs) {
- ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
-
- if (ids != null && docs != null) {
- for (SearchableEntity d : docs) {
- if (ids.contains(d.getId())) {
- objs.add(d);
- }
- }
- }
-
- return objs;
- }
-
- /**
- * Builds the delete data object.
- *
- * @param index the index
- * @param type the type
- * @param id the id
- * @return the object node
- */
- protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
-
- ObjectNode indexDocProperties = mapper.createObjectNode();
-
- indexDocProperties.put("_index", index);
- indexDocProperties.put("_type", type);
- indexDocProperties.put("_id", id);
-
- ObjectNode rootNode = mapper.createObjectNode();
- rootNode.set("delete", indexDocProperties);
-
- return rootNode;
- }
-
- /**
- * This method might appear to be a little strange, and is simply an optimization to take an
- * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
- *
- * @param startNode the start node
- * @param fieldPath the field path
- * @return the node path
- */
- protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
-
- JsonNode jsonNode = null;
-
- for (String field : fieldPath) {
- if (jsonNode == null) {
- jsonNode = startNode.get(field);
- } else {
- jsonNode = jsonNode.get(field);
- }
-
- /*
- * This is our safety net in case any intermediate path returns a null
- */
-
- if (jsonNode == null) {
- return null;
- }
-
- }
-
- return jsonNode;
- }
-
- /**
- * Gets the full url.
- *
- * @param resourceUrl the resource url
- * @return the full url
- */
- private String getFullUrl(String resourceUrl) {
- return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
- endpointConfig.getEsServerPort(), resourceUrl);
- }
-
- /**
- * Retrieve all document identifiers.
- *
- * @return the object id collection
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
-
- ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
-
- long opStartTimeInMs = System.currentTimeMillis();
-
- List<String> fields = new ArrayList<String>();
- fields.add("_id");
- // fields.add("entityType");
-
- String scrollRequestPayload =
- buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
-
- final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName()+ "/" + schemaConfig.getIndexDocType() + "/_search?scroll="
- + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
-
- OperationResult result =
- esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
-
- if (result.wasSuccessful()) {
-
- JsonNode rootNode = parseElasticSearchResult(result.getResult());
-
- /*
- * Check the result for success / failure, and enumerate all the index ids that resulted in
- * success, and ignore the ones that failed or log them so we have a record of the failure.
- */
- int totalRecordsAvailable = 0;
- String scrollId = null;
- int numRecordsFetched = 0;
-
- if (rootNode != null) {
-
- scrollId = getFieldValue(rootNode, "_scroll_id");
- final String tookStr = getFieldValue(rootNode, "took");
- int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
- boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
-
- if (timedOut) {
- LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
- String.valueOf(tookInMs));
- } else {
- LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
- String.valueOf(tookInMs));
- }
-
- JsonNode hitsNode = rootNode.get("hits");
- totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
-
- LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
- String.valueOf(totalRecordsAvailable));
-
- /*
- * Collect all object ids
- */
-
- ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
-
- Iterator<JsonNode> nodeIterator = hitsArray.iterator();
-
- String key = null;
- String value = null;
- JsonNode jsonNode = null;
-
- while (nodeIterator.hasNext()) {
-
- jsonNode = nodeIterator.next();
-
- key = getFieldValue(jsonNode, "_id");
-
- if (key != null) {
- currentDocumentIds.addObjectId(key);
- }
-
- }
-
- int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
-
- int numRequiredAdditionalFetches =
- (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
-
- /*
- * Do an additional fetch for the remaining items (if needed)
- */
-
- if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
- numRequiredAdditionalFetches += 1;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
- String.valueOf(numRequiredAdditionalFetches));
- }
-
-
- for (int x = 0; x < numRequiredAdditionalFetches; x++) {
-
- if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
- // abort the whole thing because now we can't reliably cleanup the orphans.
- throw new IOException(
- "Failed to collect pre-sync doc collection from index. Aborting operation");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
- String.valueOf(currentDocumentIds.getSize()),
- String.valueOf(totalRecordsAvailable));
- }
-
- }
-
- }
-
- } else {
- // scroll context get failed, nothing else to do
- LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
- }
-
- LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
- String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
-
- return currentDocumentIds;
-
- }
-
- /**
- * Collect items from scroll context.
- *
- * @param scrollId the scroll id
- * @param objectIds the object ids
- * @return the operation state
- * @throws IOException Signals that an I/O exception has occurred.
- */
- private OperationState collectItemsFromScrollContext(String scrollId,
- ObjectIdCollection objectIds) throws IOException {
-
- String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
- endpointConfig.getScrollContextTimeToLiveInMinutes());
-
- final String fullUrlStr = getFullUrl("/_search/scroll");
-
- OperationResult opResult =
- esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
-
- if (opResult.getResultCode() >= 300) {
- LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
- return OperationState.ERROR;
- }
-
- JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
- boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
- final String tookStr = getFieldValue(rootNode, "took");
- int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
-
- JsonNode hitsNode = rootNode.get("hits");
-
- /*
- * Check the result for success / failure, and enumerate all the index ids that resulted in
- * success, and ignore the ones that failed or log them so we have a record of the failure.
- */
-
- if (rootNode != null) {
-
- if (timedOut) {
- LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
- } else {
- LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
- }
-
- /*
- * Collect all object ids
- */
-
- ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
- String key = null;
- String value = null;
- JsonNode jsonNode = null;
-
- Iterator<JsonNode> nodeIterator = hitsArray.iterator();
-
- while (nodeIterator.hasNext()) {
-
- jsonNode = nodeIterator.next();
-
- key = getFieldValue(jsonNode, "_id");
-
- if (key != null) {
- objectIds.addObjectId(key);
-
- }
-
- }
- }
-
- return OperationState.OK;
- }
-
- /**
- * Gets the field value.
- *
- * @param node the node
- * @param fieldName the field name
- * @return the field value
- */
- protected String getFieldValue(JsonNode node, String fieldName) {
-
- JsonNode field = node.get(fieldName);
-
- if (field != null) {
- return field.asText();
- }
-
- return null;
-
- }
-
- /**
- * Bulk delete.
- *
- * @param docIds the doc ids
- * @return the operation result
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
-
- if (docIds == null || docIds.size() == 0) {
- LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
- return new OperationResult(500,
- "Skipping bulkDelete(); operation because docs to delete list is empty");
- }
-
- LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
-
- StringBuilder sb = new StringBuilder(128);
-
- for (String id : docIds) {
- sb.append(String.format(BULK_OP_LINE_TEMPLATE,
- buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
- }
-
- sb.append("\n");
-
- final String fullUrlStr = getFullUrl("/_bulk");
-
- return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
-
- }
-
- /*
-
- */
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java
index c83aa72..c9d8272 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java
@@ -25,9 +25,9 @@ import javax.ws.rs.core.MediaType;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
+import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
/**
@@ -38,11 +38,11 @@ public class IndexIntegrityValidator implements IndexValidator {
private static final Logger LOG =
LoggerFactory.getInstance().getLogger(IndexIntegrityValidator.class);
- private ElasticSearchEndpointConfig endpointConfig;
+ private RestEndpointConfig endpointConfig;
private ElasticSearchSchemaConfig schemaConfig;
private String tableConfigJson;
- private final ElasticSearchAdapter esAdapter;
+ private final SearchServiceAdapter searchServiceAdapter;
/**
* Instantiates a new index integrity validator.
@@ -54,21 +54,21 @@ public class IndexIntegrityValidator implements IndexValidator {
* @param port the port
* @param tableConfigJson the table config json
*/
- public IndexIntegrityValidator(ElasticSearchAdapter esAdapter,
- ElasticSearchSchemaConfig esSchemaConfig, ElasticSearchEndpointConfig esEndpointConfig,
+ public IndexIntegrityValidator(SearchServiceAdapter searchServiceAdapter,
+ ElasticSearchSchemaConfig esSchemaConfig, RestEndpointConfig esEndpointConfig,
String tableConfigJson) {
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.schemaConfig = esSchemaConfig;
this.endpointConfig = esEndpointConfig;
this.tableConfigJson = tableConfigJson;
}
- public ElasticSearchEndpointConfig getEndpointConfig() {
+ public RestEndpointConfig getEndpointConfig() {
return endpointConfig;
}
- public void setEndpointConfig(ElasticSearchEndpointConfig endpointConfig) {
+ public void setEndpointConfig(RestEndpointConfig endpointConfig) {
this.endpointConfig = endpointConfig;
}
@@ -80,8 +80,8 @@ public class IndexIntegrityValidator implements IndexValidator {
this.schemaConfig = schemaConfig;
}
- public ElasticSearchAdapter getEsAdapter() {
- return esAdapter;
+ public SearchServiceAdapter getSearchServiceAdapter() {
+ return searchServiceAdapter;
}
@Override
@@ -95,10 +95,14 @@ public class IndexIntegrityValidator implements IndexValidator {
*
* @see org.openecomp.sparky.synchronizer.IndexValidator#exists()
*/
+ /* TODO
+ * currently Search does not support head operations on an index neither does it support get operations
+ * on an index. get is being used so that it does not break any code.
+ * */
@Override
public boolean exists() {
- final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName() + "/");
- OperationResult existsResult = esAdapter.doHead(fullUrlStr, MediaType.APPLICATION_JSON_TYPE);
+ final String fullUrlStr = getFullUrl(schemaConfig.getIndexName() + "/");
+ OperationResult existsResult = searchServiceAdapter.doGet(fullUrlStr, "application/json");
int rc = existsResult.getResultCode();
@@ -135,7 +139,7 @@ public class IndexIntegrityValidator implements IndexValidator {
final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName() + "/");
OperationResult createResult =
- esAdapter.doPut(fullUrlStr, tableConfigJson, MediaType.APPLICATION_JSON_TYPE);
+ searchServiceAdapter.doPut(fullUrlStr, tableConfigJson,"application/json");
int rc = createResult.getResultCode();
@@ -167,8 +171,8 @@ public class IndexIntegrityValidator implements IndexValidator {
* @return the full url
*/
private String getFullUrl(String resourceUrl) {
- return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
- endpointConfig.getEsServerPort(), resourceUrl);
+ String createIndexUrl = searchServiceAdapter.buildSearchServiceCreateIndexUrl(resourceUrl);
+ return createIndexUrl;
}
}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java
deleted file mode 100644
index 4ba3405..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.sync.config;
-
-public class ElasticSearchEndpointConfig {
-
- private String esIpAddress;
- private String esServerPort;
- private int scrollContextTimeToLiveInMinutes;
- private int scrollContextBatchRequestSize;
-
- public ElasticSearchEndpointConfig() {
-
- }
-
- public String getEsIpAddress() {
- return esIpAddress;
- }
-
- public void setEsIpAddress(String esIpAddress) {
- this.esIpAddress = esIpAddress;
- }
-
- public String getEsServerPort() {
- return esServerPort;
- }
-
- public void setEsServerPort(String esServerPort) {
- this.esServerPort = esServerPort;
- }
-
- public int getScrollContextTimeToLiveInMinutes() {
- return scrollContextTimeToLiveInMinutes;
- }
-
- public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) {
- this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
- }
-
- public int getScrollContextBatchRequestSize() {
- return scrollContextBatchRequestSize;
- }
-
- public void setScrollContextBatchRequestSize(int scrollContextBatchRequestSize) {
- this.scrollContextBatchRequestSize = scrollContextBatchRequestSize;
- }
-
-
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchPut.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServicePut.java
index d516ba8..c62445c 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchPut.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServicePut.java
@@ -23,19 +23,17 @@ package org.onap.aai.sparky.sync.task;
import java.util.Map;
import java.util.function.Supplier;
-import javax.ws.rs.core.MediaType;
-
import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.slf4j.MDC;
/**
* The Class PerformElasticSearchPut.
*/
-public class PerformElasticSearchPut implements Supplier<NetworkTransaction> {
+public class PerformSearchServicePut implements Supplier<NetworkTransaction> {
- private ElasticSearchAdapter esAdapter;
+ private SearchServiceAdapter searchServiceAdapter;
private String jsonPayload;
private NetworkTransaction txn;
private Map<String, String> contextMap;
@@ -47,19 +45,19 @@ public class PerformElasticSearchPut implements Supplier<NetworkTransaction> {
* @param txn the txn
* @param restDataProvider the rest data provider
*/
- public PerformElasticSearchPut(String jsonPayload, NetworkTransaction txn,
- ElasticSearchAdapter esAdapter) {
+ public PerformSearchServicePut(String jsonPayload, NetworkTransaction txn,
+ SearchServiceAdapter searchServiceAdapter) {
this.jsonPayload = jsonPayload;
this.txn = txn;
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.contextMap = MDC.getCopyOfContextMap();
}
- public PerformElasticSearchPut(String jsonPayload, NetworkTransaction txn,
- ElasticSearchAdapter esAdapter, Map<String, String> contextMap) {
+ public PerformSearchServicePut(String jsonPayload, NetworkTransaction txn,
+ SearchServiceAdapter searchServiceAdapter, Map<String, String> contextMap) {
this.jsonPayload = jsonPayload;
this.txn = txn;
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.contextMap = contextMap;
}
@@ -76,7 +74,7 @@ public class PerformElasticSearchPut implements Supplier<NetworkTransaction> {
long startTimeInMs = System.currentTimeMillis();
OperationResult or =
- esAdapter.doPut(txn.getLink(), jsonPayload, MediaType.APPLICATION_JSON_TYPE);
+ searchServiceAdapter.doPut(txn.getLink(), jsonPayload, "application/json");
txn.setOperationResult(or);
txn.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs);
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchRetrieval.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceRetrieval.java
index 5191c65..ab7da7a 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchRetrieval.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceRetrieval.java
@@ -23,20 +23,18 @@ package org.onap.aai.sparky.sync.task;
import java.util.Map;
import java.util.function.Supplier;
-import javax.ws.rs.core.MediaType;
-
import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.slf4j.MDC;
/**
* The Class PerformElasticSearchRetrieval.
*/
-public class PerformElasticSearchRetrieval implements Supplier<NetworkTransaction> {
+public class PerformSearchServiceRetrieval implements Supplier<NetworkTransaction> {
private NetworkTransaction txn;
- private ElasticSearchAdapter esAdapter;
+ private SearchServiceAdapter searchServiceAdapter;
private Map<String, String> contextMap;
/**
@@ -45,10 +43,10 @@ public class PerformElasticSearchRetrieval implements Supplier<NetworkTransactio
* @param elasticSearchTxn the elastic search txn
* @param restDataProvider the rest data provider
*/
- public PerformElasticSearchRetrieval(NetworkTransaction elasticSearchTxn,
- ElasticSearchAdapter esAdapter) {
+ public PerformSearchServiceRetrieval(NetworkTransaction elasticSearchTxn,
+ SearchServiceAdapter searchServiceAdapter) {
this.txn = elasticSearchTxn;
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.contextMap = MDC.getCopyOfContextMap();
}
@@ -59,7 +57,7 @@ public class PerformElasticSearchRetrieval implements Supplier<NetworkTransactio
public NetworkTransaction get() {
MDC.setContextMap(contextMap);
long startTimeInMs = System.currentTimeMillis();
- OperationResult or = esAdapter.doGet(txn.getLink(), MediaType.APPLICATION_JSON_TYPE);
+ OperationResult or = searchServiceAdapter.doGet(txn.getLink(), "application/json");
txn.setOperationResult(or);
txn.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs);
return txn;
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchUpdate.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceUpdate.java
index 9e8a8fc..d52a338 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchUpdate.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceUpdate.java
@@ -24,16 +24,16 @@ import java.util.Map;
import java.util.function.Supplier;
import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.slf4j.MDC;
/**
* The Class PerformElasticSearchUpdate.
*/
-public class PerformElasticSearchUpdate implements Supplier<NetworkTransaction> {
+public class PerformSearchServiceUpdate implements Supplier<NetworkTransaction> {
- private ElasticSearchAdapter esAdapter;
+ private SearchServiceAdapter searchServiceAdapter;
private NetworkTransaction operationTracker;
private String updatePayload;
private String updateUrl;
@@ -47,11 +47,11 @@ public class PerformElasticSearchUpdate implements Supplier<NetworkTransaction>
* @param esDataProvider the es data provider
* @param transactionTracker the transaction tracker
*/
- public PerformElasticSearchUpdate(String updateUrl, String updatePayload,
- ElasticSearchAdapter esAdapter, NetworkTransaction transactionTracker) {
+ public PerformSearchServiceUpdate(String updateUrl, String updatePayload,
+ SearchServiceAdapter searchServiceAdapter, NetworkTransaction transactionTracker) {
this.updateUrl = updateUrl;
this.updatePayload = updatePayload;
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.contextMap = MDC.getCopyOfContextMap();
this.operationTracker = new NetworkTransaction();
operationTracker.setEntityType(transactionTracker.getEntityType());
@@ -69,7 +69,7 @@ public class PerformElasticSearchUpdate implements Supplier<NetworkTransaction>
operationTracker.setTaskAgeInMs();
MDC.setContextMap(contextMap);
long startTimeInMs = System.currentTimeMillis();
- OperationResult or = esAdapter.doBulkOperation(updateUrl, updatePayload);
+ OperationResult or = searchServiceAdapter.doBulkOperation(updateUrl, updatePayload);
operationTracker.setOperationResult(or);
operationTracker.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs);
return operationTracker;
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java
deleted file mode 100644
index 301e65d..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.sync.task;
-
-import java.util.Map;
-import java.util.function.Supplier;
-
-import javax.ws.rs.core.MediaType;
-
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.dal.NetworkTransaction;
-import org.onap.aai.sparky.sync.entity.IndexDocument;
-import org.slf4j.MDC;
-
-/**
- * The Class StoreDocumentTask.
- */
-public class StoreDocumentTask implements Supplier<NetworkTransaction> {
-
- private IndexDocument doc;
-
- private NetworkTransaction txn;
-
- private ElasticSearchAdapter esAdapter;
- private Map<String, String> contextMap;
-
- /**
- * Instantiates a new store document task.
- *
- * @param doc the doc
- * @param txn the txn
- * @param esAdapter the es adapter
- */
- public StoreDocumentTask(IndexDocument doc, NetworkTransaction txn,
- ElasticSearchAdapter esAdapter) {
- this.doc = doc;
- this.txn = txn;
- this.esAdapter = esAdapter;
- this.contextMap = MDC.getCopyOfContextMap();
- }
-
- /* (non-Javadoc)
- * @see java.util.function.Supplier#get()
- */
- @Override
- public NetworkTransaction get() {
- txn.setTaskAgeInMs();
-
- long startTimeInMs = System.currentTimeMillis();
- MDC.setContextMap(contextMap);
- OperationResult operationResult = new OperationResult();
-
- try {
-
- operationResult =
- esAdapter.doPut(txn.getLink(), doc.getAsJson(), MediaType.APPLICATION_JSON_TYPE);
- txn.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs);
- } catch (Exception exception) {
- operationResult.setResult(500, exception.getMessage());
- }
-
- txn.setOperationResult(operationResult);
-
- return txn;
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java
deleted file mode 100644
index 66c249c..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.topology.sync;
-
-import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
-import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
-import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner;
-import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory;
-import org.onap.aai.sparky.sync.IndexCleaner;
-import org.onap.aai.sparky.sync.IndexIntegrityValidator;
-import org.onap.aai.sparky.sync.SyncControllerImpl;
-import org.onap.aai.sparky.sync.SyncControllerRegistrar;
-import org.onap.aai.sparky.sync.SyncControllerRegistry;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
-import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
-import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
-import org.onap.aai.sparky.sync.config.SyncControllerConfig;
-
-public class GeoSyncController extends SyncControllerImpl implements SyncControllerRegistrar {
-
- private SyncControllerRegistry syncControllerRegistry;
-
- public GeoSyncController(SyncControllerConfig syncControllerConfig,
- ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
- ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig,
- NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
- GeoEntityLookup geoEntityLookup, OxmEntityLookup oxmEntityLookup,
- ElasticSearchSchemaFactory elasticSearchSchemaFactory) throws Exception {
- super(syncControllerConfig);
-
- // final String controllerName = "Inventory Geo Synchronizer";
-
- IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(esAdapter, schemaConfig,
- endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
-
- registerIndexValidator(indexValidator);
-
- GeoSynchronizer synchronizer =
- new GeoSynchronizer(schemaConfig, syncControllerConfig.getNumInternalSyncWorkers(),
- syncControllerConfig.getNumSyncActiveInventoryWorkers(),
- syncControllerConfig.getNumSyncElasticWorkers(), aaiStatConfig, esStatConfig,
- geoEntityLookup, oxmEntityLookup);
-
- synchronizer.setAaiAdapter(aaiAdapter);
- synchronizer.setElasticSearchAdapter(esAdapter);
-
- registerEntitySynchronizer(synchronizer);
-
-
- IndexCleaner indexCleaner =
- new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig);
-
- registerIndexCleaner(indexCleaner);
-
- }
-
- public SyncControllerRegistry getSyncControllerRegistry() {
- return syncControllerRegistry;
- }
-
- public void setSyncControllerRegistry(SyncControllerRegistry syncControllerRegistry) {
- this.syncControllerRegistry = syncControllerRegistry;
- }
-
- @Override
- public void registerController() {
-
- if ( syncControllerRegistry != null ) {
- if ( syncControllerConfig.isEnabled()) {
- syncControllerRegistry.registerSyncController(this);
- }
- }
- }
-
-
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java
deleted file mode 100644
index 809c21a..0000000
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java
+++ /dev/null
@@ -1,477 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.sparky.topology.sync;
-
-import static java.util.concurrent.CompletableFuture.supplyAsync;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.function.Supplier;
-
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
-import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.restclient.client.OperationResult;
-import org.onap.aai.sparky.config.oxm.GeoEntityLookup;
-import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor;
-import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
-import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
-import org.onap.aai.sparky.dal.NetworkTransaction;
-import org.onap.aai.sparky.dal.rest.HttpMethod;
-import org.onap.aai.sparky.inventory.entity.GeoIndexDocument;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
-import org.onap.aai.sparky.sync.IndexSynchronizer;
-import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
-import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
-import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
-import org.onap.aai.sparky.sync.enumeration.OperationState;
-import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
-import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.StoreDocumentTask;
-import org.onap.aai.sparky.util.NodeUtils;
-import org.slf4j.MDC;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-
-/**
- * The Class GeoSynchronizer.
- */
-public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer {
-
- private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class);
-
- private boolean allWorkEnumerated;
- private Deque<SelfLinkDescriptor> selflinks;
- private GeoEntityLookup geoEntityLookup;
- private OxmEntityLookup oxmEntityLookup;
-
- private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null;
-
- /**
- * Instantiates a new geo synchronizer.
- *
- * @throws Exception the exception
- */
- public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
- int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
- NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup,
- OxmEntityLookup oxmEntityLookup) throws Exception {
-
- super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
- this.geoEntityLookup = geoEntityLookup;
- this.oxmEntityLookup = oxmEntityLookup;
- this.allWorkEnumerated = false;
- this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
- this.synchronizerName = "Geo Synchronizer";
- this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors();
- this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
- this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet());
- this.syncDurationInMs = -1;
- }
-
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
- */
- @Override
- public OperationState doSync() {
- this.syncDurationInMs = -1;
- resetCounters();
- setShouldSkipSync(false);
- allWorkEnumerated = false;
- syncStartedTimeStampInMs = System.currentTimeMillis();
- String txnID = NodeUtils.getRandomTxnId();
- MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", "");
-
- collectAllTheWork();
- return OperationState.OK;
- }
-
-
- /**
- * Collect all the work.
- *
- * @return the operation state
- */
- public OperationState collectAllTheWork() {
- final Map<String,String> contextMap = MDC.getCopyOfContextMap();
-
- if (geoDescriptorMap.isEmpty()) {
- setShouldSkipSync(true);
- LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities");
- return OperationState.ERROR;
- }
-
- Collection<String> syncTypes = geoDescriptorMap.keySet();
-
- try {
-
- /*
- * launch a parallel async thread to process the documents for each entity-type (to max the of
- * the configured executor anyway)
- */
-
- aaiWorkOnHand.set(syncTypes.size());
-
- for (String key : syncTypes) {
-
- supplyAsync(new Supplier<Void>() {
-
- @Override
- public Void get() {
- MDC.setContextMap(contextMap);
- OperationResult typeLinksResult = null;
- try {
- typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
- aaiWorkOnHand.decrementAndGet();
- processEntityTypeSelfLinks(typeLinksResult);
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
- }
-
- return null;
- }
-
- }, aaiExecutor).whenComplete((result, error) -> {
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
- }
- });
-
- }
-
- while (aaiWorkOnHand.get() != 0) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
- }
-
- Thread.sleep(1000);
- }
-
- aaiWorkOnHand.set(selflinks.size());
- allWorkEnumerated = true;
- syncEntityTypes();
-
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc);
- }
- return OperationState.OK;
- }
-
- /**
- * Sync entity types.
- */
- private void syncEntityTypes() {
-
- while (selflinks.peek() != null) {
-
- SelfLinkDescriptor linkDescriptor = selflinks.poll();
- aaiWorkOnHand.decrementAndGet();
-
- OxmEntityDescriptor descriptor = null;
-
- if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
-
- descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
-
- if (descriptor == null) {
- LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
- // go to next element in iterator
- continue;
- }
-
- NetworkTransaction txn = new NetworkTransaction();
- txn.setDescriptor(descriptor);
- txn.setLink(linkDescriptor.getSelfLink());
- txn.setOperationType(HttpMethod.GET);
- txn.setEntityType(linkDescriptor.getEntityType());
-
- aaiWorkOnHand.incrementAndGet();
-
- supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
- .whenComplete((result, error) -> {
-
- aaiWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
- } else {
- if (result == null) {
- LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink());
- } else {
- processEntityTypeSelfLinkResult(result);
- }
- }
- });
- }
- }
- }
-
- /**
- * Process entity type self links.
- *
- * @param operationResult the operation result
- */
- private void processEntityTypeSelfLinks(OperationResult operationResult) {
-
- final String jsonResult = operationResult.getResult();
-
- if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
-
- try {
- JsonNode rootNode = mapper.readTree(jsonResult);
- JsonNode resultData = rootNode.get("result-data");
-
- if (resultData.isArray()) {
- ArrayNode resultDataArrayNode = (ArrayNode) resultData;
-
- Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
-
- while (elementIterator.hasNext()) {
- JsonNode element = elementIterator.next();
-
- final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
- final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
-
- if (resourceType != null && resourceLink != null) {
-
- if (geoDescriptorMap.containsKey(resourceType)) {
- selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType));
- } else {
- LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
- // go to next element in iterator
- continue;
- }
-
- }
- }
- }
- } catch (IOException exc) {
- LOG.error(AaiUiMsgs.ERROR_GENERIC, exc);
- }
- }
-
- }
-
- /**
- * Process entity type self link result.
- *
- * @param txn the txn
- */
- private void processEntityTypeSelfLinkResult(NetworkTransaction txn) {
-
- updateActiveInventoryCounters(txn);
-
- if (!txn.getOperationResult().wasSuccessful()) {
- return;
- }
-
- GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType());
-
- if ( descriptor == null ) {
- return;
- }
-
- try {
- if (descriptor.hasGeoEntity()) {
-
- GeoIndexDocument geoDoc = new GeoIndexDocument();
-
- final String jsonResult = txn.getOperationResult().getResult();
-
- if (jsonResult != null && jsonResult.length() > 0) {
-
- populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink());
-
- if (!geoDoc.isValidGeoDocument()) {
-
- LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString());
-
- } else {
-
- String link = null;
- try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc);
- }
-
- if (link != null) {
-
- NetworkTransaction n2 = new NetworkTransaction();
- n2.setLink(link);
- n2.setEntityType(txn.getEntityType());
- n2.setDescriptor(txn.getDescriptor());
- n2.setOperationType(HttpMethod.PUT);
-
- esWorkOnHand.incrementAndGet();
-
- supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor)
- .whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage());
- } else {
- updateElasticSearchCounters(result);
- processStoreDocumentResult(result);
- }
- });
- }
- }
- }
- }
- } catch (JsonProcessingException exc) {
- LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
- } catch (IOException exc) {
- LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc);
- }
-
- return;
- }
-
-
- /**
- * Process store document result.
- *
- * @param txn the txn
- */
- private void processStoreDocumentResult(NetworkTransaction txn) {
-
- OperationResult or = txn.getOperationResult();
-
- if (!or.wasSuccessful()) {
- LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString());
- /*
- * if(or.getResultCode() != 404 || (or.getResultCode() == 404 &&
- * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error(
- * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " +
- * or.getResult()); }
- */
-
- }
-
- }
-
-
- @Override
- public SynchronizerState getState() {
-
- if (!isSyncDone()) {
- return SynchronizerState.PERFORMING_SYNCHRONIZATION;
- }
-
- return SynchronizerState.IDLE;
-
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
- */
- @Override
- public String getStatReport(boolean showFinalReport) {
- syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
- return this.getStatReport(syncDurationInMs, showFinalReport);
- }
-
- /* (non-Javadoc)
- * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
- */
- @Override
- public void shutdown() {
- this.shutdownExecutors();
- }
-
- /**
- * Populate geo document.
- *
- * @param doc the doc
- * @param result the result
- * @param resultDescriptor the result descriptor
- * @param entityLink the entity link
- * @throws JsonProcessingException the json processing exception
- * @throws IOException Signals that an I/O exception has occurred.
- */
- protected void populateGeoDocument(GeoIndexDocument doc, String result,
- OxmEntityDescriptor resultDescriptor, String entityLink)
- throws JsonProcessingException, IOException {
-
- doc.setSelfLink(entityLink);
- doc.setEntityType(resultDescriptor.getEntityName());
-
- JsonNode entityNode = mapper.readTree(result);
-
- List<String> primaryKeyValues = new ArrayList<String>();
- String pkeyValue = null;
-
- for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
- pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
- if (pkeyValue != null) {
- primaryKeyValues.add(pkeyValue);
- } else {
- LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
- }
- }
-
- final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
- doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
-
- GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName());
-
- String geoLatKey = descriptor.getGeoLatName();
- String geoLongKey = descriptor.getGeoLongName();
-
- doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey));
- doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey));
- doc.deriveFields();
-
- }
-
- @Override
- protected boolean isSyncDone() {
- if (shouldSkipSync()) {
- syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
- return true;
- }
-
- int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
-
- if (totalWorkOnHand > 0 || !allWorkEnumerated) {
- return false;
- }
-
- return true;
- }
-
-}
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java
index 69acc42..17c9072 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java
@@ -34,11 +34,11 @@ import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.OxmModelLoader;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
import org.onap.aai.sparky.dal.GizmoAdapter;
+import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
import org.onap.aai.sparky.logging.AaiUiMsgs;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.subscription.config.SubscriptionConfig;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.entity.SearchableEntity;
import org.onap.aai.sparky.util.NodeUtils;
@@ -64,7 +64,7 @@ public class BaseVisualizationService implements VisualizationService {
private final ActiveInventoryAdapter aaiAdapter;
private final GizmoAdapter gizmoAdapter;
- private final ElasticSearchAdapter esAdapter;
+ private final SearchServiceAdapter searchServiceAdapter;
private final ExecutorService aaiExecutorService;
private ConcurrentHashMap<Long, VisualizationContext> contextMap;
@@ -72,18 +72,18 @@ public class BaseVisualizationService implements VisualizationService {
private VisualizationConfigs visualizationConfigs;
private SubscriptionConfig subConfig;
- private ElasticSearchEndpointConfig endpointEConfig;
+ private RestEndpointConfig endpointConfig;
private ElasticSearchSchemaConfig schemaEConfig;
private OxmEntityLookup oxmEntityLookup;
public BaseVisualizationService(OxmModelLoader loader, VisualizationConfigs visualizationConfigs,
- ActiveInventoryAdapter aaiAdapter, GizmoAdapter gizmoAdapter, ElasticSearchAdapter esAdapter,
- ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig,
+ ActiveInventoryAdapter aaiAdapter, GizmoAdapter gizmoAdapter, SearchServiceAdapter searchServiceAdapter,
+ RestEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig,
int numActiveInventoryWorkers, OxmEntityLookup oxmEntityLookup, SubscriptionConfig subscriptionConfig)
throws Exception {
this.visualizationConfigs = visualizationConfigs;
- this.endpointEConfig = endpointConfig;
+ this.endpointConfig = endpointConfig;
this.schemaEConfig = schemaConfig;
this.oxmEntityLookup = oxmEntityLookup;
this.subConfig = subscriptionConfig;
@@ -97,7 +97,7 @@ public class BaseVisualizationService implements VisualizationService {
this.aaiAdapter = aaiAdapter;
this.gizmoAdapter = gizmoAdapter;
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -159,10 +159,10 @@ public class BaseVisualizationService implements VisualizationService {
if (operationResult.wasSuccessful()) {
try {
- JsonNode elasticValue = mapper.readValue(operationResult.getResult(), JsonNode.class);
+ JsonNode searchServiceResults = mapper.readValue(operationResult.getResult(), JsonNode.class);
- if (elasticValue != null) {
- JsonNode sourceField = elasticValue.get("_source");
+ if (searchServiceResults != null) {
+ JsonNode sourceField = extractSearchServiceContent(searchServiceResults);
if (sourceField != null) {
sourceEntity = new SearchableEntity();
@@ -203,9 +203,9 @@ public class BaseVisualizationService implements VisualizationService {
* Here is where we need to make a dip to elastic-search for the self-link by entity-id (link
* hash).
*/
- dataCollectionResult = esAdapter.retrieveEntityById(endpointEConfig.getEsIpAddress(),
- endpointEConfig.getEsServerPort(),schemaEConfig.getIndexName(),
- schemaEConfig.getIndexDocType(), queryRequest.getHashId());
+ dataCollectionResult = searchServiceAdapter.retrieveEntityById(queryRequest.getHashId(),
+ schemaEConfig.getIndexName());
+
sourceEntity = extractSearchableEntityFromElasticEntity(dataCollectionResult);
if (sourceEntity != null) {
@@ -377,6 +377,16 @@ public class BaseVisualizationService implements VisualizationService {
}
return output;
}
+
+ private JsonNode extractSearchServiceContent(JsonNode returnedData){
+
+ JsonNode searchResults = returnedData.get("searchResult");
+ JsonNode searchHits = searchResults.get("hits");
+ JsonNode searchDoc = searchHits.get(0).get("document");
+ JsonNode content = searchDoc.get("content");
+
+ return content;
+ }
public void shutdown() {
aaiExecutorService.shutdown();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java
index 8365237..79eded1 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java
@@ -58,9 +58,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;
@@ -378,7 +378,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
*/
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
@@ -441,8 +441,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
if (wasEntryDiscovered) {
if (versionNumber != null && jsonPayload != null) {
- String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
- "default", se.getId(), versionNumber, jsonPayload);
+ String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
+ se.getId(), jsonPayload);
NetworkTransaction transactionTracker = new NetworkTransaction();
transactionTracker.setEntityType(esGetTxn.getEntityType());
@@ -450,9 +450,9 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
transactionTracker.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
- requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
+ requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -478,8 +478,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
- esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+ esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
@@ -578,7 +578,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
}
@@ -592,8 +592,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java
index d8b4af6..2cecf25 100644
--- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java
+++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java
@@ -25,15 +25,13 @@ import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
import org.onap.aai.sparky.crossentityreference.sync.CrossEntityReferenceSynchronizer;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
-import org.onap.aai.sparky.dal.ElasticSearchAdapter;
-import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner;
+import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig;
+import org.onap.aai.sparky.search.SearchServiceAdapter;
import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory;
-import org.onap.aai.sparky.sync.IndexCleaner;
import org.onap.aai.sparky.sync.IndexIntegrityValidator;
import org.onap.aai.sparky.sync.SyncControllerImpl;
import org.onap.aai.sparky.sync.SyncControllerRegistrar;
import org.onap.aai.sparky.sync.SyncControllerRegistry;
-import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.config.SyncControllerConfig;
@@ -43,13 +41,13 @@ public class ViewInspectSyncController extends SyncControllerImpl
private SyncControllerRegistry syncControllerRegistry;
private ActiveInventoryAdapter aaiAdapter;
- private ElasticSearchAdapter esAdapter;
+ private SearchServiceAdapter searchServiceAdapter;
private ElasticSearchSchemaConfig schemaConfig;
- private ElasticSearchEndpointConfig endpointConfig;
+ private RestEndpointConfig endpointConfig;
public ViewInspectSyncController(SyncControllerConfig syncControllerConfig,
- ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
- ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig,
+ ActiveInventoryAdapter aaiAdapter, SearchServiceAdapter searchServiceAdapter,
+ ElasticSearchSchemaConfig schemaConfig, RestEndpointConfig endpointConfig,
NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
CrossEntityReferenceLookup crossEntityReferenceLookup, OxmEntityLookup oxmEntityLookup,
SearchableEntityLookup searchableEntityLookup,
@@ -60,10 +58,10 @@ public class ViewInspectSyncController extends SyncControllerImpl
// final String controllerName = "View and Inspect Entity Synchronizer";
this.aaiAdapter = aaiAdapter;
- this.esAdapter = esAdapter;
+ this.searchServiceAdapter = searchServiceAdapter;
this.schemaConfig = schemaConfig;
this.endpointConfig = endpointConfig;
- IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(esAdapter, schemaConfig,
+ IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(searchServiceAdapter, schemaConfig,
endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig));
registerIndexValidator(indexValidator);
@@ -76,7 +74,7 @@ public class ViewInspectSyncController extends SyncControllerImpl
oxmEntityLookup, searchableEntityLookup);
ses.setAaiAdapter(aaiAdapter);
- ses.setElasticSearchAdapter(esAdapter);
+ ses.setSearchServiceAdapter(searchServiceAdapter);
registerEntitySynchronizer(ses);
@@ -87,14 +85,10 @@ public class ViewInspectSyncController extends SyncControllerImpl
crossEntityReferenceLookup, oxmEntityLookup, searchableEntityLookup);
cers.setAaiAdapter(aaiAdapter);
- cers.setElasticSearchAdapter(esAdapter);
+ cers.setSearchServiceAdapter(searchServiceAdapter);
registerEntitySynchronizer(cers);
- IndexCleaner indexCleaner =
- new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig);
-
- registerIndexCleaner(indexCleaner);
}