diff options
author | Michael Arrastia <MArrasti@amdocs.com> | 2018-03-28 19:07:34 +0100 |
---|---|---|
committer | Michael Arrastia <MArrasti@amdocs.com> | 2018-03-28 19:07:34 +0100 |
commit | 282a3420f6a8e2174034fcfa98b5a3ece28023a7 (patch) | |
tree | 558d1d3d559acf291e325769e987ce82ce44a117 /champ-service/src | |
parent | 533b090aa92f5eaa6c674fd63940fcacf4dc811e (diff) |
Update to consume and publish events in new format
The new format includes:
- the graph request/response encapsulated in a body property
- new event header with details such as timestamp, request-id,
event-type
Issue-ID: AAI-960
Change-Id: Ib84ddd54352ca95c3968d2d2936f6348951c2d2c
Signed-off-by: Michael Arrastia <MArrasti@amdocs.com>
Diffstat (limited to 'champ-service/src')
7 files changed, 668 insertions, 247 deletions
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java index 610cac9..334871e 100644 --- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java +++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java @@ -26,16 +26,17 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; - import javax.naming.OperationNotSupportedException; import javax.ws.rs.core.Response.Status; - import org.onap.aai.champcore.ChampTransaction; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; import org.onap.champ.ChampRESTAPI; import org.onap.champ.event.GraphEvent; import org.onap.champ.event.GraphEvent.GraphEventResult; +import org.onap.champ.event.envelope.GraphEventEnvelope; +import org.onap.champ.event.envelope.GraphEventHeader; import org.onap.champ.event.GraphEventEdge; import org.onap.champ.event.GraphEventVertex; import org.onap.champ.exception.ChampServiceException; @@ -43,281 +44,283 @@ import org.onap.champ.service.ChampDataService; import org.onap.champ.service.ChampThreadFactory; import org.onap.champ.service.logging.ChampMsgs; -import org.onap.aai.event.api.EventConsumer; - /** - * This Class polls the Graph events from request topic perform the necessary - * CRUD operation by calling champDAO and queues up the response to be consumed - * by response handler. + * This Class polls the Graph events from request topic perform the necessary CRUD operation by calling champDAO and + * queues up the response to be consumed by response handler. */ public class ChampAsyncRequestProcessor extends TimerTask { - private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class); + private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class); - private ChampDataService champDataService; + private ChampDataService champDataService; - /** - * Number of events that can be queued up. - */ - private Integer requestProcesserQueueSize; + /** + * Number of events that can be queued up. + */ + private Integer requestProcesserQueueSize; - /** - * Number of event publisher worker threads. - */ - private Integer requestProcesserPoolSize; - - /** - * Number of event publisher worker threads. - */ - private Integer requestPollingTimeSeconds; + /** + * Number of event publisher worker threads. + */ + private Integer requestProcesserPoolSize; - /** - * Internal queue where outgoing events will be buffered until they can be - * serviced by. - **/ - private BlockingQueue<GraphEvent> requestProcesserEventQueue; + /** + * Number of event publisher worker threads. + */ + private Integer requestPollingTimeSeconds; - /** - * Pool of worker threads that do the work of publishing the events to the - * event bus. - */ - private ThreadPoolExecutor requestProcesserPool; + /** + * Internal queue where outgoing events will be buffered until they can be serviced by. + **/ + private BlockingQueue<GraphEventEnvelope> requestProcesserEventQueue; - private ChampAsyncResponsePublisher champAsyncResponsePublisher; + /** + * Pool of worker threads that do the work of publishing the events to the event bus. + */ + private ThreadPoolExecutor requestProcesserPool; - private EventConsumer asyncRequestConsumer; + private ChampAsyncResponsePublisher champAsyncResponsePublisher; - private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000; + private EventConsumer asyncRequestConsumer; - private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10; - private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000; - private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor"; - Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName()); + private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000; - public ChampAsyncRequestProcessor(ChampDataService champDataService, - ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) { + private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10; + private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000; + private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor"; + Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName()); - this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY; + public ChampAsyncRequestProcessor(ChampDataService champDataService, + ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) { - this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE; + this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY; - this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND; - requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize); - requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize, - new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME)); + this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE; - for (int i = 0; i < requestProcesserPoolSize; i++) { - requestProcesserPool.submit(new ChampProcessorWorker()); + this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND; + requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize); + requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize, + new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME)); + + for (int i = 0; i < requestProcesserPoolSize; i++) { + requestProcesserPool.submit(new ChampProcessorWorker()); + } + + this.champDataService = champDataService; + this.champAsyncResponsePublisher = champAsyncResponsePublisher; + this.asyncRequestConsumer = asyncRequestConsumer; + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer " + + asyncRequestConsumer.getClass().getName()); } - this.champDataService = champDataService; - this.champAsyncResponsePublisher = champAsyncResponsePublisher; - this.asyncRequestConsumer = asyncRequestConsumer; - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, - "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer " - + asyncRequestConsumer.getClass().getName()); - } - - + public ChampAsyncRequestProcessor(ChampDataService champDataService, + ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer, + Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) { + + this.requestProcesserQueueSize = requestProcesserQueueSize; - public ChampAsyncRequestProcessor(ChampDataService champDataService, - ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer, - Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) { + this.requestProcesserPoolSize = requestProcesserPoolSize; - this.requestProcesserQueueSize = requestProcesserQueueSize; + this.requestPollingTimeSeconds = requestPollingTimeSeconds; - this.requestProcesserPoolSize = requestProcesserPoolSize; - - this.requestPollingTimeSeconds = requestPollingTimeSeconds; + requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize); + requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize, + new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME)); - requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize); - requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize, - new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME)); + for (int i = 0; i < requestProcesserPoolSize; i++) { + requestProcesserPool.submit(new ChampProcessorWorker()); + } - for (int i = 0; i < requestProcesserPoolSize; i++) { - requestProcesserPool.submit(new ChampProcessorWorker()); + this.champDataService = champDataService; + this.champAsyncResponsePublisher = champAsyncResponsePublisher; + this.asyncRequestConsumer = asyncRequestConsumer; + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer " + + asyncRequestConsumer.getClass().getName()); } - this.champDataService = champDataService; - this.champAsyncResponsePublisher = champAsyncResponsePublisher; - this.asyncRequestConsumer = asyncRequestConsumer; - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, - "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer " - + asyncRequestConsumer.getClass().getName()); - } + private class ChampProcessorWorker implements Runnable { + + @Override + public void run() { + + while (true) { + + GraphEventEnvelope eventEnvelope = null; + GraphEvent event = null; + try { + // Get the next event to be published from the queue. + eventEnvelope = requestProcesserEventQueue.take(); + event = eventEnvelope.getBody(); + } catch (InterruptedException e) { + // Restore the interrupted status. + Thread.currentThread().interrupt(); + } + + // Apply Champ Event header + eventEnvelope.setHeader(GraphEventHeader.builder().requestId(event.getTransactionId()).build()); + + // Parse the event and call champ Dao to process , Create the + // response event and put it on response queue + event.setResult(GraphEventResult.SUCCESS); + + // Check if this request is part of an ongoing DB transaction + ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId()); + if ((event.getDbTransactionId() != null) && (transaction == null)) { + event.setResult(GraphEventResult.FAILURE); + event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found"); + event.setHttpErrorStatus(Status.BAD_REQUEST); + } + + if (event.getResult() != GraphEventResult.FAILURE) { + try { + if (event.getVertex() != null) { + + switch (event.getOperation()) { + case CREATE: + event.setVertex(GraphEventVertex.fromChampObject( + champDataService.storeObject(event.getVertex().toChampObject(), + Optional.ofNullable(transaction)), + event.getVertex().getModelVersion())); + break; + + case UPDATE: + event.setVertex(GraphEventVertex.fromChampObject( + champDataService.replaceObject(event.getVertex().toChampObject(), + event.getVertex().getId(), Optional.ofNullable(transaction)), + event.getVertex().getModelVersion())); + break; + case DELETE: + champDataService.deleteObject(event.getVertex().getId(), + Optional.ofNullable(transaction)); + break; + default: + // log error + } + } else if (event.getEdge() != null) { + switch (event.getOperation()) { + case CREATE: + event.setEdge(GraphEventEdge.fromChampRelationship( + champDataService.storeRelationship(event.getEdge().toChampRelationship(), + Optional.ofNullable(transaction)), + event.getEdge().getModelVersion())); + break; + + case UPDATE: + event.setEdge(GraphEventEdge.fromChampRelationship( + champDataService.updateRelationship(event.getEdge().toChampRelationship(), + event.getEdge().getId(), Optional.ofNullable(transaction)), + event.getEdge().getModelVersion())); + + break; + case DELETE: + champDataService.deleteRelationship(event.getEdge().getId(), + Optional.ofNullable(transaction)); + break; + default: + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + "Invalid operation for event transactionId: " + event.getTransactionId()); + } + + } else { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + "Invalid payload for event transactionId: " + event.getTransactionId()); + } + } catch (ChampServiceException champException) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage()); + event.setResult(GraphEventResult.FAILURE); + event.setErrorMessage(champException.getMessage()); + event.setHttpErrorStatus(champException.getHttpStatus()); + + } catch (Exception ex) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage()); + event.setResult(GraphEventResult.FAILURE); + event.setErrorMessage(ex.getMessage()); + event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR); + } + } + + if (event.getResult().equals(GraphEventResult.SUCCESS)) { + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey() + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString() + " , result: " + event.getResult()); + } else { + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey() + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString() + " , result: " + event.getResult() + " , error: " + + event.getErrorMessage()); + } + + champAsyncResponsePublisher.publishResponseEvent(eventEnvelope); - private class ChampProcessorWorker implements Runnable { + } + } + } @Override public void run() { - while (true) { + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events"); - GraphEvent event = null; - try { - // Get the next event to be published from the queue. - event = requestProcesserEventQueue.take(); - } catch (InterruptedException e) { - // Restore the interrupted status. - Thread.currentThread().interrupt(); - } - - // Parse the event and call champ Dao to process , Create the - // response event and put it on response queue - event.setResult(GraphEventResult.SUCCESS); - - // Check if this request is part of an ongoing DB transaction - ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId()); - if ( (event.getDbTransactionId() != null) && (transaction == null) ) { - event.setResult(GraphEventResult.FAILURE); - event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found"); - event.setHttpErrorStatus(Status.BAD_REQUEST); - } - - if (event.getResult() != GraphEventResult.FAILURE) { - try { - if (event.getVertex() != null) { - - switch (event.getOperation()) { - case CREATE: - event.setVertex(GraphEventVertex.fromChampObject( - champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)), - event.getVertex().getModelVersion())); - break; - - case UPDATE: - event.setVertex(GraphEventVertex.fromChampObject( - champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)), - event.getVertex().getModelVersion())); - break; - case DELETE: - champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction)); - break; - default: - // log error - } - } else if (event.getEdge() != null) { - switch (event.getOperation()) { - case CREATE: - event.setEdge(GraphEventEdge.fromChampRelationship( - champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)), - event.getEdge().getModelVersion())); - break; - - case UPDATE: - event.setEdge(GraphEventEdge.fromChampRelationship(champDataService - .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)), - event.getEdge().getModelVersion())); - - break; - case DELETE: - champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction)); - break; - default: - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, - "Invalid operation for event transactionId: " + event.getTransactionId()); - } - - } else { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, - "Invalid payload for event transactionId: " + event.getTransactionId()); - } - } catch (ChampServiceException champException) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage()); - event.setResult(GraphEventResult.FAILURE); - event.setErrorMessage(champException.getMessage()); - event.setHttpErrorStatus(champException.getHttpStatus()); - - } catch (Exception ex) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage()); - event.setResult(GraphEventResult.FAILURE); - event.setErrorMessage(ex.getMessage()); - event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR); - } + if (asyncRequestConsumer == null) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + "Unable to initialize ChampAsyncRequestProcessor"); } - if (event.getResult().equals(GraphEventResult.SUCCESS)) { - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, - "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString() + " , result: " + event.getResult()); - } else { - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, - "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString() + " , result: " + event.getResult() + " , error: " - + event.getErrorMessage()); + Iterable<String> events = null; + try { + events = asyncRequestConsumer.consume(); + } catch (Exception e) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage()); + return; } - champAsyncResponsePublisher.publishResponseEvent(event); - - } - } - } - - @Override - public void run() { - - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events"); + if (events == null || !events.iterator().hasNext()) { + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved"); - if (asyncRequestConsumer == null) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor"); - } - - Iterable<String> events = null; - try { - events = asyncRequestConsumer.consume(); - } catch (Exception e) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage()); - return; - } - - if (events == null || !events.iterator().hasNext()) { - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved"); + } - } + for (String event : events) { + try { + GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event); + GraphEvent requestEvent = requestEnvelope.getBody(); + auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + "Event received of type: " + requestEvent.getObjectType() + " with key: " + + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId() + + " , operation: " + requestEvent.getOperation().toString()); + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + "Event received of type: " + requestEvent.getObjectType() + " with key: " + + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId() + + " , operation: " + requestEvent.getOperation().toString()); + logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event); + + // Try to submit the event to be published to the event bus. + if (!requestProcesserEventQueue.offer(requestEnvelope)) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + "Event could not be published to the event bus due to: Internal buffer capacity exceeded."); + } + + } catch (Exception e) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage()); + } + } - for (String event : events) { - try { - GraphEvent requestEvent = GraphEvent.fromJson(event); - auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, - "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey() - + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: " - + requestEvent.getOperation().toString()); - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, - "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey() - + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: " - + requestEvent.getOperation().toString()); - logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event); - - // Try to submit the event to be published to the event bus. - if (!requestProcesserEventQueue.offer(requestEvent)) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, - "Event could not be published to the event bus due to: Internal buffer capacity exceeded."); + try { + asyncRequestConsumer.commitOffsets(); + } catch (OperationNotSupportedException e) { + // Dmaap doesnt support commit with offset + logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage()); + } catch (Exception e) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage()); } - } catch (Exception e) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage()); - } } - try { - asyncRequestConsumer.commitOffsets(); - } catch(OperationNotSupportedException e) { - //Dmaap doesnt support commit with offset - logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage()); - } - catch (Exception e) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage()); + public Integer getRequestPollingTimeSeconds() { + return requestPollingTimeSeconds; } - } - - - - public Integer getRequestPollingTimeSeconds() { - return requestPollingTimeSeconds; - } - - } diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java index a9560b0..c3f3859 100644 --- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java +++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java @@ -27,13 +27,13 @@ import java.util.concurrent.ThreadPoolExecutor; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventPublisher; import org.onap.champ.event.GraphEvent; import org.onap.champ.event.GraphEvent.GraphEventResult; +import org.onap.champ.event.envelope.GraphEventEnvelope; import org.onap.champ.service.ChampThreadFactory; import org.onap.champ.service.logging.ChampMsgs; -import org.onap.aai.event.api.EventPublisher; - public class ChampAsyncResponsePublisher { private EventPublisher asyncResponsePublisher; @@ -51,7 +51,7 @@ public class ChampAsyncResponsePublisher { /** * Internal queue where outgoing events will be buffered. **/ - private BlockingQueue<GraphEvent> responsePublisherEventQueue; + private BlockingQueue<GraphEventEnvelope> responsePublisherEventQueue; /** * Pool of worker threads that do the work of publishing the events to the @@ -72,7 +72,7 @@ public class ChampAsyncResponsePublisher { this.responsePublisherPoolSize = responsePublisherPoolSize; - responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize); + responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize); responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize, new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME)); @@ -91,7 +91,7 @@ public class ChampAsyncResponsePublisher { responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE; - responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize); + responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize); responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize, new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME)); @@ -105,8 +105,8 @@ public class ChampAsyncResponsePublisher { + asyncResponsePublisher.getClass().getName()); } - public void publishResponseEvent(GraphEvent event) { - responsePublisherEventQueue.offer(event); + public void publishResponseEvent(GraphEventEnvelope eventEnvelope) { + responsePublisherEventQueue.offer(eventEnvelope); } @@ -116,23 +116,20 @@ public class ChampAsyncResponsePublisher { public void run() { while (true) { - + GraphEventEnvelope eventEnvelope = null; GraphEvent event = null; try { - // Get the next event to be published from the queue. - event = responsePublisherEventQueue.take(); - + eventEnvelope = responsePublisherEventQueue.take(); + event = eventEnvelope.getBody(); } catch (InterruptedException e) { - // Restore the interrupted status. Thread.currentThread().interrupt(); } // Publish the response - try { event.setTimestamp(System.currentTimeMillis()); - asyncResponsePublisher.sendSync(event.toJson()); + asyncResponsePublisher.sendSync(eventEnvelope.toJson()); if (event.getResult().equals(GraphEventResult.SUCCESS)) { logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO, "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey() diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java new file mode 100644 index 0000000..7958a3a --- /dev/null +++ b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventEnvelope.java @@ -0,0 +1,98 @@ +/** + * ============LICENSE_START========================================== + * org.onap.aai + * =================================================================== + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 Amdocs + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.champ.event.envelope; + +import javax.ws.rs.core.Response.Status; +import org.onap.champ.event.GraphEvent; +import org.onap.champ.exception.ChampServiceException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class GraphEventEnvelope { + + private GraphEventHeader header; + private GraphEvent body; + + /** + * Serializer/deserializer for converting to/from JSON. + */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public GraphEventEnvelope(GraphEvent event) { + this.header = new GraphEventHeader.Builder().requestId(event.getTransactionId()).build(); + this.body = event; + } + + public GraphEventEnvelope(GraphEventHeader header, GraphEvent body) { + this.header = header; + this.body = body; + } + + public GraphEventHeader getHeader() { + return header; + } + + public void setHeader(GraphEventHeader header) { + this.header = header; + } + + public GraphEvent getBody() { + return body; + } + + public void setBody(GraphEvent body) { + this.body = body; + } + + /** + * Serializes this Vertex object into a JSON string. + * + * @return - A JSON format string representation of this Vertex. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Deserializes the provided JSON string into a Event Envelope object. + * + * @param json the JSON string to produce the Event Envelope from. + * @return an Event Envelope object. + * @throws ChampServiceException + */ + public static GraphEventEnvelope fromJson(String json) throws ChampServiceException { + try { + if (json == null || json.isEmpty()) { + throw new ChampServiceException("Empty or null JSON string.", Status.BAD_REQUEST); + } + return gson.fromJson(json, GraphEventEnvelope.class); + } catch (Exception ex) { + throw new ChampServiceException("Unable to parse JSON string: ", Status.BAD_REQUEST); + } + } + + @Override + public String toString() { + return toJson(); + } + +} diff --git a/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java new file mode 100644 index 0000000..59e01ea --- /dev/null +++ b/champ-service/src/main/java/org/onap/champ/event/envelope/GraphEventHeader.java @@ -0,0 +1,228 @@ +/** + * ============LICENSE_START========================================== + * org.onap.aai + * =================================================================== + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 Amdocs + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.champ.event.envelope; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Objects; +import java.util.UUID; +import org.apache.commons.lang3.builder.EqualsBuilder; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; + +public class GraphEventHeader { + + private static final String SOURCE_NAME = "CHAMP"; + + private static final String EVENT_TYPE = "db-update-result"; + + @SerializedName("request-id") + private String requestId; + + private String timestamp; + + @SerializedName("source-name") + private String sourceName; + + @SerializedName("event-type") + private String eventType; + + @SerializedName("validation-entity-type") + private String validationEntityType; + + @SerializedName("validation-top-entity-type") + private String validationTopEntityType; + + @SerializedName("entity-link") + private String entityLink; + + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private String requestId; + private String validationEntityType; + private String validationTopEntityType; + private String entityLink; + + public Builder requestId(String val) { + requestId = val; + return this; + } + + public Builder validationEntityType(String val) { + validationEntityType = val; + return this; + } + + public Builder validationTopEntityType(String val) { + validationTopEntityType = val; + return this; + } + + public Builder entityLink(String val) { + entityLink = val; + return this; + } + + public GraphEventHeader build() { + return new GraphEventHeader(this); + } + } + + private GraphEventHeader(Builder builder) { + requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString(); + timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now()); + sourceName = SOURCE_NAME; + eventType = EVENT_TYPE; + + validationEntityType = builder.validationEntityType; + validationTopEntityType = builder.validationTopEntityType; + entityLink = builder.entityLink; + } + + /** + * Serializes this object into a JSON string representation. + * + * @return a JSON format string representation of this object. + */ + public String toJson() { + return gson.toJson(this); + } + + /////////////////////////////////////////////////////////////////////////// + // GETTERS AND SETTERS + /////////////////////////////////////////////////////////////////////////// + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getValidationEntityType() { + return validationEntityType; + } + + public void setValidationEntityType(String validationEntityType) { + this.validationEntityType = validationEntityType; + } + + public String getValidationTopEntityType() { + return validationTopEntityType; + } + + public void setValidationTopEntityType(String validationTopEntityType) { + this.validationTopEntityType = validationTopEntityType; + } + + public String getEntityLink() { + return entityLink; + } + + public void setEntityLink(String entityLink) { + this.entityLink = entityLink; + } + + /////////////////////////////////////////////////////////////////////////// + // OVERRIDES + /////////////////////////////////////////////////////////////////////////// + + /* + * (non-Javadoc) + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType, + this.validationTopEntityType, this.entityLink); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GraphEventHeader)) { + return false; + } else if (obj == this) { + return true; + } + GraphEventHeader rhs = (GraphEventHeader) obj; + // @formatter:off + return new EqualsBuilder() + .append(requestId, rhs.requestId) + .append(timestamp, rhs.timestamp) + .append(sourceName, rhs.sourceName) + .append(eventType, rhs.sourceName) + .append(validationEntityType, rhs.validationEntityType) + .append(validationTopEntityType, rhs.validationTopEntityType) + .append(entityLink, rhs.entityLink) + .isEquals(); + // @formatter:on + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.toJson(); + } +} diff --git a/champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java b/champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java new file mode 100644 index 0000000..5c39f99 --- /dev/null +++ b/champ-service/src/test/java/org/onap/champ/event/GraphEventEnvelopeTest.java @@ -0,0 +1,42 @@ +package org.onap.champ.event; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Test; +import org.onap.aai.champcore.model.ChampObject; +import org.onap.champ.event.GraphEvent.GraphEventOperation; +import org.onap.champ.event.envelope.GraphEventEnvelope; +import org.onap.champ.util.TestUtil; +import org.skyscreamer.jsonassert.Customization; +import org.skyscreamer.jsonassert.JSONAssert; +import org.skyscreamer.jsonassert.JSONCompareMode; +import org.skyscreamer.jsonassert.comparator.CustomComparator; + +public class GraphEventEnvelopeTest { + + @Test + public void testEventEnvelopeFormat() throws Exception { + String expectedEnvelope = TestUtil.getFileAsString("event/event-envelope.json"); + + GraphEvent body = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromChampObject(new ChampObject.Builder("pserver").build(), "v13")).build(); + + String graphEventEnvelope = new GraphEventEnvelope(body).toJson(); + + JSONAssert.assertEquals(expectedEnvelope, graphEventEnvelope, + new CustomComparator(JSONCompareMode.STRICT, new Customization("header.request-id", (o1, o2) -> true), + new Customization("header.timestamp", (o1, o2) -> true), + new Customization("body.timestamp", (o1, o2) -> true), + new Customization("body.transaction-id", (o1, o2) -> true))); + } + + @Test + public void testRequestIdIsTransactionId() throws Exception { + GraphEvent body = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromChampObject(new ChampObject.Builder("pserver").build(), "v13")).build(); + + GraphEventEnvelope envelope = new GraphEventEnvelope(body); + + assertThat(envelope.getHeader().getRequestId(), is(envelope.getBody().getTransactionId())); + } +} diff --git a/champ-service/src/test/java/org/onap/champ/util/TestUtil.java b/champ-service/src/test/java/org/onap/champ/util/TestUtil.java new file mode 100644 index 0000000..b9924e4 --- /dev/null +++ b/champ-service/src/test/java/org/onap/champ/util/TestUtil.java @@ -0,0 +1,34 @@ +package org.onap.champ.util; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class TestUtil { + + public static Path getPath(String resourceFilename) throws URISyntaxException { + URL resource = ClassLoader.getSystemResource(resourceFilename); + if (resource != null) { + return Paths.get(resource.toURI()); + } + + // If the resource is not found relative to the classpath, try to get it from the file system directly. + File file = new File(resourceFilename); + if (!file.exists()) { + throw new RuntimeException("Resource does not exist: " + resourceFilename); + } + return file.toPath(); + } + + public static String getContentUtf8(Path filePath) throws IOException { + return new String(Files.readAllBytes(filePath)); + } + + public static String getFileAsString(String resourceFilename) throws IOException, URISyntaxException { + return getContentUtf8(getPath(resourceFilename)); + } +}
\ No newline at end of file diff --git a/champ-service/src/test/resources/event/event-envelope.json b/champ-service/src/test/resources/event/event-envelope.json new file mode 100644 index 0000000..68888c0 --- /dev/null +++ b/champ-service/src/test/resources/event/event-envelope.json @@ -0,0 +1,19 @@ +{ + "header": { + "request-id": "2253f351-d9b6-4638-9fe3-2c194bee1b29", + "timestamp": "20180316T092301Z", + "source-name": "CHAMP", + "event-type": "db-update-result" + }, + "body": { + "operation": "CREATE", + "transaction-id": "2253f351-d9b6-4638-9fe3-2c194bee1b29", + "timestamp": 1521198075620, + "vertex": { + "key": "", + "schema-version": "v13", + "type": "pserver", + "properties": {} + } + } +}
\ No newline at end of file |