From 781b1a6df324419c846c84ea983c18fc8362bfd3 Mon Sep 17 00:00:00 2001 From: Patrick Brady Date: Wed, 13 Dec 2017 11:19:06 -0800 Subject: Third part of onap rename This part of the commit changes the folder structure on all other folders of appc. Change-Id: I8acfa11cdfcdcd36be0e137245d1dd7324f1abd3 Signed-off-by: Patrick Brady Issue-ID: APPC-13 --- .../appc/executionqueue/ExecutionQueueService.java | 35 ++++++ .../executionqueue/MessageExpirationListener.java | 29 +++++ .../org/onap/appc/executionqueue/helper/Util.java | 101 ++++++++++++++++ .../impl/ExecutionQueueServiceFactory.java | 38 ++++++ .../impl/ExecutionQueueServiceImpl.java | 88 ++++++++++++++ .../appc/executionqueue/impl/QueueManager.java | 127 +++++++++++++++++++++ .../executionqueue/impl/object/QueueMessage.java | 46 ++++++++ .../appc/executionqueue/ExecutionQueueService.java | 35 ------ .../executionqueue/MessageExpirationListener.java | 29 ----- .../openecomp/appc/executionqueue/helper/Util.java | 101 ---------------- .../impl/ExecutionQueueServiceFactory.java | 38 ------ .../impl/ExecutionQueueServiceImpl.java | 88 -------------- .../appc/executionqueue/impl/QueueManager.java | 127 --------------------- .../executionqueue/impl/object/QueueMessage.java | 46 -------- 14 files changed, 464 insertions(+), 464 deletions(-) create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/MessageExpirationListener.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main') diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java new file mode 100644 index 000000000..1423962ef --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue; + +import java.util.concurrent.TimeUnit; + +import org.onap.appc.exceptions.APPCException; + +public interface ExecutionQueueService { + void putMessage(M message) throws APPCException; + void putMessage(M message, long timeout, TimeUnit unit) throws APPCException; + void registerMessageExpirationListener(MessageExpirationListener listener); +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/MessageExpirationListener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/MessageExpirationListener.java new file mode 100644 index 000000000..c1f3592c6 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/MessageExpirationListener.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue; + +public interface MessageExpirationListener { + void onMessageExpiration(M message); +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java new file mode 100644 index 000000000..58279e8a7 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue.helper; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.onap.appc.configuration.Configuration; +import org.onap.appc.configuration.ConfigurationFactory; +import org.onap.appc.executionqueue.impl.QueueManager; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class Util { + + private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); + private final int default_queue_size = 10; + private final int default_threadpool_size = 10; + private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; + private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; + + private Configuration configuration; + + /** + * Initialization. + *

