aboutsummaryrefslogtreecommitdiffstats
path: root/event-client-dmaap/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'event-client-dmaap/src/main/java')
-rw-r--r--event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java201
-rw-r--r--event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java281
-rw-r--r--event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java41
3 files changed, 523 insertions, 0 deletions
diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java
new file mode 100644
index 0000000..dab85dc
--- /dev/null
+++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java
@@ -0,0 +1,201 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+/**
+ *
+ */
+package org.onap.aai.event.client;
+
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.Properties;
+import javax.naming.OperationNotSupportedException;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.MessageWithOffset;
+
+/**
+ * Event Bus Client consumer API that uses AAF authentication with Username/Password.
+ */
+public class DMaaPEventConsumer implements EventConsumer {
+
+ public static final String DEFAULT_TRANSPORT_TYPE = "HTTPAAF";
+ public static final String DEFAULT_PROTOCOL = "http";
+ public static final int DEFAULT_MESSAGE_WAIT_TIMEOUT = 15000;
+ public static final int DEFAULT_MESSAGE_LIMIT = 1000;
+
+ private static final String OFFSET_UNSUPPORTED = "DMaaP does not support consuming with offsets.";
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(DMaaPEventConsumer.class);
+
+ public interface MRConsumerFactory {
+ public MRConsumerImpl createConsumer(List<String> hosts, String topic, String consumerGroup, String consumerId,
+ int timeoutMs, int messageLimit, String filter, String username, String password)
+ throws MalformedURLException;
+ }
+
+ private static MRConsumerFactory consumerFactory = MRConsumerImpl::new;
+
+ private MRConsumerImpl consumer;
+
+ /**
+ * Replace the consumer factory (intended to be used for testing purposes only).
+ *
+ * @param consumerFactory
+ */
+ static void setConsumerFactory(MRConsumerFactory consumerFactory) {
+ DMaaPEventConsumer.consumerFactory = consumerFactory;
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to consume from
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param consumerGroup The consumer group to consume from
+ * @param consumerId The consumer ID of the client
+ * @param timeoutMs Time in ms to wait for messages on the server before returning
+ * @param messageLimit Maximum number of messages that is returned per fetch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ * @param protocol The http protocol to use (http/https)
+ * @param filter A customizable message filter, or null if no filtering is required
+ * @throws MalformedURLException
+ */
+ public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup,
+ String consumerId, int timeoutMs, int messageLimit, String transportType, String protocol, String filter)
+ throws MalformedURLException {
+ consumer = consumerFactory.createConsumer(MRConsumerImpl.stringToList(host), topic, consumerGroup, consumerId,
+ timeoutMs, messageLimit, filter, username, password);
+ consumer.setHost(host);
+ consumer.setUsername(username);
+ consumer.setPassword(password);
+ consumer.setProtocolFlag(transportType);
+
+ // MRConsumerImpl still needs extra properties from the prop object.
+ Properties extraProps = new Properties();
+ extraProps.put("Protocol", protocol);
+ consumer.setProps(extraProps);
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to consume from
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param consumerGroup The consumer group to consume from
+ * @param consumerId The consumer ID of the client
+ * @param timeoutMs Time in ms to wait for messages on the server before returning
+ * @param messageLimit Maximum number of messages that is returned per fetch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ * @throws MalformedURLException
+ */
+ public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup,
+ String consumerId, int timeoutMs, int messageLimit, String transportType) throws MalformedURLException {
+ this(host, topic, username, password, consumerGroup, consumerId, timeoutMs, messageLimit, transportType,
+ DEFAULT_PROTOCOL, null);
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to consume from
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param consumerGroup The consumer group to consume from
+ * @param consumerId The consumer ID of the client
+ * @throws MalformedURLException
+ */
+ public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup,
+ String consumerId) throws MalformedURLException {
+ this(host, topic, username, password, consumerGroup, consumerId, DEFAULT_MESSAGE_WAIT_TIMEOUT,
+ DEFAULT_MESSAGE_LIMIT, DEFAULT_TRANSPORT_TYPE, DEFAULT_PROTOCOL, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#consumeAndCommit()
+ */
+ @Override
+ public Iterable<String> consumeAndCommit() throws Exception {
+ log.debug("Querying Event Bus for messages.");
+ return consumer.fetch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#consume()
+ */
+ @Override
+ public Iterable<String> consume() throws Exception {
+ return consumeAndCommit();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#consumeWithOffsets()
+ */
+ @Override
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
+ throw new OperationNotSupportedException(OFFSET_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#commitOffsets()
+ */
+ @Override
+ public void commitOffsets() throws Exception {
+ throw new OperationNotSupportedException(OFFSET_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#commitOffsets(long)
+ */
+ @Override
+ public void commitOffsets(long offset) throws Exception {
+ throw new OperationNotSupportedException(OFFSET_UNSUPPORTED);
+ }
+
+}
diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java
new file mode 100644
index 0000000..4a8a83b
--- /dev/null
+++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java
@@ -0,0 +1,281 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client;
+
+import com.att.nsa.mr.client.MRPublisher.message;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
+
+/**
+ * Event Bus Client publisher API that uses AAF authentication with Username/Password.
+ */
+public class DMaaPEventPublisher implements EventPublisher {
+
+ public static final String DEFAULT_TRANSPORT_TYPE = "HTTPAAF";
+ public static final String DEFAULT_PROTOCOL = "http";
+ public static final String DEFAULT_CONTENT_TYPE = "application/json";
+ public static final int DEFAULT_BATCH_SIZE = 100;
+ public static final long DEFAULT_BATCH_AGE = 250;
+ public static final int DEFAULT_BATCH_DELAY = 50;
+ public static final String DEFAULT_PARTITION = "0";
+ public static final int CLOSE_TIMEOUT = 20;
+
+ private static final String ASYNC_UNSUPPORTED =
+ "This implementation of EventPublisher does not support async mode.";
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(DMaaPEventPublisher.class);
+
+ public interface MRPublisherFactory {
+ public MRSimplerBatchPublisher createPublisher(String host, String topic, int maxBatchSize, long maxAgeMs,
+ int delayBetweenBatchesMs);
+ }
+
+ private static MRPublisherFactory publisherFactory = DMaaPEventPublisher::createMRSimplerBatchPublisher;
+
+ private MRSimplerBatchPublisher publisher;
+
+ /**
+ * Replace the publisher factory (intended to be used for testing purposes only).
+ *
+ * @param publisherFactory
+ */
+ static void setPublisherFactory(MRPublisherFactory publisherFactory) {
+ DMaaPEventPublisher.publisherFactory = publisherFactory;
+ }
+
+ /**
+ * Provide the default factory method so that test code is able to restore this functionality.
+ *
+ * @return the default publisher factory implementation
+ */
+ static MRPublisherFactory getPublisherFactory() {
+ return publisherFactory;
+ }
+
+ /**
+ * Creates a new instance of MRSimplerBatchPublisher using the supplied parameters.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The name of the topic to which messages are published
+ * @param maxBatchSize The maximum batch size for each send operation
+ * @param maxAgeMs The maximum age of each batch before sending
+ * @param delayBetweenBatchesMs Time to wait between sending each batch
+ * @return a new MRSimplerBatchPublisher object
+ */
+ private static MRSimplerBatchPublisher createMRSimplerBatchPublisher(String host, String topic, int maxBatchSize,
+ long maxAgeMs, int delayBetweenBatchesMs) {
+ return new MRSimplerBatchPublisher.Builder().againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic)
+ .batchTo(maxBatchSize, maxAgeMs).httpThreadTime(delayBetweenBatchesMs).build();
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventPublisher using supplied parameters.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to publish to
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param maxBatchSize The maximum batch size for each send
+ * @param maxAgeMs The max age of each batch in ms before sending
+ * @param delayBetweenBatchesMs Time in ms to wait between sending each batch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ * @param protocol The http protocol to use (http/https)
+ * @param contentType The content-type request header value (e.g. application/json)
+ */
+ public DMaaPEventPublisher(String host, String topic, String username, String password, int maxBatchSize,
+ long maxAgeMs, int delayBetweenBatchesMs, String transportType, String protocol, String contentType) {
+ publisher = getPublisherFactory().createPublisher(host, topic, maxBatchSize, maxAgeMs, delayBetweenBatchesMs);
+ publisher.setUsername(username);
+ publisher.setPassword(password);
+ publisher.setProtocolFlag(transportType);
+
+ // MRSimplerBatchPublisher still needs extra properties from the prop object.
+ Properties extraProps = new Properties();
+ extraProps.put("Protocol", protocol);
+ extraProps.put("contenttype", contentType);
+ publisher.setProps(extraProps);
+ }
+
+ /**
+ * Creates a new instance of DMaapEventPublisherAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to publish to
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param maxBatchSize The maximum batch size for each send
+ * @param maxAgeMs The max age of each batch in ms before sending
+ * @param delayBetweenBatchesMs Time in ms to wait between sending each batch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ */
+ public DMaaPEventPublisher(String host, String topic, String username, String password, int maxBatchSize,
+ long maxAgeMs, int delayBetweenBatchesMs, String transportType) {
+ this(host, topic, username, password, maxBatchSize, maxAgeMs, delayBetweenBatchesMs, transportType,
+ DEFAULT_PROTOCOL, DEFAULT_CONTENT_TYPE);
+ }
+
+ /**
+ * Creates a new instance of DMaapEventPublisherAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to publish to
+ * @param username The username of the client application
+ * @param password The password for the username
+ */
+ public DMaaPEventPublisher(String host, String topic, String username, String password) {
+ this(host, topic, username, password, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_AGE, DEFAULT_BATCH_DELAY,
+ DEFAULT_TRANSPORT_TYPE, DEFAULT_PROTOCOL, DEFAULT_CONTENT_TYPE);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#close()
+ */
+ @Override
+ public void close() throws Exception {
+ publisher.close(CLOSE_TIMEOUT, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Close the publisher and return a list of unsent messages.
+ *
+ * @return a list of unsent messages.
+ * @throws Exception
+ */
+ public List<String> closeWithUnsent() throws Exception {
+ return publisher.close(CLOSE_TIMEOUT, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String)
+ */
+ @Override
+ public void sendAsync(String message) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.util.Collection)
+ */
+ @Override
+ public void sendAsync(Collection<String> messages) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String, java.lang.String)
+ */
+ @Override
+ public void sendAsync(String partition, String message) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String, java.util.Collection)
+ */
+ @Override
+ public void sendAsync(String partition, Collection<String> messages) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String)
+ */
+ @Override
+ public int sendSync(String message) throws Exception {
+ return sendSync(DEFAULT_PARTITION, message);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.util.Collection)
+ */
+ @Override
+ public int sendSync(Collection<String> messages) throws Exception {
+ return sendSync(DEFAULT_PARTITION, messages);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String, java.lang.String)
+ */
+ @Override
+ public int sendSync(String partition, String message) throws Exception {
+ log.debug("Publishing message on partition " + partition + ": " + message);
+
+ publisher.getProps().put("partition", partition);
+ return publisher.send(partition, message);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String, java.util.Collection)
+ */
+ @Override
+ public int sendSync(String partition, Collection<String> messages) throws Exception {
+ log.debug("Publishing " + messages.size() + " messages on partition " + partition);
+
+ publisher.getProps().put("partition", partition);
+
+ Collection<message> dmaapMessages = new ArrayList<>();
+ for (String message : messages) {
+ dmaapMessages.add(new message(partition, message));
+ }
+ return publisher.send(dmaapMessages);
+ }
+
+}
diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java
new file mode 100644
index 0000000..16cc739
--- /dev/null
+++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java
@@ -0,0 +1,41 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client.logging;
+
+import com.att.eelf.i18n.EELFResourceManager;
+import org.onap.aai.cl.eelf.LogMessageEnum;
+
+/**
+ * Logger messages
+ */
+public enum ApplicationMsgs implements LogMessageEnum {
+
+ MESSAGE_ERROR, MESSAGE_INFO;
+
+ /**
+ * Static initializer to ensure the resource bundles for this class are loaded...
+ */
+ static {
+ EELFResourceManager.loadMessageBundle("resources");
+ }
+}