summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
blob: c6cbf3438b4f96795f3578b11d6b6c5152075442 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*-
 * ============LICENSE_START=======================================================
 * policy-endpoints
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.bus;

import java.util.List;

import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.internal.TopicBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * NOOP topic sink.
 *
 * <p>{@link #send(String)} records the message in the recent-events collection, traces it on
 * the network logger and passes it to {@code broadcast(String)}. The recent-events collection,
 * {@code broadcast}, and the {@code alive}/{@code locked} flags are inherited and not defined in
 * this class.
 */
public class NoopTopicSink extends TopicBase implements TopicSink {

    /**
     * Class logger. Declared {@code final} so the shared static reference cannot be reassigned,
     * consistent with {@link #netLogger}.
     */
    private static final Logger logger = LoggerFactory.getLogger(NoopTopicSink.class);

    /**
     * Network logger, used to trace every outgoing message.
     */
    private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);

    /**
     * Creates a NOOP sink by delegating to the superclass constructor.
     *
     * @param servers servers
     * @param topic topic
     * @throws IllegalArgumentException if an invalid argument has been passed in
     */
    public NoopTopicSink(List<String> servers, String topic) {
        super(servers, topic);
    }

    /**
     * Records, traces and broadcasts a message.
     *
     * @param message the message to send; must be neither {@code null} nor empty
     * @return {@code true} if the message was recorded, logged and broadcast without an
     *         exception being raised, {@code false} if any of those steps threw
     * @throws IllegalArgumentException if {@code message} is {@code null} or empty
     * @throws IllegalStateException if this sink is not alive (i.e. not started, or stopped)
     */
    @Override
    public boolean send(String message) {

        if (message == null || message.isEmpty()) {
            throw new IllegalArgumentException("Message to send is empty");
        }

        // NOTE(review): 'alive' is read here without holding the monitor that start()/stop()
        // use to write it; visibility relies on how the field is declared in the superclass.
        if (!this.alive) {
            throw new IllegalStateException(this + " is stopped");
        }

        try {
            // Only the update of the recent-events collection is done under the lock.
            synchronized (this) {
                this.recentEvents.add(message);
            }

            netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(),
                    message);

            broadcast(message);
        } catch (Exception e) {
            // Failures are reported through the return value rather than propagated; the
            // exception is passed as the last argument so the stack trace is logged too.
            logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
            return false;
        }

        return true;
    }

    /**
     * Identifies the communication infrastructure of this sink.
     *
     * @return always {@link CommInfrastructure#NOOP}
     */
    @Override
    public CommInfrastructure getTopicCommInfrastructure() {
        return CommInfrastructure.NOOP;
    }

    /**
     * Marks this sink as alive. Calling it on a sink that is already alive has no effect.
     *
     * @return always {@code true}
     * @throws IllegalStateException if the sink is not alive yet and is locked
     */
    @Override
    public boolean start() {
        logger.info("{}: starting", this);

        synchronized (this) {

            // Already started: nothing to do, and the lock state is deliberately not checked.
            if (this.alive) {
                return true;
            }

            if (locked) {
                throw new IllegalStateException(this + " is locked.");
            }

            this.alive = true;
        }

        return true;
    }

    /**
     * Marks this sink as not alive; subsequent {@link #send(String)} calls are rejected.
     *
     * @return always {@code true}
     */
    @Override
    public boolean stop() {
        synchronized (this) {
            this.alive = false;
        }
        return true;
    }

    /**
     * Shuts this sink down, which here is the same as {@link #stop()}.
     */
    @Override
    public void shutdown() {
        this.stop();
    }

    /**
     * Returns the superclass description wrapped with this class's name.
     *
     * @return string representation of this sink
     */
    @Override
    public String toString() {
        return "NoopTopicSink [toString()=" + super.toString() + "]";
    }

}