diff options
Diffstat (limited to 'appc-client/client-lib/src/main/java/org/onap')
37 files changed, 2617 insertions, 0 deletions
diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java new file mode 100644 index 000000000..c5d6120f0 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AbstractRequestResponseHandler.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** Abstract request response handler class, responsible for common functionality of + * @{@link AsyncRequestResponseHandler} and @{@link SyncRequestResponseHandler} + */ +abstract class AbstractRequestResponseHandler implements RequestResponseHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class); + ICoreResponseHandler businessCallback; + protected String corrID; + CoreManager coreManager; + + + AbstractRequestResponseHandler(String corrID, + ICoreResponseHandler businessCallback, + CoreManager coreManager) + { + this.businessCallback = businessCallback; + this.corrID = corrID; + this.coreManager = coreManager; + } + + public synchronized void handleResponse(final MessageContext ctx, final String response) { + try { + coreManager.submitTask(ctx.getCorrelationID(), new Runnable() { + @Override + public void run() { + LOG.info("handling response of corrID <" + corrID + ">" + "response " + response); + if(coreManager.isExistHandler(corrID)) { + runTask(response, ctx.getType()); + } + + } + }); + } catch (InterruptedException e) { + LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e); + } + } + + /** + * + * @param response - Response + * @param type - Type of Response + */ + abstract void runTask(String response, String type); + + @Override + public void sendRequest(String request, String corrId, String rpcName) throws CoreException { + if(!coreManager.isShutdownInProgress()) { + coreManager.registerHandler(corrId, this); + coreManager.sendRequest(request, corrId, rpcName); + coreManager.startTimer(corrId); + }else{ + throw new CoreException("Shutdown is in progress. Request will not be handled"); + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java new file mode 100644 index 000000000..1bcf0af99 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/AsyncRequestResponseHandler.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.concurrent.TimeoutException; + +/** Handles async responses + */ +class AsyncRequestResponseHandler extends AbstractRequestResponseHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class); + + AsyncRequestResponseHandler(String corrID, + ICoreResponseHandler businessCallback, + CoreManager coreManager) + { + super(corrID, businessCallback, coreManager); + } + + /** + * Calls API callback for sending response to consumer's listener. in case of complete response cleans timer and + * unregisters the handler. + * @param response - Response + * @param type - Type of Response + */ + public void runTask(String response, String type) { + boolean finalTask = false; + try { + finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type); + } catch (Exception e){ + LOG.error("Error on API layer, for request with correlation-id " + corrID, e); + } + if (finalTask){ + coreManager.cancelTimer(corrID); + coreManager.unregisterHandler(corrID); + } + else{ + response = null; + type = null; + } + } + + /** + * Calls to API layer for sending timeout exception. + */ + @Override + public void onTimeOut() { + LOG.info("timeout for request with correlation-id " + corrID); + ((ICoreAsyncResponseHandler)businessCallback).onException(new TimeoutException("timeout for request with correlation-id " + corrID)); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java new file mode 100644 index 000000000..a2bf6e250 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreException.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + + +public class CoreException extends Exception { + + public CoreException() { + super(); + } + + public CoreException(String message) { + super(message); + } + + public CoreException(String message, Throwable cause) { + super(message, cause); + } + + public CoreException(Throwable cause) { + super(cause); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java new file mode 100644 index 000000000..c1c23890e --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java @@ -0,0 +1,314 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import org.onap.appc.client.impl.protocol.*; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events. + */ +class CoreManager{ + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class); + private final ProtocolFactory protocolFactory; + protected AsyncProtocol protocol; + private final RetrieveMessageCallback protocolCallback = null; + private final CoreRegistry registry; + private final ITimerService timerService; + private final TaskQueueManager queueManager; + private String DEFAULT_TIMEOUT = "300000"; + private final static String RESPONSE_TIMEOUT = "client.response.timeout"; + private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout"; + private boolean isForceShutdown = false; + private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false); + private long shutdownTimeout; + + CoreManager(Properties prop) throws CoreException { + protocolFactory = ProtocolFactory.getInstance(); + try { + initProtocol(prop); + }catch (ProtocolException e){ + throw new CoreException(e); + } + registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl()); + String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT); + long responseTimeout = Long.parseLong(timeoutProp); + String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT); + shutdownTimeout = Long.parseLong(gracefulTimeout); + timerService = new TimerServiceImpl(responseTimeout); + queueManager = new TaskQueueManager(prop); + listenShutdown(); + } + + /** + * initiates protocol layer services. + * @param prop - Properties + */ + private void initProtocol(Properties prop) throws ProtocolException { + protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC); + protocol.init(prop, getProtocolCallback()); + } + + /** + * Creates protocol response callback + * @return - @{@link ProtocolResponseCallbackImpl} + */ + RetrieveMessageCallback getProtocolCallback(){ + return new ProtocolResponseCallbackImpl(); + } + + /** + * Registers a new handler in registry + * @param corrID - Correlation ID + * @param requestResponseHandler handler to be called when response arrives + */ + void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){ + registry.register(corrID, requestResponseHandler); + } + + /** + * Remove a handler from registry service by correlation ID. + * @param corrID - Correlation ID + * @return - @{@link RequestResponseHandler} + */ + RequestResponseHandler unregisterHandler(String corrID){ + return (RequestResponseHandler) registry.unregister(corrID); + } + + /** + * Checks in registry service if a handler is existing. + * @param corrID - Correlation ID + * @return - boolean + */ + boolean isExistHandler(String corrID) { + return registry.isExist(corrID); + } + + /** + * Starts timer for timeout event when a request was send successfully. + * @param corrID - Correlation ID + */ + void startTimer(String corrID){ + timerService.add(corrID, new TimeoutHandlerImpl(corrID)); + } + + /** + * Cancels timer for fimeout event, in case when complete response was received + * @param corrID + */ + void cancelTimer(String corrID){ + timerService.cancel(corrID); + } + + /** + * Submits a new task to Queue manager. it is using for both response and timeout tasks + * @param corrID - Correlation ID + * @param task - @{@link Runnable} task. + * @throws InterruptedException + */ + void submitTask(String corrID, Runnable task) throws InterruptedException { + queueManager.submit(corrID, task); + } + + /** + * Sends request to protocol. + * @param request - Request + * @param corrId - Correlation ID + * @param rpcName - RPC name + * @throws CoreException - @{@link CoreException} + */ + void sendRequest(String request, String corrId, String rpcName) throws CoreException { + MessageContext ctx = getMessageContext(corrId, rpcName); + try { + protocol.sendRequest(request, ctx); + } catch (ProtocolException e) { + unregisterHandler(corrId); + throw new CoreException(e); + } + } + + /** + * Creates @{@link MessageContext} + * @param correlationId - Correlation ID + * @param rpcName - RPC Name + * @return - @{@link MessageContext} + */ + private MessageContext getMessageContext(String correlationId, String rpcName){ + MessageContext msgCtx = new MessageContext(); + msgCtx.setCorrelationID(correlationId); + msgCtx.setRpc(rpcName); + return msgCtx; + } + + /** + * Implements response callback from protocol and filters responses by correlation ID. + * Only registered events(by correlation ID) will be handled. + */ + private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback { + @Override + public void onResponse(String response, MessageContext context) { + String corrID = context.getCorrelationID(); + if (corrID != null) { + RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID); + if (messageHandler != null) { + LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response); + messageHandler.handleResponse(context, response); + } + } + } + } + + + /** + * listens to @{@link Runtime} shutdown event + */ + private void listenShutdown() { + Runtime.getRuntime().addShutdownHook(new Thread(){ + public void run(){ + gracefulShutdown(); + } + }); + } + + /** + * Implements shutdown for client library. + * @param isForceShutdown - true force shutdown, false graceful shutdown + */ + void shutdown(boolean isForceShutdown){ + if(isForceShutdown){ + forceShutdown(); + }else{ + gracefulShutdown(); + } + } + + /** + * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force + * shutdown only when either all request will be handled or graceful shutdown will be time out. + */ + synchronized void gracefulShutdown(){ + isGracefulShutdown.set(true); + if(registry.isEmpty()){ + forceShutdown(); + } + else{ + try { + LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">"); + wait(shutdownTimeout); + LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">"); + forceShutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } + + /** + * Closes Protocol, stops Queue Manager and shutdowns Time Service. + */ + private void forceShutdown(){ + isForceShutdown = true; + try { + LOG.info("Starting shutdown process."); + protocol.shutdown(); + queueManager.stopQueueManager(); + timerService.shutdown(); + } catch (InterruptedException e) { + LOG.info("Client library shutdown in progress ", e); + } + } + + /** + * + * @return - true when shutdown is in process + */ + boolean isShutdownInProgress(){ + return isForceShutdown || isGracefulShutdown.get(); + } + + /** + * Timeout handler implementation. + * This handler is responsible to assign a task for handling of timeout events. + * + */ + private class TimeoutHandlerImpl implements ITimeoutHandler { + + private final String corrID; + + TimeoutHandlerImpl(String corrID) { + this.corrID = corrID; + } + + /** + * When a timeout event is occurring, the new Timeout task will be assigned into a queue, + * this queue is shared between both timeout and handlers which belong to same correlation ID. + */ + @Override + public void onTimeout() { + try { + submitTask(corrID, new Runnable() { + @Override + public void run() { + RequestResponseHandler requestResponseHandler = unregisterHandler(corrID); + if (requestResponseHandler != null) { + requestResponseHandler.onTimeOut(); + } + } + }); + } catch (InterruptedException e) { + LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e); + } + } + } + + + /** + * Wakes Up graceful shutdown. + */ + class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback { + @Override + public synchronized void emptyCallback() { + LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">"); + if(isGracefulShutdown.get()){ + wakeUpShutdown(); + } + } + } + + /** + * wakes up waiting shutdown. + */ + private synchronized void wakeUpShutdown(){ + notifyAll(); + } + +} + diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java new file mode 100644 index 000000000..e0a0c5b34 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreRegistry.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** client lib Registry + */ +class CoreRegistry<T>{ + private Map<String, T> registry = + new ConcurrentHashMap<String, T>(); + + final private EmptyRegistryCallback emptyRegistryCallback; + + + CoreRegistry(EmptyRegistryCallback emptyRegistryCallback){ + this.emptyRegistryCallback = emptyRegistryCallback; + } + + void register(String key, T obj) { + registry.put(key, obj); + } + + <T> T unregister(String key) { + T item = (T) registry.remove(key); + if(registry.isEmpty()) { + emptyRegistryCallback.emptyCallback(); + } + return item; + } + + <T> T get(String key){ + return (T) registry.get(key); + } + + synchronized boolean isExist(String key) { + return registry.containsKey(key); + } + + boolean isEmpty(){ + return registry.isEmpty(); + } + + public interface EmptyRegistryCallback{ + void emptyCallback(); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java new file mode 100644 index 000000000..862d56dc3 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreAsyncResponseHandler.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public interface ICoreAsyncResponseHandler extends ICoreResponseHandler{ + + /** + * Core response to incoming message + * @param message response accepted from protocol + * @param type type of response + * @return true if message is final, false otherwise + */ + boolean onResponse(String message, String type); + + /** + * Core reaction to an event of exception + * @param e the exception which have been thrown + */ + void onException(Exception e); + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java new file mode 100644 index 000000000..555640dfd --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreResponseHandler.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public interface ICoreResponseHandler { +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java new file mode 100644 index 000000000..996d3d8d2 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ICoreSyncResponseHandler.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public interface ICoreSyncResponseHandler extends ICoreResponseHandler{ + + /** + * Core response to incoming message, should return completed message only + * @param message response accepted from protocol + * @param type type of response + * @return true if message is final, false otherwise + */ + <T> T onResponse(String message, String type) throws CoreException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java new file mode 100644 index 000000000..93cf20b3f --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/IInvocationManager.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +/** + */ +public interface IInvocationManager { + + /** + * initializes the manager + * @param prop properties to read from + * @throws CoreException thrown if madatory fields are not set right + */ + void init(Properties prop) throws CoreException; + + /** + * handles the flow of an async request + * @param request the request body + * @param listener business response handler + * @param correlationId unique id of the request + * @param rpcName rpc call name + * @throws CoreException thrown if the request failed to be sent + */ + void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException; + + /** + * handles to flow of a sync request + * @param request the request body + * @param callback business response handler + * @param correlationId unique id of the request + * @param rpcName rpc call name + * @return the output object to be returned + * @throws CoreException thrown if the request failed to be sent + * @throws TimeoutException thrown if timeout has exceeded + */ + <T> T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException; + + /** + * shuts the invocation manager down. + * @param isForceShutdown if true, shutdown will be forced, otherwise it will be gracefully + */ + void shutdown(boolean isForceShutdown); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java new file mode 100644 index 000000000..0f3b81a6f --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimeoutHandler.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +interface ITimeoutHandler { + + /** + * handles timeout event + */ + void onTimeout(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java new file mode 100644 index 000000000..96b06033f --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/ITimerService.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +interface ITimerService { + + /** + * add a new timeout handler to a request + * @param correlationID the id of the request + * @param handler to be called once "timeout' time has arrived + */ + void add(String correlationID, ITimeoutHandler handler); + + /** + * cancel the timeout handler of a request + * @param correlationID the id of the request + */ + void cancel(String correlationID); + + + /** + * shuts the timer service down immediately + */ + void shutdown(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java new file mode 100644 index 000000000..8179da107 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManager.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +/** + * layer for passing requests from API to Core + */ +class InvocationManager implements IInvocationManager{ + + protected CoreManager coreManager = null; + + InvocationManager(){ + } + + public void init(Properties properties) throws CoreException { + coreManager = new CoreManager(properties); + } + + /** + * + * @param request + * @param businessCallback + * @param correlationId + * @param rpcName + * @throws CoreException + */ + public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException { + AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager); + requestResponseHandler.sendRequest(request, correlationId, rpcName); + } + + public <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException { + SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager); + requestResponseHandler.sendRequest(request, correlationId, rpcName); + T responseObject = (T) requestResponseHandler.getResponse(); + return responseObject; + } + + @Override + public void shutdown(boolean isForceShutdown) { + coreManager.shutdown(isForceShutdown); + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java new file mode 100644 index 000000000..c9face762 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/InvocationManagerFactory.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +public abstract class InvocationManagerFactory { + private static IInvocationManager invocationManager = null; + + public static synchronized IInvocationManager getInstance(){ + if(invocationManager == null){ + invocationManager = new InvocationManager(); + } + return invocationManager; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java new file mode 100644 index 000000000..6fab66bb3 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/MessageContext.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +/** Helper class for wrapping request/response information. + */ +public class MessageContext { + + /** + * valid values of type are response/error + */ + private String type; + + /** + * RPC name + */ + private String rpc; + + /** + * correlation ID + */ + private String correlationID; + + /** + * partitioner for message bus usage + */ + private String partitioner; + + + public String getRpc() { + return rpc; + } + + public void setRpc(String rpc) { + this.rpc = rpc; + } + + public String getCorrelationID() { + return correlationID; + } + + public void setCorrelationID(String correlationID) { + this.correlationID = correlationID; + } + + public String getPartiton() { + return partitioner; + } + + public void setPartiton(String partitioner) { + this.partitioner = partitioner; + } + + public void setType(String type){ + this.type = type; + } + + public String getType(){ + return type; + } + + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java new file mode 100644 index 000000000..8e05a2974 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/RequestResponseHandler.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +interface RequestResponseHandler { + + /** + * sends request, registers handler of response and start timer. + * @param request - Request + * @param corrId - correlation ID + * @param rpcName - RPC name + * @throws CoreException - @{@link CoreException} + */ + void sendRequest(String request, String corrId, String rpcName) throws CoreException; + + /** + * submits a handler task to task queue @{@link TaskQueue}, this task will be performed only if this handler is + * still existing in core registry @{@link CoreRegistry}, others timeout was occurred . + * @param ctx - Message Context @{@link MessageContext} + * @param response - Response from backend + */ + void handleResponse(MessageContext ctx, String response); + + /** + * handles timeout event + */ + void onTimeOut(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java new file mode 100644 index 000000000..90b0a9926 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/SyncRequestResponseHandler.java @@ -0,0 +1,107 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.concurrent.TimeoutException; + +/** Handles sync requests + */ +class SyncRequestResponseHandler<T> extends AbstractRequestResponseHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class); + private T responseObject = null; + private CoreException coreException = null; + private TimeoutException timeoutException = null; + + SyncRequestResponseHandler(String corrID, + ICoreResponseHandler callback, + CoreManager coreManager){ + super(corrID, callback, coreManager); + } + + /** + * Calls API callback for getting response object. in case of complete response notifies consumer + * thread for receiving response + * @param response - Response + * @param type - Type of Response + */ + synchronized void runTask(String response, String type) { + try { + responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type); + } catch (CoreException e) { + coreException = e; + } + if(responseObject != null || coreException != null) { + notify(); + } + } + + + /** + * Returns response. goes sleep until coming either timeout event or complete response + */ + public synchronized <T> T getResponse() throws CoreException, TimeoutException { + try{ + if(!isResponseReceived()){ + wait(); + } + if (coreException != null) { + throw coreException; + } + if ( timeoutException != null) { + throw timeoutException; + } + + } catch (InterruptedException e) { + throw new CoreException(e); + } finally{ + coreManager.unregisterHandler(corrID); + coreManager.cancelTimer(corrID); + } + return (T) responseObject; + } + + /** + * indicates if a response received + * @return + */ + private boolean isResponseReceived() { + return responseObject != null; + } + + @Override + public synchronized void onTimeOut() { + LOG.error("sync response handler on timeout correlation ID <" + corrID + ">."); + timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID); + notify(); + } + + + + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java new file mode 100644 index 000000000..4ceeb3f08 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueue.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** Responsible to ensure synchronous handling of responses and timouts. + */ +class TaskQueue implements Runnable{ + + private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class); + + private boolean isShutdown; + + synchronized void addTask(Runnable task) throws InterruptedException { + queue.put(task); + } + + public void run() { + Runnable task; + while(!Thread.currentThread().isInterrupted() && !isShutdown){ + try { + task = queue.take(); + task.run(); + } catch (InterruptedException e) { + LOG.error("could not take task from queue", e); + } catch (RuntimeException e) { + LOG.error("could not run task", e); + } + LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown); + } + LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process."); + } + + void stopQueue(){ + isShutdown = true; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java new file mode 100644 index 000000000..b87349411 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TaskQueueManager.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** Creates a task queue pool that reuses a fixed number of threads. + * Assigns one thread for each queue. + */ +class TaskQueueManager { + private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class); + private ExecutorService executorService; + private final static String DEFAULT_POOL_SIZE = "10"; + private final static String CLIENT_POOL_SIZE = "client.pool.size"; + private TaskQueue[] queues; + private int poolInt; + + TaskQueueManager(Properties properties){ + String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE); + poolInt = Integer.parseInt(size); + this.executorService = Executors.newFixedThreadPool(poolInt); + initTaskQueues(); + } + + private void initTaskQueues(){ + queues = new TaskQueue[poolInt]; + for(int i=0; i<poolInt; i++){ + queues[i] = new TaskQueue(); + this.executorService.submit(queues[i]); + } + } + + void submit(String corrID, Runnable task) throws InterruptedException { + TaskQueue queue = getTaskQueue(corrID); + queue.addTask(task); + } + + /** + * ensures synchronous handling all responses and timeout belongs to same correlation ID + * @param corrID + * @return - @{@link TaskQueue} + */ + private TaskQueue getTaskQueue(String corrID){ + int index = Math.abs(corrID.hashCode()) % poolInt; + return queues[index]; + } + + /** + * goes over queues for stopping threads + * @throws InterruptedException + */ + void stopQueueManager() throws InterruptedException { + for(int i=0; i<poolInt; i++){ + queues[i].stopQueue(); + queues[i].addTask(new Runnable() { + @Override + public void run() { + /** + * wake up the queue for stopping thread + */ + } + }); + } + List<Runnable> listTask = executorService.shutdownNow(); + if (!executorService.awaitTermination(6, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + LOG.info("the amount of tasks that never commenced execution " + listTask.size()); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java new file mode 100644 index 000000000..fa2d0804d --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/TimerServiceImpl.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.core; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.util.List; +import java.util.concurrent.*; + +class TimerServiceImpl implements ITimerService { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<String, Future> timeOutEvents = new ConcurrentHashMap<>(); + private final long responseTimeout; + + TimerServiceImpl(long responseTimeout) { + this.responseTimeout = responseTimeout; + } + + @Override + public synchronized void cancel(String correlationID) { + Future timeOutEvent = timeOutEvents.remove(correlationID); + if (timeOutEvent != null){ + timeOutEvent.cancel(true); + } + } + + @Override + public synchronized void add(String correlationID, ITimeoutHandler handler) { + Future timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS); + timeOutEvents.put(correlationID, timeOutEvent); + } + + @Override + public void shutdown() { + List<Runnable> listTask = scheduler.shutdownNow(); + LOG.info("the amount of tasks that never commenced execution " + listTask.size()); + } + + private class HandleTimeout implements Runnable { + + String correlationID; + ITimeoutHandler handler; + + HandleTimeout(String correlationID, ITimeoutHandler handler) { + this.correlationID = correlationID; + this.handler = handler; + } + + @Override + public void run(){ + System.out.println("Timeout event of request " + correlationID); + handler.onTimeout(); + timeOutEvents.remove(correlationID); + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java new file mode 100644 index 000000000..a76f0a90b --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/APPCMessageReaderWriter.java @@ -0,0 +1,77 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +class APPCMessageReaderWriter implements MessageReader, MessageWriter { + + private final ObjectMapper mapper; + private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class); + + APPCMessageReaderWriter() { + mapper = new ObjectMapper(); + } + + public String read(String payload, MessageContext context) throws ProtocolException { + try { + ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class); + context.setType(protocolMessage.getType()); + context.setRpc(protocolMessage.getRpcName()); + context.setCorrelationID(protocolMessage.getCorrelationID()); + context.setPartiton(protocolMessage.getPartition()); + String body = protocolMessage.getBody().toString(); + LOG.debug("Received body : <" + body + ">"); + return body; + } catch (IOException e) { + throw new ProtocolException(e); + } + + } + + public String write(String payload, MessageContext context) throws ProtocolException { + try { + ProtocolMessage protocolMessage = new ProtocolMessage(); + protocolMessage.setVersion("2.0"); + protocolMessage.setType(context.getType()); + protocolMessage.setRpcName(context.getRpc()); + protocolMessage.setCorrelationID(context.getCorrelationID()); + protocolMessage.setPartition(context.getPartiton()); + JsonNode body = mapper.readTree(payload); + protocolMessage.setBody(body); + String message = mapper.writeValueAsString(protocolMessage); + return message; + } catch (IOException e) { + throw new ProtocolException(e); + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java new file mode 100644 index 000000000..94d2d6b85 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocol.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; + +public interface AsyncProtocol extends Protocol { + + /** + * sends a string message to underlying message bus/java API + * @param payload - meesage body + * @param context - message headers + * @throws ProtocolException + */ + void sendRequest(String payload, MessageContext context) throws ProtocolException; + + void shutdown(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java new file mode 100644 index 000000000..82626d802 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java @@ -0,0 +1,157 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +class AsyncProtocolImpl implements AsyncProtocol { + + /** + * message bus listener thread handler + */ + private Future listenerHandler; + /** + * called when messages are fetched - called for a single message + */ + private RetrieveMessageCallback callback; + /** + * message bus client used to send/fetch + */ + private MessagingService messageService; + /** + * Message reader used to extract body and context from reponse message + */ + private MessageReader messageReader; + /** + * Message writer used to construct meesage from body and context + */ + private MessageWriter messageWriter; + + /** + * shutdown indicator + */ + private boolean isShutdown = false; + + /** + * executor service for listener usage + */ + private ExecutorService executorService = Executors.newSingleThreadExecutor(); + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class); + + + AsyncProtocolImpl() { + + messageService = new UEBMessagingService(); + messageReader = new APPCMessageReaderWriter(); + messageWriter = (MessageWriter) messageReader; + } + + public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException { + + if (callback == null) { + throw new ProtocolException("Callback param should not be null!"); + } + this.callback = callback; + + try { + messageService.init(props); + //get message bus listener thread + //start the thread after initializing services + listenerHandler = executorService.submit(new Listener()); + } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) { + throw new ProtocolException(e); + } + } + + public void sendRequest(String payload, MessageContext context) throws ProtocolException { + + //get message to be sent to appc from payload and context + String message = messageWriter.write(payload, context); + try { + messageService.send(context.getPartiton(), message); + LOG.debug("Successfully send message: " + message); + } catch (IOException e) { + throw new ProtocolException(e); + } + } + + @Override + public void shutdown() { + isShutdown = true; + messageService.close(); + LOG.warn("The protocol layer in shutdown stage."); + executorService.shutdownNow(); + } + + public class Listener implements Runnable { + + + public void run() { + + while (!isShutdown) { + List<String> messages = new ArrayList<>(); + try { + messages = messageService.fetch(); + LOG.debug("Successfully fetched " + messages.size() + " messages"); + } catch (IOException e) { + LOG.error("Fetching " + messages.size() + " messages failed"); + } + for (String message : messages) { + + MessageContext context = new MessageContext(); + String payload = null; + + try { + //get payload and context from message to be sent to core layer + payload = messageReader.read(message, context); + LOG.debug("Got body: " + payload); + //call core layer response handler + if(!isShutdown) { + callback.onResponse(payload, context); + }else{ + LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" + + context.getCorrelationID() + "> response ", message); + } + } catch (ProtocolException e) { + LOG.error("Failed to read message from UEB. message is: " + message); + } + } + } + } + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java new file mode 100644 index 000000000..4765a58ef --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Consumer.java @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.io.IOException; +import java.util.List; + +interface Consumer { + + /** + * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. + * + * @return A list of strings representing the messages pulled from the topic. + * @throws IOException + */ + List<String> fetch() throws IOException; + + /** + * Gets a batch of messages from the topic. + * + * @param limit The amount of messages to fetch + * @return A list of strings representing the messages pulled from the topic. + * @throws IOException + */ + List<String> fetch(int limit) throws IOException; + + /** + * Send dummy fetch request to register client to be able to fetch messages + * @throws IOException + */ + void registerForRead() throws IOException; + + void close(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java new file mode 100644 index 000000000..913f80f44 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ConsumerImpl.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +class ConsumerImpl implements Consumer { + + private static final int DEFAULT_LIMIT = 1000; + + private Collection<String> hosts; + private String topic; + private String group; + private String groupId; + private int timeout; + + private String authKey; + private String authSecret; + + private CambriaConsumer consumer = null; + + /** + * constructor + * @param urls + * @param topicName + * @param consumerName + * @param consumerId + * @param timeout + */ + public ConsumerImpl(Collection<String> urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + this.hosts = urls; + this.topic = topicName; + this.group = consumerName; + this.groupId = consumerId; + this.authKey = apiKey; + this.authSecret = apiSecret; + this.timeout = timeout; + consumer = getConsumer(); + } + + + public List<String> fetch() throws IOException { + + return fetch(DEFAULT_LIMIT); + } + + public List<String> fetch(int limit) throws IOException { + + List<String> out = new ArrayList<String>(); + try { + for(String msg : consumer.fetch(timeout,limit)){ + out.add(msg); + } + } catch (IOException e) { + throw e; + } + return out; + } + + public void registerForRead() throws IOException { + + int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages + consumer.fetch(waitForRegisteration, 1); + } + + /** + * init cambria consumer + * @return CambriaConsumer + */ + private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + + ConsumerBuilder builder = new ConsumerBuilder(); + + builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId); + builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout); + builder.receivingAtMost(DEFAULT_LIMIT); + + // Add credentials if provided + if (authKey != null && authSecret != null) { + + Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey"); + apiKeyField.setAccessible(true); + apiKeyField.set(builder, ""); + builder.authenticatedBy(authKey, authSecret); + } + + return builder.build(); + } + + @Override + public void close() { + consumer.close(); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java new file mode 100644 index 000000000..19688d696 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageReader.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; + +public interface MessageReader { + + /** + * reads payload, fills the context out of payload headers, and returns the body of the payload + * @param payload incoming message + * @param context context to fill + * @return body of the payload + * @throws ProtocolException + */ + String read(String payload, MessageContext context) throws ProtocolException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java new file mode 100644 index 000000000..0849bc4a4 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessageWriter.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import org.onap.appc.client.impl.core.MessageContext; +import com.fasterxml.jackson.databind.JsonNode; + +public interface MessageWriter { + + /** + * builds a message out of context and payload + * @param payload body of the message + * @param context headers of the message + * @return the message to write/send + * @throws ProtocolException + */ + String write(String payload, MessageContext context) throws ProtocolException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java new file mode 100644 index 000000000..029378931 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/MessagingService.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Properties; + +interface MessagingService { + + /** + * initialize consumer/publisher + * @param props + */ + void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException; + + /** + * sends a string as is + * @param partition + * @param body + */ + void send(String partition, String body) throws IOException; + + /** + * retrieve messages from bus - timeout extracted from props or see impl + * @return + */ + List<String> fetch() throws IOException; + + /** + * retrieve messages from bus - timeout extracted from props or see impl + * @param limit + * @return + */ + List<String> fetch(int limit) throws IOException; + + void close(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java new file mode 100644 index 000000000..f290e8a89 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Producer.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.io.IOException; + +interface Producer { + + /** + * send a message to a partition via ueb + * @param data + */ + void post(String Partition, String data) throws IOException; + + void close(); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java new file mode 100644 index 000000000..7729db98d --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProducerImpl.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.Collection; + +class ProducerImpl implements Producer { + + private Collection<String> hosts; + private String topic; + private CambriaBatchingPublisher producer; + + private String authKey; + private String authSecret; + + public ProducerImpl(Collection<String> urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException { + + topic = topicName; + hosts = urls; + authKey = apiKey; + authSecret = apiSecret; + producer = getProducer(); + } + + public void post(String partition, String data) throws IOException { + + producer.send(partition, data); + } + + /** + * get cambria producer + * @return + */ + private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException { + + PublisherBuilder builder = new PublisherBuilder().usingHosts(hosts); + + // Add credentials if provided + if (authKey != null && authSecret != null) { + builder.authenticatedBy(authKey, authSecret); + } + + CambriaBatchingPublisher client = null; + + client = builder.onTopic(topic).build(); + + return client; + } + + @Override + public void close() { + producer.close(); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java new file mode 100644 index 000000000..eaa21d857 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/Protocol.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.util.Properties; + +public interface Protocol { + + /** + * init protocol properties and callback + * @param props + * @param callback + * @throws ProtocolException + */ + void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java new file mode 100644 index 000000000..eb0537b80 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolException.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +public class ProtocolException extends Exception { + + public ProtocolException() { + super(); + } + + public ProtocolException(String message) { + super(message); + } + + public ProtocolException(String message, Throwable cause) { + super(message, cause); + } + + public ProtocolException(Throwable cause) { + super(cause); + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java new file mode 100644 index 000000000..98e7d669b --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolFactory.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import java.util.HashMap; +import java.util.Map; + +public class ProtocolFactory { + + private static ProtocolFactory instance; + private Map<ProtocolType,Protocol> protocols; + + /** + * Singleton factory + */ + private ProtocolFactory(){ + + protocols = new HashMap<ProtocolType, Protocol>(); + } + + /** + * get factory instance + * @return factory instance + */ + public static synchronized ProtocolFactory getInstance(){ + + if (instance == null) { + instance = new ProtocolFactory(); + } + return instance; + } + + /** + * returns instantiated protocol object + * @param type of protocol object + * @return protocol object + */ + public Protocol getProtocolObject(ProtocolType type) throws ProtocolException { + + Protocol protocol = protocols.get(type); + synchronized (this) { + if (protocol == null) { + switch (type) { + case SYNC: + throw new ProtocolException("Protocol SYNC is not implemented"); + case ASYNC: + protocol = new AsyncProtocolImpl(); + protocols.put(type, protocol); + break; + default: + throw new ProtocolException("Protocol type not found"); + } + } + } + return protocol; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java new file mode 100644 index 000000000..c02ea5607 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolMessage.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +class ProtocolMessage { + + private String version; + private String type; + private String rpcName; + private String correlationID; // correlation-id + private String partition; // cambria.partition + private JsonNode body; + + @JsonProperty + String getVersion() { + return version; + } + + @JsonProperty + void setVersion(String version) { + this.version = version; + } + + @JsonProperty + String getType() { + return type; + } + + @JsonProperty + void setType(String type) { + this.type = type; + } + + @JsonProperty("rpc-name") + String getRpcName() { + return rpcName; + } + + @JsonProperty("rpc-name") + void setRpcName(String rpcName) { + this.rpcName = rpcName; + } + + @JsonProperty("correlation-id") + String getCorrelationID() { + return correlationID; + } + + @JsonProperty("correlation-id") + void setCorrelationID(String correlationID) { + this.correlationID = correlationID; + } + + @JsonProperty("cambria.partition") + String getPartition() { + return partition; + } + + @JsonProperty("cambria.partition") + void setPartition(String partition) { + this.partition = partition; + } + + @JsonProperty + JsonNode getBody() { + return body; + } + + @JsonProperty + void setBody(JsonNode body) { + this.body = body; + } +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java new file mode 100644 index 000000000..cc2eca447 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/ProtocolType.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +public enum ProtocolType { + + SYNC, ASYNC; +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java new file mode 100644 index 000000000..8fc486bb8 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/RetrieveMessageCallback.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + + +import org.onap.appc.client.impl.core.MessageContext; + +public interface RetrieveMessageCallback { + + /** + * called when response received + * @param payload + * @param context + */ + void onResponse(String payload, MessageContext context); +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java new file mode 100644 index 000000000..df51861b8 --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java @@ -0,0 +1,102 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.*; + +class UEBMessagingService implements MessagingService { + + private Consumer consumer; + private Producer producer; + + private final String DEFAULT_READ_TIMEOUT_MS = "60000"; + private final String DEFAULT_READ_LIMIT = "1000"; + + private int readLimit; + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class); + + @SuppressWarnings("Since15") + public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException { + + if (props != null) { + String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ); + String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE); + String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER); + String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET); + String readTimeoutString = props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS); + Integer readTimeout = Integer.parseInt(readTimeoutString); + String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT); + readLimit = Integer.parseInt(readLimitString); + //get hosts pool + Collection<String> pool = new HashSet<String>(); + String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS); + if (hostNames != null && !hostNames.isEmpty()) { + for (String name : hostNames.split(",")) { + pool.add(name); + } + } + + //generate consumer id and group - same value for both + String consumerName = UUID.randomUUID().toString(); + String consumerID = consumerName; + + //create consumer and producer + consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret); + producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret); + + //initial consumer registration + try { + consumer.registerForRead(); + }catch(Exception e){ + LOG.error("Message consumer failed to register client "+consumerID); + } + } + } + + public void send(String partition, String body) throws IOException { + producer.post(partition, body); + } + + public List<String> fetch() throws IOException { + return consumer.fetch(readLimit); + } + + public List<String> fetch(int limit) throws IOException { + return consumer.fetch(limit); + } + + @Override + public void close() { + consumer.close(); + producer.close(); + } + +} diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java new file mode 100644 index 000000000..5c1916f2b --- /dev/null +++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBPropertiesKeys.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.client.impl.protocol; + +class UEBPropertiesKeys { + + static final String TOPIC_READ = "topic.read"; + static final String TOPIC_READ_TIMEOUT = "topic.read.timeout"; + static final String READ_LIMIT = "topic.read.limit"; + static final String TOPIC_WRITE = "topic.write"; + static final String AUTH_USER = "client.key"; + static final String AUTH_SECRET = "client.secret"; + static final String HOSTS = "poolMembers"; +} |