aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
blob: 6032d17e16086311e08d5b9c00c87800aa7c927c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
 * ============LICENSE_START=======================================================
 * ONAP PAP
 * ================================================================================
 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.pap.main.comm;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.pap.main.PolicyPapException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes messages to a topic. Maintains a queue of references to data that is to be
 * published. Once the publisher removes a reference from a queue, it sets it to
 * {@link null} to indicate that it is being processed. Until it has been set to
 * {@link null}, clients are free to atomically update the reference to new values, thus
 * maintaining their place in the queue.
 *
 * <p>This class has not been tested for multiple threads invoking {@link #run()}
 * simultaneously.
 */
public class Publisher implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);

    /**
     * Used to send to the topic.
     */
    private final TopicSinkClient client;

    /**
     * Request queue. The references may contain {@code null}.
     */
    private final BlockingQueue<QueueToken<PdpMessage>> queue = new LinkedBlockingQueue<>();

    /**
     * Set to {@code true} to cause the publisher to stop running.
     */
    private volatile boolean stopNow = false;

    /**
     * Constructs the object.
     *
     * @param topic name of the topic to which to publish
     * @throws PolicyPapException if the topic sink does not exist
     */
    public Publisher(String topic) throws PolicyPapException {
        try {
            this.client = new TopicSinkClient(topic);
        } catch (TopicSinkClientException e) {
            throw new PolicyPapException(e);
        }
    }

    /**
     * Stops the publisher, if it's running.
     */
    public void stop() {
        stopNow = true;

        // add an empty reference so the thread doesn't block on the queue
        queue.add(new QueueToken<>(null));
    }

    /**
     * Adds an item to the queue. The referenced objects are assumed to be POJOs and will
     * be converted to JSON via the {@link StandardCoder} prior to publishing.
     *
     * @param ref reference to the message to be published
     */
    public void enqueue(QueueToken<PdpMessage> ref) {
        queue.add(ref);
    }

    /**
     * Continuously publishes items in the queue until {@link #stop()} is invoked.
     */
    @Override
    public void run() {
        for (;;) {
            QueueToken<PdpMessage> token = getNext();

            if (stopNow) {
                // unblock any other publisher threads
                queue.offer(new QueueToken<>(null));
                break;
            }

            PdpMessage data = token.replaceItem(null);
            if (data == null) {
                continue;
            }

            client.send(data);
        }
    }

    /**
     * Gets the next item from the queue. If the thread is interrupted, then it sets
     * {@link #stopNow}.
     *
     * @return the next item, or a reference containing {@code null} if this is
     *         interrupted
     */
    private QueueToken<PdpMessage> getNext() {
        try {
            return queue.take();

        } catch (InterruptedException e) {
            logger.warn("Publisher stopping due to interrupt");
            stopNow = true;
            Thread.currentThread().interrupt();
            return new QueueToken<>(null);
        }
    }
}