aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--champ-lib/champ-core/pom.xml14
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java463
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java345
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java97
-rw-r--r--champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java (renamed from champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java)43
-rw-r--r--champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java60
-rw-r--r--champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java55
-rw-r--r--champ-lib/champ-core/src/test/resources/event/event-envelope.json17
-rw-r--r--champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java6
-rw-r--r--champ-service/src/main/java/org/onap/champ/event/GraphEvent.java51
-rw-r--r--champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java12
-rw-r--r--champ-service/src/test/resources/event/event-envelope.json2
12 files changed, 688 insertions, 477 deletions
diff --git a/champ-lib/champ-core/pom.xml b/champ-lib/champ-core/pom.xml
index 7315c16..4ce3d5f 100644
--- a/champ-lib/champ-core/pom.xml
+++ b/champ-lib/champ-core/pom.xml
@@ -105,6 +105,20 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.skyscreamer</groupId>
+ <artifactId>jsonassert</artifactId>
+ <version>1.5.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
index 07647d2..1f93a97 100644
--- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java
@@ -32,11 +32,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
-
import org.onap.aai.champcore.ChampCapabilities;
import org.onap.aai.champcore.ChampGraph;
import org.onap.aai.champcore.ChampTransaction;
import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
+import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
import org.onap.aai.champcore.exceptions.ChampMarshallingException;
import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
@@ -52,11 +53,10 @@ import org.onap.aai.champcore.model.ChampRelationship;
import org.onap.aai.champcore.model.ChampRelationshipConstraint;
import org.onap.aai.champcore.model.ChampRelationshipIndex;
import org.onap.aai.champcore.model.ChampSchema;
+import org.onap.aai.event.api.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.onap.aai.event.api.EventPublisher;
-
/**
@@ -66,10 +66,10 @@ import org.onap.aai.event.api.EventPublisher;
public abstract class AbstractLoggingChampGraph implements ChampGraph {
private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
-
+
public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
- public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
+ public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
@Override
public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
@@ -80,48 +80,48 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
-
+
/**
* Creates or updates a vertex in the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param object - The vertex to be created or updated.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The vertex, as created, marshaled as a {@link ChampObject}
- *
- * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
+ *
+ * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
* into the backend representation
- * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
+ * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
* by {@link ChampGraph#retrieveSchema}
- * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
* is not present or object not found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
-
+
/**
* Updates an existing vertex in the graph store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param object - The vertex to be created or updated.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The updated vertex, marshaled as a {@link ChampObject}
- *
- * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
+ *
+ * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
* the backend representation
- * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
+ * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
* by {@link ChampGraph#retrieveSchema}
- * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
* is not present or object not found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
@@ -130,136 +130,136 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
/**
* Deletes an existing vertex from the graph store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
* @param transaction - Optional transaction context to perform the operation in.
- *
- * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
+ *
+ * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
* is not present or object not found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
-
+
/**
* Creates or updates an edge in the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param relationship - The ChampRelationship that you wish to store in the graph
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The {@link ChampRelationship} as it was stored.
- *
- * @throws ChampUnmarshallingException - If the edge which was created could not be
+ *
+ * @throws ChampUnmarshallingException - If the edge which was created could not be
* unmarshalled into a ChampRelationship
- * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
* marshalled into the backend representation
- * @throws ChampObjectNotExistsException - If either the source or target object referenced
+ * @throws ChampObjectNotExistsException - If either the source or target object referenced
* by this relationship does not exist in the graph
- * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
* specifed by {@link ChampGraph#retrieveSchema}
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
- public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+ public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
/**
* Replaces an existing edge in the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param relationship - The ChampRelationship that you wish to replace in the graph
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The {@link ChampRelationship} as it was stored.
- *
- * @throws ChampUnmarshallingException - If the edge which was created could not be
+ *
+ * @throws ChampUnmarshallingException - If the edge which was created could not be
* unmarshalled into a ChampRelationship
- * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
* marshalled into the backend representation
- * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
* specifed by {@link ChampGraph#retrieveSchema}
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
- public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
-
+ public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+
/**
* Removes an edge from the graph data store.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param relationship - The ChampRelationship that you wish to remove from the graph.
* @param transaction - Optional transaction context to perform the operation in.
- *
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ *
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
-
+
/**
* Create or update a {@link ChampPartition}.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param partition - The ChampPartition that you wish to create or update in the graph.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @return - The {@link ChampPartition} as it was stored.
- *
- * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
+ *
+ * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
* specifed by {@link ChampGraph#retrieveSchema}
- * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
+ * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
* but the object cannot be found in the graph
- * @throws ChampMarshallingException - If the {@code relationship} is not able to be
+ * @throws ChampMarshallingException - If the {@code relationship} is not able to be
* marshalled into the backend representation
- * @throws ChampObjectNotExistsException - If either the source or target object referenced
+ * @throws ChampObjectNotExistsException - If either the source or target object referenced
* by this relationship does not exist in the graph
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
-
+
/**
* Removes a partition from the graph.
* <p>
- * If a transaction context is not provided, then a transaction will be automatically
+ * If a transaction context is not provided, then a transaction will be automatically
* created and committed for this operation only, otherwise, the supplied transaction
- * will be used and it will be up to the caller to commit the transaction at its
+ * will be used and it will be up to the caller to commit the transaction at its
* discretion.
- *
+ *
* @param graph - The partition to be removed.
* @param transaction - Optional transaction context to perform the operation in.
- *
+ *
* @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
*/
public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
-
+
/**
* Create or update an object index in the graph.
- *
+ *
* @param index - The object index to be created/updated.
*/
public abstract void executeStoreObjectIndex(ChampObjectIndex index);
-
+
public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
@@ -274,56 +274,56 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
public abstract void deleteSchema();
public abstract ChampCapabilities capabilities();
-
-
+
+
public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
-
+
public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
-
- public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
-
-
-
+
+ public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+
+
+
/** Number of events that can be queued up for publication before we begin dropping
* events. */
private Integer eventQueueCapacity;
-
+
/** Number of event publisher worker threads. */
private Integer eventStreamPublisherPoolSize;
-
+
/** Pool of worker threads that do the work of publishing the events to the event bus. */
protected ThreadPoolExecutor publisherPool;
-
+
/** Client used for publishing events to the event bus. */
protected EventPublisher producer;
-
- /** Internal queue where outgoing events will be buffered until they can be serviced by
+
+ /** Internal queue where outgoing events will be buffered until they can be serviced by
* the event publisher worker threads. */
protected BlockingQueue<ChampEvent> eventQueue;
-
+
/**
* Thread factory for the event producer workers.
*/
private class ProducerWorkerThreadFactory implements ThreadFactory {
-
+
private AtomicInteger threadNumber = new AtomicInteger(1);
-
+
public Thread newThread(Runnable r) {
return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
}
}
-
-
+
+
/**
* Create a new instance of the AbstractLoggingChampGraph.
- *
+ *
* @param properties - Set of configuration properties for this graph instance.
*/
protected AbstractLoggingChampGraph(Map<String, Object> properties) {
-
+
// Extract the necessary parameters from the configuration properties.
configure(properties);
@@ -334,104 +334,104 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
logger.error("NOTE!! Champ events will NOT be published to the event stream!");
return;
}
-
+
// Create the blocking queue that we will use to buffer events that we want
// published to the event bus.
eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
-
+
// Create the executor pool that will do the work of publishing events to the event bus.
- publisherPool =
- (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
+ publisherPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
new ProducerWorkerThreadFactory());
-
+
try {
-
+
// Start up the producer worker threads.
for(int i=0; i<eventStreamPublisherPoolSize; i++) {
- publisherPool.submit(new EventPublisherWorker());
+ publisherPool.submit(new EventPublisherWorker());
}
-
+
} catch (Exception e) {
-
+
logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
logger.error("NOTE!! Champ events may NOT be published to the event stream!");
return;
}
}
-
+
/**
* Process the configuration properties supplied for this graph instance.
- *
+ *
* @param properties - Configuration parameters.
*/
private void configure(Map<String, Object> properties) {
-
+
producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
-
+
eventQueueCapacity =
(Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
- eventStreamPublisherPoolSize =
+ eventStreamPublisherPoolSize =
(Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
}
-
-
+
+
public void setProducer(EventPublisher aProducer) {
-
+
producer = aProducer;
}
-
+
private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
-
+
if(properties.containsKey(property)) {
return properties.get(property);
} else {
return defaultValue;
}
}
-
+
@Override
public void shutdown() {
-
+
if(publisherPool != null) {
publisherPool.shutdown();
-
+
try {
publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {}
}
-
+
if(producer != null) {
-
+
try {
producer.close();
-
+
} catch (Exception e) {
logger.error("Failed to stop event stream producer: " + e.getMessage());
}
}
}
-
+
@Override
public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
-
+
try {
-
+
// Commit the transaction.
transaction.commit();
-
+
} catch (ChampTransactionException e) {
-
+
logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
for(ChampEvent event : enqueuedEvents) {
-
+
logger.debug("Graph event " + event.toString() + " not published.");
}
throw e;
}
-
+
// Now that the transaction has been successfully committed, we need
// to log the events that were produced within that transaction's
// context.
@@ -440,81 +440,81 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
logEvent(event);
}
}
-
+
@Override
public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
-
- // Rollback the transaction.
+
+ // Rollback the transaction.
transaction.rollback();
}
-
+
@Override
public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
return storeObject(object, Optional.empty());
}
-
+
@Override
public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
-
+
ChampObject storedObject = executeStoreObject(object, transaction);
-
+
if(storedObject != null) {
-
+
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
.entity(storedObject)
- .build(),
+ .build(),
transaction);
}
-
+
return storedObject;
}
-
+
@Override
public ChampObject replaceObject(ChampObject object)
throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
return replaceObject(object, Optional.empty());
}
-
+
@Override
public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
-
+
ChampObject replacedObject = executeReplaceObject(object, transaction);
-
+
if(replacedObject != null) {
-
+
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.REPLACE)
.entity(replacedObject)
- .build(),
+ .build(),
transaction);
}
-
+
return replacedObject;
}
-
+
@Override
public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
deleteObject(key, Optional.empty());
}
-
+
@Override
public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
- // Retrieve the object that we are deleting before it's gone, so that we can
+ // Retrieve the object that we are deleting before it's gone, so that we can
// report it to the event stream.
Optional<ChampObject> objectToDelete = Optional.empty();
try {
objectToDelete = retrieveObject(key, transaction);
-
+
} catch (ChampUnmarshallingException e) {
logger.error("Unable to generate delete object log: " + e.getMessage());
}
-
+
executeDeleteObject(key, transaction);
-
+
if(objectToDelete.isPresent()) {
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
@@ -524,29 +524,29 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
transaction);
}
}
-
+
@Override
public ChampRelationship storeRelationship(ChampRelationship relationship)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampObjectNotExistsException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampObjectNotExistsException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
return storeRelationship(relationship, Optional.empty());
}
-
+
@Override
public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampObjectNotExistsException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampObjectNotExistsException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
-
+
if(storedRelationship != null) {
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
@@ -554,30 +554,30 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
return storedRelationship;
}
@Override
public ChampRelationship replaceRelationship(ChampRelationship relationship)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
return replaceRelationship(relationship, Optional.empty());
}
-
+
@Override
public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
- throws ChampUnmarshallingException,
- ChampMarshallingException,
- ChampSchemaViolationException,
- ChampRelationshipNotExistsException, ChampTransactionException {
+ throws ChampUnmarshallingException,
+ ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampRelationshipNotExistsException, ChampTransactionException {
ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
-
+
if(replacedRelationship != null) {
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.REPLACE)
@@ -585,20 +585,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
return replacedRelationship;
}
-
+
@Override
public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
deleteRelationship(relationship, Optional.empty());
}
-
+
@Override
public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
executeDeleteRelationship(relationship, transaction);
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
@@ -606,19 +606,19 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
@Override
public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
return storePartition(partition, Optional.empty());
}
-
+
@Override
public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
ChampPartition storedPartition = executeStorePartition(partition, transaction);
-
+
if(storedPartition != null) {
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
@@ -626,20 +626,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
return storedPartition;
}
-
+
@Override
public void deletePartition(ChampPartition graph) throws ChampTransactionException{
deletePartition(graph, Optional.empty());
}
-
+
@Override
public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
executeDeletePartition(graph, transaction);
-
+
// Update the event stream with the current operation.
logOrEnqueueEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
@@ -647,69 +647,69 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
.build(),
transaction);
}
-
+
@Override
public void storeObjectIndex(ChampObjectIndex index) {
executeStoreObjectIndex(index);
-
+
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
.entity(index)
.build());
}
-
-
+
+
public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
-
- // Retrieve the index that we are deleting before it's gone, so that we can
+
+ // Retrieve the index that we are deleting before it's gone, so that we can
// report it to the event stream.
Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
-
+
executeDeleteObjectIndex(indexName);
-
+
if(indexToDelete.isPresent()) {
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
- .entity(indexToDelete.get())
+ .entity(indexToDelete.get())
.build());
}
}
-
-
+
+
public void storeRelationshipIndex(ChampRelationshipIndex index) {
executeStoreRelationshipIndex(index);
-
+
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.STORE)
- .entity(index)
+ .entity(index)
.build());
}
-
-
+
+
public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
- // Retrieve the index that we are deleting before it's gone, so that we can
+ // Retrieve the index that we are deleting before it's gone, so that we can
// report it to the event stream.
Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
-
+
executeDeleteRelationshipIndex(indexName);
-
+
if(indexToDelete.isPresent()) {
// Update the event stream with the current operation.
logEvent(ChampEvent.builder()
.operation(ChampOperation.DELETE)
- .entity(indexToDelete.get())
+ .entity(indexToDelete.get())
.build());
}
}
-
+
private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
-
+
if(!transaction.isPresent()) {
// Update the event stream with the current operation.
logEvent(event);
@@ -720,34 +720,34 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
transaction.get().logEvent(event);
}
}
-
+
/**
* Submits an event to be published to the event stream.
- *
+ *
* @param anEvent - The event to be published.
*/
public void logEvent(ChampEvent anEvent) {
-
+
if(eventQueue == null) {
return;
}
-
+
logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
if(logger.isDebugEnabled()) {
logger.debug("Event payload: " + anEvent.toString());
}
-
+
// Try to submit the event to be published to the event bus.
if(!eventQueue.offer(anEvent)) {
logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
}
}
-
-
+
+
/**
- * This class implements the worker threads for our thread pool which are responsible for
+ * This class implements the worker threads for our thread pool which are responsible for
* pulling the next outgoing event from the internal buffer and forwarding them to the event
- * bus client.
+ * bus client.
* <p>
* Each publish operation is performed synchronously, so that the thread will only move on
* to the next available event once it has actually published the current event to the bus.
@@ -757,32 +757,33 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
/** Partition key to use when publishing events to the event stream. We WANT all events
* to go to a single partition, so we are just using a hard-coded key for every event. */
private static final String EVENTS_PARTITION_KEY = "champEventKey";
-
-
+
+
@Override
public void run() {
-
- while(true) {
-
+
+ while(true) {
ChampEvent event = null;
try {
-
+
// Get the next event to be published from the queue.
event = eventQueue.take();
-
+
} catch (InterruptedException e) {
-
+
// Restore the interrupted status.
Thread.currentThread().interrupt();
}
-
- // Try publishing the event to the event bus. This call will block
- // until
+
+ // Create new envelope containing an event header and ChampEvent
+ ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
+
+ // Try publishing the event to the event bus. This call will block until
try {
- producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
-
+ producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
+
} catch (Exception e) {
-
+
logger.error("Failed to publish event to event bus: " + e.getMessage());
}
}
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java
index 3fd57a3..e7feeda 100644
--- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/ChampEvent.java
@@ -22,13 +22,11 @@ package org.onap.aai.champcore.event;
import java.io.IOException;
-
import org.onap.aai.champcore.model.ChampObject;
import org.onap.aai.champcore.model.ChampObjectIndex;
import org.onap.aai.champcore.model.ChampPartition;
import org.onap.aai.champcore.model.ChampRelationship;
import org.onap.aai.champcore.model.ChampRelationshipIndex;
-
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;
@@ -39,174 +37,177 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class ChampEvent {
- public enum ChampOperation {
- STORE,
- REPLACE,
- DELETE
- }
-
- private static ObjectMapper mapper = new ObjectMapper();
-
- private ChampOperation operation;
- private long timestamp;
- private String transactionId = null;
- private ChampObject vertex = null;
- private ChampRelationship relationship = null;
- private ChampPartition partition = null;
- private ChampObjectIndex objectIndex = null;
- private ChampRelationshipIndex relationshipIndex = null;
- private String dbTransactionId = null;
-
-
- public static Builder builder() {
- return new Builder();
- }
-
- public ChampOperation getOperation() {
- return operation;
- }
-
- public void setOperation(ChampOperation operation) {
- this.operation = operation;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @JsonProperty("transaction-id")
- public String getTransactionId() {
- return transactionId;
- }
-
- public void setTransactionId(String transactionId) {
- this.transactionId = transactionId;
- }
-
- public ChampObject getVertex() {
- return vertex;
- }
-
- public void setVertex(ChampObject vertex) {
- this.vertex = vertex;
- }
-
- public ChampRelationship getRelationship() {
- return relationship;
- }
-
- public void setRelationship(ChampRelationship relationship) {
- this.relationship = relationship;
- }
-
- public ChampPartition getPartition() {
- return partition;
- }
-
- public void setPartition(ChampPartition partition) {
- this.partition = partition;
- }
-
- public ChampObjectIndex getObjectIndex() {
- return objectIndex;
- }
-
- public void setObjectIndex(ChampObjectIndex index) {
- this.objectIndex = index;
- }
-
- public ChampRelationshipIndex getRelationshipIndex() {
- return relationshipIndex;
- }
-
- public void setRelationshipIndex(ChampRelationshipIndex relationshipIndex) {
- this.relationshipIndex = relationshipIndex;
- }
-
- @JsonProperty("database-transaction-id")
- public String getDbTransactionId () { return dbTransactionId; }
-
-
- public void setDbTransactionId ( String id ) { this.dbTransactionId = id; }
-
-
-
- public String toJson() {
-
- ObjectMapper mapper = new ObjectMapper();
- mapper.setSerializationInclusion(Include.NON_NULL);
-
- try {
- return mapper.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- return "Unmarshallable: " + e.getMessage();
- }
- }
-
- public static ChampEvent fromJson(String json) throws JsonParseException, JsonMappingException, IOException {
-
- mapper.setSerializationInclusion(Include.NON_NULL);
- return mapper.readValue(json, ChampEvent.class);
- }
- @Override
- public String toString() {
-
- return toJson();
- }
-
- public static class Builder {
-
- ChampEvent event = null;
-
-
- public Builder() {
- event = new ChampEvent();
- }
-
- public Builder operation(ChampOperation operation) {
- event.setOperation(operation);
- return this;
- }
-
- public Builder entity(ChampObject entity) {
- event.setVertex(entity);
- return this;
- }
-
- public Builder entity(ChampRelationship relationship) {
- event.relationship = relationship;
- return this;
- }
-
- public Builder entity(ChampPartition partition) {
- event.partition = partition;
- return this;
- }
-
- public Builder entity(ChampObjectIndex index) {
- event.objectIndex = index;
- return this;
- }
-
- public Builder entity(ChampRelationshipIndex relationshipIndex) {
- event.relationshipIndex = relationshipIndex;
- return this;
- }
-
-
- public ChampEvent build() {
-
- event.setTimestamp(System.currentTimeMillis());
-
- // Set a unique transaction id on this event that can be used by downstream entities
- // for log correlation.
- event.setTransactionId(java.util.UUID.randomUUID().toString());
-
- return event;
- }
- }
+ public enum ChampOperation {
+ STORE, REPLACE, DELETE
+ }
+
+ private static ObjectMapper mapper = new ObjectMapper();
+
+ private ChampOperation operation;
+ private long timestamp;
+ private String transactionId = null;
+ private ChampObject vertex = null;
+ private ChampRelationship relationship = null;
+ private ChampPartition partition = null;
+ private ChampObjectIndex objectIndex = null;
+ private ChampRelationshipIndex relationshipIndex = null;
+ private String dbTransactionId = null;
+
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public ChampOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(ChampOperation operation) {
+ this.operation = operation;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @JsonProperty("transaction-id")
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public ChampObject getVertex() {
+ return vertex;
+ }
+
+ public void setVertex(ChampObject vertex) {
+ this.vertex = vertex;
+ }
+
+ public ChampRelationship getRelationship() {
+ return relationship;
+ }
+
+ public void setRelationship(ChampRelationship relationship) {
+ this.relationship = relationship;
+ }
+
+ public ChampPartition getPartition() {
+ return partition;
+ }
+
+ public void setPartition(ChampPartition partition) {
+ this.partition = partition;
+ }
+
+ public ChampObjectIndex getObjectIndex() {
+ return objectIndex;
+ }
+
+ public void setObjectIndex(ChampObjectIndex index) {
+ this.objectIndex = index;
+ }
+
+ public ChampRelationshipIndex getRelationshipIndex() {
+ return relationshipIndex;
+ }
+
+ public void setRelationshipIndex(ChampRelationshipIndex relationshipIndex) {
+ this.relationshipIndex = relationshipIndex;
+ }
+
+ @JsonProperty("database-transaction-id")
+ public String getDbTransactionId() {
+ return dbTransactionId;
+ }
+
+
+ public void setDbTransactionId(String id) {
+ this.dbTransactionId = id;
+ }
+
+
+
+ public String toJson() {
+
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.setSerializationInclusion(Include.NON_NULL);
+
+ try {
+ return mapper.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ return "Unmarshallable: " + e.getMessage();
+ }
+ }
+
+ public static ChampEvent fromJson(String json) throws JsonParseException, JsonMappingException, IOException {
+
+ mapper.setSerializationInclusion(Include.NON_NULL);
+ return mapper.readValue(json, ChampEvent.class);
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+
+ public static class Builder {
+
+ ChampEvent event = null;
+
+
+ public Builder() {
+ event = new ChampEvent();
+ }
+
+ public Builder operation(ChampOperation operation) {
+ event.setOperation(operation);
+ return this;
+ }
+
+ public Builder entity(ChampObject entity) {
+ event.setVertex(entity);
+ return this;
+ }
+
+ public Builder entity(ChampRelationship relationship) {
+ event.relationship = relationship;
+ return this;
+ }
+
+ public Builder entity(ChampPartition partition) {
+ event.partition = partition;
+ return this;
+ }
+
+ public Builder entity(ChampObjectIndex index) {
+ event.objectIndex = index;
+ return this;
+ }
+
+ public Builder entity(ChampRelationshipIndex relationshipIndex) {
+ event.relationshipIndex = relationshipIndex;
+ return this;
+ }
+
+
+ public ChampEvent build() {
+
+ event.setTimestamp(System.currentTimeMillis());
+
+ // Set a unique transaction id on this event that can be used by downstream entities
+ // for log correlation.
+ event.setTransactionId(java.util.UUID.randomUUID().toString());
+
+ return event;
+ }
+ }
}
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java
new file mode 100644
index 0000000..2547ae5
--- /dev/null
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelope.java
@@ -0,0 +1,97 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.event.envelope;
+
+import org.onap.aai.champcore.event.ChampEvent;
+import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class ChampEventEnvelope {
+
+ private ChampEventHeader header;
+ private ChampEvent body;
+
+ /**
+ * Marshaller/unmarshaller for converting to/from JSON.
+ */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public ChampEventEnvelope(ChampEvent event) {
+ this.header = new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_NOTIFICATION)
+ .requestId(event.getTransactionId()).build();
+ this.body = event;
+ }
+
+ public ChampEventEnvelope(ChampEventHeader header, ChampEvent body) {
+ this.header = header;
+ this.body = body;
+ }
+
+ public ChampEventHeader getHeader() {
+ return header;
+ }
+
+ public void setHeader(ChampEventHeader header) {
+ this.header = header;
+ }
+
+ public ChampEvent getBody() {
+ return body;
+ }
+
+ public void setBody(ChampEvent body) {
+ this.body = body;
+ }
+
+ /**
+ * Serializes this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Deserializes the provided JSON string into a Event Envelope object.
+ *
+ * @param json the JSON string to produce the Event Envelope from.
+ * @return an Event Envelope object.
+ * @throws ChampUnmarshallingException
+ */
+ public static ChampEventEnvelope fromJson(String json) throws ChampUnmarshallingException {
+ try {
+ if (json == null || json.isEmpty()) {
+ throw new ChampUnmarshallingException("Empty or null JSON string.");
+ }
+ return gson.fromJson(json, ChampEventEnvelope.class);
+ } catch (Exception ex) {
+ throw new ChampUnmarshallingException("Unable to parse JSON string: ");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+} \ No newline at end of file
diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java
index 59e01ea..a9dbdaf 100644
--- a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java
+++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/envelope/ChampEventHeader.java
@@ -19,7 +19,7 @@
* ============LICENSE_END============================================
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.champ.event.envelope;
+package org.onap.aai.champcore.event.envelope;
import java.time.Instant;
import java.time.ZoneOffset;
@@ -31,11 +31,24 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
-public class GraphEventHeader {
+public class ChampEventHeader {
private static final String SOURCE_NAME = "CHAMP";
- private static final String EVENT_TYPE = "db-update-result";
+ public enum EventType {
+ UPDATE_RESULT("update-result"),
+ UPDATE_NOTIFICATION("update-notification-raw");
+
+ private final String name;
+
+ EventType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
@SerializedName("request-id")
private String requestId;
@@ -59,17 +72,19 @@ public class GraphEventHeader {
private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
- public static Builder builder() {
- return new Builder();
- }
-
public static class Builder {
+ private final EventType eventType;
+
private String requestId;
private String validationEntityType;
private String validationTopEntityType;
private String entityLink;
+ public Builder(EventType eventType) {
+ this.eventType = eventType;
+ }
+
public Builder requestId(String val) {
requestId = val;
return this;
@@ -90,16 +105,16 @@ public class GraphEventHeader {
return this;
}
- public GraphEventHeader build() {
- return new GraphEventHeader(this);
+ public ChampEventHeader build() {
+ return new ChampEventHeader(this);
}
}
- private GraphEventHeader(Builder builder) {
+ private ChampEventHeader(Builder builder) {
requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString();
timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now());
sourceName = SOURCE_NAME;
- eventType = EVENT_TYPE;
+ eventType = builder.eventType.getName();
validationEntityType = builder.validationEntityType;
validationTopEntityType = builder.validationTopEntityType;
@@ -197,18 +212,18 @@ public class GraphEventHeader {
*/
@Override
public boolean equals(Object obj) {
- if (!(obj instanceof GraphEventHeader)) {
+ if (!(obj instanceof ChampEventHeader)) {
return false;
} else if (obj == this) {
return true;
}
- GraphEventHeader rhs = (GraphEventHeader) obj;
+ ChampEventHeader rhs = (ChampEventHeader) obj;
// @formatter:off
return new EqualsBuilder()
.append(requestId, rhs.requestId)
.append(timestamp, rhs.timestamp)
.append(sourceName, rhs.sourceName)
- .append(eventType, rhs.sourceName)
+ .append(eventType, rhs.eventType)
.append(validationEntityType, rhs.validationEntityType)
.append(validationTopEntityType, rhs.validationTopEntityType)
.append(entityLink, rhs.entityLink)
diff --git a/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java
new file mode 100644
index 0000000..6b7bd02
--- /dev/null
+++ b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/event/envelope/ChampEventEnvelopeTest.java
@@ -0,0 +1,60 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.event.envelope;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.onap.aai.champcore.event.ChampEvent;
+import org.onap.aai.champcore.model.ChampObject;
+import org.onap.aai.champcore.util.TestUtil;
+import org.skyscreamer.jsonassert.Customization;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.skyscreamer.jsonassert.comparator.CustomComparator;
+
+public class ChampEventEnvelopeTest {
+
+ @Test
+ public void testEventEnvelopeFormat() throws Exception {
+ String expectedEnvelope = TestUtil.getFileAsString("event/event-envelope.json");
+
+ ChampEvent body = ChampEvent.builder().entity(new ChampObject.Builder("pserver").build()).build();
+
+ String envelope = new ChampEventEnvelope(body).toJson();
+
+ JSONAssert.assertEquals(expectedEnvelope, envelope,
+ new CustomComparator(JSONCompareMode.STRICT, new Customization("header.request-id", (o1, o2) -> true),
+ new Customization("header.timestamp", (o1, o2) -> true),
+ new Customization("body.timestamp", (o1, o2) -> true),
+ new Customization("body.transactionId", (o1, o2) -> true)));
+ }
+
+ @Test
+ public void testRequestIdIsTransactionId() throws Exception {
+ ChampEvent body = ChampEvent.builder().entity(new ChampObject.Builder("pserver").build()).build();
+
+ ChampEventEnvelope envelope = new ChampEventEnvelope(body);
+
+ assertThat(envelope.getHeader().getRequestId(), is(envelope.getBody().getTransactionId()));
+ }
+}
diff --git a/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java
new file mode 100644
index 0000000..2aaddad
--- /dev/null
+++ b/champ-lib/champ-core/src/test/java/org/onap/aai/champcore/util/TestUtil.java
@@ -0,0 +1,55 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.champcore.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class TestUtil {
+
+ public static Path getPath(String resourceFilename) throws URISyntaxException {
+ URL resource = ClassLoader.getSystemResource(resourceFilename);
+ if (resource != null) {
+ return Paths.get(resource.toURI());
+ }
+
+ // If the resource is not found relative to the classpath, try to get it from the file system directly.
+ File file = new File(resourceFilename);
+ if (!file.exists()) {
+ throw new RuntimeException("Resource does not exist: " + resourceFilename);
+ }
+ return file.toPath();
+ }
+
+ public static String getContentUtf8(Path filePath) throws IOException {
+ return new String(Files.readAllBytes(filePath));
+ }
+
+ public static String getFileAsString(String resourceFilename) throws IOException, URISyntaxException {
+ return getContentUtf8(getPath(resourceFilename));
+ }
+} \ No newline at end of file
diff --git a/champ-lib/champ-core/src/test/resources/event/event-envelope.json b/champ-lib/champ-core/src/test/resources/event/event-envelope.json
new file mode 100644
index 0000000..0389c4a
--- /dev/null
+++ b/champ-lib/champ-core/src/test/resources/event/event-envelope.json
@@ -0,0 +1,17 @@
+{
+ "header": {
+ "request-id": "bffbbf77-e6fc-4c5a-af02-2dbe1bef003f",
+ "timestamp": "20180320T164558Z",
+ "source-name": "CHAMP",
+ "event-type": "update-notification-raw"
+ },
+ "body": {
+ "timestamp": 1521564357772,
+ "transactionId": "bffbbf77-e6fc-4c5a-af02-2dbe1bef003f",
+ "vertex": {
+ "type": "pserver",
+ "key": {},
+ "properties": {}
+ }
+ }
+} \ No newline at end of file
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
index 334871e..3b13a42 100644
--- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
+++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
@@ -29,16 +29,16 @@ import java.util.concurrent.ThreadPoolExecutor;
import javax.naming.OperationNotSupportedException;
import javax.ws.rs.core.Response.Status;
import org.onap.aai.champcore.ChampTransaction;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
import org.onap.champ.ChampRESTAPI;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.event.GraphEvent.GraphEventResult;
-import org.onap.champ.event.envelope.GraphEventEnvelope;
-import org.onap.champ.event.envelope.GraphEventHeader;
import org.onap.champ.event.GraphEventEdge;
import org.onap.champ.event.GraphEventVertex;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
import org.onap.champ.exception.ChampServiceException;
import org.onap.champ.service.ChampDataService;
import org.onap.champ.service.ChampThreadFactory;
@@ -159,7 +159,7 @@ public class ChampAsyncRequestProcessor extends TimerTask {
}
// Apply Champ Event header
- eventEnvelope.setHeader(GraphEventHeader.builder().requestId(event.getTransactionId()).build());
+ eventEnvelope.setHeader(new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_RESULT).requestId(event.getTransactionId()).build());
// Parse the event and call champ Dao to process , Create the
// response event and put it on response queue
diff --git a/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java b/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java
index cf2f11d..b967ee1 100644
--- a/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java
+++ b/champ-service/src/main/java/org/onap/champ/event/GraphEvent.java
@@ -22,10 +22,6 @@ package org.onap.champ.event;
import javax.ws.rs.core.Response.Status;
-import org.onap.champ.exception.ChampServiceException;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
public class GraphEvent {
@@ -59,12 +55,6 @@ public class GraphEvent {
private Status httpErrorStatus;
- /**
- * Marshaller/unmarshaller for converting to/from JSON.
- */
- private static final Gson gson = new GsonBuilder().disableHtmlEscaping()
- .setPrettyPrinting().create();
-
public static Builder builder(GraphEventOperation operation) {
return new Builder(operation);
}
@@ -134,47 +124,6 @@ public class GraphEvent {
this.edge = edge;
}
- /**
- * Unmarshalls this Vertex object into a JSON string.
- *
- * @return - A JSON format string representation of this Vertex.
- */
- public String toJson() {
- return gson.toJson(this);
- }
-
- /**
- * Marshalls the provided JSON string into a Vertex object.
- *
- * @param json - The JSON string to produce the Vertex from.
- * @return - A Vertex object.
- * @throws SpikeException
- */
- public static GraphEvent fromJson(String json) throws ChampServiceException {
-
- try {
-
- // Make sure that we were actually provided a non-empty string
- // before we
- // go any further.
- if (json == null || json.isEmpty()) {
- throw new ChampServiceException("Empty or null JSON string.", Status.BAD_REQUEST);
- }
-
- // Marshall the string into a Vertex object.
- return gson.fromJson(json, GraphEvent.class);
-
- } catch (Exception ex) {
- throw new ChampServiceException("Unable to parse JSON string: ", Status.BAD_REQUEST);
- }
- }
-
- @Override
- public String toString() {
-
- return toJson();
- }
-
public String getObjectKey() {
if (this.getVertex() != null) {
return this.getVertex().getId();
diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
index 7958a3a..13e0f7a 100644
--- a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
+++ b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
@@ -22,6 +22,7 @@
package org.onap.champ.event.envelope;
import javax.ws.rs.core.Response.Status;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.exception.ChampServiceException;
import com.google.gson.Gson;
@@ -29,7 +30,7 @@ import com.google.gson.GsonBuilder;
public class GraphEventEnvelope {
- private GraphEventHeader header;
+ private ChampEventHeader header;
private GraphEvent body;
/**
@@ -38,20 +39,21 @@ public class GraphEventEnvelope {
private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
public GraphEventEnvelope(GraphEvent event) {
- this.header = new GraphEventHeader.Builder().requestId(event.getTransactionId()).build();
+ this.header = new ChampEventHeader.Builder(ChampEventHeader.EventType.UPDATE_RESULT)
+ .requestId(event.getTransactionId()).build();
this.body = event;
}
- public GraphEventEnvelope(GraphEventHeader header, GraphEvent body) {
+ public GraphEventEnvelope(ChampEventHeader header, GraphEvent body) {
this.header = header;
this.body = body;
}
- public GraphEventHeader getHeader() {
+ public ChampEventHeader getHeader() {
return header;
}
- public void setHeader(GraphEventHeader header) {
+ public void setHeader(ChampEventHeader header) {
this.header = header;
}
diff --git a/champ-service/src/test/resources/event/event-envelope.json b/champ-service/src/test/resources/event/event-envelope.json
index 68888c0..b31ab55 100644
--- a/champ-service/src/test/resources/event/event-envelope.json
+++ b/champ-service/src/test/resources/event/event-envelope.json
@@ -3,7 +3,7 @@
"request-id": "2253f351-d9b6-4638-9fe3-2c194bee1b29",
"timestamp": "20180316T092301Z",
"source-name": "CHAMP",
- "event-type": "db-update-result"
+ "event-type": "update-result"
},
"body": {
"operation": "CREATE",