diff options
author | Patrick Brady <pb071s@att.com> | 2017-02-15 23:11:26 -0800 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2017-02-15 23:13:06 -0800 |
commit | 1c192d2dd68724e292b6a30f463085a262e1e813 (patch) | |
tree | d0e2b3a396e169863cd0efaa835c8675e9d5aaac /appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src | |
parent | c69ba05c7508aa7d7f675189a45c8c87569369ef (diff) |
Moving all files to root directory
Change-Id: Ica5535fd6ec85f350fe1640b42137b49f83f10f0
Signed-off-by: Patrick Brady <pb071s@att.com>
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src')
10 files changed, 555 insertions, 0 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java new file mode 100644 index 000000000..c93920083 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/ExecutionQueueService.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +import java.util.concurrent.TimeUnit; + +import org.openecomp.appc.exceptions.APPCException; + +public interface ExecutionQueueService<M extends Runnable> { + void putMessage(M message) throws APPCException; + void putMessage(M message, long timeout, TimeUnit unit) throws APPCException; + void registerMessageExpirationListener(MessageExpirationListener listener); +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java new file mode 100644 index 000000000..cd5cbeab1 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/MessageExpirationListener.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +public interface MessageExpirationListener<M> { + void onMessageExpiration(M message); +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java new file mode 100644 index 000000000..835e881aa --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.helper; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; + +public class Util { + + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + + public static int DEFAULT_QUEUE_SIZE = 10; + public static int DEFAULT_THREADPOOL_SIZE = 10; + + public static int getExecutionQueSize(){ + String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE)); + int size = DEFAULT_QUEUE_SIZE; + try{ + size = Integer.parseInt(sizeStr); + } + catch (NumberFormatException e){ + + } + return size; + } + + public static int getThreadPoolSize(){ + String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE)); + int size = DEFAULT_THREADPOOL_SIZE; + try{ + size = Integer.parseInt(sizeStr); + } + catch (NumberFormatException e){ + + } + return size; + } + + public static ThreadFactory getThreadFactory(final boolean isDaemon){ + return new ThreadFactory() { + final ThreadFactory factory = Executors.defaultThreadFactory(); + public Thread newThread(Runnable r) { + Thread t = factory.newThread(r); + t.setDaemon(isDaemon); + return t; + } + }; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java new file mode 100644 index 000000000..01e4358e9 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceFactory.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl; + +import org.openecomp.appc.executionqueue.ExecutionQueueService; + +public class ExecutionQueueServiceFactory { + + private static ExecutionQueueService executionQueueService =null; + + public static ExecutionQueueService getExecutionQueueService(){ + if(executionQueueService == null){ + synchronized (ExecutionQueueServiceFactory.class){ + if(executionQueueService == null) + executionQueueService = new ExecutionQueueServiceImpl(); + } + } + return executionQueueService; + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java new file mode 100644 index 000000000..c2be1b4ac --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl; + +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.openecomp.appc.exceptions.APPCException; +import org.openecomp.appc.executionqueue.ExecutionQueueService; +import org.openecomp.appc.executionqueue.MessageExpirationListener; +import org.openecomp.appc.executionqueue.impl.object.QueueMessage; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> { + + private static final EELFLogger logger = + EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + + ExecutionQueueServiceImpl(){ + + } + + @Override + public void putMessage(M message) throws APPCException { + this.putMessage(message,-1,null); + } + + @Override + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ + QueueMessage queueMessage = null; + + try { + Date expirationTime = calculateExpirationTime(timeout,unit); + queueMessage = new QueueMessage(message,expirationTime); + QueueManager queueManager = QueueManager.getInstance(); + boolean enqueueTask = queueManager.enqueueTask(queueMessage); + if(!enqueueTask){ + throw new APPCException("failed to put message in queue"); + } + } catch (Exception e) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); + throw new APPCException(e); + } + } + + @Override + public void registerMessageExpirationListener(MessageExpirationListener listener) { + QueueManager.getInstance().setListener(listener); + } + + private Date calculateExpirationTime(long timeToLive, TimeUnit unit) { + Date expirationTime = null; + if(timeToLive > 0){ + long currentTime = System.currentTimeMillis(); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive)); + expirationTime = cal.getTime(); + } + return expirationTime; + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java new file mode 100644 index 000000000..2d4907fa1 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import org.openecomp.appc.executionqueue.MessageExpirationListener; +import org.openecomp.appc.executionqueue.helper.Util; +import org.openecomp.appc.executionqueue.impl.object.QueueMessage; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +public class QueueManager { + + private LinkedBlockingQueue<QueueMessage> queue; + + private MessageExpirationListener listener; + + private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize(); + + private static int MAX_THREAD_SIZE = Util.getThreadPoolSize(); + + private ExecutorService messageExecutor; + + private static final EELFLogger logger = + EELFManager.getInstance().getLogger(QueueManager.class); + + private QueueManager(){ + init(); + } + + private static class QueueManagerHolder { + private static final QueueManager INSTANCE = new QueueManager(); + } + + public static QueueManager getInstance() { + return QueueManagerHolder.INSTANCE; + } + + private void init(){ + queue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); + messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true)); + + for(int i=0;i<MAX_THREAD_SIZE;i++){ + messageExecutor.submit(new Runnable() { + @Override + public void run() { + while (true){ + try{ + QueueMessage queueMessage = queue.take(); + if(messageExpired(queueMessage)){ + logger.debug("Message expired "+ queueMessage.getMessage()); + if(listener != null){ + listener.onMessageExpiration(queueMessage.getMessage()); + } + else{ + logger.warn("Listener not available for expired message "); + } + } + else{ + queueMessage.getMessage().run(); + } + } catch (Exception e) { + logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage()); + } + } + } + }); + } + } + + public void setListener(MessageExpirationListener listener) { + this.listener = listener; + } + + public boolean enqueueTask(QueueMessage queueMessage) { + return queue.offer(queueMessage); + } + + private boolean messageExpired(QueueMessage queueMessage) { + if(queueMessage.getExpirationTime() != null){ + return queueMessage.getExpirationTime().getTime() < System.currentTimeMillis(); + } + return false; + } + +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java new file mode 100644 index 000000000..bbf805871 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/object/QueueMessage.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue.impl.object; + +import java.util.Date; + + +public class QueueMessage<M extends Runnable> { + M message; + Date expirationTime; + public QueueMessage(M message, Date expirationTime){ + this.message = message; + this.expirationTime = expirationTime; + } + + public M getMessage() { + return message; + } + + public Date getExpirationTime() { + return expirationTime; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/Listener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/Listener.java new file mode 100644 index 000000000..294ae61b1 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/Listener.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +import org.openecomp.appc.executionqueue.MessageExpirationListener; + + +public class Listener implements MessageExpirationListener { + + boolean listenerExecuted = false; + + public boolean isListenerExecuted() { + return listenerExecuted; + } + + @Override + public void onMessageExpiration(Object message) { + listenerExecuted = true; + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/Message.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/Message.java new file mode 100644 index 000000000..61c847842 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/Message.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + + +public class Message implements Runnable { + + boolean runExecuted = false; + + public boolean isRunExecuted() { + return runExecuted; + } + + @Override + public void run() { + this.runExecuted = true; + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java new file mode 100644 index 000000000..a8a20c10a --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java @@ -0,0 +1,72 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.openecomp.appc.exceptions.APPCException; +import org.openecomp.appc.executionqueue.ExecutionQueueService; +import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory; +import org.powermock.api.mockito.PowerMockito; + +import java.util.concurrent.TimeUnit; + + +public class TestExecutionQueueService { + + @Test + public void testPositiveFlow(){ + Message message = new Message(); + ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService(); + try { + service.putMessage(message); + waitFor(5000); + Assert.assertTrue(message.isRunExecuted()); + } catch (APPCException e) { + Assert.fail(e.toString()); + } + } + +// @Test + public void testTimeout(){ + ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService(); + Message message = new Message(); + Listener listener = new Listener(); + service.registerMessageExpirationListener(listener); + try { + service.putMessage(message,1, TimeUnit.MILLISECONDS); + waitFor(5000); + Assert.assertTrue(listener.isListenerExecuted()); + } catch (APPCException e) { + e.printStackTrace(); + } + } + + private void waitFor(long milliSeconds){ + try { + Thread.sleep(milliSeconds); + } catch (InterruptedException e) { + Assert.fail(e.toString()); + } + } +} |