diff options
Diffstat (limited to 'appc-client/client-lib/src/main/java/org/openecomp')
37 files changed, 0 insertions, 2617 deletions
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java deleted file mode 100644 index c5d6120f0..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -/** Abstract request response handler class, responsible for common functionality of - * @{@link AsyncRequestResponseHandler} and @{@link SyncRequestResponseHandler} - */ -abstract class AbstractRequestResponseHandler implements RequestResponseHandler { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class); - ICoreResponseHandler businessCallback; - protected String corrID; - CoreManager coreManager; - - - AbstractRequestResponseHandler(String corrID, - ICoreResponseHandler businessCallback, - CoreManager coreManager) - { - this.businessCallback = businessCallback; - this.corrID = corrID; - this.coreManager = coreManager; - } - - public synchronized void handleResponse(final MessageContext ctx, final String response) { - try { - coreManager.submitTask(ctx.getCorrelationID(), new Runnable() { - @Override - public void run() { - LOG.info("handling response of corrID <" + corrID + ">" + "response " + response); - if(coreManager.isExistHandler(corrID)) { - runTask(response, ctx.getType()); - } - - } - }); - } catch (InterruptedException e) { - LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e); - } - } - - /** - * - * @param response - Response - * @param type - Type of Response - */ - abstract void runTask(String response, String type); - - @Override - public void sendRequest(String request, String corrId, String rpcName) throws CoreException { - if(!coreManager.isShutdownInProgress()) { - coreManager.registerHandler(corrId, this); - coreManager.sendRequest(request, corrId, rpcName); - coreManager.startTimer(corrId); - }else{ - throw new CoreException("Shutdown is in progress. Request will not be handled"); - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java deleted file mode 100644 index 1bcf0af99..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.concurrent.TimeoutException; - -/** Handles async responses - */ -class AsyncRequestResponseHandler extends AbstractRequestResponseHandler { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class); - - AsyncRequestResponseHandler(String corrID, - ICoreResponseHandler businessCallback, - CoreManager coreManager) - { - super(corrID, businessCallback, coreManager); - } - - /** - * Calls API callback for sending response to consumer's listener. in case of complete response cleans timer and - * unregisters the handler. - * @param response - Response - * @param type - Type of Response - */ - public void runTask(String response, String type) { - boolean finalTask = false; - try { - finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type); - } catch (Exception e){ - LOG.error("Error on API layer, for request with correlation-id " + corrID, e); - } - if (finalTask){ - coreManager.cancelTimer(corrID); - coreManager.unregisterHandler(corrID); - } - else{ - response = null; - type = null; - } - } - - /** - * Calls to API layer for sending timeout exception. - */ - @Override - public void onTimeOut() { - LOG.info("timeout for request with correlation-id " + corrID); - ((ICoreAsyncResponseHandler)businessCallback).onException(new TimeoutException("timeout for request with correlation-id " + corrID)); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java deleted file mode 100644 index a2bf6e250..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - - -public class CoreException extends Exception { - - public CoreException() { - super(); - } - - public CoreException(String message) { - super(message); - } - - public CoreException(String message, Throwable cause) { - super(message, cause); - } - - public CoreException(Throwable cause) { - super(cause); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java deleted file mode 100644 index c1c23890e..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java +++ /dev/null @@ -1,314 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import org.onap.appc.client.impl.protocol.*; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events. - */ -class CoreManager{ - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class); - private final ProtocolFactory protocolFactory; - protected AsyncProtocol protocol; - private final RetrieveMessageCallback protocolCallback = null; - private final CoreRegistry registry; - private final ITimerService timerService; - private final TaskQueueManager queueManager; - private String DEFAULT_TIMEOUT = "300000"; - private final static String RESPONSE_TIMEOUT = "client.response.timeout"; - private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout"; - private boolean isForceShutdown = false; - private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false); - private long shutdownTimeout; - - CoreManager(Properties prop) throws CoreException { - protocolFactory = ProtocolFactory.getInstance(); - try { - initProtocol(prop); - }catch (ProtocolException e){ - throw new CoreException(e); - } - registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl()); - String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT); - long responseTimeout = Long.parseLong(timeoutProp); - String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT); - shutdownTimeout = Long.parseLong(gracefulTimeout); - timerService = new TimerServiceImpl(responseTimeout); - queueManager = new TaskQueueManager(prop); - listenShutdown(); - } - - /** - * initiates protocol layer services. - * @param prop - Properties - */ - private void initProtocol(Properties prop) throws ProtocolException { - protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC); - protocol.init(prop, getProtocolCallback()); - } - - /** - * Creates protocol response callback - * @return - @{@link ProtocolResponseCallbackImpl} - */ - RetrieveMessageCallback getProtocolCallback(){ - return new ProtocolResponseCallbackImpl(); - } - - /** - * Registers a new handler in registry - * @param corrID - Correlation ID - * @param requestResponseHandler handler to be called when response arrives - */ - void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){ - registry.register(corrID, requestResponseHandler); - } - - /** - * Remove a handler from registry service by correlation ID. - * @param corrID - Correlation ID - * @return - @{@link RequestResponseHandler} - */ - RequestResponseHandler unregisterHandler(String corrID){ - return (RequestResponseHandler) registry.unregister(corrID); - } - - /** - * Checks in registry service if a handler is existing. - * @param corrID - Correlation ID - * @return - boolean - */ - boolean isExistHandler(String corrID) { - return registry.isExist(corrID); - } - - /** - * Starts timer for timeout event when a request was send successfully. - * @param corrID - Correlation ID - */ - void startTimer(String corrID){ - timerService.add(corrID, new TimeoutHandlerImpl(corrID)); - } - - /** - * Cancels timer for fimeout event, in case when complete response was received - * @param corrID - */ - void cancelTimer(String corrID){ - timerService.cancel(corrID); - } - - /** - * Submits a new task to Queue manager. it is using for both response and timeout tasks - * @param corrID - Correlation ID - * @param task - @{@link Runnable} task. - * @throws InterruptedException - */ - void submitTask(String corrID, Runnable task) throws InterruptedException { - queueManager.submit(corrID, task); - } - - /** - * Sends request to protocol. - * @param request - Request - * @param corrId - Correlation ID - * @param rpcName - RPC name - * @throws CoreException - @{@link CoreException} - */ - void sendRequest(String request, String corrId, String rpcName) throws CoreException { - MessageContext ctx = getMessageContext(corrId, rpcName); - try { - protocol.sendRequest(request, ctx); - } catch (ProtocolException e) { - unregisterHandler(corrId); - throw new CoreException(e); - } - } - - /** - * Creates @{@link MessageContext} - * @param correlationId - Correlation ID - * @param rpcName - RPC Name - * @return - @{@link MessageContext} - */ - private MessageContext getMessageContext(String correlationId, String rpcName){ - MessageContext msgCtx = new MessageContext(); - msgCtx.setCorrelationID(correlationId); - msgCtx.setRpc(rpcName); - return msgCtx; - } - - /** - * Implements response callback from protocol and filters responses by correlation ID. - * Only registered events(by correlation ID) will be handled. - */ - private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback { - @Override - public void onResponse(String response, MessageContext context) { - String corrID = context.getCorrelationID(); - if (corrID != null) { - RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID); - if (messageHandler != null) { - LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response); - messageHandler.handleResponse(context, response); - } - } - } - } - - - /** - * listens to @{@link Runtime} shutdown event - */ - private void listenShutdown() { - Runtime.getRuntime().addShutdownHook(new Thread(){ - public void run(){ - gracefulShutdown(); - } - }); - } - - /** - * Implements shutdown for client library. - * @param isForceShutdown - true force shutdown, false graceful shutdown - */ - void shutdown(boolean isForceShutdown){ - if(isForceShutdown){ - forceShutdown(); - }else{ - gracefulShutdown(); - } - } - - /** - * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force - * shutdown only when either all request will be handled or graceful shutdown will be time out. - */ - synchronized void gracefulShutdown(){ - isGracefulShutdown.set(true); - if(registry.isEmpty()){ - forceShutdown(); - } - else{ - try { - LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">"); - wait(shutdownTimeout); - LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">"); - forceShutdown(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - } - } - - /** - * Closes Protocol, stops Queue Manager and shutdowns Time Service. - */ - private void forceShutdown(){ - isForceShutdown = true; - try { - LOG.info("Starting shutdown process."); - protocol.shutdown(); - queueManager.stopQueueManager(); - timerService.shutdown(); - } catch (InterruptedException e) { - LOG.info("Client library shutdown in progress ", e); - } - } - - /** - * - * @return - true when shutdown is in process - */ - boolean isShutdownInProgress(){ - return isForceShutdown || isGracefulShutdown.get(); - } - - /** - * Timeout handler implementation. - * This handler is responsible to assign a task for handling of timeout events. - * - */ - private class TimeoutHandlerImpl implements ITimeoutHandler { - - private final String corrID; - - TimeoutHandlerImpl(String corrID) { - this.corrID = corrID; - } - - /** - * When a timeout event is occurring, the new Timeout task will be assigned into a queue, - * this queue is shared between both timeout and handlers which belong to same correlation ID. - */ - @Override - public void onTimeout() { - try { - submitTask(corrID, new Runnable() { - @Override - public void run() { - RequestResponseHandler requestResponseHandler = unregisterHandler(corrID); - if (requestResponseHandler != null) { - requestResponseHandler.onTimeOut(); - } - } - }); - } catch (InterruptedException e) { - LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e); - } - } - } - - - /** - * Wakes Up graceful shutdown. - */ - class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback { - @Override - public synchronized void emptyCallback() { - LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">"); - if(isGracefulShutdown.get()){ - wakeUpShutdown(); - } - } - } - - /** - * wakes up waiting shutdown. - */ - private synchronized void wakeUpShutdown(){ - notifyAll(); - } - -} - diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java deleted file mode 100644 index e0a0c5b34..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java +++ /dev/null @@ -1,70 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** client lib Registry - */ -class CoreRegistry<T>{ - private Map<String, T> registry = - new ConcurrentHashMap<String, T>(); - - final private EmptyRegistryCallback emptyRegistryCallback; - - - CoreRegistry(EmptyRegistryCallback emptyRegistryCallback){ - this.emptyRegistryCallback = emptyRegistryCallback; - } - - void register(String key, T obj) { - registry.put(key, obj); - } - - <T> T unregister(String key) { - T item = (T) registry.remove(key); - if(registry.isEmpty()) { - emptyRegistryCallback.emptyCallback(); - } - return item; - } - - <T> T get(String key){ - return (T) registry.get(key); - } - - synchronized boolean isExist(String key) { - return registry.containsKey(key); - } - - boolean isEmpty(){ - return registry.isEmpty(); - } - - public interface EmptyRegistryCallback{ - void emptyCallback(); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java deleted file mode 100644 index 862d56dc3..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public interface ICoreAsyncResponseHandler extends ICoreResponseHandler{ - - /** - * Core response to incoming message - * @param message response accepted from protocol - * @param type type of response - * @return true if message is final, false otherwise - */ - boolean onResponse(String message, String type); - - /** - * Core reaction to an event of exception - * @param e the exception which have been thrown - */ - void onException(Exception e); - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java deleted file mode 100644 index 555640dfd..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public interface ICoreResponseHandler { -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java deleted file mode 100644 index 996d3d8d2..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public interface ICoreSyncResponseHandler extends ICoreResponseHandler{ - - /** - * Core response to incoming message, should return completed message only - * @param message response accepted from protocol - * @param type type of response - * @return true if message is final, false otherwise - */ - <T> T onResponse(String message, String type) throws CoreException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java deleted file mode 100644 index 93cf20b3f..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import java.util.Properties; -import java.util.concurrent.TimeoutException; - -/** - */ -public interface IInvocationManager { - - /** - * initializes the manager - * @param prop properties to read from - * @throws CoreException thrown if madatory fields are not set right - */ - void init(Properties prop) throws CoreException; - - /** - * handles the flow of an async request - * @param request the request body - * @param listener business response handler - * @param correlationId unique id of the request - * @param rpcName rpc call name - * @throws CoreException thrown if the request failed to be sent - */ - void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException; - - /** - * handles to flow of a sync request - * @param request the request body - * @param callback business response handler - * @param correlationId unique id of the request - * @param rpcName rpc call name - * @return the output object to be returned - * @throws CoreException thrown if the request failed to be sent - * @throws TimeoutException thrown if timeout has exceeded - */ - <T> T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException; - - /** - * shuts the invocation manager down. - * @param isForceShutdown if true, shutdown will be forced, otherwise it will be gracefully - */ - void shutdown(boolean isForceShutdown); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java deleted file mode 100644 index 0f3b81a6f..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -interface ITimeoutHandler { - - /** - * handles timeout event - */ - void onTimeout(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java deleted file mode 100644 index 96b06033f..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java +++ /dev/null @@ -1,47 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -interface ITimerService { - - /** - * add a new timeout handler to a request - * @param correlationID the id of the request - * @param handler to be called once "timeout' time has arrived - */ - void add(String correlationID, ITimeoutHandler handler); - - /** - * cancel the timeout handler of a request - * @param correlationID the id of the request - */ - void cancel(String correlationID); - - - /** - * shuts the timer service down immediately - */ - void shutdown(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java deleted file mode 100644 index 8179da107..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java +++ /dev/null @@ -1,69 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import java.util.Properties; -import java.util.concurrent.TimeoutException; - -/** - * layer for passing requests from API to Core - */ -class InvocationManager implements IInvocationManager{ - - protected CoreManager coreManager = null; - - InvocationManager(){ - } - - public void init(Properties properties) throws CoreException { - coreManager = new CoreManager(properties); - } - - /** - * - * @param request - * @param businessCallback - * @param correlationId - * @param rpcName - * @throws CoreException - */ - public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException { - AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager); - requestResponseHandler.sendRequest(request, correlationId, rpcName); - } - - public <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException { - SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager); - requestResponseHandler.sendRequest(request, correlationId, rpcName); - T responseObject = (T) requestResponseHandler.getResponse(); - return responseObject; - } - - @Override - public void shutdown(boolean isForceShutdown) { - coreManager.shutdown(isForceShutdown); - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java deleted file mode 100644 index c9face762..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -public abstract class InvocationManagerFactory { - private static IInvocationManager invocationManager = null; - - public static synchronized IInvocationManager getInstance(){ - if(invocationManager == null){ - invocationManager = new InvocationManager(); - } - return invocationManager; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java deleted file mode 100644 index 6fab66bb3..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java +++ /dev/null @@ -1,85 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -/** Helper class for wrapping request/response information. - */ -public class MessageContext { - - /** - * valid values of type are response/error - */ - private String type; - - /** - * RPC name - */ - private String rpc; - - /** - * correlation ID - */ - private String correlationID; - - /** - * partitioner for message bus usage - */ - private String partitioner; - - - public String getRpc() { - return rpc; - } - - public void setRpc(String rpc) { - this.rpc = rpc; - } - - public String getCorrelationID() { - return correlationID; - } - - public void setCorrelationID(String correlationID) { - this.correlationID = correlationID; - } - - public String getPartiton() { - return partitioner; - } - - public void setPartiton(String partitioner) { - this.partitioner = partitioner; - } - - public void setType(String type){ - this.type = type; - } - - public String getType(){ - return type; - } - - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java deleted file mode 100644 index 8e05a2974..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -interface RequestResponseHandler { - - /** - * sends request, registers handler of response and start timer. - * @param request - Request - * @param corrId - correlation ID - * @param rpcName - RPC name - * @throws CoreException - @{@link CoreException} - */ - void sendRequest(String request, String corrId, String rpcName) throws CoreException; - - /** - * submits a handler task to task queue @{@link TaskQueue}, this task will be performed only if this handler is - * still existing in core registry @{@link CoreRegistry}, others timeout was occurred . - * @param ctx - Message Context @{@link MessageContext} - * @param response - Response from backend - */ - void handleResponse(MessageContext ctx, String response); - - /** - * handles timeout event - */ - void onTimeOut(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java deleted file mode 100644 index 90b0a9926..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.concurrent.TimeoutException; - -/** Handles sync requests - */ -class SyncRequestResponseHandler<T> extends AbstractRequestResponseHandler { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class); - private T responseObject = null; - private CoreException coreException = null; - private TimeoutException timeoutException = null; - - SyncRequestResponseHandler(String corrID, - ICoreResponseHandler callback, - CoreManager coreManager){ - super(corrID, callback, coreManager); - } - - /** - * Calls API callback for getting response object. in case of complete response notifies consumer - * thread for receiving response - * @param response - Response - * @param type - Type of Response - */ - synchronized void runTask(String response, String type) { - try { - responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type); - } catch (CoreException e) { - coreException = e; - } - if(responseObject != null || coreException != null) { - notify(); - } - } - - - /** - * Returns response. goes sleep until coming either timeout event or complete response - */ - public synchronized <T> T getResponse() throws CoreException, TimeoutException { - try{ - if(!isResponseReceived()){ - wait(); - } - if (coreException != null) { - throw coreException; - } - if ( timeoutException != null) { - throw timeoutException; - } - - } catch (InterruptedException e) { - throw new CoreException(e); - } finally{ - coreManager.unregisterHandler(corrID); - coreManager.cancelTimer(corrID); - } - return (T) responseObject; - } - - /** - * indicates if a response received - * @return - */ - private boolean isResponseReceived() { - return responseObject != null; - } - - @Override - public synchronized void onTimeOut() { - LOG.error("sync response handler on timeout correlation ID <" + corrID + ">."); - timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID); - notify(); - } - - - - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java deleted file mode 100644 index 4ceeb3f08..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** Responsible to ensure synchronous handling of responses and timouts. - */ -class TaskQueue implements Runnable{ - - private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); - private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class); - - private boolean isShutdown; - - synchronized void addTask(Runnable task) throws InterruptedException { - queue.put(task); - } - - public void run() { - Runnable task; - while(!Thread.currentThread().isInterrupted() && !isShutdown){ - try { - task = queue.take(); - task.run(); - } catch (InterruptedException e) { - LOG.error("could not take task from queue", e); - } catch (RuntimeException e) { - LOG.error("could not run task", e); - } - LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown); - } - LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process."); - } - - void stopQueue(){ - isShutdown = true; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java deleted file mode 100644 index b87349411..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** Creates a task queue pool that reuses a fixed number of threads. - * Assigns one thread for each queue. - */ -class TaskQueueManager { - private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class); - private ExecutorService executorService; - private final static String DEFAULT_POOL_SIZE = "10"; - private final static String CLIENT_POOL_SIZE = "client.pool.size"; - private TaskQueue[] queues; - private int poolInt; - - TaskQueueManager(Properties properties){ - String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE); - poolInt = Integer.parseInt(size); - this.executorService = Executors.newFixedThreadPool(poolInt); - initTaskQueues(); - } - - private void initTaskQueues(){ - queues = new TaskQueue[poolInt]; - for(int i=0; i<poolInt; i++){ - queues[i] = new TaskQueue(); - this.executorService.submit(queues[i]); - } - } - - void submit(String corrID, Runnable task) throws InterruptedException { - TaskQueue queue = getTaskQueue(corrID); - queue.addTask(task); - } - - /** - * ensures synchronous handling all responses and timeout belongs to same correlation ID - * @param corrID - * @return - @{@link TaskQueue} - */ - private TaskQueue getTaskQueue(String corrID){ - int index = Math.abs(corrID.hashCode()) % poolInt; - return queues[index]; - } - - /** - * goes over queues for stopping threads - * @throws InterruptedException - */ - void stopQueueManager() throws InterruptedException { - for(int i=0; i<poolInt; i++){ - queues[i].stopQueue(); - queues[i].addTask(new Runnable() { - @Override - public void run() { - /** - * wake up the queue for stopping thread - */ - } - }); - } - List<Runnable> listTask = executorService.shutdownNow(); - if (!executorService.awaitTermination(6, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - LOG.info("the amount of tasks that never commenced execution " + listTask.size()); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java deleted file mode 100644 index fa2d0804d..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.core; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.util.List; -import java.util.concurrent.*; - -class TimerServiceImpl implements ITimerService { - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private final ConcurrentHashMap<String, Future> timeOutEvents = new ConcurrentHashMap<>(); - private final long responseTimeout; - - TimerServiceImpl(long responseTimeout) { - this.responseTimeout = responseTimeout; - } - - @Override - public synchronized void cancel(String correlationID) { - Future timeOutEvent = timeOutEvents.remove(correlationID); - if (timeOutEvent != null){ - timeOutEvent.cancel(true); - } - } - - @Override - public synchronized void add(String correlationID, ITimeoutHandler handler) { - Future timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS); - timeOutEvents.put(correlationID, timeOutEvent); - } - - @Override - public void shutdown() { - List<Runnable> listTask = scheduler.shutdownNow(); - LOG.info("the amount of tasks that never commenced execution " + listTask.size()); - } - - private class HandleTimeout implements Runnable { - - String correlationID; - ITimeoutHandler handler; - - HandleTimeout(String correlationID, ITimeoutHandler handler) { - this.correlationID = correlationID; - this.handler = handler; - } - - @Override - public void run(){ - System.out.println("Timeout event of request " + correlationID); - handler.onTimeout(); - timeOutEvents.remove(correlationID); - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java deleted file mode 100644 index a76f0a90b..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java +++ /dev/null @@ -1,77 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -class APPCMessageReaderWriter implements MessageReader, MessageWriter { - - private final ObjectMapper mapper; - private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class); - - APPCMessageReaderWriter() { - mapper = new ObjectMapper(); - } - - public String read(String payload, MessageContext context) throws ProtocolException { - try { - ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class); - context.setType(protocolMessage.getType()); - context.setRpc(protocolMessage.getRpcName()); - context.setCorrelationID(protocolMessage.getCorrelationID()); - context.setPartiton(protocolMessage.getPartition()); - String body = protocolMessage.getBody().toString(); - LOG.debug("Received body : <" + body + ">"); - return body; - } catch (IOException e) { - throw new ProtocolException(e); - } - - } - - public String write(String payload, MessageContext context) throws ProtocolException { - try { - ProtocolMessage protocolMessage = new ProtocolMessage(); - protocolMessage.setVersion("2.0"); - protocolMessage.setType(context.getType()); - protocolMessage.setRpcName(context.getRpc()); - protocolMessage.setCorrelationID(context.getCorrelationID()); - protocolMessage.setPartition(context.getPartiton()); - JsonNode body = mapper.readTree(payload); - protocolMessage.setBody(body); - String message = mapper.writeValueAsString(protocolMessage); - return message; - } catch (IOException e) { - throw new ProtocolException(e); - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java deleted file mode 100644 index 94d2d6b85..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java +++ /dev/null @@ -1,40 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; - -public interface AsyncProtocol extends Protocol { - - /** - * sends a string message to underlying message bus/java API - * @param payload - meesage body - * @param context - message headers - * @throws ProtocolException - */ - void sendRequest(String payload, MessageContext context) throws ProtocolException; - - void shutdown(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java deleted file mode 100644 index 82626d802..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java +++ /dev/null @@ -1,157 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -class AsyncProtocolImpl implements AsyncProtocol { - - /** - * message bus listener thread handler - */ - private Future listenerHandler; - /** - * called when messages are fetched - called for a single message - */ - private RetrieveMessageCallback callback; - /** - * message bus client used to send/fetch - */ - private MessagingService messageService; - /** - * Message reader used to extract body and context from reponse message - */ - private MessageReader messageReader; - /** - * Message writer used to construct meesage from body and context - */ - private MessageWriter messageWriter; - - /** - * shutdown indicator - */ - private boolean isShutdown = false; - - /** - * executor service for listener usage - */ - private ExecutorService executorService = Executors.newSingleThreadExecutor(); - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class); - - - AsyncProtocolImpl() { - - messageService = new UEBMessagingService(); - messageReader = new APPCMessageReaderWriter(); - messageWriter = (MessageWriter) messageReader; - } - - public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException { - - if (callback == null) { - throw new ProtocolException("Callback param should not be null!"); - } - this.callback = callback; - - try { - messageService.init(props); - //get message bus listener thread - //start the thread after initializing services - listenerHandler = executorService.submit(new Listener()); - } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) { - throw new ProtocolException(e); - } - } - - public void sendRequest(String payload, MessageContext context) throws ProtocolException { - - //get message to be sent to appc from payload and context - String message = messageWriter.write(payload, context); - try { - messageService.send(context.getPartiton(), message); - LOG.debug("Successfully send message: " + message); - } catch (IOException e) { - throw new ProtocolException(e); - } - } - - @Override - public void shutdown() { - isShutdown = true; - messageService.close(); - LOG.warn("The protocol layer in shutdown stage."); - executorService.shutdownNow(); - } - - public class Listener implements Runnable { - - - public void run() { - - while (!isShutdown) { - List<String> messages = new ArrayList<>(); - try { - messages = messageService.fetch(); - LOG.debug("Successfully fetched " + messages.size() + " messages"); - } catch (IOException e) { - LOG.error("Fetching " + messages.size() + " messages failed"); - } - for (String message : messages) { - - MessageContext context = new MessageContext(); - String payload = null; - - try { - //get payload and context from message to be sent to core layer - payload = messageReader.read(message, context); - LOG.debug("Got body: " + payload); - //call core layer response handler - if(!isShutdown) { - callback.onResponse(payload, context); - }else{ - LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" + - context.getCorrelationID() + "> response ", message); - } - } catch (ProtocolException e) { - LOG.error("Failed to read message from UEB. message is: " + message); - } - } - } - } - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java deleted file mode 100644 index 4765a58ef..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java +++ /dev/null @@ -1,56 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.io.IOException; -import java.util.List; - -interface Consumer { - - /** - * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. - * - * @return A list of strings representing the messages pulled from the topic. - * @throws IOException - */ - List<String> fetch() throws IOException; - - /** - * Gets a batch of messages from the topic. - * - * @param limit The amount of messages to fetch - * @return A list of strings representing the messages pulled from the topic. - * @throws IOException - */ - List<String> fetch(int limit) throws IOException; - - /** - * Send dummy fetch request to register client to be able to fetch messages - * @throws IOException - */ - void registerForRead() throws IOException; - - void close(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java deleted file mode 100644 index 913f80f44..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java +++ /dev/null @@ -1,125 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -class ConsumerImpl implements Consumer { - - private static final int DEFAULT_LIMIT = 1000; - - private Collection<String> hosts; - private String topic; - private String group; - private String groupId; - private int timeout; - - private String authKey; - private String authSecret; - - private CambriaConsumer consumer = null; - - /** - * constructor - * @param urls - * @param topicName - * @param consumerName - * @param consumerId - * @param timeout - */ - public ConsumerImpl(Collection<String> urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - this.hosts = urls; - this.topic = topicName; - this.group = consumerName; - this.groupId = consumerId; - this.authKey = apiKey; - this.authSecret = apiSecret; - this.timeout = timeout; - consumer = getConsumer(); - } - - - public List<String> fetch() throws IOException { - - return fetch(DEFAULT_LIMIT); - } - - public List<String> fetch(int limit) throws IOException { - - List<String> out = new ArrayList<String>(); - try { - for(String msg : consumer.fetch(timeout,limit)){ - out.add(msg); - } - } catch (IOException e) { - throw e; - } - return out; - } - - public void registerForRead() throws IOException { - - int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages - consumer.fetch(waitForRegisteration, 1); - } - - /** - * init cambria consumer - * @return CambriaConsumer - */ - private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - - ConsumerBuilder builder = new ConsumerBuilder(); - - builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId); - builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout); - builder.receivingAtMost(DEFAULT_LIMIT); - - // Add credentials if provided - if (authKey != null && authSecret != null) { - - Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey"); - apiKeyField.setAccessible(true); - apiKeyField.set(builder, ""); - builder.authenticatedBy(authKey, authSecret); - } - - return builder.build(); - } - - @Override - public void close() { - consumer.close(); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java deleted file mode 100644 index 19688d696..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java +++ /dev/null @@ -1,39 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; - -public interface MessageReader { - - /** - * reads payload, fills the context out of payload headers, and returns the body of the payload - * @param payload incoming message - * @param context context to fill - * @return body of the payload - * @throws ProtocolException - */ - String read(String payload, MessageContext context) throws ProtocolException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java deleted file mode 100644 index 0849bc4a4..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java +++ /dev/null @@ -1,40 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import org.onap.appc.client.impl.core.MessageContext; -import com.fasterxml.jackson.databind.JsonNode; - -public interface MessageWriter { - - /** - * builds a message out of context and payload - * @param payload body of the message - * @param context headers of the message - * @return the message to write/send - * @throws ProtocolException - */ - String write(String payload, MessageContext context) throws ProtocolException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java deleted file mode 100644 index 029378931..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.List; -import java.util.Properties; - -interface MessagingService { - - /** - * initialize consumer/publisher - * @param props - */ - void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException; - - /** - * sends a string as is - * @param partition - * @param body - */ - void send(String partition, String body) throws IOException; - - /** - * retrieve messages from bus - timeout extracted from props or see impl - * @return - */ - List<String> fetch() throws IOException; - - /** - * retrieve messages from bus - timeout extracted from props or see impl - * @param limit - * @return - */ - List<String> fetch(int limit) throws IOException; - - void close(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java deleted file mode 100644 index f290e8a89..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.io.IOException; - -interface Producer { - - /** - * send a message to a partition via ueb - * @param data - */ - void post(String Partition, String data) throws IOException; - - void close(); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java deleted file mode 100644 index 7729db98d..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Collection; - -class ProducerImpl implements Producer { - - private Collection<String> hosts; - private String topic; - private CambriaBatchingPublisher producer; - - private String authKey; - private String authSecret; - - public ProducerImpl(Collection<String> urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException { - - topic = topicName; - hosts = urls; - authKey = apiKey; - authSecret = apiSecret; - producer = getProducer(); - } - - public void post(String partition, String data) throws IOException { - - producer.send(partition, data); - } - - /** - * get cambria producer - * @return - */ - private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException { - - PublisherBuilder builder = new PublisherBuilder().usingHosts(hosts); - - // Add credentials if provided - if (authKey != null && authSecret != null) { - builder.authenticatedBy(authKey, authSecret); - } - - CambriaBatchingPublisher client = null; - - client = builder.onTopic(topic).build(); - - return client; - } - - @Override - public void close() { - producer.close(); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java deleted file mode 100644 index eaa21d857..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.util.Properties; - -public interface Protocol { - - /** - * init protocol properties and callback - * @param props - * @param callback - * @throws ProtocolException - */ - void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java deleted file mode 100644 index eb0537b80..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java +++ /dev/null @@ -1,44 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -public class ProtocolException extends Exception { - - public ProtocolException() { - super(); - } - - public ProtocolException(String message) { - super(message); - } - - public ProtocolException(String message, Throwable cause) { - super(message, cause); - } - - public ProtocolException(Throwable cause) { - super(cause); - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java deleted file mode 100644 index 98e7d669b..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import java.util.HashMap; -import java.util.Map; - -public class ProtocolFactory { - - private static ProtocolFactory instance; - private Map<ProtocolType,Protocol> protocols; - - /** - * Singleton factory - */ - private ProtocolFactory(){ - - protocols = new HashMap<ProtocolType, Protocol>(); - } - - /** - * get factory instance - * @return factory instance - */ - public static synchronized ProtocolFactory getInstance(){ - - if (instance == null) { - instance = new ProtocolFactory(); - } - return instance; - } - - /** - * returns instantiated protocol object - * @param type of protocol object - * @return protocol object - */ - public Protocol getProtocolObject(ProtocolType type) throws ProtocolException { - - Protocol protocol = protocols.get(type); - synchronized (this) { - if (protocol == null) { - switch (type) { - case SYNC: - throw new ProtocolException("Protocol SYNC is not implemented"); - case ASYNC: - protocol = new AsyncProtocolImpl(); - protocols.put(type, protocol); - break; - default: - throw new ProtocolException("Protocol type not found"); - } - } - } - return protocol; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java deleted file mode 100644 index c02ea5607..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java +++ /dev/null @@ -1,98 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.JsonNode; - -class ProtocolMessage { - - private String version; - private String type; - private String rpcName; - private String correlationID; // correlation-id - private String partition; // cambria.partition - private JsonNode body; - - @JsonProperty - String getVersion() { - return version; - } - - @JsonProperty - void setVersion(String version) { - this.version = version; - } - - @JsonProperty - String getType() { - return type; - } - - @JsonProperty - void setType(String type) { - this.type = type; - } - - @JsonProperty("rpc-name") - String getRpcName() { - return rpcName; - } - - @JsonProperty("rpc-name") - void setRpcName(String rpcName) { - this.rpcName = rpcName; - } - - @JsonProperty("correlation-id") - String getCorrelationID() { - return correlationID; - } - - @JsonProperty("correlation-id") - void setCorrelationID(String correlationID) { - this.correlationID = correlationID; - } - - @JsonProperty("cambria.partition") - String getPartition() { - return partition; - } - - @JsonProperty("cambria.partition") - void setPartition(String partition) { - this.partition = partition; - } - - @JsonProperty - JsonNode getBody() { - return body; - } - - @JsonProperty - void setBody(JsonNode body) { - this.body = body; - } -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java deleted file mode 100644 index cc2eca447..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java +++ /dev/null @@ -1,30 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -public enum ProtocolType { - - SYNC, ASYNC; -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java deleted file mode 100644 index 8fc486bb8..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - - -import org.onap.appc.client.impl.core.MessageContext; - -public interface RetrieveMessageCallback { - - /** - * called when response received - * @param payload - * @param context - */ - void onResponse(String payload, MessageContext context); -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java deleted file mode 100644 index df51861b8..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java +++ /dev/null @@ -1,102 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.*; - -class UEBMessagingService implements MessagingService { - - private Consumer consumer; - private Producer producer; - - private final String DEFAULT_READ_TIMEOUT_MS = "60000"; - private final String DEFAULT_READ_LIMIT = "1000"; - - private int readLimit; - - private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class); - - @SuppressWarnings("Since15") - public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { - - if (props != null) { - String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ); - String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE); - String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER); - String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET); - String readTimeoutString = props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS); - Integer readTimeout = Integer.parseInt(readTimeoutString); - String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT); - readLimit = Integer.parseInt(readLimitString); - //get hosts pool - Collection<String> pool = new HashSet<String>(); - String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS); - if (hostNames != null && !hostNames.isEmpty()) { - for (String name : hostNames.split(",")) { - pool.add(name); - } - } - - //generate consumer id and group - same value for both - String consumerName = UUID.randomUUID().toString(); - String consumerID = consumerName; - - //create consumer and producer - consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret); - producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret); - - //initial consumer registration - try { - consumer.registerForRead(); - }catch(Exception e){ - LOG.error("Message consumer failed to register client "+consumerID); - } - } - } - - public void send(String partition, String body) throws IOException { - producer.post(partition, body); - } - - public List<String> fetch() throws IOException { - return consumer.fetch(readLimit); - } - - public List<String> fetch(int limit) throws IOException { - return consumer.fetch(limit); - } - - @Override - public void close() { - consumer.close(); - producer.close(); - } - -} diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java deleted file mode 100644 index 5c1916f2b..000000000 --- a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.client.impl.protocol; - -class UEBPropertiesKeys { - - static final String TOPIC_READ = "topic.read"; - static final String TOPIC_READ_TIMEOUT = "topic.read.timeout"; - static final String READ_LIMIT = "topic.read.limit"; - static final String TOPIC_WRITE = "topic.write"; - static final String AUTH_USER = "client.key"; - static final String AUTH_SECRET = "client.secret"; - static final String HOSTS = "poolMembers"; -} |