Used by blueprint. + */ + public void init() { + + configuration = ConfigurationFactory.getConfiguration(); + } + + public int getExecutionQueueSize() { + String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size)); + + int size = default_queue_size; + try { + size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); + } + + return size; + } + + public int getThreadPoolSize() { + String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size)); + + int size = default_threadpool_size; + try { + size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " + + e.getMessage(), e); + } + + return size; + } + + public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) { + return new ThreadFactory() { + private final String THREAD_NAME_PATTERN = "%s-%d"; + private final ThreadFactory factory = Executors.defaultThreadFactory(); + private final AtomicInteger counter = new AtomicInteger(); + + public Thread newThread(Runnable r) { + Thread t = factory.newThread(r); + t.setDaemon(isDaemon); + if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { + final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter + .incrementAndGet()); + t.setName(threadName); + } + return t; + } + }; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java new file mode 100644 index 000000000..f071be6f0 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue.impl; + +import org.onap.appc.executionqueue.ExecutionQueueService; + +public class ExecutionQueueServiceFactory { + + private static class ExecutionQueueServiceHolder { + public static final ExecutionQueueService executionQueueService = new ExecutionQueueServiceImpl(); + } + + public static ExecutionQueueService getExecutionQueueService() { + return ExecutionQueueServiceHolder.executionQueueService; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java new file mode 100644 index 000000000..0634a0eb2 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -0,0 +1,88 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue.impl; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.onap.appc.exceptions.APPCException; +import org.onap.appc.executionqueue.ExecutionQueueService; +import org.onap.appc.executionqueue.MessageExpirationListener; +import org.onap.appc.executionqueue.impl.object.QueueMessage; + +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +public class ExecutionQueueServiceImpl implements ExecutionQueueService { + + private static final EELFLogger logger = + EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + + private QueueManager queueManager; + + public ExecutionQueueServiceImpl() { + //do nothing + } + + /** + * Injected by blueprint + * + * @param queueManager queue manager to be set + */ + public void setQueueManager(QueueManager queueManager) { + this.queueManager = queueManager; + } + + @Override + public void putMessage(M message) throws APPCException { + this.putMessage(message, -1, null); + } + + @Override + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { + Instant expirationTime = calculateExpirationTime(timeout, unit); + boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); + if (!enqueueTask) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); + throw new APPCException("Failed to put message in queue"); + } + } + + @Override + public void registerMessageExpirationListener(MessageExpirationListener listener) { + queueManager.setListener(listener); + } + + private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { + if (timeToLive > 0 && unit != null) { + // as of Java 8, there is no built-in conversion method from + // TimeUnit to ChronoUnit; do it manually + return Instant.now().plusMillis(unit.toMillis(timeToLive)); + } else { + // never expires + return Instant.MAX; + } + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java new file mode 100644 index 000000000..db0e3d4c5 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java @@ -0,0 +1,127 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue.impl; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.onap.appc.executionqueue.MessageExpirationListener; +import org.onap.appc.executionqueue.helper.Util; +import org.onap.appc.executionqueue.impl.object.QueueMessage; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class QueueManager { + + private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); + + private MessageExpirationListener listener; + private ExecutorService messageExecutor; + private int max_thread_size; + private int max_queue_size; + private Util executionQueueUtil; + + public QueueManager() { + //do nothing + } + + /** + * Initialization method used by blueprint + */ + public void init() { + max_thread_size = executionQueueUtil.getThreadPoolSize(); + max_queue_size = executionQueueUtil.getExecutionQueueSize(); + messageExecutor = new ThreadPoolExecutor( + max_thread_size, + max_thread_size, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(max_queue_size), + executionQueueUtil.getThreadFactory(true, "appc-dispatcher"), + new ThreadPoolExecutor.AbortPolicy()); + } + + /** + * Destory method used by blueprint + */ + public void stop() { + // Disable new tasks from being submitted + messageExecutor.shutdown(); + List rejectedRunnables = messageExecutor.shutdownNow(); + logger.info(String.format("Rejected %d waiting tasks include ", rejectedRunnables.size())); + + try { + messageExecutor.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + while (!messageExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) { + logger.debug("QueueManager is being shut down because it still has threads not interrupted"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + messageExecutor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + public void setListener(MessageExpirationListener listener) { + this.listener = listener; + } + + /** + * Injected by blueprint + * + * @param executionQueueUtil Util to be set + */ + public void setExecutionQueueUtil(Util executionQueueUtil) { + this.executionQueueUtil = executionQueueUtil; + } + + public boolean enqueueTask(QueueMessage queueMessage) { + boolean isEnqueued = true; + try { + messageExecutor.execute(() -> { + if (queueMessage.isExpired()) { + logger.debug("Message expired " + queueMessage.getMessage()); + if (listener != null) { + listener.onMessageExpiration(queueMessage.getMessage()); + } else { + logger.warn("Listener not available for expired message "); + } + } else { + queueMessage.getMessage().run(); + } + }); + } catch (RejectedExecutionException ree) { + isEnqueued = false; + } + + return isEnqueued; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java new file mode 100644 index 000000000..bb48da7e5 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue.impl.object; + +import java.time.Instant; +import java.util.Objects; + + +public class QueueMessage { + private final M message; + private final Instant expirationTime; + public QueueMessage(M message, Instant expirationTime){ + this.message = message; + this.expirationTime = Objects.requireNonNull(expirationTime); + } + + public M getMessage() { + return message; + } + + public boolean isExpired() { + return expirationTime.isBefore(Instant.now()); + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java deleted file mode 100644 index 1423962ef..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java +++ /dev/null @@ -1,35 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue; - -import java.util.concurrent.TimeUnit; - -import org.onap.appc.exceptions.APPCException; - -public interface ExecutionQueueService { - void putMessage(M message) throws APPCException; - void putMessage(M message, long timeout, TimeUnit unit) throws APPCException; - void registerMessageExpirationListener(MessageExpirationListener listener); -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java deleted file mode 100644 index c1f3592c6..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue; - -public interface MessageExpirationListener { - void onMessageExpiration(M message); -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java deleted file mode 100644 index 58279e8a7..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java +++ /dev/null @@ -1,101 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.helper; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.executionqueue.impl.QueueManager; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public class Util { - - private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); - private final int default_queue_size = 10; - private final int default_threadpool_size = 10; - private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; - private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; - - private Configuration configuration; - - /** - * Initialization. - *

Used by blueprint. - */ - public void init() { - - configuration = ConfigurationFactory.getConfiguration(); - } - - public int getExecutionQueueSize() { - String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size)); - - int size = default_queue_size; - try { - size = Integer.parseInt(sizeStr); - } catch (NumberFormatException e) { - logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); - } - - return size; - } - - public int getThreadPoolSize() { - String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size)); - - int size = default_threadpool_size; - try { - size = Integer.parseInt(sizeStr); - } catch (NumberFormatException e) { - logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " - + e.getMessage(), e); - } - - return size; - } - - public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) { - return new ThreadFactory() { - private final String THREAD_NAME_PATTERN = "%s-%d"; - private final ThreadFactory factory = Executors.defaultThreadFactory(); - private final AtomicInteger counter = new AtomicInteger(); - - public Thread newThread(Runnable r) { - Thread t = factory.newThread(r); - t.setDaemon(isDaemon); - if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { - final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter - .incrementAndGet()); - t.setName(threadName); - } - return t; - } - }; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java deleted file mode 100644 index f071be6f0..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.impl; - -import org.onap.appc.executionqueue.ExecutionQueueService; - -public class ExecutionQueueServiceFactory { - - private static class ExecutionQueueServiceHolder { - public static final ExecutionQueueService executionQueueService = new ExecutionQueueServiceImpl(); - } - - public static ExecutionQueueService getExecutionQueueService() { - return ExecutionQueueServiceHolder.executionQueueService; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java deleted file mode 100644 index 0634a0eb2..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ /dev/null @@ -1,88 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.impl; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.onap.appc.exceptions.APPCException; -import org.onap.appc.executionqueue.ExecutionQueueService; -import org.onap.appc.executionqueue.MessageExpirationListener; -import org.onap.appc.executionqueue.impl.object.QueueMessage; - -import java.time.Instant; -import java.util.concurrent.TimeUnit; - -public class ExecutionQueueServiceImpl implements ExecutionQueueService { - - private static final EELFLogger logger = - EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); - - private QueueManager queueManager; - - public ExecutionQueueServiceImpl() { - //do nothing - } - - /** - * Injected by blueprint - * - * @param queueManager queue manager to be set - */ - public void setQueueManager(QueueManager queueManager) { - this.queueManager = queueManager; - } - - @Override - public void putMessage(M message) throws APPCException { - this.putMessage(message, -1, null); - } - - @Override - public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { - Instant expirationTime = calculateExpirationTime(timeout, unit); - boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); - if (!enqueueTask) { - logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); - throw new APPCException("Failed to put message in queue"); - } - } - - @Override - public void registerMessageExpirationListener(MessageExpirationListener listener) { - queueManager.setListener(listener); - } - - private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { - if (timeToLive > 0 && unit != null) { - // as of Java 8, there is no built-in conversion method from - // TimeUnit to ChronoUnit; do it manually - return Instant.now().plusMillis(unit.toMillis(timeToLive)); - } else { - // never expires - return Instant.MAX; - } - } - -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java deleted file mode 100644 index db0e3d4c5..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java +++ /dev/null @@ -1,127 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.impl; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.onap.appc.executionqueue.MessageExpirationListener; -import org.onap.appc.executionqueue.helper.Util; -import org.onap.appc.executionqueue.impl.object.QueueMessage; - -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class QueueManager { - - private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); - - private MessageExpirationListener listener; - private ExecutorService messageExecutor; - private int max_thread_size; - private int max_queue_size; - private Util executionQueueUtil; - - public QueueManager() { - //do nothing - } - - /** - * Initialization method used by blueprint - */ - public void init() { - max_thread_size = executionQueueUtil.getThreadPoolSize(); - max_queue_size = executionQueueUtil.getExecutionQueueSize(); - messageExecutor = new ThreadPoolExecutor( - max_thread_size, - max_thread_size, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(max_queue_size), - executionQueueUtil.getThreadFactory(true, "appc-dispatcher"), - new ThreadPoolExecutor.AbortPolicy()); - } - - /** - * Destory method used by blueprint - */ - public void stop() { - // Disable new tasks from being submitted - messageExecutor.shutdown(); - List rejectedRunnables = messageExecutor.shutdownNow(); - logger.info(String.format("Rejected %d waiting tasks include ", rejectedRunnables.size())); - - try { - messageExecutor.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - while (!messageExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) { - logger.debug("QueueManager is being shut down because it still has threads not interrupted"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - messageExecutor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - public void setListener(MessageExpirationListener listener) { - this.listener = listener; - } - - /** - * Injected by blueprint - * - * @param executionQueueUtil Util to be set - */ - public void setExecutionQueueUtil(Util executionQueueUtil) { - this.executionQueueUtil = executionQueueUtil; - } - - public boolean enqueueTask(QueueMessage queueMessage) { - boolean isEnqueued = true; - try { - messageExecutor.execute(() -> { - if (queueMessage.isExpired()) { - logger.debug("Message expired " + queueMessage.getMessage()); - if (listener != null) { - listener.onMessageExpiration(queueMessage.getMessage()); - } else { - logger.warn("Listener not available for expired message "); - } - } else { - queueMessage.getMessage().run(); - } - }); - } catch (RejectedExecutionException ree) { - isEnqueued = false; - } - - return isEnqueued; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java deleted file mode 100644 index bb48da7e5..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java +++ /dev/null @@ -1,46 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.impl.object; - -import java.time.Instant; -import java.util.Objects; - - -public class QueueMessage { - private final M message; - private final Instant expirationTime; - public QueueMessage(M message, Instant expirationTime){ - this.message = message; - this.expirationTime = Objects.requireNonNull(expirationTime); - } - - public M getMessage() { - return message; - } - - public boolean isExpired() { - return expirationTime.isBefore(Instant.now()); - } -} -- cgit 1.2.3-korg