aboutsummaryrefslogtreecommitdiffstats
path: root/appc-client/client-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'appc-client/client-lib/src')
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java85
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java76
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java45
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java314
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java70
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java43
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java28
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java36
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java68
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java33
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java47
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java69
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java36
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java85
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java50
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java107
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java65
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java98
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java82
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java77
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java40
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java157
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java56
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java125
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java39
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java40
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java61
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java38
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java82
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java38
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java44
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java79
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java98
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java30
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java38
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java102
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java36
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java163
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java151
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java104
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java91
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java75
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java60
-rw-r--r--appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java70
-rw-r--r--appc-client/client-lib/src/test/resources/ueb.missing.properties30
-rw-r--r--appc-client/client-lib/src/test/resources/ueb.properties29
46 files changed, 3390 insertions, 0 deletions
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java
new file mode 100644
index 000000000..82d631965
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java
@@ -0,0 +1,85 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/** Abstract request response handler class, responsible for common functionality of
+ * @{@link AsyncRequestResponseHandler} and @{@link SyncRequestResponseHandler}
+ */
+abstract class AbstractRequestResponseHandler implements RequestResponseHandler {
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class);
+ ICoreResponseHandler businessCallback;
+ protected String corrID;
+ CoreManager coreManager;
+
+
+ AbstractRequestResponseHandler(String corrID,
+ ICoreResponseHandler businessCallback,
+ CoreManager coreManager)
+ {
+ this.businessCallback = businessCallback;
+ this.corrID = corrID;
+ this.coreManager = coreManager;
+ }
+
+ public synchronized void handleResponse(final MessageContext ctx, final String response) {
+ try {
+ coreManager.submitTask(ctx.getCorrelationID(), new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("handling response of corrID <" + corrID + ">" + "response " + response);
+ if(coreManager.isExistHandler(corrID)) {
+ runTask(response, ctx.getType());
+ }
+
+ }
+ });
+ } catch (InterruptedException e) {
+ LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e);
+ }
+ }
+
+ /**
+ *
+ * @param response - Response
+ * @param type - Type of Response
+ */
+ abstract void runTask(String response, String type);
+
+ @Override
+ public void sendRequest(String request, String corrId, String rpcName) throws CoreException {
+ if(!coreManager.isShutdownInProgress()) {
+ coreManager.registerHandler(corrId, this);
+ coreManager.sendRequest(request, corrId, rpcName);
+ coreManager.startTimer(corrId);
+ }else{
+ throw new CoreException("Shutdown is in progress. Request will not be handled");
+ }
+ }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java
new file mode 100644
index 000000000..86756e6b2
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.concurrent.TimeoutException;
+
+/** Handles async responses
+ */
+class AsyncRequestResponseHandler extends AbstractRequestResponseHandler {
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class);
+
+ AsyncRequestResponseHandler(String corrID,
+ ICoreResponseHandler businessCallback,
+ CoreManager coreManager)
+ {
+ super(corrID, businessCallback, coreManager);
+ }
+
+ /**
+ * Calls API callback for sending response to consumer's listener. in case of complete response cleans timer and
+ * unregisters the handler.
+ * @param response - Response
+ * @param type - Type of Response
+ */
+ public void runTask(String response, String type) {
+ boolean finalTask = false;
+ try {
+ finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type);
+ } catch (Exception e){
+ LOG.error("Error on API layer, for request with correlation-id " + corrID, e);
+ }
+ if (finalTask){
+ coreManager.cancelTimer(corrID);
+ coreManager.unregisterHandler(corrID);
+ }
+ else{
+ response = null;
+ type = null;
+ }
+ }
+
+ /**
+ * Calls to API layer for sending timeout exception.
+ */
+ @Override
+ public void onTimeOut() {
+ LOG.info("timeout for request with correlation-id " + corrID);
+ ((ICoreAsyncResponseHandler)businessCallback).onException(new TimeoutException("timeout for request with correlation-id " + corrID));
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java
new file mode 100644
index 000000000..009a1f42e
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+
+public class CoreException extends Exception {
+
+ public CoreException() {
+ super();
+ }
+
+ public CoreException(String message) {
+ super(message);
+ }
+
+ public CoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CoreException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java
new file mode 100644
index 000000000..f299dc748
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java
@@ -0,0 +1,314 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import org.openecomp.appc.client.impl.protocol.*;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events.
+ */
+class CoreManager{
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
+ private final ProtocolFactory protocolFactory;
+ protected AsyncProtocol protocol;
+ private final RetrieveMessageCallback protocolCallback = null;
+ private final CoreRegistry registry;
+ private final ITimerService timerService;
+ private final TaskQueueManager queueManager;
+ private String DEFAULT_TIMEOUT = "300000";
+ private final static String RESPONSE_TIMEOUT = "client.response.timeout";
+ private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
+ private boolean isForceShutdown = false;
+ private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
+ private long shutdownTimeout;
+
+ CoreManager(Properties prop) throws CoreException {
+ protocolFactory = ProtocolFactory.getInstance();
+ try {
+ initProtocol(prop);
+ }catch (ProtocolException e){
+ throw new CoreException(e);
+ }
+ registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
+ String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
+ long responseTimeout = Long.parseLong(timeoutProp);
+ String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
+ shutdownTimeout = Long.parseLong(gracefulTimeout);
+ timerService = new TimerServiceImpl(responseTimeout);
+ queueManager = new TaskQueueManager(prop);
+ listenShutdown();
+ }
+
+ /**
+ * initiates protocol layer services.
+ * @param prop - Properties
+ */
+ private void initProtocol(Properties prop) throws ProtocolException {
+ protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
+ protocol.init(prop, getProtocolCallback());
+ }
+
+ /**
+ * Creates protocol response callback
+ * @return - @{@link ProtocolResponseCallbackImpl}
+ */
+ RetrieveMessageCallback getProtocolCallback(){
+ return new ProtocolResponseCallbackImpl();
+ }
+
+ /**
+ * Registers a new handler in registry
+ * @param corrID - Correlation ID
+ * @param requestResponseHandler handler to be called when response arrives
+ */
+ void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
+ registry.register(corrID, requestResponseHandler);
+ }
+
+ /**
+ * Remove a handler from registry service by correlation ID.
+ * @param corrID - Correlation ID
+ * @return - @{@link RequestResponseHandler}
+ */
+ RequestResponseHandler unregisterHandler(String corrID){
+ return (RequestResponseHandler) registry.unregister(corrID);
+ }
+
+ /**
+ * Checks in registry service if a handler is existing.
+ * @param corrID - Correlation ID
+ * @return - boolean
+ */
+ boolean isExistHandler(String corrID) {
+ return registry.isExist(corrID);
+ }
+
+ /**
+ * Starts timer for timeout event when a request was send successfully.
+ * @param corrID - Correlation ID
+ */
+ void startTimer(String corrID){
+ timerService.add(corrID, new TimeoutHandlerImpl(corrID));
+ }
+
+ /**
+ * Cancels timer for fimeout event, in case when complete response was received
+ * @param corrID
+ */
+ void cancelTimer(String corrID){
+ timerService.cancel(corrID);
+ }
+
+ /**
+ * Submits a new task to Queue manager. it is using for both response and timeout tasks
+ * @param corrID - Correlation ID
+ * @param task - @{@link Runnable} task.
+ * @throws InterruptedException
+ */
+ void submitTask(String corrID, Runnable task) throws InterruptedException {
+ queueManager.submit(corrID, task);
+ }
+
+ /**
+ * Sends request to protocol.
+ * @param request - Request
+ * @param corrId - Correlation ID
+ * @param rpcName - RPC name
+ * @throws CoreException - @{@link CoreException}
+ */
+ void sendRequest(String request, String corrId, String rpcName) throws CoreException {
+ MessageContext ctx = getMessageContext(corrId, rpcName);
+ try {
+ protocol.sendRequest(request, ctx);
+ } catch (ProtocolException e) {
+ unregisterHandler(corrId);
+ throw new CoreException(e);
+ }
+ }
+
+ /**
+ * Creates @{@link MessageContext}
+ * @param correlationId - Correlation ID
+ * @param rpcName - RPC Name
+ * @return - @{@link MessageContext}
+ */
+ private MessageContext getMessageContext(String correlationId, String rpcName){
+ MessageContext msgCtx = new MessageContext();
+ msgCtx.setCorrelationID(correlationId);
+ msgCtx.setRpc(rpcName);
+ return msgCtx;
+ }
+
+ /**
+ * Implements response callback from protocol and filters responses by correlation ID.
+ * Only registered events(by correlation ID) will be handled.
+ */
+ private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
+ @Override
+ public void onResponse(String response, MessageContext context) {
+ String corrID = context.getCorrelationID();
+ if (corrID != null) {
+ RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
+ if (messageHandler != null) {
+ LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
+ messageHandler.handleResponse(context, response);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * listens to @{@link Runtime} shutdown event
+ */
+ private void listenShutdown() {
+ Runtime.getRuntime().addShutdownHook(new Thread(){
+ public void run(){
+ gracefulShutdown();
+ }
+ });
+ }
+
+ /**
+ * Implements shutdown for client library.
+ * @param isForceShutdown - true force shutdown, false graceful shutdown
+ */
+ void shutdown(boolean isForceShutdown){
+ if(isForceShutdown){
+ forceShutdown();
+ }else{
+ gracefulShutdown();
+ }
+ }
+
+ /**
+ * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force
+ * shutdown only when either all request will be handled or graceful shutdown will be time out.
+ */
+ synchronized void gracefulShutdown(){
+ isGracefulShutdown.set(true);
+ if(registry.isEmpty()){
+ forceShutdown();
+ }
+ else{
+ try {
+ LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
+ wait(shutdownTimeout);
+ LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
+ forceShutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+
+ /**
+ * Closes Protocol, stops Queue Manager and shutdowns Time Service.
+ */
+ private void forceShutdown(){
+ isForceShutdown = true;
+ try {
+ LOG.info("Starting shutdown process.");
+ protocol.shutdown();
+ queueManager.stopQueueManager();
+ timerService.shutdown();
+ } catch (InterruptedException e) {
+ LOG.info("Client library shutdown in progress ", e);
+ }
+ }
+
+ /**
+ *
+ * @return - true when shutdown is in process
+ */
+ boolean isShutdownInProgress(){
+ return isForceShutdown || isGracefulShutdown.get();
+ }
+
+ /**
+ * Timeout handler implementation.
+ * This handler is responsible to assign a task for handling of timeout events.
+ *
+ */
+ private class TimeoutHandlerImpl implements ITimeoutHandler {
+
+ private final String corrID;
+
+ TimeoutHandlerImpl(String corrID) {
+ this.corrID = corrID;
+ }
+
+ /**
+ * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
+ * this queue is shared between both timeout and handlers which belong to same correlation ID.
+ */
+ @Override
+ public void onTimeout() {
+ try {
+ submitTask(corrID, new Runnable() {
+ @Override
+ public void run() {
+ RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
+ if (requestResponseHandler != null) {
+ requestResponseHandler.onTimeOut();
+ }
+ }
+ });
+ } catch (InterruptedException e) {
+ LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
+ }
+ }
+ }
+
+
+ /**
+ * Wakes Up graceful shutdown.
+ */
+ class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
+ @Override
+ public synchronized void emptyCallback() {
+ LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
+ if(isGracefulShutdown.get()){
+ wakeUpShutdown();
+ }
+ }
+ }
+
+ /**
+ * wakes up waiting shutdown.
+ */
+ private synchronized void wakeUpShutdown(){
+ notifyAll();
+ }
+
+}
+
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java
new file mode 100644
index 000000000..8d5d0b729
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** client lib Registry
+ */
+class CoreRegistry<T>{
+ private Map<String, T> registry =
+ new ConcurrentHashMap<String, T>();
+
+ final private EmptyRegistryCallback emptyRegistryCallback;
+
+
+ CoreRegistry(EmptyRegistryCallback emptyRegistryCallback){
+ this.emptyRegistryCallback = emptyRegistryCallback;
+ }
+
+ void register(String key, T obj) {
+ registry.put(key, obj);
+ }
+
+ <T> T unregister(String key) {
+ T item = (T) registry.remove(key);
+ if(registry.isEmpty()) {
+ emptyRegistryCallback.emptyCallback();
+ }
+ return item;
+ }
+
+ <T> T get(String key){
+ return (T) registry.get(key);
+ }
+
+ synchronized boolean isExist(String key) {
+ return registry.containsKey(key);
+ }
+
+ boolean isEmpty(){
+ return registry.isEmpty();
+ }
+
+ public interface EmptyRegistryCallback{
+ void emptyCallback();
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java
new file mode 100644
index 000000000..23fce28dc
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+public interface ICoreAsyncResponseHandler extends ICoreResponseHandler{
+
+ /**
+ * Core response to incoming message
+ * @param message response accepted from protocol
+ * @param type type of response
+ * @return true if message is final, false otherwise
+ */
+ boolean onResponse(String message, String type);
+
+ /**
+ * Core reaction to an event of exception
+ * @param e the exception which have been thrown
+ */
+ void onException(Exception e);
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java
new file mode 100644
index 000000000..4f0cb8a8c
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+public interface ICoreResponseHandler {
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java
new file mode 100644
index 000000000..332986733
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+public interface ICoreSyncResponseHandler extends ICoreResponseHandler{
+
+ /**
+ * Core response to incoming message, should return completed message only
+ * @param message response accepted from protocol
+ * @param type type of response
+ * @return true if message is final, false otherwise
+ */
+ <T> T onResponse(String message, String type) throws CoreException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java
new file mode 100644
index 000000000..831263aad
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+/**
+ */
+public interface IInvocationManager {
+
+ /**
+ * initializes the manager
+ * @param prop properties to read from
+ * @throws CoreException thrown if madatory fields are not set right
+ */
+ void init(Properties prop) throws CoreException;
+
+ /**
+ * handles the flow of an async request
+ * @param request the request body
+ * @param listener business response handler
+ * @param correlationId unique id of the request
+ * @param rpcName rpc call name
+ * @throws CoreException thrown if the request failed to be sent
+ */
+ void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException;
+
+ /**
+ * handles to flow of a sync request
+ * @param request the request body
+ * @param callback business response handler
+ * @param correlationId unique id of the request
+ * @param rpcName rpc call name
+ * @return the output object to be returned
+ * @throws CoreException thrown if the request failed to be sent
+ * @throws TimeoutException thrown if timeout has exceeded
+ */
+ <T> T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException;
+
+ /**
+ * shuts the invocation manager down.
+ * @param isForceShutdown if true, shutdown will be forced, otherwise it will be gracefully
+ */
+ void shutdown(boolean isForceShutdown);
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java
new file mode 100644
index 000000000..e2ec1d6cc
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+interface ITimeoutHandler {
+
+ /**
+ * handles timeout event
+ */
+ void onTimeout();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java
new file mode 100644
index 000000000..c5a4145f9
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+interface ITimerService {
+
+ /**
+ * add a new timeout handler to a request
+ * @param correlationID the id of the request
+ * @param handler to be called once "timeout' time has arrived
+ */
+ void add(String correlationID, ITimeoutHandler handler);
+
+ /**
+ * cancel the timeout handler of a request
+ * @param correlationID the id of the request
+ */
+ void cancel(String correlationID);
+
+
+ /**
+ * shuts the timer service down immediately
+ */
+ void shutdown();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java
new file mode 100644
index 000000000..0582244ff
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * layer for passing requests from API to Core
+ */
+class InvocationManager implements IInvocationManager{
+
+ protected CoreManager coreManager = null;
+
+ InvocationManager(){
+ }
+
+ public void init(Properties properties) throws CoreException {
+ coreManager = new CoreManager(properties);
+ }
+
+ /**
+ *
+ * @param request
+ * @param businessCallback
+ * @param correlationId
+ * @param rpcName
+ * @throws CoreException
+ */
+ public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException {
+ AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+ requestResponseHandler.sendRequest(request, correlationId, rpcName);
+ }
+
+ public <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException {
+ SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+ requestResponseHandler.sendRequest(request, correlationId, rpcName);
+ T responseObject = (T) requestResponseHandler.getResponse();
+ return responseObject;
+ }
+
+ @Override
+ public void shutdown(boolean isForceShutdown) {
+ coreManager.shutdown(isForceShutdown);
+ }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java
new file mode 100644
index 000000000..32b3ff22f
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+public abstract class InvocationManagerFactory {
+ private static IInvocationManager invocationManager = null;
+
+ public static synchronized IInvocationManager getInstance(){
+ if(invocationManager == null){
+ invocationManager = new InvocationManager();
+ }
+ return invocationManager;
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java
new file mode 100644
index 000000000..abd644cea
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java
@@ -0,0 +1,85 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+/** Helper class for wrapping request/response information.
+ */
+public class MessageContext {
+
+ /**
+ * valid values of type are response/error
+ */
+ private String type;
+
+ /**
+ * RPC name
+ */
+ private String rpc;
+
+ /**
+ * correlation ID
+ */
+ private String correlationID;
+
+ /**
+ * partitioner for message bus usage
+ */
+ private String partitioner;
+
+
+ public String getRpc() {
+ return rpc;
+ }
+
+ public void setRpc(String rpc) {
+ this.rpc = rpc;
+ }
+
+ public String getCorrelationID() {
+ return correlationID;
+ }
+
+ public void setCorrelationID(String correlationID) {
+ this.correlationID = correlationID;
+ }
+
+ public String getPartiton() {
+ return partitioner;
+ }
+
+ public void setPartiton(String partitioner) {
+ this.partitioner = partitioner;
+ }
+
+ public void setType(String type){
+ this.type = type;
+ }
+
+ public String getType(){
+ return type;
+ }
+
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java
new file mode 100644
index 000000000..7264e35f8
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+interface RequestResponseHandler {
+
+ /**
+ * sends request, registers handler of response and start timer.
+ * @param request - Request
+ * @param corrId - correlation ID
+ * @param rpcName - RPC name
+ * @throws CoreException - @{@link CoreException}
+ */
+ void sendRequest(String request, String corrId, String rpcName) throws CoreException;
+
+ /**
+ * submits a handler task to task queue @{@link TaskQueue}, this task will be performed only if this handler is
+ * still existing in core registry @{@link CoreRegistry}, others timeout was occurred .
+ * @param ctx - Message Context @{@link MessageContext}
+ * @param response - Response from backend
+ */
+ void handleResponse(MessageContext ctx, String response);
+
+ /**
+ * handles timeout event
+ */
+ void onTimeOut();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java
new file mode 100644
index 000000000..e7a65766e
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java
@@ -0,0 +1,107 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.concurrent.TimeoutException;
+
+/** Handles sync requests
+ */
+class SyncRequestResponseHandler<T> extends AbstractRequestResponseHandler {
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class);
+ private T responseObject = null;
+ private CoreException coreException = null;
+ private TimeoutException timeoutException = null;
+
+ SyncRequestResponseHandler(String corrID,
+ ICoreResponseHandler callback,
+ CoreManager coreManager){
+ super(corrID, callback, coreManager);
+ }
+
+ /**
+ * Calls API callback for getting response object. in case of complete response notifies consumer
+ * thread for receiving response
+ * @param response - Response
+ * @param type - Type of Response
+ */
+ synchronized void runTask(String response, String type) {
+ try {
+ responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type);
+ } catch (CoreException e) {
+ coreException = e;
+ }
+ if(responseObject != null || coreException != null) {
+ notify();
+ }
+ }
+
+
+ /**
+ * Returns response. goes sleep until coming either timeout event or complete response
+ */
+ public synchronized <T> T getResponse() throws CoreException, TimeoutException {
+ try{
+ if(!isResponseReceived()){
+ wait();
+ }
+ if (coreException != null) {
+ throw coreException;
+ }
+ if ( timeoutException != null) {
+ throw timeoutException;
+ }
+
+ } catch (InterruptedException e) {
+ throw new CoreException(e);
+ } finally{
+ coreManager.unregisterHandler(corrID);
+ coreManager.cancelTimer(corrID);
+ }
+ return (T) responseObject;
+ }
+
+ /**
+ * indicates if a response received
+ * @return
+ */
+ private boolean isResponseReceived() {
+ return responseObject != null;
+ }
+
+ @Override
+ public synchronized void onTimeOut() {
+ LOG.error("sync response handler on timeout correlation ID <" + corrID + ">.");
+ timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID);
+ notify();
+ }
+
+
+
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java
new file mode 100644
index 000000000..e75d33f91
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/** Responsible to ensure synchronous handling of responses and timouts.
+ */
+class TaskQueue implements Runnable{
+
+ private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class);
+
+ private boolean isShutdown;
+
+ synchronized void addTask(Runnable task) throws InterruptedException {
+ queue.put(task);
+ }
+
+ public void run() {
+ Runnable task;
+ while(!Thread.currentThread().isInterrupted() && !isShutdown){
+ try {
+ task = queue.take();
+ task.run();
+ } catch (InterruptedException e) {
+ LOG.error("could not take task from queue", e);
+ } catch (RuntimeException e) {
+ LOG.error("could not run task", e);
+ }
+ LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown);
+ }
+ LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process.");
+ }
+
+ void stopQueue(){
+ isShutdown = true;
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java
new file mode 100644
index 000000000..1d1fc1533
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** Creates a task queue pool that reuses a fixed number of threads.
+ * Assigns one thread for each queue.
+ */
+class TaskQueueManager {
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class);
+ private ExecutorService executorService;
+ private final static String DEFAULT_POOL_SIZE = "10";
+ private final static String CLIENT_POOL_SIZE = "client.pool.size";
+ private TaskQueue[] queues;
+ private int poolInt;
+
+ TaskQueueManager(Properties properties){
+ String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE);
+ poolInt = Integer.parseInt(size);
+ this.executorService = Executors.newFixedThreadPool(poolInt);
+ initTaskQueues();
+ }
+
+ private void initTaskQueues(){
+ queues = new TaskQueue[poolInt];
+ for(int i=0; i<poolInt; i++){
+ queues[i] = new TaskQueue();
+ this.executorService.submit(queues[i]);
+ }
+ }
+
+ void submit(String corrID, Runnable task) throws InterruptedException {
+ TaskQueue queue = getTaskQueue(corrID);
+ queue.addTask(task);
+ }
+
+ /**
+ * ensures synchronous handling all responses and timeout belongs to same correlation ID
+ * @param corrID
+ * @return - @{@link TaskQueue}
+ */
+ private TaskQueue getTaskQueue(String corrID){
+ int index = Math.abs(corrID.hashCode()) % poolInt;
+ return queues[index];
+ }
+
+ /**
+ * goes over queues for stopping threads
+ * @throws InterruptedException
+ */
+ void stopQueueManager() throws InterruptedException {
+ for(int i=0; i<poolInt; i++){
+ queues[i].stopQueue();
+ queues[i].addTask(new Runnable() {
+ @Override
+ public void run() {
+ /**
+ * wake up the queue for stopping thread
+ */
+ }
+ });
+ }
+ List<Runnable> listTask = executorService.shutdownNow();
+ if (!executorService.awaitTermination(6, TimeUnit.SECONDS))
+ System.err.println("Pool did not terminate");
+ LOG.info("the amount of tasks that never commenced execution " + listTask.size());
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java
new file mode 100644
index 000000000..10f664c65
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java
@@ -0,0 +1,82 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+class TimerServiceImpl implements ITimerService {
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class);
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ private final ConcurrentHashMap<String, Future> timeOutEvents = new ConcurrentHashMap<>();
+ private final long responseTimeout;
+
+ TimerServiceImpl(long responseTimeout) {
+ this.responseTimeout = responseTimeout;
+ }
+
+ @Override
+ public synchronized void cancel(String correlationID) {
+ Future timeOutEvent = timeOutEvents.remove(correlationID);
+ if (timeOutEvent != null){
+ timeOutEvent.cancel(true);
+ }
+ }
+
+ @Override
+ public synchronized void add(String correlationID, ITimeoutHandler handler) {
+ Future timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS);
+ timeOutEvents.put(correlationID, timeOutEvent);
+ }
+
+ @Override
+ public void shutdown() {
+ List<Runnable> listTask = scheduler.shutdownNow();
+ LOG.info("the amount of tasks that never commenced execution " + listTask.size());
+ }
+
+ private class HandleTimeout implements Runnable {
+
+ String correlationID;
+ ITimeoutHandler handler;
+
+ HandleTimeout(String correlationID, ITimeoutHandler handler) {
+ this.correlationID = correlationID;
+ this.handler = handler;
+ }
+
+ @Override
+ public void run(){
+ System.out.println("Timeout event of request " + correlationID);
+ handler.onTimeout();
+ timeOutEvents.remove(correlationID);
+ }
+ }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java
new file mode 100644
index 000000000..1415514ac
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java
@@ -0,0 +1,77 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+class APPCMessageReaderWriter implements MessageReader, MessageWriter {
+
+ private final ObjectMapper mapper;
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class);
+
+ APPCMessageReaderWriter() {
+ mapper = new ObjectMapper();
+ }
+
+ public String read(String payload, MessageContext context) throws ProtocolException {
+ try {
+ ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class);
+ context.setType(protocolMessage.getType());
+ context.setRpc(protocolMessage.getRpcName());
+ context.setCorrelationID(protocolMessage.getCorrelationID());
+ context.setPartiton(protocolMessage.getPartition());
+ String body = protocolMessage.getBody().toString();
+ LOG.debug("Received body : <" + body + ">");
+ return body;
+ } catch (IOException e) {
+ throw new ProtocolException(e);
+ }
+
+ }
+
+ public String write(String payload, MessageContext context) throws ProtocolException {
+ try {
+ ProtocolMessage protocolMessage = new ProtocolMessage();
+ protocolMessage.setVersion("2.0");
+ protocolMessage.setType(context.getType());
+ protocolMessage.setRpcName(context.getRpc());
+ protocolMessage.setCorrelationID(context.getCorrelationID());
+ protocolMessage.setPartition(context.getPartiton());
+ JsonNode body = mapper.readTree(payload);
+ protocolMessage.setBody(body);
+ String message = mapper.writeValueAsString(protocolMessage);
+ return message;
+ } catch (IOException e) {
+ throw new ProtocolException(e);
+ }
+ }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java
new file mode 100644
index 000000000..6d42d7cf4
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+
+public interface AsyncProtocol extends Protocol {
+
+ /**
+ * sends a string message to underlying message bus/java API
+ * @param payload - meesage body
+ * @param context - message headers
+ * @throws ProtocolException
+ */
+ void sendRequest(String payload, MessageContext context) throws ProtocolException;
+
+ void shutdown();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java
new file mode 100644
index 000000000..a20c2a0cb
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java
@@ -0,0 +1,157 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+class AsyncProtocolImpl implements AsyncProtocol {
+
+ /**
+ * message bus listener thread handler
+ */
+ private Future listenerHandler;
+ /**
+ * called when messages are fetched - called for a single message
+ */
+ private RetrieveMessageCallback callback;
+ /**
+ * message bus client used to send/fetch
+ */
+ private MessagingService messageService;
+ /**
+ * Message reader used to extract body and context from reponse message
+ */
+ private MessageReader messageReader;
+ /**
+ * Message writer used to construct meesage from body and context
+ */
+ private MessageWriter messageWriter;
+
+ /**
+ * shutdown indicator
+ */
+ private boolean isShutdown = false;
+
+ /**
+ * executor service for listener usage
+ */
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+
+
+ AsyncProtocolImpl() {
+
+ messageService = new UEBMessagingService();
+ messageReader = new APPCMessageReaderWriter();
+ messageWriter = (MessageWriter) messageReader;
+ }
+
+ public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
+
+ if (callback == null) {
+ throw new ProtocolException("Callback param should not be null!");
+ }
+ this.callback = callback;
+
+ try {
+ messageService.init(props);
+ //get message bus listener thread
+ //start the thread after initializing services
+ listenerHandler = executorService.submit(new Listener());
+ } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
+ throw new ProtocolException(e);
+ }
+ }
+
+ public void sendRequest(String payload, MessageContext context) throws ProtocolException {
+
+ //get message to be sent to appc from payload and context
+ String message = messageWriter.write(payload, context);
+ try {
+ messageService.send(context.getPartiton(), message);
+ LOG.debug("Successfully send message: " + message);
+ } catch (IOException e) {
+ throw new ProtocolException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ isShutdown = true;
+ messageService.close();
+ LOG.warn("The protocol layer in shutdown stage.");
+ executorService.shutdownNow();
+ }
+
+ public class Listener implements Runnable {
+
+
+ public void run() {
+
+ while (!isShutdown) {
+ List<String> messages = new ArrayList<>();
+ try {
+ messages = messageService.fetch();
+ LOG.debug("Successfully fetched " + messages.size() + " messages");
+ } catch (IOException e) {
+ LOG.error("Fetching " + messages.size() + " messages failed");
+ }
+ for (String message : messages) {
+
+ MessageContext context = new MessageContext();
+ String payload = null;
+
+ try {
+ //get payload and context from message to be sent to core layer
+ payload = messageReader.read(message, context);
+ LOG.debug("Got body: " + payload);
+ //call core layer response handler
+ if(!isShutdown) {
+ callback.onResponse(payload, context);
+ }else{
+ LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
+ context.getCorrelationID() + "> response ", message);
+ }
+ } catch (ProtocolException e) {
+ LOG.error("Failed to read message from UEB. message is: " + message);
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java
new file mode 100644
index 000000000..ec9606c19
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java
@@ -0,0 +1,56 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+interface Consumer {
+
+ /**
+ * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty.
+ *
+ * @return A list of strings representing the messages pulled from the topic.
+ * @throws IOException
+ */
+ List<String> fetch() throws IOException;
+
+ /**
+ * Gets a batch of messages from the topic.
+ *
+ * @param limit The amount of messages to fetch
+ * @return A list of strings representing the messages pulled from the topic.
+ * @throws IOException
+ */
+ List<String> fetch(int limit) throws IOException;
+
+ /**
+ * Send dummy fetch request to register client to be able to fetch messages
+ * @throws IOException
+ */
+ void registerForRead() throws IOException;
+
+ void close();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java
new file mode 100644
index 000000000..99d884e98
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+class ConsumerImpl implements Consumer {
+
+ private static final int DEFAULT_LIMIT = 1000;
+
+ private Collection<String> hosts;
+ private String topic;
+ private String group;
+ private String groupId;
+ private int timeout;
+
+ private String authKey;
+ private String authSecret;
+
+ private CambriaConsumer consumer = null;
+
+ /**
+ * constructor
+ * @param urls
+ * @param topicName
+ * @param consumerName
+ * @param consumerId
+ * @param timeout
+ */
+ public ConsumerImpl(Collection<String> urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+ this.hosts = urls;
+ this.topic = topicName;
+ this.group = consumerName;
+ this.groupId = consumerId;
+ this.authKey = apiKey;
+ this.authSecret = apiSecret;
+ this.timeout = timeout;
+ consumer = getConsumer();
+ }
+
+
+ public List<String> fetch() throws IOException {
+
+ return fetch(DEFAULT_LIMIT);
+ }
+
+ public List<String> fetch(int limit) throws IOException {
+
+ List<String> out = new ArrayList<String>();
+ try {
+ for(String msg : consumer.fetch(timeout,limit)){
+ out.add(msg);
+ }
+ } catch (IOException e) {
+ throw e;
+ }
+ return out;
+ }
+
+ public void registerForRead() throws IOException {
+
+ int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages
+ consumer.fetch(waitForRegisteration, 1);
+ }
+
+ /**
+ * init cambria consumer
+ * @return CambriaConsumer
+ */
+ private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+
+ ConsumerBuilder builder = new ConsumerBuilder();
+
+ builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId);
+ builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout);
+ builder.receivingAtMost(DEFAULT_LIMIT);
+
+ // Add credentials if provided
+ if (authKey != null && authSecret != null) {
+
+ Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey");
+ apiKeyField.setAccessible(true);
+ apiKeyField.set(builder, "");
+ builder.authenticatedBy(authKey, authSecret);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java
new file mode 100644
index 000000000..682220a2d
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+
+public interface MessageReader {
+
+ /**
+ * reads payload, fills the context out of payload headers, and returns the body of the payload
+ * @param payload incoming message
+ * @param context context to fill
+ * @return body of the payload
+ * @throws ProtocolException
+ */
+ String read(String payload, MessageContext context) throws ProtocolException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java
new file mode 100644
index 000000000..a19fd3fb4
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import com.fasterxml.jackson.databind.JsonNode;
+
+public interface MessageWriter {
+
+ /**
+ * builds a message out of context and payload
+ * @param payload body of the message
+ * @param context headers of the message
+ * @return the message to write/send
+ * @throws ProtocolException
+ */
+ String write(String payload, MessageContext context) throws ProtocolException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java
new file mode 100644
index 000000000..791e1224b
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.Properties;
+
+interface MessagingService {
+
+ /**
+ * initialize consumer/publisher
+ * @param props
+ */
+ void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException;
+
+ /**
+ * sends a string as is
+ * @param partition
+ * @param body
+ */
+ void send(String partition, String body) throws IOException;
+
+ /**
+ * retrieve messages from bus - timeout extracted from props or see impl
+ * @return
+ */
+ List<String> fetch() throws IOException;
+
+ /**
+ * retrieve messages from bus - timeout extracted from props or see impl
+ * @param limit
+ * @return
+ */
+ List<String> fetch(int limit) throws IOException;
+
+ void close();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java
new file mode 100644
index 000000000..2f2490a54
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.io.IOException;
+
+interface Producer {
+
+ /**
+ * send a message to a partition via ueb
+ * @param data
+ */
+ void post(String Partition, String data) throws IOException;
+
+ void close();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java
new file mode 100644
index 000000000..1455fd922
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java
@@ -0,0 +1,82 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.Collection;
+
+class ProducerImpl implements Producer {
+
+ private Collection<String> hosts;
+ private String topic;
+ private CambriaBatchingPublisher producer;
+
+ private String authKey;
+ private String authSecret;
+
+ public ProducerImpl(Collection<String> urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException {
+
+ topic = topicName;
+ hosts = urls;
+ authKey = apiKey;
+ authSecret = apiSecret;
+ producer = getProducer();
+ }
+
+ public void post(String partition, String data) throws IOException {
+
+ producer.send(partition, data);
+ }
+
+ /**
+ * get cambria producer
+ * @return
+ */
+ private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException {
+
+ PublisherBuilder builder = new PublisherBuilder().usingHosts(hosts);
+
+ // Add credentials if provided
+ if (authKey != null && authSecret != null) {
+ builder.authenticatedBy(authKey, authSecret);
+ }
+
+ CambriaBatchingPublisher client = null;
+
+ client = builder.onTopic(topic).build();
+
+ return client;
+ }
+
+ @Override
+ public void close() {
+ producer.close();
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java
new file mode 100644
index 000000000..19e7d6024
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.util.Properties;
+
+public interface Protocol {
+
+ /**
+ * init protocol properties and callback
+ * @param props
+ * @param callback
+ * @throws ProtocolException
+ */
+ void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java
new file mode 100644
index 000000000..87e1318ee
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+public class ProtocolException extends Exception {
+
+ public ProtocolException() {
+ super();
+ }
+
+ public ProtocolException(String message) {
+ super(message);
+ }
+
+ public ProtocolException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ProtocolException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java
new file mode 100644
index 000000000..f9364e192
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProtocolFactory {
+
+ private static ProtocolFactory instance;
+ private Map<ProtocolType,Protocol> protocols;
+
+ /**
+ * Singleton factory
+ */
+ private ProtocolFactory(){
+
+ protocols = new HashMap<ProtocolType, Protocol>();
+ }
+
+ /**
+ * get factory instance
+ * @return factory instance
+ */
+ public static synchronized ProtocolFactory getInstance(){
+
+ if (instance == null) {
+ instance = new ProtocolFactory();
+ }
+ return instance;
+ }
+
+ /**
+ * returns instantiated protocol object
+ * @param type of protocol object
+ * @return protocol object
+ */
+ public Protocol getProtocolObject(ProtocolType type) throws ProtocolException {
+
+ Protocol protocol = protocols.get(type);
+ synchronized (this) {
+ if (protocol == null) {
+ switch (type) {
+ case SYNC:
+ throw new ProtocolException("Protocol SYNC is not implemented");
+ case ASYNC:
+ protocol = new AsyncProtocolImpl();
+ protocols.put(type, protocol);
+ break;
+ default:
+ throw new ProtocolException("Protocol type not found");
+ }
+ }
+ }
+ return protocol;
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java
new file mode 100644
index 000000000..161a590a0
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+
+class ProtocolMessage {
+
+ private String version;
+ private String type;
+ private String rpcName;
+ private String correlationID; // correlation-id
+ private String partition; // cambria.partition
+ private JsonNode body;
+
+ @JsonProperty
+ String getVersion() {
+ return version;
+ }
+
+ @JsonProperty
+ void setVersion(String version) {
+ this.version = version;
+ }
+
+ @JsonProperty
+ String getType() {
+ return type;
+ }
+
+ @JsonProperty
+ void setType(String type) {
+ this.type = type;
+ }
+
+ @JsonProperty("rpc-name")
+ String getRpcName() {
+ return rpcName;
+ }
+
+ @JsonProperty("rpc-name")
+ void setRpcName(String rpcName) {
+ this.rpcName = rpcName;
+ }
+
+ @JsonProperty("correlation-id")
+ String getCorrelationID() {
+ return correlationID;
+ }
+
+ @JsonProperty("correlation-id")
+ void setCorrelationID(String correlationID) {
+ this.correlationID = correlationID;
+ }
+
+ @JsonProperty("cambria.partition")
+ String getPartition() {
+ return partition;
+ }
+
+ @JsonProperty("cambria.partition")
+ void setPartition(String partition) {
+ this.partition = partition;
+ }
+
+ @JsonProperty
+ JsonNode getBody() {
+ return body;
+ }
+
+ @JsonProperty
+ void setBody(JsonNode body) {
+ this.body = body;
+ }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java
new file mode 100644
index 000000000..133fbf8ad
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+public enum ProtocolType {
+
+ SYNC, ASYNC;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java
new file mode 100644
index 000000000..46105553a
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+
+public interface RetrieveMessageCallback {
+
+ /**
+ * called when response received
+ * @param payload
+ * @param context
+ */
+ void onResponse(String payload, MessageContext context);
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java
new file mode 100644
index 000000000..22563f68a
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java
@@ -0,0 +1,102 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.*;
+
+class UEBMessagingService implements MessagingService {
+
+ private Consumer consumer;
+ private Producer producer;
+
+ private final String DEFAULT_READ_TIMEOUT_MS = "60000";
+ private final String DEFAULT_READ_LIMIT = "1000";
+
+ private int readLimit;
+
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class);
+
+ @SuppressWarnings("Since15")
+ public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+
+ if (props != null) {
+ String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ);
+ String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE);
+ String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER);
+ String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET);
+ String readTimeoutString = props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS);
+ Integer readTimeout = Integer.parseInt(readTimeoutString);
+ String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT);
+ readLimit = Integer.parseInt(readLimitString);
+ //get hosts pool
+ Collection<String> pool = new HashSet<String>();
+ String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS);
+ if (hostNames != null && !hostNames.isEmpty()) {
+ for (String name : hostNames.split(",")) {
+ pool.add(name);
+ }
+ }
+
+ //generate consumer id and group - same value for both
+ String consumerName = UUID.randomUUID().toString();
+ String consumerID = consumerName;
+
+ //create consumer and producer
+ consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret);
+ producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret);
+
+ //initial consumer registration
+ try {
+ consumer.registerForRead();
+ }catch(Exception e){
+ LOG.error("Message consumer failed to register client "+consumerID);
+ }
+ }
+ }
+
+ public void send(String partition, String body) throws IOException {
+ producer.post(partition, body);
+ }
+
+ public List<String> fetch() throws IOException {
+ return consumer.fetch(readLimit);
+ }
+
+ public List<String> fetch(int limit) throws IOException {
+ return consumer.fetch(limit);
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ producer.close();
+ }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java
new file mode 100644
index 000000000..8bbcaf404
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+class UEBPropertiesKeys {
+
+ static final String TOPIC_READ = "topic.read";
+ static final String TOPIC_READ_TIMEOUT = "topic.read.timeout";
+ static final String READ_LIMIT = "topic.read.limit";
+ static final String TOPIC_WRITE = "topic.write";
+ static final String AUTH_USER = "client.key";
+ static final String AUTH_SECRET = "client.secret";
+ static final String HOSTS = "poolMembers";
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java
new file mode 100644
index 000000000..d1720f03b
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java
@@ -0,0 +1,163 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import org.openecomp.appc.client.impl.core.AsyncRequestResponseHandler;
+import org.openecomp.appc.client.impl.core.CoreException;
+import org.openecomp.appc.client.impl.core.CoreManager;
+import org.openecomp.appc.client.impl.core.ICoreAsyncResponseHandler;
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.junit.Before;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.mock;
+
+public class ResponseManagerTest {
+
+ private final ExecutorService executorService = Executors.newFixedThreadPool(10);
+ ICoreAsyncResponseHandler listener1 = new ListenerImpl();
+ ICoreAsyncResponseHandler listener2 = new SleeepListenerImpl();
+ ICoreAsyncResponseHandler listener3 = new ListenerImpl();
+ CoreManager coreManager = null;
+
+ public void initialize() throws CoreException {
+ Properties prop = new Properties();
+ prop.setProperty("client.pool.size", "10");
+ prop.setProperty("client.response.timeout", "7000");
+ coreManager = new ResponseManagerTest.CoreManagerTest(prop);
+ }
+
+ void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException {
+ AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+ requestResponseHandler.sendRequest(request, correlationId, rpcName);
+ }
+
+ public void simpleResponseTest() throws Exception {
+ System.out.println("simpleResponseTest");
+ asyncRequest("request 1", listener1,"vasia1", "test");
+ MessageContext msgCtx = new MessageContext();
+ msgCtx.setCorrelationID("vasia1");
+ msgCtx.setType("response");
+ coreManager.getProtocolCallback().onResponse("vasia1 response",msgCtx);
+ coreManager.getProtocolCallback().onResponse("vasia2 response",msgCtx);
+ Thread.sleep(10);
+ }
+
+ public void twoResponseTest() throws Exception {
+ System.out.println("twoResponseTest");
+ asyncRequest("twoResponseTest request 1", listener2,"vasia2", "test");
+ MessageContext msgCtx = new MessageContext();
+ msgCtx.setCorrelationID("vasia2");
+ msgCtx.setType("response");
+ coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx);
+ Thread.sleep(100);
+ asyncRequest("twoResponseTest request 2", listener1,"vasia1", "test");
+ MessageContext msgCtx2 = new MessageContext();
+ msgCtx2.setCorrelationID("vasia1");
+ msgCtx2.setType("response");
+ coreManager.getProtocolCallback().onResponse("first of vasia1",msgCtx2);
+ Thread.sleep(150);
+ }
+
+ public void threeResponseTest() throws Exception {
+ System.out.println("treeResponseTest");
+ asyncRequest("threeResponseTest request 2", listener1,"vasia4", "test");
+ asyncRequest("threeResponseTest request 1", listener2,"vasia2", "test");
+ MessageContext msgCtx2 = new MessageContext();
+ msgCtx2.setCorrelationID("vasia2");
+ msgCtx2.setType("response");
+ coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx2);
+
+ asyncRequest("threeResponseTest request 2", listener1,"vasia1", "test");
+ MessageContext msgCtx1 = new MessageContext();
+ msgCtx1.setCorrelationID("vasia1");
+ msgCtx1.setType("response");
+ coreManager.getProtocolCallback().onResponse("vasia1",msgCtx1);
+
+ asyncRequest("threeResponseTest request 3", listener3,"vasia3", "test");
+ MessageContext msgCtx3 = new MessageContext();
+ msgCtx3.setCorrelationID("vasia3");
+ msgCtx3.setType("response");
+ coreManager.getProtocolCallback().onResponse("three1",msgCtx3);
+
+ coreManager.getProtocolCallback().onResponse("three2", msgCtx3);
+
+ coreManager.getProtocolCallback().onResponse("first1", msgCtx1);
+ Thread.sleep(250);
+
+ coreManager.getProtocolCallback().onResponse("first2", msgCtx1);
+ Thread.sleep(10000);
+ }
+
+ private class ListenerImpl implements ICoreAsyncResponseHandler{
+
+ public boolean onResponse(String message, String type) {
+ System.out.println("callback " + message);
+ return message != null;
+ }
+
+ @Override
+ public void onException(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private class SleeepListenerImpl implements ICoreAsyncResponseHandler{
+
+ public boolean onResponse(String message, String type) {
+ try {
+ Thread.sleep(150);
+ System.out.println("sleep callback " + message);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return message != null;
+ }
+
+ @Override
+ public void onException(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ class CoreManagerTest extends CoreManager{
+ CoreManagerTest(Properties properties) throws CoreException {
+ super(properties);
+ protocol = mock(AsyncProtocol.class);
+ }
+ protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException {
+ }
+
+ protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException {
+
+ }
+ }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java
new file mode 100644
index 000000000..4b4bce578
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import org.openecomp.appc.client.impl.core.CoreException;
+import org.openecomp.appc.client.impl.core.CoreManager;
+import org.openecomp.appc.client.impl.core.ICoreSyncResponseHandler;
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.core.SyncRequestResponseHandler;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Mockito.mock;
+
+public class SyncFlowTest {
+ CoreManager coreManager = null;
+
+ public void initialize() throws CoreException {
+ Properties prop = new Properties();
+ prop.setProperty("client.pool.size", "10");
+ prop.setProperty("client.response.timeout", "7000");
+ coreManager = new CoreManagerTest(prop);
+ }
+
+ <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException {
+ SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+ requestResponseHandler.sendRequest(request, correlationId, rpcName);
+ T responseObject = (T) requestResponseHandler.getResponse();
+ return responseObject;
+ }
+
+ public void blockRequestTest(){
+ ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1();
+ try {
+ syncRequest("request 1", handler, "vasia1", "test");
+ }catch (Throwable e){
+ e.printStackTrace();
+ Assert.assertTrue(e != null);
+ }
+
+ }
+
+ public <T> void blockRequestSucceedTest() throws InterruptedException {
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1();
+ try {
+ executorService.submit(new Runnable() {
+ public void run() {
+ System.out.println("Send request");
+ T response;
+ try {
+ response = syncRequest("request 1", handler, "vasia1", "test");
+ System.out.println("=======" + response.toString());
+ } catch (CoreException e) {
+ e.printStackTrace();
+ } catch (TimeoutException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }catch (Throwable e){
+ Assert.assertTrue((RuntimeException)e != null);
+ }
+ Thread.sleep(2000);
+ executorService.submit(new Runnable() {
+ public void run() {
+ MessageContext ctx = new MessageContext();
+ ctx.setCorrelationID("vasia1");
+ ctx.setType("response");
+ try {
+ System.out.println("Send response 1");
+ coreManager.getProtocolCallback().onResponse("response for request 1", ctx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Thread.sleep(2000);
+ executorService.submit(new Runnable() {
+ public void run() {
+ MessageContext ctx = new MessageContext();
+ ctx.setCorrelationID("vasia1");
+ ctx.setType("response");
+ try {
+ System.out.println("Send response 2");
+ coreManager.getProtocolCallback().onResponse("response for request 1 final", ctx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ Thread.sleep(1000);
+
+ }
+
+ class ICoreSyncResponseHandlerImpl1 implements ICoreSyncResponseHandler{
+
+
+ public <T> T onResponse(String message, String type) {
+ System.out.println("Received message = " + message) ;
+ if(message.contains("final")){
+ return (T) new String(message);
+ }
+ return null;
+ }
+ }
+
+ class CoreManagerTest extends CoreManager{
+ CoreManagerTest(Properties properties) throws CoreException {
+ super(properties);
+ protocol = mock(AsyncProtocol.class);
+ }
+ protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException {
+ }
+
+ protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException{
+
+ }
+ }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java
new file mode 100644
index 000000000..91f615720
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.APPCMessageReaderWriter;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+public class APPCMessageReaderWriterTest {
+
+ private APPCMessageReaderWriter messageReaderWriter;
+ private ObjectMapper mapper;
+
+ private static final String VERSION = "2.0";
+ private static final String TYPE = "typeTest";
+ private static final String CORRELATION_ID = "correlationIdTest";
+ private static final String PARTITION = "partitionTest";
+ private static final String RPC = "rpcTest";
+ private static final String PAYLOAD = "{\"key1\":\"val1\",\"key2\":\"val2\",\"key3\":{\"key3.1\":\"val3.1\"}}";
+
+ @Before
+ public void init() throws IOException {
+ mapper = new ObjectMapper();
+ messageReaderWriter = new APPCMessageReaderWriter();
+ }
+
+ @Test
+ public void writeTest() throws IOException, ProtocolException {
+ MessageContext context = new MessageContext();
+ context.setType(TYPE);
+ context.setCorrelationID(CORRELATION_ID);
+ context.setPartiton(PARTITION);
+ context.setRpc(RPC);
+ String payload = PAYLOAD;
+ String message = messageReaderWriter.write(payload, context);
+
+ JsonNode messageJson = mapper.readTree(message);
+ Assert.assertEquals(VERSION, messageJson.get("version").asText());
+ Assert.assertEquals(context.getType(), messageJson.get("type").asText());
+ Assert.assertEquals(context.getCorrelationID(), messageJson.get("correlation-id").asText());
+ Assert.assertEquals(context.getPartiton(), messageJson.get("cambria.partition").asText());
+ Assert.assertEquals(context.getRpc(), messageJson.get("rpc-name").asText());
+ Assert.assertEquals(payload, messageJson.get("body").toString());
+ }
+
+ @Test
+ public void readTest() throws IOException, ProtocolException {
+ ObjectNode node = mapper.createObjectNode();
+ node.put("version", VERSION);
+ node.put("type", TYPE);
+ node.put("correlation-id", CORRELATION_ID);
+ node.put("cambria.partition", PARTITION);
+ node.put("rpc-name", RPC);
+ JsonNode payload = mapper.valueToTree(PAYLOAD);
+ node.set("body", payload);
+ String message = node.toString();
+
+ MessageContext returnContext = new MessageContext();
+ String returnPayload = messageReaderWriter.read(message, returnContext);
+
+ Assert.assertEquals(TYPE, returnContext.getType());
+ Assert.assertEquals(CORRELATION_ID, returnContext.getCorrelationID());
+ Assert.assertEquals(PARTITION, returnContext.getPartiton());
+ Assert.assertEquals(RPC, returnContext.getRpc());
+ Assert.assertEquals(payload.toString(), returnPayload);
+ }
+
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java
new file mode 100644
index 000000000..046f5a814
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java
@@ -0,0 +1,91 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocolImpl;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.openecomp.appc.client.impl.protocol.UEBPropertiesKeys;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestAsyncProtocolImpl {
+
+ private static AsyncProtocol protocol;
+ private static AtomicBoolean gotResponse;
+ private static Properties props;
+
+ private static class TestCallback implements RetrieveMessageCallback{
+
+ public void onResponse(String payload, MessageContext context) {
+ Assert.assertNotEquals(null, payload);
+ Assert.assertNotEquals(null, context);
+ protocol = null;
+ gotResponse.set(true);
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws IOException, ProtocolException {
+
+ gotResponse = new AtomicBoolean(false);
+
+ props = new Properties();
+ String propFileName = "ueb.properties";
+
+ InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName);
+
+ props.load(input);
+
+ protocol = new AsyncProtocolImpl();
+ protocol.init(props, new TestCallback());
+ }
+
+ public void testSendRequest() throws ProtocolException {
+
+ MessageContext context = new MessageContext();
+ context.setType("Test");
+
+ protocol.sendRequest("{\"Test\":\"\"}", context);
+
+ try {
+ Long timeToSleep = Long.parseLong((String)props.get(UEBPropertiesKeys.TOPIC_READ_TIMEOUT))*2;
+ Thread.sleep(timeToSleep);
+ } catch (InterruptedException e) {
+ Assert.assertFalse(e.getMessage(), false);
+ }
+ if (gotResponse.get() == false) {
+ Assert.assertFalse("Message was not read !", true);
+ }
+ }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java
new file mode 100644
index 000000000..9611a7900
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocolImpl;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class TestAsyncProtocolImplMissingProps {
+
+ private static AsyncProtocol protocol;
+
+ private static class TestCallback implements RetrieveMessageCallback {
+
+ public void onResponse(String payload, MessageContext context) {
+ Assert.assertFalse("bad Callback !",false);
+ }
+ }
+
+ @Test
+ /**
+ * protocol should throw illegal argument exception due to null properties
+ */
+ public void testSetUpMissingProps() {
+
+ Properties props = new Properties();
+ String propFileName = "ueb.missing.properties";
+
+ InputStream input = TestAsyncProtocolImplMissingProps.class.getClassLoader().getResourceAsStream(propFileName);
+
+ try {
+ props.load(input);
+ } catch (IOException e) {
+ Assert.assertFalse(e.getMessage(),false);
+ }
+
+ protocol = new AsyncProtocolImpl();
+ try {
+ protocol.init(props, new TestCallback());
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(true);
+ } catch (Exception e) {
+ Assert.assertFalse(e.getMessage(),false);
+ }
+ }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java
new file mode 100644
index 000000000..e537037f7
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java
@@ -0,0 +1,60 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocolImpl;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class TestAsyncProtocolImplNullCallback {
+
+ private static AsyncProtocol protocol;
+
+ public void testSetUpNoCallback() throws IOException {
+
+ Properties props = new Properties();
+ String propFileName = "ueb.properties";
+
+ InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName);
+
+ props.load(input);
+
+ protocol = new AsyncProtocolImpl();
+
+ try {
+ protocol.init(props, null);
+ } catch (ProtocolException e) {
+ Assert.assertTrue(true);
+ } catch (Exception e){
+ Assert.assertFalse(e.getMessage(),false);
+ }
+ }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java
new file mode 100644
index 000000000..109065ab4
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.junit.*;
+import org.junit.runners.MethodSorters;
+import org.openecomp.appc.client.impl.protocol.MessagingService;
+import org.openecomp.appc.client.impl.protocol.UEBMessagingService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.Properties;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestUEBMessagingService {
+
+ private static MessagingService ueb;
+
+ public static void setUp() throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+
+ Properties props = new Properties();
+ String propFileName = "ueb.properties";
+
+ InputStream input = TestUEBMessagingService.class.getClassLoader().getResourceAsStream(propFileName);
+
+ props.load(input);
+
+ ueb = new UEBMessagingService();
+ ueb.init(props);
+ }
+
+ public void test1Send() throws IOException {
+ System.out.println("Here");
+
+ String message = "Test Message Service";
+ ueb.send(null,message);
+ }
+
+ public void test2Fetch() throws IOException {
+
+ System.out.println("Here2");
+ List<String> messages = ueb.fetch(1);
+ Assert.assertEquals(1,messages.size());
+ }
+
+}
diff --git a/appc-client/client-lib/src/test/resources/ueb.missing.properties b/appc-client/client-lib/src/test/resources/ueb.missing.properties
new file mode 100644
index 000000000..54ba58284
--- /dev/null
+++ b/appc-client/client-lib/src/test/resources/ueb.missing.properties
@@ -0,0 +1,30 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+# ============LICENSE_END=========================================================
+###
+
+topic.read=APPC-UNIT-TEST
+topic.read.timeout=60000
+topic.write=APPC-UNIT-TEST
+client.name=APPC-CLIENT
+client.id=0
+
diff --git a/appc-client/client-lib/src/test/resources/ueb.properties b/appc-client/client-lib/src/test/resources/ueb.properties
new file mode 100644
index 000000000..026d5f4e8
--- /dev/null
+++ b/appc-client/client-lib/src/test/resources/ueb.properties
@@ -0,0 +1,29 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+# ============LICENSE_END=========================================================
+###
+
+poolMembers=10.147.29.67:3904
+topic.read=APPC-TEST1
+topic.read.timeout=2500
+topic.write=APPC-TEST1
+