From 24e223cc8da0acbb4727354e9da77b8213c04477 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Thu, 25 Jan 2018 18:38:57 +0100 Subject: TaskQueueManager fixes Change-Id: I995f315ebe27bf09afedab90b787366a998c083c Issue-ID: APPC-529 Signed-off-by: Jakub Dudycz --- .../client/impl/protocol/AsyncProtocolImpl.java | 45 ++++++++++++---------- 1 file changed, 25 insertions(+), 20 deletions(-) (limited to 'appc-client/client-lib/src/main/java/org/onap') diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java index 82626d802..8567d993a 100644 --- a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java @@ -70,7 +70,7 @@ class AsyncProtocolImpl implements AsyncProtocol { */ private ExecutorService executorService = Executors.newSingleThreadExecutor(); - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class); AsyncProtocolImpl() { @@ -80,6 +80,7 @@ class AsyncProtocolImpl implements AsyncProtocol { messageWriter = (MessageWriter) messageReader; } + @Override public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException { if (callback == null) { @@ -97,6 +98,7 @@ class AsyncProtocolImpl implements AsyncProtocol { } } + @Override public void sendRequest(String payload, MessageContext context) throws ProtocolException { //get message to be sent to appc from payload and context @@ -119,7 +121,7 @@ class AsyncProtocolImpl implements AsyncProtocol { public class Listener implements Runnable { - + @Override public void run() { while (!isShutdown) { @@ -128,28 +130,31 @@ class AsyncProtocolImpl implements AsyncProtocol { messages = messageService.fetch(); LOG.debug("Successfully fetched " + messages.size() + " messages"); } catch (IOException e) { - LOG.error("Fetching " + messages.size() + " messages failed"); + LOG.error("Fetching " + messages.size() + " messages failed", e); } for (String message : messages) { + handleMessage(message); + } + } + } - MessageContext context = new MessageContext(); - String payload = null; - - try { - //get payload and context from message to be sent to core layer - payload = messageReader.read(message, context); - LOG.debug("Got body: " + payload); - //call core layer response handler - if(!isShutdown) { - callback.onResponse(payload, context); - }else{ - LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" + - context.getCorrelationID() + "> response ", message); - } - } catch (ProtocolException e) { - LOG.error("Failed to read message from UEB. message is: " + message); - } + private void handleMessage(String message) { + MessageContext context = new MessageContext(); + String payload; + + try { + //get payload and context from message to be sent to core layer + payload = messageReader.read(message, context); + LOG.debug("Got body: " + payload); + //call core layer response handler + if (!isShutdown) { + callback.onResponse(payload, context); + } else { + LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" + + context.getCorrelationID() + "> response ", message); } + } catch (ProtocolException e) { + LOG.error("Failed to read message from UEB. message is: " + message, e); } } } -- cgit 1.2.3-korg