summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Arrastia <MArrasti@amdocs.com>2018-06-05 13:19:45 +0100
committerMichael Arrastia <MArrasti@amdocs.com>2018-06-05 13:19:45 +0100
commita96fd576a4cb3cd25fdc2eb0c4f2c1c98f6d50cf (patch)
tree296becbaa361d15f2a0ce73510f67353adad3bf1
parentd448077492f159e5877b73c0e732aecc5b4a7695 (diff)
Update payload format for update notification
The update notification event is now aligned with the format of the response event. Both response and update notification events issued by Champ now include top level header and body properties. Change-Id: I00f9971cdbab4944def66c25f5939d5cc4de71bd Issue-ID: AAI-1195 Signed-off-by: Michael Arrastia <MArrasti@amdocs.com>
-rw-r--r--champ-lib/champ-core/pom.xml14
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java463
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java345
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java97
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java (renamed from champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java)43
-rw-r--r--champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java60
-rw-r--r--champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java55
-rw-r--r--champ-lib/champ-core/src/test/resources/event/event-envelope.json17
-rw-r--r--champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java6
-rw-r--r--champ-service/src/main/java/org/onap/champ/event/GraphEvent.java51
-rw-r--r--champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java12
-rw-r--r--champ-service/src/test/resources/event/event-envelope.json2
12 files changed, 688 insertions, 477 deletions
diff --git a/champ-lib/champ-core/pom.xml b/champ-lib/champ-core/pom.xml
index 7315c16..4ce3d5f 100644
--- a/champ-lib/champ-core/pom.xml
+++ b/champ-lib/champ-core/pom.xml
@@ -105,6 +105,20 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.skyscreamer</groupId>
+ <artifactId>jsonassert</artifactId>
+ <version>1.5.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
index 07647d2..1f93a97 100644
--- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
@@ -32,11 +32,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
-
import org.onap.aai.champcore.ChampCapabilities;
import org.onap.aai.champcore.ChampGraph;
import org.onap.aai.champcore.ChampTransaction;
import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
+import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
import org.onap.aai.champcore.exceptions.ChampMarshallingException;
import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
@@ -52,11 +53,10 @@ import org.onap.aai.champcore.model.ChampRelationship;
import org.onap.aai.champcore.model.ChampRelationshipConstraint;
import org.onap.aai.champcore.model.ChampRelationshipIndex;
import org.onap.aai.champcore.model.ChampSchema;
+import org.onap.aai.event.api.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.onap.aai.event.api.EventPublisher;
-
/**
@@ -66,10 +66,10 @@ import org.onap.aai.event.api.EventPublisher;
public abstract class AbstractLoggingChampGraph implements ChampGraph {
private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
-
+
public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
- public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
+ public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
@Override
public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
@@ -80,48 +80,48 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
-
+
/**
* Creates or updates a vertex in the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param object - The vertex to be created or updated.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The vertex, as created, marshaled as a {@link ChampObject}
- *
- * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
+ *
+ * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
* into the backend representation
- * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
+ * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
* by {@link ChampGraph#retrieveSchema}
- * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
* is not present or object not found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
-
+
/**
* Updates an existing vertex in the graph store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param object - The vertex to be created or updated.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The updated vertex, marshaled as a {@link ChampObject}
- *
- * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
+ *
+ * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
* the backend representation
- * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
+ * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
* by {@link ChampGraph#retrieveSchema}
- * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
* is not present or object not found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
@@ -130,136 +130,136 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
/**
* Deletes an existing vertex from the graph store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
* @param transaction - Optional transaction context to perform the operation in.
- *
- * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ *
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
* is not present or object not found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
-
+
/**
* Creates or updates an edge in the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param relationship - The ChampRelationship that you wish to store in the graph
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The {@link ChampRelationship} as it was stored.
- *
- * @throws ChampUnmarshallingException - If the edge which was created could not be
+ *
+ * @throws ChampUnmarshallingException - If the edge which was created could not be
* unmarshalled into a ChampRelationship
- * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
* marshalled into the backend representation
- * @throws ChampObjectNotExistsException - If either the source or target object referenced
+ * @throws ChampObjectNotExistsException - If either the source or target object referenced
* by this relationship does not exist in the graph
- * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
* specifed by {@link ChampGraph#retrieveSchema}
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
- public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+ public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
/**
* Replaces an existing edge in the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param relationship - The ChampRelationship that you wish to replace in the graph
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The {@link ChampRelationship} as it was stored.
- *
- * @throws ChampUnmarshallingException - If the edge which was created could not be
+ *
+ * @throws ChampUnmarshallingException - If the edge which was created could not be
* unmarshalled into a ChampRelationship
- * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
* marshalled into the backend representation
- * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
* specifed by {@link ChampGraph#retrieveSchema}
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
- public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
-
+ public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+
/**
* Removes an edge from the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param relationship - The ChampRelationship that you wish to remove from the graph.
* @param transaction - Optional transaction context to perform the operation in.
- *
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ *
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
-
+
/**
* Create or update a {@link ChampPartition}.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param partition - The ChampPartition that you wish to create or update in the graph.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The {@link ChampPartition} as it was stored.
- *
- * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ *
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
* specifed by {@link ChampGraph#retrieveSchema}
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
- * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
* marshalled into the backend representation
- * @throws ChampObjectNotExistsException - If either the source or target object referenced
+ * @throws ChampObjectNotExistsException - If either the source or target object referenced
* by this relationship does not exist in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
-
+
/**
* Removes a partition from the graph.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param graph - The partition to be removed.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
-
+
/**
* Create or update an object index in the graph.
- *
+ *
* @param index - The object index to be created/updated.
*/
public abstract void executeStoreObjectIndex(ChampObjectIndex index);
-
+
public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
@@ -274,56 +274,56 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
public abstract void deleteSchema();
public abstract ChampCapabilities capabilities();
-
-
+
+
public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
-
+
public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
-
- public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
-
-
-
+
+ public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+
+
+
/** Number of events that can be queued up for publication before we begin dropping
* events. */
private Integer eventQueueCapacity;
-
+
/** Number of event publisher worker threads. */
private Integer eventStreamPublisherPoolSize;
-
+
/** Pool of worker threads that do the work of publishing the events to the event bus. */
protected ThreadPoolExecutor publisherPool;
-
+
/** Client used for publishing events to the event bus. */
protected EventPublisher producer;
-
- /** Internal queue where outgoing events will be buffered until they can be serviced by
+
+ /** Internal queue where outgoing events will be buffered until they can be serviced by
* the event publisher worker threads. */
protected BlockingQueue<ChampEvent> eventQueue;
-
+
/**
* Thread factory for the event producer workers.
*/
private class ProducerWorkerThreadFactory implements ThreadFactory {
-
+
private AtomicInteger threadNumber = new AtomicInteger(1);
-
+
public Thread newThread(Runnable r) {
return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
}
}
-
-
+
+
/**
* Create a new instance of the AbstractLoggingChampGraph.
- *
+ *
* @param properties - Set of configuration properties for this graph instance.
*/
protected AbstractLoggingChampGraph(Map<String, Object> properties) {
-
+
// Extract the necessary parameters from the configuration properties.
configure(properties);
@@ -334,104 +334,104 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
logger.error("NOTE!! Champ events will NOT be published to the event stream!");
return;
}
-
+
// Create the blocking queue that we will use to buffer events that we want
// published to the event bus.
eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
-
+
// Create the executor pool that will do the work of publishing events to the event bus.
- publisherPool =
- (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
+ publisherPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
new ProducerWorkerThreadFactory());
-
+
try {
-
+
// Start up the producer worker threads.
for(int i=0; i<eventStreamPublisherPoolSize; i++) {
- publisherPool.submit(new EventPublisherWorker());
+ publisherPool.submit(new EventPublisherWorker());
}
-
+
} catch (Exception e) {
-
+
logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
logger.error("NOTE!! Champ events may NOT be published to the event stream!");
return;
}
}
-
+
/**
* Process the configuration properties supplied for this graph instance.
- *
+ *
* @param properties - Configuration parameters.
*/
private void configure(Map<String, Object> properties) {
-
+
producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
-
+
eventQueueCapacity =
(Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
- eventStreamPublisherPoolSize =
+ eventStreamPublisherPoolSize =
(Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
}
-
-
+
+
public void setProducer(EventPublisher aProducer) {
-
+
producer = aProducer;
}
-
+
private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
-
+
if(properties.containsKey(property)) {
return properties.get(property);
} else {
return defaultValue;
}
}
-
+
@Override
public void shutdown() {
-
+
if(publisherPool != null) {
publisherPool.shutdown();
-
+
try {
publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {}
}
-
+
if(producer != null) {
-
+
try {
producer.close();
-
+
} catch (Exception e) {
logger.error("Failed to stop event stream producer: " + e.getMessage());
}
}
}
-
+
@Override
public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
-
+
try {
-
+
// Commit the transaction.
transaction.commit();
-
+
} catch (ChampTransactionException e) {
-
+
logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
for(ChampEvent event : enqueuedEvents) {
-
+
logger.debug("Graph event " + event.toString() + " not published.");
}
throw e;
}
-
+
// Now that the transaction has been successfully committed, we need
// to log the events that were produced within that transaction's
// context.
@@ -440,81 +440,81 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
logEvent(event);
}
}
-
+
@Override
public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
-
- // Rollback the transaction.
+
+ // Rollback the transaction.
transaction.rollback();
}
-
+
@Override
public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
return storeObject(object, Optional.empty());
}
-
+
@Override
public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
-
+
ChampObject storedObject = executeStoreObject(object, transaction);
-
+
if(storedObject != null) {
-
+
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
.entity(storedObject)
- .build(),
+ .build(),
transaction);
}
-
+
return storedObject;
}
-
+
@Override
public ChampObject replaceObject(ChampObject object)
throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
return replaceObject(object, Optional.empty());
}
-
+
@Override
public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
-
+
ChampObject replacedObject = executeReplaceObject(object, transaction);
-
+
if(replacedObject != null) {
-
+
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.REPLACE)
.entity(replacedObject)
- .build(),
+ .build(),
transaction);
}
-
+
return replacedObject;
}
-
+
@Override
public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
deleteObject(key, Optional.empty());
}
-
+
@Override
public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
- // Retrieve the object that we are deleting before it's gone, so that we can
+ // Retrieve the object that we are deleting before it's gone, so that we can
// report it to the event stream.
Optional<ChampObject> objectToDelete = Optional.empty();
try {
objectToDelete = retrieveObject(key, transaction);
-
+
} catch (ChampUnmarshallingException e) {
logger.error("Unable to generate delete object log: " + e.getMessage());
}
-
+
executeDeleteObject(key, transaction);
-
+
if(objectToDelete.isPresent()) {
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
@@ -524,29 +524,29 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
transaction);
}
}
-
+
@Override
public ChampRelationship storeRelationship(ChampRelationship relationship)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampObjectNotExistsException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampObjectNotExistsException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
return storeRelationship(relationship, Optional.empty());
}
-
+
@Override
public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampObjectNotExistsException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampObjectNotExistsException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
-
+
if(storedRelationship != null) {
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
@@ -554,30 +554,30 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
return storedRelationship;
}
@Override
public ChampRelationship replaceRelationship(ChampRelationship relationship)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
return replaceRelationship(relationship, Optional.empty());
}
-
+
@Override
public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
-
+
if(replacedRelationship != null) {
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.REPLACE)
@@ -585,20 +585,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
return replacedRelationship;
}
-
+
@Override
public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
deleteRelationship(relationship, Optional.empty());
}
-
+
@Override
public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
executeDeleteRelationship(relationship, transaction);
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
@@ -606,19 +606,19 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
@Override
public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
return storePartition(partition, Optional.empty());
}
-
+
@Override
public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
ChampPartition storedPartition = executeStorePartition(partition, transaction);
-
+
if(storedPartition != null) {
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
@@ -626,20 +626,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
return storedPartition;
}
-
+
@Override
public void deletePartition(ChampPartition graph) throws ChampTransactionException{
deletePartition(graph, Optional.empty());
}
-
+
@Override
public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
executeDeletePartition(graph, transaction);
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
@@ -647,69 +647,69 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
@Override
public void storeObjectIndex(ChampObjectIndex index) {
executeStoreObjectIndex(index);
-
+
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
.entity(index)
.build());
}
-
-
+
+
public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
-
- // Retrieve the index that we are deleting before it's gone, so that we can
+
+ // Retrieve the index that we are deleting before it's gone, so that we can
// report it to the event stream.
Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
-
+
executeDeleteObjectIndex(indexName);
-
+
if(indexToDelete.isPresent()) {
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
- .entity(indexToDelete.get())
+ .entity(indexToDelete.get())
.build());
}
}
-
-
+
+
public void storeRelationshipIndex(ChampRelationshipIndex index) {
executeStoreRelationshipIndex(index);
-
+
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
- .entity(index)
+ .entity(index)
.build());
}
-
-
+
+
public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
- // Retrieve the index that we are deleting before it's gone, so that we can
+ // Retrieve the index that we are deleting before it's gone, so that we can
// report it to the event stream.
Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
-
+
executeDeleteRelationshipIndex(indexName);
-
+
if(indexToDelete.isPresent()) {
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
- .entity(indexToDelete.get())
+ .entity(indexToDelete.get())
.build());
}
}
-
+
private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
-
+
if(!transaction.isPresent()) {
// Update the event stream with the current operation.
logEvent(event);
@@ -720,34 +720,34 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
transaction.get().logEvent(event);
}
}
-
+
/**
* Submits an event to be published to the event stream.
- *
+ *
* @param anEvent - The event to be published.
*/
public void logEvent(ChampEvent anEvent) {
-
+
if(eventQueue == null) {
return;
}
-
+
logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
if(logger.isDebugEnabled()) {
logger.debug("Event payload: " + anEvent.toString());
}
-
+
// Try to submit the event to be published to the event bus.
if(!eventQueue.offer(anEvent)) {
logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
}
}
-
-
+
+
/**
- * This class implements the worker threads for our thread pool which are responsible for
+ * This class implements the worker threads for our thread pool which are responsible for
* pulling the next outgoing event from the internal buffer and forwarding them to the event
- * bus client.
+ * bus client.
* <p>
* Each publish operation is performed synchronously, so that the thread will only move on
* to the next available event once it has actually published the current event to the bus.
@@ -757,32 +757,33 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
/** Partition key to use when publishing events to the event stream. We WANT all events
* to go to a single partition, so we are just using a hard-coded key for every event. */
private static final String EVENTS_PARTITION_KEY = "champEventKey";
-
-
+
+
@Override
public void run() {
-
- while(true) {
-
+
+ while(true) {
ChampEvent event = null;
try {
-
+
// Get the next event to be published from the queue.
event = eventQueue.take();
-
+
} catch (InterruptedException e) {
-
+
// Restore the interrupted status.
Thread.currentThread().interrupt();
}
-
- // Try publishing the event to the event bus. This call will block
- // until
+
+ // Create new envelope containing an event header and ChampEvent
+ ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
+
+ // Try publishing the event to the event bus. This call will block until
try {
- producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
-
+ producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
+
} catch (Exception e) {
-
+
logger.error("Failed to publish event to event bus: " + e.getMessage());
}
}
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java
index 3fd57a3..e7feeda 100644
--- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java
@@ -22,13 +22,11 @@ package org.onap.aai.champcore.event;
import java.io.IOException;
-
import org.onap.aai.champcore.model.ChampObject;
import org.onap.aai.champcore.model.ChampObjectIndex;
import org.onap.aai.champcore.model.ChampPartition;
import org.onap.aai.champcore.model.ChampRelationship;
import org.onap.aai.champcore.model.ChampRelationshipIndex;
-
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;
@@ -39,174 +37,177 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class ChampEvent {
- public enum ChampOperation {
- STORE,
- REPLACE,
- DELETE
- }
-
- private static ObjectMapper mapper = new ObjectMapper();
-
- private ChampOperation operation;
- private long timestamp;
- private String transactionId = null;
- private ChampObject vertex = null;
- private ChampRelationship relationship = null;
- private ChampPartition partition = null;
- private ChampObjectIndex objectIndex = null;
- private ChampRelationshipIndex relationshipIndex = null;
- private String dbTransactionId = null;
-
-
- public static Builder builder() {
- return new Builder();
- }
-
- public ChampOperation getOperation() {
- return operation;
- }
-
- public void setOperation(ChampOperation operation) {
- this.operation = operation;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @JsonProperty("transaction-id")
- public String getTransactionId() {
- return transactionId;
- }
-
- public void setTransactionId(String transactionId) {
- this.transactionId = transactionId;
- }
-
- public ChampObject getVertex() {
- return vertex;
- }
-
- public void setVertex(ChampObject vertex) {
- this.vertex = vertex;
- }
-
- public ChampRelationship getRelationship() {
- return relationship;
- }
-
- public void setRelationship(ChampRelationship relationship) {
- this.relationship = relationship;
- }
-
- public ChampPartition getPartition() {
- return partition;
- }
-
- public void setPartition(ChampPartition partition) {
- this.partition = partition;
- }
-
- public ChampObjectIndex getObjectIndex() {
- return objectIndex;
- }
-
- public void setObjectIndex(ChampObjectIndex index) {
- this.objectIndex = index;
- }
-
- public ChampRelationshipIndex getRelationshipIndex() {
- return relationshipIndex;
- }
-
- public void setRelationshipIndex(ChampRelationshipIndex relationshipIndex) {
- this.relationshipIndex = relationshipIndex;
- }
-
- @JsonProperty("database-transaction-id")
- public String getDbTransactionId () { return dbTransactionId; }
-
-
- public void setDbTransactionId ( String id ) { this.dbTransactionId = id; }
-
-
-
- public String toJson() {
-
- ObjectMapper mapper = new ObjectMapper();
- mapper.setSerializationInclusion(Include.NON_NULL);
-
- try {
- return mapper.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- return "Unmarshallable: " + e.getMessage();
- }
- }
-
- public static ChampEvent fromJson(String json) throws JsonParseException, JsonMappingException, IOException {
-
- mapper.setSerializationInclusion(Include.NON_NULL);
- return mapper.readValue(json, ChampEvent.class);
- }
- @Override
- public String toString() {
-
- return toJson();
- }
-
- public static class Builder {
-
- ChampEvent event = null;
-
-
- public Builder() {
- event = new ChampEvent();
- }
-
- public Builder operation(ChampOperation operation) {
- event.setOperation(operation);
- return this;
- }
-
- public Builder entity(ChampObject entity) {
- event.setVertex(entity);
- return this;
- }
-
- public Builder entity(ChampRelationship relationship) {
- event.relationship = relationship;
- return this;
- }
-
- public Builder entity(ChampPartition partition) {
- event.partition = partition;
- return this;
- }
-
- public Builder entity(ChampObjectIndex index) {
- event.objectIndex = index;
- return this;
- }
-
- public Builder entity(ChampRelationshipIndex relationshipIndex) {
- event.relationshipIndex = relationshipIndex;
- return this;
- }
-
-
- public ChampEvent build() {
-
- event.setTimestamp(System.currentTimeMillis());
-
- // Set a unique transaction id on this event that can be used by downstream entities
- // for log correlation.
- event.setTransactionId(java.util.UUID.randomUUID().toString());
-
- return event;
- }
- }
+ public enum ChampOperation {
+ STORE, REPLACE, DELETE
+ }
+
+ private static ObjectMapper mapper = new ObjectMapper();
+
+ private ChampOperation operation;
+ private long timestamp;
+ private String transactionId = null;
+ private ChampObject vertex = null;
+ private ChampRelationship relationship = null;
+ private ChampPartition partition = null;
+ private ChampObjectIndex objectIndex = null;
+ private ChampRelationshipIndex relationshipIndex = null;
+ private String dbTransactionId = null;
+
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public ChampOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(ChampOperation operation) {
+ this.operation = operation;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @JsonProperty("transaction-id")
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public ChampObject getVertex() {
+ return vertex;
+ }
+
+ public void setVertex(ChampObject vertex) {
+ this.vertex = vertex;
+ }
+
+ public ChampRelationship getRelationship() {
+ return relationship;
+ }
+
+ public void setRelationship(ChampRelationship relationship) {
+ this.relationship = relationship;
+ }
+
+ public ChampPartition getPartition() {
+ return partition;
+ }
+
+ public void setPartition(ChampPartition partition) {
+ this.partition = partition;
+ }
+
+ public ChampObjectIndex getObjectIndex() {
+ return objectIndex;
+ }
+
+ public void setObjectIndex(ChampObjectIndex index) {
+ this.objectIndex = index;
+ }
+
+ public ChampRelationshipIndex getRelationshipIndex() {
+ return relationshipIndex;
+ }
+
+ public void setRelationshipIndex(ChampRelationshipIndex relationshipIndex) {
+ this.relationshipIndex = relationshipIndex;
+ }
+
+ @JsonProperty("database-transaction-id")
+ public String getDbTransactionId() {
+ return dbTransactionId;
+ }
+
+
+ public void setDbTransactionId(String id) {
+ this.dbTransactionId = id;
+ }
+
+
+
+ public String toJson() {
+
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.setSerializationInclusion(Include.NON_NULL);
+
+ try {
+ return mapper.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ return "Unmarshallable: " + e.getMessage();
+ }
+ }
+
+ public static ChampEvent fromJson(String json) throws JsonParseException, JsonMappingException, IOException {
+
+ mapper.setSerializationInclusion(Include.NON_NULL);
+ return mapper.readValue(json, ChampEvent.class);
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+
+ public static class Builder {
+
+ ChampEvent event = null;
+
+
+ public Builder() {
+ event = new ChampEvent();
+ }
+
+ public Builder operation(ChampOperation operation) {
+ event.setOperation(operation);
+ return this;
+ }
+
+ public Builder entity(ChampObject entity) {
+ event.setVertex(entity);
+ return this;
+ }
+
+ public Builder entity(ChampRelationship relationship) {
+ event.relationship = relationship;
+ return this;
+ }
+
+ public Builder entity(ChampPartition partition) {
+ event.partition = partition;
+ return this;
+ }
+
+ public Builder entity(ChampObjectIndex index) {
+ event.objectIndex = index;
+ return this;
+ }
+
+ public Builder entity(ChampRelationshipIndex relationshipIndex) {
+ event.relationshipIndex = relationshipIndex;
+ return this;
+ }
+
+
+ public ChampEvent build() {
+
+ event.setTimestamp(System.currentTimeMillis());
+
+ // Set a unique transaction id on this event that can be used by downstream entities
+ // for log correlation.
+ event.setTransactionId(java.util.UUID.randomUUID().toString());
+
+ return event;
+ }
+ }
}
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java
new file mode 100644
index 0000000..2547ae5
--- /dev/null
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java
@@ -0,0 +1,97 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.event.envelope;
+
+import org.onap.aai.champcore.event.ChampEvent;
+import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class ChampEventEnvelope {
+
+ private ChampEventHeader header;
+ private ChampEvent body;
+
+ /**
+ * Marshaller/unmarshaller for converting to/from JSON.
+ */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public ChampEventEnvelope(ChampEvent event) {
+ this.header = new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_NOTIFICATION)
+ .requestId(event.getTransactionId()).build();
+ this.body = event;
+ }
+
+ public ChampEventEnvelope(ChampEventHeader header, ChampEvent body) {
+ this.header = header;
+ this.body = body;
+ }
+
+ public ChampEventHeader getHeader() {
+ return header;
+ }
+
+ public void setHeader(ChampEventHeader header) {
+ this.header = header;
+ }
+
+ public ChampEvent getBody() {
+ return body;
+ }
+
+ public void setBody(ChampEvent body) {
+ this.body = body;
+ }
+
+ /**
+ * Serializes this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Deserializes the provided JSON string into a Event Envelope object.
+ *
+ * @param json the JSON string to produce the Event Envelope from.
+ * @return an Event Envelope object.
+ * @throws ChampUnmarshallingException
+ */
+ public static ChampEventEnvelope fromJson(String json) throws ChampUnmarshallingException {
+ try {
+ if (json == null || json.isEmpty()) {
+ throw new ChampUnmarshallingException("Empty or null JSON string.");
+ }
+ return gson.fromJson(json, ChampEventEnvelope.class);
+ } catch (Exception ex) {
+ throw new ChampUnmarshallingException("Unable to parse JSON string: ");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+} \ No newline at end of file
diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java
index 59e01ea..a9dbdaf 100644
--- a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java
@@ -19,7 +19,7 @@
* ============LICENSE_END============================================
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.champ.event.envelope;
+package org.onap.aai.champcore.event.envelope;
import java.time.Instant;
import java.time.ZoneOffset;
@@ -31,11 +31,24 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
-public class GraphEventHeader {
+public class ChampEventHeader {
private static final String SOURCE_NAME = "CHAMP";
- private static final String EVENT_TYPE = "db-update-result";
+ public enum EventType {
+ UPDATE_RESULT("update-result"),
+ UPDATE_NOTIFICATION("update-notification-raw");
+
+ private final String name;
+
+ EventType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
@SerializedName("request-id")
private String requestId;
@@ -59,17 +72,19 @@ public class GraphEventHeader {
private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
- public static Builder builder() {
- return new Builder();
- }
-
public static class Builder {
+ private final EventType eventType;
+
private String requestId;
private String validationEntityType;
private String validationTopEntityType;
private String entityLink;
+ public Builder(EventType eventType) {
+ this.eventType = eventType;
+ }
+
public Builder requestId(String val) {
requestId = val;
return this;
@@ -90,16 +105,16 @@ public class GraphEventHeader {
return this;
}
- public GraphEventHeader build() {
- return new GraphEventHeader(this);
+ public ChampEventHeader build() {
+ return new ChampEventHeader(this);
}
}
- private GraphEventHeader(Builder builder) {
+ private ChampEventHeader(Builder builder) {
requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString();
timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now());
sourceName = SOURCE_NAME;
- eventType = EVENT_TYPE;
+ eventType = builder.eventType.getName();
validationEntityType = builder.validationEntityType;
validationTopEntityType = builder.validationTopEntityType;
@@ -197,18 +212,18 @@ public class GraphEventHeader {
*/
@Override
public boolean equals(Object obj) {
- if (!(obj instanceof GraphEventHeader)) {
+ if (!(obj instanceof ChampEventHeader)) {
return false;
} else if (obj == this) {
return true;
}
- GraphEventHeader rhs = (GraphEventHeader) obj;
+ ChampEventHeader rhs = (ChampEventHeader) obj;
// @formatter:off
return new EqualsBuilder()
.append(requestId, rhs.requestId)
.append(timestamp, rhs.timestamp)
.append(sourceName, rhs.sourceName)
- .append(eventType, rhs.sourceName)
+ .append(eventType, rhs.eventType)
.append(validationEntityType, rhs.validationEntityType)
.append(validationTopEntityType, rhs.validationTopEntityType)
.append(entityLink, rhs.entityLink)
diff --git a/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java
new file mode 100644
index 0000000..6b7bd02
--- /dev/null
+++ b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java
@@ -0,0 +1,60 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.event.envelope;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.onap.aai.champcore.event.ChampEvent;
+import org.onap.aai.champcore.model.ChampObject;
+import org.onap.aai.champcore.util.TestUtil;
+import org.skyscreamer.jsonassert.Customization;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.skyscreamer.jsonassert.comparator.CustomComparator;
+
+public class ChampEventEnvelopeTest {
+
+ @Test
+ public void testEventEnvelopeFormat() throws Exception {
+ String expectedEnvelope = TestUtil.getFileAsString("event/event-envelope.json");
+
+ ChampEvent body = ChampEvent.builder().entity(new ChampObject.Builder("pserver").build()).build();
+
+ String envelope = new ChampEventEnvelope(body).toJson();
+
+ JSONAssert.assertEquals(expectedEnvelope, envelope,
+ new CustomComparator(JSONCompareMode.STRICT, new Customization("header.request-id", (o1, o2) -> true),
+ new Customization("header.timestamp", (o1, o2) -> true),
+ new Customization("body.timestamp", (o1, o2) -> true),
+ new Customization("body.transactionId", (o1, o2) -> true)));
+ }
+
+ @Test
+ public void testRequestIdIsTransactionId() throws Exception {
+ ChampEvent body = ChampEvent.builder().entity(new ChampObject.Builder("pserver").build()).build();
+
+ ChampEventEnvelope envelope = new ChampEventEnvelope(body);
+
+ assertThat(envelope.getHeader().getRequestId(), is(envelope.getBody().getTransactionId()));
+ }
+}
diff --git a/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java
new file mode 100644
index 0000000..2aaddad
--- /dev/null
+++ b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java
@@ -0,0 +1,55 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class TestUtil {
+
+ public static Path getPath(String resourceFilename) throws URISyntaxException {
+ URL resource = ClassLoader.getSystemResource(resourceFilename);
+ if (resource != null) {
+ return Paths.get(resource.toURI());
+ }
+
+ // If the resource is not found relative to the classpath, try to get it from the file system directly.
+ File file = new File(resourceFilename);
+ if (!file.exists()) {
+ throw new RuntimeException("Resource does not exist: " + resourceFilename);
+ }
+ return file.toPath();
+ }
+
+ public static String getContentUtf8(Path filePath) throws IOException {
+ return new String(Files.readAllBytes(filePath));
+ }
+
+ public static String getFileAsString(String resourceFilename) throws IOException, URISyntaxException {
+ return getContentUtf8(getPath(resourceFilename));
+ }
+} \ No newline at end of file
diff --git a/champ-lib/champ-core/src/test/resources/event/event-envelope.json b/champ-lib/champ-core/src/test/resources/event/event-envelope.json
new file mode 100644
index 0000000..0389c4a
--- /dev/null
+++ b/champ-lib/champ-core/src/test/resources/event/event-envelope.json
@@ -0,0 +1,17 @@
+{
+ "header": {
+ "request-id": "bffbbf77-e6fc-4c5a-af02-2dbe1bef003f",
+ "timestamp": "20180320T164558Z",
+ "source-name": "CHAMP",
+ "event-type": "update-notification-raw"
+ },
+ "body": {
+ "timestamp": 1521564357772,
+ "transactionId": "bffbbf77-e6fc-4c5a-af02-2dbe1bef003f",
+ "vertex": {
+ "type": "pserver",
+ "key": {},
+ "properties": {}
+ }
+ }
+} \ No newline at end of file
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
index 334871e..3b13a42 100644
--- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
+++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
@@ -29,16 +29,16 @@ import java.util.concurrent.ThreadPoolExecutor;
import javax.naming.OperationNotSupportedException;
import javax.ws.rs.core.Response.Status;
import org.onap.aai.champcore.ChampTransaction;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
import org.onap.champ.ChampRESTAPI;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.event.GraphEvent.GraphEventResult;
-import org.onap.champ.event.envelope.GraphEventEnvelope;
-import org.onap.champ.event.envelope.GraphEventHeader;
import org.onap.champ.event.GraphEventEdge;
import org.onap.champ.event.GraphEventVertex;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
import org.onap.champ.exception.ChampServiceException;
import org.onap.champ.service.ChampDataService;
import org.onap.champ.service.ChampThreadFactory;
@@ -159,7 +159,7 @@ public class ChampAsyncRequestProcessor extends TimerTask {
}
// Apply Champ Event header
- eventEnvelope.setHeader(GraphEventHeader.builder().requestId(event.getTransactionId()).build());
+ eventEnvelope.setHeader(new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_RESULT).requestId(event.getTransactionId()).build());
// Parse the event and call champ Dao to process , Create the
// response event and put it on response queue
diff --git a/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java b/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java
index cf2f11d..b967ee1 100644
--- a/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java
+++ b/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java
@@ -22,10 +22,6 @@ package org.onap.champ.event;
import javax.ws.rs.core.Response.Status;
-import org.onap.champ.exception.ChampServiceException;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
public class GraphEvent {
@@ -59,12 +55,6 @@ public class GraphEvent {
private Status httpErrorStatus;
- /**
- * Marshaller/unmarshaller for converting to/from JSON.
- */
- private static final Gson gson = new GsonBuilder().disableHtmlEscaping()
- .setPrettyPrinting().create();
-
public static Builder builder(GraphEventOperation operation) {
return new Builder(operation);
}
@@ -134,47 +124,6 @@ public class GraphEvent {
this.edge = edge;
}
- /**
- * Unmarshalls this Vertex object into a JSON string.
- *
- * @return - A JSON format string representation of this Vertex.
- */
- public String toJson() {
- return gson.toJson(this);
- }
-
- /**
- * Marshalls the provided JSON string into a Vertex object.
- *
- * @param json - The JSON string to produce the Vertex from.
- * @return - A Vertex object.
- * @throws SpikeException
- */
- public static GraphEvent fromJson(String json) throws ChampServiceException {
-
- try {
-
- // Make sure that we were actually provided a non-empty string
- // before we
- // go any further.
- if (json == null || json.isEmpty()) {
- throw new ChampServiceException("Empty or null JSON string.", Status.BAD_REQUEST);
- }
-
- // Marshall the string into a Vertex object.
- return gson.fromJson(json, GraphEvent.class);
-
- } catch (Exception ex) {
- throw new ChampServiceException("Unable to parse JSON string: ", Status.BAD_REQUEST);
- }
- }
-
- @Override
- public String toString() {
-
- return toJson();
- }
-
public String getObjectKey() {
if (this.getVertex() != null) {
return this.getVertex().getId();
diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
index 7958a3a..13e0f7a 100644
--- a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
+++ b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
@@ -22,6 +22,7 @@
package org.onap.champ.event.envelope;
import javax.ws.rs.core.Response.Status;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.exception.ChampServiceException;
import com.google.gson.Gson;
@@ -29,7 +30,7 @@ import com.google.gson.GsonBuilder;
public class GraphEventEnvelope {
- private GraphEventHeader header;
+ private ChampEventHeader header;
private GraphEvent body;
/**
@@ -38,20 +39,21 @@ public class GraphEventEnvelope {
private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
public GraphEventEnvelope(GraphEvent event) {
- this.header = new GraphEventHeader.Builder().requestId(event.getTransactionId()).build();
+ this.header = new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_RESULT)
+ .requestId(event.getTransactionId()).build();
this.body = event;
}
- public GraphEventEnvelope(GraphEventHeader header, GraphEvent body) {
+ public GraphEventEnvelope(ChampEventHeader header, GraphEvent body) {
this.header = header;
this.body = body;
}
- public GraphEventHeader getHeader() {
+ public ChampEventHeader getHeader() {
return header;
}
- public void setHeader(GraphEventHeader header) {
+ public void setHeader(ChampEventHeader header) {
this.header = header;
}
diff --git a/champ-service/src/test/resources/event/event-envelope.json b/champ-service/src/test/resources/event/event-envelope.json
index 68888c0..b31ab55 100644
--- a/champ-service/src/test/resources/event/event-envelope.json
+++ b/champ-service/src/test/resources/event/event-envelope.json
@@ -3,7 +3,7 @@
"request-id": "2253f351-d9b6-4638-9fe3-2c194bee1b29",
"timestamp": "20180316T092301Z",
"source-name": "CHAMP",
- "event-type": "db-update-result"
+ "event-type": "update-result"
},
"body": {
"operation": "CREATE",