diff options
author | James Forsyth <jf2512@att.com> | 2018-05-23 20:22:24 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-05-23 20:22:24 +0000 |
commit | 762153cd733f43697679c3f97eba09212cbd2839 (patch) | |
tree | 5823a7f3f1cb2547833960cb477f320bf3d5543c | |
parent | 457c911507f6d54149b54a6c5e40137c1c091d37 (diff) | |
parent | 2ce560a13c699cf7af64aeb559518cf58475f704 (diff) |
Merge "Remove broken not used code"
-rw-r--r-- | src/main/java/org/onap/aai/champ/event/AbstractLoggingChampGraph.java | 501 |
1 files changed, 0 insertions, 501 deletions
diff --git a/src/main/java/org/onap/aai/champ/event/AbstractLoggingChampGraph.java b/src/main/java/org/onap/aai/champ/event/AbstractLoggingChampGraph.java deleted file mode 100644 index 16fc00b..0000000 --- a/src/main/java/org/onap/aai/champ/event/AbstractLoggingChampGraph.java +++ /dev/null @@ -1,501 +0,0 @@ -/** - * ============LICENSE_START========================================== - * org.onap.aai - * =================================================================== - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * =================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END============================================ - */ -package org.onap.aai.champ.event; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -import org.onap.aai.champ.ChampCapabilities; -import org.onap.aai.champ.ChampGraph; -import org.onap.aai.champ.event.ChampEvent.ChampOperation; -import org.onap.aai.champ.exceptions.ChampIndexNotExistsException; -import org.onap.aai.champ.exceptions.ChampMarshallingException; -import org.onap.aai.champ.exceptions.ChampObjectNotExistsException; -import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException; -import org.onap.aai.champ.exceptions.ChampSchemaViolationException; -import org.onap.aai.champ.exceptions.ChampUnmarshallingException; -import org.onap.aai.champ.model.ChampObject; -import org.onap.aai.champ.model.ChampObjectConstraint; -import org.onap.aai.champ.model.ChampObjectIndex; -import org.onap.aai.champ.model.ChampPartition; -import org.onap.aai.champ.model.ChampRelationship; -import org.onap.aai.champ.model.ChampRelationshipConstraint; -import org.onap.aai.champ.model.ChampRelationshipIndex; -import org.onap.aai.champ.model.ChampSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaPublisher; - - -/** - * This class provides the hooks to allow Champ operations to be logged to an event - * stream. - */ -public abstract class AbstractLoggingChampGraph implements ChampGraph { - - private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class); - - public abstract ChampObject executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException; - public abstract ChampObject executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException; - public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException; - public abstract void executeDeleteObject(Object key) throws ChampObjectNotExistsException; - public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams); - public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException; - public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; - @Override - public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException; - public abstract void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException; - @Override - public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException; - @Override - public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams); - public abstract ChampPartition executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException; - public abstract void executeDeletePartition(ChampPartition graph); - public abstract void executeStoreObjectIndex(ChampObjectIndex index); - @Override - public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName); - @Override - public abstract Stream<ChampObjectIndex> retrieveObjectIndices(); - public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException; - public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index); - @Override - public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName); - public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices(); - public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException; - public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException; - public abstract ChampSchema retrieveSchema(); - public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException; - public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException; - public abstract void deleteSchema(); - public abstract ChampCapabilities capabilities(); - - - /** Configuration property for setting the comma-separated list of servers to use for - * communicating with the event bus. */ - public static final String PARAM_EVENT_STREAM_HOSTS = "champ.event.stream.hosts"; - - /** Configuration property for setting the number of events that we will try to 'batch' - * up before sending them to the event bus. */ - public static final String PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size"; - public static final Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1; - - /** Configuration property for setting the maximum amount of time to wait for a batch of - * outgoing messages to fill up before sending the batch. */ - public static final String PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout"; - public static final Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; - - public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size"; - public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100; - - /** The event stream topic that we will publish Champ events to. */ - public static final String EVENT_TOPIC = "champRawEvents"; - - /** Number of events to 'batch up' before actually publishing them to the event bus. */ - private Integer eventStreamBatchSize; - - private Integer eventStreamBatchTimeout; - - private Integer eventStreamPublisherPoolSize; - - /** Comma-separated list of hosts for connecting to the event bus. */ - private String eventStreamHosts = null; - - /** Client used for publishing messages to the event bus. */ - protected CambriaPublisher producer; - - /** Pool of worker threads that do the work of publishing the events to the event bus. */ - protected ThreadPoolExecutor publisherPool; - - - /** - * Create a new instance of the AbstractLoggingChampGraph. - * - * @param properties - Set of configuration properties for this graph instance. - */ - protected AbstractLoggingChampGraph(Map<String, Object> properties) { - - // Extract the necessary parameters from the configuration properties. - configure(properties); - - // Create the executor pool that will do the work of publishing events to the event bus. - publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize); - - // Make sure that we are actually provided a list of hosts for connecting to the event - // bus before we actually try to do anything useful. - if(eventStreamHosts == null) { - - // We were not supplied a list of event bus hosts, so just bail. - logger.error("Cannot initialize event stream publisher without at least one event bus host."); - logger.error("NOTE!! Champ events will NOT be published to the event stream!"); - return; - } - - try { - - // Instantiate the producer that we will use to publish events to the event stream. - setProducer(new CambriaClientBuilders.PublisherBuilder() - .usingHosts(eventStreamHosts) - .onTopic(EVENT_TOPIC) - .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout) - .build()); - - } catch (MalformedURLException | GeneralSecurityException e) { - - logger.error("Could not instantiate event stream producer due to: " + e.getMessage()); - logger.error("NOTE: Champ events will NOT be published to the event stream"); - producer = null; - } - } - - - /** - * Process the configuration properties supplied for this graph instance. - * - * @param properties - Configuration parameters. - */ - private void configure(Map<String, Object> properties) { - - eventStreamBatchSize = - (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE, DEFAULT_EVENT_STREAM_BATCH_SIZE); - eventStreamBatchTimeout = - (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS); - eventStreamPublisherPoolSize = - (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE); - - if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) { - eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS); - } - } - - public void setProducer(CambriaPublisher aProducer) { - - producer = aProducer; - } - - private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) { - - if(properties.containsKey(property)) { - return properties.get(property); - } else { - return defaultValue; - } - } - - @Override - public void shutdown() { - - if(publisherPool != null) { - publisherPool.shutdown(); - - try { - publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) {} - } - - if(producer != null) { - producer.close(); - } - } - - public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException { - - ChampObject storedObject = executeStoreObject(object); - - if(storedObject != null) { - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.STORE) - .entity(storedObject) - .build()); - } - - return storedObject; - } - - - public ChampObject replaceObject(ChampObject object) - throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException { - - ChampObject replacedObject = executeReplaceObject(object); - - if(replacedObject != null) { - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.REPLACE) - .entity(replacedObject) - .build()); - } - - return replacedObject; - } - - - public void deleteObject(Object key) throws ChampObjectNotExistsException { - - // Retrieve the object that we are deleting before it's gone, so that we can - // report it to the event stream. - Optional<ChampObject> objectToDelete = Optional.empty(); - try { - objectToDelete = retrieveObject(key); - - } catch (ChampUnmarshallingException e) { - logger.error("Unable to generate delete object log: " + e.getMessage(),e); - } - - executeDeleteObject(key); - - if(objectToDelete.isPresent()) { - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.DELETE) - .entity(objectToDelete.get()) - .build()); - } - } - - - public ChampRelationship storeRelationship(ChampRelationship relationship) - throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException { - - ChampRelationship storedRelationship = executeStoreRelationship(relationship); - - if(storedRelationship != null) { - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.STORE) - .entity(storedRelationship) - .build()); - } - - return storedRelationship; - } - - - public ChampRelationship replaceRelationship(ChampRelationship relationship) - throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { - - ChampRelationship replacedRelationship = executeReplaceRelationship(relationship); - - if(replacedRelationship != null) { - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.REPLACE) - .entity(replacedRelationship) - .build()); - } - - return replacedRelationship; - } - - - public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException { - - executeDeleteRelationship(relationship); - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.DELETE) - .entity(relationship) - .build()); - } - - - public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException { - - ChampPartition storedPartition = executeStorePartition(partition); - - if(storedPartition != null) { - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.STORE) - .entity(storedPartition) - .build()); - } - - return storedPartition; - } - - - public void deletePartition(ChampPartition graph) { - - executeDeletePartition(graph); - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.DELETE) - .entity(graph) - .build()); - } - - - public void storeObjectIndex(ChampObjectIndex index) { - - executeStoreObjectIndex(index); - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.STORE) - .entity(index) - .build()); - } - - - public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException { - - // Retrieve the index that we are deleting before it's gone, so that we can - // report it to the event stream. - Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName); - - executeDeleteObjectIndex(indexName); - - if(indexToDelete.isPresent()) { - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.DELETE) - .entity(indexToDelete.get()) - .build()); - } - } - - - public void storeRelationshipIndex(ChampRelationshipIndex index) { - - executeStoreRelationshipIndex(index); - - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.STORE) - .entity(index) - .build()); - } - - - public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException { - - // Retrieve the index that we are deleting before it's gone, so that we can - // report it to the event stream. - Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName); - - executeDeleteRelationshipIndex(indexName); - - if(indexToDelete.isPresent()) { - // Update the event stream with the current operation. - logEvent(ChampEvent.builder() - .operation(ChampOperation.DELETE) - .entity(indexToDelete.get()) - .build()); - } - } - - - /** - * Submits an event to be published to the event stream. - * - * @param anEvent - The event to be published. - */ - public void logEvent(ChampEvent anEvent) { - - if(logger.isDebugEnabled()) { - logger.debug("Submitting event to be published to the event bus: " + anEvent.toString()); - } - - try { - - // Try submitting the event to be published to the event bus. - publisherPool.execute(new EventPublisher(anEvent)); - - } catch (RejectedExecutionException re) { - logger.error("Event could not be published to the event bus due to: " + re.getMessage(),re); - - } catch (NullPointerException npe) { - logger.error("Can not publish null event to event bus." + npe.getMessage(),npe); - } - } - - - /** - * This class runs as a background thread and is responsible for pulling Champ events off - * of the internal queue and publishing them to the event stream. - */ - private class EventPublisher implements Runnable { - - /** Partition key to use when publishing events to the event stream. We WANT all events - * to go to a single partition, so we are just using a hard-coded key for every event. */ - private static final String EVENTS_PARTITION_KEY = "champEventKey"; - - private ChampEvent event; - - public EventPublisher(ChampEvent event) { - this.event = event; - } - - - @Override - public void run() { - - boolean done = false; - while(!done && !Thread.currentThread().isInterrupted()) { - try { - - // Make sure that we actually have a producer instance to use to publish - // the event with. - if(producer != null) { - - // Try publishing the event to the event bus. - producer.send(EVENTS_PARTITION_KEY, event.toJson()); - - } else if (logger.isDebugEnabled()) { - logger.debug("Event bus producer is not instantiated - will not attempt to publish event"); - } - - done = true; - - } catch (IOException e) { - - // We were unable to publish to the event bus, so wait a bit and then try - // again. - try { - Thread.sleep(500); - - } catch (InterruptedException e1) { - logger.info("Stopping event publisher worker thread."); - return; - } - } - } - } - } -} |