summaryrefslogtreecommitdiffstats
path: root/champ-service/src
diff options
context:
space:
mode:
authorMichael Arrastia <MArrasti@amdocs.com>2018-03-28 19:07:34 +0100
committerMichael Arrastia <MArrasti@amdocs.com>2018-03-28 19:07:34 +0100
commit282a3420f6a8e2174034fcfa98b5a3ece28023a7 (patch)
tree558d1d3d559acf291e325769e987ce82ce44a117 /champ-service/src
parent533b090aa92f5eaa6c674fd63940fcacf4dc811e (diff)
Update to consume and publish events in new format
The new format includes: - the graph request/response encapsulated in a body property - new event header with details such as timestamp, request-id, event-type Issue-ID: AAI-960 Change-Id: Ib84ddd54352ca95c3968d2d2936f6348951c2d2c Signed-off-by: Michael Arrastia <MArrasti@amdocs.com>
Diffstat (limited to 'champ-service/src')
-rw-r--r--champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java469
-rw-r--r--champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java25
-rw-r--r--champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java98
-rw-r--r--champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java228
-rw-r--r--champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java42
-rw-r--r--champ-service/src/test/java/org/onap/champ/util/TestUtil.java34
-rw-r--r--champ-service/src/test/resources/event/event-envelope.json19
7 files changed, 668 insertions, 247 deletions
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
index 610cac9..334871e 100644
--- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
+++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
@@ -26,16 +26,17 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-
import javax.naming.OperationNotSupportedException;
import javax.ws.rs.core.Response.Status;
-
import org.onap.aai.champcore.ChampTransaction;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
import org.onap.champ.ChampRESTAPI;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.event.GraphEvent.GraphEventResult;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
+import org.onap.champ.event.envelope.GraphEventHeader;
import org.onap.champ.event.GraphEventEdge;
import org.onap.champ.event.GraphEventVertex;
import org.onap.champ.exception.ChampServiceException;
@@ -43,281 +44,283 @@ import org.onap.champ.service.ChampDataService;
import org.onap.champ.service.ChampThreadFactory;
import org.onap.champ.service.logging.ChampMsgs;
-import org.onap.aai.event.api.EventConsumer;
-
/**
- * This Class polls the Graph events from request topic perform the necessary
- * CRUD operation by calling champDAO and queues up the response to be consumed
- * by response handler.
+ * This Class polls the Graph events from request topic perform the necessary CRUD operation by calling champDAO and
+ * queues up the response to be consumed by response handler.
*/
public class ChampAsyncRequestProcessor extends TimerTask {
- private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
+ private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
- private ChampDataService champDataService;
+ private ChampDataService champDataService;
- /**
- * Number of events that can be queued up.
- */
- private Integer requestProcesserQueueSize;
+ /**
+ * Number of events that can be queued up.
+ */
+ private Integer requestProcesserQueueSize;
- /**
- * Number of event publisher worker threads.
- */
- private Integer requestProcesserPoolSize;
-
- /**
- * Number of event publisher worker threads.
- */
- private Integer requestPollingTimeSeconds;
+ /**
+ * Number of event publisher worker threads.
+ */
+ private Integer requestProcesserPoolSize;
- /**
- * Internal queue where outgoing events will be buffered until they can be
- * serviced by.
- **/
- private BlockingQueue<GraphEvent> requestProcesserEventQueue;
+ /**
+ * Number of event publisher worker threads.
+ */
+ private Integer requestPollingTimeSeconds;
- /**
- * Pool of worker threads that do the work of publishing the events to the
- * event bus.
- */
- private ThreadPoolExecutor requestProcesserPool;
+ /**
+ * Internal queue where outgoing events will be buffered until they can be serviced by.
+ **/
+ private BlockingQueue<GraphEventEnvelope> requestProcesserEventQueue;
- private ChampAsyncResponsePublisher champAsyncResponsePublisher;
+ /**
+ * Pool of worker threads that do the work of publishing the events to the event bus.
+ */
+ private ThreadPoolExecutor requestProcesserPool;
- private EventConsumer asyncRequestConsumer;
+ private ChampAsyncResponsePublisher champAsyncResponsePublisher;
- private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
+ private EventConsumer asyncRequestConsumer;
- private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
- private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
- private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
- Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
+ private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
- public ChampAsyncRequestProcessor(ChampDataService champDataService,
- ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
+ private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
+ private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
+ private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
+ Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
- this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
+ public ChampAsyncRequestProcessor(ChampDataService champDataService,
+ ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
- this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
+ this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
- this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
- requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
- requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
- new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+ this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
- for (int i = 0; i < requestProcesserPoolSize; i++) {
- requestProcesserPool.submit(new ChampProcessorWorker());
+ this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
+ requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
+ requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
+ new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+
+ for (int i = 0; i < requestProcesserPoolSize; i++) {
+ requestProcesserPool.submit(new ChampProcessorWorker());
+ }
+
+ this.champDataService = champDataService;
+ this.champAsyncResponsePublisher = champAsyncResponsePublisher;
+ this.asyncRequestConsumer = asyncRequestConsumer;
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
+ + asyncRequestConsumer.getClass().getName());
}
- this.champDataService = champDataService;
- this.champAsyncResponsePublisher = champAsyncResponsePublisher;
- this.asyncRequestConsumer = asyncRequestConsumer;
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
- + asyncRequestConsumer.getClass().getName());
- }
-
-
+ public ChampAsyncRequestProcessor(ChampDataService champDataService,
+ ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
+ Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
+
+ this.requestProcesserQueueSize = requestProcesserQueueSize;
- public ChampAsyncRequestProcessor(ChampDataService champDataService,
- ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
- Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
+ this.requestProcesserPoolSize = requestProcesserPoolSize;
- this.requestProcesserQueueSize = requestProcesserQueueSize;
+ this.requestPollingTimeSeconds = requestPollingTimeSeconds;
- this.requestProcesserPoolSize = requestProcesserPoolSize;
-
- this.requestPollingTimeSeconds = requestPollingTimeSeconds;
+ requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
+ requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
+ new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
- requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
- requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
- new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+ for (int i = 0; i < requestProcesserPoolSize; i++) {
+ requestProcesserPool.submit(new ChampProcessorWorker());
+ }
- for (int i = 0; i < requestProcesserPoolSize; i++) {
- requestProcesserPool.submit(new ChampProcessorWorker());
+ this.champDataService = champDataService;
+ this.champAsyncResponsePublisher = champAsyncResponsePublisher;
+ this.asyncRequestConsumer = asyncRequestConsumer;
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
+ + asyncRequestConsumer.getClass().getName());
}
- this.champDataService = champDataService;
- this.champAsyncResponsePublisher = champAsyncResponsePublisher;
- this.asyncRequestConsumer = asyncRequestConsumer;
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
- + asyncRequestConsumer.getClass().getName());
- }
+ private class ChampProcessorWorker implements Runnable {
+
+ @Override
+ public void run() {
+
+ while (true) {
+
+ GraphEventEnvelope eventEnvelope = null;
+ GraphEvent event = null;
+ try {
+ // Get the next event to be published from the queue.
+ eventEnvelope = requestProcesserEventQueue.take();
+ event = eventEnvelope.getBody();
+ } catch (InterruptedException e) {
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
+
+ // Apply Champ Event header
+ eventEnvelope.setHeader(GraphEventHeader.builder().requestId(event.getTransactionId()).build());
+
+ // Parse the event and call champ Dao to process , Create the
+ // response event and put it on response queue
+ event.setResult(GraphEventResult.SUCCESS);
+
+ // Check if this request is part of an ongoing DB transaction
+ ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
+ if ((event.getDbTransactionId() != null) && (transaction == null)) {
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
+ event.setHttpErrorStatus(Status.BAD_REQUEST);
+ }
+
+ if (event.getResult() != GraphEventResult.FAILURE) {
+ try {
+ if (event.getVertex() != null) {
+
+ switch (event.getOperation()) {
+ case CREATE:
+ event.setVertex(GraphEventVertex.fromChampObject(
+ champDataService.storeObject(event.getVertex().toChampObject(),
+ Optional.ofNullable(transaction)),
+ event.getVertex().getModelVersion()));
+ break;
+
+ case UPDATE:
+ event.setVertex(GraphEventVertex.fromChampObject(
+ champDataService.replaceObject(event.getVertex().toChampObject(),
+ event.getVertex().getId(), Optional.ofNullable(transaction)),
+ event.getVertex().getModelVersion()));
+ break;
+ case DELETE:
+ champDataService.deleteObject(event.getVertex().getId(),
+ Optional.ofNullable(transaction));
+ break;
+ default:
+ // log error
+ }
+ } else if (event.getEdge() != null) {
+ switch (event.getOperation()) {
+ case CREATE:
+ event.setEdge(GraphEventEdge.fromChampRelationship(
+ champDataService.storeRelationship(event.getEdge().toChampRelationship(),
+ Optional.ofNullable(transaction)),
+ event.getEdge().getModelVersion()));
+ break;
+
+ case UPDATE:
+ event.setEdge(GraphEventEdge.fromChampRelationship(
+ champDataService.updateRelationship(event.getEdge().toChampRelationship(),
+ event.getEdge().getId(), Optional.ofNullable(transaction)),
+ event.getEdge().getModelVersion()));
+
+ break;
+ case DELETE:
+ champDataService.deleteRelationship(event.getEdge().getId(),
+ Optional.ofNullable(transaction));
+ break;
+ default:
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Invalid operation for event transactionId: " + event.getTransactionId());
+ }
+
+ } else {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Invalid payload for event transactionId: " + event.getTransactionId());
+ }
+ } catch (ChampServiceException champException) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage(champException.getMessage());
+ event.setHttpErrorStatus(champException.getHttpStatus());
+
+ } catch (Exception ex) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage(ex.getMessage());
+ event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ if (event.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult());
+ } else {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
+ + event.getErrorMessage());
+ }
+
+ champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
- private class ChampProcessorWorker implements Runnable {
+ }
+ }
+ }
@Override
public void run() {
- while (true) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
- GraphEvent event = null;
- try {
- // Get the next event to be published from the queue.
- event = requestProcesserEventQueue.take();
- } catch (InterruptedException e) {
- // Restore the interrupted status.
- Thread.currentThread().interrupt();
- }
-
- // Parse the event and call champ Dao to process , Create the
- // response event and put it on response queue
- event.setResult(GraphEventResult.SUCCESS);
-
- // Check if this request is part of an ongoing DB transaction
- ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
- if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
- event.setHttpErrorStatus(Status.BAD_REQUEST);
- }
-
- if (event.getResult() != GraphEventResult.FAILURE) {
- try {
- if (event.getVertex() != null) {
-
- switch (event.getOperation()) {
- case CREATE:
- event.setVertex(GraphEventVertex.fromChampObject(
- champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
- event.getVertex().getModelVersion()));
- break;
-
- case UPDATE:
- event.setVertex(GraphEventVertex.fromChampObject(
- champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
- event.getVertex().getModelVersion()));
- break;
- case DELETE:
- champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
- break;
- default:
- // log error
- }
- } else if (event.getEdge() != null) {
- switch (event.getOperation()) {
- case CREATE:
- event.setEdge(GraphEventEdge.fromChampRelationship(
- champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
- event.getEdge().getModelVersion()));
- break;
-
- case UPDATE:
- event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
- .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
- event.getEdge().getModelVersion()));
-
- break;
- case DELETE:
- champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
- break;
- default:
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
- "Invalid operation for event transactionId: " + event.getTransactionId());
- }
-
- } else {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
- "Invalid payload for event transactionId: " + event.getTransactionId());
- }
- } catch (ChampServiceException champException) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage(champException.getMessage());
- event.setHttpErrorStatus(champException.getHttpStatus());
-
- } catch (Exception ex) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage(ex.getMessage());
- event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
- }
+ if (asyncRequestConsumer == null) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Unable to initialize ChampAsyncRequestProcessor");
}
- if (event.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult());
- } else {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
- + event.getErrorMessage());
+ Iterable<String> events = null;
+ try {
+ events = asyncRequestConsumer.consume();
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ return;
}
- champAsyncResponsePublisher.publishResponseEvent(event);
-
- }
- }
- }
-
- @Override
- public void run() {
-
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
+ if (events == null || !events.iterator().hasNext()) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
- if (asyncRequestConsumer == null) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
- }
-
- Iterable<String> events = null;
- try {
- events = asyncRequestConsumer.consume();
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
- return;
- }
-
- if (events == null || !events.iterator().hasNext()) {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
+ }
- }
+ for (String event : events) {
+ try {
+ GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
+ GraphEvent requestEvent = requestEnvelope.getBody();
+ auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event received of type: " + requestEvent.getObjectType() + " with key: "
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event received of type: " + requestEvent.getObjectType() + " with key: "
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
+
+ // Try to submit the event to be published to the event bus.
+ if (!requestProcesserEventQueue.offer(requestEnvelope)) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+ }
+
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ }
+ }
- for (String event : events) {
- try {
- GraphEvent requestEvent = GraphEvent.fromJson(event);
- auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
- + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
- + requestEvent.getOperation().toString());
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
- + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
- + requestEvent.getOperation().toString());
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
-
- // Try to submit the event to be published to the event bus.
- if (!requestProcesserEventQueue.offer(requestEvent)) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
- "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+ try {
+ asyncRequestConsumer.commitOffsets();
+ } catch (OperationNotSupportedException e) {
+ // Dmaap doesnt support commit with offset
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
}
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
- }
}
- try {
- asyncRequestConsumer.commitOffsets();
- } catch(OperationNotSupportedException e) {
- //Dmaap doesnt support commit with offset
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
- }
- catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ public Integer getRequestPollingTimeSeconds() {
+ return requestPollingTimeSeconds;
}
- }
-
-
-
- public Integer getRequestPollingTimeSeconds() {
- return requestPollingTimeSeconds;
- }
-
-
}
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java
index a9560b0..c3f3859 100644
--- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java
+++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java
@@ -27,13 +27,13 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.event.GraphEvent.GraphEventResult;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
import org.onap.champ.service.ChampThreadFactory;
import org.onap.champ.service.logging.ChampMsgs;
-import org.onap.aai.event.api.EventPublisher;
-
public class ChampAsyncResponsePublisher {
private EventPublisher asyncResponsePublisher;
@@ -51,7 +51,7 @@ public class ChampAsyncResponsePublisher {
/**
* Internal queue where outgoing events will be buffered.
**/
- private BlockingQueue<GraphEvent> responsePublisherEventQueue;
+ private BlockingQueue<GraphEventEnvelope> responsePublisherEventQueue;
/**
* Pool of worker threads that do the work of publishing the events to the
@@ -72,7 +72,7 @@ public class ChampAsyncResponsePublisher {
this.responsePublisherPoolSize = responsePublisherPoolSize;
- responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
+ responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
@@ -91,7 +91,7 @@ public class ChampAsyncResponsePublisher {
responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
- responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
+ responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
@@ -105,8 +105,8 @@ public class ChampAsyncResponsePublisher {
+ asyncResponsePublisher.getClass().getName());
}
- public void publishResponseEvent(GraphEvent event) {
- responsePublisherEventQueue.offer(event);
+ public void publishResponseEvent(GraphEventEnvelope eventEnvelope) {
+ responsePublisherEventQueue.offer(eventEnvelope);
}
@@ -116,23 +116,20 @@ public class ChampAsyncResponsePublisher {
public void run() {
while (true) {
-
+ GraphEventEnvelope eventEnvelope = null;
GraphEvent event = null;
try {
-
// Get the next event to be published from the queue.
- event = responsePublisherEventQueue.take();
-
+ eventEnvelope = responsePublisherEventQueue.take();
+ event = eventEnvelope.getBody();
} catch (InterruptedException e) {
-
// Restore the interrupted status.
Thread.currentThread().interrupt();
}
// Publish the response
-
try {
event.setTimestamp(System.currentTimeMillis());
- asyncResponsePublisher.sendSync(event.toJson());
+ asyncResponsePublisher.sendSync(eventEnvelope.toJson());
if (event.getResult().equals(GraphEventResult.SUCCESS)) {
logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
"Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
new file mode 100644
index 0000000..7958a3a
--- /dev/null
+++ b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java
@@ -0,0 +1,98 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.champ.event.envelope;
+
+import javax.ws.rs.core.Response.Status;
+import org.onap.champ.event.GraphEvent;
+import org.onap.champ.exception.ChampServiceException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class GraphEventEnvelope {
+
+ private GraphEventHeader header;
+ private GraphEvent body;
+
+ /**
+ * Serializer/deserializer for converting to/from JSON.
+ */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public GraphEventEnvelope(GraphEvent event) {
+ this.header = new GraphEventHeader.Builder().requestId(event.getTransactionId()).build();
+ this.body = event;
+ }
+
+ public GraphEventEnvelope(GraphEventHeader header, GraphEvent body) {
+ this.header = header;
+ this.body = body;
+ }
+
+ public GraphEventHeader getHeader() {
+ return header;
+ }
+
+ public void setHeader(GraphEventHeader header) {
+ this.header = header;
+ }
+
+ public GraphEvent getBody() {
+ return body;
+ }
+
+ public void setBody(GraphEvent body) {
+ this.body = body;
+ }
+
+ /**
+ * Serializes this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Deserializes the provided JSON string into a Event Envelope object.
+ *
+ * @param json the JSON string to produce the Event Envelope from.
+ * @return an Event Envelope object.
+ * @throws ChampServiceException
+ */
+ public static GraphEventEnvelope fromJson(String json) throws ChampServiceException {
+ try {
+ if (json == null || json.isEmpty()) {
+ throw new ChampServiceException("Empty or null JSON string.", Status.BAD_REQUEST);
+ }
+ return gson.fromJson(json, GraphEventEnvelope.class);
+ } catch (Exception ex) {
+ throw new ChampServiceException("Unable to parse JSON string: ", Status.BAD_REQUEST);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+
+}
diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java
new file mode 100644
index 0000000..59e01ea
--- /dev/null
+++ b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java
@@ -0,0 +1,228 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.champ.event.envelope;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+
+public class GraphEventHeader {
+
+ private static final String SOURCE_NAME = "CHAMP";
+
+ private static final String EVENT_TYPE = "db-update-result";
+
+ @SerializedName("request-id")
+ private String requestId;
+
+ private String timestamp;
+
+ @SerializedName("source-name")
+ private String sourceName;
+
+ @SerializedName("event-type")
+ private String eventType;
+
+ @SerializedName("validation-entity-type")
+ private String validationEntityType;
+
+ @SerializedName("validation-top-entity-type")
+ private String validationTopEntityType;
+
+ @SerializedName("entity-link")
+ private String entityLink;
+
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String requestId;
+ private String validationEntityType;
+ private String validationTopEntityType;
+ private String entityLink;
+
+ public Builder requestId(String val) {
+ requestId = val;
+ return this;
+ }
+
+ public Builder validationEntityType(String val) {
+ validationEntityType = val;
+ return this;
+ }
+
+ public Builder validationTopEntityType(String val) {
+ validationTopEntityType = val;
+ return this;
+ }
+
+ public Builder entityLink(String val) {
+ entityLink = val;
+ return this;
+ }
+
+ public GraphEventHeader build() {
+ return new GraphEventHeader(this);
+ }
+ }
+
+ private GraphEventHeader(Builder builder) {
+ requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString();
+ timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now());
+ sourceName = SOURCE_NAME;
+ eventType = EVENT_TYPE;
+
+ validationEntityType = builder.validationEntityType;
+ validationTopEntityType = builder.validationTopEntityType;
+ entityLink = builder.entityLink;
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // GETTERS AND SETTERS
+ ///////////////////////////////////////////////////////////////////////////
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public void setSourceName(String sourceName) {
+ this.sourceName = sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(String eventType) {
+ this.eventType = eventType;
+ }
+
+ public String getValidationEntityType() {
+ return validationEntityType;
+ }
+
+ public void setValidationEntityType(String validationEntityType) {
+ this.validationEntityType = validationEntityType;
+ }
+
+ public String getValidationTopEntityType() {
+ return validationTopEntityType;
+ }
+
+ public void setValidationTopEntityType(String validationTopEntityType) {
+ this.validationTopEntityType = validationTopEntityType;
+ }
+
+ public String getEntityLink() {
+ return entityLink;
+ }
+
+ public void setEntityLink(String entityLink) {
+ this.entityLink = entityLink;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // OVERRIDES
+ ///////////////////////////////////////////////////////////////////////////
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType,
+ this.validationTopEntityType, this.entityLink);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof GraphEventHeader)) {
+ return false;
+ } else if (obj == this) {
+ return true;
+ }
+ GraphEventHeader rhs = (GraphEventHeader) obj;
+ // @formatter:off
+ return new EqualsBuilder()
+ .append(requestId, rhs.requestId)
+ .append(timestamp, rhs.timestamp)
+ .append(sourceName, rhs.sourceName)
+ .append(eventType, rhs.sourceName)
+ .append(validationEntityType, rhs.validationEntityType)
+ .append(validationTopEntityType, rhs.validationTopEntityType)
+ .append(entityLink, rhs.entityLink)
+ .isEquals();
+ // @formatter:on
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+}
diff --git a/champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java b/champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java
new file mode 100644
index 0000000..5c39f99
--- /dev/null
+++ b/champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java
@@ -0,0 +1,42 @@
+package org.onap.champ.event;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.onap.aai.champcore.model.ChampObject;
+import org.onap.champ.event.GraphEvent.GraphEventOperation;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
+import org.onap.champ.util.TestUtil;
+import org.skyscreamer.jsonassert.Customization;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.skyscreamer.jsonassert.comparator.CustomComparator;
+
+public class GraphEventEnvelopeTest {
+
+ @Test
+ public void testEventEnvelopeFormat() throws Exception {
+ String expectedEnvelope = TestUtil.getFileAsString("event/event-envelope.json");
+
+ GraphEvent body = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromChampObject(new ChampObject.Builder("pserver").build(), "v13")).build();
+
+ String graphEventEnvelope = new GraphEventEnvelope(body).toJson();
+
+ JSONAssert.assertEquals(expectedEnvelope, graphEventEnvelope,
+ new CustomComparator(JSONCompareMode.STRICT, new Customization("header.request-id", (o1, o2) -> true),
+ new Customization("header.timestamp", (o1, o2) -> true),
+ new Customization("body.timestamp", (o1, o2) -> true),
+ new Customization("body.transaction-id", (o1, o2) -> true)));
+ }
+
+ @Test
+ public void testRequestIdIsTransactionId() throws Exception {
+ GraphEvent body = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromChampObject(new ChampObject.Builder("pserver").build(), "v13")).build();
+
+ GraphEventEnvelope envelope = new GraphEventEnvelope(body);
+
+ assertThat(envelope.getHeader().getRequestId(), is(envelope.getBody().getTransactionId()));
+ }
+}
diff --git a/champ-service/src/test/java/org/onap/champ/util/TestUtil.java b/champ-service/src/test/java/org/onap/champ/util/TestUtil.java
new file mode 100644
index 0000000..b9924e4
--- /dev/null
+++ b/champ-service/src/test/java/org/onap/champ/util/TestUtil.java
@@ -0,0 +1,34 @@
+package org.onap.champ.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class TestUtil {
+
+ public static Path getPath(String resourceFilename) throws URISyntaxException {
+ URL resource = ClassLoader.getSystemResource(resourceFilename);
+ if (resource != null) {
+ return Paths.get(resource.toURI());
+ }
+
+ // If the resource is not found relative to the classpath, try to get it from the file system directly.
+ File file = new File(resourceFilename);
+ if (!file.exists()) {
+ throw new RuntimeException("Resource does not exist: " + resourceFilename);
+ }
+ return file.toPath();
+ }
+
+ public static String getContentUtf8(Path filePath) throws IOException {
+ return new String(Files.readAllBytes(filePath));
+ }
+
+ public static String getFileAsString(String resourceFilename) throws IOException, URISyntaxException {
+ return getContentUtf8(getPath(resourceFilename));
+ }
+} \ No newline at end of file
diff --git a/champ-service/src/test/resources/event/event-envelope.json b/champ-service/src/test/resources/event/event-envelope.json
new file mode 100644
index 0000000..68888c0
--- /dev/null
+++ b/champ-service/src/test/resources/event/event-envelope.json
@@ -0,0 +1,19 @@
+{
+ "header": {
+ "request-id": "2253f351-d9b6-4638-9fe3-2c194bee1b29",
+ "timestamp": "20180316T092301Z",
+ "source-name": "CHAMP",
+ "event-type": "db-update-result"
+ },
+ "body": {
+ "operation": "CREATE",
+ "transaction-id": "2253f351-d9b6-4638-9fe3-2c194bee1b29",
+ "timestamp": 1521198075620,
+ "vertex": {
+ "key": "",
+ "schema-version": "v13",
+ "type": "pserver",
+ "properties": {}
+ }
+ }
+} \ No newline at end of file