aboutsummaryrefslogtreecommitdiffstats
path: root/champ-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'champ-service/src/main')
-rw-r--r--champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java191
-rw-r--r--champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java62
-rw-r--r--champ-service/src/main/java/org/onap/champ/util/ChampProperties.java60
3 files changed, 153 insertions, 160 deletions
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
index a52127e..346bd02 100644
--- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
+++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncRequestProcessor.java
@@ -153,108 +153,109 @@ public class ChampAsyncRequestProcessor extends TimerTask {
// Get the next event to be published from the queue.
eventEnvelope = requestProcesserEventQueue.take();
event = eventEnvelope.getBody();
- } catch (InterruptedException e) {
- // Restore the interrupted status.
- Thread.currentThread().interrupt();
- }
- // Apply Champ Event header
- eventEnvelope.setHeader(new GraphEventHeader.Builder().requestId(event.getTransactionId()).build());
+ // Apply Champ Event header
+ eventEnvelope.setHeader(new GraphEventHeader.Builder().requestId(event.getTransactionId()).build());
- // Parse the event and call champ Dao to process , Create the
- // response event and put it on response queue
- event.setResult(GraphEventResult.SUCCESS);
+ // Parse the event and call champ Dao to process , Create the
+ // response event and put it on response queue
+ event.setResult(GraphEventResult.SUCCESS);
- // Check if this request is part of an ongoing DB transaction
- ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
- if ((event.getDbTransactionId() != null) && (transaction == null)) {
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
- event.setHttpErrorStatus(Status.BAD_REQUEST);
- }
+ // Check if this request is part of an ongoing DB transaction
+ ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
+ if ((event.getDbTransactionId() != null) && (transaction == null)) {
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
+ event.setHttpErrorStatus(Status.BAD_REQUEST);
+ }
- if (event.getResult() != GraphEventResult.FAILURE) {
- try {
- if (event.getVertex() != null) {
+ if (event.getResult() != GraphEventResult.FAILURE) {
+ try {
+ if (event.getVertex() != null) {
- switch (event.getOperation()) {
- case CREATE:
- event.setVertex(GraphEventVertex.fromChampObject(
+ switch (event.getOperation()) {
+ case CREATE:
+ event.setVertex(GraphEventVertex.fromChampObject(
champDataService.storeObject(event.getVertex().toChampObject(),
- Optional.ofNullable(transaction)),
+ Optional.ofNullable(transaction)),
event.getVertex().getModelVersion()));
- break;
+ break;
- case UPDATE:
- event.setVertex(GraphEventVertex.fromChampObject(
- champDataService.replaceObject(event.getVertex().toChampObject(event.getVertex().toJson()),
- event.getVertex().getId(), Optional.ofNullable(transaction)),
+ case UPDATE:
+ event.setVertex(GraphEventVertex.fromChampObject(
+ champDataService.replaceObject(
+ event.getVertex().toChampObject(event.getVertex().toJson()),
+ event.getVertex().getId(), Optional.ofNullable(transaction)),
event.getVertex().getModelVersion()));
- break;
- case DELETE:
- champDataService.deleteObject(event.getVertex().getId(),
+ break;
+ case DELETE:
+ champDataService.deleteObject(event.getVertex().getId(),
Optional.ofNullable(transaction));
- break;
- default:
- // log error
- }
- } else if (event.getEdge() != null) {
- switch (event.getOperation()) {
- case CREATE:
- event.setEdge(GraphEventEdge.fromChampRelationship(
+ break;
+ default:
+ // log error
+ }
+ } else if (event.getEdge() != null) {
+ switch (event.getOperation()) {
+ case CREATE:
+ event.setEdge(GraphEventEdge.fromChampRelationship(
champDataService.storeRelationship(event.getEdge().toChampRelationship(),
- Optional.ofNullable(transaction)),
+ Optional.ofNullable(transaction)),
event.getEdge().getModelVersion()));
- break;
+ break;
- case UPDATE:
- event.setEdge(GraphEventEdge.fromChampRelationship(
+ case UPDATE:
+ event.setEdge(GraphEventEdge.fromChampRelationship(
champDataService.updateRelationship(event.getEdge().toChampRelationship(),
- event.getEdge().getId(), Optional.ofNullable(transaction)),
+ event.getEdge().getId(), Optional.ofNullable(transaction)),
event.getEdge().getModelVersion()));
- break;
- case DELETE:
- champDataService.deleteRelationship(event.getEdge().getId(),
+ break;
+ case DELETE:
+ champDataService.deleteRelationship(event.getEdge().getId(),
Optional.ofNullable(transaction));
- break;
- default:
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ break;
+ default:
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
"Invalid operation for event transactionId: " + event.getTransactionId());
- }
+ }
- } else {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ } else {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
"Invalid payload for event transactionId: " + event.getTransactionId());
+ }
+ } catch (ChampServiceException champException) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage(champException.getMessage());
+ event.setHttpErrorStatus(champException.getHttpStatus());
+
+ } catch (Exception ex) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage(ex.getMessage());
+ event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
}
- } catch (ChampServiceException champException) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage(champException.getMessage());
- event.setHttpErrorStatus(champException.getHttpStatus());
-
- } catch (Exception ex) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage(ex.getMessage());
- event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
}
- }
- if (event.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ if (event.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
"Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult());
- } else {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult());
+ } else {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
"Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
- + event.getErrorMessage());
- }
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
+ + event.getErrorMessage());
+ }
- champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
+ champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
}
}
@@ -281,30 +282,30 @@ public class ChampAsyncRequestProcessor extends TimerTask {
if (events == null || !events.iterator().hasNext()) {
logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
- }
-
- for (String event : events) {
- try {
- GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
- GraphEvent requestEvent = requestEnvelope.getBody();
- auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ } else {
+ for (String event : events) {
+ try {
+ GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
+ GraphEvent requestEvent = requestEnvelope.getBody();
+ auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
"Event received of type: " + requestEvent.getObjectType() + " with key: "
- + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
- + " , operation: " + requestEvent.getOperation().toString());
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
"Event received of type: " + requestEvent.getObjectType() + " with key: "
- + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
- + " , operation: " + requestEvent.getOperation().toString());
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
- // Try to submit the event to be published to the event bus.
- if (!requestProcesserEventQueue.offer(requestEnvelope)) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ // Try to submit the event to be published to the event bus.
+ if (!requestProcesserEventQueue.offer(requestEnvelope)) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
"Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
- }
+ }
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ }
}
}
diff --git a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java
index c3f3859..558c8c1 100644
--- a/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java
+++ b/champ-service/src/main/java/org/onap/champ/async/ChampAsyncResponsePublisher.java
@@ -115,37 +115,37 @@ public class ChampAsyncResponsePublisher {
@Override
public void run() {
- while (true) {
- GraphEventEnvelope eventEnvelope = null;
- GraphEvent event = null;
- try {
- // Get the next event to be published from the queue.
- eventEnvelope = responsePublisherEventQueue.take();
- event = eventEnvelope.getBody();
- } catch (InterruptedException e) {
- // Restore the interrupted status.
- Thread.currentThread().interrupt();
- }
- // Publish the response
- try {
- event.setTimestamp(System.currentTimeMillis());
- asyncResponsePublisher.sendSync(eventEnvelope.toJson());
- if (event.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
- "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult());
- } else {
- logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
- "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
- + event.getErrorMessage());
- }
- } catch (Exception ex) {
- logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
- }
-
+ while (true) {
+ try {
+ // Get the next event to be published from the queue.
+ GraphEventEnvelope eventEnvelope = responsePublisherEventQueue.take();
+ GraphEvent event = eventEnvelope.getBody();
+
+ // Publish the response
+ try {
+ event.setTimestamp(System.currentTimeMillis());
+ asyncResponsePublisher.sendSync(eventEnvelope.toJson());
+ if (event.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
+ "Response published for Event of type: " + event.getObjectType() + " with key: " + event
+ .getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult());
+ } else {
+ logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
+ "Response published for Event of type: " + event.getObjectType() + " with key: " + event
+ .getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
+ + event.getErrorMessage());
+ }
+ } catch (Exception ex) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
}
}
}
diff --git a/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java b/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java
index ef9f534..74b761e 100644
--- a/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java
+++ b/champ-service/src/main/java/org/onap/champ/util/ChampProperties.java
@@ -22,52 +22,44 @@ package org.onap.champ.util;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
public class ChampProperties {
- private static Properties properties;
+ private static Properties properties;
- static {
- properties = new Properties();
- File file = new File(ChampServiceConstants.CHAMP_CONFIG_FILE);
- try {
- properties.load(new FileInputStream(file));
- } catch (IOException e) {
- e.printStackTrace();
- Runtime.getRuntime().halt(1);
+ static {
+ properties = new Properties();
+ try (FileInputStream fileInputStream = new FileInputStream(
+ new File(ChampServiceConstants.CHAMP_CONFIG_FILE))
+ ) {
+ properties.load(fileInputStream);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Runtime.getRuntime().halt(1);
+ }
}
- }
- public static String get(String key) {
- return properties.getProperty(key);
- }
-
- public static String get(String key, String defaultValue) {
- return properties.getProperty(key, defaultValue);
- }
-
- public static void put(String key, String value) {
- properties.setProperty(key, value);
- FileOutputStream fileOut = null;
- try {
- fileOut = new FileOutputStream(new File(ChampServiceConstants.CHAMP_CONFIG_FILE));
- properties.store(fileOut, "Added property: " + key);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
+ public static String get(String key) {
+ return properties.getProperty(key);
+ }
- try {
- fileOut.close();
- } catch (IOException ex) {
- ex.printStackTrace();
- }
+ public static String get(String key, String defaultValue) {
+ return properties.getProperty(key, defaultValue);
}
- }
+ public static void put(String key, String value) {
+ properties.setProperty(key, value);
+ try (FileOutputStream fileOut = new FileOutputStream(
+ new File(ChampServiceConstants.CHAMP_CONFIG_FILE))
+ ) {
+ properties.store(fileOut, "Added property: " + key);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}