summaryrefslogtreecommitdiffstats
path: root/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java')
-rw-r--r--appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java157
1 files changed, 157 insertions, 0 deletions
diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java
new file mode 100644
index 000000000..82626d802
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java
@@ -0,0 +1,157 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.client.impl.protocol;
+
+import org.onap.appc.client.impl.core.MessageContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+class AsyncProtocolImpl implements AsyncProtocol {
+
+ /**
+ * message bus listener thread handler
+ */
+ private Future listenerHandler;
+ /**
+ * called when messages are fetched - called for a single message
+ */
+ private RetrieveMessageCallback callback;
+ /**
+ * message bus client used to send/fetch
+ */
+ private MessagingService messageService;
+ /**
+ * Message reader used to extract body and context from reponse message
+ */
+ private MessageReader messageReader;
+ /**
+ * Message writer used to construct meesage from body and context
+ */
+ private MessageWriter messageWriter;
+
+ /**
+ * shutdown indicator
+ */
+ private boolean isShutdown = false;
+
+ /**
+ * executor service for listener usage
+ */
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+
+
+ AsyncProtocolImpl() {
+
+ messageService = new UEBMessagingService();
+ messageReader = new APPCMessageReaderWriter();
+ messageWriter = (MessageWriter) messageReader;
+ }
+
+ public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
+
+ if (callback == null) {
+ throw new ProtocolException("Callback param should not be null!");
+ }
+ this.callback = callback;
+
+ try {
+ messageService.init(props);
+ //get message bus listener thread
+ //start the thread after initializing services
+ listenerHandler = executorService.submit(new Listener());
+ } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
+ throw new ProtocolException(e);
+ }
+ }
+
+ public void sendRequest(String payload, MessageContext context) throws ProtocolException {
+
+ //get message to be sent to appc from payload and context
+ String message = messageWriter.write(payload, context);
+ try {
+ messageService.send(context.getPartiton(), message);
+ LOG.debug("Successfully send message: " + message);
+ } catch (IOException e) {
+ throw new ProtocolException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ isShutdown = true;
+ messageService.close();
+ LOG.warn("The protocol layer in shutdown stage.");
+ executorService.shutdownNow();
+ }
+
+ public class Listener implements Runnable {
+
+
+ public void run() {
+
+ while (!isShutdown) {
+ List<String> messages = new ArrayList<>();
+ try {
+ messages = messageService.fetch();
+ LOG.debug("Successfully fetched " + messages.size() + " messages");
+ } catch (IOException e) {
+ LOG.error("Fetching " + messages.size() + " messages failed");
+ }
+ for (String message : messages) {
+
+ MessageContext context = new MessageContext();
+ String payload = null;
+
+ try {
+ //get payload and context from message to be sent to core layer
+ payload = messageReader.read(message, context);
+ LOG.debug("Got body: " + payload);
+ //call core layer response handler
+ if(!isShutdown) {
+ callback.onResponse(payload, context);
+ }else{
+ LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
+ context.getCorrelationID() + "> response ", message);
+ }
+ } catch (ProtocolException e) {
+ LOG.error("Failed to read message from UEB. message is: " + message);
+ }
+ }
+ }
+ }
+ }
+
+}