aboutsummaryrefslogtreecommitdiffstats
path: root/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
diff options
context:
space:
mode:
Diffstat (limited to 'champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java')
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java792
1 files changed, 792 insertions, 0 deletions
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
new file mode 100644
index 0000000..79a7f3c
--- /dev/null
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
@@ -0,0 +1,792 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.event;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.onap.aai.champcore.ChampCapabilities;
+import org.onap.aai.champcore.ChampGraph;
+import org.onap.aai.champcore.ChampTransaction;
+import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
+import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampMarshallingException;
+import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
+import org.onap.aai.champcore.exceptions.ChampTransactionException;
+import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
+import org.onap.aai.champcore.model.ChampObject;
+import org.onap.aai.champcore.model.ChampObjectConstraint;
+import org.onap.aai.champcore.model.ChampObjectIndex;
+import org.onap.aai.champcore.model.ChampPartition;
+import org.onap.aai.champcore.model.ChampRelationship;
+import org.onap.aai.champcore.model.ChampRelationshipConstraint;
+import org.onap.aai.champcore.model.ChampRelationshipIndex;
+import org.onap.aai.champcore.model.ChampSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.aai.event.api.EventPublisher;
+
+
+
+/**
+ * This class provides the hooks to allow Champ operations to be logged to an event
+ * stream.
+ */
+public abstract class AbstractLoggingChampGraph implements ChampGraph {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
+
+ public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
+ public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
+ public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
+ public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
+ @Override
+ public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
+ @Override
+ public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
+ public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
+ public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
+ public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
+ public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
+
+
+ /**
+ * Creates or updates a vertex in the graph data store.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param object - The vertex to be created or updated.
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @return - The vertex, as created, marshaled as a {@link ChampObject}
+ *
+ * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
+ * into the backend representation
+ * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
+ * by {@link ChampGraph#retrieveSchema}
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * is not present or object not found in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
+
+ /**
+ * Updates an existing vertex in the graph store.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param object - The vertex to be created or updated.
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @return - The updated vertex, marshaled as a {@link ChampObject}
+ *
+ * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
+ * the backend representation
+ * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
+ * by {@link ChampGraph#retrieveSchema}
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * is not present or object not found in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
+
+ /**
+ * Deletes an existing vertex from the graph store.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * is not present or object not found in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
+
+ /**
+ * Creates or updates an edge in the graph data store.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param relationship - The ChampRelationship that you wish to store in the graph
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @return - The {@link ChampRelationship} as it was stored.
+ *
+ * @throws ChampUnmarshallingException - If the edge which was created could not be
+ * unmarshalled into a ChampRelationship
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * marshalled into the backend representation
+ * @throws ChampObjectNotExistsException - If either the source or target object referenced
+ * by this relationship does not exist in the graph
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * specifed by {@link ChampGraph#retrieveSchema}
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * but the object cannot be found in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+
+ /**
+ * Replaces an existing edge in the graph data store.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param relationship - The ChampRelationship that you wish to replace in the graph
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @return - The {@link ChampRelationship} as it was stored.
+ *
+ * @throws ChampUnmarshallingException - If the edge which was created could not be
+ * unmarshalled into a ChampRelationship
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * marshalled into the backend representation
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * specifed by {@link ChampGraph#retrieveSchema}
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * but the object cannot be found in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+
+ /**
+ * Removes an edge from the graph data store.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param relationship - The ChampRelationship that you wish to remove from the graph.
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * but the object cannot be found in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
+
+ /**
+ * Create or update a {@link ChampPartition}.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param partition - The ChampPartition that you wish to create or update in the graph.
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @return - The {@link ChampPartition} as it was stored.
+ *
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * specifed by {@link ChampGraph#retrieveSchema}
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * but the object cannot be found in the graph
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * marshalled into the backend representation
+ * @throws ChampObjectNotExistsException - If either the source or target object referenced
+ * by this relationship does not exist in the graph
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
+
+ /**
+ * Removes a partition from the graph.
+ * <p>
+ * If a transaction context is not provided, then a transaction will be automatically
+ * created and committed for this operation only, otherwise, the supplied transaction
+ * will be used and it will be up to the caller to commit the transaction at its
+ * discretion.
+ *
+ * @param graph - The partition to be removed.
+ * @param transaction - Optional transaction context to perform the operation in.
+ *
+ * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
+ */
+ public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
+
+ /**
+ * Create or update an object index in the graph.
+ *
+ * @param index - The object index to be created/updated.
+ */
+ public abstract void executeStoreObjectIndex(ChampObjectIndex index);
+
+ public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
+ public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
+ public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
+ public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
+ public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
+ public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
+ public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
+ public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
+ public abstract ChampSchema retrieveSchema();
+ public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
+ public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
+ public abstract void deleteSchema();
+ public abstract ChampCapabilities capabilities();
+
+
+
+ public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
+ public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
+
+ public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
+ public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
+
+ public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+
+
+
+ /** Number of events that can be queued up for publication before we begin dropping
+ * events. */
+ private Integer eventQueueCapacity;
+
+ /** Number of event publisher worker threads. */
+ private Integer eventStreamPublisherPoolSize;
+
+ /** Pool of worker threads that do the work of publishing the events to the event bus. */
+ protected ThreadPoolExecutor publisherPool;
+
+ /** Client used for publishing events to the event bus. */
+ protected EventPublisher producer;
+
+ /** Internal queue where outgoing events will be buffered until they can be serviced by
+ * the event publisher worker threads. */
+ protected BlockingQueue<ChampEvent> eventQueue;
+
+
+ /**
+ * Thread factory for the event producer workers.
+ */
+ private class ProducerWorkerThreadFactory implements ThreadFactory {
+
+ private AtomicInteger threadNumber = new AtomicInteger(1);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
+ }
+ }
+
+
+ /**
+ * Create a new instance of the AbstractLoggingChampGraph.
+ *
+ * @param properties - Set of configuration properties for this graph instance.
+ */
+ protected AbstractLoggingChampGraph(Map<String, Object> properties) {
+
+ // Extract the necessary parameters from the configuration properties.
+ configure(properties);
+
+ // Make sure we were passed an event producer as one of our properties, otherwise
+ // there is really nothing more we can do...
+ if(producer == null) {
+ logger.error("No event stream producer was supplied.");
+ logger.error("NOTE!! Champ events will NOT be published to the event stream!");
+ return;
+ }
+
+ // Create the blocking queue that we will use to buffer events that we want
+ // published to the event bus.
+ eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
+
+ // Create the executor pool that will do the work of publishing events to the event bus.
+ publisherPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
+ new ProducerWorkerThreadFactory());
+
+ try {
+
+ // Start up the producer worker threads.
+ for(int i=0; i<eventStreamPublisherPoolSize; i++) {
+ publisherPool.submit(new EventPublisherWorker());
+ }
+
+ } catch (Exception e) {
+
+ logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
+ logger.error("NOTE!! Champ events may NOT be published to the event stream!");
+ return;
+ }
+ }
+
+
+ /**
+ * Process the configuration properties supplied for this graph instance.
+ *
+ * @param properties - Configuration parameters.
+ */
+ private void configure(Map<String, Object> properties) {
+
+ producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
+
+ eventQueueCapacity =
+ (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
+ eventStreamPublisherPoolSize =
+ (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
+ }
+
+
+ public void setProducer(EventPublisher aProducer) {
+
+ producer = aProducer;
+ }
+
+ private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
+
+ if(properties.containsKey(property)) {
+ return properties.get(property);
+ } else {
+ return defaultValue;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+
+ if(publisherPool != null) {
+ publisherPool.shutdown();
+
+ try {
+ publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {}
+ }
+
+ if(producer != null) {
+
+ try {
+ producer.close();
+
+ } catch (Exception e) {
+ logger.error("Failed to stop event stream producer: " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
+
+ try {
+
+ // Commit the transaction.
+ transaction.commit();
+
+ } catch (ChampTransactionException e) {
+
+ logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
+
+ List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
+ for(ChampEvent event : enqueuedEvents) {
+
+ logger.debug("Graph event " + event.toString() + " not published.");
+ }
+ throw e;
+ }
+
+ // Now that the transaction has been successfully committed, we need
+ // to log the events that were produced within that transaction's
+ // context.
+ List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
+ for(ChampEvent event : enqueuedEvents) {
+ logEvent(event);
+ }
+ }
+
+ @Override
+ public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
+
+ // Rollback the transaction.
+ transaction.rollback();
+ }
+
+ @Override
+ public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
+ return storeObject(object, Optional.empty());
+ }
+
+ @Override
+ public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
+
+ ChampObject storedObject = executeStoreObject(object, transaction);
+
+ if(storedObject != null) {
+
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.STORE)
+ .entity(storedObject)
+ .build(),
+ transaction);
+ }
+
+ return storedObject;
+ }
+
+ @Override
+ public ChampObject replaceObject(ChampObject object)
+ throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
+
+ return replaceObject(object, Optional.empty());
+ }
+
+ @Override
+ public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
+ throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
+
+ ChampObject replacedObject = executeReplaceObject(object, transaction);
+
+ if(replacedObject != null) {
+
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.REPLACE)
+ .entity(replacedObject)
+ .build(),
+ transaction);
+ }
+
+ return replacedObject;
+ }
+
+ @Override
+ public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
+ deleteObject(key, Optional.empty());
+ }
+
+ @Override
+ public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
+
+ // Retrieve the object that we are deleting before it's gone, so that we can
+ // report it to the event stream.
+ Optional<ChampObject> objectToDelete = Optional.empty();
+ try {
+ objectToDelete = retrieveObject(key, transaction);
+
+ } catch (ChampUnmarshallingException e) {
+ logger.error("Unable to generate delete object log: " + e.getMessage());
+ }
+
+ executeDeleteObject(key, transaction);
+
+ if(objectToDelete.isPresent()) {
+ // Update the event stream with the current operation.
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.DELETE)
+ .entity(objectToDelete.get())
+ .build(),
+ transaction);
+ }
+ }
+
+ @Override
+ public ChampRelationship storeRelationship(ChampRelationship relationship)
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampObjectNotExistsException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
+ return storeRelationship(relationship, Optional.empty());
+ }
+
+ @Override
+ public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampObjectNotExistsException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
+
+ ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
+
+ if(storedRelationship != null) {
+
+ // Update the event stream with the current operation.
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.STORE)
+ .entity(storedRelationship)
+ .build(),
+ transaction);
+ }
+
+ return storedRelationship;
+ }
+
+ @Override
+ public ChampRelationship replaceRelationship(ChampRelationship relationship)
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
+ return replaceRelationship(relationship, Optional.empty());
+ }
+
+ @Override
+ public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
+
+ ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
+
+ if(replacedRelationship != null) {
+
+ // Update the event stream with the current operation.
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.REPLACE)
+ .entity(replacedRelationship)
+ .build(),
+ transaction);
+ }
+
+ return replacedRelationship;
+ }
+
+ @Override
+ public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
+ deleteRelationship(relationship, Optional.empty());
+ }
+
+ @Override
+ public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
+
+ executeDeleteRelationship(relationship, transaction);
+
+ // Update the event stream with the current operation.
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.DELETE)
+ .entity(relationship)
+ .build(),
+ transaction);
+ }
+
+ @Override
+ public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
+ return storePartition(partition, Optional.empty());
+ }
+
+ @Override
+ public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
+
+ ChampPartition storedPartition = executeStorePartition(partition, transaction);
+
+ if(storedPartition != null) {
+
+ // Update the event stream with the current operation.
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.STORE)
+ .entity(storedPartition)
+ .build(),
+ transaction);
+ }
+
+ return storedPartition;
+ }
+
+ @Override
+ public void deletePartition(ChampPartition graph) throws ChampTransactionException{
+ deletePartition(graph, Optional.empty());
+ }
+
+ @Override
+ public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
+
+ executeDeletePartition(graph, transaction);
+
+ // Update the event stream with the current operation.
+ logOrEnqueueEvent(ChampEvent.builder()
+ .operation(ChampOperation.DELETE)
+ .entity(graph)
+ .build(),
+ transaction);
+ }
+
+ @Override
+ public void storeObjectIndex(ChampObjectIndex index) {
+
+ executeStoreObjectIndex(index);
+
+ // Update the event stream with the current operation.
+ logEvent(ChampEvent.builder()
+ .operation(ChampOperation.STORE)
+ .entity(index)
+ .build());
+ }
+
+
+ public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
+
+ // Retrieve the index that we are deleting before it's gone, so that we can
+ // report it to the event stream.
+ Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
+
+ executeDeleteObjectIndex(indexName);
+
+ if(indexToDelete.isPresent()) {
+ // Update the event stream with the current operation.
+ logEvent(ChampEvent.builder()
+ .operation(ChampOperation.DELETE)
+ .entity(indexToDelete.get())
+ .build());
+ }
+ }
+
+
+ public void storeRelationshipIndex(ChampRelationshipIndex index) {
+
+ executeStoreRelationshipIndex(index);
+
+ // Update the event stream with the current operation.
+ logEvent(ChampEvent.builder()
+ .operation(ChampOperation.STORE)
+ .entity(index)
+ .build());
+ }
+
+
+ public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
+
+ // Retrieve the index that we are deleting before it's gone, so that we can
+ // report it to the event stream.
+ Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
+
+ executeDeleteRelationshipIndex(indexName);
+
+ if(indexToDelete.isPresent()) {
+ // Update the event stream with the current operation.
+ logEvent(ChampEvent.builder()
+ .operation(ChampOperation.DELETE)
+ .entity(indexToDelete.get())
+ .build());
+ }
+ }
+
+ private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
+
+ if(!transaction.isPresent()) {
+ // Update the event stream with the current operation.
+ logEvent(event);
+ } else {
+
+ // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
+ event.setDbTransactionId ( transaction.get ().id () );
+ transaction.get().logEvent(event);
+ }
+ }
+
+ /**
+ * Submits an event to be published to the event stream.
+ *
+ * @param anEvent - The event to be published.
+ */
+ public void logEvent(ChampEvent anEvent) {
+
+ if(eventQueue == null) {
+ return;
+ }
+
+ logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
+ if(logger.isDebugEnabled()) {
+ logger.debug("Event payload: " + anEvent.toString());
+ }
+
+ // Try to submit the event to be published to the event bus.
+ if(!eventQueue.offer(anEvent)) {
+ logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+ }
+ }
+
+
+ /**
+ * This class implements the worker threads for our thread pool which are responsible for
+ * pulling the next outgoing event from the internal buffer and forwarding them to the event
+ * bus client.
+ * <p>
+ * Each publish operation is performed synchronously, so that the thread will only move on
+ * to the next available event once it has actually published the current event to the bus.
+ */
+ private class EventPublisherWorker implements Runnable {
+
+ /** Partition key to use when publishing events to the event stream. We WANT all events
+ * to go to a single partition, so we are just using a hard-coded key for every event. */
+ private static final String EVENTS_PARTITION_KEY = "champEventKey";
+
+
+ @Override
+ public void run() {
+
+ while(true) {
+
+ ChampEvent event = null;
+ try {
+
+ // Get the next event to be published from the queue.
+ event = eventQueue.take();
+
+ } catch (InterruptedException e) {
+
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
+
+ // Try publishing the event to the event bus. This call will block
+ // until
+ try {
+ producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
+
+ } catch (Exception e) {
+
+ logger.error("Failed to publish event to event bus: " + e.getMessage());
+ }
+ }
+ }
+ }
+}