summaryrefslogtreecommitdiffstats
path: root/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/AsyncTaskHelper.java
blob: db6033752d0de97e883ae77c663a4153f940edea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*-
 * ============LICENSE_START=======================================================
 * ONAP : APPC
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Copyright (C) 2017 Amdocs
 * =============================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * ============LICENSE_END=========================================================
 */

package org.openecomp.appc.oam.util;

import com.att.eelf.configuration.EELFLogger;
import org.openecomp.appc.oam.AppcOam;
import org.openecomp.appc.oam.processor.BaseActionRunnable;
import org.osgi.framework.Bundle;
import org.osgi.framework.FrameworkUtil;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Utility class providing general help for OAM asynchronous tasks.
 *
 * <p>It owns two executors:
 * <ul>
 *   <li>a single-thread {@link ScheduledExecutorService} that runs the periodic background
 *       OAM task, and</li>
 *   <li>a {@link ThreadPoolExecutor} used to run bundle lifecycle operations.</li>
 * </ul>
 * At most one background OAM task is tracked at a time; scheduling a new task cancels the
 * one that is currently tracked.
 */
public class AsyncTaskHelper {
    /** Initial delay and interval (milliseconds) used for the maintenance mode task. */
    final int MMODE_TASK_DELAY = 10000;
    /** Initial delay (milliseconds) used for the start, stop and restart tasks. */
    final int COMMON_INITIAL_DELAY = 0;
    /** Interval (milliseconds) used for the start, stop and restart tasks. */
    final int COMMON_INTERVAL = 1000;

    private final EELFLogger logger;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ThreadPoolExecutor bundleOperationService;

    /** Reference to the currently tracked async task, or null when there is none. */
    private volatile Future<?> backgroundOamTask;
    /** Reference to the runnable of the currently tracked async task. */
    private volatile BaseActionRunnable taskRunnable;

    /**
     * Constructor. Creates both executors; no thread is started until a task is submitted.
     *
     * @param eelfLogger the logger used for debug messages
     */
    public AsyncTaskHelper(EELFLogger eelfLogger) {
        logger = eelfLogger;

        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
                (runnable) -> {
                    // FrameworkUtil.getBundle returns null when the class was not loaded by a
                    // bundle class loader; fall back to the class name instead of failing with
                    // a NullPointerException while creating the worker thread.
                    Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
                    String owner = (bundle != null) ? bundle.getSymbolicName() : AppcOam.class.getName();
                    return new Thread(runnable, owner + " scheduledExecutor");
                }
        );

        bundleOperationService = new ThreadPoolExecutor(
                0,                                // corePoolSize
                10,                               // maximumPoolSize
                10,                               // keepAliveTime
                TimeUnit.SECONDS,                 // unit of keepAliveTime
                new LinkedBlockingQueue<>(),      // BlockingQueue<Runnable> workQueue (unbounded)
                (runnable) -> new Thread(runnable, "OAM bundler operation executor") // ThreadFactory
        );
    }

    /**
     * Raise the core pool size of the bundle operation executor to its maximum pool size.
     */
    void addThreadsToPool() {
        bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
    }

    /**
     * Reset the core pool size of the bundle operation executor to zero.
     */
    void removeThreadsFromPoolWhenDone() {
        bundleOperationService.setCorePoolSize(0);
    }

    /**
     * Shut down both the <b>ScheduledExecutorService</b> and the bundle operation executor.
     */
    public void close() {
        logDebug("Start shutdown scheduleExcutorService.");
        scheduledExecutorService.shutdown();
        bundleOperationService.shutdown();
        logDebug("Completed shutdown scheduleExcutorService.");
    }

    /**
     * Get the current async task reference.
     *
     * @return the class <b>backgroundOamTask</b>, which may be null
     */
    public Future<?> getCurrentAsyncTask() {
        return backgroundOamTask;
    }

    /**
     * Schedule a service for async task with the passed in parameters.
     *
     * <p>Any currently tracked task is cancelled (with interruption) and its runnable is
     * told to abort before the new task is scheduled.
     *
     * @param rpc of the REST API call, decides how to schedule the service
     * @param runnable of the to be scheduled service.
     * @return the reference of the scheduled task, or null when the RPC is not supported
     */
    public Future<?> scheduleAsyncTask(final AppcOam.RPC rpc, final BaseActionRunnable runnable) {
        final int initialDelay;
        final int interval;
        switch (rpc) {
            case maintenance_mode:
                initialDelay = MMODE_TASK_DELAY;
                interval = MMODE_TASK_DELAY;
                break;
            case start:
            case stop:
            case restart:
                initialDelay = COMMON_INITIAL_DELAY;
                interval = COMMON_INTERVAL;
                break;
            default:
                // should not get here. Log it and return null.
                // The format string goes straight to logDebug so it is formatted only once.
                logDebug("Cannot scheudle task for unsupported RPC(%s).", rpc.name());
                return null;
        }

        // Always cancel existing async task. The volatile fields are read once into locals
        // so that a concurrent cancelAsyncTask cannot null them between check and use.
        final Future<?> previousTask = backgroundOamTask;
        if (previousTask != null) {
            logDebug("Cancelling background task in schedule task.");
            previousTask.cancel(true);
            final BaseActionRunnable previousRunnable = taskRunnable;
            if (previousRunnable != null) {
                previousRunnable.abortRunnable(rpc);
            }
        }

        taskRunnable = runnable;
        final Future<?> scheduledTask = scheduledExecutorService.scheduleWithFixedDelay(
                runnable, initialDelay, interval, TimeUnit.MILLISECONDS);
        backgroundOamTask = scheduledTask;

        return scheduledTask;
    }

    /**
     * Submit a bundle lifecycle operation to the bundle operation executor.
     *
     * @param callable the operation to run
     * @return the future representing the submitted operation
     */
    Future<?> submitBundleLcOperation(final Callable<?> callable) {
        return bundleOperationService.submit(callable);
    }

    /**
     * Cancel a previously scheduled task. If the task is the same as backgroundOamTask,
     * the tracked task and its runnable are set to null. The task is not interrupted if it
     * is already running. A null task is ignored.
     *
     * @param task to be canceled
     */
    public void cancelAsyncTask(Future<?> task) {
        if (task == null) {
            // scheduleAsyncTask returns null for unsupported RPCs; nothing to cancel then.
            return;
        }

        task.cancel(false);
        if (task == backgroundOamTask) {
            backgroundOamTask = null;
            taskRunnable = null;
            logDebug("Cancelling background task in cancel task.");
        }
    }

    /**
     * General debug log, emitted only when debug logging level is enabled.
     *
     * @param message of the log message format (as understood by {@link String#format})
     * @param args of the objects listed in the message format
     */
    private void logDebug(String message, Object... args) {
        if (logger.isDebugEnabled()) {
            logger.debug(String.format(message, args));
        }
    }
}