diff options
author | Arul.Nambi <arul.nambi@amdocs.com> | 2017-09-26 14:00:57 -0400 |
---|---|---|
committer | Arul.Nambi <arul.nambi@amdocs.com> | 2017-09-26 14:01:41 -0400 |
commit | c593dfe4c59d37d5d4ea14e3ac31da3318029562 (patch) | |
tree | 76cc5a494f02e14b809caad9c050fbfd6cd61a51 /src/main/java/org/openecomp/sparky/synchronizer/GeoSynchronizer.java | |
parent | 6777c6092050a0271c5d7de9c239cf1580d41fa8 (diff) |
Renaming openecomp to onap
Issue-ID: AAI-208
Change-Id: I2bd02287bed376111156aca0100e2b7b74e368e3
Signed-off-by: Arul.Nambi <arul.nambi@amdocs.com>
Diffstat (limited to 'src/main/java/org/openecomp/sparky/synchronizer/GeoSynchronizer.java')
-rw-r--r-- | src/main/java/org/openecomp/sparky/synchronizer/GeoSynchronizer.java | 466 |
1 files changed, 0 insertions, 466 deletions
diff --git a/src/main/java/org/openecomp/sparky/synchronizer/GeoSynchronizer.java b/src/main/java/org/openecomp/sparky/synchronizer/GeoSynchronizer.java deleted file mode 100644 index 493f3c9..0000000 --- a/src/main/java/org/openecomp/sparky/synchronizer/GeoSynchronizer.java +++ /dev/null @@ -1,466 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - */ -package org.openecomp.sparky.synchronizer; - -import static java.util.concurrent.CompletableFuture.supplyAsync; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.Supplier; - -import org.openecomp.cl.api.Logger; -import org.openecomp.cl.eelf.LoggerFactory; -import org.openecomp.cl.mdc.MdcContext; -import org.openecomp.sparky.config.oxm.OxmEntityDescriptor; -import org.openecomp.sparky.dal.NetworkTransaction; -import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig; -import org.openecomp.sparky.dal.rest.HttpMethod; -import org.openecomp.sparky.dal.rest.OperationResult; -import org.openecomp.sparky.inventory.entity.GeoIndexDocument; -import org.openecomp.sparky.logging.AaiUiMsgs; -import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor; -import org.openecomp.sparky.synchronizer.enumeration.OperationState; -import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState; -import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval; -import org.openecomp.sparky.synchronizer.task.StoreDocumentTask; -import org.openecomp.sparky.util.NodeUtils; -import org.slf4j.MDC; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; - - -/** - * The Class GeoSynchronizer. - */ -public class GeoSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer { - - private static final Logger LOG = LoggerFactory.getInstance().getLogger(GeoSynchronizer.class); - - private boolean allWorkEnumerated; - private Deque<SelfLinkDescriptor> selflinks; - - private ElasticSearchConfig elasticConfig = null; - private Map<String, OxmEntityDescriptor> geoDescriptorMap = null; - - /** - * Instantiates a new geo synchronizer. - * - * @param indexName the index name - * @throws Exception the exception - */ - public GeoSynchronizer(String indexName) throws Exception { - - super(LOG, "GEO", 2, 5, 5, indexName); - this.allWorkEnumerated = false; - this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>(); - this.synchronizerName = "Geo Synchronizer"; - this.geoDescriptorMap = oxmModelLoader.getGeoEntityDescriptors(); - this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap); - this.esEntityStats.initializeCountersFromOxmEntityDescriptors(geoDescriptorMap); - this.syncDurationInMs = -1; - } - - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync() - */ - @Override - public OperationState doSync() { - resetCounters(); - allWorkEnumerated = false; - syncStartedTimeStampInMs = System.currentTimeMillis(); - String txnID = NodeUtils.getRandomTxnId(); - MdcContext.initialize(txnID, "GeoSynchronizer", "", "Sync", ""); - - collectAllTheWork(); - return OperationState.OK; - } - - - /** - * Collect all the work. - * - * @return the operation state - */ - public OperationState collectAllTheWork() { - final Map<String,String> contextMap = MDC.getCopyOfContextMap(); - if (elasticConfig == null) { - try { - elasticConfig = ElasticSearchConfig.getConfig(); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.CONFIGURATION_ERROR, "Search"); - } - } - - if (geoDescriptorMap.isEmpty()) { - setShouldSkipSync(true); - LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "geo entities"); - return OperationState.ERROR; - } - - Collection<String> syncTypes = geoDescriptorMap.keySet(); - - try { - - /* - * launch a parallel async thread to process the documents for each entity-type (to max the of - * the configured executor anyway) - */ - - aaiWorkOnHand.set(syncTypes.size()); - - for (String key : syncTypes) { - - supplyAsync(new Supplier<Void>() { - - @Override - public Void get() { - MDC.setContextMap(contextMap); - OperationResult typeLinksResult = null; - try { - typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key); - aaiWorkOnHand.decrementAndGet(); - processEntityTypeSelfLinks(typeLinksResult); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc); - } - - return null; - } - - }, aaiExecutor).whenComplete((result, error) -> { - - if (error != null) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage()); - } - }); - - } - - while (aaiWorkOnHand.get() != 0) { - - if (LOG.isDebugEnabled()) { - LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED); - } - - Thread.sleep(1000); - } - - aaiWorkOnHand.set(selflinks.size()); - allWorkEnumerated = true; - syncEntityTypes(); - - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc); - } - return OperationState.OK; - } - - /** - * Sync entity types. - */ - private void syncEntityTypes() { - - while (selflinks.peek() != null) { - - SelfLinkDescriptor linkDescriptor = selflinks.poll(); - aaiWorkOnHand.decrementAndGet(); - - OxmEntityDescriptor descriptor = null; - - if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) { - - descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType()); - - if (descriptor == null) { - LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType()); - // go to next element in iterator - continue; - } - - NetworkTransaction txn = new NetworkTransaction(); - txn.setDescriptor(descriptor); - txn.setLink(linkDescriptor.getSelfLink()); - txn.setOperationType(HttpMethod.GET); - txn.setEntityType(linkDescriptor.getEntityType()); - - aaiWorkOnHand.incrementAndGet(); - - supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor) - .whenComplete((result, error) -> { - - aaiWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage()); - } else { - if (result == null) { - LOG.error(AaiUiMsgs.SELF_LINK_GET_NO_RESPONSE, linkDescriptor.getSelfLink()); - } else { - processEntityTypeSelfLinkResult(result); - } - } - }); - } - } - } - - /** - * Process entity type self links. - * - * @param operationResult the operation result - */ - private void processEntityTypeSelfLinks(OperationResult operationResult) { - - JsonNode rootNode = null; - - final String jsonResult = operationResult.getResult(); - - if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) { - - try { - rootNode = mapper.readTree(jsonResult); - } catch (IOException exc) { - LOG.error(AaiUiMsgs.ERROR_GENERIC, exc); - return; - } - - JsonNode resultData = rootNode.get("result-data"); - ArrayNode resultDataArrayNode = null; - - if (resultData.isArray()) { - resultDataArrayNode = (ArrayNode) resultData; - - Iterator<JsonNode> elementIterator = resultDataArrayNode.elements(); - JsonNode element = null; - - while (elementIterator.hasNext()) { - element = elementIterator.next(); - - final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type"); - final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link"); - - if (resourceType != null && resourceLink != null) { - - if (geoDescriptorMap.containsKey(resourceType)) { - selflinks.add(new SelfLinkDescriptor(resourceLink + "?nodes-only", resourceType)); - } else { - LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType); - // go to next element in iterator - continue; - } - - } - } - } - } - - } - - /** - * Process entity type self link result. - * - * @param txn the txn - */ - private void processEntityTypeSelfLinkResult(NetworkTransaction txn) { - - updateActiveInventoryCounters(txn); - - if (!txn.getOperationResult().wasSuccessful()) { - return; - } - - try { - if (!(txn.getDescriptor().getGeoLatName().isEmpty() - && txn.getDescriptor().getGeoLongName().isEmpty())) { - - GeoIndexDocument geoDoc = new GeoIndexDocument(oxmModelLoader); - - final String jsonResult = txn.getOperationResult().getResult(); - - if (jsonResult != null && jsonResult.length() > 0) { - - populateGeoDocument(geoDoc, jsonResult, txn.getDescriptor(), txn.getLink()); - - if (!geoDoc.isValidGeoDocument()) { - - LOG.info(AaiUiMsgs.GEO_SYNC_IGNORING_ENTITY, geoDoc.getEntityType(), geoDoc.toString()); - - } else { - - String link = null; - try { - link = getElasticFullUrl("/" + geoDoc.getId(), getIndexName(), "default"); - } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc); - } - - if (link != null) { - - NetworkTransaction n2 = new NetworkTransaction(); - n2.setLink(link); - n2.setEntityType(txn.getEntityType()); - n2.setDescriptor(txn.getDescriptor()); - n2.setOperationType(HttpMethod.PUT); - - esWorkOnHand.incrementAndGet(); - - supplyAsync(new StoreDocumentTask(geoDoc, n2, esDataProvider), esExecutor) - .whenComplete((result, error) -> { - - esWorkOnHand.decrementAndGet(); - - if (error != null) { - LOG.error(AaiUiMsgs.ES_STORE_FAILURE, error.getMessage()); - } else { - updateElasticSearchCounters(result); - processStoreDocumentResult(result); - } - }); - } - } - } - } - } catch (JsonProcessingException exc) { - LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc); - } catch (IOException exc) { - LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc); - } - - return; - } - - - /** - * Process store document result. - * - * @param txn the txn - */ - private void processStoreDocumentResult(NetworkTransaction txn) { - - OperationResult or = txn.getOperationResult(); - - if (!or.wasSuccessful()) { - LOG.error(AaiUiMsgs.ES_STORE_FAILURE, or.toString()); - /* - * if(or.getResultCode() != 404 || (or.getResultCode() == 404 && - * !synchronizerConfig.isResourceNotFoundErrorsSupressed())) { logger.error( - * "Skipping failed resource = " + "link" + " RC=[" + or.getResultCode() + "]. Message: " + - * or.getResult()); } - */ - - } - - } - - - @Override - public SynchronizerState getState() { - - if (!isSyncDone()) { - return SynchronizerState.PERFORMING_SYNCHRONIZATION; - } - - return SynchronizerState.IDLE; - - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean) - */ - @Override - public String getStatReport(boolean showFinalReport) { - syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs; - return this.getStatReport(syncDurationInMs, showFinalReport); - } - - /* (non-Javadoc) - * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown() - */ - @Override - public void shutdown() { - this.shutdownExecutors(); - } - - /** - * Populate geo document. - * - * @param doc the doc - * @param result the result - * @param resultDescriptor the result descriptor - * @param entityLink the entity link - * @throws JsonProcessingException the json processing exception - * @throws IOException Signals that an I/O exception has occurred. - */ - protected void populateGeoDocument(GeoIndexDocument doc, String result, - OxmEntityDescriptor resultDescriptor, String entityLink) - throws JsonProcessingException, IOException { - - doc.setSelfLink(entityLink); - doc.setEntityType(resultDescriptor.getEntityName()); - - JsonNode entityNode = mapper.readTree(result); - - List<String> primaryKeyValues = new ArrayList<String>(); - String pkeyValue = null; - - for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) { - pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName); - if (pkeyValue != null) { - primaryKeyValues.add(pkeyValue); - } else { - LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName()); - } - } - - final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/"); - doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue); - String geoLatKey = resultDescriptor.getGeoLatName(); - String geoLongKey = resultDescriptor.getGeoLongName(); - - doc.setLatitude(NodeUtils.getNodeFieldAsText(entityNode, geoLatKey)); - doc.setLongitude(NodeUtils.getNodeFieldAsText(entityNode, geoLongKey)); - doc.deriveFields(); - - } - - @Override - protected boolean isSyncDone() { - int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get(); - - if (totalWorkOnHand > 0 || !allWorkEnumerated) { - return false; - } - - return true; - } - -} |