/*- * ============LICENSE_START======================================================= * ONAP * ================================================================================ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.common.endpoints.event.comm.client; import java.util.Arrays; import java.util.List; import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; /** * A "bidirectional" topic, which is a pair of topics, one of which is used to publish * requests and the other to receive responses. */ @Getter public class BidirectionalTopicClient { private final String sinkTopic; private final String sourceTopic; private final TopicSinkClient sinkClient; private final TopicSource source; private final CommInfrastructure sinkTopicCommInfrastructure; private final CommInfrastructure sourceTopicCommInfrastructure; /** * Constructs the object. * * @param sinkTopic sink topic name * @param sourceTopic source topic name * @throws BidirectionalTopicClientException if either topic does not exist */ public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { this.sinkTopic = sinkTopic; this.sourceTopic = sourceTopic; // init sinkClient try { // if the manager is overridden here, then override it in the sink client, too this.sinkClient = new TopicSinkClient(sinkTopic) { @Override protected List getTopicSinks(String topic) { return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic); } }; } catch (TopicSinkClientException e) { throw new BidirectionalTopicClientException(e); } // init source List sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); if (sources.isEmpty()) { throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic); } else if (sources.size() > 1) { throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic); } this.source = sources.get(0); this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure(); this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure(); } public TopicSink getSink() { return sinkClient.getSink(); } public boolean send(Object message) { return sinkClient.send(message); } public void register(TopicListener topicListener) { source.register(topicListener); } public boolean offer(String event) { return source.offer(event); } public void unregister(TopicListener topicListener) { source.unregister(topicListener); } // these may be overridden by junit tests protected TopicEndpoint getTopicEndpointManager() { return TopicEndpointManager.getManager(); } }