From 781b1a6df324419c846c84ea983c18fc8362bfd3 Mon Sep 17 00:00:00 2001 From: Patrick Brady Date: Wed, 13 Dec 2017 11:19:06 -0800 Subject: Third part of onap rename This part of the commit changes the folder structure on all other folders of appc. Change-Id: I8acfa11cdfcdcd36be0e137245d1dd7324f1abd3 Signed-off-by: Patrick Brady Issue-ID: APPC-13 --- .../impl/core/AbstractRequestResponseHandler.java | 85 ++++++ .../impl/core/AsyncRequestResponseHandler.java | 76 +++++ .../onap/appc/client/impl/core/CoreException.java | 45 +++ .../onap/appc/client/impl/core/CoreManager.java | 314 +++++++++++++++++++++ .../onap/appc/client/impl/core/CoreRegistry.java | 70 +++++ .../impl/core/ICoreAsyncResponseHandler.java | 43 +++ .../client/impl/core/ICoreResponseHandler.java | 28 ++ .../client/impl/core/ICoreSyncResponseHandler.java | 36 +++ .../appc/client/impl/core/IInvocationManager.java | 68 +++++ .../appc/client/impl/core/ITimeoutHandler.java | 33 +++ .../onap/appc/client/impl/core/ITimerService.java | 47 +++ .../appc/client/impl/core/InvocationManager.java | 69 +++++ .../client/impl/core/InvocationManagerFactory.java | 36 +++ .../onap/appc/client/impl/core/MessageContext.java | 85 ++++++ .../client/impl/core/RequestResponseHandler.java | 50 ++++ .../impl/core/SyncRequestResponseHandler.java | 107 +++++++ .../org/onap/appc/client/impl/core/TaskQueue.java | 65 +++++ .../appc/client/impl/core/TaskQueueManager.java | 98 +++++++ .../appc/client/impl/core/TimerServiceImpl.java | 82 ++++++ .../impl/protocol/APPCMessageReaderWriter.java | 77 +++++ .../appc/client/impl/protocol/AsyncProtocol.java | 40 +++ .../client/impl/protocol/AsyncProtocolImpl.java | 157 +++++++++++ .../onap/appc/client/impl/protocol/Consumer.java | 56 ++++ .../appc/client/impl/protocol/ConsumerImpl.java | 125 ++++++++ .../appc/client/impl/protocol/MessageReader.java | 39 +++ .../appc/client/impl/protocol/MessageWriter.java | 40 +++ .../client/impl/protocol/MessagingService.java | 61 ++++ .../onap/appc/client/impl/protocol/Producer.java | 38 +++ .../appc/client/impl/protocol/ProducerImpl.java | 82 ++++++ .../onap/appc/client/impl/protocol/Protocol.java | 38 +++ .../client/impl/protocol/ProtocolException.java | 44 +++ .../appc/client/impl/protocol/ProtocolFactory.java | 79 ++++++ .../appc/client/impl/protocol/ProtocolMessage.java | 98 +++++++ .../appc/client/impl/protocol/ProtocolType.java | 30 ++ .../impl/protocol/RetrieveMessageCallback.java | 38 +++ .../client/impl/protocol/UEBMessagingService.java | 102 +++++++ .../client/impl/protocol/UEBPropertiesKeys.java | 36 +++ .../impl/core/AbstractRequestResponseHandler.java | 85 ------ .../impl/core/AsyncRequestResponseHandler.java | 76 ----- .../appc/client/impl/core/CoreException.java | 45 --- .../appc/client/impl/core/CoreManager.java | 314 --------------------- .../appc/client/impl/core/CoreRegistry.java | 70 ----- .../impl/core/ICoreAsyncResponseHandler.java | 43 --- .../client/impl/core/ICoreResponseHandler.java | 28 -- .../client/impl/core/ICoreSyncResponseHandler.java | 36 --- .../appc/client/impl/core/IInvocationManager.java | 68 ----- .../appc/client/impl/core/ITimeoutHandler.java | 33 --- .../appc/client/impl/core/ITimerService.java | 47 --- .../appc/client/impl/core/InvocationManager.java | 69 ----- .../client/impl/core/InvocationManagerFactory.java | 36 --- .../appc/client/impl/core/MessageContext.java | 85 ------ .../client/impl/core/RequestResponseHandler.java | 50 ---- .../impl/core/SyncRequestResponseHandler.java | 107 ------- .../openecomp/appc/client/impl/core/TaskQueue.java | 65 ----- .../appc/client/impl/core/TaskQueueManager.java | 98 ------- .../appc/client/impl/core/TimerServiceImpl.java | 82 ------ .../impl/protocol/APPCMessageReaderWriter.java | 77 ----- .../appc/client/impl/protocol/AsyncProtocol.java | 40 --- .../client/impl/protocol/AsyncProtocolImpl.java | 157 ----------- .../appc/client/impl/protocol/Consumer.java | 56 ---- .../appc/client/impl/protocol/ConsumerImpl.java | 125 -------- .../appc/client/impl/protocol/MessageReader.java | 39 --- .../appc/client/impl/protocol/MessageWriter.java | 40 --- .../client/impl/protocol/MessagingService.java | 61 ---- .../appc/client/impl/protocol/Producer.java | 38 --- .../appc/client/impl/protocol/ProducerImpl.java | 82 ------ .../appc/client/impl/protocol/Protocol.java | 38 --- .../client/impl/protocol/ProtocolException.java | 44 --- .../appc/client/impl/protocol/ProtocolFactory.java | 79 ------ .../appc/client/impl/protocol/ProtocolMessage.java | 98 ------- .../appc/client/impl/protocol/ProtocolType.java | 30 -- .../impl/protocol/RetrieveMessageCallback.java | 38 --- .../client/impl/protocol/UEBMessagingService.java | 102 ------- .../client/impl/protocol/UEBPropertiesKeys.java | 36 --- .../appc/client/impl/core/ResponseManagerTest.java | 163 +++++++++++ .../onap/appc/client/impl/core/SyncFlowTest.java | 151 ++++++++++ .../impl/protocol/APPCMessageReaderWriterTest.java | 104 +++++++ .../impl/protocol/TestAsyncProtocolImpl.java | 91 ++++++ .../TestAsyncProtocolImplMissingProps.java | 75 +++++ .../TestAsyncProtocolImplNullCallback.java | 60 ++++ .../impl/protocol/TestUEBMessagingService.java | 70 +++++ .../appc/client/impl/core/ResponseManagerTest.java | 163 ----------- .../appc/client/impl/core/SyncFlowTest.java | 151 ---------- .../impl/protocol/APPCMessageReaderWriterTest.java | 104 ------- .../impl/protocol/TestAsyncProtocolImpl.java | 91 ------ .../TestAsyncProtocolImplMissingProps.java | 75 ----- .../TestAsyncProtocolImplNullCallback.java | 60 ---- .../impl/protocol/TestUEBMessagingService.java | 70 ----- 88 files changed, 3331 insertions(+), 3331 deletions(-) create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java create mode 100644 appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java delete mode 100644 appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/ResponseManagerTest.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/SyncFlowTest.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriterTest.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImpl.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java create mode 100644 appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestUEBMessagingService.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java delete mode 100644 appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java (limited to 'appc-client/client-lib') diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java new file mode 100644 index 000000000..c5d6120f0 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** Abstract request response handler class, responsible for common functionality of + * @{@link AsyncRequestResponseHandler} and @{@link SyncRequestResponseHandler} + */ +abstract class AbstractRequestResponseHandler implements RequestResponseHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class); + ICoreResponseHandler businessCallback; + protected String corrID; + CoreManager coreManager; + + + AbstractRequestResponseHandler(String corrID, + ICoreResponseHandler businessCallback, + CoreManager coreManager) + { + this.businessCallback = businessCallback; + this.corrID = corrID; + this.coreManager = coreManager; + } + + public synchronized void handleResponse(final MessageContext ctx, final String response) { + try { + coreManager.submitTask(ctx.getCorrelationID(), new Runnable() { + @Override + public void run() { + LOG.info("handling response of corrID <" + corrID + ">" + "response " + response); + if(coreManager.isExistHandler(corrID)) { + runTask(response, ctx.getType()); + } + + } + }); + } catch (InterruptedException e) { + LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e); + } + } + + /** + * + * @param response - Response + * @param type - Type of Response + */ + abstract void runTask(String response, String type); + + @Override + public void sendRequest(String request, String corrId, String rpcName) throws CoreException { + if(!coreManager.isShutdownInProgress()) { + coreManager.registerHandler(corrId, this); + coreManager.sendRequest(request, corrId, rpcName); + coreManager.startTimer(corrId); + }else{ + throw new CoreException("Shutdown is in progress. Request will not be handled"); + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java new file mode 100644 index 000000000..1bcf0af99 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.concurrent.TimeoutException; + +/** Handles async responses + */ +class AsyncRequestResponseHandler extends AbstractRequestResponseHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class); + + AsyncRequestResponseHandler(String corrID, + ICoreResponseHandler businessCallback, + CoreManager coreManager) + { + super(corrID, businessCallback, coreManager); + } + + /** + * Calls API callback for sending response to consumer's listener. in case of complete response cleans timer and + * unregisters the handler. + * @param response - Response + * @param type - Type of Response + */ + public void runTask(String response, String type) { + boolean finalTask = false; + try { + finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type); + } catch (Exception e){ + LOG.error("Error on API layer, for request with correlation-id " + corrID, e); + } + if (finalTask){ + coreManager.cancelTimer(corrID); + coreManager.unregisterHandler(corrID); + } + else{ + response = null; + type = null; + } + } + + /** + * Calls to API layer for sending timeout exception. + */ + @Override + public void onTimeOut() { + LOG.info("timeout for request with correlation-id " + corrID); + ((ICoreAsyncResponseHandler)businessCallback).onException(new TimeoutException("timeout for request with correlation-id " + corrID)); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java new file mode 100644 index 000000000..a2bf6e250 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + + +public class CoreException extends Exception { + + public CoreException() { + super(); + } + + public CoreException(String message) { + super(message); + } + + public CoreException(String message, Throwable cause) { + super(message, cause); + } + + public CoreException(Throwable cause) { + super(cause); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java new file mode 100644 index 000000000..c1c23890e --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java @@ -0,0 +1,314 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import org.onap.appc.client.impl.protocol.*; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events. + */ +class CoreManager{ + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class); + private final ProtocolFactory protocolFactory; + protected AsyncProtocol protocol; + private final RetrieveMessageCallback protocolCallback = null; + private final CoreRegistry registry; + private final ITimerService timerService; + private final TaskQueueManager queueManager; + private String DEFAULT_TIMEOUT = "300000"; + private final static String RESPONSE_TIMEOUT = "client.response.timeout"; + private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout"; + private boolean isForceShutdown = false; + private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false); + private long shutdownTimeout; + + CoreManager(Properties prop) throws CoreException { + protocolFactory = ProtocolFactory.getInstance(); + try { + initProtocol(prop); + }catch (ProtocolException e){ + throw new CoreException(e); + } + registry = new CoreRegistry(new EmptyRegistryCallbackImpl()); + String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT); + long responseTimeout = Long.parseLong(timeoutProp); + String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT); + shutdownTimeout = Long.parseLong(gracefulTimeout); + timerService = new TimerServiceImpl(responseTimeout); + queueManager = new TaskQueueManager(prop); + listenShutdown(); + } + + /** + * initiates protocol layer services. + * @param prop - Properties + */ + private void initProtocol(Properties prop) throws ProtocolException { + protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC); + protocol.init(prop, getProtocolCallback()); + } + + /** + * Creates protocol response callback + * @return - @{@link ProtocolResponseCallbackImpl} + */ + RetrieveMessageCallback getProtocolCallback(){ + return new ProtocolResponseCallbackImpl(); + } + + /** + * Registers a new handler in registry + * @param corrID - Correlation ID + * @param requestResponseHandler handler to be called when response arrives + */ + void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){ + registry.register(corrID, requestResponseHandler); + } + + /** + * Remove a handler from registry service by correlation ID. + * @param corrID - Correlation ID + * @return - @{@link RequestResponseHandler} + */ + RequestResponseHandler unregisterHandler(String corrID){ + return (RequestResponseHandler) registry.unregister(corrID); + } + + /** + * Checks in registry service if a handler is existing. + * @param corrID - Correlation ID + * @return - boolean + */ + boolean isExistHandler(String corrID) { + return registry.isExist(corrID); + } + + /** + * Starts timer for timeout event when a request was send successfully. + * @param corrID - Correlation ID + */ + void startTimer(String corrID){ + timerService.add(corrID, new TimeoutHandlerImpl(corrID)); + } + + /** + * Cancels timer for fimeout event, in case when complete response was received + * @param corrID + */ + void cancelTimer(String corrID){ + timerService.cancel(corrID); + } + + /** + * Submits a new task to Queue manager. it is using for both response and timeout tasks + * @param corrID - Correlation ID + * @param task - @{@link Runnable} task. + * @throws InterruptedException + */ + void submitTask(String corrID, Runnable task) throws InterruptedException { + queueManager.submit(corrID, task); + } + + /** + * Sends request to protocol. + * @param request - Request + * @param corrId - Correlation ID + * @param rpcName - RPC name + * @throws CoreException - @{@link CoreException} + */ + void sendRequest(String request, String corrId, String rpcName) throws CoreException { + MessageContext ctx = getMessageContext(corrId, rpcName); + try { + protocol.sendRequest(request, ctx); + } catch (ProtocolException e) { + unregisterHandler(corrId); + throw new CoreException(e); + } + } + + /** + * Creates @{@link MessageContext} + * @param correlationId - Correlation ID + * @param rpcName - RPC Name + * @return - @{@link MessageContext} + */ + private MessageContext getMessageContext(String correlationId, String rpcName){ + MessageContext msgCtx = new MessageContext(); + msgCtx.setCorrelationID(correlationId); + msgCtx.setRpc(rpcName); + return msgCtx; + } + + /** + * Implements response callback from protocol and filters responses by correlation ID. + * Only registered events(by correlation ID) will be handled. + */ + private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback { + @Override + public void onResponse(String response, MessageContext context) { + String corrID = context.getCorrelationID(); + if (corrID != null) { + RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID); + if (messageHandler != null) { + LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response); + messageHandler.handleResponse(context, response); + } + } + } + } + + + /** + * listens to @{@link Runtime} shutdown event + */ + private void listenShutdown() { + Runtime.getRuntime().addShutdownHook(new Thread(){ + public void run(){ + gracefulShutdown(); + } + }); + } + + /** + * Implements shutdown for client library. + * @param isForceShutdown - true force shutdown, false graceful shutdown + */ + void shutdown(boolean isForceShutdown){ + if(isForceShutdown){ + forceShutdown(); + }else{ + gracefulShutdown(); + } + } + + /** + * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force + * shutdown only when either all request will be handled or graceful shutdown will be time out. + */ + synchronized void gracefulShutdown(){ + isGracefulShutdown.set(true); + if(registry.isEmpty()){ + forceShutdown(); + } + else{ + try { + LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">"); + wait(shutdownTimeout); + LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">"); + forceShutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } + + /** + * Closes Protocol, stops Queue Manager and shutdowns Time Service. + */ + private void forceShutdown(){ + isForceShutdown = true; + try { + LOG.info("Starting shutdown process."); + protocol.shutdown(); + queueManager.stopQueueManager(); + timerService.shutdown(); + } catch (InterruptedException e) { + LOG.info("Client library shutdown in progress ", e); + } + } + + /** + * + * @return - true when shutdown is in process + */ + boolean isShutdownInProgress(){ + return isForceShutdown || isGracefulShutdown.get(); + } + + /** + * Timeout handler implementation. + * This handler is responsible to assign a task for handling of timeout events. + * + */ + private class TimeoutHandlerImpl implements ITimeoutHandler { + + private final String corrID; + + TimeoutHandlerImpl(String corrID) { + this.corrID = corrID; + } + + /** + * When a timeout event is occurring, the new Timeout task will be assigned into a queue, + * this queue is shared between both timeout and handlers which belong to same correlation ID. + */ + @Override + public void onTimeout() { + try { + submitTask(corrID, new Runnable() { + @Override + public void run() { + RequestResponseHandler requestResponseHandler = unregisterHandler(corrID); + if (requestResponseHandler != null) { + requestResponseHandler.onTimeOut(); + } + } + }); + } catch (InterruptedException e) { + LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e); + } + } + } + + + /** + * Wakes Up graceful shutdown. + */ + class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback { + @Override + public synchronized void emptyCallback() { + LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">"); + if(isGracefulShutdown.get()){ + wakeUpShutdown(); + } + } + } + + /** + * wakes up waiting shutdown. + */ + private synchronized void wakeUpShutdown(){ + notifyAll(); + } + +} + diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java new file mode 100644 index 000000000..e0a0c5b34 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** client lib Registry + */ +class CoreRegistry{ + private Map registry = + new ConcurrentHashMap(); + + final private EmptyRegistryCallback emptyRegistryCallback; + + + CoreRegistry(EmptyRegistryCallback emptyRegistryCallback){ + this.emptyRegistryCallback = emptyRegistryCallback; + } + + void register(String key, T obj) { + registry.put(key, obj); + } + + T unregister(String key) { + T item = (T) registry.remove(key); + if(registry.isEmpty()) { + emptyRegistryCallback.emptyCallback(); + } + return item; + } + + T get(String key){ + return (T) registry.get(key); + } + + synchronized boolean isExist(String key) { + return registry.containsKey(key); + } + + boolean isEmpty(){ + return registry.isEmpty(); + } + + public interface EmptyRegistryCallback{ + void emptyCallback(); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java new file mode 100644 index 000000000..862d56dc3 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public interface ICoreAsyncResponseHandler extends ICoreResponseHandler{ + + /** + * Core response to incoming message + * @param message response accepted from protocol + * @param type type of response + * @return true if message is final, false otherwise + */ + boolean onResponse(String message, String type); + + /** + * Core reaction to an event of exception + * @param e the exception which have been thrown + */ + void onException(Exception e); + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java new file mode 100644 index 000000000..555640dfd --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public interface ICoreResponseHandler { +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java new file mode 100644 index 000000000..996d3d8d2 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public interface ICoreSyncResponseHandler extends ICoreResponseHandler{ + + /** + * Core response to incoming message, should return completed message only + * @param message response accepted from protocol + * @param type type of response + * @return true if message is final, false otherwise + */ + T onResponse(String message, String type) throws CoreException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java new file mode 100644 index 000000000..93cf20b3f --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +/** + */ +public interface IInvocationManager { + + /** + * initializes the manager + * @param prop properties to read from + * @throws CoreException thrown if madatory fields are not set right + */ + void init(Properties prop) throws CoreException; + + /** + * handles the flow of an async request + * @param request the request body + * @param listener business response handler + * @param correlationId unique id of the request + * @param rpcName rpc call name + * @throws CoreException thrown if the request failed to be sent + */ + void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException; + + /** + * handles to flow of a sync request + * @param request the request body + * @param callback business response handler + * @param correlationId unique id of the request + * @param rpcName rpc call name + * @return the output object to be returned + * @throws CoreException thrown if the request failed to be sent + * @throws TimeoutException thrown if timeout has exceeded + */ + T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException; + + /** + * shuts the invocation manager down. + * @param isForceShutdown if true, shutdown will be forced, otherwise it will be gracefully + */ + void shutdown(boolean isForceShutdown); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java new file mode 100644 index 000000000..0f3b81a6f --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +interface ITimeoutHandler { + + /** + * handles timeout event + */ + void onTimeout(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java new file mode 100644 index 000000000..96b06033f --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +interface ITimerService { + + /** + * add a new timeout handler to a request + * @param correlationID the id of the request + * @param handler to be called once "timeout' time has arrived + */ + void add(String correlationID, ITimeoutHandler handler); + + /** + * cancel the timeout handler of a request + * @param correlationID the id of the request + */ + void cancel(String correlationID); + + + /** + * shuts the timer service down immediately + */ + void shutdown(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java new file mode 100644 index 000000000..8179da107 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +/** + * layer for passing requests from API to Core + */ +class InvocationManager implements IInvocationManager{ + + protected CoreManager coreManager = null; + + InvocationManager(){ + } + + public void init(Properties properties) throws CoreException { + coreManager = new CoreManager(properties); + } + + /** + * + * @param request + * @param businessCallback + * @param correlationId + * @param rpcName + * @throws CoreException + */ + public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException { + AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager); + requestResponseHandler.sendRequest(request, correlationId, rpcName); + } + + public T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException { + SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager); + requestResponseHandler.sendRequest(request, correlationId, rpcName); + T responseObject = (T) requestResponseHandler.getResponse(); + return responseObject; + } + + @Override + public void shutdown(boolean isForceShutdown) { + coreManager.shutdown(isForceShutdown); + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java new file mode 100644 index 000000000..c9face762 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public abstract class InvocationManagerFactory { + private static IInvocationManager invocationManager = null; + + public static synchronized IInvocationManager getInstance(){ + if(invocationManager == null){ + invocationManager = new InvocationManager(); + } + return invocationManager; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java new file mode 100644 index 000000000..6fab66bb3 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +/** Helper class for wrapping request/response information. + */ +public class MessageContext { + + /** + * valid values of type are response/error + */ + private String type; + + /** + * RPC name + */ + private String rpc; + + /** + * correlation ID + */ + private String correlationID; + + /** + * partitioner for message bus usage + */ + private String partitioner; + + + public String getRpc() { + return rpc; + } + + public void setRpc(String rpc) { + this.rpc = rpc; + } + + public String getCorrelationID() { + return correlationID; + } + + public void setCorrelationID(String correlationID) { + this.correlationID = correlationID; + } + + public String getPartiton() { + return partitioner; + } + + public void setPartiton(String partitioner) { + this.partitioner = partitioner; + } + + public void setType(String type){ + this.type = type; + } + + public String getType(){ + return type; + } + + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java new file mode 100644 index 000000000..8e05a2974 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +interface RequestResponseHandler { + + /** + * sends request, registers handler of response and start timer. + * @param request - Request + * @param corrId - correlation ID + * @param rpcName - RPC name + * @throws CoreException - @{@link CoreException} + */ + void sendRequest(String request, String corrId, String rpcName) throws CoreException; + + /** + * submits a handler task to task queue @{@link TaskQueue}, this task will be performed only if this handler is + * still existing in core registry @{@link CoreRegistry}, others timeout was occurred . + * @param ctx - Message Context @{@link MessageContext} + * @param response - Response from backend + */ + void handleResponse(MessageContext ctx, String response); + + /** + * handles timeout event + */ + void onTimeOut(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java new file mode 100644 index 000000000..90b0a9926 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java @@ -0,0 +1,107 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.concurrent.TimeoutException; + +/** Handles sync requests + */ +class SyncRequestResponseHandler extends AbstractRequestResponseHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class); + private T responseObject = null; + private CoreException coreException = null; + private TimeoutException timeoutException = null; + + SyncRequestResponseHandler(String corrID, + ICoreResponseHandler callback, + CoreManager coreManager){ + super(corrID, callback, coreManager); + } + + /** + * Calls API callback for getting response object. in case of complete response notifies consumer + * thread for receiving response + * @param response - Response + * @param type - Type of Response + */ + synchronized void runTask(String response, String type) { + try { + responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type); + } catch (CoreException e) { + coreException = e; + } + if(responseObject != null || coreException != null) { + notify(); + } + } + + + /** + * Returns response. goes sleep until coming either timeout event or complete response + */ + public synchronized T getResponse() throws CoreException, TimeoutException { + try{ + if(!isResponseReceived()){ + wait(); + } + if (coreException != null) { + throw coreException; + } + if ( timeoutException != null) { + throw timeoutException; + } + + } catch (InterruptedException e) { + throw new CoreException(e); + } finally{ + coreManager.unregisterHandler(corrID); + coreManager.cancelTimer(corrID); + } + return (T) responseObject; + } + + /** + * indicates if a response received + * @return + */ + private boolean isResponseReceived() { + return responseObject != null; + } + + @Override + public synchronized void onTimeOut() { + LOG.error("sync response handler on timeout correlation ID <" + corrID + ">."); + timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID); + notify(); + } + + + + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java new file mode 100644 index 000000000..4ceeb3f08 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** Responsible to ensure synchronous handling of responses and timouts. + */ +class TaskQueue implements Runnable{ + + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class); + + private boolean isShutdown; + + synchronized void addTask(Runnable task) throws InterruptedException { + queue.put(task); + } + + public void run() { + Runnable task; + while(!Thread.currentThread().isInterrupted() && !isShutdown){ + try { + task = queue.take(); + task.run(); + } catch (InterruptedException e) { + LOG.error("could not take task from queue", e); + } catch (RuntimeException e) { + LOG.error("could not run task", e); + } + LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown); + } + LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process."); + } + + void stopQueue(){ + isShutdown = true; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java new file mode 100644 index 000000000..b87349411 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** Creates a task queue pool that reuses a fixed number of threads. + * Assigns one thread for each queue. + */ +class TaskQueueManager { + private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class); + private ExecutorService executorService; + private final static String DEFAULT_POOL_SIZE = "10"; + private final static String CLIENT_POOL_SIZE = "client.pool.size"; + private TaskQueue[] queues; + private int poolInt; + + TaskQueueManager(Properties properties){ + String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE); + poolInt = Integer.parseInt(size); + this.executorService = Executors.newFixedThreadPool(poolInt); + initTaskQueues(); + } + + private void initTaskQueues(){ + queues = new TaskQueue[poolInt]; + for(int i=0; i listTask = executorService.shutdownNow(); + if (!executorService.awaitTermination(6, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + LOG.info("the amount of tasks that never commenced execution " + listTask.size()); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java new file mode 100644 index 000000000..fa2d0804d --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.List; +import java.util.concurrent.*; + +class TimerServiceImpl implements ITimerService { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap timeOutEvents = new ConcurrentHashMap<>(); + private final long responseTimeout; + + TimerServiceImpl(long responseTimeout) { + this.responseTimeout = responseTimeout; + } + + @Override + public synchronized void cancel(String correlationID) { + Future timeOutEvent = timeOutEvents.remove(correlationID); + if (timeOutEvent != null){ + timeOutEvent.cancel(true); + } + } + + @Override + public synchronized void add(String correlationID, ITimeoutHandler handler) { + Future timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS); + timeOutEvents.put(correlationID, timeOutEvent); + } + + @Override + public void shutdown() { + List listTask = scheduler.shutdownNow(); + LOG.info("the amount of tasks that never commenced execution " + listTask.size()); + } + + private class HandleTimeout implements Runnable { + + String correlationID; + ITimeoutHandler handler; + + HandleTimeout(String correlationID, ITimeoutHandler handler) { + this.correlationID = correlationID; + this.handler = handler; + } + + @Override + public void run(){ + System.out.println("Timeout event of request " + correlationID); + handler.onTimeout(); + timeOutEvents.remove(correlationID); + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java new file mode 100644 index 000000000..a76f0a90b --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java @@ -0,0 +1,77 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +class APPCMessageReaderWriter implements MessageReader, MessageWriter { + + private final ObjectMapper mapper; + private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class); + + APPCMessageReaderWriter() { + mapper = new ObjectMapper(); + } + + public String read(String payload, MessageContext context) throws ProtocolException { + try { + ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class); + context.setType(protocolMessage.getType()); + context.setRpc(protocolMessage.getRpcName()); + context.setCorrelationID(protocolMessage.getCorrelationID()); + context.setPartiton(protocolMessage.getPartition()); + String body = protocolMessage.getBody().toString(); + LOG.debug("Received body : <" + body + ">"); + return body; + } catch (IOException e) { + throw new ProtocolException(e); + } + + } + + public String write(String payload, MessageContext context) throws ProtocolException { + try { + ProtocolMessage protocolMessage = new ProtocolMessage(); + protocolMessage.setVersion("2.0"); + protocolMessage.setType(context.getType()); + protocolMessage.setRpcName(context.getRpc()); + protocolMessage.setCorrelationID(context.getCorrelationID()); + protocolMessage.setPartition(context.getPartiton()); + JsonNode body = mapper.readTree(payload); + protocolMessage.setBody(body); + String message = mapper.writeValueAsString(protocolMessage); + return message; + } catch (IOException e) { + throw new ProtocolException(e); + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java new file mode 100644 index 000000000..94d2d6b85 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; + +public interface AsyncProtocol extends Protocol { + + /** + * sends a string message to underlying message bus/java API + * @param payload - meesage body + * @param context - message headers + * @throws ProtocolException + */ + void sendRequest(String payload, MessageContext context) throws ProtocolException; + + void shutdown(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java new file mode 100644 index 000000000..82626d802 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java @@ -0,0 +1,157 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +class AsyncProtocolImpl implements AsyncProtocol { + + /** + * message bus listener thread handler + */ + private Future listenerHandler; + /** + * called when messages are fetched - called for a single message + */ + private RetrieveMessageCallback callback; + /** + * message bus client used to send/fetch + */ + private MessagingService messageService; + /** + * Message reader used to extract body and context from reponse message + */ + private MessageReader messageReader; + /** + * Message writer used to construct meesage from body and context + */ + private MessageWriter messageWriter; + + /** + * shutdown indicator + */ + private boolean isShutdown = false; + + /** + * executor service for listener usage + */ + private ExecutorService executorService = Executors.newSingleThreadExecutor(); + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class); + + + AsyncProtocolImpl() { + + messageService = new UEBMessagingService(); + messageReader = new APPCMessageReaderWriter(); + messageWriter = (MessageWriter) messageReader; + } + + public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException { + + if (callback == null) { + throw new ProtocolException("Callback param should not be null!"); + } + this.callback = callback; + + try { + messageService.init(props); + //get message bus listener thread + //start the thread after initializing services + listenerHandler = executorService.submit(new Listener()); + } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) { + throw new ProtocolException(e); + } + } + + public void sendRequest(String payload, MessageContext context) throws ProtocolException { + + //get message to be sent to appc from payload and context + String message = messageWriter.write(payload, context); + try { + messageService.send(context.getPartiton(), message); + LOG.debug("Successfully send message: " + message); + } catch (IOException e) { + throw new ProtocolException(e); + } + } + + @Override + public void shutdown() { + isShutdown = true; + messageService.close(); + LOG.warn("The protocol layer in shutdown stage."); + executorService.shutdownNow(); + } + + public class Listener implements Runnable { + + + public void run() { + + while (!isShutdown) { + List messages = new ArrayList<>(); + try { + messages = messageService.fetch(); + LOG.debug("Successfully fetched " + messages.size() + " messages"); + } catch (IOException e) { + LOG.error("Fetching " + messages.size() + " messages failed"); + } + for (String message : messages) { + + MessageContext context = new MessageContext(); + String payload = null; + + try { + //get payload and context from message to be sent to core layer + payload = messageReader.read(message, context); + LOG.debug("Got body: " + payload); + //call core layer response handler + if(!isShutdown) { + callback.onResponse(payload, context); + }else{ + LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" + + context.getCorrelationID() + "> response ", message); + } + } catch (ProtocolException e) { + LOG.error("Failed to read message from UEB. message is: " + message); + } + } + } + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java new file mode 100644 index 000000000..4765a58ef --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.io.IOException; +import java.util.List; + +interface Consumer { + + /** + * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. + * + * @return A list of strings representing the messages pulled from the topic. + * @throws IOException + */ + List fetch() throws IOException; + + /** + * Gets a batch of messages from the topic. + * + * @param limit The amount of messages to fetch + * @return A list of strings representing the messages pulled from the topic. + * @throws IOException + */ + List fetch(int limit) throws IOException; + + /** + * Send dummy fetch request to register client to be able to fetch messages + * @throws IOException + */ + void registerForRead() throws IOException; + + void close(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java new file mode 100644 index 000000000..913f80f44 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +class ConsumerImpl implements Consumer { + + private static final int DEFAULT_LIMIT = 1000; + + private Collection hosts; + private String topic; + private String group; + private String groupId; + private int timeout; + + private String authKey; + private String authSecret; + + private CambriaConsumer consumer = null; + + /** + * constructor + * @param urls + * @param topicName + * @param consumerName + * @param consumerId + * @param timeout + */ + public ConsumerImpl(Collection urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + this.hosts = urls; + this.topic = topicName; + this.group = consumerName; + this.groupId = consumerId; + this.authKey = apiKey; + this.authSecret = apiSecret; + this.timeout = timeout; + consumer = getConsumer(); + } + + + public List fetch() throws IOException { + + return fetch(DEFAULT_LIMIT); + } + + public List fetch(int limit) throws IOException { + + List out = new ArrayList(); + try { + for(String msg : consumer.fetch(timeout,limit)){ + out.add(msg); + } + } catch (IOException e) { + throw e; + } + return out; + } + + public void registerForRead() throws IOException { + + int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages + consumer.fetch(waitForRegisteration, 1); + } + + /** + * init cambria consumer + * @return CambriaConsumer + */ + private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + + ConsumerBuilder builder = new ConsumerBuilder(); + + builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId); + builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout); + builder.receivingAtMost(DEFAULT_LIMIT); + + // Add credentials if provided + if (authKey != null && authSecret != null) { + + Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey"); + apiKeyField.setAccessible(true); + apiKeyField.set(builder, ""); + builder.authenticatedBy(authKey, authSecret); + } + + return builder.build(); + } + + @Override + public void close() { + consumer.close(); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java new file mode 100644 index 000000000..19688d696 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; + +public interface MessageReader { + + /** + * reads payload, fills the context out of payload headers, and returns the body of the payload + * @param payload incoming message + * @param context context to fill + * @return body of the payload + * @throws ProtocolException + */ + String read(String payload, MessageContext context) throws ProtocolException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java new file mode 100644 index 000000000..0849bc4a4 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import com.fasterxml.jackson.databind.JsonNode; + +public interface MessageWriter { + + /** + * builds a message out of context and payload + * @param payload body of the message + * @param context headers of the message + * @return the message to write/send + * @throws ProtocolException + */ + String write(String payload, MessageContext context) throws ProtocolException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java new file mode 100644 index 000000000..029378931 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Properties; + +interface MessagingService { + + /** + * initialize consumer/publisher + * @param props + */ + void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException; + + /** + * sends a string as is + * @param partition + * @param body + */ + void send(String partition, String body) throws IOException; + + /** + * retrieve messages from bus - timeout extracted from props or see impl + * @return + */ + List fetch() throws IOException; + + /** + * retrieve messages from bus - timeout extracted from props or see impl + * @param limit + * @return + */ + List fetch(int limit) throws IOException; + + void close(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java new file mode 100644 index 000000000..f290e8a89 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.io.IOException; + +interface Producer { + + /** + * send a message to a partition via ueb + * @param data + */ + void post(String Partition, String data) throws IOException; + + void close(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java new file mode 100644 index 000000000..7729db98d --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.Collection; + +class ProducerImpl implements Producer { + + private Collection hosts; + private String topic; + private CambriaBatchingPublisher producer; + + private String authKey; + private String authSecret; + + public ProducerImpl(Collection urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException { + + topic = topicName; + hosts = urls; + authKey = apiKey; + authSecret = apiSecret; + producer = getProducer(); + } + + public void post(String partition, String data) throws IOException { + + producer.send(partition, data); + } + + /** + * get cambria producer + * @return + */ + private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException { + + PublisherBuilder builder = new PublisherBuilder().usingHosts(hosts); + + // Add credentials if provided + if (authKey != null && authSecret != null) { + builder.authenticatedBy(authKey, authSecret); + } + + CambriaBatchingPublisher client = null; + + client = builder.onTopic(topic).build(); + + return client; + } + + @Override + public void close() { + producer.close(); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java new file mode 100644 index 000000000..eaa21d857 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.util.Properties; + +public interface Protocol { + + /** + * init protocol properties and callback + * @param props + * @param callback + * @throws ProtocolException + */ + void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java new file mode 100644 index 000000000..eb0537b80 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +public class ProtocolException extends Exception { + + public ProtocolException() { + super(); + } + + public ProtocolException(String message) { + super(message); + } + + public ProtocolException(String message, Throwable cause) { + super(message, cause); + } + + public ProtocolException(Throwable cause) { + super(cause); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java new file mode 100644 index 000000000..98e7d669b --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.util.HashMap; +import java.util.Map; + +public class ProtocolFactory { + + private static ProtocolFactory instance; + private Map protocols; + + /** + * Singleton factory + */ + private ProtocolFactory(){ + + protocols = new HashMap(); + } + + /** + * get factory instance + * @return factory instance + */ + public static synchronized ProtocolFactory getInstance(){ + + if (instance == null) { + instance = new ProtocolFactory(); + } + return instance; + } + + /** + * returns instantiated protocol object + * @param type of protocol object + * @return protocol object + */ + public Protocol getProtocolObject(ProtocolType type) throws ProtocolException { + + Protocol protocol = protocols.get(type); + synchronized (this) { + if (protocol == null) { + switch (type) { + case SYNC: + throw new ProtocolException("Protocol SYNC is not implemented"); + case ASYNC: + protocol = new AsyncProtocolImpl(); + protocols.put(type, protocol); + break; + default: + throw new ProtocolException("Protocol type not found"); + } + } + } + return protocol; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java new file mode 100644 index 000000000..c02ea5607 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +class ProtocolMessage { + + private String version; + private String type; + private String rpcName; + private String correlationID; // correlation-id + private String partition; // cambria.partition + private JsonNode body; + + @JsonProperty + String getVersion() { + return version; + } + + @JsonProperty + void setVersion(String version) { + this.version = version; + } + + @JsonProperty + String getType() { + return type; + } + + @JsonProperty + void setType(String type) { + this.type = type; + } + + @JsonProperty("rpc-name") + String getRpcName() { + return rpcName; + } + + @JsonProperty("rpc-name") + void setRpcName(String rpcName) { + this.rpcName = rpcName; + } + + @JsonProperty("correlation-id") + String getCorrelationID() { + return correlationID; + } + + @JsonProperty("correlation-id") + void setCorrelationID(String correlationID) { + this.correlationID = correlationID; + } + + @JsonProperty("cambria.partition") + String getPartition() { + return partition; + } + + @JsonProperty("cambria.partition") + void setPartition(String partition) { + this.partition = partition; + } + + @JsonProperty + JsonNode getBody() { + return body; + } + + @JsonProperty + void setBody(JsonNode body) { + this.body = body; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java new file mode 100644 index 000000000..cc2eca447 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +public enum ProtocolType { + + SYNC, ASYNC; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java new file mode 100644 index 000000000..8fc486bb8 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + + +import org.onap.appc.client.impl.core.MessageContext; + +public interface RetrieveMessageCallback { + + /** + * called when response received + * @param payload + * @param context + */ + void onResponse(String payload, MessageContext context); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java new file mode 100644 index 000000000..df51861b8 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java @@ -0,0 +1,102 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.*; + +class UEBMessagingService implements MessagingService { + + private Consumer consumer; + private Producer producer; + + private final String DEFAULT_READ_TIMEOUT_MS = "60000"; + private final String DEFAULT_READ_LIMIT = "1000"; + + private int readLimit; + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class); + + @SuppressWarnings("Since15") + public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + + if (props != null) { + String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ); + String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE); + String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER); + String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET); + String readTimeoutString = props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS); + Integer readTimeout = Integer.parseInt(readTimeoutString); + String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT); + readLimit = Integer.parseInt(readLimitString); + //get hosts pool + Collection pool = new HashSet(); + String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS); + if (hostNames != null && !hostNames.isEmpty()) { + for (String name : hostNames.split(",")) { + pool.add(name); + } + } + + //generate consumer id and group - same value for both + String consumerName = UUID.randomUUID().toString(); + String consumerID = consumerName; + + //create consumer and producer + consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret); + producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret); + + //initial consumer registration + try { + consumer.registerForRead(); + }catch(Exception e){ + LOG.error("Message consumer failed to register client "+consumerID); + } + } + } + + public void send(String partition, String body) throws IOException { + producer.post(partition, body); + } + + public List fetch() throws IOException { + return consumer.fetch(readLimit); + } + + public List fetch(int limit) throws IOException { + return consumer.fetch(limit); + } + + @Override + public void close() { + consumer.close(); + producer.close(); + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java new file mode 100644 index 000000000..5c1916f2b --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +class UEBPropertiesKeys { + + static final String TOPIC_READ = "topic.read"; + static final String TOPIC_READ_TIMEOUT = "topic.read.timeout"; + static final String READ_LIMIT = "topic.read.limit"; + static final String TOPIC_WRITE = "topic.write"; + static final String AUTH_USER = "client.key"; + static final String AUTH_SECRET = "client.secret"; + static final String HOSTS = "poolMembers"; +} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java deleted file mode 100644 index c5d6120f0..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -/** Abstract request response handler class, responsible for common functionality of - * @{@link AsyncRequestResponseHandler} and @{@link SyncRequestResponseHandler} - */ -abstract class AbstractRequestResponseHandler implements RequestResponseHandler { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class); - ICoreResponseHandler businessCallback; - protected String corrID; - CoreManager coreManager; - - - AbstractRequestResponseHandler(String corrID, - ICoreResponseHandler businessCallback, - CoreManager coreManager) - { - this.businessCallback = businessCallback; - this.corrID = corrID; - this.coreManager = coreManager; - } - - public synchronized void handleResponse(final MessageContext ctx, final String response) { - try { - coreManager.submitTask(ctx.getCorrelationID(), new Runnable() { - @Override - public void run() { - LOG.info("handling response of corrID <" + corrID + ">" + "response " + response); - if(coreManager.isExistHandler(corrID)) { - runTask(response, ctx.getType()); - } - - } - }); - } catch (InterruptedException e) { - LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e); - } - } - - /** - * - * @param response - Response - * @param type - Type of Response - */ - abstract void runTask(String response, String type); - - @Override - public void sendRequest(String request, String corrId, String rpcName) throws CoreException { - if(!coreManager.isShutdownInProgress()) { - coreManager.registerHandler(corrId, this); - coreManager.sendRequest(request, corrId, rpcName); - coreManager.startTimer(corrId); - }else{ - throw new CoreException("Shutdown is in progress. Request will not be handled"); - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java deleted file mode 100644 index 1bcf0af99..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.concurrent.TimeoutException; - -/** Handles async responses - */ -class AsyncRequestResponseHandler extends AbstractRequestResponseHandler { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class); - - AsyncRequestResponseHandler(String corrID, - ICoreResponseHandler businessCallback, - CoreManager coreManager) - { - super(corrID, businessCallback, coreManager); - } - - /** - * Calls API callback for sending response to consumer's listener. in case of complete response cleans timer and - * unregisters the handler. - * @param response - Response - * @param type - Type of Response - */ - public void runTask(String response, String type) { - boolean finalTask = false; - try { - finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type); - } catch (Exception e){ - LOG.error("Error on API layer, for request with correlation-id " + corrID, e); - } - if (finalTask){ - coreManager.cancelTimer(corrID); - coreManager.unregisterHandler(corrID); - } - else{ - response = null; - type = null; - } - } - - /** - * Calls to API layer for sending timeout exception. - */ - @Override - public void onTimeOut() { - LOG.info("timeout for request with correlation-id " + corrID); - ((ICoreAsyncResponseHandler)businessCallback).onException(new TimeoutException("timeout for request with correlation-id " + corrID)); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java deleted file mode 100644 index a2bf6e250..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - - -public class CoreException extends Exception { - - public CoreException() { - super(); - } - - public CoreException(String message) { - super(message); - } - - public CoreException(String message, Throwable cause) { - super(message, cause); - } - - public CoreException(Throwable cause) { - super(cause); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java deleted file mode 100644 index c1c23890e..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java +++ /dev/null @@ -1,314 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import org.onap.appc.client.impl.protocol.*; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events. - */ -class CoreManager{ - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class); - private final ProtocolFactory protocolFactory; - protected AsyncProtocol protocol; - private final RetrieveMessageCallback protocolCallback = null; - private final CoreRegistry registry; - private final ITimerService timerService; - private final TaskQueueManager queueManager; - private String DEFAULT_TIMEOUT = "300000"; - private final static String RESPONSE_TIMEOUT = "client.response.timeout"; - private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout"; - private boolean isForceShutdown = false; - private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false); - private long shutdownTimeout; - - CoreManager(Properties prop) throws CoreException { - protocolFactory = ProtocolFactory.getInstance(); - try { - initProtocol(prop); - }catch (ProtocolException e){ - throw new CoreException(e); - } - registry = new CoreRegistry(new EmptyRegistryCallbackImpl()); - String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT); - long responseTimeout = Long.parseLong(timeoutProp); - String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT); - shutdownTimeout = Long.parseLong(gracefulTimeout); - timerService = new TimerServiceImpl(responseTimeout); - queueManager = new TaskQueueManager(prop); - listenShutdown(); - } - - /** - * initiates protocol layer services. - * @param prop - Properties - */ - private void initProtocol(Properties prop) throws ProtocolException { - protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC); - protocol.init(prop, getProtocolCallback()); - } - - /** - * Creates protocol response callback - * @return - @{@link ProtocolResponseCallbackImpl} - */ - RetrieveMessageCallback getProtocolCallback(){ - return new ProtocolResponseCallbackImpl(); - } - - /** - * Registers a new handler in registry - * @param corrID - Correlation ID - * @param requestResponseHandler handler to be called when response arrives - */ - void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){ - registry.register(corrID, requestResponseHandler); - } - - /** - * Remove a handler from registry service by correlation ID. - * @param corrID - Correlation ID - * @return - @{@link RequestResponseHandler} - */ - RequestResponseHandler unregisterHandler(String corrID){ - return (RequestResponseHandler) registry.unregister(corrID); - } - - /** - * Checks in registry service if a handler is existing. - * @param corrID - Correlation ID - * @return - boolean - */ - boolean isExistHandler(String corrID) { - return registry.isExist(corrID); - } - - /** - * Starts timer for timeout event when a request was send successfully. - * @param corrID - Correlation ID - */ - void startTimer(String corrID){ - timerService.add(corrID, new TimeoutHandlerImpl(corrID)); - } - - /** - * Cancels timer for fimeout event, in case when complete response was received - * @param corrID - */ - void cancelTimer(String corrID){ - timerService.cancel(corrID); - } - - /** - * Submits a new task to Queue manager. it is using for both response and timeout tasks - * @param corrID - Correlation ID - * @param task - @{@link Runnable} task. - * @throws InterruptedException - */ - void submitTask(String corrID, Runnable task) throws InterruptedException { - queueManager.submit(corrID, task); - } - - /** - * Sends request to protocol. - * @param request - Request - * @param corrId - Correlation ID - * @param rpcName - RPC name - * @throws CoreException - @{@link CoreException} - */ - void sendRequest(String request, String corrId, String rpcName) throws CoreException { - MessageContext ctx = getMessageContext(corrId, rpcName); - try { - protocol.sendRequest(request, ctx); - } catch (ProtocolException e) { - unregisterHandler(corrId); - throw new CoreException(e); - } - } - - /** - * Creates @{@link MessageContext} - * @param correlationId - Correlation ID - * @param rpcName - RPC Name - * @return - @{@link MessageContext} - */ - private MessageContext getMessageContext(String correlationId, String rpcName){ - MessageContext msgCtx = new MessageContext(); - msgCtx.setCorrelationID(correlationId); - msgCtx.setRpc(rpcName); - return msgCtx; - } - - /** - * Implements response callback from protocol and filters responses by correlation ID. - * Only registered events(by correlation ID) will be handled. - */ - private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback { - @Override - public void onResponse(String response, MessageContext context) { - String corrID = context.getCorrelationID(); - if (corrID != null) { - RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID); - if (messageHandler != null) { - LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response); - messageHandler.handleResponse(context, response); - } - } - } - } - - - /** - * listens to @{@link Runtime} shutdown event - */ - private void listenShutdown() { - Runtime.getRuntime().addShutdownHook(new Thread(){ - public void run(){ - gracefulShutdown(); - } - }); - } - - /** - * Implements shutdown for client library. - * @param isForceShutdown - true force shutdown, false graceful shutdown - */ - void shutdown(boolean isForceShutdown){ - if(isForceShutdown){ - forceShutdown(); - }else{ - gracefulShutdown(); - } - } - - /** - * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force - * shutdown only when either all request will be handled or graceful shutdown will be time out. - */ - synchronized void gracefulShutdown(){ - isGracefulShutdown.set(true); - if(registry.isEmpty()){ - forceShutdown(); - } - else{ - try { - LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">"); - wait(shutdownTimeout); - LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">"); - forceShutdown(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - } - } - - /** - * Closes Protocol, stops Queue Manager and shutdowns Time Service. - */ - private void forceShutdown(){ - isForceShutdown = true; - try { - LOG.info("Starting shutdown process."); - protocol.shutdown(); - queueManager.stopQueueManager(); - timerService.shutdown(); - } catch (InterruptedException e) { - LOG.info("Client library shutdown in progress ", e); - } - } - - /** - * - * @return - true when shutdown is in process - */ - boolean isShutdownInProgress(){ - return isForceShutdown || isGracefulShutdown.get(); - } - - /** - * Timeout handler implementation. - * This handler is responsible to assign a task for handling of timeout events. - * - */ - private class TimeoutHandlerImpl implements ITimeoutHandler { - - private final String corrID; - - TimeoutHandlerImpl(String corrID) { - this.corrID = corrID; - } - - /** - * When a timeout event is occurring, the new Timeout task will be assigned into a queue, - * this queue is shared between both timeout and handlers which belong to same correlation ID. - */ - @Override - public void onTimeout() { - try { - submitTask(corrID, new Runnable() { - @Override - public void run() { - RequestResponseHandler requestResponseHandler = unregisterHandler(corrID); - if (requestResponseHandler != null) { - requestResponseHandler.onTimeOut(); - } - } - }); - } catch (InterruptedException e) { - LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e); - } - } - } - - - /** - * Wakes Up graceful shutdown. - */ - class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback { - @Override - public synchronized void emptyCallback() { - LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">"); - if(isGracefulShutdown.get()){ - wakeUpShutdown(); - } - } - } - - /** - * wakes up waiting shutdown. - */ - private synchronized void wakeUpShutdown(){ - notifyAll(); - } - -} - diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java deleted file mode 100644 index e0a0c5b34..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java +++ /dev/null @@ -1,70 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** client lib Registry - */ -class CoreRegistry{ - private Map registry = - new ConcurrentHashMap(); - - final private EmptyRegistryCallback emptyRegistryCallback; - - - CoreRegistry(EmptyRegistryCallback emptyRegistryCallback){ - this.emptyRegistryCallback = emptyRegistryCallback; - } - - void register(String key, T obj) { - registry.put(key, obj); - } - - T unregister(String key) { - T item = (T) registry.remove(key); - if(registry.isEmpty()) { - emptyRegistryCallback.emptyCallback(); - } - return item; - } - - T get(String key){ - return (T) registry.get(key); - } - - synchronized boolean isExist(String key) { - return registry.containsKey(key); - } - - boolean isEmpty(){ - return registry.isEmpty(); - } - - public interface EmptyRegistryCallback{ - void emptyCallback(); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java deleted file mode 100644 index 862d56dc3..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public interface ICoreAsyncResponseHandler extends ICoreResponseHandler{ - - /** - * Core response to incoming message - * @param message response accepted from protocol - * @param type type of response - * @return true if message is final, false otherwise - */ - boolean onResponse(String message, String type); - - /** - * Core reaction to an event of exception - * @param e the exception which have been thrown - */ - void onException(Exception e); - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java deleted file mode 100644 index 555640dfd..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public interface ICoreResponseHandler { -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java deleted file mode 100644 index 996d3d8d2..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public interface ICoreSyncResponseHandler extends ICoreResponseHandler{ - - /** - * Core response to incoming message, should return completed message only - * @param message response accepted from protocol - * @param type type of response - * @return true if message is final, false otherwise - */ - T onResponse(String message, String type) throws CoreException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java deleted file mode 100644 index 93cf20b3f..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import java.util.Properties; -import java.util.concurrent.TimeoutException; - -/** - */ -public interface IInvocationManager { - - /** - * initializes the manager - * @param prop properties to read from - * @throws CoreException thrown if madatory fields are not set right - */ - void init(Properties prop) throws CoreException; - - /** - * handles the flow of an async request - * @param request the request body - * @param listener business response handler - * @param correlationId unique id of the request - * @param rpcName rpc call name - * @throws CoreException thrown if the request failed to be sent - */ - void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException; - - /** - * handles to flow of a sync request - * @param request the request body - * @param callback business response handler - * @param correlationId unique id of the request - * @param rpcName rpc call name - * @return the output object to be returned - * @throws CoreException thrown if the request failed to be sent - * @throws TimeoutException thrown if timeout has exceeded - */ - T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException; - - /** - * shuts the invocation manager down. - * @param isForceShutdown if true, shutdown will be forced, otherwise it will be gracefully - */ - void shutdown(boolean isForceShutdown); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java deleted file mode 100644 index 0f3b81a6f..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -interface ITimeoutHandler { - - /** - * handles timeout event - */ - void onTimeout(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java deleted file mode 100644 index 96b06033f..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java +++ /dev/null @@ -1,47 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -interface ITimerService { - - /** - * add a new timeout handler to a request - * @param correlationID the id of the request - * @param handler to be called once "timeout' time has arrived - */ - void add(String correlationID, ITimeoutHandler handler); - - /** - * cancel the timeout handler of a request - * @param correlationID the id of the request - */ - void cancel(String correlationID); - - - /** - * shuts the timer service down immediately - */ - void shutdown(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java deleted file mode 100644 index 8179da107..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java +++ /dev/null @@ -1,69 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import java.util.Properties; -import java.util.concurrent.TimeoutException; - -/** - * layer for passing requests from API to Core - */ -class InvocationManager implements IInvocationManager{ - - protected CoreManager coreManager = null; - - InvocationManager(){ - } - - public void init(Properties properties) throws CoreException { - coreManager = new CoreManager(properties); - } - - /** - * - * @param request - * @param businessCallback - * @param correlationId - * @param rpcName - * @throws CoreException - */ - public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException { - AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager); - requestResponseHandler.sendRequest(request, correlationId, rpcName); - } - - public T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException { - SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager); - requestResponseHandler.sendRequest(request, correlationId, rpcName); - T responseObject = (T) requestResponseHandler.getResponse(); - return responseObject; - } - - @Override - public void shutdown(boolean isForceShutdown) { - coreManager.shutdown(isForceShutdown); - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java deleted file mode 100644 index c9face762..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public abstract class InvocationManagerFactory { - private static IInvocationManager invocationManager = null; - - public static synchronized IInvocationManager getInstance(){ - if(invocationManager == null){ - invocationManager = new InvocationManager(); - } - return invocationManager; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java deleted file mode 100644 index 6fab66bb3..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java +++ /dev/null @@ -1,85 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -/** Helper class for wrapping request/response information. - */ -public class MessageContext { - - /** - * valid values of type are response/error - */ - private String type; - - /** - * RPC name - */ - private String rpc; - - /** - * correlation ID - */ - private String correlationID; - - /** - * partitioner for message bus usage - */ - private String partitioner; - - - public String getRpc() { - return rpc; - } - - public void setRpc(String rpc) { - this.rpc = rpc; - } - - public String getCorrelationID() { - return correlationID; - } - - public void setCorrelationID(String correlationID) { - this.correlationID = correlationID; - } - - public String getPartiton() { - return partitioner; - } - - public void setPartiton(String partitioner) { - this.partitioner = partitioner; - } - - public void setType(String type){ - this.type = type; - } - - public String getType(){ - return type; - } - - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java deleted file mode 100644 index 8e05a2974..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -interface RequestResponseHandler { - - /** - * sends request, registers handler of response and start timer. - * @param request - Request - * @param corrId - correlation ID - * @param rpcName - RPC name - * @throws CoreException - @{@link CoreException} - */ - void sendRequest(String request, String corrId, String rpcName) throws CoreException; - - /** - * submits a handler task to task queue @{@link TaskQueue}, this task will be performed only if this handler is - * still existing in core registry @{@link CoreRegistry}, others timeout was occurred . - * @param ctx - Message Context @{@link MessageContext} - * @param response - Response from backend - */ - void handleResponse(MessageContext ctx, String response); - - /** - * handles timeout event - */ - void onTimeOut(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java deleted file mode 100644 index 90b0a9926..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.concurrent.TimeoutException; - -/** Handles sync requests - */ -class SyncRequestResponseHandler extends AbstractRequestResponseHandler { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class); - private T responseObject = null; - private CoreException coreException = null; - private TimeoutException timeoutException = null; - - SyncRequestResponseHandler(String corrID, - ICoreResponseHandler callback, - CoreManager coreManager){ - super(corrID, callback, coreManager); - } - - /** - * Calls API callback for getting response object. in case of complete response notifies consumer - * thread for receiving response - * @param response - Response - * @param type - Type of Response - */ - synchronized void runTask(String response, String type) { - try { - responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type); - } catch (CoreException e) { - coreException = e; - } - if(responseObject != null || coreException != null) { - notify(); - } - } - - - /** - * Returns response. goes sleep until coming either timeout event or complete response - */ - public synchronized T getResponse() throws CoreException, TimeoutException { - try{ - if(!isResponseReceived()){ - wait(); - } - if (coreException != null) { - throw coreException; - } - if ( timeoutException != null) { - throw timeoutException; - } - - } catch (InterruptedException e) { - throw new CoreException(e); - } finally{ - coreManager.unregisterHandler(corrID); - coreManager.cancelTimer(corrID); - } - return (T) responseObject; - } - - /** - * indicates if a response received - * @return - */ - private boolean isResponseReceived() { - return responseObject != null; - } - - @Override - public synchronized void onTimeOut() { - LOG.error("sync response handler on timeout correlation ID <" + corrID + ">."); - timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID); - notify(); - } - - - - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java deleted file mode 100644 index 4ceeb3f08..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** Responsible to ensure synchronous handling of responses and timouts. - */ -class TaskQueue implements Runnable{ - - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class); - - private boolean isShutdown; - - synchronized void addTask(Runnable task) throws InterruptedException { - queue.put(task); - } - - public void run() { - Runnable task; - while(!Thread.currentThread().isInterrupted() && !isShutdown){ - try { - task = queue.take(); - task.run(); - } catch (InterruptedException e) { - LOG.error("could not take task from queue", e); - } catch (RuntimeException e) { - LOG.error("could not run task", e); - } - LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown); - } - LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process."); - } - - void stopQueue(){ - isShutdown = true; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java deleted file mode 100644 index b87349411..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** Creates a task queue pool that reuses a fixed number of threads. - * Assigns one thread for each queue. - */ -class TaskQueueManager { - private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class); - private ExecutorService executorService; - private final static String DEFAULT_POOL_SIZE = "10"; - private final static String CLIENT_POOL_SIZE = "client.pool.size"; - private TaskQueue[] queues; - private int poolInt; - - TaskQueueManager(Properties properties){ - String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE); - poolInt = Integer.parseInt(size); - this.executorService = Executors.newFixedThreadPool(poolInt); - initTaskQueues(); - } - - private void initTaskQueues(){ - queues = new TaskQueue[poolInt]; - for(int i=0; i listTask = executorService.shutdownNow(); - if (!executorService.awaitTermination(6, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - LOG.info("the amount of tasks that never commenced execution " + listTask.size()); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java deleted file mode 100644 index fa2d0804d..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.List; -import java.util.concurrent.*; - -class TimerServiceImpl implements ITimerService { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private final ConcurrentHashMap timeOutEvents = new ConcurrentHashMap<>(); - private final long responseTimeout; - - TimerServiceImpl(long responseTimeout) { - this.responseTimeout = responseTimeout; - } - - @Override - public synchronized void cancel(String correlationID) { - Future timeOutEvent = timeOutEvents.remove(correlationID); - if (timeOutEvent != null){ - timeOutEvent.cancel(true); - } - } - - @Override - public synchronized void add(String correlationID, ITimeoutHandler handler) { - Future timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS); - timeOutEvents.put(correlationID, timeOutEvent); - } - - @Override - public void shutdown() { - List listTask = scheduler.shutdownNow(); - LOG.info("the amount of tasks that never commenced execution " + listTask.size()); - } - - private class HandleTimeout implements Runnable { - - String correlationID; - ITimeoutHandler handler; - - HandleTimeout(String correlationID, ITimeoutHandler handler) { - this.correlationID = correlationID; - this.handler = handler; - } - - @Override - public void run(){ - System.out.println("Timeout event of request " + correlationID); - handler.onTimeout(); - timeOutEvents.remove(correlationID); - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java deleted file mode 100644 index a76f0a90b..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java +++ /dev/null @@ -1,77 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -class APPCMessageReaderWriter implements MessageReader, MessageWriter { - - private final ObjectMapper mapper; - private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class); - - APPCMessageReaderWriter() { - mapper = new ObjectMapper(); - } - - public String read(String payload, MessageContext context) throws ProtocolException { - try { - ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class); - context.setType(protocolMessage.getType()); - context.setRpc(protocolMessage.getRpcName()); - context.setCorrelationID(protocolMessage.getCorrelationID()); - context.setPartiton(protocolMessage.getPartition()); - String body = protocolMessage.getBody().toString(); - LOG.debug("Received body : <" + body + ">"); - return body; - } catch (IOException e) { - throw new ProtocolException(e); - } - - } - - public String write(String payload, MessageContext context) throws ProtocolException { - try { - ProtocolMessage protocolMessage = new ProtocolMessage(); - protocolMessage.setVersion("2.0"); - protocolMessage.setType(context.getType()); - protocolMessage.setRpcName(context.getRpc()); - protocolMessage.setCorrelationID(context.getCorrelationID()); - protocolMessage.setPartition(context.getPartiton()); - JsonNode body = mapper.readTree(payload); - protocolMessage.setBody(body); - String message = mapper.writeValueAsString(protocolMessage); - return message; - } catch (IOException e) { - throw new ProtocolException(e); - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java deleted file mode 100644 index 94d2d6b85..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java +++ /dev/null @@ -1,40 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; - -public interface AsyncProtocol extends Protocol { - - /** - * sends a string message to underlying message bus/java API - * @param payload - meesage body - * @param context - message headers - * @throws ProtocolException - */ - void sendRequest(String payload, MessageContext context) throws ProtocolException; - - void shutdown(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java deleted file mode 100644 index 82626d802..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java +++ /dev/null @@ -1,157 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -class AsyncProtocolImpl implements AsyncProtocol { - - /** - * message bus listener thread handler - */ - private Future listenerHandler; - /** - * called when messages are fetched - called for a single message - */ - private RetrieveMessageCallback callback; - /** - * message bus client used to send/fetch - */ - private MessagingService messageService; - /** - * Message reader used to extract body and context from reponse message - */ - private MessageReader messageReader; - /** - * Message writer used to construct meesage from body and context - */ - private MessageWriter messageWriter; - - /** - * shutdown indicator - */ - private boolean isShutdown = false; - - /** - * executor service for listener usage - */ - private ExecutorService executorService = Executors.newSingleThreadExecutor(); - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class); - - - AsyncProtocolImpl() { - - messageService = new UEBMessagingService(); - messageReader = new APPCMessageReaderWriter(); - messageWriter = (MessageWriter) messageReader; - } - - public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException { - - if (callback == null) { - throw new ProtocolException("Callback param should not be null!"); - } - this.callback = callback; - - try { - messageService.init(props); - //get message bus listener thread - //start the thread after initializing services - listenerHandler = executorService.submit(new Listener()); - } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) { - throw new ProtocolException(e); - } - } - - public void sendRequest(String payload, MessageContext context) throws ProtocolException { - - //get message to be sent to appc from payload and context - String message = messageWriter.write(payload, context); - try { - messageService.send(context.getPartiton(), message); - LOG.debug("Successfully send message: " + message); - } catch (IOException e) { - throw new ProtocolException(e); - } - } - - @Override - public void shutdown() { - isShutdown = true; - messageService.close(); - LOG.warn("The protocol layer in shutdown stage."); - executorService.shutdownNow(); - } - - public class Listener implements Runnable { - - - public void run() { - - while (!isShutdown) { - List messages = new ArrayList<>(); - try { - messages = messageService.fetch(); - LOG.debug("Successfully fetched " + messages.size() + " messages"); - } catch (IOException e) { - LOG.error("Fetching " + messages.size() + " messages failed"); - } - for (String message : messages) { - - MessageContext context = new MessageContext(); - String payload = null; - - try { - //get payload and context from message to be sent to core layer - payload = messageReader.read(message, context); - LOG.debug("Got body: " + payload); - //call core layer response handler - if(!isShutdown) { - callback.onResponse(payload, context); - }else{ - LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" + - context.getCorrelationID() + "> response ", message); - } - } catch (ProtocolException e) { - LOG.error("Failed to read message from UEB. message is: " + message); - } - } - } - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java deleted file mode 100644 index 4765a58ef..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java +++ /dev/null @@ -1,56 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.io.IOException; -import java.util.List; - -interface Consumer { - - /** - * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. - * - * @return A list of strings representing the messages pulled from the topic. - * @throws IOException - */ - List fetch() throws IOException; - - /** - * Gets a batch of messages from the topic. - * - * @param limit The amount of messages to fetch - * @return A list of strings representing the messages pulled from the topic. - * @throws IOException - */ - List fetch(int limit) throws IOException; - - /** - * Send dummy fetch request to register client to be able to fetch messages - * @throws IOException - */ - void registerForRead() throws IOException; - - void close(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java deleted file mode 100644 index 913f80f44..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java +++ /dev/null @@ -1,125 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -class ConsumerImpl implements Consumer { - - private static final int DEFAULT_LIMIT = 1000; - - private Collection hosts; - private String topic; - private String group; - private String groupId; - private int timeout; - - private String authKey; - private String authSecret; - - private CambriaConsumer consumer = null; - - /** - * constructor - * @param urls - * @param topicName - * @param consumerName - * @param consumerId - * @param timeout - */ - public ConsumerImpl(Collection urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - this.hosts = urls; - this.topic = topicName; - this.group = consumerName; - this.groupId = consumerId; - this.authKey = apiKey; - this.authSecret = apiSecret; - this.timeout = timeout; - consumer = getConsumer(); - } - - - public List fetch() throws IOException { - - return fetch(DEFAULT_LIMIT); - } - - public List fetch(int limit) throws IOException { - - List out = new ArrayList(); - try { - for(String msg : consumer.fetch(timeout,limit)){ - out.add(msg); - } - } catch (IOException e) { - throw e; - } - return out; - } - - public void registerForRead() throws IOException { - - int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages - consumer.fetch(waitForRegisteration, 1); - } - - /** - * init cambria consumer - * @return CambriaConsumer - */ - private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - - ConsumerBuilder builder = new ConsumerBuilder(); - - builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId); - builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout); - builder.receivingAtMost(DEFAULT_LIMIT); - - // Add credentials if provided - if (authKey != null && authSecret != null) { - - Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey"); - apiKeyField.setAccessible(true); - apiKeyField.set(builder, ""); - builder.authenticatedBy(authKey, authSecret); - } - - return builder.build(); - } - - @Override - public void close() { - consumer.close(); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java deleted file mode 100644 index 19688d696..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java +++ /dev/null @@ -1,39 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; - -public interface MessageReader { - - /** - * reads payload, fills the context out of payload headers, and returns the body of the payload - * @param payload incoming message - * @param context context to fill - * @return body of the payload - * @throws ProtocolException - */ - String read(String payload, MessageContext context) throws ProtocolException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java deleted file mode 100644 index 0849bc4a4..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java +++ /dev/null @@ -1,40 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import com.fasterxml.jackson.databind.JsonNode; - -public interface MessageWriter { - - /** - * builds a message out of context and payload - * @param payload body of the message - * @param context headers of the message - * @return the message to write/send - * @throws ProtocolException - */ - String write(String payload, MessageContext context) throws ProtocolException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java deleted file mode 100644 index 029378931..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.List; -import java.util.Properties; - -interface MessagingService { - - /** - * initialize consumer/publisher - * @param props - */ - void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException; - - /** - * sends a string as is - * @param partition - * @param body - */ - void send(String partition, String body) throws IOException; - - /** - * retrieve messages from bus - timeout extracted from props or see impl - * @return - */ - List fetch() throws IOException; - - /** - * retrieve messages from bus - timeout extracted from props or see impl - * @param limit - * @return - */ - List fetch(int limit) throws IOException; - - void close(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java deleted file mode 100644 index f290e8a89..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.io.IOException; - -interface Producer { - - /** - * send a message to a partition via ueb - * @param data - */ - void post(String Partition, String data) throws IOException; - - void close(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java deleted file mode 100644 index 7729db98d..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Collection; - -class ProducerImpl implements Producer { - - private Collection hosts; - private String topic; - private CambriaBatchingPublisher producer; - - private String authKey; - private String authSecret; - - public ProducerImpl(Collection urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException { - - topic = topicName; - hosts = urls; - authKey = apiKey; - authSecret = apiSecret; - producer = getProducer(); - } - - public void post(String partition, String data) throws IOException { - - producer.send(partition, data); - } - - /** - * get cambria producer - * @return - */ - private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException { - - PublisherBuilder builder = new PublisherBuilder().usingHosts(hosts); - - // Add credentials if provided - if (authKey != null && authSecret != null) { - builder.authenticatedBy(authKey, authSecret); - } - - CambriaBatchingPublisher client = null; - - client = builder.onTopic(topic).build(); - - return client; - } - - @Override - public void close() { - producer.close(); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java deleted file mode 100644 index eaa21d857..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.util.Properties; - -public interface Protocol { - - /** - * init protocol properties and callback - * @param props - * @param callback - * @throws ProtocolException - */ - void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java deleted file mode 100644 index eb0537b80..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java +++ /dev/null @@ -1,44 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -public class ProtocolException extends Exception { - - public ProtocolException() { - super(); - } - - public ProtocolException(String message) { - super(message); - } - - public ProtocolException(String message, Throwable cause) { - super(message, cause); - } - - public ProtocolException(Throwable cause) { - super(cause); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java deleted file mode 100644 index 98e7d669b..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.util.HashMap; -import java.util.Map; - -public class ProtocolFactory { - - private static ProtocolFactory instance; - private Map protocols; - - /** - * Singleton factory - */ - private ProtocolFactory(){ - - protocols = new HashMap(); - } - - /** - * get factory instance - * @return factory instance - */ - public static synchronized ProtocolFactory getInstance(){ - - if (instance == null) { - instance = new ProtocolFactory(); - } - return instance; - } - - /** - * returns instantiated protocol object - * @param type of protocol object - * @return protocol object - */ - public Protocol getProtocolObject(ProtocolType type) throws ProtocolException { - - Protocol protocol = protocols.get(type); - synchronized (this) { - if (protocol == null) { - switch (type) { - case SYNC: - throw new ProtocolException("Protocol SYNC is not implemented"); - case ASYNC: - protocol = new AsyncProtocolImpl(); - protocols.put(type, protocol); - break; - default: - throw new ProtocolException("Protocol type not found"); - } - } - } - return protocol; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java deleted file mode 100644 index c02ea5607..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java +++ /dev/null @@ -1,98 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.JsonNode; - -class ProtocolMessage { - - private String version; - private String type; - private String rpcName; - private String correlationID; // correlation-id - private String partition; // cambria.partition - private JsonNode body; - - @JsonProperty - String getVersion() { - return version; - } - - @JsonProperty - void setVersion(String version) { - this.version = version; - } - - @JsonProperty - String getType() { - return type; - } - - @JsonProperty - void setType(String type) { - this.type = type; - } - - @JsonProperty("rpc-name") - String getRpcName() { - return rpcName; - } - - @JsonProperty("rpc-name") - void setRpcName(String rpcName) { - this.rpcName = rpcName; - } - - @JsonProperty("correlation-id") - String getCorrelationID() { - return correlationID; - } - - @JsonProperty("correlation-id") - void setCorrelationID(String correlationID) { - this.correlationID = correlationID; - } - - @JsonProperty("cambria.partition") - String getPartition() { - return partition; - } - - @JsonProperty("cambria.partition") - void setPartition(String partition) { - this.partition = partition; - } - - @JsonProperty - JsonNode getBody() { - return body; - } - - @JsonProperty - void setBody(JsonNode body) { - this.body = body; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java deleted file mode 100644 index cc2eca447..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java +++ /dev/null @@ -1,30 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -public enum ProtocolType { - - SYNC, ASYNC; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java deleted file mode 100644 index 8fc486bb8..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - - -import org.onap.appc.client.impl.core.MessageContext; - -public interface RetrieveMessageCallback { - - /** - * called when response received - * @param payload - * @param context - */ - void onResponse(String payload, MessageContext context); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java deleted file mode 100644 index df51861b8..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java +++ /dev/null @@ -1,102 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.*; - -class UEBMessagingService implements MessagingService { - - private Consumer consumer; - private Producer producer; - - private final String DEFAULT_READ_TIMEOUT_MS = "60000"; - private final String DEFAULT_READ_LIMIT = "1000"; - - private int readLimit; - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class); - - @SuppressWarnings("Since15") - public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - - if (props != null) { - String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ); - String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE); - String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER); - String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET); - String readTimeoutString = props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS); - Integer readTimeout = Integer.parseInt(readTimeoutString); - String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT); - readLimit = Integer.parseInt(readLimitString); - //get hosts pool - Collection pool = new HashSet(); - String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS); - if (hostNames != null && !hostNames.isEmpty()) { - for (String name : hostNames.split(",")) { - pool.add(name); - } - } - - //generate consumer id and group - same value for both - String consumerName = UUID.randomUUID().toString(); - String consumerID = consumerName; - - //create consumer and producer - consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret); - producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret); - - //initial consumer registration - try { - consumer.registerForRead(); - }catch(Exception e){ - LOG.error("Message consumer failed to register client "+consumerID); - } - } - } - - public void send(String partition, String body) throws IOException { - producer.post(partition, body); - } - - public List fetch() throws IOException { - return consumer.fetch(readLimit); - } - - public List fetch(int limit) throws IOException { - return consumer.fetch(limit); - } - - @Override - public void close() { - consumer.close(); - producer.close(); - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java deleted file mode 100644 index 5c1916f2b..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -class UEBPropertiesKeys { - - static final String TOPIC_READ = "topic.read"; - static final String TOPIC_READ_TIMEOUT = "topic.read.timeout"; - static final String READ_LIMIT = "topic.read.limit"; - static final String TOPIC_WRITE = "topic.write"; - static final String AUTH_USER = "client.key"; - static final String AUTH_SECRET = "client.secret"; - static final String HOSTS = "poolMembers"; -} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/ResponseManagerTest.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/ResponseManagerTest.java new file mode 100644 index 000000000..cc157e5c4 --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/ResponseManagerTest.java @@ -0,0 +1,163 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import org.onap.appc.client.impl.core.AsyncRequestResponseHandler; +import org.onap.appc.client.impl.core.CoreException; +import org.onap.appc.client.impl.core.CoreManager; +import org.onap.appc.client.impl.core.ICoreAsyncResponseHandler; +import org.onap.appc.client.impl.core.MessageContext; +import org.onap.appc.client.impl.protocol.AsyncProtocol; +import org.onap.appc.client.impl.protocol.ProtocolException; +import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; +import org.junit.Before; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.mockito.Mockito.mock; + +public class ResponseManagerTest { + + private final ExecutorService executorService = Executors.newFixedThreadPool(10); + ICoreAsyncResponseHandler listener1 = new ListenerImpl(); + ICoreAsyncResponseHandler listener2 = new SleeepListenerImpl(); + ICoreAsyncResponseHandler listener3 = new ListenerImpl(); + CoreManager coreManager = null; + + public void initialize() throws CoreException { + Properties prop = new Properties(); + prop.setProperty("client.pool.size", "10"); + prop.setProperty("client.response.timeout", "7000"); + coreManager = new ResponseManagerTest.CoreManagerTest(prop); + } + + void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException { + AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager); + requestResponseHandler.sendRequest(request, correlationId, rpcName); + } + + public void simpleResponseTest() throws Exception { + System.out.println("simpleResponseTest"); + asyncRequest("request 1", listener1,"vasia1", "test"); + MessageContext msgCtx = new MessageContext(); + msgCtx.setCorrelationID("vasia1"); + msgCtx.setType("response"); + coreManager.getProtocolCallback().onResponse("vasia1 response",msgCtx); + coreManager.getProtocolCallback().onResponse("vasia2 response",msgCtx); + Thread.sleep(10); + } + + public void twoResponseTest() throws Exception { + System.out.println("twoResponseTest"); + asyncRequest("twoResponseTest request 1", listener2,"vasia2", "test"); + MessageContext msgCtx = new MessageContext(); + msgCtx.setCorrelationID("vasia2"); + msgCtx.setType("response"); + coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx); + Thread.sleep(100); + asyncRequest("twoResponseTest request 2", listener1,"vasia1", "test"); + MessageContext msgCtx2 = new MessageContext(); + msgCtx2.setCorrelationID("vasia1"); + msgCtx2.setType("response"); + coreManager.getProtocolCallback().onResponse("first of vasia1",msgCtx2); + Thread.sleep(150); + } + + public void threeResponseTest() throws Exception { + System.out.println("treeResponseTest"); + asyncRequest("threeResponseTest request 2", listener1,"vasia4", "test"); + asyncRequest("threeResponseTest request 1", listener2,"vasia2", "test"); + MessageContext msgCtx2 = new MessageContext(); + msgCtx2.setCorrelationID("vasia2"); + msgCtx2.setType("response"); + coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx2); + + asyncRequest("threeResponseTest request 2", listener1,"vasia1", "test"); + MessageContext msgCtx1 = new MessageContext(); + msgCtx1.setCorrelationID("vasia1"); + msgCtx1.setType("response"); + coreManager.getProtocolCallback().onResponse("vasia1",msgCtx1); + + asyncRequest("threeResponseTest request 3", listener3,"vasia3", "test"); + MessageContext msgCtx3 = new MessageContext(); + msgCtx3.setCorrelationID("vasia3"); + msgCtx3.setType("response"); + coreManager.getProtocolCallback().onResponse("three1",msgCtx3); + + coreManager.getProtocolCallback().onResponse("three2", msgCtx3); + + coreManager.getProtocolCallback().onResponse("first1", msgCtx1); + Thread.sleep(250); + + coreManager.getProtocolCallback().onResponse("first2", msgCtx1); + Thread.sleep(10000); + } + + private class ListenerImpl implements ICoreAsyncResponseHandler{ + + public boolean onResponse(String message, String type) { + System.out.println("callback " + message); + return message != null; + } + + @Override + public void onException(Exception e) { + e.printStackTrace(); + } + } + + private class SleeepListenerImpl implements ICoreAsyncResponseHandler{ + + public boolean onResponse(String message, String type) { + try { + Thread.sleep(150); + System.out.println("sleep callback " + message); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return message != null; + } + + @Override + public void onException(Exception e) { + e.printStackTrace(); + } + } + + class CoreManagerTest extends CoreManager{ + CoreManagerTest(Properties properties) throws CoreException { + super(properties); + protocol = mock(AsyncProtocol.class); + } + protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException { + } + + protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException { + + } + } +} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/SyncFlowTest.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/SyncFlowTest.java new file mode 100644 index 000000000..2e69369b9 --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/core/SyncFlowTest.java @@ -0,0 +1,151 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import org.onap.appc.client.impl.core.CoreException; +import org.onap.appc.client.impl.core.CoreManager; +import org.onap.appc.client.impl.core.ICoreSyncResponseHandler; +import org.onap.appc.client.impl.core.MessageContext; +import org.onap.appc.client.impl.core.SyncRequestResponseHandler; +import org.onap.appc.client.impl.protocol.AsyncProtocol; +import org.onap.appc.client.impl.protocol.ProtocolException; +import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Mockito.mock; + +public class SyncFlowTest { + CoreManager coreManager = null; + + public void initialize() throws CoreException { + Properties prop = new Properties(); + prop.setProperty("client.pool.size", "10"); + prop.setProperty("client.response.timeout", "7000"); + coreManager = new CoreManagerTest(prop); + } + + T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException { + SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager); + requestResponseHandler.sendRequest(request, correlationId, rpcName); + T responseObject = (T) requestResponseHandler.getResponse(); + return responseObject; + } + + public void blockRequestTest(){ + ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1(); + try { + syncRequest("request 1", handler, "vasia1", "test"); + }catch (Throwable e){ + e.printStackTrace(); + Assert.assertTrue(e != null); + } + + } + + public void blockRequestSucceedTest() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + final ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1(); + try { + executorService.submit(new Runnable() { + public void run() { + System.out.println("Send request"); + T response; + try { + response = syncRequest("request 1", handler, "vasia1", "test"); + System.out.println("=======" + response.toString()); + } catch (CoreException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } + }); + }catch (Throwable e){ + Assert.assertTrue((RuntimeException)e != null); + } + Thread.sleep(2000); + executorService.submit(new Runnable() { + public void run() { + MessageContext ctx = new MessageContext(); + ctx.setCorrelationID("vasia1"); + ctx.setType("response"); + try { + System.out.println("Send response 1"); + coreManager.getProtocolCallback().onResponse("response for request 1", ctx); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + Thread.sleep(2000); + executorService.submit(new Runnable() { + public void run() { + MessageContext ctx = new MessageContext(); + ctx.setCorrelationID("vasia1"); + ctx.setType("response"); + try { + System.out.println("Send response 2"); + coreManager.getProtocolCallback().onResponse("response for request 1 final", ctx); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + Thread.sleep(1000); + + } + + class ICoreSyncResponseHandlerImpl1 implements ICoreSyncResponseHandler{ + + + public T onResponse(String message, String type) { + System.out.println("Received message = " + message) ; + if(message.contains("final")){ + return (T) new String(message); + } + return null; + } + } + + class CoreManagerTest extends CoreManager{ + CoreManagerTest(Properties properties) throws CoreException { + super(properties); + protocol = mock(AsyncProtocol.class); + } + protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException { + } + + protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException{ + + } + } +} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriterTest.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriterTest.java new file mode 100644 index 000000000..1d41a52f8 --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriterTest.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import org.onap.appc.client.impl.protocol.APPCMessageReaderWriter; +import org.onap.appc.client.impl.protocol.ProtocolException; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.StringWriter; + +public class APPCMessageReaderWriterTest { + + private APPCMessageReaderWriter messageReaderWriter; + private ObjectMapper mapper; + + private static final String VERSION = "2.0"; + private static final String TYPE = "typeTest"; + private static final String CORRELATION_ID = "correlationIdTest"; + private static final String PARTITION = "partitionTest"; + private static final String RPC = "rpcTest"; + private static final String PAYLOAD = "{\"key1\":\"val1\",\"key2\":\"val2\",\"key3\":{\"key3.1\":\"val3.1\"}}"; + + @Before + public void init() throws IOException { + mapper = new ObjectMapper(); + messageReaderWriter = new APPCMessageReaderWriter(); + } + + @Test + public void writeTest() throws IOException, ProtocolException { + MessageContext context = new MessageContext(); + context.setType(TYPE); + context.setCorrelationID(CORRELATION_ID); + context.setPartiton(PARTITION); + context.setRpc(RPC); + String payload = PAYLOAD; + String message = messageReaderWriter.write(payload, context); + + JsonNode messageJson = mapper.readTree(message); + Assert.assertEquals(VERSION, messageJson.get("version").asText()); + Assert.assertEquals(context.getType(), messageJson.get("type").asText()); + Assert.assertEquals(context.getCorrelationID(), messageJson.get("correlation-id").asText()); + Assert.assertEquals(context.getPartiton(), messageJson.get("cambria.partition").asText()); + Assert.assertEquals(context.getRpc(), messageJson.get("rpc-name").asText()); + Assert.assertEquals(payload, messageJson.get("body").toString()); + } + + @Test + public void readTest() throws IOException, ProtocolException { + ObjectNode node = mapper.createObjectNode(); + node.put("version", VERSION); + node.put("type", TYPE); + node.put("correlation-id", CORRELATION_ID); + node.put("cambria.partition", PARTITION); + node.put("rpc-name", RPC); + JsonNode payload = mapper.valueToTree(PAYLOAD); + node.set("body", payload); + String message = node.toString(); + + MessageContext returnContext = new MessageContext(); + String returnPayload = messageReaderWriter.read(message, returnContext); + + Assert.assertEquals(TYPE, returnContext.getType()); + Assert.assertEquals(CORRELATION_ID, returnContext.getCorrelationID()); + Assert.assertEquals(PARTITION, returnContext.getPartiton()); + Assert.assertEquals(RPC, returnContext.getRpc()); + Assert.assertEquals(payload.toString(), returnPayload); + } + +} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImpl.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImpl.java new file mode 100644 index 000000000..ceba10b58 --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImpl.java @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import org.onap.appc.client.impl.protocol.AsyncProtocol; +import org.onap.appc.client.impl.protocol.AsyncProtocolImpl; +import org.onap.appc.client.impl.protocol.ProtocolException; +import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; +import org.onap.appc.client.impl.protocol.UEBPropertiesKeys; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestAsyncProtocolImpl { + + private static AsyncProtocol protocol; + private static AtomicBoolean gotResponse; + private static Properties props; + + private static class TestCallback implements RetrieveMessageCallback{ + + public void onResponse(String payload, MessageContext context) { + Assert.assertNotEquals(null, payload); + Assert.assertNotEquals(null, context); + protocol = null; + gotResponse.set(true); + } + } + + @BeforeClass + public static void setUp() throws IOException, ProtocolException { + + gotResponse = new AtomicBoolean(false); + + props = new Properties(); + String propFileName = "ueb.properties"; + + InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName); + + props.load(input); + + protocol = new AsyncProtocolImpl(); + protocol.init(props, new TestCallback()); + } + + public void testSendRequest() throws ProtocolException { + + MessageContext context = new MessageContext(); + context.setType("Test"); + + protocol.sendRequest("{\"Test\":\"\"}", context); + + try { + Long timeToSleep = Long.parseLong((String)props.get(UEBPropertiesKeys.TOPIC_READ_TIMEOUT))*2; + Thread.sleep(timeToSleep); + } catch (InterruptedException e) { + Assert.assertFalse(e.getMessage(), false); + } + if (gotResponse.get() == false) { + Assert.assertFalse("Message was not read !", true); + } + } +} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java new file mode 100644 index 000000000..267282abd --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java @@ -0,0 +1,75 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import org.onap.appc.client.impl.protocol.AsyncProtocol; +import org.onap.appc.client.impl.protocol.AsyncProtocolImpl; +import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class TestAsyncProtocolImplMissingProps { + + private static AsyncProtocol protocol; + + private static class TestCallback implements RetrieveMessageCallback { + + public void onResponse(String payload, MessageContext context) { + Assert.assertFalse("bad Callback !",false); + } + } + + @Test + /** + * protocol should throw illegal argument exception due to null properties + */ + public void testSetUpMissingProps() { + + Properties props = new Properties(); + String propFileName = "ueb.missing.properties"; + + InputStream input = TestAsyncProtocolImplMissingProps.class.getClassLoader().getResourceAsStream(propFileName); + + try { + props.load(input); + } catch (IOException e) { + Assert.assertFalse(e.getMessage(),false); + } + + protocol = new AsyncProtocolImpl(); + try { + protocol.init(props, new TestCallback()); + } catch (IllegalArgumentException e) { + Assert.assertTrue(true); + } catch (Exception e) { + Assert.assertFalse(e.getMessage(),false); + } + } +} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java new file mode 100644 index 000000000..22954cd75 --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java @@ -0,0 +1,60 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.junit.Assert; +import org.junit.Test; +import org.onap.appc.client.impl.protocol.AsyncProtocol; +import org.onap.appc.client.impl.protocol.AsyncProtocolImpl; +import org.onap.appc.client.impl.protocol.ProtocolException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class TestAsyncProtocolImplNullCallback { + + private static AsyncProtocol protocol; + + public void testSetUpNoCallback() throws IOException { + + Properties props = new Properties(); + String propFileName = "ueb.properties"; + + InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName); + + props.load(input); + + protocol = new AsyncProtocolImpl(); + + try { + protocol.init(props, null); + } catch (ProtocolException e) { + Assert.assertTrue(true); + } catch (Exception e){ + Assert.assertFalse(e.getMessage(),false); + } + } +} diff --git a/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestUEBMessagingService.java b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestUEBMessagingService.java new file mode 100644 index 000000000..de192ab12 --- /dev/null +++ b/appc-client/client-lib/src/test/java/org/onap/appc/client/impl/protocol/TestUEBMessagingService.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.junit.*; +import org.junit.runners.MethodSorters; +import org.onap.appc.client.impl.protocol.MessagingService; +import org.onap.appc.client.impl.protocol.UEBMessagingService; + +import java.io.IOException; +import java.io.InputStream; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Properties; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class TestUEBMessagingService { + + private static MessagingService ueb; + + public static void setUp() throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + + Properties props = new Properties(); + String propFileName = "ueb.properties"; + + InputStream input = TestUEBMessagingService.class.getClassLoader().getResourceAsStream(propFileName); + + props.load(input); + + ueb = new UEBMessagingService(); + ueb.init(props); + } + + public void test1Send() throws IOException { + System.out.println("Here"); + + String message = "Test Message Service"; + ueb.send(null,message); + } + + public void test2Fetch() throws IOException { + + System.out.println("Here2"); + List messages = ueb.fetch(1); + Assert.assertEquals(1,messages.size()); + } + +} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java deleted file mode 100644 index cc157e5c4..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import org.onap.appc.client.impl.core.AsyncRequestResponseHandler; -import org.onap.appc.client.impl.core.CoreException; -import org.onap.appc.client.impl.core.CoreManager; -import org.onap.appc.client.impl.core.ICoreAsyncResponseHandler; -import org.onap.appc.client.impl.core.MessageContext; -import org.onap.appc.client.impl.protocol.AsyncProtocol; -import org.onap.appc.client.impl.protocol.ProtocolException; -import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; -import org.junit.Before; - -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.mockito.Mockito.mock; - -public class ResponseManagerTest { - - private final ExecutorService executorService = Executors.newFixedThreadPool(10); - ICoreAsyncResponseHandler listener1 = new ListenerImpl(); - ICoreAsyncResponseHandler listener2 = new SleeepListenerImpl(); - ICoreAsyncResponseHandler listener3 = new ListenerImpl(); - CoreManager coreManager = null; - - public void initialize() throws CoreException { - Properties prop = new Properties(); - prop.setProperty("client.pool.size", "10"); - prop.setProperty("client.response.timeout", "7000"); - coreManager = new ResponseManagerTest.CoreManagerTest(prop); - } - - void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException { - AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager); - requestResponseHandler.sendRequest(request, correlationId, rpcName); - } - - public void simpleResponseTest() throws Exception { - System.out.println("simpleResponseTest"); - asyncRequest("request 1", listener1,"vasia1", "test"); - MessageContext msgCtx = new MessageContext(); - msgCtx.setCorrelationID("vasia1"); - msgCtx.setType("response"); - coreManager.getProtocolCallback().onResponse("vasia1 response",msgCtx); - coreManager.getProtocolCallback().onResponse("vasia2 response",msgCtx); - Thread.sleep(10); - } - - public void twoResponseTest() throws Exception { - System.out.println("twoResponseTest"); - asyncRequest("twoResponseTest request 1", listener2,"vasia2", "test"); - MessageContext msgCtx = new MessageContext(); - msgCtx.setCorrelationID("vasia2"); - msgCtx.setType("response"); - coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx); - Thread.sleep(100); - asyncRequest("twoResponseTest request 2", listener1,"vasia1", "test"); - MessageContext msgCtx2 = new MessageContext(); - msgCtx2.setCorrelationID("vasia1"); - msgCtx2.setType("response"); - coreManager.getProtocolCallback().onResponse("first of vasia1",msgCtx2); - Thread.sleep(150); - } - - public void threeResponseTest() throws Exception { - System.out.println("treeResponseTest"); - asyncRequest("threeResponseTest request 2", listener1,"vasia4", "test"); - asyncRequest("threeResponseTest request 1", listener2,"vasia2", "test"); - MessageContext msgCtx2 = new MessageContext(); - msgCtx2.setCorrelationID("vasia2"); - msgCtx2.setType("response"); - coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx2); - - asyncRequest("threeResponseTest request 2", listener1,"vasia1", "test"); - MessageContext msgCtx1 = new MessageContext(); - msgCtx1.setCorrelationID("vasia1"); - msgCtx1.setType("response"); - coreManager.getProtocolCallback().onResponse("vasia1",msgCtx1); - - asyncRequest("threeResponseTest request 3", listener3,"vasia3", "test"); - MessageContext msgCtx3 = new MessageContext(); - msgCtx3.setCorrelationID("vasia3"); - msgCtx3.setType("response"); - coreManager.getProtocolCallback().onResponse("three1",msgCtx3); - - coreManager.getProtocolCallback().onResponse("three2", msgCtx3); - - coreManager.getProtocolCallback().onResponse("first1", msgCtx1); - Thread.sleep(250); - - coreManager.getProtocolCallback().onResponse("first2", msgCtx1); - Thread.sleep(10000); - } - - private class ListenerImpl implements ICoreAsyncResponseHandler{ - - public boolean onResponse(String message, String type) { - System.out.println("callback " + message); - return message != null; - } - - @Override - public void onException(Exception e) { - e.printStackTrace(); - } - } - - private class SleeepListenerImpl implements ICoreAsyncResponseHandler{ - - public boolean onResponse(String message, String type) { - try { - Thread.sleep(150); - System.out.println("sleep callback " + message); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return message != null; - } - - @Override - public void onException(Exception e) { - e.printStackTrace(); - } - } - - class CoreManagerTest extends CoreManager{ - CoreManagerTest(Properties properties) throws CoreException { - super(properties); - protocol = mock(AsyncProtocol.class); - } - protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException { - } - - protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException { - - } - } -} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java deleted file mode 100644 index 2e69369b9..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import org.onap.appc.client.impl.core.CoreException; -import org.onap.appc.client.impl.core.CoreManager; -import org.onap.appc.client.impl.core.ICoreSyncResponseHandler; -import org.onap.appc.client.impl.core.MessageContext; -import org.onap.appc.client.impl.core.SyncRequestResponseHandler; -import org.onap.appc.client.impl.protocol.AsyncProtocol; -import org.onap.appc.client.impl.protocol.ProtocolException; -import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; -import org.junit.Assert; -import org.junit.Before; - -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; - -import static org.mockito.Mockito.mock; - -public class SyncFlowTest { - CoreManager coreManager = null; - - public void initialize() throws CoreException { - Properties prop = new Properties(); - prop.setProperty("client.pool.size", "10"); - prop.setProperty("client.response.timeout", "7000"); - coreManager = new CoreManagerTest(prop); - } - - T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException { - SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager); - requestResponseHandler.sendRequest(request, correlationId, rpcName); - T responseObject = (T) requestResponseHandler.getResponse(); - return responseObject; - } - - public void blockRequestTest(){ - ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1(); - try { - syncRequest("request 1", handler, "vasia1", "test"); - }catch (Throwable e){ - e.printStackTrace(); - Assert.assertTrue(e != null); - } - - } - - public void blockRequestSucceedTest() throws InterruptedException { - ExecutorService executorService = Executors.newFixedThreadPool(2); - final ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1(); - try { - executorService.submit(new Runnable() { - public void run() { - System.out.println("Send request"); - T response; - try { - response = syncRequest("request 1", handler, "vasia1", "test"); - System.out.println("=======" + response.toString()); - } catch (CoreException e) { - e.printStackTrace(); - } catch (TimeoutException e) { - e.printStackTrace(); - } - } - }); - }catch (Throwable e){ - Assert.assertTrue((RuntimeException)e != null); - } - Thread.sleep(2000); - executorService.submit(new Runnable() { - public void run() { - MessageContext ctx = new MessageContext(); - ctx.setCorrelationID("vasia1"); - ctx.setType("response"); - try { - System.out.println("Send response 1"); - coreManager.getProtocolCallback().onResponse("response for request 1", ctx); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - Thread.sleep(2000); - executorService.submit(new Runnable() { - public void run() { - MessageContext ctx = new MessageContext(); - ctx.setCorrelationID("vasia1"); - ctx.setType("response"); - try { - System.out.println("Send response 2"); - coreManager.getProtocolCallback().onResponse("response for request 1 final", ctx); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - Thread.sleep(1000); - - } - - class ICoreSyncResponseHandlerImpl1 implements ICoreSyncResponseHandler{ - - - public T onResponse(String message, String type) { - System.out.println("Received message = " + message) ; - if(message.contains("final")){ - return (T) new String(message); - } - return null; - } - } - - class CoreManagerTest extends CoreManager{ - CoreManagerTest(Properties properties) throws CoreException { - super(properties); - protocol = mock(AsyncProtocol.class); - } - protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException { - } - - protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException{ - - } - } -} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java deleted file mode 100644 index 1d41a52f8..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import org.onap.appc.client.impl.protocol.APPCMessageReaderWriter; -import org.onap.appc.client.impl.protocol.ProtocolException; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.StringWriter; - -public class APPCMessageReaderWriterTest { - - private APPCMessageReaderWriter messageReaderWriter; - private ObjectMapper mapper; - - private static final String VERSION = "2.0"; - private static final String TYPE = "typeTest"; - private static final String CORRELATION_ID = "correlationIdTest"; - private static final String PARTITION = "partitionTest"; - private static final String RPC = "rpcTest"; - private static final String PAYLOAD = "{\"key1\":\"val1\",\"key2\":\"val2\",\"key3\":{\"key3.1\":\"val3.1\"}}"; - - @Before - public void init() throws IOException { - mapper = new ObjectMapper(); - messageReaderWriter = new APPCMessageReaderWriter(); - } - - @Test - public void writeTest() throws IOException, ProtocolException { - MessageContext context = new MessageContext(); - context.setType(TYPE); - context.setCorrelationID(CORRELATION_ID); - context.setPartiton(PARTITION); - context.setRpc(RPC); - String payload = PAYLOAD; - String message = messageReaderWriter.write(payload, context); - - JsonNode messageJson = mapper.readTree(message); - Assert.assertEquals(VERSION, messageJson.get("version").asText()); - Assert.assertEquals(context.getType(), messageJson.get("type").asText()); - Assert.assertEquals(context.getCorrelationID(), messageJson.get("correlation-id").asText()); - Assert.assertEquals(context.getPartiton(), messageJson.get("cambria.partition").asText()); - Assert.assertEquals(context.getRpc(), messageJson.get("rpc-name").asText()); - Assert.assertEquals(payload, messageJson.get("body").toString()); - } - - @Test - public void readTest() throws IOException, ProtocolException { - ObjectNode node = mapper.createObjectNode(); - node.put("version", VERSION); - node.put("type", TYPE); - node.put("correlation-id", CORRELATION_ID); - node.put("cambria.partition", PARTITION); - node.put("rpc-name", RPC); - JsonNode payload = mapper.valueToTree(PAYLOAD); - node.set("body", payload); - String message = node.toString(); - - MessageContext returnContext = new MessageContext(); - String returnPayload = messageReaderWriter.read(message, returnContext); - - Assert.assertEquals(TYPE, returnContext.getType()); - Assert.assertEquals(CORRELATION_ID, returnContext.getCorrelationID()); - Assert.assertEquals(PARTITION, returnContext.getPartiton()); - Assert.assertEquals(RPC, returnContext.getRpc()); - Assert.assertEquals(payload.toString(), returnPayload); - } - -} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java deleted file mode 100644 index ceba10b58..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import org.onap.appc.client.impl.protocol.AsyncProtocol; -import org.onap.appc.client.impl.protocol.AsyncProtocolImpl; -import org.onap.appc.client.impl.protocol.ProtocolException; -import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; -import org.onap.appc.client.impl.protocol.UEBPropertiesKeys; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -public class TestAsyncProtocolImpl { - - private static AsyncProtocol protocol; - private static AtomicBoolean gotResponse; - private static Properties props; - - private static class TestCallback implements RetrieveMessageCallback{ - - public void onResponse(String payload, MessageContext context) { - Assert.assertNotEquals(null, payload); - Assert.assertNotEquals(null, context); - protocol = null; - gotResponse.set(true); - } - } - - @BeforeClass - public static void setUp() throws IOException, ProtocolException { - - gotResponse = new AtomicBoolean(false); - - props = new Properties(); - String propFileName = "ueb.properties"; - - InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName); - - props.load(input); - - protocol = new AsyncProtocolImpl(); - protocol.init(props, new TestCallback()); - } - - public void testSendRequest() throws ProtocolException { - - MessageContext context = new MessageContext(); - context.setType("Test"); - - protocol.sendRequest("{\"Test\":\"\"}", context); - - try { - Long timeToSleep = Long.parseLong((String)props.get(UEBPropertiesKeys.TOPIC_READ_TIMEOUT))*2; - Thread.sleep(timeToSleep); - } catch (InterruptedException e) { - Assert.assertFalse(e.getMessage(), false); - } - if (gotResponse.get() == false) { - Assert.assertFalse("Message was not read !", true); - } - } -} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java deleted file mode 100644 index 267282abd..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java +++ /dev/null @@ -1,75 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import org.onap.appc.client.impl.protocol.AsyncProtocol; -import org.onap.appc.client.impl.protocol.AsyncProtocolImpl; -import org.onap.appc.client.impl.protocol.RetrieveMessageCallback; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -public class TestAsyncProtocolImplMissingProps { - - private static AsyncProtocol protocol; - - private static class TestCallback implements RetrieveMessageCallback { - - public void onResponse(String payload, MessageContext context) { - Assert.assertFalse("bad Callback !",false); - } - } - - @Test - /** - * protocol should throw illegal argument exception due to null properties - */ - public void testSetUpMissingProps() { - - Properties props = new Properties(); - String propFileName = "ueb.missing.properties"; - - InputStream input = TestAsyncProtocolImplMissingProps.class.getClassLoader().getResourceAsStream(propFileName); - - try { - props.load(input); - } catch (IOException e) { - Assert.assertFalse(e.getMessage(),false); - } - - protocol = new AsyncProtocolImpl(); - try { - protocol.init(props, new TestCallback()); - } catch (IllegalArgumentException e) { - Assert.assertTrue(true); - } catch (Exception e) { - Assert.assertFalse(e.getMessage(),false); - } - } -} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java deleted file mode 100644 index 22954cd75..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java +++ /dev/null @@ -1,60 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.junit.Assert; -import org.junit.Test; -import org.onap.appc.client.impl.protocol.AsyncProtocol; -import org.onap.appc.client.impl.protocol.AsyncProtocolImpl; -import org.onap.appc.client.impl.protocol.ProtocolException; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -public class TestAsyncProtocolImplNullCallback { - - private static AsyncProtocol protocol; - - public void testSetUpNoCallback() throws IOException { - - Properties props = new Properties(); - String propFileName = "ueb.properties"; - - InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName); - - props.load(input); - - protocol = new AsyncProtocolImpl(); - - try { - protocol.init(props, null); - } catch (ProtocolException e) { - Assert.assertTrue(true); - } catch (Exception e){ - Assert.assertFalse(e.getMessage(),false); - } - } -} diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java deleted file mode 100644 index de192ab12..000000000 --- a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java +++ /dev/null @@ -1,70 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.junit.*; -import org.junit.runners.MethodSorters; -import org.onap.appc.client.impl.protocol.MessagingService; -import org.onap.appc.client.impl.protocol.UEBMessagingService; - -import java.io.IOException; -import java.io.InputStream; -import java.security.GeneralSecurityException; -import java.util.List; -import java.util.Properties; - -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class TestUEBMessagingService { - - private static MessagingService ueb; - - public static void setUp() throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - - Properties props = new Properties(); - String propFileName = "ueb.properties"; - - InputStream input = TestUEBMessagingService.class.getClassLoader().getResourceAsStream(propFileName); - - props.load(input); - - ueb = new UEBMessagingService(); - ueb.init(props); - } - - public void test1Send() throws IOException { - System.out.println("Here"); - - String message = "Test Message Service"; - ueb.send(null,message); - } - - public void test2Fetch() throws IOException { - - System.out.println("Here2"); - List messages = ueb.fetch(1); - Assert.assertEquals(1,messages.size()); - } - -} -- cgit 1.2.3-korg