summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-x[-rw-r--r--]dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java6
-rwxr-xr-x[-rw-r--r--]dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java89
2 files changed, 55 insertions, 40 deletions
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
index 7e257a12..18c00d56 100644..100755
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
@@ -41,7 +41,11 @@ public class DmaapListener {
Properties properties = new Properties();
String propFileName = DMAAP_LISTENER_PROPERTIES;
String propPath = null;
- String propDir = System.getenv(SDNC_CONFIG_DIR);
+ String propDir = System.getProperty(SDNC_CONFIG_DIR);
+ if(propDir == null) {
+ propDir = System.getenv(SDNC_CONFIG_DIR);
+ LOG.debug(SDNC_CONFIG_DIR + " read from environment variable with value " + propDir);
+ }
List<SdncDmaapConsumer> consumers = new LinkedList<>();
if (args.length > 0) {
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
index c02ec5df..2a9e0b14 100644..100755
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
@@ -64,45 +64,56 @@ public class MessageRouterHttpClient implements SdncDmaapConsumer {
}
- @Override
- public void run() {
- if (isReady) {
- isRunning = true;
- while (isRunning) {
- try {
- Response response = getMessages.invoke();
- Log.info("GET " + uri + " returned http status " + response.getStatus());
- String entity = response.readEntity(String.class);
- if (entity.contains("{")) {
- // Get rid of opening ["
- entity = entity.substring(2);
- // Get rid of closing "]
- entity = entity.substring(0, entity.length() - 2);
- // This replacement effectively un-escapes the JSON
- for (String message : entity.split("\",\"")) {
- try {
- processMsg(message.replace("\\\"", "\""));
- } catch (InvalidMessageException e) {
- Log.error("Message could not be processed", e);
- }
- }
- } else {
- Log.info("Entity doesn't appear to contain JSON elements");
- }
- } catch (Exception e) {
- Log.error("GET " + uri + " failed.", e);
- } finally {
- Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again.");
- try {
- Thread.sleep(fetchPause);
- } catch (InterruptedException e) {
- Log.error("Could not sleep thread", e);
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
+ @Override
+ public void run() {
+ if (isReady) {
+ isRunning = true;
+ while (isRunning) {
+ try {
+ Response response = getMessages.invoke();
+ Log.info("GET " + uri + " returned http status " + response.getStatus());
+ String entity = response.readEntity(String.class);
+ if (response.getStatus() < 300) {
+ if (entity.contains("{")) {
+ // Get rid of opening ["
+ entity = entity.substring(2);
+ // Get rid of closing "]
+ entity = entity.substring(0, entity.length() - 2);
+ // This replacement effectively un-escapes the JSON
+ for (String message : entity.split("\",\"")) {
+ try {
+ processMsg(message.replace("\\\"", "\""));
+ } catch (InvalidMessageException e) {
+ Log.error("Message could not be processed", e);
+ }
+ }
+ } else {
+ if (entity.length() < 1) {
+ Log.info("GET was successful, but the server returned an empty message body.");
+ } else {
+ Log.info(
+ "GET was successful, but entity is not valid JSON. Message body will be logged, but not processed");
+ Log.info(entity);
+ }
+ }
+ } else {
+ Log.info("GET failed, message body will be logged, but not processed.");
+ Log.info(entity);
+ }
+ } catch (Exception e) {
+ Log.error("GET " + uri + " failed.", e);
+ } finally {
+ Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again.");
+ try {
+ Thread.sleep(fetchPause);
+ } catch (InterruptedException e) {
+ Log.error("Could not sleep thread", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
@Override
public void init(Properties baseProperties, String consumerPropertiesPath) {