aboutsummaryrefslogtreecommitdiffstats
path: root/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/core/CoreManager.java
blob: 4c0c535d427ffc2cc7278ffe01abe6e5e46a49ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/*-
 * ============LICENSE_START=======================================================
 * ONAP : APPC
 * ================================================================================
 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Copyright (C) 2017 Amdocs
 * =============================================================================
 * Modifications Copyright (C) 2019 IBM
 * =============================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * ============LICENSE_END=========================================================
 */

package org.onap.appc.client.impl.core;

import org.onap.appc.client.impl.protocol.*;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events.
 */
class CoreManager{

    private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
    private final ProtocolFactory protocolFactory;
    protected  AsyncProtocol protocol;
    private final RetrieveMessageCallback protocolCallback = null;
    private final CoreRegistry registry;
    private final ITimerService timerService;
    private final TaskQueueManager queueManager;
    private String DEFAULT_TIMEOUT = "300000";
    private final static String RESPONSE_TIMEOUT = "client.response.timeout";
    private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
    private boolean isForceShutdown = false;
    private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
    private long shutdownTimeout;

    CoreManager(Properties prop) throws CoreException {
        protocolFactory = ProtocolFactory.getInstance();
        try {
            initProtocol(prop);
        }catch (ProtocolException e){
            throw new CoreException(e);
        }
        registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
        String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
        long responseTimeout = Long.parseLong(timeoutProp);
        String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
        shutdownTimeout = Long.parseLong(gracefulTimeout);
        timerService = new TimerServiceImpl(responseTimeout);
        queueManager = new TaskQueueManager(prop);
        listenShutdown();
    }

    /**
     * initiates protocol layer services.
     * @param prop - Properties
     */
    private void initProtocol(Properties prop) throws ProtocolException {
        protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
        protocol.init(prop, getProtocolCallback());
    }

    /**
     * Creates protocol response callback
     * @return - @{@link ProtocolResponseCallbackImpl}
     */
    RetrieveMessageCallback getProtocolCallback(){
        return new ProtocolResponseCallbackImpl();
    }

    /**
     * Registers a new handler in registry
     * @param corrID - Correlation ID
     * @param requestResponseHandler handler to be called when response arrives
     */
    void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
        registry.register(corrID, requestResponseHandler);
    }

    /**
     * Remove a handler from registry service by correlation ID.
     * @param corrID - Correlation ID
     * @return - @{@link RequestResponseHandler}
     */
    RequestResponseHandler unregisterHandler(String corrID){
        return (RequestResponseHandler) registry.unregister(corrID);
    }

    /**
     * Checks in registry service if a handler is existing.
     * @param corrID - Correlation ID
     * @return - boolean
     */
    boolean isExistHandler(String corrID) {
        return registry.isExist(corrID);
    }

    /**
     * Starts timer for timeout event when a request was send successfully.
     * @param corrID - Correlation ID
     */
    void startTimer(String corrID){
        timerService.add(corrID, new TimeoutHandlerImpl(corrID));
    }

    /**
     * Cancels timer for fimeout event, in case when complete response was received
     * @param corrID
     */
    void cancelTimer(String corrID){
        timerService.cancel(corrID);
    }

    /**
     * Submits a new task to Queue manager. it is using for both response and timeout tasks
     * @param corrID - Correlation ID
     * @param task - @{@link Runnable} task.
     * @throws InterruptedException
     */
    void submitTask(String corrID, Runnable task) throws InterruptedException {
        queueManager.submit(corrID, task);
    }

    /**
     * Sends request to protocol.
     * @param request - Request
     * @param corrId - Correlation ID
     * @param rpcName - RPC name
     * @throws CoreException - @{@link CoreException}
     */
    void sendRequest(String request, String corrId, String rpcName) throws CoreException {
        MessageContext ctx = getMessageContext(corrId, rpcName);
        try {
            protocol.sendRequest(request, ctx);
        } catch (ProtocolException e) {
            unregisterHandler(corrId);
            throw new CoreException(e);
        }
    }

    /**
     * Creates @{@link MessageContext}
     * @param correlationId - Correlation ID
     * @param rpcName - RPC Name
     * @return - @{@link MessageContext}
     */
    private MessageContext getMessageContext(String correlationId, String rpcName){
        MessageContext msgCtx = new MessageContext();
        msgCtx.setCorrelationID(correlationId);
        msgCtx.setRpc(rpcName);
        return msgCtx;
    }

    /**
     * Implements response callback from protocol and filters responses by correlation ID.
     * Only registered events(by correlation ID) will be handled.
     */
    private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
        @Override
        public void onResponse(String response, MessageContext context) {
            String corrID = context.getCorrelationID();
            if (corrID != null) {
                RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
                if (messageHandler != null) {
                    LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
                    messageHandler.handleResponse(context, response);
                }
            }
        }
    }


    /**
     * listens to @{@link Runtime} shutdown event
     */
    private void listenShutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread(){
            public void run(){
                gracefulShutdown();
            }
        });
    }

    /**
     * Implements shutdown for client library.
     * @param isForceShutdown - true force shutdown, false graceful shutdown
     */
    void shutdown(boolean isForceShutdown){
        if(isForceShutdown){
            forceShutdown();
        }else{
            gracefulShutdown();
        }
    }

    /**
     * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force
     * shutdown only when either all request will be handled or graceful shutdown will be time out.
     */
    synchronized void gracefulShutdown(){
        isGracefulShutdown.set(true);
        if(registry.isEmpty()){
            forceShutdown();
        }
        else{
            try {
                LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
                wait(shutdownTimeout);
                LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
                forceShutdown();
            } catch (InterruptedException e) {
                LOG.error("Interrupted Exception during gracefulShutdown ::", e);
                Thread.currentThread().interrupt();
            }

        }
    }

    /**
     * Closes Protocol, stops Queue Manager and shutdowns Time Service.
     */
    private void forceShutdown(){
        isForceShutdown = true;
        try {
            LOG.info("Starting shutdown process.");
            protocol.shutdown();
            queueManager.stopQueueManager();
            timerService.shutdown();
        } catch (InterruptedException e) {
            LOG.info("Client library shutdown in progress ", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     *
     * @return - true when shutdown is in process
     */
    boolean isShutdownInProgress(){
        return isForceShutdown || isGracefulShutdown.get();
    }

    /**
     * Timeout handler implementation.
     * This handler is responsible to assign a task for handling of timeout events.
     *
     */
    private class TimeoutHandlerImpl implements ITimeoutHandler {

        private final String corrID;

        TimeoutHandlerImpl(String corrID) {
            this.corrID = corrID;
        }

        /**
         * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
         * this queue is shared between both timeout and handlers which belong to same correlation ID.
         */
        @Override
        public void onTimeout() {
            try {
                submitTask(corrID, new Runnable() {
                    @Override
                    public void run() {
                        RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
                        if (requestResponseHandler != null) {
                            requestResponseHandler.onTimeOut();
                        }
                    }
                });
            } catch (InterruptedException e) {
                LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Wakes Up graceful shutdown.
     */
    class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
        @Override
        public synchronized void emptyCallback() {
            LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
            if(isGracefulShutdown.get()){
                wakeUpShutdown();
            }
        }
    }

    /**
     * wakes up waiting shutdown.
     */
    private synchronized void wakeUpShutdown(){
        notifyAll();
    }

}