summaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
blob: 9904847aaa9ec6f0f6bf70aa138b115fd76802e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.apex.service.engine.main;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.service.engine.event.ApexEvent;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventProducer;
import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
import org.onap.policy.apex.service.engine.event.impl.EventProducerFactory;
import org.onap.policy.apex.service.engine.event.impl.EventProtocolFactory;
import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;

/**
 * This event marshaler handles events coming out of Apex and sends them on, handles threading,
 * event queuing, transformations and sending using the configured sending technology.
 *
 * <p>Events handed to {@link #onApexEvent(ApexEvent)} are placed on an internal queue and are
 * converted and sent by a dedicated daemon thread that is started in {@link #init()} and ended by
 * {@link #stop()}.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
public class ApexEventMarshaller implements ApexEventListener, Runnable {
    // Get a reference to the logger
    private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEventMarshaller.class);

    // Interval to wait between thread shutdown checks
    private static final int MARSHALLER_SHUTDOWN_WAIT_INTERVAL = 10;

    // The amount of time to wait between polls of the event queue in milliseconds
    private static final long EVENT_QUEUE_POLL_INTERVAL = 20;

    // The name of the marshaler
    private final String name;

    // The engine service and producer parameters
    private final EngineServiceParameters engineServiceParameters;
    private final EventHandlerParameters producerParameters;

    // Apex event producer and event converter, all conversions are to and from string
    // representation of events
    private ApexEventProducer producer;
    private ApexEventProtocolConverter converter;

    // Temporary event holder for events coming out of Apex
    private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();

    // The marshaler thread and stopping flag. The flag is written by the thread calling stop() and
    // read by the marshaler thread in run(), so it must be volatile for the write to be visible.
    private Thread marshallerThread;
    private volatile boolean stopOrderedFlag = false;

    /**
     * Create the marshaler.
     *
     * @param name the name of the marshaler
     * @param engineServiceParameters the engine service parameters for this Apex engine
     * @param producerParameters the producer parameters for this specific marshaler
     */
    public ApexEventMarshaller(final String name, final EngineServiceParameters engineServiceParameters,
            final EventHandlerParameters producerParameters) {
        this.name = name;
        this.engineServiceParameters = engineServiceParameters;
        this.producerParameters = producerParameters;
    }

    /**
     * Configure the marshaler by setting up the producer and event converter and initialize the
     * thread for event sending.
     *
     * @throws ApexActivatorException on errors initializing the producer
     * @throws ApexEventException on errors initializing event handling
     */
    public void init() throws ApexActivatorException, ApexEventException {
        // Create the producer for sending events
        producer = new EventProducerFactory().createProducer(name, producerParameters);

        // Initialize the producer
        producer.init(this.name, this.producerParameters);

        // Create the converter for transforming events
        converter = new EventProtocolFactory().createConverter(name, producerParameters.getEventProtocolParameters());

        // Configure and start the event sending thread; the producer and converter are assigned
        // before the thread is started so that run() always sees them
        final String threadName =
                engineServiceParameters.getEngineKey().getName() + ':' + this.getClass().getName() + ':' + this.name;
        marshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
        marshallerThread.setDaemon(true);
        marshallerThread.start();
    }

    /**
     * Gets the name of the marshaler.
     *
     * @return the marshaler name
     */
    public String getName() {
        return name;
    }

    /**
     * Gets the technology specific producer for this marshaler.
     *
     * @return the producer, null until {@link #init()} has created it
     */
    public ApexEventProducer getProducer() {
        return producer;
    }

    /**
     * Gets the event protocol converter for this marshaler.
     *
     * @return the event protocol converter, null until {@link #init()} has created it
     */
    public ApexEventProtocolConverter getConverter() {
        return converter;
    }

    /**
     * Callback method called on implementations of this interface when Apex emits an event. Events
     * whose name does not match the configured event name filter are dropped, all other events are
     * queued for sending by the marshaler thread.
     *
     * @param apexEvent the apex event emitted by Apex
     */
    @Override
    public void onApexEvent(final ApexEvent apexEvent) {
        // Check if we are filtering events on this marshaler, if so check the event name against
        // the filter
        if (producerParameters.isSetEventNameFilter()
                && !apexEvent.getName().matches(producerParameters.getEventNameFilter())) {
            if (LOGGER.isTraceEnabled()) {
                // One placeholder per argument, otherwise the filter value is never logged
                LOGGER.trace("onMessage(): event {} not processed, filtered  out by filter {}", apexEvent,
                        producerParameters.getEventNameFilter());
            }

            // Ignore this event
            return;
        }

        // Push the event onto the queue for handling
        try {
            queue.put(apexEvent);
        } catch (final InterruptedException e) {
            // restore the interrupt status
            Thread.currentThread().interrupt();
            LOGGER.warn("Failed to queue the event: " + apexEvent, e);
        }
    }

    /**
     * Run a thread that listens for outgoing events on the queue, converts each event using the
     * converter and sends it using the producer. The loop ends when a stop is ordered or the thread
     * is interrupted, the producer is then stopped.
     */
    @Override
    public void run() {
        // Run until interrupted or until a stop is ordered
        while (marshallerThread.isAlive() && !stopOrderedFlag) {
            try {
                // Take the next event from the queue, the timed poll lets the loop re-check the
                // stop flag at least once every poll interval
                final ApexEvent apexEvent = queue.poll(EVENT_QUEUE_POLL_INTERVAL, TimeUnit.MILLISECONDS);
                if (apexEvent == null) {
                    continue;
                }

                // Process the next Apex event from the queue
                final Object event = converter.fromApexEvent(apexEvent);

                producer.sendEvent(apexEvent.getExecutionID(), apexEvent.getName(), event);

                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("event sent : {}", apexEvent);
                }
            } catch (final InterruptedException e) {
                // restore the interrupt status
                Thread.currentThread().interrupt();
                LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
                break;
            } catch (final Exception e) {
                // A failure on one event must not end the marshaler, log it and carry on with the
                // next event
                LOGGER.warn("Error while forwarding events for " + marshallerThread.getName(), e);
            }
        }

        // The event loop has ended, stop the producer so that it can clean up
        producer.stop();
    }

    /**
     * Get the marshaler thread.
     *
     * @return the marshaler thread, null until {@link #init()} has created it
     */
    public Thread getThread() {
        return marshallerThread;
    }

    /**
     * Stop the Apex event marshaller by ordering its thread to stop and waiting for the thread to
     * end; the thread stops the event producer as its last action.
     *
     * <p>If the marshaler thread was never created there is nothing to wait for and the method
     * returns once the stop is ordered. If this method is called on the marshaler thread itself,
     * it returns without waiting because that thread cannot wait for its own termination.
     */
    public void stop() {
        LOGGER.entry("shutting down Apex event marshaller . . .");

        // Order the stop
        stopOrderedFlag = true;

        final Thread threadToStop = marshallerThread;
        if (threadToStop != null && threadToStop != Thread.currentThread()) {
            // Wait for thread shutdown
            while (threadToStop.isAlive()) {
                ThreadUtilities.sleep(MARSHALLER_SHUTDOWN_WAIT_INTERVAL);
            }
        }

        LOGGER.exit("shut down Apex event marshaller");
    }
}