summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com>2018-01-29 15:00:04 +0000
committerGilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com>2018-01-30 10:24:18 +0000
commit4150ee34ae503c83734aca7e62ca9911b354c881 (patch)
tree6598a1d16483eb480664b8862c86ba801af6d661
parent4e6e8b2714f3e5fe6d6dc064d9d73275c5ee5437 (diff)
Initial code submit for Event Bus Client library
Change-Id: If0ff82c669872c734ebe0c72bc022beb96c53d11 Issue-ID: AAI-700 Signed-off-by: Gilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com>
-rw-r--r--License.txt20
-rw-r--r--README.txt3
-rw-r--r--event-client-api/pom.xml9
-rw-r--r--event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java67
-rw-r--r--event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java111
-rw-r--r--event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java41
-rw-r--r--event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java40
-rw-r--r--event-client-dmaap/pom.xml52
-rw-r--r--event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java201
-rw-r--r--event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java281
-rw-r--r--event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java41
-rw-r--r--event-client-dmaap/src/main/resources/logback.xml27
-rw-r--r--event-client-dmaap/src/main/resources/resources.properties38
-rw-r--r--event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java81
-rw-r--r--event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java118
-rw-r--r--event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java42
-rw-r--r--event-client-kafka/pom.xml31
-rw-r--r--event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java143
-rw-r--r--event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java185
-rw-r--r--event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java94
-rw-r--r--event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java77
-rw-r--r--pom.xml130
22 files changed, 1832 insertions, 0 deletions
diff --git a/License.txt b/License.txt
new file mode 100644
index 0000000..3452576
--- /dev/null
+++ b/License.txt
@@ -0,0 +1,20 @@
+============LICENSE_START=======================================================
+org.onap.aai
+================================================================================
+Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+Copyright © 2017 European Software Marketing Ltd.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+
+ECOMP is a trademark and service mark of AT&T Intellectual Property. \ No newline at end of file
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..e18dc50
--- /dev/null
+++ b/README.txt
@@ -0,0 +1,3 @@
+ONAP Event Bus Client Library
+
+This is a shared library that will be used by ONAP components to publish and consume events on the ONAP Event Bus. \ No newline at end of file
diff --git a/event-client-api/pom.xml b/event-client-api/pom.xml
new file mode 100644
index 0000000..25ddcc8
--- /dev/null
+++ b/event-client-api/pom.xml
@@ -0,0 +1,9 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-service</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>event-client-api</artifactId>
+</project> \ No newline at end of file
diff --git a/event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java b/event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java
new file mode 100644
index 0000000..95effaf
--- /dev/null
+++ b/event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java
@@ -0,0 +1,67 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.api;
+
+public interface EventConsumer {
+
+ /**
+ * Fetches any relevant messages currently on the Event Bus, according to the configuration and manages the offsets
+ * for the client.
+ *
+ * @return List of messages fetched from the Event Bus.
+ * @throws Exception
+ */
+ public Iterable<String> consumeAndCommit() throws Exception;
+
+
+ /**
+ * Fetches any relevant messages currently on the Event Bus, according to the configuration and expects the client
+ * to explicitly call the {@link #commitOffsets()} in order to commit the offsets
+ *
+ * @return List of messages fetched from the Event Bus.
+ * @throws Exception
+ */
+ public Iterable<String> consume() throws Exception;
+
+ /**
+ * Fetches any relevant messages currently on the Event Bus with their offsets, according to the configuration and
+ * expects the client to explicitly call the {@link #commitOffsets()} in order to commit the offsets
+ *
+ * @throws Exception
+ */
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception;
+
+ /**
+ * Commits the offsets of the previously consumed messages from the Event Bus
+ *
+ * @throws Exception if the offsets could not be committed
+ */
+ public void commitOffsets() throws Exception;
+
+ /**
+ * Commits the offsets of messages consumed up to offset
+ *
+ * @throws Exception if the offsets could not be committed
+ */
+ public void commitOffsets(long offset) throws Exception;
+}
diff --git a/event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java b/event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java
new file mode 100644
index 0000000..a5de093
--- /dev/null
+++ b/event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java
@@ -0,0 +1,111 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.api;
+
+import java.util.Collection;
+
+public interface EventPublisher {
+
+ /**
+ * Publishes a message using the supplied partition key, using the parameters from the constructor.
+ *
+ * @param partitionKey The partition to publish the message on.
+ * @param message The String message to publish.
+ * @return The number of messages successfully sent
+ * @throws Exception
+ */
+ public int sendSync(String partitionKey, String message) throws Exception;
+
+ /**
+ * Publishes a message using the supplied partition key, using the parameters from the constructor.
+ *
+ * @param partitionKey The partition to publish the messages on.
+ * @param messages A Collection of messages to publish.
+ * @return The number of messages successfully sent
+ * @throws Exception
+ */
+ public int sendSync(String partitionKey, Collection<String> messages) throws Exception;
+
+ /**
+ * Publishes a message using the parameters from the constructor.
+ *
+ * @param message The String message to publish.
+ * @return The number of messages successfully sent
+ * @throws Exception
+ */
+ public int sendSync(String message) throws Exception;
+
+ /**
+ * Publishes a message using the parameters from the constructor.
+ *
+ * @param messages A Collection of messages to publish.
+ * @return The number of messages successfully sent
+ * @throws Exception
+ */
+ public int sendSync(Collection<String> messages) throws Exception;
+
+ /**
+ * Publishes a message using the supplied partition key, using the parameters from the constructor. The Async method
+ * returns immediately without caring if the message was properly published or not.
+ *
+ * @param partitionKey The partition to publish the message on.
+ * @param message The String message to publish.
+ * @throws Exception
+ */
+ public void sendAsync(String partitionKey, String message) throws Exception;
+
+ /**
+ * Publishes a message using the supplied partition key, using the parameters from the constructor. The Async method
+ * returns immediately without caring if the message was properly published or not.
+ *
+ * @param partitionKey The partition to publish the messages on.
+ * @param messages A Collection of messages to publish.
+ * @throws Exception
+ */
+ public void sendAsync(String partitionKey, Collection<String> messages) throws Exception;
+
+ /**
+ * Publishes a message using the parameters from the constructor. The Async method returns immediately without
+ * caring if the message was properly published or not.
+ *
+ * @param message The String message to publish.
+ * @throws Exception
+ */
+ public void sendAsync(String message) throws Exception;
+
+ /**
+ * Publishes a message using the parameters from the constructor. The Async method returns immediately without
+ * caring if the message was properly published or not.
+ *
+ * @param messages A Collection of messages to publish.
+ * @throws Exception
+ */
+ public void sendAsync(Collection<String> messages) throws Exception;
+
+ /**
+ * Closes the publisher.
+ */
+ public void close() throws Exception;
+
+
+}
diff --git a/event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java b/event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java
new file mode 100644
index 0000000..54972c9
--- /dev/null
+++ b/event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java
@@ -0,0 +1,41 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.api;
+
+public class MessageWithOffset {
+ private final long offset;
+ private final String message;
+
+ public MessageWithOffset(long offset, String message) {
+ this.offset = offset;
+ this.message = message;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
diff --git a/event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java b/event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java
new file mode 100644
index 0000000..6565c73
--- /dev/null
+++ b/event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java
@@ -0,0 +1,40 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.api;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+public class TestMessageWithOffset {
+
+ @Test
+ public void testConstructor() {
+ MessageWithOffset msg = new MessageWithOffset(0L, "");
+ assertThat(msg.getOffset(), is(equalTo(0L)));
+ assertThat(msg.getMessage(), is(equalTo("")));
+ }
+
+}
diff --git a/event-client-dmaap/pom.xml b/event-client-dmaap/pom.xml
new file mode 100644
index 0000000..5134b67
--- /dev/null
+++ b/event-client-dmaap/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-service</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>event-client-dmaap</artifactId>
+
+ <properties>
+ <common.logging.version>1.2.0</common.logging.version>
+ <dmaap.client.version>1.1.0</dmaap.client.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
+ <artifactId>dmaapClient</artifactId>
+ <version>${dmaap.client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.logging-service</groupId>
+ <artifactId>common-logging</artifactId>
+ <version>${common.logging.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project> \ No newline at end of file
diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java
new file mode 100644
index 0000000..dab85dc
--- /dev/null
+++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java
@@ -0,0 +1,201 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+/**
+ *
+ */
+package org.onap.aai.event.client;
+
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.Properties;
+import javax.naming.OperationNotSupportedException;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.MessageWithOffset;
+
+/**
+ * Event Bus Client consumer API that uses AAF authentication with Username/Password.
+ */
+public class DMaaPEventConsumer implements EventConsumer {
+
+ public static final String DEFAULT_TRANSPORT_TYPE = "HTTPAAF";
+ public static final String DEFAULT_PROTOCOL = "http";
+ public static final int DEFAULT_MESSAGE_WAIT_TIMEOUT = 15000;
+ public static final int DEFAULT_MESSAGE_LIMIT = 1000;
+
+ private static final String OFFSET_UNSUPPORTED = "DMaaP does not support consuming with offsets.";
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(DMaaPEventConsumer.class);
+
+ public interface MRConsumerFactory {
+ public MRConsumerImpl createConsumer(List<String> hosts, String topic, String consumerGroup, String consumerId,
+ int timeoutMs, int messageLimit, String filter, String username, String password)
+ throws MalformedURLException;
+ }
+
+ private static MRConsumerFactory consumerFactory = MRConsumerImpl::new;
+
+ private MRConsumerImpl consumer;
+
+ /**
+ * Replace the consumer factory (intended to be used for testing purposes only).
+ *
+ * @param consumerFactory
+ */
+ static void setConsumerFactory(MRConsumerFactory consumerFactory) {
+ DMaaPEventConsumer.consumerFactory = consumerFactory;
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to consume from
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param consumerGroup The consumer group to consume from
+ * @param consumerId The consumer ID of the client
+ * @param timeoutMs Time in ms to wait for messages on the server before returning
+ * @param messageLimit Maximum number of messages that is returned per fetch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ * @param protocol The http protocol to use (http/https)
+ * @param filter A customizable message filter, or null if no filtering is required
+ * @throws MalformedURLException
+ */
+ public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup,
+ String consumerId, int timeoutMs, int messageLimit, String transportType, String protocol, String filter)
+ throws MalformedURLException {
+ consumer = consumerFactory.createConsumer(MRConsumerImpl.stringToList(host), topic, consumerGroup, consumerId,
+ timeoutMs, messageLimit, filter, username, password);
+ consumer.setHost(host);
+ consumer.setUsername(username);
+ consumer.setPassword(password);
+ consumer.setProtocolFlag(transportType);
+
+ // MRConsumerImpl still needs extra properties from the prop object.
+ Properties extraProps = new Properties();
+ extraProps.put("Protocol", protocol);
+ consumer.setProps(extraProps);
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to consume from
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param consumerGroup The consumer group to consume from
+ * @param consumerId The consumer ID of the client
+ * @param timeoutMs Time in ms to wait for messages on the server before returning
+ * @param messageLimit Maximum number of messages that is returned per fetch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ * @throws MalformedURLException
+ */
+ public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup,
+ String consumerId, int timeoutMs, int messageLimit, String transportType) throws MalformedURLException {
+ this(host, topic, username, password, consumerGroup, consumerId, timeoutMs, messageLimit, transportType,
+ DEFAULT_PROTOCOL, null);
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to consume from
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param consumerGroup The consumer group to consume from
+ * @param consumerId The consumer ID of the client
+ * @throws MalformedURLException
+ */
+ public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup,
+ String consumerId) throws MalformedURLException {
+ this(host, topic, username, password, consumerGroup, consumerId, DEFAULT_MESSAGE_WAIT_TIMEOUT,
+ DEFAULT_MESSAGE_LIMIT, DEFAULT_TRANSPORT_TYPE, DEFAULT_PROTOCOL, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#consumeAndCommit()
+ */
+ @Override
+ public Iterable<String> consumeAndCommit() throws Exception {
+ log.debug("Querying Event Bus for messages.");
+ return consumer.fetch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#consume()
+ */
+ @Override
+ public Iterable<String> consume() throws Exception {
+ return consumeAndCommit();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#consumeWithOffsets()
+ */
+ @Override
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
+ throw new OperationNotSupportedException(OFFSET_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#commitOffsets()
+ */
+ @Override
+ public void commitOffsets() throws Exception {
+ throw new OperationNotSupportedException(OFFSET_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventConsumer#commitOffsets(long)
+ */
+ @Override
+ public void commitOffsets(long offset) throws Exception {
+ throw new OperationNotSupportedException(OFFSET_UNSUPPORTED);
+ }
+
+}
diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java
new file mode 100644
index 0000000..4a8a83b
--- /dev/null
+++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java
@@ -0,0 +1,281 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client;
+
+import com.att.nsa.mr.client.MRPublisher.message;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
+
+/**
+ * Event Bus Client publisher API that uses AAF authentication with Username/Password.
+ */
+public class DMaaPEventPublisher implements EventPublisher {
+
+ public static final String DEFAULT_TRANSPORT_TYPE = "HTTPAAF";
+ public static final String DEFAULT_PROTOCOL = "http";
+ public static final String DEFAULT_CONTENT_TYPE = "application/json";
+ public static final int DEFAULT_BATCH_SIZE = 100;
+ public static final long DEFAULT_BATCH_AGE = 250;
+ public static final int DEFAULT_BATCH_DELAY = 50;
+ public static final String DEFAULT_PARTITION = "0";
+ public static final int CLOSE_TIMEOUT = 20;
+
+ private static final String ASYNC_UNSUPPORTED =
+ "This implementation of EventPublisher does not support async mode.";
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(DMaaPEventPublisher.class);
+
+ public interface MRPublisherFactory {
+ public MRSimplerBatchPublisher createPublisher(String host, String topic, int maxBatchSize, long maxAgeMs,
+ int delayBetweenBatchesMs);
+ }
+
+ private static MRPublisherFactory publisherFactory = DMaaPEventPublisher::createMRSimplerBatchPublisher;
+
+ private MRSimplerBatchPublisher publisher;
+
+ /**
+ * Replace the publisher factory (intended to be used for testing purposes only).
+ *
+ * @param publisherFactory
+ */
+ static void setPublisherFactory(MRPublisherFactory publisherFactory) {
+ DMaaPEventPublisher.publisherFactory = publisherFactory;
+ }
+
+ /**
+ * Provide the default factory method so that test code is able to restore this functionality.
+ *
+ * @return the default publisher factory implementation
+ */
+ static MRPublisherFactory getPublisherFactory() {
+ return publisherFactory;
+ }
+
+ /**
+ * Creates a new instance of MRSimplerBatchPublisher using the supplied parameters.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The name of the topic to which messages are published
+ * @param maxBatchSize The maximum batch size for each send operation
+ * @param maxAgeMs The maximum age of each batch before sending
+ * @param delayBetweenBatchesMs Time to wait between sending each batch
+ * @return a new MRSimplerBatchPublisher object
+ */
+ private static MRSimplerBatchPublisher createMRSimplerBatchPublisher(String host, String topic, int maxBatchSize,
+ long maxAgeMs, int delayBetweenBatchesMs) {
+ return new MRSimplerBatchPublisher.Builder().againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic)
+ .batchTo(maxBatchSize, maxAgeMs).httpThreadTime(delayBetweenBatchesMs).build();
+ }
+
+ /**
+ * Creates a new instance of DMaaPEventPublisher using supplied parameters.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to publish to
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param maxBatchSize The maximum batch size for each send
+ * @param maxAgeMs The max age of each batch in ms before sending
+ * @param delayBetweenBatchesMs Time in ms to wait between sending each batch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ * @param protocol The http protocol to use (http/https)
+ * @param contentType The content-type request header value (e.g. application/json)
+ */
+ public DMaaPEventPublisher(String host, String topic, String username, String password, int maxBatchSize,
+ long maxAgeMs, int delayBetweenBatchesMs, String transportType, String protocol, String contentType) {
+ publisher = getPublisherFactory().createPublisher(host, topic, maxBatchSize, maxAgeMs, delayBetweenBatchesMs);
+ publisher.setUsername(username);
+ publisher.setPassword(password);
+ publisher.setProtocolFlag(transportType);
+
+ // MRSimplerBatchPublisher still needs extra properties from the prop object.
+ Properties extraProps = new Properties();
+ extraProps.put("Protocol", protocol);
+ extraProps.put("contenttype", contentType);
+ publisher.setProps(extraProps);
+ }
+
+ /**
+ * Creates a new instance of DMaapEventPublisherAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to publish to
+ * @param username The username of the client application
+ * @param password The password for the username
+ * @param maxBatchSize The maximum batch size for each send
+ * @param maxAgeMs The max age of each batch in ms before sending
+ * @param delayBetweenBatchesMs Time in ms to wait between sending each batch
+ * @param transportType Specifies the request header type used in the request to DMaaP server.<br>
+ * Valid types:
+ * <li>DME2</li>
+ * <li>HTTPAAF</li>
+ * <li>HTTPAUTH</li>
+ * <li>HTTPNOAUTH</li>
+ */
+ public DMaaPEventPublisher(String host, String topic, String username, String password, int maxBatchSize,
+ long maxAgeMs, int delayBetweenBatchesMs, String transportType) {
+ this(host, topic, username, password, maxBatchSize, maxAgeMs, delayBetweenBatchesMs, transportType,
+ DEFAULT_PROTOCOL, DEFAULT_CONTENT_TYPE);
+ }
+
+ /**
+ * Creates a new instance of DMaapEventPublisherAAF using supplied parameters and default values.
+ *
+ * @param host The host and port of the DMaaP server in the format <b>host:port</b>
+ * @param topic The topic name to publish to
+ * @param username The username of the client application
+ * @param password The password for the username
+ */
+ public DMaaPEventPublisher(String host, String topic, String username, String password) {
+ this(host, topic, username, password, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_AGE, DEFAULT_BATCH_DELAY,
+ DEFAULT_TRANSPORT_TYPE, DEFAULT_PROTOCOL, DEFAULT_CONTENT_TYPE);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#close()
+ */
+ @Override
+ public void close() throws Exception {
+ publisher.close(CLOSE_TIMEOUT, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Close the publisher and return a list of unsent messages.
+ *
+ * @return a list of unsent messages.
+ * @throws Exception
+ */
+ public List<String> closeWithUnsent() throws Exception {
+ return publisher.close(CLOSE_TIMEOUT, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String)
+ */
+ @Override
+ public void sendAsync(String message) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.util.Collection)
+ */
+ @Override
+ public void sendAsync(Collection<String> messages) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String, java.lang.String)
+ */
+ @Override
+ public void sendAsync(String partition, String message) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String, java.util.Collection)
+ */
+ @Override
+ public void sendAsync(String partition, Collection<String> messages) throws Exception {
+ throw new UnsupportedOperationException(ASYNC_UNSUPPORTED);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String)
+ */
+ @Override
+ public int sendSync(String message) throws Exception {
+ return sendSync(DEFAULT_PARTITION, message);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.util.Collection)
+ */
+ @Override
+ public int sendSync(Collection<String> messages) throws Exception {
+ return sendSync(DEFAULT_PARTITION, messages);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String, java.lang.String)
+ */
+ @Override
+ public int sendSync(String partition, String message) throws Exception {
+ log.debug("Publishing message on partition " + partition + ": " + message);
+
+ publisher.getProps().put("partition", partition);
+ return publisher.send(partition, message);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String, java.util.Collection)
+ */
+ @Override
+ public int sendSync(String partition, Collection<String> messages) throws Exception {
+ log.debug("Publishing " + messages.size() + " messages on partition " + partition);
+
+ publisher.getProps().put("partition", partition);
+
+ Collection<message> dmaapMessages = new ArrayList<>();
+ for (String message : messages) {
+ dmaapMessages.add(new message(partition, message));
+ }
+ return publisher.send(dmaapMessages);
+ }
+
+}
diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java
new file mode 100644
index 0000000..16cc739
--- /dev/null
+++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java
@@ -0,0 +1,41 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client.logging;
+
+import com.att.eelf.i18n.EELFResourceManager;
+import org.onap.aai.cl.eelf.LogMessageEnum;
+
+/**
+ * Logger messages
+ */
+public enum ApplicationMsgs implements LogMessageEnum {
+
+ MESSAGE_ERROR, MESSAGE_INFO;
+
+ /**
+ * Static initializer to ensure the resource bundles for this class are loaded...
+ */
+ static {
+ EELFResourceManager.loadMessageBundle("resources");
+ }
+}
diff --git a/event-client-dmaap/src/main/resources/logback.xml b/event-client-dmaap/src/main/resources/logback.xml
new file mode 100644
index 0000000..5e5234c
--- /dev/null
+++ b/event-client-dmaap/src/main/resources/logback.xml
@@ -0,0 +1,27 @@
+<configuration>
+ <property name="logDirectory" value="logs/" />
+
+ <appender name="event-bus-client" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <File>${logDirectory}/application.log</File>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${logDirectory}/application.log.%d{yyyy-MM-dd}</fileNamePattern>
+ </rollingPolicy>
+ <encoder>
+ <pattern>%d %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="com.att.eelf" level="INFO" additivity="false">
+ <appender-ref ref="event-bus-client"/>
+ </logger>
+
+ <!-- DEBUG < INFO < WARN < ERROR -->
+ <logger name="event-bus-client" level="INFO" additivity="false">
+ <appender-ref ref="event-bus-client"/>
+ </logger>
+
+ <root level="INFO">
+ <appender-ref ref="event-bus-client"/>
+ </root>
+
+</configuration> \ No newline at end of file
diff --git a/event-client-dmaap/src/main/resources/resources.properties b/event-client-dmaap/src/main/resources/resources.properties
new file mode 100644
index 0000000..b8cd27a
--- /dev/null
+++ b/event-client-dmaap/src/main/resources/resources.properties
@@ -0,0 +1,38 @@
+#Resource key=Error Code|Message text|Resolution text |Description text
+#######
+#Newlines can be utilized to add some clarity ensuring continuing line
+#has atleast one leading space
+#ResourceKey=\
+# ERR0000E\
+# Sample error msg txt\
+# Sample resolution msg\
+# Sample description txt
+#
+######
+#Error code classification category
+#100 Permission errors
+#200 Availability errors/Timeouts
+#300 Data errors
+#400 Schema Interface type/validation errors
+#500 Business process errors
+#900 Unknown errors
+#
+########################################################################
+
+MESSAGE_INFO=\
+ |\
+ {0}|\
+ |\
+ |\
+
+MESSAGE_ERROR=\
+ |\
+ {0}|\
+ |\
+ |\
+
+MESSAGE_WARN=\
+ |\
+ {0}|\
+ |\
+ |\ \ No newline at end of file
diff --git a/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java
new file mode 100644
index 0000000..9298a35
--- /dev/null
+++ b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java
@@ -0,0 +1,81 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client;
+
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import java.net.MalformedURLException;
+import javax.naming.OperationNotSupportedException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDMaaPEventConsumer {
+
+ @Mock
+ public MRConsumerImpl mockDMaaPConsumer;
+
+ @Before
+ public void init() throws Exception {
+ DMaaPEventConsumer
+ .setConsumerFactory((hosts, topic, g, id, timeout, limit, f, user, pass) -> mockDMaaPConsumer);
+ }
+
+ @Test
+ public void testConstructors() throws MalformedURLException {
+ createConsumerWithDefaults();
+ new DMaaPEventConsumer("", "", "", "", "", "", 0, 0, "");
+ }
+
+ @Test
+ public void consumeZeroRecords() throws Exception {
+ DMaaPEventConsumer consumer = createConsumerWithDefaults();
+ consumer.consume();
+ consumer.consumeAndCommit();
+ }
+
+ @Test(expected = OperationNotSupportedException.class)
+ public void consumeWithOffsets() throws Exception {
+ DMaaPEventConsumer consumer = createConsumerWithDefaults();
+ consumer.consumeWithOffsets();
+ }
+
+ @Test(expected = OperationNotSupportedException.class)
+ public void commitOffsets() throws Exception {
+ DMaaPEventConsumer consumer = createConsumerWithDefaults();
+ consumer.commitOffsets();
+ }
+
+ @Test(expected = OperationNotSupportedException.class)
+ public void commitOffsetsLong() throws Exception {
+ DMaaPEventConsumer consumer = createConsumerWithDefaults();
+ consumer.commitOffsets(0);
+ }
+
+ private DMaaPEventConsumer createConsumerWithDefaults() throws MalformedURLException {
+ return new DMaaPEventConsumer("", "", "", "", "", "");
+ }
+}
diff --git a/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java
new file mode 100644
index 0000000..fc71f30
--- /dev/null
+++ b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java
@@ -0,0 +1,118 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client;
+
+import com.att.nsa.mr.client.MRPublisher.message;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.onap.aai.event.client.DMaaPEventPublisher.MRPublisherFactory;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDMaaPEventPublisher {
+
+ private static MRPublisherFactory defaultFactory = DMaaPEventPublisher.getPublisherFactory();
+
+ @Mock
+ public MRSimplerBatchPublisher mockDmaapPublisher;
+
+ @After
+ public void restorePublisherFactory() {
+ DMaaPEventPublisher.setPublisherFactory(defaultFactory);
+ }
+
+ public void mockPublisherFactory() {
+ DMaaPEventPublisher.setPublisherFactory((host, topic, size, age, delay) -> mockDmaapPublisher);
+ }
+
+ @Test
+ public void testDefaultFactory() throws MalformedURLException {
+ createPublisherWithDefaults();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreatePublisherWithoutHost() throws MalformedURLException {
+ new DMaaPEventPublisher("", "topic", "", "");
+ }
+
+ @Test
+ public void testConstructors() throws MalformedURLException {
+ createMockedPublisher();
+ new DMaaPEventPublisher("", "", "", "", 0, 0, 0, "");
+ }
+
+ @Test
+ public void publishSynchronous() throws Exception {
+ Mockito.when(mockDmaapPublisher.getProps()).thenReturn(new Properties());
+ message message = new message("partition", "message");
+ Mockito.when(mockDmaapPublisher.close(Mockito.anyInt(), Mockito.any())).thenReturn(Arrays.asList(message));
+ DMaaPEventPublisher publisher = createMockedPublisher();
+ publisher.sendSync("");
+ publisher.sendSync(Arrays.asList(""));
+ publisher.sendSync("key", "");
+ publisher.sendSync("key", Arrays.asList(""));
+ publisher.close();
+ publisher.closeWithUnsent();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void sendAsync() throws Exception {
+ DMaaPEventPublisher publisher = createMockedPublisher();
+ publisher.sendAsync("");
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void sendAsyncMultiple() throws Exception {
+ DMaaPEventPublisher publisher = createMockedPublisher();
+ publisher.sendAsync(Arrays.asList(""));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void sendAsyncWithPartition() throws Exception {
+ DMaaPEventPublisher publisher = createMockedPublisher();
+ publisher.sendAsync("partition", "");
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void sendAsyncWithPartitionMultiple() throws Exception {
+ DMaaPEventPublisher publisher = createMockedPublisher();
+ publisher.sendAsync("partition", Arrays.asList(""));
+ }
+
+ private DMaaPEventPublisher createPublisherWithDefaults() throws MalformedURLException {
+ return new DMaaPEventPublisher("host", "topic", "", "");
+ }
+
+ private DMaaPEventPublisher createMockedPublisher() throws MalformedURLException {
+ mockPublisherFactory();
+ return createPublisherWithDefaults();
+ }
+}
diff --git a/event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java b/event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java
new file mode 100644
index 0000000..12054b3
--- /dev/null
+++ b/event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java
@@ -0,0 +1,42 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client.logging;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+public class TestApplicationMsgs {
+
+ /**
+ * This test is used to ensure that the static initializer for ApplicationMsgs is invoked.
+ */
+ @Test
+ public void accessEnumValues() {
+ assertThat(ApplicationMsgs.values().length, is(not(0)));
+ }
+
+}
diff --git a/event-client-kafka/pom.xml b/event-client-kafka/pom.xml
new file mode 100644
index 0000000..75469b9
--- /dev/null
+++ b/event-client-kafka/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-service</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>event-client-kafka</artifactId>
+
+ <properties>
+ <common.logging.version>1.2.0</common.logging.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.logging-service</groupId>
+ <artifactId>common-logging</artifactId>
+ <version>${common.logging.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+</project> \ No newline at end of file
diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
new file mode 100644
index 0000000..e4da20a
--- /dev/null
+++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
@@ -0,0 +1,143 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.MessageWithOffset;
+
+/**
+ * Event Bus Client consumer API for Kafka Implementation .Its a wrapper around KafkaConsumer which is NOT thread safe.
+ * The KafkaConsumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer
+ * after use will leak these connections Ref : https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/
+ * KafkaConsumer.html
+ *
+ */
+public class KafkaEventConsumer implements EventConsumer {
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventConsumer.class);
+
+ public interface KafkaConsumerFactory {
+ public KafkaConsumer<String, String> createConsumer(Properties props);
+ }
+
+ private static KafkaConsumerFactory consumerFactory = KafkaConsumer::new;
+
+ private final KafkaConsumer<String, String> consumer;
+
+ /**
+ * Replace the consumer factory (intended to be used for testing purposes only).
+ *
+ * @param consumerFactory
+ */
+ static void setConsumerFactory(KafkaConsumerFactory consumerFactory) {
+ KafkaEventConsumer.consumerFactory = consumerFactory;
+ }
+
+ /**
+ *
+ * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
+ * client will make use of all servers irrespective of which servers are specified here for
+ * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list
+ * should be in the form host1:port1,host2:port2,....
+ * @param topic - Topic to consume the messages from
+ * @param groupId - A unique string that identifies the consumer group this consumer belongs to
+ */
+ public KafkaEventConsumer(String hosts, String topic, String groupId) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+ // Set this property, if auto commit should happen.
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumer = consumerFactory.createConsumer(props);
+ consumer.subscribe(Arrays.asList(topic));
+ }
+
+ public void close() {
+ consumer.close();
+ }
+
+ /**
+ *
+ */
+ @Override
+ public Iterable<String> consume() throws Exception {
+ log.debug("Querying Kafka for messages");
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ List<String> list = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : records) {
+ list.add(record.value());
+ }
+ return list;
+
+ }
+
+ @Override
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
+ log.debug("Querying Kafka for messages");
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ List<MessageWithOffset> list = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : records) {
+ list.add(new MessageWithOffset(record.offset(), record.value()));
+ }
+ return list;
+ }
+
+ @Override
+ public Iterable<String> consumeAndCommit() throws Exception {
+ Iterable<String> result = consume();
+ consumer.commitSync();
+ return result;
+ }
+
+ @Override
+ public void commitOffsets() throws Exception {
+ consumer.commitSync();
+ }
+
+ @Override
+ public void commitOffsets(long offset) throws Exception {
+ Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
+ offsetsMap.put(consumer.assignment().iterator().next(), new OffsetAndMetadata(offset));
+ consumer.commitSync(offsetsMap);
+ }
+
+}
diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java
new file mode 100644
index 0000000..c05d5c5
--- /dev/null
+++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java
@@ -0,0 +1,185 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client;
+
+import java.util.Collection;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
+
+/**
+ * Event Bus Client publisher implementation for Kafka .A KafkaProducer that publishes records to the Kafka cluster is
+ * thread safe and sharing a single producer instance across threads will generally be faster than having multiple
+ * instances. Ref :https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/
+ * producer/KafkaProducer.html
+ *
+ *
+ */
+public class KafkaEventPublisher implements EventPublisher {
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventPublisher.class);
+
+ public interface KafkaProducerFactory {
+ public KafkaProducer<String, String> createProducer(Properties props);
+ }
+
+ private static final String PUBLISHING = "Publishing ";
+
+ private static KafkaProducerFactory producerFactory = KafkaProducer::new;
+
+ private final KafkaProducer<String, String> producer;
+ private final String topic;
+
+ /**
+ * Replace the producer factory (intended to be used for testing purposes only).
+ *
+ * @param producerFactory
+ */
+ static void setProducerFactory(KafkaProducerFactory producerFactory) {
+ KafkaEventPublisher.producerFactory = producerFactory;
+ }
+
+ /**
+ *
+ * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
+ * client will make use of all servers irrespective of which servers are specified here for
+ * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list
+ * should be in the form host1:port1,host2:port2,....
+ * @param topic - Topic to publish the messages to
+ * @param bufferMemory - The total bytes of memory the producer can use to buffer records waiting to be sent to the
+ * server
+ * @param batchSize - The producer will attempt to batch records together into fewer requests whenever multiple
+ * records are being sent to the same partition. This helps performance on both the client and the server.
+ * This configuration controls the default batch size in bytes
+ * @param retries -Setting a value greater than zero will cause the client to resend any record whose send fails
+ * with a potentially transient error. Note that this retry is no different than if the client resent the
+ * record upon receiving the error.
+ */
+ public KafkaEventPublisher(String hosts, String topic, long bufferMemory, int batchSize, int retries) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producer = producerFactory.createProducer(props);
+ this.topic = topic;
+
+ }
+
+ /**
+ *
+ * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
+ * client will make use of all servers irrespective of which servers are specified here for
+ * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list
+ * should be in the form host1:port1,host2:port2,....
+ * @param topic - Topic to publish the messages to
+ */
+ public KafkaEventPublisher(String hosts, String topic) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producer = producerFactory.createProducer(props);
+ this.topic = topic;
+ }
+
+ /**
+ * Closes the publisher.
+ */
+ @Override
+ public void close() {
+ producer.close();
+ }
+
+ @Override
+ public int sendSync(String partitionKey, String message) throws Exception {
+ log.debug("Publishing message on partitionKey " + partitionKey + ": " + message);
+ producer.send(new ProducerRecord<String, String>(topic, partitionKey, message)).get();
+ return 1;
+ }
+
+ @Override
+ public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey);
+ for (String message : messages) {
+ sendSync(partitionKey, message);
+ }
+ return messages.size();
+ }
+
+ @Override
+ public int sendSync(String message) throws Exception {
+ log.debug("Publishing message : " + message);
+ producer.send(new ProducerRecord<String, String>(topic, message)).get();
+ return 1;
+ }
+
+ @Override
+ public int sendSync(Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages ");
+ for (String message : messages) {
+ sendSync(message);
+ }
+ return messages.size();
+ }
+
+ @Override
+ public void sendAsync(String partitionKey, String message) throws Exception {
+ log.debug("Publishing message on partitionKey " + partitionKey + ": " + message);
+ producer.send(new ProducerRecord<String, String>(topic, partitionKey, message));
+
+ }
+
+ @Override
+ public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey);
+ for (String message : messages) {
+ sendAsync(partitionKey, message);
+ }
+
+ }
+
+ @Override
+ public void sendAsync(String message) throws Exception {
+ log.debug("Publishing message : " + message);
+ producer.send(new ProducerRecord<String, String>(topic, message));
+
+ }
+
+ @Override
+ public void sendAsync(Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages ");
+ for (String message : messages) {
+ sendAsync(message);
+ }
+
+ }
+
+}
diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java
new file mode 100644
index 0000000..cd9993b
--- /dev/null
+++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java
@@ -0,0 +1,94 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestKafkaEventConsumer {
+
+ @Mock
+ public KafkaConsumer<String, String> mockKafkaConsumer;
+
+ @Before
+ public void init() throws Exception {
+ KafkaEventConsumer.setConsumerFactory(props -> mockKafkaConsumer);
+ }
+
+ @Test
+ public void testConstructor() {
+ new KafkaEventConsumer("", "", "");
+ }
+
+ @Test
+ public void consumeZeroRecords() throws Exception {
+ Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(ConsumerRecords.empty());
+ KafkaEventConsumer consumer = new KafkaEventConsumer("", "", "");
+ consumer.consume();
+ consumer.consumeWithOffsets();
+ consumer.consumeAndCommit();
+ consumer.close();
+ }
+
+ @Test
+ public void consumeMultipleRecords() throws Exception {
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ records.put(new TopicPartition(null, 0),
+ Arrays.asList(new ConsumerRecord<String, String>("topic", 0, 0, "key", "value")));
+ Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords<>(records));
+ KafkaEventConsumer consumer = new KafkaEventConsumer("", "", "");
+ consumer.consume();
+ consumer.consumeWithOffsets();
+ consumer.consumeAndCommit();
+ consumer.close();
+ }
+
+ @Test
+ public void commitOffsets() throws Exception {
+ List<TopicPartition> partitionsList = Arrays.asList(new TopicPartition(null, 0));
+ Set<TopicPartition> partitionsSet = Collections.unmodifiableSet(new HashSet<TopicPartition>(partitionsList));
+ Mockito.when(mockKafkaConsumer.assignment()).thenReturn(partitionsSet);
+ KafkaEventConsumer consumer = new KafkaEventConsumer("", "", "");
+ consumer.commitOffsets();
+ consumer.commitOffsets(0L);
+ consumer.close();
+ }
+
+}
diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java
new file mode 100644
index 0000000..8cb1dec
--- /dev/null
+++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java
@@ -0,0 +1,77 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client;
+
+import java.util.Arrays;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestKafkaEventPublisher {
+
+ @Mock
+ public KafkaProducer<String, String> mockKafkaProducer;
+
+ @Mock
+ private Future<RecordMetadata> mockedFuture;
+
+ @Before
+ public void init() throws Exception {
+ KafkaEventPublisher.setProducerFactory(props -> mockKafkaProducer);
+ }
+
+ @Test
+ public void testConstructors() {
+ new KafkaEventPublisher("hosts", "topic");
+ new KafkaEventPublisher("hosts", "topic", 0, 0, 0);
+ }
+
+ @Test
+ public void publishSynchronous() throws Exception {
+ Mockito.when(mockKafkaProducer.send(Mockito.any())).thenReturn(mockedFuture);
+ KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic");
+ publisher.sendSync("");
+ publisher.sendSync(Arrays.asList(""));
+ publisher.sendSync("key", "");
+ publisher.sendSync("key", Arrays.asList(""));
+ publisher.close();
+ }
+
+ @Test
+ public void publishAsynchronous() throws Exception {
+ KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic");
+ publisher.sendAsync("");
+ publisher.sendAsync(Arrays.asList(""));
+ publisher.sendAsync("key", "");
+ publisher.sendAsync("key", Arrays.asList(""));
+ publisher.close();
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..03e98b3
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,130 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.oparent</groupId>
+ <artifactId>oparent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-service</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <sitePath>/content/sites/site/org/onap/aai/${project.artifactId}/${project.version}</sitePath>
+ </properties>
+
+ <modules>
+ <module>event-client-api</module>
+ <module>event-client-dmaap</module>
+ <module>event-client-kafka</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-release-plugin</artifactId>
+ <version>2.4.2</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.scm</groupId>
+ <artifactId>maven-scm-provider-gitexe</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <groupId>com.mycila</groupId>
+ <artifactId>license-maven-plugin</artifactId>
+ <version>3.0</version>
+ <configuration>
+ <header>License.txt</header>
+ <includes>
+ <include>src/main/java/**</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>format</goal>
+ </goals>
+ <phase>process-sources</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <nexusUrl>${onap.nexus.url}</nexusUrl>
+ <stagingProfileId>176c31dfe190a</stagingProfileId>
+ <serverId>ecomp-staging</serverId>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>sonar-maven-plugin</artifactId>
+ <version>3.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <configuration>
+ <dumpOnExit>true</dumpOnExit>
+ </configuration>
+ <executions>
+ <execution>
+ <id>jacoco-initialize-unit-tests</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ <configuration>
+ <destFile>${project.build.directory}/coverage-reports/jacoco.exec</destFile>
+ <!-- <append>true</append> -->
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <distributionManagement>
+ <site>
+ <id>ecomp-javadoc</id>
+ <url>dav:${onap.nexus.url}${sitePath}</url>
+ </site>
+ </distributionManagement>
+
+</project> \ No newline at end of file