diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal')
9 files changed, 1692 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java new file mode 100644 index 00000000..6fee5ce0 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -0,0 +1,204 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Properties; + +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; + +/** + * Wrapper around libraries to consume from message bus + * + */ +public interface BusConsumer { + + /** + * fetch messages + * + * @return list of messages + * @throws Exception when error encountered by underlying libraries + */ + public Iterable<String> fetch() throws Exception; + + /** + * close underlying library consumer + */ + public void close(); + + /** + * Cambria based consumer + */ + public static class CambriaConsumerWrapper implements BusConsumer { + /** + * Cambria client + */ + protected CambriaConsumer consumer; + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws IllegalArgumentException { + + ConsumerBuilder builder = + new CambriaClientBuilders.ConsumerBuilder(); + + builder.knownAs(consumerGroup, consumerInstance) + .usingHosts(servers) + .onTopic(topic) + .waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit); + + if (apiKey != null && !apiKey.isEmpty() && + apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * {@inheritDoc} + */ + public Iterable<String> fetch() throws Exception { + return this.consumer.fetch(); + } + + /** + * {@inheritDoc} + */ + public void close() { + this.consumer.close(); + } + + @Override + public String toString() { + return "CambriaConsumerWrapper []"; + } + } + + /** + * MR based consumer + */ + public static class DmaapConsumerWrapper implements BusConsumer { + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + */ + public DmaapConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String aafLogin, String aafPassword, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws Exception { + + this.consumer = new MRConsumerImpl(servers, topic, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, + null, apiKey, apiSecret); + + this.consumer.setUsername(aafLogin); + this.consumer.setPassword(aafPassword); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + this.consumer.setProps(props); + this.consumer.setHost(servers.get(0) + ":3904");; + } + + /** + * {@inheritDoc} + */ + public Iterable<String> fetch() throws Exception { + return this.consumer.fetch(); + } + + /** + * {@inheritDoc} + */ + public void close() { + this.consumer.close(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder. + append("DmaapConsumerWrapper ["). + append("consumer.getAuthDate()=").append(consumer.getAuthDate()). + append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). + append(", consumer.getHost()=").append(consumer.getHost()). + append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). + append(", consumer.getUsername()=").append(consumer.getUsername()). + append("]"); + return builder.toString(); + } + } + + +} + + + + diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java new file mode 100644 index 00000000..798bf989 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -0,0 +1,231 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.openecomp.policy.common.logging.eelf.PolicyLogger; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import com.fasterxml.jackson.annotation.JsonIgnore; + +public interface BusPublisher { + + /** + * sends a message + * + * @param partition id + * @param message the message + * @return true if success, false otherwise + * @throws IllegalArgumentException if no message provided + */ + public boolean send(String partitionId, String message) throws IllegalArgumentException; + + /** + * closes the publisher + */ + public void close(); + + /** + * Cambria based library publisher + */ + public static class CambriaPublisherWrapper implements BusPublisher { + + /** + * The actual Cambria publisher + */ + @JsonIgnore + protected volatile CambriaBatchingPublisher publisher; + + public CambriaPublisherWrapper(List<String> servers, String topic, + String apiKey, + String apiSecret) + throws IllegalArgumentException { + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); + + builder.usingHosts(servers) + .onTopic(topic); + + // Only supported in 0.2.4 version + // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER); + + if (apiKey != null && !apiKey.isEmpty() && + apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.publisher = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) + throws IllegalArgumentException { + if (message == null) + throw new IllegalArgumentException("No message provided"); + + try { + this.publisher.send(partitionId, message); + } catch (Exception e) { + PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), + "SEND of " + message + " IN " + + this + " cannot be performed because of " + + e.getMessage()); + return false; + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (PolicyLogger.isInfoEnabled()) + PolicyLogger.info(CambriaPublisherWrapper.class.getName(), + "CREATION: " + this); + + try { + this.publisher.close(); + } catch (Exception e) { + PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), + "CLOSE on " + this + " FAILED because of " + + e.getMessage()); + } + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CambriaPublisherWrapper ["). + append("publisher.getPendingMessageCount()="). + append(publisher.getPendingMessageCount()). + append("]"); + return builder.toString(); + } + + } + + /** + * DmaapClient library wrapper + */ + public static class DmaapPublisherWrapper implements BusPublisher { + /** + * MR based Publisher + */ + protected MRSimplerBatchPublisher publisher; + + public DmaapPublisherWrapper(List<String> servers, String topic, + String aafLogin, + String aafPassword) { + + ArrayList<String> dmaapServers = new ArrayList<String>(); + for (String server: servers) { + dmaapServers.add(server + ":3904"); + } + + this.publisher = + new MRSimplerBatchPublisher.Builder(). + againstUrls(dmaapServers). + onTopic(topic). + build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + this.publisher.setUsername(aafLogin); + this.publisher.setPassword(aafPassword); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + props.setProperty("contenttype", "application/json"); + + this.publisher.setProps(props); + + this.publisher.setHost(servers.get(0)); + + if (PolicyLogger.isInfoEnabled()) + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), + "CREATION: " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (PolicyLogger.isInfoEnabled()) + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), + "CREATION: " + this); + + try { + this.publisher.close(1, TimeUnit.SECONDS); + } catch (Exception e) { + PolicyLogger.warn(DmaapPublisherWrapper.class.getName(), + "CLOSE: " + this + " because of " + + e.getMessage()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) + throws IllegalArgumentException { + if (message == null) + throw new IllegalArgumentException("No message provided"); + + this.publisher.send(partitionId, message); + return true; + + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("DmaapPublisherWrapper ["). + append("publisher.getAuthDate()=").append(publisher.getAuthDate()). + append(", publisher.getAuthKey()=").append(publisher.getAuthKey()). + append(", publisher.getHost()=").append(publisher.getHost()). + append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()). + append(", publisher.getUsername()=").append(publisher.getUsername()). + append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()). + append("]"); + return builder.toString(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java new file mode 100644 index 00000000..e36e3afc --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.apache.commons.collections4.queue.CircularFifoQueue; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.BusTopic; + +public abstract class BusTopicBase implements BusTopic, Topic { + + protected List<String> servers; + + protected String topic; + + protected String apiKey; + protected String apiSecret; + + protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10); + + public BusTopicBase(List<String> servers, + String topic, + String apiKey, + String apiSecret) + throws IllegalArgumentException { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("An UEB Topic must be provided"); + } + + this.servers = servers; + this.topic = topic; + + this.apiKey = apiKey; + this.apiSecret = apiSecret; + } + + /** + * {@inheritDoc} + */ + @Override + public String getTopic() { + return topic; + } + + /** + * {@inheritDoc} + */ + @Override + public List<String> getServers() { + return servers; + } + + /** + * {@inheritDoc} + */ + @Override + public String getApiKey() { + return apiKey; + } + + /** + * {@inheritDoc} + */ + @Override + public String getApiSecret() { + return apiSecret; + } + + /** + * @return the recentEvents + */ + @Override + public synchronized String[] getRecentEvents() { + String[] events = new String[recentEvents.size()]; + return recentEvents.toArray(events); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=") + .append(apiKey).append(", apiSecret=").append(apiSecret).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java new file mode 100644 index 00000000..bd88818b --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java @@ -0,0 +1,284 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import org.openecomp.policy.drools.event.comm.bus.BusTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.eelf.MessageCodes; + +/** + * Transport Agnostic Bus Topic Sink to carry out the core functionality + * to interact with a sink regardless if it is UEB or DMaaP. + * + */ +public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { + + /** + * logger + */ + private static org.openecomp.policy.common.logging.flexlogger.Logger logger = + FlexLogger.getLogger(InlineBusTopicSink.class); + + /** + * Not to be converted to PolicyLogger. + * This will contain all in/out traffic and only that in a single file in a concise format. + */ + protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER); + + /** + * The partition key to publish to + */ + protected String partitionId; + + /** + * Am I running? + * reflects invocation of start()/stop() + * !locked & start() => alive + * stop() => !alive + */ + protected volatile boolean alive = false; + + /** + * Am I locked? + * reflects invocation of lock()/unlock() operations + * locked => !alive (but not in the other direction necessarily) + * locked => !offer, !run, !start, !stop (but this last one is obvious + * since locked => !alive) + */ + protected volatile boolean locked = false; + + /** + * message bus publisher + */ + protected BusPublisher publisher; + + /** + * constructor for abstract sink + * + * @param servers servers + * @param topic topic + * @param apiKey api secret + * @param apiSecret api secret + * @param partitionId partition id + * @throws IllegalArgumentException in invalid parameters are passed in + */ + public InlineBusTopicSink(List<String> servers, String topic, + String apiKey, String apiSecret, String partitionId) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret); + + if (partitionId == null || partitionId.isEmpty()) { + this.partitionId = UUID.randomUUID ().toString(); + } + } + + /** + * Initialize the Bus publisher + */ + public abstract void init(); + + /** + * {@inheritDoc} + */ + @Override + public boolean start() throws IllegalStateException { + + if (logger.isInfoEnabled()) + logger.info("START: " + this); + + synchronized(this) { + + if (this.alive) + return true; + + if (locked) + throw new IllegalStateException(this + " is locked."); + + this.alive = true; + } + + this.init(); + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() { + + BusPublisher publisherCopy; + synchronized(this) { + this.alive = false; + publisherCopy = this.publisher; + this.publisher = null; + } + + if (publisherCopy != null) { + try { + publisherCopy.close(); + } catch (Exception e) { + logger.warn(MessageCodes.EXCEPTION_ERROR, e, "PUBLISHER.CLOSE", this.toString()); + e.printStackTrace(); + } + } else { + logger.warn("No publisher to close: " + this); + return false; + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean lock() { + + if (logger.isInfoEnabled()) + logger.info("LOCK: " + this); + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + return this.stop(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean unlock() { + + if (logger.isInfoEnabled()) + logger.info("UNLOCK: " + this); + + synchronized(this) { + if (!this.locked) + return true; + + this.locked = false; + } + + try { + return this.start(); + } catch (Exception e) { + logger.warn("can't start after unlocking " + this + + " because of " + e.getMessage()); + e.printStackTrace(); + return false; + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocked() { + return this.locked; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAlive() { + return this.alive; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String message) throws IllegalArgumentException, IllegalStateException { + + if (message == null || message.isEmpty()) { + throw new IllegalArgumentException("Message to send is empty"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + try { + synchronized (this) { + this.recentEvents.add(message); + } + + if (networkLogger.isInfoEnabled()) { + networkLogger.info("[OUT|" + this.getTopicCommInfrastructure() + "|" + + this.topic + "]:" + + message); + } + + publisher.send(this.partitionId, message); + } catch (Exception e) { + logger.error("can't start after unlocking " + this + + " because of " + e.getMessage()); + e.printStackTrace(); + return false; + } + + return true; + } + + + /** + * {@inheritDoc} + */ + @Override + public void setPartitionKey(String partitionKey) { + this.partitionId = partitionKey; + } + + /** + * {@inheritDoc} + */ + @Override + public String getPartitionKey() { + return this.partitionId; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IllegalStateException { + this.stop(); + } + + /** + * {@inheritDoc} + */ + @Override + public abstract CommInfrastructure getTopicCommInfrastructure(); + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java new file mode 100644 index 00000000..417c6d47 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; + +/** + * This implementation publishes events for the associated DMAAP topic, + * inline with the calling thread. + */ +public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { + + protected static Logger logger = + FlexLogger.getLogger(InlineDmaapTopicSink.class); + + protected final String userName; + protected final String password; + + public InlineDmaapTopicSink(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String partitionKey) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, partitionKey); + + this.userName = userName; + this.password = password; + } + + + @Override + public void init() { + this.publisher = + new BusPublisher.DmaapPublisherWrapper(this.servers, + this.topic, + this.userName, + this.password); + if (logger.isInfoEnabled()) + logger.info("DMAAP SINK TOPIC created " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.DMAAP; + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password) + .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=") + .append(super.toString()).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java new file mode 100644 index 00000000..2d4b1552 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.UebTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; + +/** + * This implementation publishes events for the associated UEB topic, + * inline with the calling thread. + */ +public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { + + /** + * logger + */ + private static org.openecomp.policy.common.logging.flexlogger.Logger logger = + FlexLogger.getLogger(InlineUebTopicSink.class); + + /** + * Argument-based UEB Topic Writer instantiation + * + * @param servers list of UEB servers available for publishing + * @param topic the topic to publish to + * @param apiKey the api key (optional) + * @param apiSecret the api secret (optional) + * @param partitionId the partition key (optional, autogenerated if not provided) + * + * @throws IllegalArgumentException if invalid arguments are detected + */ + public InlineUebTopicSink(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String partitionId) + throws IllegalArgumentException { + super(servers, topic, apiKey, apiSecret, partitionId); + } + + /** + * Instantiation of internal resources + */ + @Override + public void init() { + + this.publisher = + new BusPublisher.CambriaPublisherWrapper(this.servers, + this.topic, + this.apiKey, + this.apiSecret); + if (logger.isInfoEnabled()) + logger.info("UEB SINK TOPIC created " + this); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.UEB; + } +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java new file mode 100644 index 00000000..f37c349e --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -0,0 +1,477 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import org.openecomp.policy.drools.event.comm.TopicListener; +import org.openecomp.policy.drools.event.comm.bus.BusTopicSource; +import org.openecomp.policy.common.logging.eelf.MessageCodes; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; + +/** + * This topic source implementation specializes in reading messages + * over a bus topic source and notifying its listeners + */ +public abstract class SingleThreadedBusTopicSource + extends BusTopicBase + implements Runnable, BusTopicSource { + + private String className = SingleThreadedBusTopicSource.class.getName(); + /** + * Not to be converted to PolicyLogger. + * This will contain all instract /out traffic and only that in a single file in a concise format. + */ + protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER); + + /** + * Bus consumer group + */ + protected final String consumerGroup; + + /** + * Bus consumer instance + */ + protected final String consumerInstance; + + /** + * Bus fetch timeout + */ + protected final int fetchTimeout; + + /** + * Bus fetch limit + */ + protected final int fetchLimit; + + /** + * Message Bus Consumer + */ + protected BusConsumer consumer; + + /** + * Am I running? + * reflects invocation of start()/stop() + * !locked & start() => alive + * stop() => !alive + */ + protected volatile boolean alive = false; + + /** + * Am I locked? + * reflects invocation of lock()/unlock() operations + * locked => !alive (but not in the other direction necessarily) + * locked => !offer, !run, !start, !stop (but this last one is obvious + * since locked => !alive) + */ + protected volatile boolean locked = false; + + /** + * Independent thread reading message over my topic + */ + protected Thread busPollerThread; + + /** + * All my subscribers for new message notifications + */ + protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>(); + + /** + * + * @param servers Bus servers + * @param topic Bus Topic to be monitored + * @param apiKey Bus API Key (optional) + * @param apiSecret Bus API Secret (optional) + * @param consumerGroup Bus Reader Consumer Group + * @param consumerInstance Bus Reader Instance + * @param fetchTimeout Bus fetch timeout + * @param fetchLimit Bus fetch limit + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedBusTopicSource(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret); + + if (consumerGroup == null || consumerGroup.isEmpty()) { + this.consumerGroup = UUID.randomUUID ().toString(); + } else { + this.consumerGroup = consumerGroup; + } + + if (consumerInstance == null || consumerInstance.isEmpty()) { + this.consumerInstance = DEFAULT_CONSUMER_INSTANCE; + } else { + this.consumerInstance = consumerInstance; + } + + if (fetchTimeout <= 0) { + this.fetchTimeout = NO_TIMEOUT_MS_FETCH; + } else { + this.fetchTimeout = fetchTimeout; + } + + if (fetchLimit <= 0) { + this.fetchLimit = NO_LIMIT_FETCH; + } else { + this.fetchLimit = fetchLimit; + } + } + + /** + * Initialize the Bus client + */ + public abstract void init() throws Exception; + + /** + * {@inheritDoc} + */ + @Override + public void register(TopicListener topicListener) + throws IllegalArgumentException { + + PolicyLogger.info(className,"REGISTER: " + topicListener + " INTO " + this); + + synchronized(this) { + if (topicListener == null) + throw new IllegalArgumentException("TopicListener must be provided"); + + /* check that this listener is not registered already */ + for (TopicListener listener: this.topicListeners) { + if (listener == topicListener) { + // already registered + return; + } + } + + this.topicListeners.add(topicListener); + } + + try { + this.start(); + } catch (Exception e) { + PolicyLogger.info(className, "new registration of " + topicListener + + ",but can't start source because of " + e.getMessage()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void unregister(TopicListener topicListener) { + + PolicyLogger.info(className, "UNREGISTER: " + topicListener + " FROM " + this); + + boolean stop = false; + synchronized (this) { + if (topicListener == null) + throw new IllegalArgumentException("TopicListener must be provided"); + + this.topicListeners.remove(topicListener); + stop = (this.topicListeners.isEmpty()); + } + + if (stop) { + this.stop(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean lock() { + PolicyLogger.info(className, "LOCK: " + this); + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + return this.stop(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean unlock() { + PolicyLogger.info(className, "UNLOCK: " + this); + + synchronized(this) { + if (!this.locked) + return true; + + this.locked = false; + } + + try { + return this.start(); + } catch (Exception e) { + PolicyLogger.warn("can't start after unlocking " + this + + " because of " + e.getMessage()); + return false; + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean start() throws IllegalStateException { + + PolicyLogger.info(className, "START: " + this); + + synchronized(this) { + + if (alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + if (this.busPollerThread == null || + !this.busPollerThread.isAlive() || + this.consumer == null) { + + try { + this.init(); + this.alive = true; + this.busPollerThread = new Thread(this); + this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic()); + busPollerThread.start(); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException(e); + } + } + } + + return this.alive; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() { + PolicyLogger.info(className, "STOP: " + this); + + synchronized(this) { + BusConsumer consumerCopy = this.consumer; + + this.alive = false; + this.consumer = null; + + if (consumerCopy != null) { + try { + consumerCopy.close(); + } catch (Exception e) { + PolicyLogger.warn(MessageCodes.EXCEPTION_ERROR, e, "CONSUMER.CLOSE", this.toString()); + } + } + } + + Thread.yield(); + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocked() { + return this.locked; + } + + /** + * broadcast event to all listeners + * + * @param message the event + * @return true if all notifications are performed with no error, false otherwise + */ + protected boolean broadcast(String message) { + + /* take a snapshot of listeners */ + List<TopicListener> snapshotListeners = this.snapshotTopicListeners(); + + boolean success = true; + for (TopicListener topicListener: snapshotListeners) { + try { + topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); + } catch (Exception e) { + PolicyLogger.warn(this.className, "ERROR notifying " + topicListener.toString() + + " because of " + e.getMessage() + " @ " + this.toString()); + success = false; + } + } + return success; + } + + /** + * take a snapshot of current topic listeners + * + * @return the topic listeners + */ + protected synchronized List<TopicListener> snapshotTopicListeners() { + @SuppressWarnings("unchecked") + List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone(); + return listeners; + } + + /** + * Run thread method for the Bus Reader + */ + @Override + public void run() { + while (this.alive) { + try { + for (String event: this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + if (networkLogger.isInfoEnabled()) { + networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + + this.topic + "]:" + + event); + } + + PolicyLogger.info(className, this.topic + " <-- " + event); + broadcast(event); + + if (!this.alive) + break; + } + } catch (Exception e) { + PolicyLogger.error( MessageCodes.EXCEPTION_ERROR, className, e, "CONSUMER.FETCH", this.toString()); + } + } + + PolicyLogger.warn(this.className, "Exiting: " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean offer(String event) { + PolicyLogger.info(className, "OFFER: " + event + " TO " + this); + + if (!this.alive) { + throw new IllegalStateException(this + " is not alive."); + } + + synchronized (this) { + this.recentEvents.add(event); + } + + if (networkLogger.isInfoEnabled()) { + networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + + this.topic + "]:" + + event); + } + + + return broadcast(event); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup) + .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout) + .append(", fetchLimit=").append(fetchLimit) + .append(", consumer=").append(this.consumer).append(", alive=") + .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread) + .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString()) + .append("]"); + return builder.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAlive() { + return alive; + } + + /** + * {@inheritDoc} + */ + @Override + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * {@inheritDoc} + */ + @Override + public String getConsumerInstance() { + return consumerInstance; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IllegalStateException { + this.stop(); + this.topicListeners.clear(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getFetchTimeout() { + return fetchTimeout; + } + + /** + * {@inheritDoc} + */ + @Override + public int getFetchLimit() { + return fetchLimit; + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java new file mode 100644 index 00000000..e65d44a7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -0,0 +1,120 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; + +/** + * This topic reader implementation specializes in reading messages + * over DMAAP topic and notifying its listeners + */ +public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource + implements DmaapTopicSource, Runnable { + + protected final String userName; + protected final String password; + private String className = SingleThreadedDmaapTopicSource.class.getName(); + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedDmaapTopicSource(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws IllegalArgumentException { + + + super(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + this.userName = userName; + this.password = password; + + try { + this.init(); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException(e); + } + } + + + /** + * Initialize the Cambria or MR Client + */ + @Override + public void init() throws Exception { + + if (this.userName == null || this.userName.isEmpty() || + this.password == null || this.password.isEmpty()) { + this.consumer = + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit); + } else { + this.consumer = + new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.userName, this.password, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit); + } + + PolicyLogger.info(className, "CREATION: " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.DMAAP; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=") + .append((password == null || password.isEmpty()) ? "-" : password.length()) + .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java new file mode 100644 index 00000000..edb55c75 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.UebTopicSource; + +/** + * This topic source implementation specializes in reading messages + * over an UEB Bus topic source and notifying its listeners + */ +public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource + implements UebTopicSource { + + /** + * + * @param servers UEB servers + * @param topic UEB Topic to be monitored + * @param apiKey UEB API Key (optional) + * @param apiSecret UEB API Secret (optional) + * @param consumerGroup UEB Reader Consumer Group + * @param consumerInstance UEB Reader Instance + * @param fetchTimeout UEB fetch timeout + * @param fetchLimit UEB fetch limit + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedUebTopicSource(List<String> servers, String topic, + String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + + this.init(); + } + + /** + * Initialize the Cambria client + */ + @Override + public void init() { + this.consumer = + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.UEB; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=") + .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + +} |