diff options
Diffstat (limited to 'champ-service/src')
3 files changed, 153 insertions, 160 deletions
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java index a52127e..346bd02 100644 --- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java +++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java @@ -153,108 +153,109 @@ public class ChampAsyncRequestProcessor extends TimerTask { // Get the next event to be published from the queue. eventEnvelope = requestProcesserEventQueue.take(); event = eventEnvelope.getBody(); - } catch (InterruptedException e) { - // Restore the interrupted status. - Thread.currentThread().interrupt(); - } - // Apply Champ Event header - eventEnvelope.setHeader(new GraphEventHeader.Builder().requestId(event.getTransactionId()).build()); + // Apply Champ Event header + eventEnvelope.setHeader(new GraphEventHeader.Builder().requestId(event.getTransactionId()).build()); - // Parse the event and call champ Dao to process , Create the - // response event and put it on response queue - event.setResult(GraphEventResult.SUCCESS); + // Parse the event and call champ Dao to process , Create the + // response event and put it on response queue + event.setResult(GraphEventResult.SUCCESS); - // Check if this request is part of an ongoing DB transaction - ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId()); - if ((event.getDbTransactionId() != null) && (transaction == null)) { - event.setResult(GraphEventResult.FAILURE); - event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found"); - event.setHttpErrorStatus(Status.BAD_REQUEST); - } + // Check if this request is part of an ongoing DB transaction + ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId()); + if ((event.getDbTransactionId() != null) && (transaction == null)) { + event.setResult(GraphEventResult.FAILURE); + event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found"); + event.setHttpErrorStatus(Status.BAD_REQUEST); + } - if (event.getResult() != GraphEventResult.FAILURE) { - try { - if (event.getVertex() != null) { + if (event.getResult() != GraphEventResult.FAILURE) { + try { + if (event.getVertex() != null) { - switch (event.getOperation()) { - case CREATE: - event.setVertex(GraphEventVertex.fromChampObject( + switch (event.getOperation()) { + case CREATE: + event.setVertex(GraphEventVertex.fromChampObject( champDataService.storeObject(event.getVertex().toChampObject(), - Optional.ofNullable(transaction)), + Optional.ofNullable(transaction)), event.getVertex().getModelVersion())); - break; + break; - case UPDATE: - event.setVertex(GraphEventVertex.fromChampObject( - champDataService.replaceObject(event.getVertex().toChampObject(event.getVertex().toJson()), - event.getVertex().getId(), Optional.ofNullable(transaction)), + case UPDATE: + event.setVertex(GraphEventVertex.fromChampObject( + champDataService.replaceObject( + event.getVertex().toChampObject(event.getVertex().toJson()), + event.getVertex().getId(), Optional.ofNullable(transaction)), event.getVertex().getModelVersion())); - break; - case DELETE: - champDataService.deleteObject(event.getVertex().getId(), + break; + case DELETE: + champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction)); - break; - default: - // log error - } - } else if (event.getEdge() != null) { - switch (event.getOperation()) { - case CREATE: - event.setEdge(GraphEventEdge.fromChampRelationship( + break; + default: + // log error + } + } else if (event.getEdge() != null) { + switch (event.getOperation()) { + case CREATE: + event.setEdge(GraphEventEdge.fromChampRelationship( champDataService.storeRelationship(event.getEdge().toChampRelationship(), - Optional.ofNullable(transaction)), + Optional.ofNullable(transaction)), event.getEdge().getModelVersion())); - break; + break; - case UPDATE: - event.setEdge(GraphEventEdge.fromChampRelationship( + case UPDATE: + event.setEdge(GraphEventEdge.fromChampRelationship( champDataService.updateRelationship(event.getEdge().toChampRelationship(), - event.getEdge().getId(), Optional.ofNullable(transaction)), + event.getEdge().getId(), Optional.ofNullable(transaction)), event.getEdge().getModelVersion())); - break; - case DELETE: - champDataService.deleteRelationship(event.getEdge().getId(), + break; + case DELETE: + champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction)); - break; - default: - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + break; + default: + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Invalid operation for event transactionId: " + event.getTransactionId()); - } + } - } else { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + } else { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Invalid payload for event transactionId: " + event.getTransactionId()); + } + } catch (ChampServiceException champException) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage()); + event.setResult(GraphEventResult.FAILURE); + event.setErrorMessage(champException.getMessage()); + event.setHttpErrorStatus(champException.getHttpStatus()); + + } catch (Exception ex) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage()); + event.setResult(GraphEventResult.FAILURE); + event.setErrorMessage(ex.getMessage()); + event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR); } - } catch (ChampServiceException champException) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage()); - event.setResult(GraphEventResult.FAILURE); - event.setErrorMessage(champException.getMessage()); - event.setHttpErrorStatus(champException.getHttpStatus()); - - } catch (Exception ex) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage()); - event.setResult(GraphEventResult.FAILURE); - event.setErrorMessage(ex.getMessage()); - event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR); } - } - if (event.getResult().equals(GraphEventResult.SUCCESS)) { - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + if (event.getResult().equals(GraphEventResult.SUCCESS)) { + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString() + " , result: " + event.getResult()); - } else { - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString() + " , result: " + event.getResult()); + } else { + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString() + " , result: " + event.getResult() + " , error: " - + event.getErrorMessage()); - } + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString() + " , result: " + event.getResult() + " , error: " + + event.getErrorMessage()); + } - champAsyncResponsePublisher.publishResponseEvent(eventEnvelope); + champAsyncResponsePublisher.publishResponseEvent(eventEnvelope); + } catch (InterruptedException e) { + // Restore the interrupted status. + Thread.currentThread().interrupt(); + } } } @@ -281,30 +282,30 @@ public class ChampAsyncRequestProcessor extends TimerTask { if (events == null || !events.iterator().hasNext()) { logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved"); - } - - for (String event : events) { - try { - GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event); - GraphEvent requestEvent = requestEnvelope.getBody(); - auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + } else { + for (String event : events) { + try { + GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event); + GraphEvent requestEvent = requestEnvelope.getBody(); + auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received of type: " + requestEvent.getObjectType() + " with key: " - + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId() - + " , operation: " + requestEvent.getOperation().toString()); - logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, + + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId() + + " , operation: " + requestEvent.getOperation().toString()); + logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received of type: " + requestEvent.getObjectType() + " with key: " - + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId() - + " , operation: " + requestEvent.getOperation().toString()); - logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event); + + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId() + + " , operation: " + requestEvent.getOperation().toString()); + logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event); - // Try to submit the event to be published to the event bus. - if (!requestProcesserEventQueue.offer(requestEnvelope)) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, + // Try to submit the event to be published to the event bus. + if (!requestProcesserEventQueue.offer(requestEnvelope)) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Event could not be published to the event bus due to: Internal buffer capacity exceeded."); - } + } - } catch (Exception e) { - logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage()); + } catch (Exception e) { + logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage()); + } } } diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java index c3f3859..558c8c1 100644 --- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java +++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java @@ -115,37 +115,37 @@ public class ChampAsyncResponsePublisher { @Override public void run() { - while (true) { - GraphEventEnvelope eventEnvelope = null; - GraphEvent event = null; - try { - // Get the next event to be published from the queue. - eventEnvelope = responsePublisherEventQueue.take(); - event = eventEnvelope.getBody(); - } catch (InterruptedException e) { - // Restore the interrupted status. - Thread.currentThread().interrupt(); - } - // Publish the response - try { - event.setTimestamp(System.currentTimeMillis()); - asyncResponsePublisher.sendSync(eventEnvelope.toJson()); - if (event.getResult().equals(GraphEventResult.SUCCESS)) { - logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO, - "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString() + " , result: " + event.getResult()); - } else { - logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO, - "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString() + " , result: " + event.getResult() + " , error: " - + event.getErrorMessage()); - } - } catch (Exception ex) { - logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage()); - } - + while (true) { + try { + // Get the next event to be published from the queue. + GraphEventEnvelope eventEnvelope = responsePublisherEventQueue.take(); + GraphEvent event = eventEnvelope.getBody(); + + // Publish the response + try { + event.setTimestamp(System.currentTimeMillis()); + asyncResponsePublisher.sendSync(eventEnvelope.toJson()); + if (event.getResult().equals(GraphEventResult.SUCCESS)) { + logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO, + "Response published for Event of type: " + event.getObjectType() + " with key: " + event + .getObjectKey() + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString() + " , result: " + event.getResult()); + } else { + logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO, + "Response published for Event of type: " + event.getObjectType() + " with key: " + event + .getObjectKey() + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString() + " , result: " + event.getResult() + " , error: " + + event.getErrorMessage()); + } + } catch (Exception ex) { + logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage()); + } + } catch (InterruptedException e) { + // Restore the interrupted status. + Thread.currentThread().interrupt(); + } } } } diff --git a/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java b/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java index ef9f534..74b761e 100644 --- a/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java +++ b/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java @@ -22,52 +22,44 @@ package org.onap.champ.util; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.Properties; public class ChampProperties { - private static Properties properties; + private static Properties properties; - static { - properties = new Properties(); - File file = new File(ChampServiceConstants.CHAMP_CONFIG_FILE); - try { - properties.load(new FileInputStream(file)); - } catch (IOException e) { - e.printStackTrace(); - Runtime.getRuntime().halt(1); + static { + properties = new Properties(); + try (FileInputStream fileInputStream = new FileInputStream( + new File(ChampServiceConstants.CHAMP_CONFIG_FILE)) + ) { + properties.load(fileInputStream); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(1); + } } - } - public static String get(String key) { - return properties.getProperty(key); - } - - public static String get(String key, String defaultValue) { - return properties.getProperty(key, defaultValue); - } - - public static void put(String key, String value) { - properties.setProperty(key, value); - FileOutputStream fileOut = null; - try { - fileOut = new FileOutputStream(new File(ChampServiceConstants.CHAMP_CONFIG_FILE)); - properties.store(fileOut, "Added property: " + key); - } catch (Exception e) { - e.printStackTrace(); - } finally { + public static String get(String key) { + return properties.getProperty(key); + } - try { - fileOut.close(); - } catch (IOException ex) { - ex.printStackTrace(); - } + public static String get(String key, String defaultValue) { + return properties.getProperty(key, defaultValue); } - } + public static void put(String key, String value) { + properties.setProperty(key, value); + try (FileOutputStream fileOut = new FileOutputStream( + new File(ChampServiceConstants.CHAMP_CONFIG_FILE)) + ) { + properties.store(fileOut, "Added property: " + key); + } catch (Exception e) { + e.printStackTrace(); + } + } } |