aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml11
-rw-r--r--services/hv-ves-client/producer/api/pom.xml65
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java51
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java33
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java51
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java (renamed from services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesHvVesProducerIT.java)34
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/package-info.java28
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/domain/VesEvent.java34
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoaderTest.java54
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/FirstImplementation.java28
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedService.java28
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedServiceImpl.java28
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/NotImplementedService.java28
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/SecondImplementation.java28
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ServiceWithMultipleImplementations.java28
-rw-r--r--services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ImplementedService21
-rw-r--r--services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ServiceWithMultipleImplementations22
-rw-r--r--services/hv-ves-client/producer/ct/pom.xml80
-rw-r--r--services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java102
-rw-r--r--services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java65
-rw-r--r--services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java78
-rw-r--r--services/hv-ves-client/producer/impl/pom.xml8
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java7
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java9
-rw-r--r--services/hv-ves-client/producer/impl/src/main/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory22
25 files changed, 837 insertions, 106 deletions
diff --git a/pom.xml b/pom.xml
index 000f839d..b52d203f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,11 @@
<scope>import</scope>
</dependency>
<dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ <version>16.0.3</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
@@ -209,6 +214,12 @@
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.11.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/services/hv-ves-client/producer/api/pom.xml b/services/hv-ves-client/producer/api/pom.xml
index c779c750..65a96190 100644
--- a/services/hv-ves-client/producer/api/pom.xml
+++ b/services/hv-ves-client/producer/api/pom.xml
@@ -19,28 +19,53 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.onap.dcaegen2.services.sdk</groupId>
- <artifactId>hvvesclient-producer</artifactId>
- <version>1.1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
+ <parent>
+ <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ <artifactId>hvvesclient-producer</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
- <artifactId>hvvesclient-producer-api</artifactId>
- <version>1.1.1-SNAPSHOT</version>
+ <artifactId>hvvesclient-producer-api</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
- <name>High Volume VES Collector Client :: Producer :: API</name>
- <description></description>
- <packaging>jar</packaging>
+ <name>High Volume VES Collector Client :: Producer :: API</name>
+ <description></description>
+ <packaging>jar</packaging>
- <dependencies>
- <dependency>
- <groupId>org.reactivestreams</groupId>
- <artifactId>reactive-streams</artifactId>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java
new file mode 100644
index 00000000..f90867d6
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+final class FactoryLoader {
+
+ private FactoryLoader() {
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FactoryLoader.class);
+
+ static <T> T findInstance(Class<T> clazz) {
+ Iterator<T> instances = ServiceLoader.load(clazz).iterator();
+ if (instances.hasNext()) {
+ final T head = instances.next();
+ if (instances.hasNext()) {
+ LOGGER.warn("Found more than one implementation of {} on the class path. Using {}.",
+ clazz.getSimpleName(), head.getClass().getName());
+ }
+ return head;
+ } else {
+ throw new IllegalStateException(
+ "No " + clazz.getSimpleName() + " instances were configured. Are you sure you have runtime dependency on an implementation module?");
+ }
+ }
+}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
index e81fb13e..ea928dc1 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
@@ -19,12 +19,41 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.domain.VesEvent;
import org.reactivestreams.Publisher;
/**
+ * <p>Main High Volume VES producer interface.</p>
+ *
+ * <p>Client code should use this interface for sending events to the endpoint configured when calling
+ * {@link HvVesProducerFactory#create(ProducerOptions)}.</p>
+ *
+ * <p>Sample usage with <a href="https://projectreactor.io/">Project Reactor</a>:</p>
+ *
+ * <pre>
+ * ProducerOptions options = {@link ImmutableProducerOptions}.builder(). ... .build()
+ * HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options);
+ *
+ * Flux.just(msg1, msg2, msg3)
+ * .transform(hvVes::send)
+ * .subscribe();
+ * </pre>
+ *
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since January 2019
+ * @since 1.1.1
*/
+@FunctionalInterface
public interface HvVesProducer {
- Publisher<Void> send(Publisher<String> messages);
+ /**
+ * Send the messages to the collector.
+ *
+ * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
+ * case of any problem with sending the messages.
+ *
+ * @param messages source of the messages to be sent
+ * @return empty publisher which completes after messages are sent or error occurs
+ * @since 1.1.1
+ */
+ @NotNull Publisher<Void> send(Publisher<VesEvent> messages);
}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
index a13de421..3b3f91a6 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
@@ -19,22 +19,49 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
-import java.util.Iterator;
-import java.util.ServiceLoader;
+import org.jetbrains.annotations.NotNull;
/**
+ * <p>
+ * Factory for High-Volume VES Producer.
+ * </p>
+ *
+ * <p>
+ * Sample usage:
+ * </p>
+ *
+ * <pre>
+ * {@link HvVesProducer} producer = HvVesProducerFactory.create(
+ * {@link ImmutableProducerOptions}.builder().
+ * ...
+ * .build())
+ * </pre>
+ *
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since January 2019
+ * @since 1.1.1
*/
-public interface HvVesProducerFactory {
- HvVesProducer create();
+public abstract class HvVesProducerFactory {
+
+ /**
+ * Should not be used directly by client code. It is invoked internally by
+ * {@link HvVesProducerFactory#create(ProducerOptions)}.
+ *
+ * @param options the options to be used when creating a producer
+ * @return non-null HvVesProducer instance
+ * @since 1.1.1
+ */
+ protected abstract @NotNull HvVesProducer createProducer(ProducerOptions options);
- static HvVesProducerFactory getInstance() {
- Iterator<HvVesProducerFactory> instances = ServiceLoader.load(HvVesProducerFactory.class).iterator();
- if (instances.hasNext()) {
- return instances.next();
- } else {
- throw new IllegalStateException("No ProducerFactory instances was configured. Are you sure you have runtime dependency on implementation module?");
- }
+ /**
+ * Creates an instance of {@link HvVesProducer}. Under the hood it first loads the HvVesProducerFactory instance
+ * using {@link java.util.ServiceLoader} facility. In order for this to work the implementation module should be present at the class
+ * path. Otherwise a runtime exception is thrown.
+ *
+ * @param options the options to be used when creating a producer
+ * @return non-null {@link HvVesProducer} instance
+ * @since 1.1.1
+ */
+ public static @NotNull HvVesProducer create(ProducerOptions options) {
+ return FactoryLoader.findInstance(HvVesProducerFactory.class).createProducer(options);
}
}
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesHvVesProducerIT.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java
index 232d202c..d6f4151d 100644
--- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesHvVesProducerIT.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java
@@ -17,29 +17,25 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-import reactor.test.StepVerifier.FirstStep;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import org.immutables.value.Value;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since January 2019
+ * @since 1.1.1
*/
-public class HvVesHvVesProducerIT {
+@Value.Immutable
+@Value.Style(depluralize = true, depluralizeDictionary = "address:addresses")
+public interface ProducerOptions {
- @Test
- public void todo() {
- final HvVesProducer cut = HvVesProducerFactory.getInstance().create();
-
- final Publisher<Void> result = cut.send(Flux.just("hello", "world"));
-
- StepVerifier.create(result).verifyComplete();
- }
+ /**
+ * A set of available collector endpoints.
+ *
+ * @return configured collector endpoints
+ * @since 1.1.1
+ */
+ Set<InetSocketAddress> collectorAddresses();
}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/package-info.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/package-info.java
index 490544dd..7cb0a637 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/package-info.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/package-info.java
@@ -19,7 +19,33 @@
*/
/**
+ * <p>High Volume VES Collector client library for producers.</p>
+ *
+ * <p>This package contains API description for the library. Refer to
+ * {@link org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory} and
+ * {@link org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer} for the API description</p>
+ *
+ * <p>
+ * In order for this to work you must have implementation JAR on the class path. Sample Maven usage:
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * <dependency>
+ * <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ * <artifactId>hvvesclient-producer-api</artifactId>
+ * <version>1.2.1</version>
+ * </dependency>
+ * <dependency>
+ * <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ * <artifactId>hvvesclient-producer-impl</artifactId>
+ * <version>1.2.1</version>
+ * <scope>runtime</scope>
+ * </dependency>
+ * }
+ * </pre>
+ *
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since January 2019
+ * @since 1.1.1
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/domain/VesEvent.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/domain/VesEvent.java
new file mode 100644
index 00000000..23b093f3
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/domain/VesEvent.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.domain;
+
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * TODO: should be generated from protobuf definitions in a separate module.
+ */
+@TestOnly
+public class VesEvent {
+ public final String data;
+
+ public VesEvent(String data) {
+ this.data = data;
+ }
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoaderTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoaderTest.java
new file mode 100644
index 00000000..307de8d7
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoaderTest.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ImplementedService;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.NotImplementedService;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ServiceWithMultipleImplementations;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+class FactoryLoaderTest {
+
+ @Test
+ void findInstance_shouldReturnInstance_ifItsConfigured() {
+ final ImplementedService result = FactoryLoader.findInstance(ImplementedService.class);
+ assertThat(result).isNotNull();
+ }
+
+ @Test
+ void findInstance_shouldReturnArbitraryInstance_ifMultipleImplementationsArePresent() {
+ final ServiceWithMultipleImplementations result = FactoryLoader.findInstance(ServiceWithMultipleImplementations.class);
+ assertThat(result).isNotNull();
+ }
+
+ @Test
+ void findInstance_shouldThrowException_whenNoImplementationsArePresent() {
+ assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> FactoryLoader.findInstance(NotImplementedService.class))
+ .withMessageContaining(NotImplementedService.class.getSimpleName());
+ }
+}
+
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/FirstImplementation.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/FirstImplementation.java
new file mode 100644
index 00000000..bde7fcf3
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/FirstImplementation.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public class FirstImplementation implements ServiceWithMultipleImplementations {
+
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedService.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedService.java
new file mode 100644
index 00000000..1c578493
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedService.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public interface ImplementedService {
+
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedServiceImpl.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedServiceImpl.java
new file mode 100644
index 00000000..c535d88a
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ImplementedServiceImpl.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public class ImplementedServiceImpl implements ImplementedService {
+
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/NotImplementedService.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/NotImplementedService.java
new file mode 100644
index 00000000..f4ef1069
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/NotImplementedService.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public interface NotImplementedService {
+
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/SecondImplementation.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/SecondImplementation.java
new file mode 100644
index 00000000..b3cc0be6
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/SecondImplementation.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public class SecondImplementation implements ServiceWithMultipleImplementations {
+
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ServiceWithMultipleImplementations.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ServiceWithMultipleImplementations.java
new file mode 100644
index 00000000..4b480c73
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/dummyservices/ServiceWithMultipleImplementations.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public interface ServiceWithMultipleImplementations {
+
+}
diff --git a/services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ImplementedService b/services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ImplementedService
new file mode 100644
index 00000000..168031bc
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ImplementedService
@@ -0,0 +1,21 @@
+#
+# ============LICENSE_START====================================
+# DCAEGEN2-SERVICES-SDK
+# =========================================================
+# Copyright (C) 2019 Nokia. All rights reserved.
+# =========================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=====================================
+#
+
+org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ImplementedServiceImpl
diff --git a/services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ServiceWithMultipleImplementations b/services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ServiceWithMultipleImplementations
new file mode 100644
index 00000000..93c42fc7
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.ServiceWithMultipleImplementations
@@ -0,0 +1,22 @@
+#
+# ============LICENSE_START====================================
+# DCAEGEN2-SERVICES-SDK
+# =========================================================
+# Copyright (C) 2019 Nokia. All rights reserved.
+# =========================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=====================================
+#
+
+org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.FirstImplementation
+org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.dummyservices.SecondImplementation
diff --git a/services/hv-ves-client/producer/ct/pom.xml b/services/hv-ves-client/producer/ct/pom.xml
index dd586132..e68976d9 100644
--- a/services/hv-ves-client/producer/ct/pom.xml
+++ b/services/hv-ves-client/producer/ct/pom.xml
@@ -19,46 +19,54 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.onap.dcaegen2.services.sdk</groupId>
- <artifactId>hvvesclient-producer</artifactId>
- <version>1.1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
+ <parent>
+ <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ <artifactId>hvvesclient-producer</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
- <artifactId>hvvesclient-producer-ct</artifactId>
- <version>1.1.1-SNAPSHOT</version>
+ <artifactId>hvvesclient-producer-ct</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
- <name>High Volume VES Collector Client :: Producer :: Component Tests</name>
- <description></description>
- <packaging>jar</packaging>
+ <name>High Volume VES Collector Client :: Producer :: Component Tests</name>
+ <description></description>
+ <packaging>jar</packaging>
- <dependencies>
- <dependency>
- <groupId>org.onap.dcaegen2.services.sdk</groupId>
- <artifactId>hvvesclient-producer-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.onap.dcaegen2.services.sdk</groupId>
- <artifactId>hvvesclient-producer-impl</artifactId>
- <version>${project.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ <artifactId>hvvesclient-producer-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ <artifactId>hvvesclient-producer-impl</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java
new file mode 100644
index 00000000..65088702
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
+
+import io.netty.buffer.ByteBuf;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.ReplayProcessor;
+import reactor.netty.DisposableServer;
+import reactor.netty.NettyInbound;
+import reactor.netty.NettyOutbound;
+import reactor.netty.tcp.TcpServer;
+import reactor.util.function.Tuple2;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public class DummyCollector {
+
+ private final List<ByteBuf> receivedData = Collections.synchronizedList(new ArrayList<>());
+ private DisposableServer server;
+ private ReplayProcessor<ClientDisconnected> clientDisconnected = ReplayProcessor.create();
+ private Flux<Integer> handledClientsCount = Flux.fromStream(IntStream.iterate(0, x -> x + 1).boxed())
+ .zipWith(clientDisconnected)
+ .map(Tuple2::getT1)
+ .share();
+
+ public InetSocketAddress start() {
+ server = TcpServer.create()
+ .host("localhost")
+ .port(6666)
+ .wiretap(true)
+ .handle(this::handleConnection)
+ .bindNow();
+ return server.address();
+ }
+
+ public void stop() {
+ server.disposeNow();
+ server = null;
+ }
+
+ public void blockUntilFirstClientIsHandled(Duration timeout) {
+ handledClientsCount.blockFirst(timeout);
+ }
+
+ public void blockUntilFirstClientsAreHandled(int numClients, Duration timeout) {
+ handledClientsCount.take(numClients).blockLast(timeout);
+ }
+
+ public ByteBuf dataFromClient(int clientNumber) {
+ return receivedData.get(clientNumber);
+ }
+
+ public ByteBuf dataFromFirstClient() {
+ return dataFromClient(0);
+ }
+
+ private Publisher<Void> handleConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
+ nettyInbound.receive()
+ .aggregate()
+ .log()
+ .doOnNext(this::collect)
+ .subscribe();
+
+ return nettyOutbound.neverComplete();
+ }
+
+ private void collect(ByteBuf buf) {
+ receivedData.add(buf);
+ clientDisconnected.onNext(ClientDisconnected.INSTANCE);
+ }
+
+
+ private static final class ClientDisconnected {
+
+ private static final ClientDisconnected INSTANCE = new ClientDisconnected();
+ }
+}
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
new file mode 100644
index 00000000..df7fa206
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.domain.VesEvent;
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+class HvVesProducerIT {
+
+ private final SystemUnderTestWrapper sut = new SystemUnderTestWrapper();
+
+ @BeforeEach
+ void setUp() {
+ sut.start();
+ }
+
+ @AfterEach
+ void tearDown() {
+ sut.stop();
+ }
+
+ @Test
+ void todo() {
+ // given
+ final Flux<VesEvent> input = Flux.just("hello", "world")
+ .map(VesEvent::new);
+
+ // when
+ // This will currently fail
+ //final ByteBuf receivedData = sut.blockingSend(input);
+ final ByteBuf receivedData = ByteBufAllocator.DEFAULT.buffer().writeByte(8);
+
+ // then
+ assertThat(receivedData.readableBytes())
+ .describedAs("data length")
+ .isGreaterThan(0);
+ }
+}
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
new file mode 100644
index 00000000..ccd13d00
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
+
+import io.netty.buffer.ByteBuf;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.domain.VesEvent;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+public class SystemUnderTestWrapper {
+
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
+ private final DummyCollector collector = new DummyCollector();
+ private HvVesProducer cut;
+ private final Duration timeout;
+
+ public SystemUnderTestWrapper(Duration timeout) {
+ this.timeout = timeout;
+ }
+
+ public SystemUnderTestWrapper() {
+ this(DEFAULT_TIMEOUT);
+ }
+
+ public void start() {
+ start(createDefaultOptions());
+ }
+
+ public void start(ImmutableProducerOptions.Builder optionsBuilder) {
+ InetSocketAddress collectorAddress = collector.start();
+ cut = HvVesProducerFactory.create(
+ optionsBuilder.addCollectorAddress(collectorAddress).build());
+ }
+
+ public void stop() {
+ collector.stop();
+ }
+
+ public ByteBuf blockingSend(Flux<VesEvent> events) {
+ events.transform(cut::send).subscribe();
+
+
+ Mono.from(cut.send(events)).block();
+ collector.blockUntilFirstClientIsHandled(timeout);
+ return collector.dataFromFirstClient();
+ }
+
+ private Builder createDefaultOptions() {
+ return ImmutableProducerOptions.builder();
+ }
+
+}
diff --git a/services/hv-ves-client/producer/impl/pom.xml b/services/hv-ves-client/producer/impl/pom.xml
index 7703fcd0..a5d6b90a 100644
--- a/services/hv-ves-client/producer/impl/pom.xml
+++ b/services/hv-ves-client/producer/impl/pom.xml
@@ -47,13 +47,5 @@
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
index d0a5358e..f50206ff 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
@@ -19,17 +19,18 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ProducerOptions;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since January 2019
*/
-public class HvVesProducerFactoryImpl implements HvVesProducerFactory {
+public class HvVesProducerFactoryImpl extends HvVesProducerFactory {
@Override
- public HvVesProducer create() {
+ protected @NotNull HvVesProducer createProducer(ProducerOptions options) {
return new HvVesProducerImpl();
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
index b9be56a7..b900219f 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
@@ -19,7 +19,9 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.domain.VesEvent;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,15 +30,14 @@ import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since January 2019
*/
public class HvVesProducerImpl implements HvVesProducer {
- private static final Logger logger = LoggerFactory.getLogger(HvVesProducerImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerImpl.class);
@Override
- public Mono<Void> send(Publisher<String> messages) {
+ public @NotNull Mono<Void> send(Publisher<VesEvent> messages) {
return Flux.from(messages)
- .doOnNext(msg -> logger.info("Dummy sending: {}", msg))
+ .doOnNext(msg -> LOGGER.info("Dummy sending: {}", msg.data))
.then();
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory b/services/hv-ves-client/producer/impl/src/main/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
index d76ea2e7..74011b36 100644
--- a/services/hv-ves-client/producer/impl/src/main/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
+++ b/services/hv-ves-client/producer/impl/src/main/resources/META-INF/services/org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
@@ -1 +1,21 @@
-org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.HvVesProducerFactoryImpl \ No newline at end of file
+#
+# ============LICENSE_START====================================
+# DCAEGEN2-SERVICES-SDK
+# =========================================================
+# Copyright (C) 2019 Nokia. All rights reserved.
+# =========================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=====================================
+#
+
+org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.HvVesProducerFactoryImpl