aboutsummaryrefslogtreecommitdiffstats
path: root/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java')
-rw-r--r--ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java64
1 files changed, 33 insertions, 31 deletions
diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
index 27d6b6f76..a1d0ece83 100644
--- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
+++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java
@@ -94,35 +94,36 @@ public class ManualNotificationUpdateThread implements Runnable {
} catch (MalformedURLException e) {
LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
}
-
- String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
- SendMessage(consumerTopic, "Starting-Topic");
- final LinkedList<String> urlList = new LinkedList<> ();
- for ( String u : clusterList.split ( "," ) ){
- urlList.add ( u );
- }
-
- try {
- CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
- } catch (MalformedURLException | GeneralSecurityException e1) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
- }
+ if(aURL != null){
+ String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
+ SendMessage(consumerTopic, "Starting-Topic");
+ final LinkedList<String> urlList = new LinkedList<> ();
+ for ( String u : clusterList.split ( "," ) ){
+ urlList.add ( u );
+ }
- while (this.isRunning()) {
- LOGGER.debug("While loop test _ take out ");
try {
- for ( String msg : CConsumer.fetch () ){
- LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
- returnTopic = processMessage(msg);
- if(returnTopic != null){
- SendMessage(returnTopic, update);
+ CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
+ } catch (MalformedURLException | GeneralSecurityException e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
+ }
+
+ while (this.isRunning()) {
+ LOGGER.debug("While loop test _ take out ");
+ try {
+ for ( String msg : CConsumer.fetch () ){
+ LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
+ returnTopic = processMessage(msg);
+ if(returnTopic != null){
+ SendMessage(returnTopic, update);
+ }
}
+ } catch (IOException e) {
+ LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
}
- } catch (IOException e) {
- LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
}
+ LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
}
- LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
} else if ("dmaap".equals(propNotificationType)) {
String dmaapServers = null;
try {
@@ -200,14 +201,15 @@ public class ManualNotificationUpdateThread implements Runnable {
} catch (Exception e) {
LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
}
-
- try {
- pub.send( "pdpReturnMessage", message );
- LOGGER.debug("Sending to Message to tpoic" + topic);
- } catch (IOException e) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
- }
- pub.close();
+ if(pub != null){
+ try {
+ pub.send( "pdpReturnMessage", message );
+ LOGGER.debug("Sending to Message to tpoic" + topic);
+ pub.close();
+ } catch (IOException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e);
+ }
+ }
}
private String processMessage(String msg) {