diff options
author | renealr <reneal.rogers@amdocs.com> | 2018-08-24 09:20:37 -0400 |
---|---|---|
committer | renealr <reneal.rogers@amdocs.com> | 2018-08-24 11:25:20 -0400 |
commit | 11b1339ca114835bf60356b4061f1ff3f3fd3e8b (patch) | |
tree | 6965f83294a0bd4d7a01935dbacf3bc3e61df147 /sparkybe-onap-service/src/main | |
parent | 3f9bce9e9d5c7f779a20d289811a88babfbda3cf (diff) |
update sync queries to use searh data service
Issue-ID: AAI-1540
Change-Id: I6a00b0e12830e1220070ae60ba9b2c42e67dfbfc
Signed-off-by: renealr <reneal.rogers@amdocs.com>
Diffstat (limited to 'sparkybe-onap-service/src/main')
30 files changed, 241 insertions, 3375 deletions
diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java index 9c3c1d9..7fd7e9c 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSyncControllerFactory.java @@ -30,17 +30,15 @@ import org.onap.aai.sparky.config.oxm.OxmEntityLookup; import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor; import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner; import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory; -import org.onap.aai.sparky.sync.IndexCleaner; import org.onap.aai.sparky.sync.IndexIntegrityValidator; import org.onap.aai.sparky.sync.SyncController; import org.onap.aai.sparky.sync.SyncControllerImpl; import org.onap.aai.sparky.sync.SyncControllerRegistrar; import org.onap.aai.sparky.sync.SyncControllerRegistry; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; import org.onap.aai.sparky.sync.config.SyncControllerConfig; @@ -51,13 +49,13 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar LoggerFactory.getInstance().getLogger(AggregationSyncControllerFactory.class); private ActiveInventoryAdapter aaiAdapter; - private ElasticSearchAdapter esAdapter; + private SearchServiceAdapter searchServiceAdapter; private SuggestionEntityLookup suggestionEntityLookup; private Map<String, String> aggregationEntityToIndexMap; private Map<String, ElasticSearchSchemaConfig> indexNameToSchemaConfigMap; - private ElasticSearchEndpointConfig elasticSearchEndpointConfig; + private RestEndpointConfig endpointConfig; private SyncControllerConfig syncControllerConfig; private SyncControllerRegistry syncControllerRegistry; private NetworkStatisticsConfig aaiStatConfig; @@ -67,14 +65,14 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar private List<SyncController> syncControllers; - public AggregationSyncControllerFactory(ElasticSearchEndpointConfig esEndpointConfig, + public AggregationSyncControllerFactory(RestEndpointConfig endpointConfig, SyncControllerConfig syncControllerConfig, SyncControllerRegistry syncControllerRegistry, SuggestionEntityLookup suggestionEntityLookup, OxmEntityLookup oxmEntityLookup, ElasticSearchSchemaFactory elasticSearchSchemaFactory) { this.elasticSearchSchemaFactory = elasticSearchSchemaFactory; this.syncControllers = new ArrayList<SyncController>(); - this.elasticSearchEndpointConfig = esEndpointConfig; + this.endpointConfig = endpointConfig; this.syncControllerConfig = syncControllerConfig; this.syncControllerRegistry = syncControllerRegistry; this.suggestionEntityLookup = suggestionEntityLookup; @@ -106,13 +104,13 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar this.indexNameToSchemaConfigMap = indexNameToSchemaConfigMap; } - public ElasticSearchEndpointConfig getElasticSearchEndpointConfig() { - return elasticSearchEndpointConfig; + public RestEndpointConfig getEndpointConfig() { + return endpointConfig; } - public void setElasticSearchEndpointConfig( - ElasticSearchEndpointConfig elasticSearchEndpointConfig) { - this.elasticSearchEndpointConfig = elasticSearchEndpointConfig; + public void setEndpointConfig( + RestEndpointConfig endpointConfig) { + this.endpointConfig = endpointConfig; } public SyncControllerConfig getSyncControllerConfig() { @@ -131,12 +129,12 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar this.aaiAdapter = aaiAdapter; } - public ElasticSearchAdapter getEsAdapter() { - return esAdapter; + public SearchServiceAdapter getSearchServiceAdapter() { + return searchServiceAdapter; } - public void setEsAdapter(ElasticSearchAdapter esAdapter) { - this.esAdapter = esAdapter; + public void setSearchServiceAdapter(SearchServiceAdapter searchServiceAdapter) { + this.searchServiceAdapter = searchServiceAdapter; } public SuggestionEntityLookup getSuggestionEntityLookup() { @@ -185,8 +183,8 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar continue; } - IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(esAdapter, - schemaConfig, elasticSearchEndpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); + IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(searchServiceAdapter, + schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); aggregationSyncController.registerIndexValidator(aggregationIndexValidator); @@ -197,15 +195,10 @@ public class AggregationSyncControllerFactory implements SyncControllerRegistrar oxmEntityLookup); aggSynchronizer.setAaiAdapter(aaiAdapter); - aggSynchronizer.setElasticSearchAdapter(esAdapter); + aggSynchronizer.setSearchServiceAdapter(searchServiceAdapter); aggregationSyncController.registerEntitySynchronizer(aggSynchronizer); - IndexCleaner entityDataIndexCleaner = - new ElasticSearchIndexCleaner(esAdapter, elasticSearchEndpointConfig, schemaConfig); - - aggregationSyncController.registerIndexCleaner(entityDataIndexCleaner); - syncControllers.add(aggregationSyncController); } catch (Exception exc) { diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java index 67015c5..0b9733f 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/AggregationSynchronizer.java @@ -56,9 +56,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor; import org.onap.aai.sparky.sync.enumeration.OperationState; import org.onap.aai.sparky.sync.enumeration.SynchronizerState; import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchPut; -import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate; +import org.onap.aai.sparky.sync.task.PerformSearchServicePut; +import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval; +import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate; import org.onap.aai.sparky.util.NodeUtils; import org.slf4j.MDC; @@ -263,7 +263,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer */ String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), ae.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); return; @@ -326,9 +326,9 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer if (wasEntryDiscovered) { if (versionNumber != null && jsonPayload != null) { - String requestPayload = - elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(), - schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload); + String requestPayload = + searchServiceAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(), + ae.getId(),jsonPayload); NetworkTransaction transactionTracker = new NetworkTransaction(); transactionTracker.setEntityType(esGetTxn.getEntityType()); @@ -336,9 +336,9 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer transactionTracker.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(), - requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor) - .whenComplete((result, error) -> { + supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(), + requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor) + .whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -363,8 +363,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer updateElasticTxn.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter), - esPutExecutor).whenComplete((result, error) -> { + supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter), + esPutExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -525,7 +525,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), ae.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage()); } @@ -539,8 +539,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor) - .whenComplete((result, error) -> { + supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor) + .whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java deleted file mode 100644 index c6cd3b1..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySummarizer.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.aggregation.sync; - -import static java.util.concurrent.CompletableFuture.supplyAsync; - -import java.io.IOException; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.Collection; -import java.util.EnumSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - -import javax.json.Json; -import javax.ws.rs.core.MediaType; - -import org.onap.aai.cl.api.Logger; -import org.onap.aai.cl.eelf.LoggerFactory; -import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.config.oxm.SearchableEntityLookup; -import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor; -import org.onap.aai.sparky.dal.rest.HttpMethod; -import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.sync.AbstractEntitySynchronizer; -import org.onap.aai.sparky.sync.IndexSynchronizer; -import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; -import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; -import org.onap.aai.sparky.sync.enumeration.OperationState; -import org.onap.aai.sparky.sync.enumeration.SynchronizerState; -import org.onap.aai.sparky.util.NodeUtils; -import org.slf4j.MDC; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; - -/** - * The Class HistoricalEntitySummarizer. - */ -public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer - implements IndexSynchronizer { - - private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class); - private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ"; - - private boolean allWorkEnumerated; - private ConcurrentHashMap<String, AtomicInteger> entityCounters; - private boolean syncInProgress; - private Map<String, String> contextMap; - private ElasticSearchSchemaConfig schemaConfig; - private SearchableEntityLookup searchableEntityLookup; - - /** - * Instantiates a new historical entity summarizer. - * - * @throws Exception the exception - */ - public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers, - int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig, - NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup) - throws Exception { - super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig); - - this.schemaConfig = schemaConfig; - this.allWorkEnumerated = false; - this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>(); - this.synchronizerName = "Historical Entity Summarizer"; - this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS); - this.syncInProgress = false; - this.contextMap = MDC.getCopyOfContextMap(); - this.syncDurationInMs = -1; - this.searchableEntityLookup = searchableEntityLookup; - } - - /** - * Collect all the work. - * - * @return the operation state - */ - private OperationState collectAllTheWork() { - - Map<String, SearchableOxmEntityDescriptor> descriptorMap = - searchableEntityLookup.getSearchableEntityDescriptors(); - - if (descriptorMap.isEmpty()) { - LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities"); - - return OperationState.ERROR; - } - - Collection<String> entityTypes = descriptorMap.keySet(); - - AtomicInteger asyncWoH = new AtomicInteger(0); - - asyncWoH.set(entityTypes.size()); - - try { - for (String entityType : entityTypes) { - - supplyAsync(new Supplier<Void>() { - - @Override - public Void get() { - MDC.setContextMap(contextMap); - try { - OperationResult typeLinksResult = - aaiAdapter.getSelfLinksByEntityType(entityType); - updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult); - processEntityTypeSelfLinks(entityType, typeLinksResult); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage()); - - } - - return null; - } - - }, aaiExecutor).whenComplete((result, error) -> { - - asyncWoH.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage()); - } - - }); - - } - - - while (asyncWoH.get() > 0) { - - if (LOG.isDebugEnabled()) { - LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed."); - } - - Thread.sleep(250); - } - - esWorkOnHand.set(entityCounters.size()); - - // start doing the real work - allWorkEnumerated = true; - - insertEntityTypeCounters(); - - if (LOG.isDebugEnabled()) { - - StringBuilder sb = new StringBuilder(128); - - sb.append("\n\nHistorical Entity Counters:"); - - for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) { - sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get()); - } - - LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString()); - - } - - } catch (Exception exc) { - LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage()); - - - esWorkOnHand.set(0); - allWorkEnumerated = true; - - return OperationState.ERROR; - } - - return OperationState.OK; - - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync() - */ - @Override - public OperationState doSync() { - this.syncDurationInMs = -1; - String txnID = NodeUtils.getRandomTxnId(); - MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", ""); - - if (syncInProgress) { - LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING); - return OperationState.PENDING; - } - - clearCache(); - - syncInProgress = true; - this.syncStartedTimeStampInMs = System.currentTimeMillis(); - allWorkEnumerated = false; - - return collectAllTheWork(); - } - - /** - * Process entity type self links. - * - * @param entityType the entity type - * @param operationResult the operation result - */ - private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) { - - JsonNode rootNode = null; - - final String jsonResult = operationResult.getResult(); - - if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) { - - try { - rootNode = mapper.readTree(jsonResult); - } catch (IOException exc) { - LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage()); - return; - } - - JsonNode resultData = rootNode.get("result-data"); - ArrayNode resultDataArrayNode = null; - - if (resultData != null && resultData.isArray()) { - resultDataArrayNode = (ArrayNode) resultData; - entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size())); - } - } - - } - - /** - * Insert entity type counters. - */ - private void insertEntityTypeCounters() { - - if (esWorkOnHand.get() <= 0) { - return; - } - - SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT); - Timestamp timestamp = new Timestamp(System.currentTimeMillis()); - String currentFormattedTimeStamp = dateFormat.format(timestamp); - - Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet(); - - for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) { - - supplyAsync(new Supplier<Void>() { - - @Override - public Void get() { - MDC.setContextMap(contextMap); - String jsonString = Json.createObjectBuilder().add( - "count", entityCounterEntry.getValue().get()) - .add("entityType", entityCounterEntry.getKey()) - .add("timestamp", currentFormattedTimeStamp).build().toString(); - - String link = null; - try { - link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName); - OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE); - updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() ); - } - - return null; - } - - }, esExecutor).whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - }); - - } - - while (esWorkOnHand.get() > 0) { - - try { - Thread.sleep(500); - } catch (InterruptedException exc) { - LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage()); - Thread.currentThread().interrupt(); - } - } - - } - - @Override - public SynchronizerState getState() { - - if (!isSyncDone()) { - return SynchronizerState.PERFORMING_SYNCHRONIZATION; - } - - return SynchronizerState.IDLE; - - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean) - */ - @Override - public String getStatReport(boolean showFinalReport) { - syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs; - return this.getStatReport(syncDurationInMs, showFinalReport); - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown() - */ - @Override - public void shutdown() { - this.shutdownExecutors(); - } - - @Override - protected boolean isSyncDone() { - - int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get(); - - if (LOG.isDebugEnabled()) { - LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand - + " all work enumerated = " + allWorkEnumerated); - } - - if (totalWorkOnHand > 0 || !allWorkEnumerated) { - return false; - } - - this.syncInProgress = false; - - return true; - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache() - */ - @Override - public void clearCache() { - - if (syncInProgress) { - LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored"); - return; - } - - super.clearCache(); - this.resetCounters(); - if (entityCounters != null) { - entityCounters.clear(); - } - - allWorkEnumerated = false; - - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java deleted file mode 100644 index da1f290..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/aggregation/sync/HistoricalEntitySyncController.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.aggregation.sync; - -import org.onap.aai.sparky.config.oxm.SearchableEntityLookup; -import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory; -import org.onap.aai.sparky.sync.IndexIntegrityValidator; -import org.onap.aai.sparky.sync.SyncControllerImpl; -import org.onap.aai.sparky.sync.SyncControllerRegistrar; -import org.onap.aai.sparky.sync.SyncControllerRegistry; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; -import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; -import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; -import org.onap.aai.sparky.sync.config.SyncControllerConfig; - -import java.util.concurrent.TimeUnit; - -public class HistoricalEntitySyncController extends SyncControllerImpl - implements SyncControllerRegistrar { - - private SyncControllerRegistry syncControllerRegistry; - - public HistoricalEntitySyncController(SyncControllerConfig syncControllerConfig, - ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter, - ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig, - int syncFrequencyInMinutes, NetworkStatisticsConfig aaiStatConfig, - NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup, - ElasticSearchSchemaFactory elasticSearchSchemaFactory) throws Exception { - super(syncControllerConfig); - - // final String controllerName = "Historical Entity Count Synchronizer"; - - long taskFrequencyInMs = getTaskFrequencyInMs(syncFrequencyInMinutes); - - setDelayInMs(taskFrequencyInMs); - setSyncFrequencyInMs(taskFrequencyInMs); - - IndexIntegrityValidator entityCounterHistoryValidator = new IndexIntegrityValidator(esAdapter, - schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); - - registerIndexValidator(entityCounterHistoryValidator); - - HistoricalEntitySummarizer historicalSummarizer = new HistoricalEntitySummarizer(schemaConfig, - syncControllerConfig.getNumInternalSyncWorkers(), - syncControllerConfig.getNumSyncActiveInventoryWorkers(), - syncControllerConfig.getNumSyncElasticWorkers(),aaiStatConfig, esStatConfig,searchableEntityLookup); - - historicalSummarizer.setAaiAdapter(aaiAdapter); - historicalSummarizer.setElasticSearchAdapter(esAdapter); - - registerEntitySynchronizer(historicalSummarizer); - - } - - static long getTaskFrequencyInMs(int syncFrequencyInMinutes) { - return TimeUnit.MINUTES.toMillis(Integer.toUnsignedLong(syncFrequencyInMinutes)); - } - - public SyncControllerRegistry getSyncControllerRegistry() { - return syncControllerRegistry; - } - - public void setSyncControllerRegistry(SyncControllerRegistry syncControllerRegistry) { - this.syncControllerRegistry = syncControllerRegistry; - } - - @Override - public void registerController() { - if ( syncControllerRegistry != null ) { - if ( syncControllerConfig.isEnabled()) { - syncControllerRegistry.registerSyncController(this); - } - } - - } -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java index 54b82fb..6596a9d 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutoSuggestionSyncController.java @@ -23,16 +23,14 @@ package org.onap.aai.sparky.autosuggestion.sync; import org.onap.aai.sparky.config.oxm.OxmEntityLookup; import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.search.filters.config.FiltersConfig; -import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner; import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory; -import org.onap.aai.sparky.sync.IndexCleaner; import org.onap.aai.sparky.sync.IndexIntegrityValidator; import org.onap.aai.sparky.sync.SyncControllerImpl; import org.onap.aai.sparky.sync.SyncControllerRegistrar; import org.onap.aai.sparky.sync.SyncControllerRegistry; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; import org.onap.aai.sparky.sync.config.SyncControllerConfig; @@ -43,8 +41,8 @@ public class AutoSuggestionSyncController extends SyncControllerImpl implements private SyncControllerRegistry syncControllerRegistry; public AutoSuggestionSyncController(SyncControllerConfig syncControllerConfig, - ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter, - ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig, + ActiveInventoryAdapter aaiAdapter, SearchServiceAdapter searchServiceAdapter, + ElasticSearchSchemaConfig schemaConfig, RestEndpointConfig endpointConfig, NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup, SuggestionEntityLookup suggestionEntityLookup, FiltersConfig filtersConfig, @@ -53,7 +51,7 @@ public class AutoSuggestionSyncController extends SyncControllerImpl implements // final String controllerName = "Auto Suggestion Synchronizer"; - IndexIntegrityValidator autoSuggestionIndexValidator = new IndexIntegrityValidator(esAdapter, + IndexIntegrityValidator autoSuggestionIndexValidator = new IndexIntegrityValidator(searchServiceAdapter, schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); registerIndexValidator(autoSuggestionIndexValidator); @@ -65,15 +63,10 @@ public class AutoSuggestionSyncController extends SyncControllerImpl implements oxmEntityLookup, suggestionEntityLookup, filtersConfig); suggestionSynchronizer.setAaiAdapter(aaiAdapter); - suggestionSynchronizer.setElasticSearchAdapter(esAdapter); + suggestionSynchronizer.setSearchServiceAdapter(searchServiceAdapter); registerEntitySynchronizer(suggestionSynchronizer); - IndexCleaner autosuggestIndexCleaner = - new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig); - - registerIndexCleaner(autosuggestIndexCleaner); - } public SyncControllerRegistry getSyncControllerRegistry() { diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java index 74ee4ea..583f260 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/AutosuggestionSynchronizer.java @@ -62,8 +62,8 @@ import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity; import org.onap.aai.sparky.sync.enumeration.OperationState; import org.onap.aai.sparky.sync.enumeration.SynchronizerState; import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchPut; -import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; +import org.onap.aai.sparky.sync.task.PerformSearchServicePut; +import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval; import org.onap.aai.sparky.util.NodeUtils; import org.onap.aai.sparky.util.SuggestionsPermutation; import org.slf4j.MDC; @@ -434,7 +434,7 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer if (sse.isSuggestableDoc()) { String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), sse.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage()); } @@ -448,8 +448,8 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor) - .whenComplete((result, error) -> { + supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor) + .whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -515,7 +515,7 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer */ String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), sse.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); return; @@ -552,8 +552,8 @@ public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer updateElasticTxn.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter), - esPutExecutor).whenComplete((result, error) -> { + supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter), + esPutExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java index fe3ecd0..7501e5d 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSuggestionSynchronizer.java @@ -40,7 +40,7 @@ import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity; import org.onap.aai.sparky.sync.enumeration.OperationState; import org.onap.aai.sparky.sync.enumeration.SynchronizerState; -import org.onap.aai.sparky.sync.task.PerformElasticSearchPut; +import org.onap.aai.sparky.sync.task.PerformSearchServicePut; import org.onap.aai.sparky.util.NodeUtils; import org.slf4j.MDC; @@ -119,7 +119,7 @@ public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), syncEntity.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), syncEntity.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); } @@ -135,8 +135,8 @@ public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer esWorkOnHand.incrementAndGet(); final Map<String, String> contextMap = MDC.getCopyOfContextMap(); - supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn, - elasticSearchAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> { + supplyAsync(new PerformSearchServicePut(jsonPayload, elasticPutTxn, + searchServiceAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java index c48ee5a..62183fc 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/autosuggestion/sync/VnfAliasSyncController.java @@ -21,16 +21,14 @@ package org.onap.aai.sparky.autosuggestion.sync; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.search.filters.config.FiltersConfig; -import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner; import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory; -import org.onap.aai.sparky.sync.IndexCleaner; import org.onap.aai.sparky.sync.IndexIntegrityValidator; import org.onap.aai.sparky.sync.SyncControllerImpl; import org.onap.aai.sparky.sync.SyncControllerRegistrar; import org.onap.aai.sparky.sync.SyncControllerRegistry; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; import org.onap.aai.sparky.sync.config.SyncControllerConfig; @@ -40,8 +38,8 @@ public class VnfAliasSyncController extends SyncControllerImpl implements SyncCo private SyncControllerRegistry syncControllerRegistry; public VnfAliasSyncController(SyncControllerConfig syncControllerConfig, - ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter, - ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig, + ActiveInventoryAdapter aaiAdapter, SearchServiceAdapter searchServiceAdapter, + ElasticSearchSchemaConfig schemaConfig, RestEndpointConfig endpointConfig, NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig, FiltersConfig filtersConfig, ElasticSearchSchemaFactory elasticSearchSchemaFactory) throws Exception { @@ -49,7 +47,7 @@ public class VnfAliasSyncController extends SyncControllerImpl implements SyncCo // final String controllerName = "VNFs Alias Suggestion Synchronizer"; - IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(esAdapter, schemaConfig, + IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(searchServiceAdapter, schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); registerIndexValidator(indexValidator); @@ -60,16 +58,10 @@ public class VnfAliasSyncController extends SyncControllerImpl implements SyncCo syncControllerConfig.getNumSyncElasticWorkers(), aaiStatConfig, esStatConfig, filtersConfig); synchronizer.setAaiAdapter(aaiAdapter); - synchronizer.setElasticSearchAdapter(esAdapter); + synchronizer.setSearchServiceAdapter(searchServiceAdapter); registerEntitySynchronizer(synchronizer); - - IndexCleaner indexCleaner = - new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig); - - registerIndexCleaner(indexCleaner); - } public SyncControllerRegistry getSyncControllerRegistry() { diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java index 2087fa3..cf7908b 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/crossentityreference/sync/CrossEntityReferenceSynchronizer.java @@ -60,9 +60,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor; import org.onap.aai.sparky.sync.enumeration.OperationState; import org.onap.aai.sparky.sync.enumeration.SynchronizerState; import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchPut; -import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate; +import org.onap.aai.sparky.sync.task.PerformSearchServicePut; +import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval; +import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate; import org.onap.aai.sparky.util.NodeUtils; import org.slf4j.MDC; @@ -575,8 +575,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer String link = null; try { - link = elasticSearchAdapter - .buildElasticSearchGetDocUrl(getIndexName(), icer.getId()); + link = searchServiceAdapter + .buildSearchServiceDocUrl(getIndexName(), icer.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage()); @@ -592,8 +592,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer esWorkOnHand.incrementAndGet(); supplyAsync( - new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), - esExecutor).whenComplete((result, error) -> { + new PerformSearchServiceRetrieval(n2, searchServiceAdapter), + esExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -669,7 +669,7 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer */ String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), icer.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); return; @@ -728,8 +728,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer if (wasEntryDiscovered) { if (versionNumber != null && jsonPayload != null) { - String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(), - "default", icer.getId(), versionNumber, jsonPayload); + String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(), + icer.getId(), jsonPayload); NetworkTransaction transactionTracker = new NetworkTransaction(); transactionTracker.setEntityType(esGetResult.getEntityType()); @@ -737,9 +737,9 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer transactionTracker.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(), - requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor) - .whenComplete((result, error) -> { + supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(), + requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor) + .whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -762,8 +762,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer updateElasticTxn.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter), - esPutExecutor).whenComplete((result, error) -> { + supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter), + esPutExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java deleted file mode 100644 index 3072b91..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/dal/ElasticSearchAdapter.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.dal; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.ws.rs.core.MediaType; - -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.restclient.client.RestClient; -import org.onap.aai.sparky.dal.rest.RestClientConstructionException; -import org.onap.aai.sparky.dal.rest.RestClientFactory; -import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; - -/** - * The Class ElasticSearchAdapter. - - */ -public class ElasticSearchAdapter { - - private static final String BULK_IMPORT_INDEX_TEMPLATE = - "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n"; - - private static final String BULK_API = "_bulk"; - - private static final String DEFAULT_TYPE = "default"; - - private RestClient restClient; - private RestEndpointConfig endpointConfig; - - /** - * Instantiates a new elastic search adapter. - * @throws RestClientConstructionException - */ - public ElasticSearchAdapter(RestEndpointConfig endpointConfig) throws RestClientConstructionException { - - this.restClient = RestClientFactory.buildClient(endpointConfig); - this.endpointConfig = endpointConfig; - - } - - protected Map<String, List<String>> getMessageHeaders() { - Map<String, List<String>> headers = new HashMap<String, List<String>>(); - // insert mandatory headers if there are any - return headers; - } - - public OperationResult doGet(String url, MediaType acceptContentType) { - return restClient.get(url, getMessageHeaders(), acceptContentType); - } - - public OperationResult doDelete(String url, MediaType acceptContentType) { - return restClient.delete(url, getMessageHeaders(), acceptContentType); - } - - public OperationResult doPost(String url, String jsonPayload, MediaType acceptContentType) { - return restClient.post(url, jsonPayload, getMessageHeaders(), MediaType.APPLICATION_JSON_TYPE, - acceptContentType); - } - - public OperationResult doPut(String url, String jsonPayload, MediaType acceptContentType) { - return restClient.put(url, jsonPayload, getMessageHeaders(), MediaType.APPLICATION_JSON_TYPE, - acceptContentType); - } - - public OperationResult doPatch(String url, String jsonPayload, MediaType acceptContentType) { - - Map<String,List<String>> headers = getMessageHeaders(); - headers.putIfAbsent("X-HTTP-Method-Override", new ArrayList<String>()); - headers.get("X-HTTP-Method-Override").add("PATCH"); - - return restClient.post(url, jsonPayload, headers, MediaType.APPLICATION_JSON_TYPE, acceptContentType); - } - - public OperationResult doHead(String url, MediaType acceptContentType) { - return restClient.head(url, getMessageHeaders(), acceptContentType); - } - - public OperationResult doBulkOperation(String url, String payload) { - return restClient.put(url, payload, getMessageHeaders(), - MediaType.APPLICATION_FORM_URLENCODED_TYPE, MediaType.APPLICATION_JSON_TYPE); - } - - public String buildBulkImportOperationRequest(String index, String type, String id, - String version, String payload) { - - StringBuilder requestPayload = new StringBuilder(128); - - requestPayload.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, index, type, id, version)); - requestPayload.append(payload).append("\n"); - - return requestPayload.toString(); - - } - - public OperationResult retrieveEntityById(String host, String port, String indexName, - String docType, String resourceUrl) { - String esUrl = - String.format("http://%s:%s/%s/%s/%s", host, port, indexName, docType, resourceUrl); - return doGet(esUrl, MediaType.APPLICATION_JSON_TYPE); - } - - public String buildElasticSearchUrlForApi(String indexName, String api) { - return String.format("http://%s:%s/%s/%s", endpointConfig.getEndpointIpAddress(), - endpointConfig.getEndpointServerPort(), indexName, api); - } - - public String buildElasticSearchUrl(String indexName, String docType) { - return String.format("http://%s:%s/%s/%s", endpointConfig.getEndpointIpAddress(), - endpointConfig.getEndpointServerPort(), indexName, docType); - } - - public String buildElasticSearchGetDocUrl(String indexName, String docType, String docId) { - return String.format("http://%s:%s/%s/%s/%s", endpointConfig.getEndpointIpAddress(), - endpointConfig.getEndpointServerPort(), indexName, docType, docId); - } - - public String buildElasticSearchGetDocUrl(String indexName, String docId) { - return buildElasticSearchGetDocUrl(indexName, DEFAULT_TYPE, docId); - } - - public String buildElasticSearchPostUrl(String indexName) { - return String.format("http://%s:%s/%s/%s", endpointConfig.getEndpointIpAddress(), - endpointConfig.getEndpointServerPort(), indexName, DEFAULT_TYPE); - } - - public String getBulkUrl() { - return String.format("http://%s:%s/%s", endpointConfig.getEndpointIpAddress(), - endpointConfig.getEndpointServerPort(), BULK_API); - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java deleted file mode 100644 index a7f372a..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/EntityHistoryQueryBuilder.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.inventory; - -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonArrayBuilder; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; - -/** - * The Class EntityHistoryQueryBuilder. - */ -public class EntityHistoryQueryBuilder { - - private static final String TABLE = "table"; - private static final String GRAPH = "graph"; - - /** - * Gets the query. - * - * @param type the type - * @return the query - */ - public static JsonObject getQuery(String type) { - if (type.equalsIgnoreCase(TABLE)) { - return createTableQuery(); - } else if (type.equalsIgnoreCase(GRAPH)) { - return createGraphQuery(); - } else { - return null; - } - } - - /** - * Creates the graph query. - * - * @return the json object - */ - public static JsonObject createGraphQuery() { - JsonObjectBuilder jsonBuilder = Json.createObjectBuilder(); - - jsonBuilder.add("aggs", - Json.createObjectBuilder().add("group_by_entityType", - Json.createObjectBuilder() - .add("terms", Json.createObjectBuilder().add("field", "entityType").add("size", 0)) - .add("aggs", Json.createObjectBuilder().add("group_by_date", - Json.createObjectBuilder().add("date_histogram", createDateHistogram()) - .add("aggs", Json.createObjectBuilder().add("sort_by_date", - Json.createObjectBuilder().add("top_hits", createTopHitsBlob()))))))); - jsonBuilder.add("size", 0); - - return jsonBuilder.build(); - } - - /** - * Creates the table query. - * - * @return the json object - */ - public static JsonObject createTableQuery() { - JsonObjectBuilder jsonBuilder = Json.createObjectBuilder(); - - jsonBuilder.add("aggs", - Json.createObjectBuilder().add("group_by_entityType", - Json.createObjectBuilder() - .add("terms", Json.createObjectBuilder().add("field", "entityType").add("size", 0)) - .add("aggs", Json.createObjectBuilder().add("sort_by_date", - Json.createObjectBuilder().add("top_hits", createTopHitsBlob()))))); - jsonBuilder.add("size", 0); - - return jsonBuilder.build(); - } - - /** - * Creates the date histogram. - * - * @return the json object - */ - private static JsonObject createDateHistogram() { - JsonObjectBuilder jsonBuilder = Json.createObjectBuilder(); - - jsonBuilder.add("field", "timestamp"); - jsonBuilder.add("min_doc_count", 1); - jsonBuilder.add("interval", "day"); - jsonBuilder.add("format", "epoch_millis"); - - return jsonBuilder.build(); - } - - /** - * Creates the top hits blob. - * - * @return the json object - */ - private static JsonObject createTopHitsBlob() { - JsonObjectBuilder builder = Json.createObjectBuilder(); - builder.add("size", 1); - builder.add("sort", getSortCriteria()); - return builder.build(); - } - - public static JsonArray getSortCriteria() { - JsonArrayBuilder jsonBuilder = Json.createArrayBuilder(); - jsonBuilder.add(Json.createObjectBuilder().add("timestamp", - Json.createObjectBuilder().add("order", "desc"))); - - return jsonBuilder.build(); - } - - /** - * The main method. - * - * @param args the arguments - */ - public static void main(String[] args) { - System.out.println("TABLE-QUERY: " + createTableQuery().toString()); - System.out.println("GRAPH_QUERY: " + createGraphQuery().toString()); - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java deleted file mode 100644 index 9c1f226..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/GeoVisualizationProcessor.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.inventory; - -import java.io.IOException; - -import org.apache.camel.Exchange; -import org.apache.camel.component.restlet.RestletConstants; -import org.json.JSONArray; -import org.json.JSONObject; -import org.onap.aai.cl.api.Logger; -import org.onap.aai.cl.eelf.LoggerFactory; -import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.util.NodeUtils; -import org.restlet.Request; -import org.restlet.Response; -import org.restlet.data.ClientInfo; -import org.restlet.data.Form; -import org.restlet.data.MediaType; -import org.restlet.data.Parameter; -import org.restlet.data.Status; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * The Class GeoVisualizationServlet. - */ -public class GeoVisualizationProcessor { - - private static final Logger LOG = - LoggerFactory.getInstance().getLogger(GeoVisualizationProcessor.class); - - private ObjectMapper mapper; - private ElasticSearchAdapter elasticSearchAdapter = null; - private String topographicalSearchIndexName; - - private static final String SEARCH_STRING = "_search"; - private static final String SEARCH_PARAMETER = "?filter_path=hits.hits._source&_source=location&size=5000&q=entityType:"; - private static final String PARAMETER_KEY = "entity"; - - /** - * Instantiates a new geo visualization processor - */ - public GeoVisualizationProcessor(ElasticSearchAdapter elasticSearchAdapter, String topographicalSearchIndexName) { - this.mapper = new ObjectMapper(); - this.elasticSearchAdapter = elasticSearchAdapter; - this.topographicalSearchIndexName = topographicalSearchIndexName; - } - - /** - * Gets the geo visualization results. - * - * @param response the response - * @param entityType the entity type - * @return the geo visualization results - * @throws Exception the exception - */ - protected OperationResult getGeoVisualizationResults(Exchange exchange) throws Exception { - OperationResult operationResult = new OperationResult(); - - - Object xTransactionId = exchange.getIn().getHeader("X-TransactionId"); - if (xTransactionId == null) { - xTransactionId = NodeUtils.getRandomTxnId(); - } - - Object partnerName = exchange.getIn().getHeader("X-FromAppId"); - if (partnerName == null) { - partnerName = "Browser"; - } - - Request request = exchange.getIn().getHeader(RestletConstants.RESTLET_REQUEST, Request.class); - - /* Disables automatic Apache Camel Restlet component logging which prints out an undesirable log entry - which includes client (e.g. browser) information */ - request.setLoggable(false); - - ClientInfo clientInfo = request.getClientInfo(); - MdcContext.initialize((String) xTransactionId, "AAI-UI", "", (String) partnerName, clientInfo.getAddress() + ":" + clientInfo.getPort()); - - String entityType = ""; - - Form form = request.getResourceRef().getQueryAsForm(); - for (Parameter parameter : form) { - if(PARAMETER_KEY.equals(parameter.getName())) { - entityType = parameter.getName(); - } - } - - String api = SEARCH_STRING + SEARCH_PARAMETER + entityType; - - final String requestUrl = elasticSearchAdapter.buildElasticSearchUrlForApi(topographicalSearchIndexName, api); - - try { - - OperationResult opResult = - elasticSearchAdapter.doGet(requestUrl, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE); - - JSONObject finalOutputJson = formatOutput(opResult.getResult()); - - Response response = exchange.getIn().getHeader(RestletConstants.RESTLET_RESPONSE, Response.class); - response.setStatus(Status.SUCCESS_OK); - response.setEntity(String.valueOf(finalOutputJson), MediaType.APPLICATION_JSON); - exchange.getOut().setBody(response); - - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ERROR_GENERIC, "Error processing Geo Visualization request"); - } - - return operationResult; - } - - /** - * Format output. - * - * @param results the results - * @return the JSON object - */ - private JSONObject formatOutput(String results) { - JsonNode resultNode = null; - JSONObject finalResult = new JSONObject(); - JSONArray entitiesArr = new JSONArray(); - - try { - resultNode = mapper.readTree(results); - - final JsonNode hitsNode = resultNode.get("hits").get("hits"); - if (hitsNode.isArray()) { - - for (final JsonNode arrayNode : hitsNode) { - JsonNode sourceNode = arrayNode.get("_source"); - if (sourceNode.get("location") != null) { - JsonNode locationNode = sourceNode.get("location"); - if (NodeUtils.isNumeric(locationNode.get("lon").asText()) - && NodeUtils.isNumeric(locationNode.get("lat").asText())) { - JSONObject location = new JSONObject(); - location.put("longitude", locationNode.get("lon").asText()); - location.put("latitude", locationNode.get("lat").asText()); - - entitiesArr.put(location); - } - - } - } - } - finalResult.put("plotPoints", entitiesArr); - - } catch (IOException exc) { - LOG.warn(AaiUiMsgs.ERROR_BUILDING_SEARCH_RESPONSE, exc.getLocalizedMessage()); - } - - return finalResult; - } -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java deleted file mode 100644 index 9f775e5..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/GeoIndexDocument.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.inventory.entity; - -import java.io.Serializable; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.List; - -import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor; -import org.onap.aai.sparky.config.oxm.OxmEntityLookup; -import org.onap.aai.sparky.sync.entity.IndexDocument; -import org.onap.aai.sparky.util.NodeUtils; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * The Class GeoIndexDocument. - */ -public class GeoIndexDocument implements Serializable, IndexDocument { - - @JsonIgnore - private static final long serialVersionUID = -5188479658230319058L; - - protected String entityType; - protected String entityPrimaryKeyValue; - protected String entityPrimaryKeyName; - protected String latitude; - protected String longitude; - protected String selfLink; - - @JsonIgnore - protected OxmEntityLookup oxmEntityLookup; - - @JsonIgnore - protected ObjectMapper mapper = new ObjectMapper(); - // generated, SHA-256 digest - @JsonIgnore - protected String id; - - /** - * Convert bytes to hex string. - * - * @param bytesToConvert the bytes to convert - * @return the string - */ - private static String convertBytesToHexString(byte[] bytesToConvert) { - StringBuffer hexString = new StringBuffer(); - for (int i = 0; i < bytesToConvert.length; i++) { - hexString.append(Integer.toHexString(0xFF & bytesToConvert[i])); - } - return hexString.toString(); - } - - - @JsonIgnore - public boolean isValidGeoDocument() { - - boolean isValid = true; - - isValid &= (this.getEntityType() != null); - isValid &= (this.getLatitude() != null); - isValid &= (this.getLongitude() != null); - isValid &= (this.getId() != null); - isValid &= (this.getSelfLink() != null); - - isValid &= NodeUtils.isNumeric(this.getLatitude()); - isValid &= NodeUtils.isNumeric(this.getLongitude()); - - return isValid; - } - - /** - * Concat array. - * - * @param list the list - * @param delimiter the delimiter - * @return the string - */ - private static String concatArray(List<String> list, char delimiter) { - - if (list == null || list.size() == 0) { - return ""; - } - - StringBuilder result = new StringBuilder(64); - - int listSize = list.size(); - boolean firstValue = true; - - for (String item : list) { - - if (firstValue) { - result.append(item); - firstValue = false; - } else { - result.append(delimiter).append(item); - } - - } - - return result.toString(); - - } - - /* - * We'll try and create a unique identity key that we can use for differencing the previously - * imported record sets as we won't have granular control of what is created/removed and when. The - * best we can hope for is identification of resources by generated Id until the Identity-Service - * UUID is tagged against all resources, then we can use that instead. - */ - - /** - * Generate unique sha digest. - * - * @param entityType the entity type - * @param fieldName the field name - * @param fieldValue the field value - * @return the string - * @throws NoSuchAlgorithmException the no such algorithm exception - */ - public static String generateUniqueShaDigest(String entityType, String fieldName, - String fieldValue) throws NoSuchAlgorithmException { - - /* - * Basically SHA-256 will result in an identity with a guaranteed uniqueness compared to just a - * java hashcode value. - */ - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - digest.update(String.format("%s.%s.%s", entityType, fieldName, fieldValue).getBytes()); - return convertBytesToHexString(digest.digest()); - } - - /** - * Instantiates a new geo index document. - */ - public GeoIndexDocument() {} - - /* - * (non-Javadoc) - * - */ - - @Override - @JsonIgnore - public String getAsJson() throws JsonProcessingException { - - if (latitude != null && longitude != null) { - - /** - * A valid entry from this class is one that has both lat and long. If one or both is missing - * we shouldn't be indexing anything. - */ - - return NodeUtils.convertObjectToJson(this, true); - - } - - return null; - - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.entity.IndexDocument#deriveFields() - */ - @Override - public void deriveFields() { - - /* - * We'll try and create a unique identity key that we can use for differencing the previously - * imported record sets as we won't have granular control of what is created/removed and when. - * The best we can hope for is identification of resources by generated Id until the - * Identity-Service UUID is tagged against all resources, then we can use that instead. - */ - - OxmEntityDescriptor descriptor = oxmEntityLookup.getEntityDescriptors().get(entityType); - String entityPrimaryKeyName = NodeUtils.concatArray( - descriptor.getPrimaryKeyAttributeNames(), "/"); - - this.id = - NodeUtils.generateUniqueShaDigest(entityType, entityPrimaryKeyName, entityPrimaryKeyValue); - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "TopographicalEntity [" + ("entityType=" + entityType + ", ") - + ("entityPrimaryKeyValue=" + entityPrimaryKeyValue + ", ") - + ("latitude=" + latitude + ", ") + ("longitude=" + longitude + ", ") + ("ID=" + id + ", ") - + ("selfLink=" + selfLink) + "]"; - } - - @Override - @JsonIgnore - public String getId() { - return this.id; - } - - @JsonProperty("entityType") - public String getEntityType() { - return entityType; - } - - public void setEntityType(String entityType) { - this.entityType = entityType; - } - - @JsonProperty("entityPrimaryKeyValue") - public String getEntityPrimaryKeyValue() { - return entityPrimaryKeyValue; - } - - public void setEntityPrimaryKeyValue(String entityPrimaryKeyValue) { - this.entityPrimaryKeyValue = entityPrimaryKeyValue; - } - - @JsonProperty("entityPrimaryKeyName") - public String getEntityPrimaryKeyName() { - return entityPrimaryKeyName; - } - - public void setEntityPrimaryKeyName(String entityPrimaryKeyName) { - this.entityPrimaryKeyName = entityPrimaryKeyName; - } - - @JsonProperty("lat") - public String getLatitude() { - return latitude; - } - - public void setLatitude(String latitude) { - this.latitude = latitude; - } - - @JsonProperty("long") - public String getLongitude() { - return longitude; - } - - public void setLongitude(String longitude) { - this.longitude = longitude; - } - - @JsonProperty("link") - public String getSelfLink() { - return selfLink; - } - - public void setSelfLink(String selfLink) { - this.selfLink = selfLink; - } - - @JsonIgnore - public static long getSerialversionuid() { - return serialVersionUID; - } - - public void setId(String id) { - this.id = id; - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java deleted file mode 100644 index 88d47ba..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/inventory/entity/TopographicalEntity.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.inventory.entity; - -import java.io.IOException; -import java.io.Serializable; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.List; - -import javax.json.Json; -import javax.json.JsonObject; - -/** - * The Class TopographicalEntity. - */ -public class TopographicalEntity implements Serializable { - - private static final long serialVersionUID = -5188479658230319058L; - - protected String entityType; - protected String entityPrimaryKeyValue; - protected String entityPrimaryKeyName; - protected String latitude; - protected String longitude; - protected String selfLink; - - // generated, SHA-256 digest - protected String id; - - /** - * Convert bytes to hex string. - * - * @param bytesToConvert the bytes to convert - * @return the string - */ - private static String convertBytesToHexString(byte[] bytesToConvert) { - StringBuffer hexString = new StringBuffer(); - for (int i = 0; i < bytesToConvert.length; i++) { - hexString.append(Integer.toHexString(0xFF & bytesToConvert[i])); - } - return hexString.toString(); - } - - /** - * Concat array. - * - * @param list the list - * @param delimiter the delimiter - * @return the string - */ - private static String concatArray(List<String> list, char delimiter) { - - if (list == null || list.size() == 0) { - return ""; - } - - StringBuilder result = new StringBuilder(64); - - int listSize = list.size(); - boolean firstValue = true; - - for (String item : list) { - - if (firstValue) { - result.append(item); - firstValue = false; - } else { - result.append(delimiter).append(item); - } - - } - - return result.toString(); - - } - - /* - * We'll try and create a unique identity key that we can use for differencing the previously - * imported record sets as we won't have granular control of what is created/removed and when. The - * best we can hope for is identification of resources by generated Id until the Identity-Service - * UUID is tagged against all resources, then we can use that instead. - */ - - /** - * Generate unique sha digest. - * - * @param entityType the entity type - * @param fieldName the field name - * @param fieldValue the field value - * @return the string - * @throws NoSuchAlgorithmException the no such algorithm exception - */ - public static String generateUniqueShaDigest(String entityType, String fieldName, - String fieldValue) throws NoSuchAlgorithmException { - - /* - * Basically SHA-256 will result in an identity with a guaranteed uniqueness compared to just a - * java hashcode value. - */ - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - digest.update(String.format("%s.%s.%s", entityType, fieldName, fieldValue).getBytes()); - return convertBytesToHexString(digest.digest()); - } - - /** - * Instantiates a new topographical entity. - */ - public TopographicalEntity() {} - - /* - * (non-Javadoc) - * - */ - public String getAsJson() throws IOException { - - JsonObject obj = - Json.createObjectBuilder().add("entityType", entityType).add("pkey", entityPrimaryKeyValue) - .add("location", Json.createObjectBuilder().add("lat", latitude).add("lon", longitude)) - .add("selfLink", selfLink).build(); - - return obj.toString(); - } - - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "TopographicalEntity [" + ("entityType=" + entityType + ", ") - + ("entityPrimaryKeyValue=" + entityPrimaryKeyValue + ", ") - + ("latitude=" + latitude + ", ") + ("longitude=" + longitude + ", ") + ("ID=" + id + ", ") - + ("selfLink=" + selfLink) + "]"; - } - - public String getId() { - return this.id; - } - - public String getEntityType() { - return entityType; - } - - public void setEntityType(String entityType) { - this.entityType = entityType; - } - - public String getEntityPrimaryKeyValue() { - return entityPrimaryKeyValue; - } - - public void setEntityPrimaryKeyValue(String entityPrimaryKeyValue) { - this.entityPrimaryKeyValue = entityPrimaryKeyValue; - } - - public String getEntityPrimaryKeyName() { - return entityPrimaryKeyName; - } - - public void setEntityPrimaryKeyName(String entityPrimaryKeyName) { - this.entityPrimaryKeyName = entityPrimaryKeyName; - } - - public String getLatitude() { - return latitude; - } - - public void setLatitude(String latitude) { - this.latitude = latitude; - } - - public String getLongitude() { - return longitude; - } - - public void setLongitude(String longitude) { - this.longitude = longitude; - } - - public String getSelfLink() { - return selfLink; - } - - public void setSelfLink(String selfLink) { - this.selfLink = selfLink; - } - - public static long getSerialversionuid() { - return serialVersionUID; - } - - public void setId(String id) { - this.id = id; - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java deleted file mode 100644 index ad651c7..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/EntityCountHistoryProcessor.java +++ /dev/null @@ -1,403 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.search; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.component.restlet.RestletConstants; -import org.json.JSONArray; -import org.json.JSONObject; -import org.onap.aai.cl.api.Logger; -import org.onap.aai.cl.eelf.LoggerFactory; -import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.inventory.EntityHistoryQueryBuilder; -import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.util.NodeUtils; -import org.onap.aai.sparky.util.RestletUtils; -import org.restlet.Request; -import org.restlet.Response; -import org.restlet.data.ClientInfo; -import org.restlet.data.MediaType; -import org.restlet.data.Status; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; - -/** - * Receives and processes Entity Count History requests - */ -public class EntityCountHistoryProcessor implements Processor { - - private static final Logger LOG = - LoggerFactory.getInstance().getLogger(EntityCountHistoryProcessor.class); - - private static final long serialVersionUID = 1L; - - private ElasticSearchAdapter elasticSearchAdapter = null; - private ObjectMapper mapper; - - private static final String SEARCH_PRETTY_STRING = "_search?pretty"; - private static final String TYPE = "type"; - private static final String TABLE = "table"; - private static final String GRAPH = "graph"; - - private List<String> entityTypesToSummarize; - private List<String> vnfEntityTypes; - - private String entityCountHistoryIndexName; - - private boolean summarizeVnfs = false; - - private RestletUtils restletUtils = new RestletUtils(); - - /** - * Instantiates a new Entity Count History - */ - - public EntityCountHistoryProcessor(ElasticSearchAdapter elasticSearchAdapter, - String entityTypesToSummarizeDelimitedList, String vnfEntityTypesDelimitedList, String entityCountHistoryIndexName) { - - this.elasticSearchAdapter = elasticSearchAdapter; - this.entityCountHistoryIndexName = entityCountHistoryIndexName; - - entityTypesToSummarize = - Arrays.asList(entityTypesToSummarizeDelimitedList.toLowerCase().split("[\\s,]+")); - - vnfEntityTypes = - Arrays.asList(vnfEntityTypesDelimitedList.toLowerCase().split("[\\s,]+")); - - summarizeVnfs = vnfEntityTypesDelimitedList.toLowerCase().contains("vnf"); - - this.mapper = new ObjectMapper(); - this.mapper.configure(SerializationFeature.INDENT_OUTPUT, true); - } - - /** - * Processes a entity count history search request - * - * @param exchange The Exchange object generated by Apache Camel for the incoming request - */ - - @Override - public void process(Exchange exchange) throws Exception { - - Request request = exchange.getIn().getHeader(RestletConstants.RESTLET_REQUEST, Request.class); - Response restletResponse = - exchange.getIn().getHeader(RestletConstants.RESTLET_RESPONSE, Response.class); - - Object xTransactionId = exchange.getIn().getHeader("X-TransactionId"); - if (xTransactionId == null) { - xTransactionId = NodeUtils.getRandomTxnId(); - } - - Object partnerName = exchange.getIn().getHeader("X-FromAppId"); - if (partnerName == null) { - partnerName = "Browser"; - } - - /* - * Disables automatic Apache Camel Restlet component logging which prints out an undesirable log - * entry which includes client (e.g. browser) information - */ - request.setLoggable(false); - - ClientInfo clientInfo = request.getClientInfo(); - MdcContext.initialize((String) xTransactionId, "AAI-UI", "", (String) partnerName, - clientInfo.getAddress() + ":" + clientInfo.getPort()); - - String typeParameter = getTypeParameter(exchange); - - if (null != typeParameter && !typeParameter.isEmpty()) { - OperationResult operationResult = null; - - try { - operationResult = getResults(restletResponse, typeParameter); - restletResponse.setEntity(operationResult.getResult(), MediaType.APPLICATION_JSON); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, exc.getLocalizedMessage()); - } - } else { - LOG.error(AaiUiMsgs.RESOURCE_NOT_FOUND, request.getOriginalRef().toString()); - String errorMessage = - restletUtils.generateJsonErrorResponse("Unsupported request. Resource not found."); - restletResponse.setEntity(errorMessage, MediaType.APPLICATION_JSON); - restletResponse.setStatus(Status.CLIENT_ERROR_NOT_FOUND); - } - - exchange.getOut().setBody(restletResponse); - } - - - /** - * Format line graph output - * - * @param results The results - * @return The JSON object - * @throws JsonProcessingException The JSON processing exception - */ - public JSONObject formatLineGraphOutput(String results) throws JsonProcessingException { - Map<Long, Long> countByDateMap = new HashMap<Long, Long>(); - - JsonNode resultNode = null; - - JSONObject finalResult = new JSONObject(); - JSONArray finalResultArr = new JSONArray(); - - try { - resultNode = mapper.readTree(results); - - final JsonNode bucketsNode = getBucketsNode(resultNode); - - if (bucketsNode.isArray()) { - - for (final JsonNode entityNode : bucketsNode) { - final JsonNode dateBucketNode = entityNode.get("group_by_date").get("buckets"); - if (dateBucketNode.isArray()) { - for (final JsonNode dateBucket : dateBucketNode) { - Long date = dateBucket.get("key").asLong(); - final JsonNode countBucketNode = - dateBucket.get("sort_by_date").get("hits").get("hits"); - - if (countBucketNode.isArray()) { - final JsonNode latestEntityNode = countBucketNode.get(0); - - long currentCount = latestEntityNode.get("_source").get("count").asLong(); - if (countByDateMap.containsKey(date)) { - // add to the value if map already contains this date - currentCount += countByDateMap.get(date); - } - - countByDateMap.put(date, currentCount); - } - } - - } - } - } - - /* - * Sort the map by epoch timestamp - */ - Map<Long, Long> sortedMap = new TreeMap<Long, Long>(countByDateMap); - for (Entry<Long, Long> entry : sortedMap.entrySet()) { - JSONObject dateEntry = new JSONObject(); - dateEntry.put("date", entry.getKey()); - dateEntry.put("count", entry.getValue()); - finalResultArr.put(dateEntry); - } - - } catch (Exception exc) { - LOG.warn(AaiUiMsgs.ERROR_BUILDING_SEARCH_RESPONSE, exc.getLocalizedMessage()); - } - - return finalResult.put("result", finalResultArr); - } - - /** - * Format table output - * - * @param results The results - * @return The JSON object - * @throws JsonProcessingException The JSON processing exception - */ - public JSONObject formatTableOutput(String results) throws JsonProcessingException { - JsonNode resultNode = null; - - JSONObject finalResult = new JSONObject(); - JSONArray entitiesArr = new JSONArray(); - - Map<String, Long> entityCountInTable = initializeEntityMap(); - - long vnfCount = 0; - - try { - resultNode = mapper.readTree(results); - - final JsonNode bucketsNode = getBucketsNode(resultNode); - if (bucketsNode.isArray()) { - - for (final JsonNode entityNode : bucketsNode) { - String entityType = entityNode.get("key").asText(); - boolean isAVnf = vnfEntityTypes.contains(entityType); - long countValue = 0; - - if (isAVnf || entityCountInTable.get(entityType) != null) { - final JsonNode hitsBucketNode = entityNode.get("sort_by_date").get("hits").get("hits"); - if (hitsBucketNode.isArray()) { - // the first bucket will be the latest - final JsonNode hitNode = hitsBucketNode.get(0); - - countValue = hitNode.get("_source").get("count").asLong(); - - /* - * Special case: Add all the VNF types together to get aggregate count - */ - if (summarizeVnfs && isAVnf) { - vnfCount += countValue; - countValue = vnfCount; - entityType = "vnf"; - } - - entityCountInTable.replace(entityType, countValue); - } - } - - } - } - for (Entry<String, Long> entry : entityCountInTable.entrySet()) { - JSONObject entityType = new JSONObject(); - entityType.put("key", entry.getKey()); - entityType.put("doc_count", entry.getValue()); - entitiesArr.put(entityType); - } - - finalResult.put("result", entitiesArr); - - } catch (Exception exc) { - LOG.warn(AaiUiMsgs.ERROR_BUILDING_RESPONSE_FOR_TABLE_QUERY, exc.getLocalizedMessage()); - } - - return finalResult; - } - - /** - * Gets the results - * - * @param response The response - * @param type The type - * @return The results - */ - public OperationResult getResults(Response response, String type) { - OperationResult operationResult = new OperationResult(); - - String reqPayload = EntityHistoryQueryBuilder.getQuery(type).toString(); - - try { - final String fullUrlStr = elasticSearchAdapter - .buildElasticSearchUrlForApi(entityCountHistoryIndexName, SEARCH_PRETTY_STRING); - - OperationResult opResult = elasticSearchAdapter.doPost(fullUrlStr, reqPayload, - javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE); - - JSONObject finalOutput = null; - if (type.equalsIgnoreCase(TABLE)) { - finalOutput = formatTableOutput(opResult.getResult()); - } else if (type.equalsIgnoreCase(GRAPH)) { - finalOutput = formatLineGraphOutput(opResult.getResult()); - } - - if (finalOutput != null) { - response.setEntity(finalOutput.toString(), MediaType.APPLICATION_JSON); - operationResult.setResult(finalOutput.toString()); - } - } catch (JsonProcessingException exc) { - restletUtils.handleRestletErrors(LOG, "Unable to map JSONpayload", exc, response); - } - - return operationResult; - } - - /** - * Gets the buckets node - * - * @param node The node - * @return The buckets node - * @throws Exception The exception - */ - public JsonNode getBucketsNode(JsonNode node) throws Exception { - if (node.get("aggregations").get("group_by_entityType").get("buckets") != null) { - return node.get("aggregations").get("group_by_entityType").get("buckets"); - } else { - throw new Exception("Failed to map JSON response"); - } - } - - /** - * Initialize entity map - * - * @return the map - */ - private Map<String, Long> initializeEntityMap() { - Map<String, Long> entityMap = new HashMap<String, Long>(); - for (String entity : entityTypesToSummarize) { - entityMap.put(entity, (long) 0); - } - - return entityMap; - } - - /** - * Extracts the "type" query parameter from the request URI - * - * @param exchange - * @return String containing the value of the "type" query parameter of the request. Returns null - * if no "type" parameter found - */ - public String getTypeParameter(Exchange exchange) { - String typeParameter = null; - - String requestUriParameterString = exchange.getIn().getHeader("CamelHttpQuery", String.class); - - if (null != requestUriParameterString) { - String[] requestParameterParts = requestUriParameterString.split("&"); - - String[] parameter = requestParameterParts[0].split("="); - String currentParameterKey = parameter[0]; - - if (null != currentParameterKey && !currentParameterKey.isEmpty()) { - // Check if we're looking at the "type" parameter key - if (currentParameterKey.equals(TYPE)) { - boolean uriIncludesTypeParameterValue = - (parameter.length >= 2) && !parameter[1].isEmpty(); - - if (uriIncludesTypeParameterValue) { - String typeParameterValue = parameter[1]; - - // Is the parameter value one that we return data for? - if (typeParameterValue.equalsIgnoreCase(TABLE) - || typeParameterValue.equalsIgnoreCase(GRAPH)) { - typeParameter = typeParameterValue; - } - } - } - } - } - - return typeParameter; - } - - - public void setRestletUtils(RestletUtils restletUtils) { - this.restletUtils = restletUtils; - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java index cb3e5e4..a22170a 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/SearchServiceAdapter.java @@ -43,6 +43,13 @@ public class SearchServiceAdapter { private static final String VALUE_QUERY = "query"; private static final String SUGGEST_QUERY = "suggest"; + private static final String BULK_API = "bulk"; + private static final String DOCUMENT_EDNPOINT = "documents"; + private static final String SEARH_SERVICE_BULK_TEMPLATE = + "{\"create\":{\"metaData\":{\"url\":\"%s\"},\"document\":\"%s\"}}\n"; + + private static final String SEARH_SERVICE_SINGLE_ENTITY_TEMPLATE = + "{\"queries\":[{\"must\":{\"match\":{\"field\":\"_id\",\"value\":\"%s\"}}}]}\n"; private RestClient client; private RestEndpointConfig endpointConfig; @@ -113,6 +120,13 @@ public class SearchServiceAdapter { OperationResult or = client.delete(url, getTxnHeader(), MediaType.APPLICATION_JSON_TYPE); return new OperationResult(or.getResultCode(), or.getResult()); } + + public OperationResult doBulkOperation(String url, String jsonPayload) { + + OperationResult or = client.post(url, jsonPayload, getTxnHeader(), + MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + return new OperationResult(or.getResultCode(), or.getResult()); + } public Map<String, List<String>> getTxnHeader() { HashMap<String, List<String>> headers = new HashMap<String, List<String>>(); @@ -121,6 +135,15 @@ public class SearchServiceAdapter { headers.put("X-FromAppId", Arrays.asList(MDC.get(MdcContext.MDC_PARTNER_NAME))); return headers; } + +public String buildBulkImportOperationRequest(String indexName, String id, String payload){ + + StringBuilder requestPayload = new StringBuilder(128); + String SearchTarget = buildSearchServiceDocUrl(indexName,id); + + requestPayload.append(String.format(SEARH_SERVICE_BULK_TEMPLATE,SearchTarget,payload)); + return requestPayload.toString(); + } /** * Get Full URL for search @@ -143,12 +166,50 @@ public class SearchServiceAdapter { public String buildSuggestServiceQueryUrl(String indexName) { return buildSearchServiceUrlForApi(indexName, SUGGEST_QUERY); } + + public String buildSearchServiceDocUrl(String indexName,String api) { + + return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/%s/%s/%s", + endpointConfig.getEndpointIpAddress(), endpointConfig.getEndpointServerPort(), + serviceApiVersion, indexName,DOCUMENT_EDNPOINT, api); + } + + + public String buildSearchServiceCreateDocApi(String indexName){ + + return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/%s/%s", + endpointConfig.getEndpointIpAddress(), endpointConfig.getEndpointServerPort(), + serviceApiVersion, indexName,DOCUMENT_EDNPOINT ); + } public String buildSearchServiceUrlForApi(String indexName, String api) { + return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/%s/%s", endpointConfig.getEndpointIpAddress(), endpointConfig.getEndpointServerPort(), serviceApiVersion, indexName, api); } + + public String buildSearchServiceBulkUrl() { + + return String.format("https://%s:%s/services/search-data-service/%s/search/%s", endpointConfig.getEndpointIpAddress(), + endpointConfig.getEndpointServerPort(),serviceApiVersion,BULK_API); + } + + public OperationResult retrieveEntityById(String entityId,String indexName) { + + StringBuilder requestPayload = new StringBuilder(128); + requestPayload.append(String.format(SEARH_SERVICE_SINGLE_ENTITY_TEMPLATE,entityId)); + String payload = requestPayload.toString(); + String searchServiceUrl = buildSearchServiceQueryUrl(indexName); + + return this.doPost(searchServiceUrl,payload); + } + +public String buildSearchServiceCreateIndexUrl(String indexName) { + + return String.format("https://%s:%s/services/search-data-service/%s/search/indexes/dynamic/%s", endpointConfig.getEndpointIpAddress(), + endpointConfig.getEndpointServerPort(),serviceApiVersion,indexName); + } } diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java index 707f907..1eb1823 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/search/filters/FilterElasticSearchAdapter.java @@ -31,11 +31,10 @@ import org.json.JSONObject; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.logging.AaiUiMsgs; import org.onap.aai.sparky.search.filters.config.UiFilterDataSourceConfig; import org.onap.aai.sparky.search.filters.entity.UiFilterEntity; -import org.onap.aai.sparky.viewandinspect.config.SparkyConstants; /** @@ -51,10 +50,10 @@ public class FilterElasticSearchAdapter { private static final String CONTAINER = "default"; private static final String BUCKETS = "buckets"; private static final String FILTER_VALUE_KEY = "key"; - private ElasticSearchAdapter elasticSearchAdapter; + private SearchServiceAdapter searchServiceAdapter; - public FilterElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) { - this.elasticSearchAdapter = elasticSearchAdapter; + public FilterElasticSearchAdapter(SearchServiceAdapter searchServiceAdapter) { + this.searchServiceAdapter = searchServiceAdapter; } /** @@ -76,10 +75,9 @@ public class FilterElasticSearchAdapter { filterValueQuery = FilterQueryBuilder.createFilterValueQueryObject(dataSourceConfig.getFieldName()); } - OperationResult opResult = elasticSearchAdapter.doPost( - elasticSearchAdapter.buildElasticSearchUrlForApi(dataSourceConfig.getIndexName(), - SparkyConstants.ES_SEARCH_API), - filterValueQuery.toString(), MediaType.APPLICATION_JSON_TYPE); + OperationResult opResult = searchServiceAdapter.doPost( + searchServiceAdapter.buildSearchServiceQueryUrl(dataSourceConfig.getIndexName()), + filterValueQuery.toString(), "application/json"); String result = opResult.getResult(); if(opResult.wasSuccessful() && result != null) { diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java index 444eafb..a85e9f2 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/AbstractEntitySynchronizer.java @@ -30,7 +30,7 @@ import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.restclient.client.OperationResult; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.dal.NetworkTransaction; import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics; import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics; @@ -39,7 +39,7 @@ import org.onap.aai.sparky.dal.rest.HttpMethod; import org.onap.aai.sparky.dal.rest.RestOperationalStatistics; import org.onap.aai.sparky.logging.AaiUiMsgs; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; -import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; +import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval; import org.onap.aai.sparky.util.NodeUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -72,7 +72,7 @@ public abstract class AbstractEntitySynchronizer { protected EnumSet<StatFlag> enabledStatFlags; - protected ElasticSearchAdapter elasticSearchAdapter; + protected SearchServiceAdapter searchServiceAdapter; protected ActiveInventoryAdapter aaiAdapter; protected ExecutorService synchronizerExecutor; @@ -362,12 +362,12 @@ public abstract class AbstractEntitySynchronizer { */ public void clearCache() {} - public ElasticSearchAdapter getElasticSearchAdapter() { - return elasticSearchAdapter; + public SearchServiceAdapter getSearchServiceAdapter() { + return searchServiceAdapter; } - public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) { - this.elasticSearchAdapter = elasticSearchAdapter; + public void setSearchServiceAdapter(SearchServiceAdapter searchServiceAdapter) { + this.searchServiceAdapter = searchServiceAdapter; } public ActiveInventoryAdapter getAaiAdapter() { @@ -531,7 +531,7 @@ public abstract class AbstractEntitySynchronizer { /* * In this retry flow the se object has already derived its fields */ - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), id); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage()); } @@ -548,7 +548,7 @@ public abstract class AbstractEntitySynchronizer { * called incrementAndGet when queuing the failed PUT! */ - supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter), + supplyAsync(new PerformSearchServiceRetrieval(retryTransaction, searchServiceAdapter), esExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java deleted file mode 100644 index a397d91..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java +++ /dev/null @@ -1,600 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.sync; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import javax.ws.rs.core.MediaType; - -import org.onap.aai.cl.api.Logger; -import org.onap.aai.cl.eelf.LoggerFactory; -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; -import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; -import org.onap.aai.sparky.sync.entity.ObjectIdCollection; -import org.onap.aai.sparky.sync.entity.SearchableEntity; -import org.onap.aai.sparky.sync.enumeration.OperationState; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * The Class ElasticSearchIndexCleaner. - */ -public class ElasticSearchIndexCleaner implements IndexCleaner { - - private static final Logger LOG = - LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class); - - private static final String BULK_OP_LINE_TEMPLATE = "%s\n"; - private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; - - private ObjectIdCollection before; - private ObjectIdCollection after; - - private ObjectMapper mapper; - private ElasticSearchAdapter esAdapter; - private ElasticSearchEndpointConfig endpointConfig; - private ElasticSearchSchemaConfig schemaConfig; - - /** - * Instantiates a new elastic search index cleaner. - * - * @param restDataProvider the rest data provider - * @param indexName the index name - * @param indexType the index type - * @param host the host - * @param port the port - * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes - * @param numItemsToGetBulkRequest the num items to get bulk request - */ - public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) { - this.esAdapter = esAdapter; - this.before = null; - this.after = null; - this.endpointConfig = endpointConfig; - this.schemaConfig = schemaConfig; - this.mapper = new ObjectMapper(); - } - - /* - * (non-Javadoc) - * - * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection() - */ - @Override - public OperationState populatePreOperationCollection() { - - try { - before = retrieveAllDocumentIdentifiers(); - return OperationState.OK; - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage()); - return OperationState.ERROR; - } - - } - - /* - * (non-Javadoc) - * - * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection() - */ - @Override - public OperationState populatePostOperationCollection() { - try { - after = retrieveAllDocumentIdentifiers(); - return OperationState.OK; - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage()); - return OperationState.ERROR; - } - } - - /* - * (non-Javadoc) - * - * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup() - */ - @Override - public OperationState performCleanup() { - // TODO Auto-generated method stub - LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName()); - - int sizeBefore = before.getSize(); - int sizeAfter = after.getSize(); - - LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore), - String.valueOf(sizeAfter)); - - /* - * If the processedImportIds size <= 0, then something has failed in the sync operation and we - * shouldn't do the selective delete right now. - */ - - if (sizeAfter > 0) { - - Collection<String> presyncIds = before.getImportedObjectIds(); - presyncIds.removeAll(after.getImportedObjectIds()); - - try { - LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), - String.valueOf(presyncIds.size())); - - ObjectIdCollection bulkIds = new ObjectIdCollection(); - - Iterator<String> it = presyncIds.iterator(); - int numItemsInBulkRequest = 0; - int numItemsRemainingToBeDeleted = presyncIds.size(); - - while (it.hasNext()) { - - bulkIds.addObjectId(it.next()); - numItemsInBulkRequest++; - - if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) { - LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize())); - bulkDelete(bulkIds.getImportedObjectIds()); - numItemsRemainingToBeDeleted -= numItemsInBulkRequest; - numItemsInBulkRequest = 0; - bulkIds.clear(); - } - } - - if (numItemsRemainingToBeDeleted > 0) { - LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize())); - bulkDelete(bulkIds.getImportedObjectIds()); - } - - - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), exc.getLocalizedMessage()); - - } - } - - return OperationState.OK; - } - - @Override - public String getIndexName() { - return schemaConfig.getIndexName(); - } - - /** - * Builds the initial scroll request payload. - * - * @param numItemsToGetPerRequest the num items to get per request - * @param fieldList the field list - * @return the string - * @throws JsonProcessingException the json processing exception - */ - protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest, - List<String> fieldList) throws JsonProcessingException { - - ObjectNode rootNode = mapper.createObjectNode(); - rootNode.put("size", numItemsToGetPerRequest); - - ArrayNode fields = mapper.createArrayNode(); - - for (String f : fieldList) { - fields.add(f); - } - - rootNode.set("fields", fields); - - ObjectNode queryNode = mapper.createObjectNode(); - queryNode.set("match_all", mapper.createObjectNode()); - - rootNode.set("query", queryNode); - - return mapper.writeValueAsString(rootNode); - - } - - /** - * Builds the subsequent scroll context request payload. - * - * @param scrollId the scroll id - * @param contextTimeToLiveInMinutes the context time to live in minutes - * @return the string - * @throws JsonProcessingException the json processing exception - */ - protected String buildSubsequentScrollContextRequestPayload(String scrollId, - int contextTimeToLiveInMinutes) throws JsonProcessingException { - - ObjectNode rootNode = mapper.createObjectNode(); - - rootNode.put("scroll", contextTimeToLiveInMinutes + "m"); - rootNode.put("scroll_id", scrollId); - - return mapper.writeValueAsString(rootNode); - - } - - /** - * Parses the elastic search result. - * - * @param jsonResult the json result - * @return the json node - * @throws JsonProcessingException the json processing exception - * @throws IOException Signals that an I/O exception has occurred. - */ - protected JsonNode parseElasticSearchResult(String jsonResult) - throws JsonProcessingException, IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readTree(jsonResult); - } - - /** - * Lookup index doc. - * - * @param ids the ids - * @param docs the docs - * @return the array list - */ - protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids, - List<SearchableEntity> docs) { - ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>(); - - if (ids != null && docs != null) { - for (SearchableEntity d : docs) { - if (ids.contains(d.getId())) { - objs.add(d); - } - } - } - - return objs; - } - - /** - * Builds the delete data object. - * - * @param index the index - * @param type the type - * @param id the id - * @return the object node - */ - protected ObjectNode buildDeleteDataObject(String index, String type, String id) { - - ObjectNode indexDocProperties = mapper.createObjectNode(); - - indexDocProperties.put("_index", index); - indexDocProperties.put("_type", type); - indexDocProperties.put("_id", id); - - ObjectNode rootNode = mapper.createObjectNode(); - rootNode.set("delete", indexDocProperties); - - return rootNode; - } - - /** - * This method might appear to be a little strange, and is simply an optimization to take an - * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists. - * - * @param startNode the start node - * @param fieldPath the field path - * @return the node path - */ - protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) { - - JsonNode jsonNode = null; - - for (String field : fieldPath) { - if (jsonNode == null) { - jsonNode = startNode.get(field); - } else { - jsonNode = jsonNode.get(field); - } - - /* - * This is our safety net in case any intermediate path returns a null - */ - - if (jsonNode == null) { - return null; - } - - } - - return jsonNode; - } - - /** - * Gets the full url. - * - * @param resourceUrl the resource url - * @return the full url - */ - private String getFullUrl(String resourceUrl) { - return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(), - endpointConfig.getEsServerPort(), resourceUrl); - } - - /** - * Retrieve all document identifiers. - * - * @return the object id collection - * @throws IOException Signals that an I/O exception has occurred. - */ - public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException { - - ObjectIdCollection currentDocumentIds = new ObjectIdCollection(); - - long opStartTimeInMs = System.currentTimeMillis(); - - List<String> fields = new ArrayList<String>(); - fields.add("_id"); - // fields.add("entityType"); - - String scrollRequestPayload = - buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields); - - final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName()+ "/" + schemaConfig.getIndexDocType() + "/_search?scroll=" - + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m"); - - OperationResult result = - esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE); - - if (result.wasSuccessful()) { - - JsonNode rootNode = parseElasticSearchResult(result.getResult()); - - /* - * Check the result for success / failure, and enumerate all the index ids that resulted in - * success, and ignore the ones that failed or log them so we have a record of the failure. - */ - int totalRecordsAvailable = 0; - String scrollId = null; - int numRecordsFetched = 0; - - if (rootNode != null) { - - scrollId = getFieldValue(rootNode, "_scroll_id"); - final String tookStr = getFieldValue(rootNode, "took"); - int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr); - boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out")); - - if (timedOut) { - LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers", - String.valueOf(tookInMs)); - } else { - LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers", - String.valueOf(tookInMs)); - } - - JsonNode hitsNode = rootNode.get("hits"); - totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText()); - - LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers", - String.valueOf(totalRecordsAvailable)); - - /* - * Collect all object ids - */ - - ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits"); - - Iterator<JsonNode> nodeIterator = hitsArray.iterator(); - - String key = null; - String value = null; - JsonNode jsonNode = null; - - while (nodeIterator.hasNext()) { - - jsonNode = nodeIterator.next(); - - key = getFieldValue(jsonNode, "_id"); - - if (key != null) { - currentDocumentIds.addObjectId(key); - } - - } - - int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched); - - int numRequiredAdditionalFetches = - (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize()); - - /* - * Do an additional fetch for the remaining items (if needed) - */ - - if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) { - numRequiredAdditionalFetches += 1; - } - - if (LOG.isDebugEnabled()) { - LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES, - String.valueOf(numRequiredAdditionalFetches)); - } - - - for (int x = 0; x < numRequiredAdditionalFetches; x++) { - - if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) { - // abort the whole thing because now we can't reliably cleanup the orphans. - throw new IOException( - "Failed to collect pre-sync doc collection from index. Aborting operation"); - } - if (LOG.isDebugEnabled()) { - LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES, - String.valueOf(currentDocumentIds.getSize()), - String.valueOf(totalRecordsAvailable)); - } - - } - - } - - } else { - // scroll context get failed, nothing else to do - LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString()); - } - - LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers", - String.valueOf((System.currentTimeMillis() - opStartTimeInMs))); - - return currentDocumentIds; - - } - - /** - * Collect items from scroll context. - * - * @param scrollId the scroll id - * @param objectIds the object ids - * @return the operation state - * @throws IOException Signals that an I/O exception has occurred. - */ - private OperationState collectItemsFromScrollContext(String scrollId, - ObjectIdCollection objectIds) throws IOException { - - String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId, - endpointConfig.getScrollContextTimeToLiveInMinutes()); - - final String fullUrlStr = getFullUrl("/_search/scroll"); - - OperationResult opResult = - esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE); - - if (opResult.getResultCode() >= 300) { - LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult()); - return OperationState.ERROR; - } - - JsonNode rootNode = parseElasticSearchResult(opResult.getResult()); - boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out")); - final String tookStr = getFieldValue(rootNode, "took"); - int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr); - - JsonNode hitsNode = rootNode.get("hits"); - - /* - * Check the result for success / failure, and enumerate all the index ids that resulted in - * success, and ignore the ones that failed or log them so we have a record of the failure. - */ - - if (rootNode != null) { - - if (timedOut) { - LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs)); - } else { - LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs)); - } - - /* - * Collect all object ids - */ - - ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits"); - String key = null; - String value = null; - JsonNode jsonNode = null; - - Iterator<JsonNode> nodeIterator = hitsArray.iterator(); - - while (nodeIterator.hasNext()) { - - jsonNode = nodeIterator.next(); - - key = getFieldValue(jsonNode, "_id"); - - if (key != null) { - objectIds.addObjectId(key); - - } - - } - } - - return OperationState.OK; - } - - /** - * Gets the field value. - * - * @param node the node - * @param fieldName the field name - * @return the field value - */ - protected String getFieldValue(JsonNode node, String fieldName) { - - JsonNode field = node.get(fieldName); - - if (field != null) { - return field.asText(); - } - - return null; - - } - - /** - * Bulk delete. - * - * @param docIds the doc ids - * @return the operation result - * @throws IOException Signals that an I/O exception has occurred. - */ - public OperationResult bulkDelete(Collection<String> docIds) throws IOException { - - if (docIds == null || docIds.size() == 0) { - LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP); - return new OperationResult(500, - "Skipping bulkDelete(); operation because docs to delete list is empty"); - } - - LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size())); - - StringBuilder sb = new StringBuilder(128); - - for (String id : docIds) { - sb.append(String.format(BULK_OP_LINE_TEMPLATE, - buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id))); - } - - sb.append("\n"); - - final String fullUrlStr = getFullUrl("/_bulk"); - - return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE); - - } - - /* - - */ - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java index c83aa72..c9d8272 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/IndexIntegrityValidator.java @@ -25,9 +25,9 @@ import javax.ws.rs.core.MediaType; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; +import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; /** @@ -38,11 +38,11 @@ public class IndexIntegrityValidator implements IndexValidator { private static final Logger LOG = LoggerFactory.getInstance().getLogger(IndexIntegrityValidator.class); - private ElasticSearchEndpointConfig endpointConfig; + private RestEndpointConfig endpointConfig; private ElasticSearchSchemaConfig schemaConfig; private String tableConfigJson; - private final ElasticSearchAdapter esAdapter; + private final SearchServiceAdapter searchServiceAdapter; /** * Instantiates a new index integrity validator. @@ -54,21 +54,21 @@ public class IndexIntegrityValidator implements IndexValidator { * @param port the port * @param tableConfigJson the table config json */ - public IndexIntegrityValidator(ElasticSearchAdapter esAdapter, - ElasticSearchSchemaConfig esSchemaConfig, ElasticSearchEndpointConfig esEndpointConfig, + public IndexIntegrityValidator(SearchServiceAdapter searchServiceAdapter, + ElasticSearchSchemaConfig esSchemaConfig, RestEndpointConfig esEndpointConfig, String tableConfigJson) { - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.schemaConfig = esSchemaConfig; this.endpointConfig = esEndpointConfig; this.tableConfigJson = tableConfigJson; } - public ElasticSearchEndpointConfig getEndpointConfig() { + public RestEndpointConfig getEndpointConfig() { return endpointConfig; } - public void setEndpointConfig(ElasticSearchEndpointConfig endpointConfig) { + public void setEndpointConfig(RestEndpointConfig endpointConfig) { this.endpointConfig = endpointConfig; } @@ -80,8 +80,8 @@ public class IndexIntegrityValidator implements IndexValidator { this.schemaConfig = schemaConfig; } - public ElasticSearchAdapter getEsAdapter() { - return esAdapter; + public SearchServiceAdapter getSearchServiceAdapter() { + return searchServiceAdapter; } @Override @@ -95,10 +95,14 @@ public class IndexIntegrityValidator implements IndexValidator { * * @see org.openecomp.sparky.synchronizer.IndexValidator#exists() */ + /* TODO + * currently Search does not support head operations on an index neither does it support get operations + * on an index. get is being used so that it does not break any code. + * */ @Override public boolean exists() { - final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName() + "/"); - OperationResult existsResult = esAdapter.doHead(fullUrlStr, MediaType.APPLICATION_JSON_TYPE); + final String fullUrlStr = getFullUrl(schemaConfig.getIndexName() + "/"); + OperationResult existsResult = searchServiceAdapter.doGet(fullUrlStr, "application/json"); int rc = existsResult.getResultCode(); @@ -135,7 +139,7 @@ public class IndexIntegrityValidator implements IndexValidator { final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName() + "/"); OperationResult createResult = - esAdapter.doPut(fullUrlStr, tableConfigJson, MediaType.APPLICATION_JSON_TYPE); + searchServiceAdapter.doPut(fullUrlStr, tableConfigJson,"application/json"); int rc = createResult.getResultCode(); @@ -167,8 +171,8 @@ public class IndexIntegrityValidator implements IndexValidator { * @return the full url */ private String getFullUrl(String resourceUrl) { - return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(), - endpointConfig.getEsServerPort(), resourceUrl); + String createIndexUrl = searchServiceAdapter.buildSearchServiceCreateIndexUrl(resourceUrl); + return createIndexUrl; } } diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java deleted file mode 100644 index 4ba3405..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/config/ElasticSearchEndpointConfig.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.sync.config; - -public class ElasticSearchEndpointConfig { - - private String esIpAddress; - private String esServerPort; - private int scrollContextTimeToLiveInMinutes; - private int scrollContextBatchRequestSize; - - public ElasticSearchEndpointConfig() { - - } - - public String getEsIpAddress() { - return esIpAddress; - } - - public void setEsIpAddress(String esIpAddress) { - this.esIpAddress = esIpAddress; - } - - public String getEsServerPort() { - return esServerPort; - } - - public void setEsServerPort(String esServerPort) { - this.esServerPort = esServerPort; - } - - public int getScrollContextTimeToLiveInMinutes() { - return scrollContextTimeToLiveInMinutes; - } - - public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) { - this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes; - } - - public int getScrollContextBatchRequestSize() { - return scrollContextBatchRequestSize; - } - - public void setScrollContextBatchRequestSize(int scrollContextBatchRequestSize) { - this.scrollContextBatchRequestSize = scrollContextBatchRequestSize; - } - - - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchPut.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServicePut.java index d516ba8..c62445c 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchPut.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServicePut.java @@ -23,19 +23,17 @@ package org.onap.aai.sparky.sync.task; import java.util.Map; import java.util.function.Supplier; -import javax.ws.rs.core.MediaType; - import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.dal.NetworkTransaction; import org.slf4j.MDC; /** * The Class PerformElasticSearchPut. */ -public class PerformElasticSearchPut implements Supplier<NetworkTransaction> { +public class PerformSearchServicePut implements Supplier<NetworkTransaction> { - private ElasticSearchAdapter esAdapter; + private SearchServiceAdapter searchServiceAdapter; private String jsonPayload; private NetworkTransaction txn; private Map<String, String> contextMap; @@ -47,19 +45,19 @@ public class PerformElasticSearchPut implements Supplier<NetworkTransaction> { * @param txn the txn * @param restDataProvider the rest data provider */ - public PerformElasticSearchPut(String jsonPayload, NetworkTransaction txn, - ElasticSearchAdapter esAdapter) { + public PerformSearchServicePut(String jsonPayload, NetworkTransaction txn, + SearchServiceAdapter searchServiceAdapter) { this.jsonPayload = jsonPayload; this.txn = txn; - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.contextMap = MDC.getCopyOfContextMap(); } - public PerformElasticSearchPut(String jsonPayload, NetworkTransaction txn, - ElasticSearchAdapter esAdapter, Map<String, String> contextMap) { + public PerformSearchServicePut(String jsonPayload, NetworkTransaction txn, + SearchServiceAdapter searchServiceAdapter, Map<String, String> contextMap) { this.jsonPayload = jsonPayload; this.txn = txn; - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.contextMap = contextMap; } @@ -76,7 +74,7 @@ public class PerformElasticSearchPut implements Supplier<NetworkTransaction> { long startTimeInMs = System.currentTimeMillis(); OperationResult or = - esAdapter.doPut(txn.getLink(), jsonPayload, MediaType.APPLICATION_JSON_TYPE); + searchServiceAdapter.doPut(txn.getLink(), jsonPayload, "application/json"); txn.setOperationResult(or); txn.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchRetrieval.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceRetrieval.java index 5191c65..ab7da7a 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchRetrieval.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceRetrieval.java @@ -23,20 +23,18 @@ package org.onap.aai.sparky.sync.task; import java.util.Map; import java.util.function.Supplier; -import javax.ws.rs.core.MediaType; - import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.dal.NetworkTransaction; import org.slf4j.MDC; /** * The Class PerformElasticSearchRetrieval. */ -public class PerformElasticSearchRetrieval implements Supplier<NetworkTransaction> { +public class PerformSearchServiceRetrieval implements Supplier<NetworkTransaction> { private NetworkTransaction txn; - private ElasticSearchAdapter esAdapter; + private SearchServiceAdapter searchServiceAdapter; private Map<String, String> contextMap; /** @@ -45,10 +43,10 @@ public class PerformElasticSearchRetrieval implements Supplier<NetworkTransactio * @param elasticSearchTxn the elastic search txn * @param restDataProvider the rest data provider */ - public PerformElasticSearchRetrieval(NetworkTransaction elasticSearchTxn, - ElasticSearchAdapter esAdapter) { + public PerformSearchServiceRetrieval(NetworkTransaction elasticSearchTxn, + SearchServiceAdapter searchServiceAdapter) { this.txn = elasticSearchTxn; - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.contextMap = MDC.getCopyOfContextMap(); } @@ -59,7 +57,7 @@ public class PerformElasticSearchRetrieval implements Supplier<NetworkTransactio public NetworkTransaction get() { MDC.setContextMap(contextMap); long startTimeInMs = System.currentTimeMillis(); - OperationResult or = esAdapter.doGet(txn.getLink(), MediaType.APPLICATION_JSON_TYPE); + OperationResult or = searchServiceAdapter.doGet(txn.getLink(), "application/json"); txn.setOperationResult(or); txn.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs); return txn; diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchUpdate.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceUpdate.java index 9e8a8fc..d52a338 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformElasticSearchUpdate.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/PerformSearchServiceUpdate.java @@ -24,16 +24,16 @@ import java.util.Map; import java.util.function.Supplier; import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.dal.NetworkTransaction; import org.slf4j.MDC; /** * The Class PerformElasticSearchUpdate. */ -public class PerformElasticSearchUpdate implements Supplier<NetworkTransaction> { +public class PerformSearchServiceUpdate implements Supplier<NetworkTransaction> { - private ElasticSearchAdapter esAdapter; + private SearchServiceAdapter searchServiceAdapter; private NetworkTransaction operationTracker; private String updatePayload; private String updateUrl; @@ -47,11 +47,11 @@ public class PerformElasticSearchUpdate implements Supplier<NetworkTransaction> * @param esDataProvider the es data provider * @param transactionTracker the transaction tracker */ - public PerformElasticSearchUpdate(String updateUrl, String updatePayload, - ElasticSearchAdapter esAdapter, NetworkTransaction transactionTracker) { + public PerformSearchServiceUpdate(String updateUrl, String updatePayload, + SearchServiceAdapter searchServiceAdapter, NetworkTransaction transactionTracker) { this.updateUrl = updateUrl; this.updatePayload = updatePayload; - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.contextMap = MDC.getCopyOfContextMap(); this.operationTracker = new NetworkTransaction(); operationTracker.setEntityType(transactionTracker.getEntityType()); @@ -69,7 +69,7 @@ public class PerformElasticSearchUpdate implements Supplier<NetworkTransaction> operationTracker.setTaskAgeInMs(); MDC.setContextMap(contextMap); long startTimeInMs = System.currentTimeMillis(); - OperationResult or = esAdapter.doBulkOperation(updateUrl, updatePayload); + OperationResult or = searchServiceAdapter.doBulkOperation(updateUrl, updatePayload); operationTracker.setOperationResult(or); operationTracker.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs); return operationTracker; diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java deleted file mode 100644 index 301e65d..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/sync/task/StoreDocumentTask.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.sync.task; - -import java.util.Map; -import java.util.function.Supplier; - -import javax.ws.rs.core.MediaType; - -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.dal.NetworkTransaction; -import org.onap.aai.sparky.sync.entity.IndexDocument; -import org.slf4j.MDC; - -/** - * The Class StoreDocumentTask. - */ -public class StoreDocumentTask implements Supplier<NetworkTransaction> { - - private IndexDocument doc; - - private NetworkTransaction txn; - - private ElasticSearchAdapter esAdapter; - private Map<String, String> contextMap; - - /** - * Instantiates a new store document task. - * - * @param doc the doc - * @param txn the txn - * @param esAdapter the es adapter - */ - public StoreDocumentTask(IndexDocument doc, NetworkTransaction txn, - ElasticSearchAdapter esAdapter) { - this.doc = doc; - this.txn = txn; - this.esAdapter = esAdapter; - this.contextMap = MDC.getCopyOfContextMap(); - } - - /* (non-Javadoc) - * @see java.util.function.Supplier#get() - */ - @Override - public NetworkTransaction get() { - txn.setTaskAgeInMs(); - - long startTimeInMs = System.currentTimeMillis(); - MDC.setContextMap(contextMap); - OperationResult operationResult = new OperationResult(); - - try { - - operationResult = - esAdapter.doPut(txn.getLink(), doc.getAsJson(), MediaType.APPLICATION_JSON_TYPE); - txn.setOpTimeInMs(System.currentTimeMillis() - startTimeInMs); - } catch (Exception exception) { - operationResult.setResult(500, exception.getMessage()); - } - - txn.setOperationResult(operationResult); - - return txn; - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java deleted file mode 100644 index 66c249c..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSyncController.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.topology.sync; - -import org.onap.aai.sparky.config.oxm.GeoEntityLookup; -import org.onap.aai.sparky.config.oxm.OxmEntityLookup; -import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner; -import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory; -import org.onap.aai.sparky.sync.IndexCleaner; -import org.onap.aai.sparky.sync.IndexIntegrityValidator; -import org.onap.aai.sparky.sync.SyncControllerImpl; -import org.onap.aai.sparky.sync.SyncControllerRegistrar; -import org.onap.aai.sparky.sync.SyncControllerRegistry; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; -import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; -import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; -import org.onap.aai.sparky.sync.config.SyncControllerConfig; - -public class GeoSyncController extends SyncControllerImpl implements SyncControllerRegistrar { - - private SyncControllerRegistry syncControllerRegistry; - - public GeoSyncController(SyncControllerConfig syncControllerConfig, - ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter, - ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig, - NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig, - GeoEntityLookup geoEntityLookup, OxmEntityLookup oxmEntityLookup, - ElasticSearchSchemaFactory elasticSearchSchemaFactory) throws Exception { - super(syncControllerConfig); - - // final String controllerName = "Inventory Geo Synchronizer"; - - IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(esAdapter, schemaConfig, - endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); - - registerIndexValidator(indexValidator); - - GeoSynchronizer synchronizer = - new GeoSynchronizer(schemaConfig, syncControllerConfig.getNumInternalSyncWorkers(), - syncControllerConfig.getNumSyncActiveInventoryWorkers(), - syncControllerConfig.getNumSyncElasticWorkers(), aaiStatConfig, esStatConfig, - geoEntityLookup, oxmEntityLookup); - - synchronizer.setAaiAdapter(aaiAdapter); - synchronizer.setElasticSearchAdapter(esAdapter); - - registerEntitySynchronizer(synchronizer); - - - IndexCleaner indexCleaner = - new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig); - - registerIndexCleaner(indexCleaner); - - } - - public SyncControllerRegistry getSyncControllerRegistry() { - return syncControllerRegistry; - } - - public void setSyncControllerRegistry(SyncControllerRegistry syncControllerRegistry) { - this.syncControllerRegistry = syncControllerRegistry; - } - - @Override - public void registerController() { - - if ( syncControllerRegistry != null ) { - if ( syncControllerConfig.isEnabled()) { - syncControllerRegistry.registerSyncController(this); - } - } - } - - - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java deleted file mode 100644 index 809c21a..0000000 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/topology/sync/GeoSynchronizer.java +++ /dev/null @@ -1,477 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.sparky.topology.sync; - -import static java.util.concurrent.CompletableFuture.supplyAsync; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.Supplier; - -import org.onap.aai.cl.api.Logger; -import org.onap.aai.cl.eelf.LoggerFactory; -import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.restclient.client.OperationResult; -import org.onap.aai.sparky.config.oxm.GeoEntityLookup; -import org.onap.aai.sparky.config.oxm.GeoOxmEntityDescriptor; -import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor; -import org.onap.aai.sparky.config.oxm.OxmEntityLookup; -import org.onap.aai.sparky.dal.NetworkTransaction; -import org.onap.aai.sparky.dal.rest.HttpMethod; -import org.onap.aai.sparky.inventory.entity.GeoIndexDocument; -import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.sync.AbstractEntitySynchronizer; -import org.onap.aai.sparky.sync.IndexSynchronizer; -import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; -import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; -import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor; -import org.onap.aai.sparky.sync.enumeration.OperationState; -import org.onap.aai.sparky.sync.enumeration.SynchronizerState; -import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval; -import org.onap.aai.sparky.sync.task.StoreDocumentTask; -import org.onap.aai.sparky.util.NodeUtils; -import org.slf4j.MDC; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; - - -/** - * The Class GeoSynchronizer. - */ -public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer { - - private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class); - - private boolean allWorkEnumerated; - private Deque<SelfLinkDescriptor> selflinks; - private GeoEntityLookup geoEntityLookup; - private OxmEntityLookup oxmEntityLookup; - - private Map<String, GeoOxmEntityDescriptor> geoDescriptorMap = null; - - /** - * Instantiates a new geo synchronizer. - * - * @throws Exception the exception - */ - public GeoSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers, - int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig, - NetworkStatisticsConfig esStatConfig, GeoEntityLookup geoEntityLookup, - OxmEntityLookup oxmEntityLookup) throws Exception { - - super(LOG, "GEO", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); - this.geoEntityLookup = geoEntityLookup; - this.oxmEntityLookup = oxmEntityLookup; - this.allWorkEnumerated = false; - this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>(); - this.synchronizerName = "Geo Synchronizer"; - this.geoDescriptorMap = geoEntityLookup.getGeoEntityDescriptors(); - this.aaiEntityStats.intializeEntityCounters(geoDescriptorMap.keySet()); - this.esEntityStats.intializeEntityCounters(geoDescriptorMap.keySet()); - this.syncDurationInMs = -1; - } - - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync() - */ - @Override - public OperationState doSync() { - this.syncDurationInMs = -1; - resetCounters(); - setShouldSkipSync(false); - allWorkEnumerated = false; - syncStartedTimeStampInMs = System.currentTimeMillis(); - String txnID = NodeUtils.getRandomTxnId(); - MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", ""); - - collectAllTheWork(); - return OperationState.OK; - } - - - /** - * Collect all the work. - * - * @return the operation state - */ - public OperationState collectAllTheWork() { - final Map<String,String> contextMap = MDC.getCopyOfContextMap(); - - if (geoDescriptorMap.isEmpty()) { - setShouldSkipSync(true); - LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities"); - return OperationState.ERROR; - } - - Collection<String> syncTypes = geoDescriptorMap.keySet(); - - try { - - /* - * launch a parallel async thread to process the documents for each entity-type (to max the of - * the configured executor anyway) - */ - - aaiWorkOnHand.set(syncTypes.size()); - - for (String key : syncTypes) { - - supplyAsync(new Supplier<Void>() { - - @Override - public Void get() { - MDC.setContextMap(contextMap); - OperationResult typeLinksResult = null; - try { - typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key); - aaiWorkOnHand.decrementAndGet(); - processEntityTypeSelfLinks(typeLinksResult); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc); - } - - return null; - } - - }, aaiExecutor).whenComplete((result, error) -> { - - if (error != null) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage()); - } - }); - - } - - while (aaiWorkOnHand.get() != 0) { - - if (LOG.isDebugEnabled()) { - LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED); - } - - Thread.sleep(1000); - } - - aaiWorkOnHand.set(selflinks.size()); - allWorkEnumerated = true; - syncEntityTypes(); - - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc); - } - return OperationState.OK; - } - - /** - * Sync entity types. - */ - private void syncEntityTypes() { - - while (selflinks.peek() != null) { - - SelfLinkDescriptor linkDescriptor = selflinks.poll(); - aaiWorkOnHand.decrementAndGet(); - - OxmEntityDescriptor descriptor = null; - - if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) { - - descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType()); - - if (descriptor == null) { - LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType()); - // go to next element in iterator - continue; - } - - NetworkTransaction txn = new NetworkTransaction(); - txn.setDescriptor(descriptor); - txn.setLink(linkDescriptor.getSelfLink()); - txn.setOperationType(HttpMethod.GET); - txn.setEntityType(linkDescriptor.getEntityType()); - - aaiWorkOnHand.incrementAndGet(); - - supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor) - .whenComplete((result, error) -> { - - aaiWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage()); - } else { - if (result == null) { - LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink()); - } else { - processEntityTypeSelfLinkResult(result); - } - } - }); - } - } - } - - /** - * Process entity type self links. - * - * @param operationResult the operation result - */ - private void processEntityTypeSelfLinks(OperationResult operationResult) { - - final String jsonResult = operationResult.getResult(); - - if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) { - - try { - JsonNode rootNode = mapper.readTree(jsonResult); - JsonNode resultData = rootNode.get("result-data"); - - if (resultData.isArray()) { - ArrayNode resultDataArrayNode = (ArrayNode) resultData; - - Iterator<JsonNode> elementIterator = resultDataArrayNode.elements(); - - while (elementIterator.hasNext()) { - JsonNode element = elementIterator.next(); - - final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type"); - final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link"); - - if (resourceType != null && resourceLink != null) { - - if (geoDescriptorMap.containsKey(resourceType)) { - selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType)); - } else { - LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType); - // go to next element in iterator - continue; - } - - } - } - } - } catch (IOException exc) { - LOG.error(AaiUiMsgs.ERROR_GENERIC, exc); - } - } - - } - - /** - * Process entity type self link result. - * - * @param txn the txn - */ - private void processEntityTypeSelfLinkResult(NetworkTransaction txn) { - - updateActiveInventoryCounters(txn); - - if (!txn.getOperationResult().wasSuccessful()) { - return; - } - - GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(txn.getEntityType()); - - if ( descriptor == null ) { - return; - } - - try { - if (descriptor.hasGeoEntity()) { - - GeoIndexDocument geoDoc = new GeoIndexDocument(); - - final String jsonResult = txn.getOperationResult().getResult(); - - if (jsonResult != null && jsonResult.length() > 0) { - - populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink()); - - if (!geoDoc.isValidGeoDocument()) { - - LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString()); - - } else { - - String link = null; - try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), geoDoc.getId()); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc); - } - - if (link != null) { - - NetworkTransaction n2 = new NetworkTransaction(); - n2.setLink(link); - n2.setEntityType(txn.getEntityType()); - n2.setDescriptor(txn.getDescriptor()); - n2.setOperationType(HttpMethod.PUT); - - esWorkOnHand.incrementAndGet(); - - supplyAsync(new StoreDocumentTask(geoDoc, n2, elasticSearchAdapter), esExecutor) - .whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage()); - } else { - updateElasticSearchCounters(result); - processStoreDocumentResult(result); - } - }); - } - } - } - } - } catch (JsonProcessingException exc) { - LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc); - } catch (IOException exc) { - LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc); - } - - return; - } - - - /** - * Process store document result. - * - * @param txn the txn - */ - private void processStoreDocumentResult(NetworkTransaction txn) { - - OperationResult or = txn.getOperationResult(); - - if (!or.wasSuccessful()) { - LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString()); - /* - * if(or.getResultCode() != 404 || (or.getResultCode() == 404 && - * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error( - * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " + - * or.getResult()); } - */ - - } - - } - - - @Override - public SynchronizerState getState() { - - if (!isSyncDone()) { - return SynchronizerState.PERFORMING_SYNCHRONIZATION; - } - - return SynchronizerState.IDLE; - - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean) - */ - @Override - public String getStatReport(boolean showFinalReport) { - syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs; - return this.getStatReport(syncDurationInMs, showFinalReport); - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown() - */ - @Override - public void shutdown() { - this.shutdownExecutors(); - } - - /** - * Populate geo document. - * - * @param doc the doc - * @param result the result - * @param resultDescriptor the result descriptor - * @param entityLink the entity link - * @throws JsonProcessingException the json processing exception - * @throws IOException Signals that an I/O exception has occurred. - */ - protected void populateGeoDocument(GeoIndexDocument doc, String result, - OxmEntityDescriptor resultDescriptor, String entityLink) - throws JsonProcessingException, IOException { - - doc.setSelfLink(entityLink); - doc.setEntityType(resultDescriptor.getEntityName()); - - JsonNode entityNode = mapper.readTree(result); - - List<String> primaryKeyValues = new ArrayList<String>(); - String pkeyValue = null; - - for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) { - pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName); - if (pkeyValue != null) { - primaryKeyValues.add(pkeyValue); - } else { - LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName()); - } - } - - final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/"); - doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue); - - GeoOxmEntityDescriptor descriptor = geoDescriptorMap.get(resultDescriptor.getEntityName()); - - String geoLatKey = descriptor.getGeoLatName(); - String geoLongKey = descriptor.getGeoLongName(); - - doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey)); - doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey)); - doc.deriveFields(); - - } - - @Override - protected boolean isSyncDone() { - if (shouldSkipSync()) { - syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs; - return true; - } - - int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get(); - - if (totalWorkOnHand > 0 || !allWorkEnumerated) { - return false; - } - - return true; - } - -} diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java index 69acc42..17c9072 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewandinspect/services/BaseVisualizationService.java @@ -34,11 +34,11 @@ import org.onap.aai.restclient.client.OperationResult; import org.onap.aai.sparky.config.oxm.OxmEntityLookup; import org.onap.aai.sparky.config.oxm.OxmModelLoader; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; import org.onap.aai.sparky.dal.GizmoAdapter; +import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; import org.onap.aai.sparky.logging.AaiUiMsgs; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.subscription.config.SubscriptionConfig; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; import org.onap.aai.sparky.sync.entity.SearchableEntity; import org.onap.aai.sparky.util.NodeUtils; @@ -64,7 +64,7 @@ public class BaseVisualizationService implements VisualizationService { private final ActiveInventoryAdapter aaiAdapter; private final GizmoAdapter gizmoAdapter; - private final ElasticSearchAdapter esAdapter; + private final SearchServiceAdapter searchServiceAdapter; private final ExecutorService aaiExecutorService; private ConcurrentHashMap<Long, VisualizationContext> contextMap; @@ -72,18 +72,18 @@ public class BaseVisualizationService implements VisualizationService { private VisualizationConfigs visualizationConfigs; private SubscriptionConfig subConfig; - private ElasticSearchEndpointConfig endpointEConfig; + private RestEndpointConfig endpointConfig; private ElasticSearchSchemaConfig schemaEConfig; private OxmEntityLookup oxmEntityLookup; public BaseVisualizationService(OxmModelLoader loader, VisualizationConfigs visualizationConfigs, - ActiveInventoryAdapter aaiAdapter, GizmoAdapter gizmoAdapter, ElasticSearchAdapter esAdapter, - ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig, + ActiveInventoryAdapter aaiAdapter, GizmoAdapter gizmoAdapter, SearchServiceAdapter searchServiceAdapter, + RestEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig, int numActiveInventoryWorkers, OxmEntityLookup oxmEntityLookup, SubscriptionConfig subscriptionConfig) throws Exception { this.visualizationConfigs = visualizationConfigs; - this.endpointEConfig = endpointConfig; + this.endpointConfig = endpointConfig; this.schemaEConfig = schemaConfig; this.oxmEntityLookup = oxmEntityLookup; this.subConfig = subscriptionConfig; @@ -97,7 +97,7 @@ public class BaseVisualizationService implements VisualizationService { this.aaiAdapter = aaiAdapter; this.gizmoAdapter = gizmoAdapter; - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -159,10 +159,10 @@ public class BaseVisualizationService implements VisualizationService { if (operationResult.wasSuccessful()) { try { - JsonNode elasticValue = mapper.readValue(operationResult.getResult(), JsonNode.class); + JsonNode searchServiceResults = mapper.readValue(operationResult.getResult(), JsonNode.class); - if (elasticValue != null) { - JsonNode sourceField = elasticValue.get("_source"); + if (searchServiceResults != null) { + JsonNode sourceField = extractSearchServiceContent(searchServiceResults); if (sourceField != null) { sourceEntity = new SearchableEntity(); @@ -203,9 +203,9 @@ public class BaseVisualizationService implements VisualizationService { * Here is where we need to make a dip to elastic-search for the self-link by entity-id (link * hash). */ - dataCollectionResult = esAdapter.retrieveEntityById(endpointEConfig.getEsIpAddress(), - endpointEConfig.getEsServerPort(),schemaEConfig.getIndexName(), - schemaEConfig.getIndexDocType(), queryRequest.getHashId()); + dataCollectionResult = searchServiceAdapter.retrieveEntityById(queryRequest.getHashId(), + schemaEConfig.getIndexName()); + sourceEntity = extractSearchableEntityFromElasticEntity(dataCollectionResult); if (sourceEntity != null) { @@ -377,6 +377,16 @@ public class BaseVisualizationService implements VisualizationService { } return output; } + + private JsonNode extractSearchServiceContent(JsonNode returnedData){ + + JsonNode searchResults = returnedData.get("searchResult"); + JsonNode searchHits = searchResults.get("hits"); + JsonNode searchDoc = searchHits.get(0).get("document"); + JsonNode content = searchDoc.get("content"); + + return content; + } public void shutdown() { aaiExecutorService.shutdown(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java index 8365237..79eded1 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectEntitySynchronizer.java @@ -58,9 +58,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor; import org.onap.aai.sparky.sync.enumeration.OperationState; import org.onap.aai.sparky.sync.enumeration.SynchronizerState; import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchPut; -import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval; -import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate; +import org.onap.aai.sparky.sync.task.PerformSearchServicePut; +import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval; +import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate; import org.onap.aai.sparky.util.NodeUtils; import org.slf4j.MDC; @@ -378,7 +378,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer */ String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); return; @@ -441,8 +441,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer if (wasEntryDiscovered) { if (versionNumber != null && jsonPayload != null) { - String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(), - "default", se.getId(), versionNumber, jsonPayload); + String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(), + se.getId(), jsonPayload); NetworkTransaction transactionTracker = new NetworkTransaction(); transactionTracker.setEntityType(esGetTxn.getEntityType()); @@ -450,9 +450,9 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer transactionTracker.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(), - requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor) - .whenComplete((result, error) -> { + supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(), + requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor) + .whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -478,8 +478,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer updateElasticTxn.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter), - esPutExecutor).whenComplete((result, error) -> { + supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter), + esPutExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); @@ -578,7 +578,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer String link = null; try { - link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId()); + link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage()); } @@ -592,8 +592,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer esWorkOnHand.incrementAndGet(); - supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor) - .whenComplete((result, error) -> { + supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor) + .whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); diff --git a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java index d8b4af6..2cecf25 100644 --- a/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java +++ b/sparkybe-onap-service/src/main/java/org/onap/aai/sparky/viewinspect/sync/ViewInspectSyncController.java @@ -25,15 +25,13 @@ import org.onap.aai.sparky.config.oxm.OxmEntityLookup; import org.onap.aai.sparky.config.oxm.SearchableEntityLookup; import org.onap.aai.sparky.crossentityreference.sync.CrossEntityReferenceSynchronizer; import org.onap.aai.sparky.dal.ActiveInventoryAdapter; -import org.onap.aai.sparky.dal.ElasticSearchAdapter; -import org.onap.aai.sparky.sync.ElasticSearchIndexCleaner; +import org.onap.aai.sparky.dal.rest.config.RestEndpointConfig; +import org.onap.aai.sparky.search.SearchServiceAdapter; import org.onap.aai.sparky.sync.ElasticSearchSchemaFactory; -import org.onap.aai.sparky.sync.IndexCleaner; import org.onap.aai.sparky.sync.IndexIntegrityValidator; import org.onap.aai.sparky.sync.SyncControllerImpl; import org.onap.aai.sparky.sync.SyncControllerRegistrar; import org.onap.aai.sparky.sync.SyncControllerRegistry; -import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig; import org.onap.aai.sparky.sync.config.SyncControllerConfig; @@ -43,13 +41,13 @@ public class ViewInspectSyncController extends SyncControllerImpl private SyncControllerRegistry syncControllerRegistry; private ActiveInventoryAdapter aaiAdapter; - private ElasticSearchAdapter esAdapter; + private SearchServiceAdapter searchServiceAdapter; private ElasticSearchSchemaConfig schemaConfig; - private ElasticSearchEndpointConfig endpointConfig; + private RestEndpointConfig endpointConfig; public ViewInspectSyncController(SyncControllerConfig syncControllerConfig, - ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter, - ElasticSearchSchemaConfig schemaConfig, ElasticSearchEndpointConfig endpointConfig, + ActiveInventoryAdapter aaiAdapter, SearchServiceAdapter searchServiceAdapter, + ElasticSearchSchemaConfig schemaConfig, RestEndpointConfig endpointConfig, NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig, CrossEntityReferenceLookup crossEntityReferenceLookup, OxmEntityLookup oxmEntityLookup, SearchableEntityLookup searchableEntityLookup, @@ -60,10 +58,10 @@ public class ViewInspectSyncController extends SyncControllerImpl // final String controllerName = "View and Inspect Entity Synchronizer"; this.aaiAdapter = aaiAdapter; - this.esAdapter = esAdapter; + this.searchServiceAdapter = searchServiceAdapter; this.schemaConfig = schemaConfig; this.endpointConfig = endpointConfig; - IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(esAdapter, schemaConfig, + IndexIntegrityValidator indexValidator = new IndexIntegrityValidator(searchServiceAdapter, schemaConfig, endpointConfig, elasticSearchSchemaFactory.getIndexSchema(schemaConfig)); registerIndexValidator(indexValidator); @@ -76,7 +74,7 @@ public class ViewInspectSyncController extends SyncControllerImpl oxmEntityLookup, searchableEntityLookup); ses.setAaiAdapter(aaiAdapter); - ses.setElasticSearchAdapter(esAdapter); + ses.setSearchServiceAdapter(searchServiceAdapter); registerEntitySynchronizer(ses); @@ -87,14 +85,10 @@ public class ViewInspectSyncController extends SyncControllerImpl crossEntityReferenceLookup, oxmEntityLookup, searchableEntityLookup); cers.setAaiAdapter(aaiAdapter); - cers.setElasticSearchAdapter(esAdapter); + cers.setSearchServiceAdapter(searchServiceAdapter); registerEntitySynchronizer(cers); - IndexCleaner indexCleaner = - new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig); - - registerIndexCleaner(indexCleaner); } |