diff options
-rwxr-xr-x[-rw-r--r--] | dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java | 6 | ||||
-rwxr-xr-x[-rw-r--r--] | dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java | 89 |
2 files changed, 55 insertions, 40 deletions
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java index 7e257a12..18c00d56 100644..100755 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java @@ -41,7 +41,11 @@ public class DmaapListener { Properties properties = new Properties(); String propFileName = DMAAP_LISTENER_PROPERTIES; String propPath = null; - String propDir = System.getenv(SDNC_CONFIG_DIR); + String propDir = System.getProperty(SDNC_CONFIG_DIR); + if(propDir == null) { + propDir = System.getenv(SDNC_CONFIG_DIR); + LOG.debug(SDNC_CONFIG_DIR + " read from environment variable with value " + propDir); + } List<SdncDmaapConsumer> consumers = new LinkedList<>(); if (args.length > 0) { diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java index c02ec5df..2a9e0b14 100644..100755 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java @@ -64,45 +64,56 @@ public class MessageRouterHttpClient implements SdncDmaapConsumer { } - @Override - public void run() { - if (isReady) { - isRunning = true; - while (isRunning) { - try { - Response response = getMessages.invoke(); - Log.info("GET " + uri + " returned http status " + response.getStatus()); - String entity = response.readEntity(String.class); - if (entity.contains("{")) { - // Get rid of opening [" - entity = entity.substring(2); - // Get rid of closing "] - entity = entity.substring(0, entity.length() - 2); - // This replacement effectively un-escapes the JSON - for (String message : entity.split("\",\"")) { - try { - processMsg(message.replace("\\\"", "\"")); - } catch (InvalidMessageException e) { - Log.error("Message could not be processed", e); - } - } - } else { - Log.info("Entity doesn't appear to contain JSON elements"); - } - } catch (Exception e) { - Log.error("GET " + uri + " failed.", e); - } finally { - Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again."); - try { - Thread.sleep(fetchPause); - } catch (InterruptedException e) { - Log.error("Could not sleep thread", e); - Thread.currentThread().interrupt(); - } - } - } - } - } + @Override + public void run() { + if (isReady) { + isRunning = true; + while (isRunning) { + try { + Response response = getMessages.invoke(); + Log.info("GET " + uri + " returned http status " + response.getStatus()); + String entity = response.readEntity(String.class); + if (response.getStatus() < 300) { + if (entity.contains("{")) { + // Get rid of opening [" + entity = entity.substring(2); + // Get rid of closing "] + entity = entity.substring(0, entity.length() - 2); + // This replacement effectively un-escapes the JSON + for (String message : entity.split("\",\"")) { + try { + processMsg(message.replace("\\\"", "\"")); + } catch (InvalidMessageException e) { + Log.error("Message could not be processed", e); + } + } + } else { + if (entity.length() < 1) { + Log.info("GET was successful, but the server returned an empty message body."); + } else { + Log.info( + "GET was successful, but entity is not valid JSON. Message body will be logged, but not processed"); + Log.info(entity); + } + } + } else { + Log.info("GET failed, message body will be logged, but not processed."); + Log.info(entity); + } + } catch (Exception e) { + Log.error("GET " + uri + " failed.", e); + } finally { + Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again."); + try { + Thread.sleep(fetchPause); + } catch (InterruptedException e) { + Log.error("Could not sleep thread", e); + Thread.currentThread().interrupt(); + } + } + } + } + } @Override public void init(Properties baseProperties, String consumerPropertiesPath) { |