diff options
Diffstat (limited to 'champ-lib/champ-core')
8 files changed, 891 insertions, 403 deletions
diff --git a/champ-lib/champ-core/pom.xml b/champ-lib/champ-core/pom.xml index 7315c16..4ce3d5f 100644 --- a/champ-lib/champ-core/pom.xml +++ b/champ-lib/champ-core/pom.xml @@ -105,6 +105,20 @@ limitations under the License. </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.skyscreamer</groupId> + <artifactId>jsonassert</artifactId> + <version>1.5.0</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java index 07647d2..1f93a97 100644 --- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java +++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java @@ -32,11 +32,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; - import org.onap.aai.champcore.ChampCapabilities; import org.onap.aai.champcore.ChampGraph; import org.onap.aai.champcore.ChampTransaction; import org.onap.aai.champcore.event.ChampEvent.ChampOperation; +import org.onap.aai.champcore.event.envelope.ChampEventEnvelope; +import org.onap.aai.champcore.event.envelope.ChampEventHeader; import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException; import org.onap.aai.champcore.exceptions.ChampMarshallingException; import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException; @@ -52,11 +53,10 @@ import org.onap.aai.champcore.model.ChampRelationship; import org.onap.aai.champcore.model.ChampRelationshipConstraint; import org.onap.aai.champcore.model.ChampRelationshipIndex; import org.onap.aai.champcore.model.ChampSchema; +import org.onap.aai.event.api.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.onap.aai.event.api.EventPublisher; - /** @@ -66,10 +66,10 @@ import org.onap.aai.event.api.EventPublisher; public abstract class AbstractLoggingChampGraph implements ChampGraph { private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class); - + public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException; public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException; - public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException; + public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException; public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException; @Override public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException; @@ -80,48 +80,48 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException; public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException; - + /** * Creates or updates a vertex in the graph data store. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param object - The vertex to be created or updated. * @param transaction - Optional transaction context to perform the operation in. - * + * * @return - The vertex, as created, marshaled as a {@link ChampObject} - * - * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled + * + * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled * into the backend representation - * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed + * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed * by {@link ChampGraph#retrieveSchema} - * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} + * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} * is not present or object not found in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException; - + /** * Updates an existing vertex in the graph store. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param object - The vertex to be created or updated. * @param transaction - Optional transaction context to perform the operation in. - * + * * @return - The updated vertex, marshaled as a {@link ChampObject} - * - * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into + * + * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into * the backend representation - * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed + * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed * by {@link ChampGraph#retrieveSchema} - * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} + * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} * is not present or object not found in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ @@ -130,136 +130,136 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { /** * Deletes an existing vertex from the graph store. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey} * @param transaction - Optional transaction context to perform the operation in. - * - * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} + * + * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} * is not present or object not found in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException; - + /** * Creates or updates an edge in the graph data store. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param relationship - The ChampRelationship that you wish to store in the graph * @param transaction - Optional transaction context to perform the operation in. - * + * * @return - The {@link ChampRelationship} as it was stored. - * - * @throws ChampUnmarshallingException - If the edge which was created could not be + * + * @throws ChampUnmarshallingException - If the edge which was created could not be * unmarshalled into a ChampRelationship - * @throws ChampMarshallingException - If the {@code relationship} is not able to be + * @throws ChampMarshallingException - If the {@code relationship} is not able to be * marshalled into the backend representation - * @throws ChampObjectNotExistsException - If either the source or target object referenced + * @throws ChampObjectNotExistsException - If either the source or target object referenced * by this relationship does not exist in the graph - * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints + * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints * specifed by {@link ChampGraph#retrieveSchema} - * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() + * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() * but the object cannot be found in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ - public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; + public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; /** * Replaces an existing edge in the graph data store. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param relationship - The ChampRelationship that you wish to replace in the graph * @param transaction - Optional transaction context to perform the operation in. - * + * * @return - The {@link ChampRelationship} as it was stored. - * - * @throws ChampUnmarshallingException - If the edge which was created could not be + * + * @throws ChampUnmarshallingException - If the edge which was created could not be * unmarshalled into a ChampRelationship - * @throws ChampMarshallingException - If the {@code relationship} is not able to be + * @throws ChampMarshallingException - If the {@code relationship} is not able to be * marshalled into the backend representation - * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints + * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints * specifed by {@link ChampGraph#retrieveSchema} - * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() + * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() * but the object cannot be found in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ - public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; - + public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; + /** * Removes an edge from the graph data store. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param relationship - The ChampRelationship that you wish to remove from the graph. * @param transaction - Optional transaction context to perform the operation in. - * - * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() + * + * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() * but the object cannot be found in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException; - + /** * Create or update a {@link ChampPartition}. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param partition - The ChampPartition that you wish to create or update in the graph. * @param transaction - Optional transaction context to perform the operation in. - * + * * @return - The {@link ChampPartition} as it was stored. - * - * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints + * + * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints * specifed by {@link ChampGraph#retrieveSchema} - * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() + * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() * but the object cannot be found in the graph - * @throws ChampMarshallingException - If the {@code relationship} is not able to be + * @throws ChampMarshallingException - If the {@code relationship} is not able to be * marshalled into the backend representation - * @throws ChampObjectNotExistsException - If either the source or target object referenced + * @throws ChampObjectNotExistsException - If either the source or target object referenced * by this relationship does not exist in the graph * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException; - + /** * Removes a partition from the graph. * <p> - * If a transaction context is not provided, then a transaction will be automatically + * If a transaction context is not provided, then a transaction will be automatically * created and committed for this operation only, otherwise, the supplied transaction - * will be used and it will be up to the caller to commit the transaction at its + * will be used and it will be up to the caller to commit the transaction at its * discretion. - * + * * @param graph - The partition to be removed. * @param transaction - Optional transaction context to perform the operation in. - * + * * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed. */ public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException; - + /** * Create or update an object index in the graph. - * + * * @param index - The object index to be created/updated. */ public abstract void executeStoreObjectIndex(ChampObjectIndex index); - + public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName); public abstract Stream<ChampObjectIndex> retrieveObjectIndices(); public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException; @@ -274,56 +274,56 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { public abstract void deleteSchema(); public abstract ChampCapabilities capabilities(); - - + + public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity"; public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000; - + public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size"; public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5; - - public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; - - - + + public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; + + + /** Number of events that can be queued up for publication before we begin dropping * events. */ private Integer eventQueueCapacity; - + /** Number of event publisher worker threads. */ private Integer eventStreamPublisherPoolSize; - + /** Pool of worker threads that do the work of publishing the events to the event bus. */ protected ThreadPoolExecutor publisherPool; - + /** Client used for publishing events to the event bus. */ protected EventPublisher producer; - - /** Internal queue where outgoing events will be buffered until they can be serviced by + + /** Internal queue where outgoing events will be buffered until they can be serviced by * the event publisher worker threads. */ protected BlockingQueue<ChampEvent> eventQueue; - + /** * Thread factory for the event producer workers. */ private class ProducerWorkerThreadFactory implements ThreadFactory { - + private AtomicInteger threadNumber = new AtomicInteger(1); - + public Thread newThread(Runnable r) { return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement()); } } - - + + /** * Create a new instance of the AbstractLoggingChampGraph. - * + * * @param properties - Set of configuration properties for this graph instance. */ protected AbstractLoggingChampGraph(Map<String, Object> properties) { - + // Extract the necessary parameters from the configuration properties. configure(properties); @@ -334,104 +334,104 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { logger.error("NOTE!! Champ events will NOT be published to the event stream!"); return; } - + // Create the blocking queue that we will use to buffer events that we want // published to the event bus. eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity); - + // Create the executor pool that will do the work of publishing events to the event bus. - publisherPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, + publisherPool = + (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, new ProducerWorkerThreadFactory()); - + try { - + // Start up the producer worker threads. for(int i=0; i<eventStreamPublisherPoolSize; i++) { - publisherPool.submit(new EventPublisherWorker()); + publisherPool.submit(new EventPublisherWorker()); } - + } catch (Exception e) { - + logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'"); logger.error("NOTE!! Champ events may NOT be published to the event stream!"); return; } } - + /** * Process the configuration properties supplied for this graph instance. - * + * * @param properties - Configuration parameters. */ private void configure(Map<String, Object> properties) { - + producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER); - + eventQueueCapacity = (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY); - eventStreamPublisherPoolSize = + eventStreamPublisherPoolSize = (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE); } - - + + public void setProducer(EventPublisher aProducer) { - + producer = aProducer; } - + private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) { - + if(properties.containsKey(property)) { return properties.get(property); } else { return defaultValue; } } - + @Override public void shutdown() { - + if(publisherPool != null) { publisherPool.shutdown(); - + try { publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) {} } - + if(producer != null) { - + try { producer.close(); - + } catch (Exception e) { logger.error("Failed to stop event stream producer: " + e.getMessage()); } } } - + @Override public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException { - + try { - + // Commit the transaction. transaction.commit(); - + } catch (ChampTransactionException e) { - + logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure."); List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents(); for(ChampEvent event : enqueuedEvents) { - + logger.debug("Graph event " + event.toString() + " not published."); } throw e; } - + // Now that the transaction has been successfully committed, we need // to log the events that were produced within that transaction's // context. @@ -440,81 +440,81 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { logEvent(event); } } - + @Override public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException { - - // Rollback the transaction. + + // Rollback the transaction. transaction.rollback(); } - + @Override public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException { return storeObject(object, Optional.empty()); } - + @Override public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException { - + ChampObject storedObject = executeStoreObject(object, transaction); - + if(storedObject != null) { - + logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.STORE) .entity(storedObject) - .build(), + .build(), transaction); } - + return storedObject; } - + @Override public ChampObject replaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException { return replaceObject(object, Optional.empty()); } - + @Override public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException { - + ChampObject replacedObject = executeReplaceObject(object, transaction); - + if(replacedObject != null) { - + logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.REPLACE) .entity(replacedObject) - .build(), + .build(), transaction); } - + return replacedObject; } - + @Override public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException { deleteObject(key, Optional.empty()); } - + @Override public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException { - // Retrieve the object that we are deleting before it's gone, so that we can + // Retrieve the object that we are deleting before it's gone, so that we can // report it to the event stream. Optional<ChampObject> objectToDelete = Optional.empty(); try { objectToDelete = retrieveObject(key, transaction); - + } catch (ChampUnmarshallingException e) { logger.error("Unable to generate delete object log: " + e.getMessage()); } - + executeDeleteObject(key, transaction); - + if(objectToDelete.isPresent()) { // Update the event stream with the current operation. logOrEnqueueEvent(ChampEvent.builder() @@ -524,29 +524,29 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { transaction); } } - + @Override public ChampRelationship storeRelationship(ChampRelationship relationship) - throws ChampUnmarshallingException, - ChampMarshallingException, - ChampObjectNotExistsException, - ChampSchemaViolationException, - ChampRelationshipNotExistsException, ChampTransactionException { + throws ChampUnmarshallingException, + ChampMarshallingException, + ChampObjectNotExistsException, + ChampSchemaViolationException, + ChampRelationshipNotExistsException, ChampTransactionException { return storeRelationship(relationship, Optional.empty()); } - + @Override public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) - throws ChampUnmarshallingException, - ChampMarshallingException, - ChampObjectNotExistsException, - ChampSchemaViolationException, - ChampRelationshipNotExistsException, ChampTransactionException { + throws ChampUnmarshallingException, + ChampMarshallingException, + ChampObjectNotExistsException, + ChampSchemaViolationException, + ChampRelationshipNotExistsException, ChampTransactionException { ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction); - + if(storedRelationship != null) { - + // Update the event stream with the current operation. logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.STORE) @@ -554,30 +554,30 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { .build(), transaction); } - + return storedRelationship; } @Override public ChampRelationship replaceRelationship(ChampRelationship relationship) - throws ChampUnmarshallingException, - ChampMarshallingException, - ChampSchemaViolationException, - ChampRelationshipNotExistsException, ChampTransactionException { + throws ChampUnmarshallingException, + ChampMarshallingException, + ChampSchemaViolationException, + ChampRelationshipNotExistsException, ChampTransactionException { return replaceRelationship(relationship, Optional.empty()); } - + @Override public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) - throws ChampUnmarshallingException, - ChampMarshallingException, - ChampSchemaViolationException, - ChampRelationshipNotExistsException, ChampTransactionException { + throws ChampUnmarshallingException, + ChampMarshallingException, + ChampSchemaViolationException, + ChampRelationshipNotExistsException, ChampTransactionException { ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction); - + if(replacedRelationship != null) { - + // Update the event stream with the current operation. logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.REPLACE) @@ -585,20 +585,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { .build(), transaction); } - + return replacedRelationship; } - + @Override public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException { deleteRelationship(relationship, Optional.empty()); } - + @Override public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException { executeDeleteRelationship(relationship, transaction); - + // Update the event stream with the current operation. logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.DELETE) @@ -606,19 +606,19 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { .build(), transaction); } - + @Override public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException { return storePartition(partition, Optional.empty()); } - + @Override public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException { ChampPartition storedPartition = executeStorePartition(partition, transaction); - + if(storedPartition != null) { - + // Update the event stream with the current operation. logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.STORE) @@ -626,20 +626,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { .build(), transaction); } - + return storedPartition; } - + @Override public void deletePartition(ChampPartition graph) throws ChampTransactionException{ deletePartition(graph, Optional.empty()); } - + @Override public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException { executeDeletePartition(graph, transaction); - + // Update the event stream with the current operation. logOrEnqueueEvent(ChampEvent.builder() .operation(ChampOperation.DELETE) @@ -647,69 +647,69 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { .build(), transaction); } - + @Override public void storeObjectIndex(ChampObjectIndex index) { executeStoreObjectIndex(index); - + // Update the event stream with the current operation. logEvent(ChampEvent.builder() .operation(ChampOperation.STORE) .entity(index) .build()); } - - + + public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException { - - // Retrieve the index that we are deleting before it's gone, so that we can + + // Retrieve the index that we are deleting before it's gone, so that we can // report it to the event stream. Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName); - + executeDeleteObjectIndex(indexName); - + if(indexToDelete.isPresent()) { // Update the event stream with the current operation. logEvent(ChampEvent.builder() .operation(ChampOperation.DELETE) - .entity(indexToDelete.get()) + .entity(indexToDelete.get()) .build()); } } - - + + public void storeRelationshipIndex(ChampRelationshipIndex index) { executeStoreRelationshipIndex(index); - + // Update the event stream with the current operation. logEvent(ChampEvent.builder() .operation(ChampOperation.STORE) - .entity(index) + .entity(index) .build()); } - - + + public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException { - // Retrieve the index that we are deleting before it's gone, so that we can + // Retrieve the index that we are deleting before it's gone, so that we can // report it to the event stream. Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName); - + executeDeleteRelationshipIndex(indexName); - + if(indexToDelete.isPresent()) { // Update the event stream with the current operation. logEvent(ChampEvent.builder() .operation(ChampOperation.DELETE) - .entity(indexToDelete.get()) + .entity(indexToDelete.get()) .build()); } } - + private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) { - + if(!transaction.isPresent()) { // Update the event stream with the current operation. logEvent(event); @@ -720,34 +720,34 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { transaction.get().logEvent(event); } } - + /** * Submits an event to be published to the event stream. - * + * * @param anEvent - The event to be published. */ public void logEvent(ChampEvent anEvent) { - + if(eventQueue == null) { return; } - + logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus"); if(logger.isDebugEnabled()) { logger.debug("Event payload: " + anEvent.toString()); } - + // Try to submit the event to be published to the event bus. if(!eventQueue.offer(anEvent)) { logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded."); } } - - + + /** - * This class implements the worker threads for our thread pool which are responsible for + * This class implements the worker threads for our thread pool which are responsible for * pulling the next outgoing event from the internal buffer and forwarding them to the event - * bus client. + * bus client. * <p> * Each publish operation is performed synchronously, so that the thread will only move on * to the next available event once it has actually published the current event to the bus. @@ -757,32 +757,33 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { /** Partition key to use when publishing events to the event stream. We WANT all events * to go to a single partition, so we are just using a hard-coded key for every event. */ private static final String EVENTS_PARTITION_KEY = "champEventKey"; - - + + @Override public void run() { - - while(true) { - + + while(true) { ChampEvent event = null; try { - + // Get the next event to be published from the queue. event = eventQueue.take(); - + } catch (InterruptedException e) { - + // Restore the interrupted status. Thread.currentThread().interrupt(); } - - // Try publishing the event to the event bus. This call will block - // until + + // Create new envelope containing an event header and ChampEvent + ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event); + + // Try publishing the event to the event bus. This call will block until try { - producer.sendSync(EVENTS_PARTITION_KEY, event.toJson()); - + producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson()); + } catch (Exception e) { - + logger.error("Failed to publish event to event bus: " + e.getMessage()); } } diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java index 3fd57a3..e7feeda 100644 --- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java +++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java @@ -22,13 +22,11 @@ package org.onap.aai.champcore.event; import java.io.IOException; - import org.onap.aai.champcore.model.ChampObject; import org.onap.aai.champcore.model.ChampObjectIndex; import org.onap.aai.champcore.model.ChampPartition; import org.onap.aai.champcore.model.ChampRelationship; import org.onap.aai.champcore.model.ChampRelationshipIndex; - import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParseException; @@ -39,174 +37,177 @@ import com.fasterxml.jackson.databind.ObjectMapper; public class ChampEvent { - public enum ChampOperation { - STORE, - REPLACE, - DELETE - } - - private static ObjectMapper mapper = new ObjectMapper(); - - private ChampOperation operation; - private long timestamp; - private String transactionId = null; - private ChampObject vertex = null; - private ChampRelationship relationship = null; - private ChampPartition partition = null; - private ChampObjectIndex objectIndex = null; - private ChampRelationshipIndex relationshipIndex = null; - private String dbTransactionId = null; - - - public static Builder builder() { - return new Builder(); - } - - public ChampOperation getOperation() { - return operation; - } - - public void setOperation(ChampOperation operation) { - this.operation = operation; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - @JsonProperty("transaction-id") - public String getTransactionId() { - return transactionId; - } - - public void setTransactionId(String transactionId) { - this.transactionId = transactionId; - } - - public ChampObject getVertex() { - return vertex; - } - - public void setVertex(ChampObject vertex) { - this.vertex = vertex; - } - - public ChampRelationship getRelationship() { - return relationship; - } - - public void setRelationship(ChampRelationship relationship) { - this.relationship = relationship; - } - - public ChampPartition getPartition() { - return partition; - } - - public void setPartition(ChampPartition partition) { - this.partition = partition; - } - - public ChampObjectIndex getObjectIndex() { - return objectIndex; - } - - public void setObjectIndex(ChampObjectIndex index) { - this.objectIndex = index; - } - - public ChampRelationshipIndex getRelationshipIndex() { - return relationshipIndex; - } - - public void setRelationshipIndex(ChampRelationshipIndex relationshipIndex) { - this.relationshipIndex = relationshipIndex; - } - - @JsonProperty("database-transaction-id") - public String getDbTransactionId () { return dbTransactionId; } - - - public void setDbTransactionId ( String id ) { this.dbTransactionId = id; } - - - - public String toJson() { - - ObjectMapper mapper = new ObjectMapper(); - mapper.setSerializationInclusion(Include.NON_NULL); - - try { - return mapper.writeValueAsString(this); - } catch (JsonProcessingException e) { - return "Unmarshallable: " + e.getMessage(); - } - } - - public static ChampEvent fromJson(String json) throws JsonParseException, JsonMappingException, IOException { - - mapper.setSerializationInclusion(Include.NON_NULL); - return mapper.readValue(json, ChampEvent.class); - } - @Override - public String toString() { - - return toJson(); - } - - public static class Builder { - - ChampEvent event = null; - - - public Builder() { - event = new ChampEvent(); - } - - public Builder operation(ChampOperation operation) { - event.setOperation(operation); - return this; - } - - public Builder entity(ChampObject entity) { - event.setVertex(entity); - return this; - } - - public Builder entity(ChampRelationship relationship) { - event.relationship = relationship; - return this; - } - - public Builder entity(ChampPartition partition) { - event.partition = partition; - return this; - } - - public Builder entity(ChampObjectIndex index) { - event.objectIndex = index; - return this; - } - - public Builder entity(ChampRelationshipIndex relationshipIndex) { - event.relationshipIndex = relationshipIndex; - return this; - } - - - public ChampEvent build() { - - event.setTimestamp(System.currentTimeMillis()); - - // Set a unique transaction id on this event that can be used by downstream entities - // for log correlation. - event.setTransactionId(java.util.UUID.randomUUID().toString()); - - return event; - } - } + public enum ChampOperation { + STORE, REPLACE, DELETE + } + + private static ObjectMapper mapper = new ObjectMapper(); + + private ChampOperation operation; + private long timestamp; + private String transactionId = null; + private ChampObject vertex = null; + private ChampRelationship relationship = null; + private ChampPartition partition = null; + private ChampObjectIndex objectIndex = null; + private ChampRelationshipIndex relationshipIndex = null; + private String dbTransactionId = null; + + + public static Builder builder() { + return new Builder(); + } + + public ChampOperation getOperation() { + return operation; + } + + public void setOperation(ChampOperation operation) { + this.operation = operation; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @JsonProperty("transaction-id") + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public ChampObject getVertex() { + return vertex; + } + + public void setVertex(ChampObject vertex) { + this.vertex = vertex; + } + + public ChampRelationship getRelationship() { + return relationship; + } + + public void setRelationship(ChampRelationship relationship) { + this.relationship = relationship; + } + + public ChampPartition getPartition() { + return partition; + } + + public void setPartition(ChampPartition partition) { + this.partition = partition; + } + + public ChampObjectIndex getObjectIndex() { + return objectIndex; + } + + public void setObjectIndex(ChampObjectIndex index) { + this.objectIndex = index; + } + + public ChampRelationshipIndex getRelationshipIndex() { + return relationshipIndex; + } + + public void setRelationshipIndex(ChampRelationshipIndex relationshipIndex) { + this.relationshipIndex = relationshipIndex; + } + + @JsonProperty("database-transaction-id") + public String getDbTransactionId() { + return dbTransactionId; + } + + + public void setDbTransactionId(String id) { + this.dbTransactionId = id; + } + + + + public String toJson() { + + ObjectMapper mapper = new ObjectMapper(); + mapper.setSerializationInclusion(Include.NON_NULL); + + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + return "Unmarshallable: " + e.getMessage(); + } + } + + public static ChampEvent fromJson(String json) throws JsonParseException, JsonMappingException, IOException { + + mapper.setSerializationInclusion(Include.NON_NULL); + return mapper.readValue(json, ChampEvent.class); + } + + @Override + public String toString() { + + return toJson(); + } + + public static class Builder { + + ChampEvent event = null; + + + public Builder() { + event = new ChampEvent(); + } + + public Builder operation(ChampOperation operation) { + event.setOperation(operation); + return this; + } + + public Builder entity(ChampObject entity) { + event.setVertex(entity); + return this; + } + + public Builder entity(ChampRelationship relationship) { + event.relationship = relationship; + return this; + } + + public Builder entity(ChampPartition partition) { + event.partition = partition; + return this; + } + + public Builder entity(ChampObjectIndex index) { + event.objectIndex = index; + return this; + } + + public Builder entity(ChampRelationshipIndex relationshipIndex) { + event.relationshipIndex = relationshipIndex; + return this; + } + + + public ChampEvent build() { + + event.setTimestamp(System.currentTimeMillis()); + + // Set a unique transaction id on this event that can be used by downstream entities + // for log correlation. + event.setTransactionId(java.util.UUID.randomUUID().toString()); + + return event; + } + } } diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java new file mode 100644 index 0000000..2547ae5 --- /dev/null +++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java @@ -0,0 +1,97 @@ +/** + * ============LICENSE_START========================================== + * org.onap.aai + * =================================================================== + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 Amdocs + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.champcore.event.envelope; + +import org.onap.aai.champcore.event.ChampEvent; +import org.onap.aai.champcore.exceptions.ChampUnmarshallingException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class ChampEventEnvelope { + + private ChampEventHeader header; + private ChampEvent body; + + /** + * Marshaller/unmarshaller for converting to/from JSON. + */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public ChampEventEnvelope(ChampEvent event) { + this.header = new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_NOTIFICATION) + .requestId(event.getTransactionId()).build(); + this.body = event; + } + + public ChampEventEnvelope(ChampEventHeader header, ChampEvent body) { + this.header = header; + this.body = body; + } + + public ChampEventHeader getHeader() { + return header; + } + + public void setHeader(ChampEventHeader header) { + this.header = header; + } + + public ChampEvent getBody() { + return body; + } + + public void setBody(ChampEvent body) { + this.body = body; + } + + /** + * Serializes this Vertex object into a JSON string. + * + * @return - A JSON format string representation of this Vertex. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Deserializes the provided JSON string into a Event Envelope object. + * + * @param json the JSON string to produce the Event Envelope from. + * @return an Event Envelope object. + * @throws ChampUnmarshallingException + */ + public static ChampEventEnvelope fromJson(String json) throws ChampUnmarshallingException { + try { + if (json == null || json.isEmpty()) { + throw new ChampUnmarshallingException("Empty or null JSON string."); + } + return gson.fromJson(json, ChampEventEnvelope.class); + } catch (Exception ex) { + throw new ChampUnmarshallingException("Unable to parse JSON string: "); + } + } + + @Override + public String toString() { + return toJson(); + } +}
\ No newline at end of file diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java new file mode 100644 index 0000000..a9dbdaf --- /dev/null +++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java @@ -0,0 +1,243 @@ +/** + * ============LICENSE_START========================================== + * org.onap.aai + * =================================================================== + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 Amdocs + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.champcore.event.envelope; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Objects; +import java.util.UUID; +import org.apache.commons.lang3.builder.EqualsBuilder; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; + +public class ChampEventHeader { + + private static final String SOURCE_NAME = "CHAMP"; + + public enum EventType { + UPDATE_RESULT("update-result"), + UPDATE_NOTIFICATION("update-notification-raw"); + + private final String name; + + EventType(String name) { + this.name = name; + } + + public String getName() { + return name; + } + } + + @SerializedName("request-id") + private String requestId; + + private String timestamp; + + @SerializedName("source-name") + private String sourceName; + + @SerializedName("event-type") + private String eventType; + + @SerializedName("validation-entity-type") + private String validationEntityType; + + @SerializedName("validation-top-entity-type") + private String validationTopEntityType; + + @SerializedName("entity-link") + private String entityLink; + + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public static class Builder { + + private final EventType eventType; + + private String requestId; + private String validationEntityType; + private String validationTopEntityType; + private String entityLink; + + public Builder(EventType eventType) { + this.eventType = eventType; + } + + public Builder requestId(String val) { + requestId = val; + return this; + } + + public Builder validationEntityType(String val) { + validationEntityType = val; + return this; + } + + public Builder validationTopEntityType(String val) { + validationTopEntityType = val; + return this; + } + + public Builder entityLink(String val) { + entityLink = val; + return this; + } + + public ChampEventHeader build() { + return new ChampEventHeader(this); + } + } + + private ChampEventHeader(Builder builder) { + requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString(); + timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now()); + sourceName = SOURCE_NAME; + eventType = builder.eventType.getName(); + + validationEntityType = builder.validationEntityType; + validationTopEntityType = builder.validationTopEntityType; + entityLink = builder.entityLink; + } + + /** + * Serializes this object into a JSON string representation. + * + * @return a JSON format string representation of this object. + */ + public String toJson() { + return gson.toJson(this); + } + + /////////////////////////////////////////////////////////////////////////// + // GETTERS AND SETTERS + /////////////////////////////////////////////////////////////////////////// + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getValidationEntityType() { + return validationEntityType; + } + + public void setValidationEntityType(String validationEntityType) { + this.validationEntityType = validationEntityType; + } + + public String getValidationTopEntityType() { + return validationTopEntityType; + } + + public void setValidationTopEntityType(String validationTopEntityType) { + this.validationTopEntityType = validationTopEntityType; + } + + public String getEntityLink() { + return entityLink; + } + + public void setEntityLink(String entityLink) { + this.entityLink = entityLink; + } + + /////////////////////////////////////////////////////////////////////////// + // OVERRIDES + /////////////////////////////////////////////////////////////////////////// + + /* + * (non-Javadoc) + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType, + this.validationTopEntityType, this.entityLink); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ChampEventHeader)) { + return false; + } else if (obj == this) { + return true; + } + ChampEventHeader rhs = (ChampEventHeader) obj; + // @formatter:off + return new EqualsBuilder() + .append(requestId, rhs.requestId) + .append(timestamp, rhs.timestamp) + .append(sourceName, rhs.sourceName) + .append(eventType, rhs.eventType) + .append(validationEntityType, rhs.validationEntityType) + .append(validationTopEntityType, rhs.validationTopEntityType) + .append(entityLink, rhs.entityLink) + .isEquals(); + // @formatter:on + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.toJson(); + } +} diff --git a/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java new file mode 100644 index 0000000..6b7bd02 --- /dev/null +++ b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java @@ -0,0 +1,60 @@ +/** + * ============LICENSE_START========================================== + * org.onap.aai + * =================================================================== + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 Amdocs + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.champcore.event.envelope; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Test; +import org.onap.aai.champcore.event.ChampEvent; +import org.onap.aai.champcore.model.ChampObject; +import org.onap.aai.champcore.util.TestUtil; +import org.skyscreamer.jsonassert.Customization; +import org.skyscreamer.jsonassert.JSONAssert; +import org.skyscreamer.jsonassert.JSONCompareMode; +import org.skyscreamer.jsonassert.comparator.CustomComparator; + +public class ChampEventEnvelopeTest { + + @Test + public void testEventEnvelopeFormat() throws Exception { + String expectedEnvelope = TestUtil.getFileAsString("event/event-envelope.json"); + + ChampEvent body = ChampEvent.builder().entity(new ChampObject.Builder("pserver").build()).build(); + + String envelope = new ChampEventEnvelope(body).toJson(); + + JSONAssert.assertEquals(expectedEnvelope, envelope, + new CustomComparator(JSONCompareMode.STRICT, new Customization("header.request-id", (o1, o2) -> true), + new Customization("header.timestamp", (o1, o2) -> true), + new Customization("body.timestamp", (o1, o2) -> true), + new Customization("body.transactionId", (o1, o2) -> true))); + } + + @Test + public void testRequestIdIsTransactionId() throws Exception { + ChampEvent body = ChampEvent.builder().entity(new ChampObject.Builder("pserver").build()).build(); + + ChampEventEnvelope envelope = new ChampEventEnvelope(body); + + assertThat(envelope.getHeader().getRequestId(), is(envelope.getBody().getTransactionId())); + } +} diff --git a/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java new file mode 100644 index 0000000..2aaddad --- /dev/null +++ b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java @@ -0,0 +1,55 @@ +/** + * ============LICENSE_START========================================== + * org.onap.aai + * =================================================================== + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 Amdocs + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.champcore.util; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class TestUtil { + + public static Path getPath(String resourceFilename) throws URISyntaxException { + URL resource = ClassLoader.getSystemResource(resourceFilename); + if (resource != null) { + return Paths.get(resource.toURI()); + } + + // If the resource is not found relative to the classpath, try to get it from the file system directly. + File file = new File(resourceFilename); + if (!file.exists()) { + throw new RuntimeException("Resource does not exist: " + resourceFilename); + } + return file.toPath(); + } + + public static String getContentUtf8(Path filePath) throws IOException { + return new String(Files.readAllBytes(filePath)); + } + + public static String getFileAsString(String resourceFilename) throws IOException, URISyntaxException { + return getContentUtf8(getPath(resourceFilename)); + } +}
\ No newline at end of file diff --git a/champ-lib/champ-core/src/test/resources/event/event-envelope.json b/champ-lib/champ-core/src/test/resources/event/event-envelope.json new file mode 100644 index 0000000..0389c4a --- /dev/null +++ b/champ-lib/champ-core/src/test/resources/event/event-envelope.json @@ -0,0 +1,17 @@ +{ + "header": { + "request-id": "bffbbf77-e6fc-4c5a-af02-2dbe1bef003f", + "timestamp": "20180320T164558Z", + "source-name": "CHAMP", + "event-type": "update-notification-raw" + }, + "body": { + "timestamp": 1521564357772, + "transactionId": "bffbbf77-e6fc-4c5a-af02-2dbe1bef003f", + "vertex": { + "type": "pserver", + "key": {}, + "properties": {} + } + } +}
\ No newline at end of file |