aboutsummaryrefslogtreecommitdiffstats
path: root/appc-client/client-lib/src/main/java/org/openecomp
diff options
context:
space:
mode:
Diffstat (limited to 'appc-client/client-lib/src/main/java/org/openecomp')
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java85
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java76
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java45
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java314
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java70
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java43
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java28
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java36
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java68
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java33
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java47
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java69
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java36
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java85
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java50
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java107
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java65
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java98
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java82
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java77
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java40
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java157
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java56
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java125
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java39
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java40
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java61
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java38
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java82
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java38
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java44
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java79
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java98
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java30
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java38
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java102
-rw-r--r--appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java36
37 files changed, 0 insertions, 2617 deletions
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java
deleted file mode 100644
index c5d6120f0..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-/** Abstract request response handler class, responsible for common functionality of
- * @{@link AsyncRequestResponseHandler} and @{@link SyncRequestResponseHandler}
- */
-abstract class AbstractRequestResponseHandler implements RequestResponseHandler {
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class);
- ICoreResponseHandler businessCallback;
- protected String corrID;
- CoreManager coreManager;
-
-
- AbstractRequestResponseHandler(String corrID,
- ICoreResponseHandler businessCallback,
- CoreManager coreManager)
- {
- this.businessCallback = businessCallback;
- this.corrID = corrID;
- this.coreManager = coreManager;
- }
-
- public synchronized void handleResponse(final MessageContext ctx, final String response) {
- try {
- coreManager.submitTask(ctx.getCorrelationID(), new Runnable() {
- @Override
- public void run() {
- LOG.info("handling response of corrID <" + corrID + ">" + "response " + response);
- if(coreManager.isExistHandler(corrID)) {
- runTask(response, ctx.getType());
- }
-
- }
- });
- } catch (InterruptedException e) {
- LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e);
- }
- }
-
- /**
- *
- * @param response - Response
- * @param type - Type of Response
- */
- abstract void runTask(String response, String type);
-
- @Override
- public void sendRequest(String request, String corrId, String rpcName) throws CoreException {
- if(!coreManager.isShutdownInProgress()) {
- coreManager.registerHandler(corrId, this);
- coreManager.sendRequest(request, corrId, rpcName);
- coreManager.startTimer(corrId);
- }else{
- throw new CoreException("Shutdown is in progress. Request will not be handled");
- }
- }
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java
deleted file mode 100644
index 1bcf0af99..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.util.concurrent.TimeoutException;
-
-/** Handles async responses
- */
-class AsyncRequestResponseHandler extends AbstractRequestResponseHandler {
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class);
-
- AsyncRequestResponseHandler(String corrID,
- ICoreResponseHandler businessCallback,
- CoreManager coreManager)
- {
- super(corrID, businessCallback, coreManager);
- }
-
- /**
- * Calls API callback for sending response to consumer's listener. in case of complete response cleans timer and
- * unregisters the handler.
- * @param response - Response
- * @param type - Type of Response
- */
- public void runTask(String response, String type) {
- boolean finalTask = false;
- try {
- finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type);
- } catch (Exception e){
- LOG.error("Error on API layer, for request with correlation-id " + corrID, e);
- }
- if (finalTask){
- coreManager.cancelTimer(corrID);
- coreManager.unregisterHandler(corrID);
- }
- else{
- response = null;
- type = null;
- }
- }
-
- /**
- * Calls to API layer for sending timeout exception.
- */
- @Override
- public void onTimeOut() {
- LOG.info("timeout for request with correlation-id " + corrID);
- ((ICoreAsyncResponseHandler)businessCallback).onException(new TimeoutException("timeout for request with correlation-id " + corrID));
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java
deleted file mode 100644
index a2bf6e250..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-
-public class CoreException extends Exception {
-
- public CoreException() {
- super();
- }
-
- public CoreException(String message) {
- super(message);
- }
-
- public CoreException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public CoreException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java
deleted file mode 100644
index c1c23890e..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import org.onap.appc.client.impl.protocol.*;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events.
- */
-class CoreManager{
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
- private final ProtocolFactory protocolFactory;
- protected AsyncProtocol protocol;
- private final RetrieveMessageCallback protocolCallback = null;
- private final CoreRegistry registry;
- private final ITimerService timerService;
- private final TaskQueueManager queueManager;
- private String DEFAULT_TIMEOUT = "300000";
- private final static String RESPONSE_TIMEOUT = "client.response.timeout";
- private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
- private boolean isForceShutdown = false;
- private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
- private long shutdownTimeout;
-
- CoreManager(Properties prop) throws CoreException {
- protocolFactory = ProtocolFactory.getInstance();
- try {
- initProtocol(prop);
- }catch (ProtocolException e){
- throw new CoreException(e);
- }
- registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
- String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
- long responseTimeout = Long.parseLong(timeoutProp);
- String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
- shutdownTimeout = Long.parseLong(gracefulTimeout);
- timerService = new TimerServiceImpl(responseTimeout);
- queueManager = new TaskQueueManager(prop);
- listenShutdown();
- }
-
- /**
- * initiates protocol layer services.
- * @param prop - Properties
- */
- private void initProtocol(Properties prop) throws ProtocolException {
- protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
- protocol.init(prop, getProtocolCallback());
- }
-
- /**
- * Creates protocol response callback
- * @return - @{@link ProtocolResponseCallbackImpl}
- */
- RetrieveMessageCallback getProtocolCallback(){
- return new ProtocolResponseCallbackImpl();
- }
-
- /**
- * Registers a new handler in registry
- * @param corrID - Correlation ID
- * @param requestResponseHandler handler to be called when response arrives
- */
- void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
- registry.register(corrID, requestResponseHandler);
- }
-
- /**
- * Remove a handler from registry service by correlation ID.
- * @param corrID - Correlation ID
- * @return - @{@link RequestResponseHandler}
- */
- RequestResponseHandler unregisterHandler(String corrID){
- return (RequestResponseHandler) registry.unregister(corrID);
- }
-
- /**
- * Checks in registry service if a handler is existing.
- * @param corrID - Correlation ID
- * @return - boolean
- */
- boolean isExistHandler(String corrID) {
- return registry.isExist(corrID);
- }
-
- /**
- * Starts timer for timeout event when a request was send successfully.
- * @param corrID - Correlation ID
- */
- void startTimer(String corrID){
- timerService.add(corrID, new TimeoutHandlerImpl(corrID));
- }
-
- /**
- * Cancels timer for fimeout event, in case when complete response was received
- * @param corrID
- */
- void cancelTimer(String corrID){
- timerService.cancel(corrID);
- }
-
- /**
- * Submits a new task to Queue manager. it is using for both response and timeout tasks
- * @param corrID - Correlation ID
- * @param task - @{@link Runnable} task.
- * @throws InterruptedException
- */
- void submitTask(String corrID, Runnable task) throws InterruptedException {
- queueManager.submit(corrID, task);
- }
-
- /**
- * Sends request to protocol.
- * @param request - Request
- * @param corrId - Correlation ID
- * @param rpcName - RPC name
- * @throws CoreException - @{@link CoreException}
- */
- void sendRequest(String request, String corrId, String rpcName) throws CoreException {
- MessageContext ctx = getMessageContext(corrId, rpcName);
- try {
- protocol.sendRequest(request, ctx);
- } catch (ProtocolException e) {
- unregisterHandler(corrId);
- throw new CoreException(e);
- }
- }
-
- /**
- * Creates @{@link MessageContext}
- * @param correlationId - Correlation ID
- * @param rpcName - RPC Name
- * @return - @{@link MessageContext}
- */
- private MessageContext getMessageContext(String correlationId, String rpcName){
- MessageContext msgCtx = new MessageContext();
- msgCtx.setCorrelationID(correlationId);
- msgCtx.setRpc(rpcName);
- return msgCtx;
- }
-
- /**
- * Implements response callback from protocol and filters responses by correlation ID.
- * Only registered events(by correlation ID) will be handled.
- */
- private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
- @Override
- public void onResponse(String response, MessageContext context) {
- String corrID = context.getCorrelationID();
- if (corrID != null) {
- RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
- if (messageHandler != null) {
- LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
- messageHandler.handleResponse(context, response);
- }
- }
- }
- }
-
-
- /**
- * listens to @{@link Runtime} shutdown event
- */
- private void listenShutdown() {
- Runtime.getRuntime().addShutdownHook(new Thread(){
- public void run(){
- gracefulShutdown();
- }
- });
- }
-
- /**
- * Implements shutdown for client library.
- * @param isForceShutdown - true force shutdown, false graceful shutdown
- */
- void shutdown(boolean isForceShutdown){
- if(isForceShutdown){
- forceShutdown();
- }else{
- gracefulShutdown();
- }
- }
-
- /**
- * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force
- * shutdown only when either all request will be handled or graceful shutdown will be time out.
- */
- synchronized void gracefulShutdown(){
- isGracefulShutdown.set(true);
- if(registry.isEmpty()){
- forceShutdown();
- }
- else{
- try {
- LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
- wait(shutdownTimeout);
- LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
- forceShutdown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- }
-
- /**
- * Closes Protocol, stops Queue Manager and shutdowns Time Service.
- */
- private void forceShutdown(){
- isForceShutdown = true;
- try {
- LOG.info("Starting shutdown process.");
- protocol.shutdown();
- queueManager.stopQueueManager();
- timerService.shutdown();
- } catch (InterruptedException e) {
- LOG.info("Client library shutdown in progress ", e);
- }
- }
-
- /**
- *
- * @return - true when shutdown is in process
- */
- boolean isShutdownInProgress(){
- return isForceShutdown || isGracefulShutdown.get();
- }
-
- /**
- * Timeout handler implementation.
- * This handler is responsible to assign a task for handling of timeout events.
- *
- */
- private class TimeoutHandlerImpl implements ITimeoutHandler {
-
- private final String corrID;
-
- TimeoutHandlerImpl(String corrID) {
- this.corrID = corrID;
- }
-
- /**
- * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
- * this queue is shared between both timeout and handlers which belong to same correlation ID.
- */
- @Override
- public void onTimeout() {
- try {
- submitTask(corrID, new Runnable() {
- @Override
- public void run() {
- RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
- if (requestResponseHandler != null) {
- requestResponseHandler.onTimeOut();
- }
- }
- });
- } catch (InterruptedException e) {
- LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
- }
- }
- }
-
-
- /**
- * Wakes Up graceful shutdown.
- */
- class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
- @Override
- public synchronized void emptyCallback() {
- LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
- if(isGracefulShutdown.get()){
- wakeUpShutdown();
- }
- }
- }
-
- /**
- * wakes up waiting shutdown.
- */
- private synchronized void wakeUpShutdown(){
- notifyAll();
- }
-
-}
-
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java
deleted file mode 100644
index e0a0c5b34..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/** client lib Registry
- */
-class CoreRegistry<T>{
- private Map<String, T> registry =
- new ConcurrentHashMap<String, T>();
-
- final private EmptyRegistryCallback emptyRegistryCallback;
-
-
- CoreRegistry(EmptyRegistryCallback emptyRegistryCallback){
- this.emptyRegistryCallback = emptyRegistryCallback;
- }
-
- void register(String key, T obj) {
- registry.put(key, obj);
- }
-
- <T> T unregister(String key) {
- T item = (T) registry.remove(key);
- if(registry.isEmpty()) {
- emptyRegistryCallback.emptyCallback();
- }
- return item;
- }
-
- <T> T get(String key){
- return (T) registry.get(key);
- }
-
- synchronized boolean isExist(String key) {
- return registry.containsKey(key);
- }
-
- boolean isEmpty(){
- return registry.isEmpty();
- }
-
- public interface EmptyRegistryCallback{
- void emptyCallback();
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java
deleted file mode 100644
index 862d56dc3..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-public interface ICoreAsyncResponseHandler extends ICoreResponseHandler{
-
- /**
- * Core response to incoming message
- * @param message response accepted from protocol
- * @param type type of response
- * @return true if message is final, false otherwise
- */
- boolean onResponse(String message, String type);
-
- /**
- * Core reaction to an event of exception
- * @param e the exception which have been thrown
- */
- void onException(Exception e);
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java
deleted file mode 100644
index 555640dfd..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-public interface ICoreResponseHandler {
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java
deleted file mode 100644
index 996d3d8d2..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-public interface ICoreSyncResponseHandler extends ICoreResponseHandler{
-
- /**
- * Core response to incoming message, should return completed message only
- * @param message response accepted from protocol
- * @param type type of response
- * @return true if message is final, false otherwise
- */
- <T> T onResponse(String message, String type) throws CoreException;
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java
deleted file mode 100644
index 93cf20b3f..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-
-/**
- */
-public interface IInvocationManager {
-
- /**
- * initializes the manager
- * @param prop properties to read from
- * @throws CoreException thrown if madatory fields are not set right
- */
- void init(Properties prop) throws CoreException;
-
- /**
- * handles the flow of an async request
- * @param request the request body
- * @param listener business response handler
- * @param correlationId unique id of the request
- * @param rpcName rpc call name
- * @throws CoreException thrown if the request failed to be sent
- */
- void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException;
-
- /**
- * handles to flow of a sync request
- * @param request the request body
- * @param callback business response handler
- * @param correlationId unique id of the request
- * @param rpcName rpc call name
- * @return the output object to be returned
- * @throws CoreException thrown if the request failed to be sent
- * @throws TimeoutException thrown if timeout has exceeded
- */
- <T> T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException;
-
- /**
- * shuts the invocation manager down.
- * @param isForceShutdown if true, shutdown will be forced, otherwise it will be gracefully
- */
- void shutdown(boolean isForceShutdown);
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java
deleted file mode 100644
index 0f3b81a6f..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-interface ITimeoutHandler {
-
- /**
- * handles timeout event
- */
- void onTimeout();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java
deleted file mode 100644
index 96b06033f..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-interface ITimerService {
-
- /**
- * add a new timeout handler to a request
- * @param correlationID the id of the request
- * @param handler to be called once "timeout' time has arrived
- */
- void add(String correlationID, ITimeoutHandler handler);
-
- /**
- * cancel the timeout handler of a request
- * @param correlationID the id of the request
- */
- void cancel(String correlationID);
-
-
- /**
- * shuts the timer service down immediately
- */
- void shutdown();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java
deleted file mode 100644
index 8179da107..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-
-/**
- * layer for passing requests from API to Core
- */
-class InvocationManager implements IInvocationManager{
-
- protected CoreManager coreManager = null;
-
- InvocationManager(){
- }
-
- public void init(Properties properties) throws CoreException {
- coreManager = new CoreManager(properties);
- }
-
- /**
- *
- * @param request
- * @param businessCallback
- * @param correlationId
- * @param rpcName
- * @throws CoreException
- */
- public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException {
- AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager);
- requestResponseHandler.sendRequest(request, correlationId, rpcName);
- }
-
- public <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException {
- SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager);
- requestResponseHandler.sendRequest(request, correlationId, rpcName);
- T responseObject = (T) requestResponseHandler.getResponse();
- return responseObject;
- }
-
- @Override
- public void shutdown(boolean isForceShutdown) {
- coreManager.shutdown(isForceShutdown);
- }
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java
deleted file mode 100644
index c9face762..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-public abstract class InvocationManagerFactory {
- private static IInvocationManager invocationManager = null;
-
- public static synchronized IInvocationManager getInstance(){
- if(invocationManager == null){
- invocationManager = new InvocationManager();
- }
- return invocationManager;
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java
deleted file mode 100644
index 6fab66bb3..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-/** Helper class for wrapping request/response information.
- */
-public class MessageContext {
-
- /**
- * valid values of type are response/error
- */
- private String type;
-
- /**
- * RPC name
- */
- private String rpc;
-
- /**
- * correlation ID
- */
- private String correlationID;
-
- /**
- * partitioner for message bus usage
- */
- private String partitioner;
-
-
- public String getRpc() {
- return rpc;
- }
-
- public void setRpc(String rpc) {
- this.rpc = rpc;
- }
-
- public String getCorrelationID() {
- return correlationID;
- }
-
- public void setCorrelationID(String correlationID) {
- this.correlationID = correlationID;
- }
-
- public String getPartiton() {
- return partitioner;
- }
-
- public void setPartiton(String partitioner) {
- this.partitioner = partitioner;
- }
-
- public void setType(String type){
- this.type = type;
- }
-
- public String getType(){
- return type;
- }
-
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java
deleted file mode 100644
index 8e05a2974..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-interface RequestResponseHandler {
-
- /**
- * sends request, registers handler of response and start timer.
- * @param request - Request
- * @param corrId - correlation ID
- * @param rpcName - RPC name
- * @throws CoreException - @{@link CoreException}
- */
- void sendRequest(String request, String corrId, String rpcName) throws CoreException;
-
- /**
- * submits a handler task to task queue @{@link TaskQueue}, this task will be performed only if this handler is
- * still existing in core registry @{@link CoreRegistry}, others timeout was occurred .
- * @param ctx - Message Context @{@link MessageContext}
- * @param response - Response from backend
- */
- void handleResponse(MessageContext ctx, String response);
-
- /**
- * handles timeout event
- */
- void onTimeOut();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java
deleted file mode 100644
index 90b0a9926..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.util.concurrent.TimeoutException;
-
-/** Handles sync requests
- */
-class SyncRequestResponseHandler<T> extends AbstractRequestResponseHandler {
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class);
- private T responseObject = null;
- private CoreException coreException = null;
- private TimeoutException timeoutException = null;
-
- SyncRequestResponseHandler(String corrID,
- ICoreResponseHandler callback,
- CoreManager coreManager){
- super(corrID, callback, coreManager);
- }
-
- /**
- * Calls API callback for getting response object. in case of complete response notifies consumer
- * thread for receiving response
- * @param response - Response
- * @param type - Type of Response
- */
- synchronized void runTask(String response, String type) {
- try {
- responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type);
- } catch (CoreException e) {
- coreException = e;
- }
- if(responseObject != null || coreException != null) {
- notify();
- }
- }
-
-
- /**
- * Returns response. goes sleep until coming either timeout event or complete response
- */
- public synchronized <T> T getResponse() throws CoreException, TimeoutException {
- try{
- if(!isResponseReceived()){
- wait();
- }
- if (coreException != null) {
- throw coreException;
- }
- if ( timeoutException != null) {
- throw timeoutException;
- }
-
- } catch (InterruptedException e) {
- throw new CoreException(e);
- } finally{
- coreManager.unregisterHandler(corrID);
- coreManager.cancelTimer(corrID);
- }
- return (T) responseObject;
- }
-
- /**
- * indicates if a response received
- * @return
- */
- private boolean isResponseReceived() {
- return responseObject != null;
- }
-
- @Override
- public synchronized void onTimeOut() {
- LOG.error("sync response handler on timeout correlation ID <" + corrID + ">.");
- timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID);
- notify();
- }
-
-
-
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java
deleted file mode 100644
index 4ceeb3f08..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/** Responsible to ensure synchronous handling of responses and timouts.
- */
-class TaskQueue implements Runnable{
-
- private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class);
-
- private boolean isShutdown;
-
- synchronized void addTask(Runnable task) throws InterruptedException {
- queue.put(task);
- }
-
- public void run() {
- Runnable task;
- while(!Thread.currentThread().isInterrupted() && !isShutdown){
- try {
- task = queue.take();
- task.run();
- } catch (InterruptedException e) {
- LOG.error("could not take task from queue", e);
- } catch (RuntimeException e) {
- LOG.error("could not run task", e);
- }
- LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown);
- }
- LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process.");
- }
-
- void stopQueue(){
- isShutdown = true;
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java
deleted file mode 100644
index b87349411..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/** Creates a task queue pool that reuses a fixed number of threads.
- * Assigns one thread for each queue.
- */
-class TaskQueueManager {
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class);
- private ExecutorService executorService;
- private final static String DEFAULT_POOL_SIZE = "10";
- private final static String CLIENT_POOL_SIZE = "client.pool.size";
- private TaskQueue[] queues;
- private int poolInt;
-
- TaskQueueManager(Properties properties){
- String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE);
- poolInt = Integer.parseInt(size);
- this.executorService = Executors.newFixedThreadPool(poolInt);
- initTaskQueues();
- }
-
- private void initTaskQueues(){
- queues = new TaskQueue[poolInt];
- for(int i=0; i<poolInt; i++){
- queues[i] = new TaskQueue();
- this.executorService.submit(queues[i]);
- }
- }
-
- void submit(String corrID, Runnable task) throws InterruptedException {
- TaskQueue queue = getTaskQueue(corrID);
- queue.addTask(task);
- }
-
- /**
- * ensures synchronous handling all responses and timeout belongs to same correlation ID
- * @param corrID
- * @return - @{@link TaskQueue}
- */
- private TaskQueue getTaskQueue(String corrID){
- int index = Math.abs(corrID.hashCode()) % poolInt;
- return queues[index];
- }
-
- /**
- * goes over queues for stopping threads
- * @throws InterruptedException
- */
- void stopQueueManager() throws InterruptedException {
- for(int i=0; i<poolInt; i++){
- queues[i].stopQueue();
- queues[i].addTask(new Runnable() {
- @Override
- public void run() {
- /**
- * wake up the queue for stopping thread
- */
- }
- });
- }
- List<Runnable> listTask = executorService.shutdownNow();
- if (!executorService.awaitTermination(6, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- LOG.info("the amount of tasks that never commenced execution " + listTask.size());
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java
deleted file mode 100644
index fa2d0804d..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.core;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.util.List;
-import java.util.concurrent.*;
-
-class TimerServiceImpl implements ITimerService {
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class);
- private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
- private final ConcurrentHashMap<String, Future> timeOutEvents = new ConcurrentHashMap<>();
- private final long responseTimeout;
-
- TimerServiceImpl(long responseTimeout) {
- this.responseTimeout = responseTimeout;
- }
-
- @Override
- public synchronized void cancel(String correlationID) {
- Future timeOutEvent = timeOutEvents.remove(correlationID);
- if (timeOutEvent != null){
- timeOutEvent.cancel(true);
- }
- }
-
- @Override
- public synchronized void add(String correlationID, ITimeoutHandler handler) {
- Future timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS);
- timeOutEvents.put(correlationID, timeOutEvent);
- }
-
- @Override
- public void shutdown() {
- List<Runnable> listTask = scheduler.shutdownNow();
- LOG.info("the amount of tasks that never commenced execution " + listTask.size());
- }
-
- private class HandleTimeout implements Runnable {
-
- String correlationID;
- ITimeoutHandler handler;
-
- HandleTimeout(String correlationID, ITimeoutHandler handler) {
- this.correlationID = correlationID;
- this.handler = handler;
- }
-
- @Override
- public void run(){
- System.out.println("Timeout event of request " + correlationID);
- handler.onTimeout();
- timeOutEvents.remove(correlationID);
- }
- }
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java
deleted file mode 100644
index a76f0a90b..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import org.onap.appc.client.impl.core.MessageContext;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-
-class APPCMessageReaderWriter implements MessageReader, MessageWriter {
-
- private final ObjectMapper mapper;
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class);
-
- APPCMessageReaderWriter() {
- mapper = new ObjectMapper();
- }
-
- public String read(String payload, MessageContext context) throws ProtocolException {
- try {
- ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class);
- context.setType(protocolMessage.getType());
- context.setRpc(protocolMessage.getRpcName());
- context.setCorrelationID(protocolMessage.getCorrelationID());
- context.setPartiton(protocolMessage.getPartition());
- String body = protocolMessage.getBody().toString();
- LOG.debug("Received body : <" + body + ">");
- return body;
- } catch (IOException e) {
- throw new ProtocolException(e);
- }
-
- }
-
- public String write(String payload, MessageContext context) throws ProtocolException {
- try {
- ProtocolMessage protocolMessage = new ProtocolMessage();
- protocolMessage.setVersion("2.0");
- protocolMessage.setType(context.getType());
- protocolMessage.setRpcName(context.getRpc());
- protocolMessage.setCorrelationID(context.getCorrelationID());
- protocolMessage.setPartition(context.getPartiton());
- JsonNode body = mapper.readTree(payload);
- protocolMessage.setBody(body);
- String message = mapper.writeValueAsString(protocolMessage);
- return message;
- } catch (IOException e) {
- throw new ProtocolException(e);
- }
- }
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java
deleted file mode 100644
index 94d2d6b85..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import org.onap.appc.client.impl.core.MessageContext;
-
-public interface AsyncProtocol extends Protocol {
-
- /**
- * sends a string message to underlying message bus/java API
- * @param payload - meesage body
- * @param context - message headers
- * @throws ProtocolException
- */
- void sendRequest(String payload, MessageContext context) throws ProtocolException;
-
- void shutdown();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java
deleted file mode 100644
index 82626d802..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import org.onap.appc.client.impl.core.MessageContext;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-class AsyncProtocolImpl implements AsyncProtocol {
-
- /**
- * message bus listener thread handler
- */
- private Future listenerHandler;
- /**
- * called when messages are fetched - called for a single message
- */
- private RetrieveMessageCallback callback;
- /**
- * message bus client used to send/fetch
- */
- private MessagingService messageService;
- /**
- * Message reader used to extract body and context from reponse message
- */
- private MessageReader messageReader;
- /**
- * Message writer used to construct meesage from body and context
- */
- private MessageWriter messageWriter;
-
- /**
- * shutdown indicator
- */
- private boolean isShutdown = false;
-
- /**
- * executor service for listener usage
- */
- private ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
-
-
- AsyncProtocolImpl() {
-
- messageService = new UEBMessagingService();
- messageReader = new APPCMessageReaderWriter();
- messageWriter = (MessageWriter) messageReader;
- }
-
- public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
-
- if (callback == null) {
- throw new ProtocolException("Callback param should not be null!");
- }
- this.callback = callback;
-
- try {
- messageService.init(props);
- //get message bus listener thread
- //start the thread after initializing services
- listenerHandler = executorService.submit(new Listener());
- } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
- throw new ProtocolException(e);
- }
- }
-
- public void sendRequest(String payload, MessageContext context) throws ProtocolException {
-
- //get message to be sent to appc from payload and context
- String message = messageWriter.write(payload, context);
- try {
- messageService.send(context.getPartiton(), message);
- LOG.debug("Successfully send message: " + message);
- } catch (IOException e) {
- throw new ProtocolException(e);
- }
- }
-
- @Override
- public void shutdown() {
- isShutdown = true;
- messageService.close();
- LOG.warn("The protocol layer in shutdown stage.");
- executorService.shutdownNow();
- }
-
- public class Listener implements Runnable {
-
-
- public void run() {
-
- while (!isShutdown) {
- List<String> messages = new ArrayList<>();
- try {
- messages = messageService.fetch();
- LOG.debug("Successfully fetched " + messages.size() + " messages");
- } catch (IOException e) {
- LOG.error("Fetching " + messages.size() + " messages failed");
- }
- for (String message : messages) {
-
- MessageContext context = new MessageContext();
- String payload = null;
-
- try {
- //get payload and context from message to be sent to core layer
- payload = messageReader.read(message, context);
- LOG.debug("Got body: " + payload);
- //call core layer response handler
- if(!isShutdown) {
- callback.onResponse(payload, context);
- }else{
- LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
- context.getCorrelationID() + "> response ", message);
- }
- } catch (ProtocolException e) {
- LOG.error("Failed to read message from UEB. message is: " + message);
- }
- }
- }
- }
- }
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java
deleted file mode 100644
index 4765a58ef..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import java.io.IOException;
-import java.util.List;
-
-interface Consumer {
-
- /**
- * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty.
- *
- * @return A list of strings representing the messages pulled from the topic.
- * @throws IOException
- */
- List<String> fetch() throws IOException;
-
- /**
- * Gets a batch of messages from the topic.
- *
- * @param limit The amount of messages to fetch
- * @return A list of strings representing the messages pulled from the topic.
- * @throws IOException
- */
- List<String> fetch(int limit) throws IOException;
-
- /**
- * Send dummy fetch request to register client to be able to fetch messages
- * @throws IOException
- */
- void registerForRead() throws IOException;
-
- void close();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java
deleted file mode 100644
index 913f80f44..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-class ConsumerImpl implements Consumer {
-
- private static final int DEFAULT_LIMIT = 1000;
-
- private Collection<String> hosts;
- private String topic;
- private String group;
- private String groupId;
- private int timeout;
-
- private String authKey;
- private String authSecret;
-
- private CambriaConsumer consumer = null;
-
- /**
- * constructor
- * @param urls
- * @param topicName
- * @param consumerName
- * @param consumerId
- * @param timeout
- */
- public ConsumerImpl(Collection<String> urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
- this.hosts = urls;
- this.topic = topicName;
- this.group = consumerName;
- this.groupId = consumerId;
- this.authKey = apiKey;
- this.authSecret = apiSecret;
- this.timeout = timeout;
- consumer = getConsumer();
- }
-
-
- public List<String> fetch() throws IOException {
-
- return fetch(DEFAULT_LIMIT);
- }
-
- public List<String> fetch(int limit) throws IOException {
-
- List<String> out = new ArrayList<String>();
- try {
- for(String msg : consumer.fetch(timeout,limit)){
- out.add(msg);
- }
- } catch (IOException e) {
- throw e;
- }
- return out;
- }
-
- public void registerForRead() throws IOException {
-
- int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages
- consumer.fetch(waitForRegisteration, 1);
- }
-
- /**
- * init cambria consumer
- * @return CambriaConsumer
- */
- private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
-
- ConsumerBuilder builder = new ConsumerBuilder();
-
- builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId);
- builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout);
- builder.receivingAtMost(DEFAULT_LIMIT);
-
- // Add credentials if provided
- if (authKey != null && authSecret != null) {
-
- Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey");
- apiKeyField.setAccessible(true);
- apiKeyField.set(builder, "");
- builder.authenticatedBy(authKey, authSecret);
- }
-
- return builder.build();
- }
-
- @Override
- public void close() {
- consumer.close();
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java
deleted file mode 100644
index 19688d696..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import org.onap.appc.client.impl.core.MessageContext;
-
-public interface MessageReader {
-
- /**
- * reads payload, fills the context out of payload headers, and returns the body of the payload
- * @param payload incoming message
- * @param context context to fill
- * @return body of the payload
- * @throws ProtocolException
- */
- String read(String payload, MessageContext context) throws ProtocolException;
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java
deleted file mode 100644
index 0849bc4a4..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import org.onap.appc.client.impl.core.MessageContext;
-import com.fasterxml.jackson.databind.JsonNode;
-
-public interface MessageWriter {
-
- /**
- * builds a message out of context and payload
- * @param payload body of the message
- * @param context headers of the message
- * @return the message to write/send
- * @throws ProtocolException
- */
- String write(String payload, MessageContext context) throws ProtocolException;
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java
deleted file mode 100644
index 029378931..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.List;
-import java.util.Properties;
-
-interface MessagingService {
-
- /**
- * initialize consumer/publisher
- * @param props
- */
- void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException;
-
- /**
- * sends a string as is
- * @param partition
- * @param body
- */
- void send(String partition, String body) throws IOException;
-
- /**
- * retrieve messages from bus - timeout extracted from props or see impl
- * @return
- */
- List<String> fetch() throws IOException;
-
- /**
- * retrieve messages from bus - timeout extracted from props or see impl
- * @param limit
- * @return
- */
- List<String> fetch(int limit) throws IOException;
-
- void close();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java
deleted file mode 100644
index f290e8a89..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import java.io.IOException;
-
-interface Producer {
-
- /**
- * send a message to a partition via ueb
- * @param data
- */
- void post(String Partition, String data) throws IOException;
-
- void close();
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java
deleted file mode 100644
index 7729db98d..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Collection;
-
-class ProducerImpl implements Producer {
-
- private Collection<String> hosts;
- private String topic;
- private CambriaBatchingPublisher producer;
-
- private String authKey;
- private String authSecret;
-
- public ProducerImpl(Collection<String> urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException {
-
- topic = topicName;
- hosts = urls;
- authKey = apiKey;
- authSecret = apiSecret;
- producer = getProducer();
- }
-
- public void post(String partition, String data) throws IOException {
-
- producer.send(partition, data);
- }
-
- /**
- * get cambria producer
- * @return
- */
- private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException {
-
- PublisherBuilder builder = new PublisherBuilder().usingHosts(hosts);
-
- // Add credentials if provided
- if (authKey != null && authSecret != null) {
- builder.authenticatedBy(authKey, authSecret);
- }
-
- CambriaBatchingPublisher client = null;
-
- client = builder.onTopic(topic).build();
-
- return client;
- }
-
- @Override
- public void close() {
- producer.close();
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java
deleted file mode 100644
index eaa21d857..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import java.util.Properties;
-
-public interface Protocol {
-
- /**
- * init protocol properties and callback
- * @param props
- * @param callback
- * @throws ProtocolException
- */
- void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException;
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java
deleted file mode 100644
index eb0537b80..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-public class ProtocolException extends Exception {
-
- public ProtocolException() {
- super();
- }
-
- public ProtocolException(String message) {
- super(message);
- }
-
- public ProtocolException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ProtocolException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java
deleted file mode 100644
index 98e7d669b..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProtocolFactory {
-
- private static ProtocolFactory instance;
- private Map<ProtocolType,Protocol> protocols;
-
- /**
- * Singleton factory
- */
- private ProtocolFactory(){
-
- protocols = new HashMap<ProtocolType, Protocol>();
- }
-
- /**
- * get factory instance
- * @return factory instance
- */
- public static synchronized ProtocolFactory getInstance(){
-
- if (instance == null) {
- instance = new ProtocolFactory();
- }
- return instance;
- }
-
- /**
- * returns instantiated protocol object
- * @param type of protocol object
- * @return protocol object
- */
- public Protocol getProtocolObject(ProtocolType type) throws ProtocolException {
-
- Protocol protocol = protocols.get(type);
- synchronized (this) {
- if (protocol == null) {
- switch (type) {
- case SYNC:
- throw new ProtocolException("Protocol SYNC is not implemented");
- case ASYNC:
- protocol = new AsyncProtocolImpl();
- protocols.put(type, protocol);
- break;
- default:
- throw new ProtocolException("Protocol type not found");
- }
- }
- }
- return protocol;
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java
deleted file mode 100644
index c02ea5607..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.JsonNode;
-
-class ProtocolMessage {
-
- private String version;
- private String type;
- private String rpcName;
- private String correlationID; // correlation-id
- private String partition; // cambria.partition
- private JsonNode body;
-
- @JsonProperty
- String getVersion() {
- return version;
- }
-
- @JsonProperty
- void setVersion(String version) {
- this.version = version;
- }
-
- @JsonProperty
- String getType() {
- return type;
- }
-
- @JsonProperty
- void setType(String type) {
- this.type = type;
- }
-
- @JsonProperty("rpc-name")
- String getRpcName() {
- return rpcName;
- }
-
- @JsonProperty("rpc-name")
- void setRpcName(String rpcName) {
- this.rpcName = rpcName;
- }
-
- @JsonProperty("correlation-id")
- String getCorrelationID() {
- return correlationID;
- }
-
- @JsonProperty("correlation-id")
- void setCorrelationID(String correlationID) {
- this.correlationID = correlationID;
- }
-
- @JsonProperty("cambria.partition")
- String getPartition() {
- return partition;
- }
-
- @JsonProperty("cambria.partition")
- void setPartition(String partition) {
- this.partition = partition;
- }
-
- @JsonProperty
- JsonNode getBody() {
- return body;
- }
-
- @JsonProperty
- void setBody(JsonNode body) {
- this.body = body;
- }
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java
deleted file mode 100644
index cc2eca447..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-public enum ProtocolType {
-
- SYNC, ASYNC;
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java
deleted file mode 100644
index 8fc486bb8..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-
-import org.onap.appc.client.impl.core.MessageContext;
-
-public interface RetrieveMessageCallback {
-
- /**
- * called when response received
- * @param payload
- * @param context
- */
- void onResponse(String payload, MessageContext context);
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java
deleted file mode 100644
index df51861b8..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.*;
-
-class UEBMessagingService implements MessagingService {
-
- private Consumer consumer;
- private Producer producer;
-
- private final String DEFAULT_READ_TIMEOUT_MS = "60000";
- private final String DEFAULT_READ_LIMIT = "1000";
-
- private int readLimit;
-
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class);
-
- @SuppressWarnings("Since15")
- public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
-
- if (props != null) {
- String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ);
- String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE);
- String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER);
- String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET);
- String readTimeoutString = props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS);
- Integer readTimeout = Integer.parseInt(readTimeoutString);
- String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT);
- readLimit = Integer.parseInt(readLimitString);
- //get hosts pool
- Collection<String> pool = new HashSet<String>();
- String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS);
- if (hostNames != null && !hostNames.isEmpty()) {
- for (String name : hostNames.split(",")) {
- pool.add(name);
- }
- }
-
- //generate consumer id and group - same value for both
- String consumerName = UUID.randomUUID().toString();
- String consumerID = consumerName;
-
- //create consumer and producer
- consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret);
- producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret);
-
- //initial consumer registration
- try {
- consumer.registerForRead();
- }catch(Exception e){
- LOG.error("Message consumer failed to register client "+consumerID);
- }
- }
- }
-
- public void send(String partition, String body) throws IOException {
- producer.post(partition, body);
- }
-
- public List<String> fetch() throws IOException {
- return consumer.fetch(readLimit);
- }
-
- public List<String> fetch(int limit) throws IOException {
- return consumer.fetch(limit);
- }
-
- @Override
- public void close() {
- consumer.close();
- producer.close();
- }
-
-}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java
deleted file mode 100644
index 5c1916f2b..000000000
--- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.client.impl.protocol;
-
-class UEBPropertiesKeys {
-
- static final String TOPIC_READ = "topic.read";
- static final String TOPIC_READ_TIMEOUT = "topic.read.timeout";
- static final String READ_LIMIT = "topic.read.limit";
- static final String TOPIC_WRITE = "topic.write";
- static final String AUTH_USER = "client.key";
- static final String AUTH_SECRET = "client.secret";
- static final String HOSTS = "poolMembers";
-}