diff options
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main')
7 files changed, 402 insertions, 0 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java new file mode 100644 index 000000000..c93920083 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +import java.util.concurrent.TimeUnit; + +import org.openecomp.appc.exceptions.APPCException; + +public interface ExecutionQueueService<M extends Runnable> { + void putMessage(M message) throws APPCException; + void putMessage(M message, long timeout, TimeUnit unit) throws APPCException; + void registerMessageExpirationListener(MessageExpirationListener listener); +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java new file mode 100644 index 000000000..cd5cbeab1 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +public interface MessageExpirationListener<M> { + void onMessageExpiration(M message); +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java new file mode 100644 index 000000000..835e881aa --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.helper; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; + +public class Util { + + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + + public static int DEFAULT_QUEUE_SIZE = 10; + public static int DEFAULT_THREADPOOL_SIZE = 10; + + public static int getExecutionQueSize(){ + String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE)); + int size = DEFAULT_QUEUE_SIZE; + try{ + size = Integer.parseInt(sizeStr); + } + catch (NumberFormatException e){ + + } + return size; + } + + public static int getThreadPoolSize(){ + String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE)); + int size = DEFAULT_THREADPOOL_SIZE; + try{ + size = Integer.parseInt(sizeStr); + } + catch (NumberFormatException e){ + + } + return size; + } + + public static ThreadFactory getThreadFactory(final boolean isDaemon){ + return new ThreadFactory() { + final ThreadFactory factory = Executors.defaultThreadFactory(); + public Thread newThread(Runnable r) { + Thread t = factory.newThread(r); + t.setDaemon(isDaemon); + return t; + } + }; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java new file mode 100644 index 000000000..01e4358e9 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl; + +import org.openecomp.appc.executionqueue.ExecutionQueueService; + +public class ExecutionQueueServiceFactory { + + private static ExecutionQueueService executionQueueService =null; + + public static ExecutionQueueService getExecutionQueueService(){ + if(executionQueueService == null){ + synchronized (ExecutionQueueServiceFactory.class){ + if(executionQueueService == null) + executionQueueService = new ExecutionQueueServiceImpl(); + } + } + return executionQueueService; + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java new file mode 100644 index 000000000..c2be1b4ac --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl; + +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.openecomp.appc.exceptions.APPCException; +import org.openecomp.appc.executionqueue.ExecutionQueueService; +import org.openecomp.appc.executionqueue.MessageExpirationListener; +import org.openecomp.appc.executionqueue.impl.object.QueueMessage; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> { + + private static final EELFLogger logger = + EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + + ExecutionQueueServiceImpl(){ + + } + + @Override + public void putMessage(M message) throws APPCException { + this.putMessage(message,-1,null); + } + + @Override + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ + QueueMessage queueMessage = null; + + try { + Date expirationTime = calculateExpirationTime(timeout,unit); + queueMessage = new QueueMessage(message,expirationTime); + QueueManager queueManager = QueueManager.getInstance(); + boolean enqueueTask = queueManager.enqueueTask(queueMessage); + if(!enqueueTask){ + throw new APPCException("failed to put message in queue"); + } + } catch (Exception e) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); + throw new APPCException(e); + } + } + + @Override + public void registerMessageExpirationListener(MessageExpirationListener listener) { + QueueManager.getInstance().setListener(listener); + } + + private Date calculateExpirationTime(long timeToLive, TimeUnit unit) { + Date expirationTime = null; + if(timeToLive > 0){ + long currentTime = System.currentTimeMillis(); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive)); + expirationTime = cal.getTime(); + } + return expirationTime; + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java new file mode 100644 index 000000000..2d4907fa1 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import org.openecomp.appc.executionqueue.MessageExpirationListener; +import org.openecomp.appc.executionqueue.helper.Util; +import org.openecomp.appc.executionqueue.impl.object.QueueMessage; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +public class QueueManager { + + private LinkedBlockingQueue<QueueMessage> queue; + + private MessageExpirationListener listener; + + private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize(); + + private static int MAX_THREAD_SIZE = Util.getThreadPoolSize(); + + private ExecutorService messageExecutor; + + private static final EELFLogger logger = + EELFManager.getInstance().getLogger(QueueManager.class); + + private QueueManager(){ + init(); + } + + private static class QueueManagerHolder { + private static final QueueManager INSTANCE = new QueueManager(); + } + + public static QueueManager getInstance() { + return QueueManagerHolder.INSTANCE; + } + + private void init(){ + queue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); + messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true)); + + for(int i=0;i<MAX_THREAD_SIZE;i++){ + messageExecutor.submit(new Runnable() { + @Override + public void run() { + while (true){ + try{ + QueueMessage queueMessage = queue.take(); + if(messageExpired(queueMessage)){ + logger.debug("Message expired "+ queueMessage.getMessage()); + if(listener != null){ + listener.onMessageExpiration(queueMessage.getMessage()); + } + else{ + logger.warn("Listener not available for expired message "); + } + } + else{ + queueMessage.getMessage().run(); + } + } catch (Exception e) { + logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage()); + } + } + } + }); + } + } + + public void setListener(MessageExpirationListener listener) { + this.listener = listener; + } + + public boolean enqueueTask(QueueMessage queueMessage) { + return queue.offer(queueMessage); + } + + private boolean messageExpired(QueueMessage queueMessage) { + if(queueMessage.getExpirationTime() != null){ + return queueMessage.getExpirationTime().getTime() < System.currentTimeMillis(); + } + return false; + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java new file mode 100644 index 000000000..bbf805871 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl.object; + +import java.util.Date; + + +public class QueueMessage<M extends Runnable> { + M message; + Date expirationTime; + public QueueMessage(M message, Date expirationTime){ + this.message = message; + this.expirationTime = expirationTime; + } + + public M getMessage() { + return message; + } + + public Date getExpirationTime() { + return expirationTime; + } +} |