diff options
author | Gilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com> | 2018-01-29 15:00:04 +0000 |
---|---|---|
committer | Gilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com> | 2018-01-30 10:24:18 +0000 |
commit | 4150ee34ae503c83734aca7e62ca9911b354c881 (patch) | |
tree | 6598a1d16483eb480664b8862c86ba801af6d661 | |
parent | 4e6e8b2714f3e5fe6d6dc064d9d73275c5ee5437 (diff) |
Initial code submit for Event Bus Client library
Change-Id: If0ff82c669872c734ebe0c72bc022beb96c53d11
Issue-ID: AAI-700
Signed-off-by: Gilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com>
22 files changed, 1832 insertions, 0 deletions
diff --git a/License.txt b/License.txt new file mode 100644 index 0000000..3452576 --- /dev/null +++ b/License.txt @@ -0,0 +1,20 @@ +============LICENSE_START======================================================= +org.onap.aai +================================================================================ +Copyright © 2017 AT&T Intellectual Property. All rights reserved. +Copyright © 2017 European Software Marketing Ltd. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= + +ECOMP is a trademark and service mark of AT&T Intellectual Property.
\ No newline at end of file diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..e18dc50 --- /dev/null +++ b/README.txt @@ -0,0 +1,3 @@ +ONAP Event Bus Client Library + +This is a shared library that will be used by ONAP components to publish and consume events on the ONAP Event Bus.
\ No newline at end of file diff --git a/event-client-api/pom.xml b/event-client-api/pom.xml new file mode 100644 index 0000000..25ddcc8 --- /dev/null +++ b/event-client-api/pom.xml @@ -0,0 +1,9 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.aai.event</groupId> + <artifactId>event-client-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>event-client-api</artifactId> +</project>
\ No newline at end of file diff --git a/event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java b/event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java new file mode 100644 index 0000000..95effaf --- /dev/null +++ b/event-client-api/src/main/java/org/onap/aai/event/api/EventConsumer.java @@ -0,0 +1,67 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.api; + +public interface EventConsumer { + + /** + * Fetches any relevant messages currently on the Event Bus, according to the configuration and manages the offsets + * for the client. + * + * @return List of messages fetched from the Event Bus. + * @throws Exception + */ + public Iterable<String> consumeAndCommit() throws Exception; + + + /** + * Fetches any relevant messages currently on the Event Bus, according to the configuration and expects the client + * to explicitly call the {@link #commitOffsets()} in order to commit the offsets + * + * @return List of messages fetched from the Event Bus. + * @throws Exception + */ + public Iterable<String> consume() throws Exception; + + /** + * Fetches any relevant messages currently on the Event Bus with their offsets, according to the configuration and + * expects the client to explicitly call the {@link #commitOffsets()} in order to commit the offsets + * + * @throws Exception + */ + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception; + + /** + * Commits the offsets of the previously consumed messages from the Event Bus + * + * @throws Exception if the offsets could not be committed + */ + public void commitOffsets() throws Exception; + + /** + * Commits the offsets of messages consumed up to offset + * + * @throws Exception if the offsets could not be committed + */ + public void commitOffsets(long offset) throws Exception; +} diff --git a/event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java b/event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java new file mode 100644 index 0000000..a5de093 --- /dev/null +++ b/event-client-api/src/main/java/org/onap/aai/event/api/EventPublisher.java @@ -0,0 +1,111 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.api; + +import java.util.Collection; + +public interface EventPublisher { + + /** + * Publishes a message using the supplied partition key, using the parameters from the constructor. + * + * @param partitionKey The partition to publish the message on. + * @param message The String message to publish. + * @return The number of messages successfully sent + * @throws Exception + */ + public int sendSync(String partitionKey, String message) throws Exception; + + /** + * Publishes a message using the supplied partition key, using the parameters from the constructor. + * + * @param partitionKey The partition to publish the messages on. + * @param messages A Collection of messages to publish. + * @return The number of messages successfully sent + * @throws Exception + */ + public int sendSync(String partitionKey, Collection<String> messages) throws Exception; + + /** + * Publishes a message using the parameters from the constructor. + * + * @param message The String message to publish. + * @return The number of messages successfully sent + * @throws Exception + */ + public int sendSync(String message) throws Exception; + + /** + * Publishes a message using the parameters from the constructor. + * + * @param messages A Collection of messages to publish. + * @return The number of messages successfully sent + * @throws Exception + */ + public int sendSync(Collection<String> messages) throws Exception; + + /** + * Publishes a message using the supplied partition key, using the parameters from the constructor. The Async method + * returns immediately without caring if the message was properly published or not. + * + * @param partitionKey The partition to publish the message on. + * @param message The String message to publish. + * @throws Exception + */ + public void sendAsync(String partitionKey, String message) throws Exception; + + /** + * Publishes a message using the supplied partition key, using the parameters from the constructor. The Async method + * returns immediately without caring if the message was properly published or not. + * + * @param partitionKey The partition to publish the messages on. + * @param messages A Collection of messages to publish. + * @throws Exception + */ + public void sendAsync(String partitionKey, Collection<String> messages) throws Exception; + + /** + * Publishes a message using the parameters from the constructor. The Async method returns immediately without + * caring if the message was properly published or not. + * + * @param message The String message to publish. + * @throws Exception + */ + public void sendAsync(String message) throws Exception; + + /** + * Publishes a message using the parameters from the constructor. The Async method returns immediately without + * caring if the message was properly published or not. + * + * @param messages A Collection of messages to publish. + * @throws Exception + */ + public void sendAsync(Collection<String> messages) throws Exception; + + /** + * Closes the publisher. + */ + public void close() throws Exception; + + +} diff --git a/event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java b/event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java new file mode 100644 index 0000000..54972c9 --- /dev/null +++ b/event-client-api/src/main/java/org/onap/aai/event/api/MessageWithOffset.java @@ -0,0 +1,41 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.api; + +public class MessageWithOffset { + private final long offset; + private final String message; + + public MessageWithOffset(long offset, String message) { + this.offset = offset; + this.message = message; + } + + public long getOffset() { + return offset; + } + + public String getMessage() { + return message; + } +} diff --git a/event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java b/event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java new file mode 100644 index 0000000..6565c73 --- /dev/null +++ b/event-client-api/src/test/java/org/onap/aai/event/api/TestMessageWithOffset.java @@ -0,0 +1,40 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.api; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +public class TestMessageWithOffset { + + @Test + public void testConstructor() { + MessageWithOffset msg = new MessageWithOffset(0L, ""); + assertThat(msg.getOffset(), is(equalTo(0L))); + assertThat(msg.getMessage(), is(equalTo(""))); + } + +} diff --git a/event-client-dmaap/pom.xml b/event-client-dmaap/pom.xml new file mode 100644 index 0000000..5134b67 --- /dev/null +++ b/event-client-dmaap/pom.xml @@ -0,0 +1,52 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.aai.event</groupId> + <artifactId>event-client-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>event-client-dmaap</artifactId> + + <properties> + <common.logging.version>1.2.0</common.logging.version> + <dmaap.client.version>1.1.0</dmaap.client.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId> + <artifactId>dmaapClient</artifactId> + <version>${dmaap.client.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.onap.aai.logging-service</groupId> + <artifactId>common-logging</artifactId> + <version>${common.logging.version}</version> + </dependency> + <dependency> + <groupId>org.onap.aai.event</groupId> + <artifactId>event-client-api</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.0.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> +</project>
\ No newline at end of file diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java new file mode 100644 index 0000000..dab85dc --- /dev/null +++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventConsumer.java @@ -0,0 +1,201 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +/** + * + */ +package org.onap.aai.event.client; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import java.net.MalformedURLException; +import java.util.List; +import java.util.Properties; +import javax.naming.OperationNotSupportedException; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.MessageWithOffset; + +/** + * Event Bus Client consumer API that uses AAF authentication with Username/Password. + */ +public class DMaaPEventConsumer implements EventConsumer { + + public static final String DEFAULT_TRANSPORT_TYPE = "HTTPAAF"; + public static final String DEFAULT_PROTOCOL = "http"; + public static final int DEFAULT_MESSAGE_WAIT_TIMEOUT = 15000; + public static final int DEFAULT_MESSAGE_LIMIT = 1000; + + private static final String OFFSET_UNSUPPORTED = "DMaaP does not support consuming with offsets."; + + private static Logger log = LoggerFactory.getInstance().getLogger(DMaaPEventConsumer.class); + + public interface MRConsumerFactory { + public MRConsumerImpl createConsumer(List<String> hosts, String topic, String consumerGroup, String consumerId, + int timeoutMs, int messageLimit, String filter, String username, String password) + throws MalformedURLException; + } + + private static MRConsumerFactory consumerFactory = MRConsumerImpl::new; + + private MRConsumerImpl consumer; + + /** + * Replace the consumer factory (intended to be used for testing purposes only). + * + * @param consumerFactory + */ + static void setConsumerFactory(MRConsumerFactory consumerFactory) { + DMaaPEventConsumer.consumerFactory = consumerFactory; + } + + /** + * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The topic name to consume from + * @param username The username of the client application + * @param password The password for the username + * @param consumerGroup The consumer group to consume from + * @param consumerId The consumer ID of the client + * @param timeoutMs Time in ms to wait for messages on the server before returning + * @param messageLimit Maximum number of messages that is returned per fetch + * @param transportType Specifies the request header type used in the request to DMaaP server.<br> + * Valid types: + * <li>DME2</li> + * <li>HTTPAAF</li> + * <li>HTTPAUTH</li> + * <li>HTTPNOAUTH</li> + * @param protocol The http protocol to use (http/https) + * @param filter A customizable message filter, or null if no filtering is required + * @throws MalformedURLException + */ + public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup, + String consumerId, int timeoutMs, int messageLimit, String transportType, String protocol, String filter) + throws MalformedURLException { + consumer = consumerFactory.createConsumer(MRConsumerImpl.stringToList(host), topic, consumerGroup, consumerId, + timeoutMs, messageLimit, filter, username, password); + consumer.setHost(host); + consumer.setUsername(username); + consumer.setPassword(password); + consumer.setProtocolFlag(transportType); + + // MRConsumerImpl still needs extra properties from the prop object. + Properties extraProps = new Properties(); + extraProps.put("Protocol", protocol); + consumer.setProps(extraProps); + } + + /** + * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters and default values. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The topic name to consume from + * @param username The username of the client application + * @param password The password for the username + * @param consumerGroup The consumer group to consume from + * @param consumerId The consumer ID of the client + * @param timeoutMs Time in ms to wait for messages on the server before returning + * @param messageLimit Maximum number of messages that is returned per fetch + * @param transportType Specifies the request header type used in the request to DMaaP server.<br> + * Valid types: + * <li>DME2</li> + * <li>HTTPAAF</li> + * <li>HTTPAUTH</li> + * <li>HTTPNOAUTH</li> + * @throws MalformedURLException + */ + public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup, + String consumerId, int timeoutMs, int messageLimit, String transportType) throws MalformedURLException { + this(host, topic, username, password, consumerGroup, consumerId, timeoutMs, messageLimit, transportType, + DEFAULT_PROTOCOL, null); + } + + /** + * Creates a new instance of DMaaPEventConsumerAAF using supplied parameters and default values. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The topic name to consume from + * @param username The username of the client application + * @param password The password for the username + * @param consumerGroup The consumer group to consume from + * @param consumerId The consumer ID of the client + * @throws MalformedURLException + */ + public DMaaPEventConsumer(String host, String topic, String username, String password, String consumerGroup, + String consumerId) throws MalformedURLException { + this(host, topic, username, password, consumerGroup, consumerId, DEFAULT_MESSAGE_WAIT_TIMEOUT, + DEFAULT_MESSAGE_LIMIT, DEFAULT_TRANSPORT_TYPE, DEFAULT_PROTOCOL, null); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventConsumer#consumeAndCommit() + */ + @Override + public Iterable<String> consumeAndCommit() throws Exception { + log.debug("Querying Event Bus for messages."); + return consumer.fetch(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventConsumer#consume() + */ + @Override + public Iterable<String> consume() throws Exception { + return consumeAndCommit(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventConsumer#consumeWithOffsets() + */ + @Override + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { + throw new OperationNotSupportedException(OFFSET_UNSUPPORTED); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventConsumer#commitOffsets() + */ + @Override + public void commitOffsets() throws Exception { + throw new OperationNotSupportedException(OFFSET_UNSUPPORTED); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventConsumer#commitOffsets(long) + */ + @Override + public void commitOffsets(long offset) throws Exception { + throw new OperationNotSupportedException(OFFSET_UNSUPPORTED); + } + +} diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java new file mode 100644 index 0000000..4a8a83b --- /dev/null +++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/DMaaPEventPublisher.java @@ -0,0 +1,281 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.client; + +import com.att.nsa.mr.client.MRPublisher.message; +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventPublisher; + +/** + * Event Bus Client publisher API that uses AAF authentication with Username/Password. + */ +public class DMaaPEventPublisher implements EventPublisher { + + public static final String DEFAULT_TRANSPORT_TYPE = "HTTPAAF"; + public static final String DEFAULT_PROTOCOL = "http"; + public static final String DEFAULT_CONTENT_TYPE = "application/json"; + public static final int DEFAULT_BATCH_SIZE = 100; + public static final long DEFAULT_BATCH_AGE = 250; + public static final int DEFAULT_BATCH_DELAY = 50; + public static final String DEFAULT_PARTITION = "0"; + public static final int CLOSE_TIMEOUT = 20; + + private static final String ASYNC_UNSUPPORTED = + "This implementation of EventPublisher does not support async mode."; + + private static Logger log = LoggerFactory.getInstance().getLogger(DMaaPEventPublisher.class); + + public interface MRPublisherFactory { + public MRSimplerBatchPublisher createPublisher(String host, String topic, int maxBatchSize, long maxAgeMs, + int delayBetweenBatchesMs); + } + + private static MRPublisherFactory publisherFactory = DMaaPEventPublisher::createMRSimplerBatchPublisher; + + private MRSimplerBatchPublisher publisher; + + /** + * Replace the publisher factory (intended to be used for testing purposes only). + * + * @param publisherFactory + */ + static void setPublisherFactory(MRPublisherFactory publisherFactory) { + DMaaPEventPublisher.publisherFactory = publisherFactory; + } + + /** + * Provide the default factory method so that test code is able to restore this functionality. + * + * @return the default publisher factory implementation + */ + static MRPublisherFactory getPublisherFactory() { + return publisherFactory; + } + + /** + * Creates a new instance of MRSimplerBatchPublisher using the supplied parameters. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The name of the topic to which messages are published + * @param maxBatchSize The maximum batch size for each send operation + * @param maxAgeMs The maximum age of each batch before sending + * @param delayBetweenBatchesMs Time to wait between sending each batch + * @return a new MRSimplerBatchPublisher object + */ + private static MRSimplerBatchPublisher createMRSimplerBatchPublisher(String host, String topic, int maxBatchSize, + long maxAgeMs, int delayBetweenBatchesMs) { + return new MRSimplerBatchPublisher.Builder().againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic) + .batchTo(maxBatchSize, maxAgeMs).httpThreadTime(delayBetweenBatchesMs).build(); + } + + /** + * Creates a new instance of DMaaPEventPublisher using supplied parameters. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The topic name to publish to + * @param username The username of the client application + * @param password The password for the username + * @param maxBatchSize The maximum batch size for each send + * @param maxAgeMs The max age of each batch in ms before sending + * @param delayBetweenBatchesMs Time in ms to wait between sending each batch + * @param transportType Specifies the request header type used in the request to DMaaP server.<br> + * Valid types: + * <li>DME2</li> + * <li>HTTPAAF</li> + * <li>HTTPAUTH</li> + * <li>HTTPNOAUTH</li> + * @param protocol The http protocol to use (http/https) + * @param contentType The content-type request header value (e.g. application/json) + */ + public DMaaPEventPublisher(String host, String topic, String username, String password, int maxBatchSize, + long maxAgeMs, int delayBetweenBatchesMs, String transportType, String protocol, String contentType) { + publisher = getPublisherFactory().createPublisher(host, topic, maxBatchSize, maxAgeMs, delayBetweenBatchesMs); + publisher.setUsername(username); + publisher.setPassword(password); + publisher.setProtocolFlag(transportType); + + // MRSimplerBatchPublisher still needs extra properties from the prop object. + Properties extraProps = new Properties(); + extraProps.put("Protocol", protocol); + extraProps.put("contenttype", contentType); + publisher.setProps(extraProps); + } + + /** + * Creates a new instance of DMaapEventPublisherAAF using supplied parameters and default values. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The topic name to publish to + * @param username The username of the client application + * @param password The password for the username + * @param maxBatchSize The maximum batch size for each send + * @param maxAgeMs The max age of each batch in ms before sending + * @param delayBetweenBatchesMs Time in ms to wait between sending each batch + * @param transportType Specifies the request header type used in the request to DMaaP server.<br> + * Valid types: + * <li>DME2</li> + * <li>HTTPAAF</li> + * <li>HTTPAUTH</li> + * <li>HTTPNOAUTH</li> + */ + public DMaaPEventPublisher(String host, String topic, String username, String password, int maxBatchSize, + long maxAgeMs, int delayBetweenBatchesMs, String transportType) { + this(host, topic, username, password, maxBatchSize, maxAgeMs, delayBetweenBatchesMs, transportType, + DEFAULT_PROTOCOL, DEFAULT_CONTENT_TYPE); + } + + /** + * Creates a new instance of DMaapEventPublisherAAF using supplied parameters and default values. + * + * @param host The host and port of the DMaaP server in the format <b>host:port</b> + * @param topic The topic name to publish to + * @param username The username of the client application + * @param password The password for the username + */ + public DMaaPEventPublisher(String host, String topic, String username, String password) { + this(host, topic, username, password, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_AGE, DEFAULT_BATCH_DELAY, + DEFAULT_TRANSPORT_TYPE, DEFAULT_PROTOCOL, DEFAULT_CONTENT_TYPE); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#close() + */ + @Override + public void close() throws Exception { + publisher.close(CLOSE_TIMEOUT, TimeUnit.SECONDS); + } + + /** + * Close the publisher and return a list of unsent messages. + * + * @return a list of unsent messages. + * @throws Exception + */ + public List<String> closeWithUnsent() throws Exception { + return publisher.close(CLOSE_TIMEOUT, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList()); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String) + */ + @Override + public void sendAsync(String message) throws Exception { + throw new UnsupportedOperationException(ASYNC_UNSUPPORTED); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.util.Collection) + */ + @Override + public void sendAsync(Collection<String> messages) throws Exception { + throw new UnsupportedOperationException(ASYNC_UNSUPPORTED); + + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String, java.lang.String) + */ + @Override + public void sendAsync(String partition, String message) throws Exception { + throw new UnsupportedOperationException(ASYNC_UNSUPPORTED); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendAsync(java.lang.String, java.util.Collection) + */ + @Override + public void sendAsync(String partition, Collection<String> messages) throws Exception { + throw new UnsupportedOperationException(ASYNC_UNSUPPORTED); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String) + */ + @Override + public int sendSync(String message) throws Exception { + return sendSync(DEFAULT_PARTITION, message); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendSync(java.util.Collection) + */ + @Override + public int sendSync(Collection<String> messages) throws Exception { + return sendSync(DEFAULT_PARTITION, messages); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String, java.lang.String) + */ + @Override + public int sendSync(String partition, String message) throws Exception { + log.debug("Publishing message on partition " + partition + ": " + message); + + publisher.getProps().put("partition", partition); + return publisher.send(partition, message); + } + + /* + * (non-Javadoc) + * + * @see org.onap.aai.event.api.EventPublisher#sendSync(java.lang.String, java.util.Collection) + */ + @Override + public int sendSync(String partition, Collection<String> messages) throws Exception { + log.debug("Publishing " + messages.size() + " messages on partition " + partition); + + publisher.getProps().put("partition", partition); + + Collection<message> dmaapMessages = new ArrayList<>(); + for (String message : messages) { + dmaapMessages.add(new message(partition, message)); + } + return publisher.send(dmaapMessages); + } + +} diff --git a/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java b/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java new file mode 100644 index 0000000..16cc739 --- /dev/null +++ b/event-client-dmaap/src/main/java/org/onap/aai/event/client/logging/ApplicationMsgs.java @@ -0,0 +1,41 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.client.logging; + +import com.att.eelf.i18n.EELFResourceManager; +import org.onap.aai.cl.eelf.LogMessageEnum; + +/** + * Logger messages + */ +public enum ApplicationMsgs implements LogMessageEnum { + + MESSAGE_ERROR, MESSAGE_INFO; + + /** + * Static initializer to ensure the resource bundles for this class are loaded... + */ + static { + EELFResourceManager.loadMessageBundle("resources"); + } +} diff --git a/event-client-dmaap/src/main/resources/logback.xml b/event-client-dmaap/src/main/resources/logback.xml new file mode 100644 index 0000000..5e5234c --- /dev/null +++ b/event-client-dmaap/src/main/resources/logback.xml @@ -0,0 +1,27 @@ +<configuration> + <property name="logDirectory" value="logs/" /> + + <appender name="event-bus-client" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <File>${logDirectory}/application.log</File> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${logDirectory}/application.log.%d{yyyy-MM-dd}</fileNamePattern> + </rollingPolicy> + <encoder> + <pattern>%d %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="com.att.eelf" level="INFO" additivity="false"> + <appender-ref ref="event-bus-client"/> + </logger> + + <!-- DEBUG < INFO < WARN < ERROR --> + <logger name="event-bus-client" level="INFO" additivity="false"> + <appender-ref ref="event-bus-client"/> + </logger> + + <root level="INFO"> + <appender-ref ref="event-bus-client"/> + </root> + +</configuration>
\ No newline at end of file diff --git a/event-client-dmaap/src/main/resources/resources.properties b/event-client-dmaap/src/main/resources/resources.properties new file mode 100644 index 0000000..b8cd27a --- /dev/null +++ b/event-client-dmaap/src/main/resources/resources.properties @@ -0,0 +1,38 @@ +#Resource key=Error Code|Message text|Resolution text |Description text +####### +#Newlines can be utilized to add some clarity ensuring continuing line +#has atleast one leading space +#ResourceKey=\ +# ERR0000E\ +# Sample error msg txt\ +# Sample resolution msg\ +# Sample description txt +# +###### +#Error code classification category +#100 Permission errors +#200 Availability errors/Timeouts +#300 Data errors +#400 Schema Interface type/validation errors +#500 Business process errors +#900 Unknown errors +# +######################################################################## + +MESSAGE_INFO=\ + |\ + {0}|\ + |\ + |\ + +MESSAGE_ERROR=\ + |\ + {0}|\ + |\ + |\ + +MESSAGE_WARN=\ + |\ + {0}|\ + |\ + |\
\ No newline at end of file diff --git a/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java new file mode 100644 index 0000000..9298a35 --- /dev/null +++ b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventConsumer.java @@ -0,0 +1,81 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import java.net.MalformedURLException; +import javax.naming.OperationNotSupportedException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestDMaaPEventConsumer { + + @Mock + public MRConsumerImpl mockDMaaPConsumer; + + @Before + public void init() throws Exception { + DMaaPEventConsumer + .setConsumerFactory((hosts, topic, g, id, timeout, limit, f, user, pass) -> mockDMaaPConsumer); + } + + @Test + public void testConstructors() throws MalformedURLException { + createConsumerWithDefaults(); + new DMaaPEventConsumer("", "", "", "", "", "", 0, 0, ""); + } + + @Test + public void consumeZeroRecords() throws Exception { + DMaaPEventConsumer consumer = createConsumerWithDefaults(); + consumer.consume(); + consumer.consumeAndCommit(); + } + + @Test(expected = OperationNotSupportedException.class) + public void consumeWithOffsets() throws Exception { + DMaaPEventConsumer consumer = createConsumerWithDefaults(); + consumer.consumeWithOffsets(); + } + + @Test(expected = OperationNotSupportedException.class) + public void commitOffsets() throws Exception { + DMaaPEventConsumer consumer = createConsumerWithDefaults(); + consumer.commitOffsets(); + } + + @Test(expected = OperationNotSupportedException.class) + public void commitOffsetsLong() throws Exception { + DMaaPEventConsumer consumer = createConsumerWithDefaults(); + consumer.commitOffsets(0); + } + + private DMaaPEventConsumer createConsumerWithDefaults() throws MalformedURLException { + return new DMaaPEventConsumer("", "", "", "", "", ""); + } +} diff --git a/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java new file mode 100644 index 0000000..fc71f30 --- /dev/null +++ b/event-client-dmaap/src/test/java/org/onap/aai/event/client/TestDMaaPEventPublisher.java @@ -0,0 +1,118 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import com.att.nsa.mr.client.MRPublisher.message; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import java.net.MalformedURLException; +import java.util.Arrays; +import java.util.Properties; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.onap.aai.event.client.DMaaPEventPublisher.MRPublisherFactory; + +@RunWith(MockitoJUnitRunner.class) +public class TestDMaaPEventPublisher { + + private static MRPublisherFactory defaultFactory = DMaaPEventPublisher.getPublisherFactory(); + + @Mock + public MRSimplerBatchPublisher mockDmaapPublisher; + + @After + public void restorePublisherFactory() { + DMaaPEventPublisher.setPublisherFactory(defaultFactory); + } + + public void mockPublisherFactory() { + DMaaPEventPublisher.setPublisherFactory((host, topic, size, age, delay) -> mockDmaapPublisher); + } + + @Test + public void testDefaultFactory() throws MalformedURLException { + createPublisherWithDefaults(); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreatePublisherWithoutHost() throws MalformedURLException { + new DMaaPEventPublisher("", "topic", "", ""); + } + + @Test + public void testConstructors() throws MalformedURLException { + createMockedPublisher(); + new DMaaPEventPublisher("", "", "", "", 0, 0, 0, ""); + } + + @Test + public void publishSynchronous() throws Exception { + Mockito.when(mockDmaapPublisher.getProps()).thenReturn(new Properties()); + message message = new message("partition", "message"); + Mockito.when(mockDmaapPublisher.close(Mockito.anyInt(), Mockito.any())).thenReturn(Arrays.asList(message)); + DMaaPEventPublisher publisher = createMockedPublisher(); + publisher.sendSync(""); + publisher.sendSync(Arrays.asList("")); + publisher.sendSync("key", ""); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + publisher.closeWithUnsent(); + } + + @Test(expected = UnsupportedOperationException.class) + public void sendAsync() throws Exception { + DMaaPEventPublisher publisher = createMockedPublisher(); + publisher.sendAsync(""); + } + + @Test(expected = UnsupportedOperationException.class) + public void sendAsyncMultiple() throws Exception { + DMaaPEventPublisher publisher = createMockedPublisher(); + publisher.sendAsync(Arrays.asList("")); + } + + @Test(expected = UnsupportedOperationException.class) + public void sendAsyncWithPartition() throws Exception { + DMaaPEventPublisher publisher = createMockedPublisher(); + publisher.sendAsync("partition", ""); + } + + @Test(expected = UnsupportedOperationException.class) + public void sendAsyncWithPartitionMultiple() throws Exception { + DMaaPEventPublisher publisher = createMockedPublisher(); + publisher.sendAsync("partition", Arrays.asList("")); + } + + private DMaaPEventPublisher createPublisherWithDefaults() throws MalformedURLException { + return new DMaaPEventPublisher("host", "topic", "", ""); + } + + private DMaaPEventPublisher createMockedPublisher() throws MalformedURLException { + mockPublisherFactory(); + return createPublisherWithDefaults(); + } +} diff --git a/event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java b/event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java new file mode 100644 index 0000000..12054b3 --- /dev/null +++ b/event-client-dmaap/src/test/java/org/onap/aai/event/client/logging/TestApplicationMsgs.java @@ -0,0 +1,42 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client.logging; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +public class TestApplicationMsgs { + + /** + * This test is used to ensure that the static initializer for ApplicationMsgs is invoked. + */ + @Test + public void accessEnumValues() { + assertThat(ApplicationMsgs.values().length, is(not(0))); + } + +} diff --git a/event-client-kafka/pom.xml b/event-client-kafka/pom.xml new file mode 100644 index 0000000..75469b9 --- /dev/null +++ b/event-client-kafka/pom.xml @@ -0,0 +1,31 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.aai.event</groupId> + <artifactId>event-client-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>event-client-kafka</artifactId> + + <properties> + <common.logging.version>1.2.0</common.logging.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.0</version> + </dependency> + <dependency> + <groupId>org.onap.aai.logging-service</groupId> + <artifactId>common-logging</artifactId> + <version>${common.logging.version}</version> + </dependency> + <dependency> + <groupId>org.onap.aai.event</groupId> + <artifactId>event-client-api</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> +</project>
\ No newline at end of file diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java new file mode 100644 index 0000000..e4da20a --- /dev/null +++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java @@ -0,0 +1,143 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.MessageWithOffset; + +/** + * Event Bus Client consumer API for Kafka Implementation .Its a wrapper around KafkaConsumer which is NOT thread safe. + * The KafkaConsumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer + * after use will leak these connections Ref : https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ + * KafkaConsumer.html + * + */ +public class KafkaEventConsumer implements EventConsumer { + + private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventConsumer.class); + + public interface KafkaConsumerFactory { + public KafkaConsumer<String, String> createConsumer(Properties props); + } + + private static KafkaConsumerFactory consumerFactory = KafkaConsumer::new; + + private final KafkaConsumer<String, String> consumer; + + /** + * Replace the consumer factory (intended to be used for testing purposes only). + * + * @param consumerFactory + */ + static void setConsumerFactory(KafkaConsumerFactory consumerFactory) { + KafkaEventConsumer.consumerFactory = consumerFactory; + } + + /** + * + * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The + * client will make use of all servers irrespective of which servers are specified here for + * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list + * should be in the form host1:port1,host2:port2,.... + * @param topic - Topic to consume the messages from + * @param groupId - A unique string that identifies the consumer group this consumer belongs to + */ + public KafkaEventConsumer(String hosts, String topic, String groupId) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + // Set this property, if auto commit should happen. + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumer = consumerFactory.createConsumer(props); + consumer.subscribe(Arrays.asList(topic)); + } + + public void close() { + consumer.close(); + } + + /** + * + */ + @Override + public Iterable<String> consume() throws Exception { + log.debug("Querying Kafka for messages"); + ConsumerRecords<String, String> records = consumer.poll(0); + List<String> list = new ArrayList<>(); + for (ConsumerRecord<String, String> record : records) { + list.add(record.value()); + } + return list; + + } + + @Override + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { + log.debug("Querying Kafka for messages"); + ConsumerRecords<String, String> records = consumer.poll(0); + List<MessageWithOffset> list = new ArrayList<>(); + for (ConsumerRecord<String, String> record : records) { + list.add(new MessageWithOffset(record.offset(), record.value())); + } + return list; + } + + @Override + public Iterable<String> consumeAndCommit() throws Exception { + Iterable<String> result = consume(); + consumer.commitSync(); + return result; + } + + @Override + public void commitOffsets() throws Exception { + consumer.commitSync(); + } + + @Override + public void commitOffsets(long offset) throws Exception { + Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); + offsetsMap.put(consumer.assignment().iterator().next(), new OffsetAndMetadata(offset)); + consumer.commitSync(offsetsMap); + } + +} diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java new file mode 100644 index 0000000..c05d5c5 --- /dev/null +++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java @@ -0,0 +1,185 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.client; + +import java.util.Collection; +import java.util.Properties; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventPublisher; + +/** + * Event Bus Client publisher implementation for Kafka .A KafkaProducer that publishes records to the Kafka cluster is + * thread safe and sharing a single producer instance across threads will generally be faster than having multiple + * instances. Ref :https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/ + * producer/KafkaProducer.html + * + * + */ +public class KafkaEventPublisher implements EventPublisher { + + private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventPublisher.class); + + public interface KafkaProducerFactory { + public KafkaProducer<String, String> createProducer(Properties props); + } + + private static final String PUBLISHING = "Publishing "; + + private static KafkaProducerFactory producerFactory = KafkaProducer::new; + + private final KafkaProducer<String, String> producer; + private final String topic; + + /** + * Replace the producer factory (intended to be used for testing purposes only). + * + * @param producerFactory + */ + static void setProducerFactory(KafkaProducerFactory producerFactory) { + KafkaEventPublisher.producerFactory = producerFactory; + } + + /** + * + * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The + * client will make use of all servers irrespective of which servers are specified here for + * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list + * should be in the form host1:port1,host2:port2,.... + * @param topic - Topic to publish the messages to + * @param bufferMemory - The total bytes of memory the producer can use to buffer records waiting to be sent to the + * server + * @param batchSize - The producer will attempt to batch records together into fewer requests whenever multiple + * records are being sent to the same partition. This helps performance on both the client and the server. + * This configuration controls the default batch size in bytes + * @param retries -Setting a value greater than zero will cause the client to resend any record whose send fails + * with a potentially transient error. Note that this retry is no different than if the client resent the + * record upon receiving the error. + */ + public KafkaEventPublisher(String hosts, String topic, long bufferMemory, int batchSize, int retries) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + props.put(ProducerConfig.RETRIES_CONFIG, retries); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producer = producerFactory.createProducer(props); + this.topic = topic; + + } + + /** + * + * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The + * client will make use of all servers irrespective of which servers are specified here for + * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list + * should be in the form host1:port1,host2:port2,.... + * @param topic - Topic to publish the messages to + */ + public KafkaEventPublisher(String hosts, String topic) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producer = producerFactory.createProducer(props); + this.topic = topic; + } + + /** + * Closes the publisher. + */ + @Override + public void close() { + producer.close(); + } + + @Override + public int sendSync(String partitionKey, String message) throws Exception { + log.debug("Publishing message on partitionKey " + partitionKey + ": " + message); + producer.send(new ProducerRecord<String, String>(topic, partitionKey, message)).get(); + return 1; + } + + @Override + public int sendSync(String partitionKey, Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey); + for (String message : messages) { + sendSync(partitionKey, message); + } + return messages.size(); + } + + @Override + public int sendSync(String message) throws Exception { + log.debug("Publishing message : " + message); + producer.send(new ProducerRecord<String, String>(topic, message)).get(); + return 1; + } + + @Override + public int sendSync(Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages "); + for (String message : messages) { + sendSync(message); + } + return messages.size(); + } + + @Override + public void sendAsync(String partitionKey, String message) throws Exception { + log.debug("Publishing message on partitionKey " + partitionKey + ": " + message); + producer.send(new ProducerRecord<String, String>(topic, partitionKey, message)); + + } + + @Override + public void sendAsync(String partitionKey, Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey); + for (String message : messages) { + sendAsync(partitionKey, message); + } + + } + + @Override + public void sendAsync(String message) throws Exception { + log.debug("Publishing message : " + message); + producer.send(new ProducerRecord<String, String>(topic, message)); + + } + + @Override + public void sendAsync(Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages "); + for (String message : messages) { + sendAsync(message); + } + + } + +} diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java new file mode 100644 index 0000000..cd9993b --- /dev/null +++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java @@ -0,0 +1,94 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestKafkaEventConsumer { + + @Mock + public KafkaConsumer<String, String> mockKafkaConsumer; + + @Before + public void init() throws Exception { + KafkaEventConsumer.setConsumerFactory(props -> mockKafkaConsumer); + } + + @Test + public void testConstructor() { + new KafkaEventConsumer("", "", ""); + } + + @Test + public void consumeZeroRecords() throws Exception { + Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(ConsumerRecords.empty()); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void consumeMultipleRecords() throws Exception { + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + records.put(new TopicPartition(null, 0), + Arrays.asList(new ConsumerRecord<String, String>("topic", 0, 0, "key", "value"))); + Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords<>(records)); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void commitOffsets() throws Exception { + List<TopicPartition> partitionsList = Arrays.asList(new TopicPartition(null, 0)); + Set<TopicPartition> partitionsSet = Collections.unmodifiableSet(new HashSet<TopicPartition>(partitionsList)); + Mockito.when(mockKafkaConsumer.assignment()).thenReturn(partitionsSet); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.commitOffsets(); + consumer.commitOffsets(0L); + consumer.close(); + } + +} diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java new file mode 100644 index 0000000..8cb1dec --- /dev/null +++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java @@ -0,0 +1,77 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import java.util.Arrays; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestKafkaEventPublisher { + + @Mock + public KafkaProducer<String, String> mockKafkaProducer; + + @Mock + private Future<RecordMetadata> mockedFuture; + + @Before + public void init() throws Exception { + KafkaEventPublisher.setProducerFactory(props -> mockKafkaProducer); + } + + @Test + public void testConstructors() { + new KafkaEventPublisher("hosts", "topic"); + new KafkaEventPublisher("hosts", "topic", 0, 0, 0); + } + + @Test + public void publishSynchronous() throws Exception { + Mockito.when(mockKafkaProducer.send(Mockito.any())).thenReturn(mockedFuture); + KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic"); + publisher.sendSync(""); + publisher.sendSync(Arrays.asList("")); + publisher.sendSync("key", ""); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + } + + @Test + public void publishAsynchronous() throws Exception { + KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic"); + publisher.sendAsync(""); + publisher.sendAsync(Arrays.asList("")); + publisher.sendAsync("key", ""); + publisher.sendAsync("key", Arrays.asList("")); + publisher.close(); + } +} @@ -0,0 +1,130 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.oparent</groupId> + <artifactId>oparent</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <groupId>org.onap.aai.event</groupId> + <artifactId>event-client-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <sitePath>/content/sites/site/org/onap/aai/${project.artifactId}/${project.version}</sitePath> + </properties> + + <modules> + <module>event-client-api</module> + <module>event-client-dmaap</module> + <module>event-client-kafka</module> + </modules> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-release-plugin</artifactId> + <version>2.4.2</version> + <dependencies> + <dependency> + <groupId>org.apache.maven.scm</groupId> + <artifactId>maven-scm-provider-gitexe</artifactId> + <version>1.8.1</version> + </dependency> + </dependencies> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <version>3.0</version> + <configuration> + <header>License.txt</header> + <includes> + <include>src/main/java/**</include> + </includes> + </configuration> + <executions> + <execution> + <goals> + <goal>format</goal> + </goals> + <phase>process-sources</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.sonatype.plugins</groupId> + <artifactId>nexus-staging-maven-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <nexusUrl>${onap.nexus.url}</nexusUrl> + <stagingProfileId>176c31dfe190a</stagingProfileId> + <serverId>ecomp-staging</serverId> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>sonar-maven-plugin</artifactId> + <version>3.2</version> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + <configuration> + <dumpOnExit>true</dumpOnExit> + </configuration> + <executions> + <execution> + <id>jacoco-initialize-unit-tests</id> + <goals> + <goal>prepare-agent</goal> + </goals> + <configuration> + <destFile>${project.build.directory}/coverage-reports/jacoco.exec</destFile> + <!-- <append>true</append> --> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <distributionManagement> + <site> + <id>ecomp-javadoc</id> + <url>dav:${onap.nexus.url}${sitePath}</url> + </site> + </distributionManagement> + +</project>
\ No newline at end of file |