summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-17 11:42:35 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-18 10:58:02 +0100
commit7a4a5d0fef06c29478f594cd6e82e5cb69cc70d2 (patch)
treef96f57bb68563ef0d5eeda85d7c477eefe76f345
parent89ec57e95480fad586b86b8545cec5dd690eae4c (diff)
HV-VES Client - ProducerOptions
* ProducerOptions written * very basic client implementation * added vavr dependency so it's easier to handle Java Issue-ID: DCAEGEN2-1098 Change-Id: I680948c61174f60cd78c8ee39b6f92419f913d36 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rw-r--r--pom.xml5
-rw-r--r--services/hv-ves-client/producer/api/pom.xml4
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java21
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java7
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java4
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java73
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java87
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java (renamed from services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java)35
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java37
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java109
-rw-r--r--services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java100
-rw-r--r--services/hv-ves-client/producer/api/src/test/resources/password.txt2
-rw-r--r--services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java16
-rw-r--r--services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java32
-rw-r--r--services/hv-ves-client/producer/ct/src/test/resources/client.p12bin0 -> 5183 bytes
-rw-r--r--services/hv-ves-client/producer/ct/src/test/resources/client.pass1
-rw-r--r--services/hv-ves-client/producer/ct/src/test/resources/trust.p12bin0 -> 1514 bytes
-rw-r--r--services/hv-ves-client/producer/ct/src/test/resources/trust.pass1
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java22
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java29
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java83
21 files changed, 626 insertions, 42 deletions
diff --git a/pom.xml b/pom.xml
index fe2684ed..038fc313 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,11 @@
<version>${immutables.version}</version>
</dependency>
<dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr</artifactId>
+ <version>0.9.3</version>
+ </dependency>
+ <dependency>
<groupId>org.immutables</groupId>
<artifactId>gson</artifactId>
<version>${immutables.version}</version>
diff --git a/services/hv-ves-client/producer/api/pom.xml b/services/hv-ves-client/producer/api/pom.xml
index 44e15c9d..1804b162 100644
--- a/services/hv-ves-client/producer/api/pom.xml
+++ b/services/hv-ves-client/producer/api/pom.xml
@@ -49,6 +49,10 @@
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<scope>provided</scope>
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java
index f90867d6..a7ad8361 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java
@@ -19,7 +19,7 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
-import java.util.Iterator;
+import io.vavr.collection.Stream;
import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,17 +35,12 @@ final class FactoryLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(FactoryLoader.class);
static <T> T findInstance(Class<T> clazz) {
- Iterator<T> instances = ServiceLoader.load(clazz).iterator();
- if (instances.hasNext()) {
- final T head = instances.next();
- if (instances.hasNext()) {
- LOGGER.warn("Found more than one implementation of {} on the class path. Using {}.",
- clazz.getSimpleName(), head.getClass().getName());
- }
- return head;
- } else {
- throw new IllegalStateException(
- "No " + clazz.getSimpleName() + " instances were configured. Are you sure you have runtime dependency on an implementation module?");
- }
+ return Stream.ofAll(ServiceLoader.load(clazz))
+ .headOption()
+ .peek(head -> LOGGER.info(
+ " Using {} as a {} implementation.", head.getClass().getName(), clazz.getSimpleName()))
+ .getOrElseThrow(() -> new IllegalStateException(
+ "No " + clazz.getSimpleName() + " instances were configured. "
+ + "Are you sure you have runtime dependency on an implementation module?"));
}
}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
index 43cd3feb..3359e54b 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.reactivestreams.Publisher;
@@ -32,7 +33,7 @@ import org.reactivestreams.Publisher;
* <p>Sample usage with <a href="https://projectreactor.io/">Project Reactor</a>:</p>
*
* <pre>
- * ProducerOptions options = {@link ImmutableProducerOptions}.builder(). ... .build()
+ * ProducerOptions options = ImmutableProducerOptions.builder(). ... .build()
* HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options);
*
* Flux.just(msg1, msg2, msg3)
@@ -41,6 +42,7 @@ import org.reactivestreams.Publisher;
* </pre>
*
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
* @since 1.1.1
*/
@FunctionalInterface
@@ -51,6 +53,9 @@ public interface HvVesProducer {
* Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
* case of any problem with sending the messages.
*
+ * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once
+ * feeding it with a stream of consecutive events.
+ *
* @param messages source of the messages to be sent
* @return empty publisher which completes after messages are sent or error occurs
* @since 1.1.1
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
index 3b3f91a6..1e28fbd9 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
/**
* <p>
@@ -32,12 +33,13 @@ import org.jetbrains.annotations.NotNull;
*
* <pre>
* {@link HvVesProducer} producer = HvVesProducerFactory.create(
- * {@link ImmutableProducerOptions}.builder().
+ * ImmutableProducerOptions.builder().
* ...
* .build())
* </pre>
*
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
* @since 1.1.1
*/
public abstract class HvVesProducerFactory {
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java
new file mode 100644
index 00000000..79ae32a8
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import io.vavr.CheckedFunction1;
+import io.vavr.Function1;
+import io.vavr.control.Try;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Simple password representation.
+ *
+ * A password can be used only once. After it the corresponding memory is zeroed.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.1
+ */
+public class Password {
+
+ private char[] value;
+
+ public Password(@NotNull char[] value) {
+ this.value = value;
+ }
+
+ /**
+ * Consume the password.
+ *
+ * After consumption following uses of this method will return Failure(GeneralSecurityException).
+ *
+ * @param user of the password
+ */
+ public <T> Try<T> use(Function1<char[], Try<T>> user) {
+ if (value == null)
+ return Try.failure(new GeneralSecurityException("Password had been already used so it is in cleared state"));
+
+ try {
+ return user.apply(value);
+ } finally {
+ clear();
+ }
+ }
+
+ public <T> Try<T> useChecked(CheckedFunction1<char[], T> user) {
+ return use(CheckedFunction1.liftTry(user));
+ }
+
+ public void clear() {
+ Arrays.fill(value, (char) 0);
+ value = null;
+ }
+}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java
new file mode 100644
index 00000000..cbadfea9
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import io.vavr.control.Try;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Utility functions for loading passwords.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.1
+ */
+public final class Passwords {
+
+ private Passwords() {
+ }
+
+ public static @NotNull Try<Password> fromFile(File file) {
+ return fromPath(file.toPath());
+ }
+
+ public static @NotNull Try<Password> fromPath(Path path) {
+ return Try.of(() -> {
+ final byte[] bytes = Files.readAllBytes(path);
+ final CharBuffer password = decodeChars(bytes);
+ final char[] result = convertToCharArray(password);
+ return new Password(result);
+ });
+ }
+
+ public static @NotNull Try<Password> fromResource(String resource) {
+ return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI()))
+ .flatMap(Passwords::fromPath);
+ }
+
+ private static @NotNull CharBuffer decodeChars(byte[] bytes) {
+ try {
+ return Charset.defaultCharset().decode(ByteBuffer.wrap(bytes));
+ } finally {
+ Arrays.fill(bytes, (byte) 0);
+ }
+ }
+
+ private static char[] convertToCharArray(CharBuffer password) {
+ try {
+ final char[] result = new char[password.limit()];
+ password.get(result);
+ return result;
+ } finally {
+ password.flip();
+ clearBuffer(password);
+ }
+ }
+
+ private static void clearBuffer(CharBuffer password) {
+ while (password.remaining() > 0) {
+ password.put((char) 0);
+ }
+ }
+}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
index d6f4151d..aead5253 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
@@ -1,34 +1,35 @@
/*
- * ============LICENSE_START=======================================================
+ * ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
- * ================================================================================
+ * =========================================================
* Copyright (C) 2019 Nokia. All rights reserved.
- * ================================================================================
+ * =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+import io.vavr.collection.Set;
import java.net.InetSocketAddress;
-import java.util.Set;
import org.immutables.value.Value;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.1
*/
@Value.Immutable
-@Value.Style(depluralize = true, depluralizeDictionary = "address:addresses")
public interface ProducerOptions {
/**
@@ -37,5 +38,23 @@ public interface ProducerOptions {
* @return configured collector endpoints
* @since 1.1.1
*/
+ @NotNull
Set<InetSocketAddress> collectorAddresses();
+
+ /**
+ * Security keys definition used when connecting to the collector.
+
+ *
+ * @return security keys definition or null when plain TCP sockets are to be used.
+ * @since 1.1.1
+ */
+ @Nullable
+ SecurityKeys securityKeys();
+
+ @Value.Check
+ default void validate() {
+ if (collectorAddresses().isEmpty()) {
+ throw new IllegalArgumentException("address list cannot be empty");
+ }
+ }
}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java
new file mode 100644
index 00000000..66af32fa
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import java.nio.file.Path;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.1
+ */
+@Value.Immutable
+public interface SecurityKeys {
+ Path keyStore();
+ Password keyStorePassword();
+
+ Path trustStore();
+ Password trustStorePassword();
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java
new file mode 100644
index 00000000..fbfeb5d5
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+import io.vavr.collection.Array;
+import io.vavr.control.Try;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+class PasswordTest {
+
+ @Test
+ void use_shouldInvokeConsumerWithStoredPassword() {
+ // given
+ final String password = "hej ho";
+ final Password cut = new Password(password.toCharArray());
+
+ // when
+ String result = cut.useChecked(String::new).get();
+
+ // then
+ assertThat(result).isEqualTo(password);
+ }
+
+ @Test
+ void use_shouldClearPasswordAfterUse() {
+ // given
+ final char[] passwordChars = "hej ho".toCharArray();
+ final Password cut = new Password(passwordChars);
+
+ // when
+ useThePassword(cut);
+
+ // then
+ assertAllCharsAreNull(passwordChars);
+ }
+
+ @Test
+ void use_shouldFail_whenItWasAlreadyCalled() {
+ // given
+ final Password cut = new Password("ala ma kota".toCharArray());
+
+ // when & then
+ useThePassword(cut).get();
+
+ assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() ->
+ useThePassword(cut).get());
+ }
+
+ @Test
+ void use_shouldFail_whenItWasCleared() {
+ // given
+ final Password cut = new Password("ala ma kota".toCharArray());
+
+ // when & then
+ cut.clear();
+
+ assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() ->
+ useThePassword(cut).get());
+ }
+
+ @Test
+ void clear_shouldClearThePassword() {
+ // given
+ final char[] passwordChars = "hej ho".toCharArray();
+ final Password cut = new Password(passwordChars);
+
+ // when
+ cut.clear();
+
+ // then
+ assertAllCharsAreNull(passwordChars);
+ }
+
+ private Try<Object> useThePassword(Password cut) {
+ return cut.use((pass) -> Try.success(42));
+ }
+
+ private void assertAllCharsAreNull(char[] passwordChars) {
+ assertThat(Array.ofAll(passwordChars).forAll(ch -> ch == '\0'))
+ .describedAs("all characters in " + Arrays.toString(passwordChars) + " should be == '\\0'")
+ .isTrue();
+ }
+} \ No newline at end of file
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java
new file mode 100644
index 00000000..9f91afb7
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
+
+import io.vavr.control.Try;
+import java.io.File;
+import java.net.URISyntaxException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since January 2019
+ */
+class PasswordsTest {
+
+ @Test
+ void fromFile() {
+ // given
+ final File file = new File("./src/test/resources/password.txt");
+
+ // when
+ final Try<Password> result = Passwords.fromFile(file);
+
+ // then
+ assertSuccessful(result);
+ assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line");
+ }
+
+ @Test
+ void fromPath() throws URISyntaxException {
+ // given
+ final Path path = Paths.get(PasswordsTest.class.getResource("/password.txt").toURI());
+
+ // when
+ final Try<Password> result = Passwords.fromPath(path);
+
+ // then
+ assertSuccessful(result);
+ assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line");
+ }
+
+ @Test
+ void fromPath_shouldFail_whenNotFound() {
+ // given
+ final Path path = Paths.get("/", UUID.randomUUID().toString());
+
+ // when
+ final Try<Password> result = Passwords.fromPath(path);
+
+ // then
+ assertThat(result.isFailure()).describedAs("Try.failure?").isTrue();
+ assertThat(result.getCause()).isInstanceOf(NoSuchFileException.class);
+ }
+
+ @Test
+ void fromResource() {
+ // given
+ final String resource = "/password.txt";
+
+ // when
+ final Try<Password> result = Passwords.fromResource(resource);
+
+ // then
+ assertSuccessful(result);
+ assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line");
+ }
+
+ private void assertSuccessful(Try<Password> result) {
+ assertThat(result.isSuccess()).describedAs("Try.success?").isTrue();
+ }
+
+ private String extractPassword(Try<Password> result) {
+ return result.flatMap(pass -> pass.useChecked(String::new)).get();
+ }
+} \ No newline at end of file
diff --git a/services/hv-ves-client/producer/api/src/test/resources/password.txt b/services/hv-ves-client/producer/api/src/test/resources/password.txt
new file mode 100644
index 00000000..93e4a005
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/test/resources/password.txt
@@ -0,0 +1,2 @@
+ja baczewski
+2nd line \ No newline at end of file
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
index 0aa51aa9..213e9766 100644
--- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
@@ -21,11 +21,12 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
import static org.assertj.core.api.Assertions.assertThat;
+import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.ves.VesEventOuterClass.CommonEventHeader;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
@@ -49,12 +50,17 @@ class HvVesProducerIT {
@Test
void todo() {
// given
- final Flux<VesEvent> input = Flux.just(VesEvent.getDefaultInstance());
+ final VesEvent sampleEvent = VesEvent.newBuilder()
+ .setCommonEventHeader(CommonEventHeader.newBuilder()
+ .setDomain("dummy")
+ .build())
+ .setEventFields(ByteString.copyFrom(new byte[]{0, 1, 2, 3}))
+ .build();
+
+ final Flux<VesEvent> input = Flux.just(sampleEvent);
// when
- // This will currently fail
- //final ByteBuf receivedData = sut.blockingSend(input);
- final ByteBuf receivedData = ByteBufAllocator.DEFAULT.buffer().writeByte(8);
+ final ByteBuf receivedData = sut.blockingSend(input);
// then
assertThat(receivedData.readableBytes())
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
index f459c98f..2cc2c0b2 100644
--- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
@@ -20,17 +20,20 @@
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
import io.netty.buffer.ByteBuf;
-
+import io.vavr.collection.HashSet;
+import io.vavr.control.Try;
import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
-
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Passwords;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -50,6 +53,16 @@ public class SystemUnderTestWrapper {
this(DEFAULT_TIMEOUT);
}
+ public void startSecure() {
+ start(ImmutableProducerOptions.builder()
+ .securityKeys(ImmutableSecurityKeys.builder()
+ .keyStore(resource("/client.p12").get())
+ .keyStorePassword(Passwords.fromResource("/client.pass").get())
+ .trustStore(resource("/trust.p12").get())
+ .trustStorePassword(Passwords.fromResource("/trust.pass").get())
+ .build()));
+ }
+
public void start() {
start(createDefaultOptions());
}
@@ -57,7 +70,7 @@ public class SystemUnderTestWrapper {
public void start(ImmutableProducerOptions.Builder optionsBuilder) {
InetSocketAddress collectorAddress = collector.start();
cut = HvVesProducerFactory.create(
- optionsBuilder.addCollectorAddress(collectorAddress).build());
+ optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build());
}
public void stop() {
@@ -66,9 +79,6 @@ public class SystemUnderTestWrapper {
public ByteBuf blockingSend(Flux<VesEvent> events) {
events.transform(cut::send).subscribe();
-
-
- Mono.from(cut.send(events)).block();
collector.blockUntilFirstClientIsHandled(timeout);
return collector.dataFromFirstClient();
}
@@ -77,4 +87,8 @@ public class SystemUnderTestWrapper {
return ImmutableProducerOptions.builder();
}
+ private Try<Path> resource(String resource) {
+ return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI()));
+ }
+
}
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.p12 b/services/hv-ves-client/producer/ct/src/test/resources/client.p12
new file mode 100644
index 00000000..68a0fb25
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/resources/client.p12
Binary files differ
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.pass b/services/hv-ves-client/producer/ct/src/test/resources/client.pass
new file mode 100644
index 00000000..e69c2de9
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/resources/client.pass
@@ -0,0 +1 @@
+onaponap \ No newline at end of file
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12
new file mode 100644
index 00000000..ed7f62d4
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12
Binary files differ
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.pass b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass
new file mode 100644
index 00000000..e69c2de9
--- /dev/null
+++ b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass
@@ -0,0 +1 @@
+onaponap \ No newline at end of file
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
index f50206ff..ad402f93 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
@@ -19,18 +19,36 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+import io.netty.handler.ssl.SslContext;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ProducerOptions;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.netty.tcp.TcpClient;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
*/
public class HvVesProducerFactoryImpl extends HvVesProducerFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerFactoryImpl.class);
+ private final SslFactory sslFactory = new SslFactory();
+
@Override
protected @NotNull HvVesProducer createProducer(ProducerOptions options) {
- return new HvVesProducerImpl();
+ TcpClient tcpClient = TcpClient.create()
+ .addressSupplier(() -> options.collectorAddresses().head());
+
+ if (options.securityKeys() == null) {
+ LOGGER.warn("Using insecure connection");
+ } else {
+ LOGGER.info("Using secure tunnel");
+ final SslContext ctx = sslFactory.createSecureContext(options.securityKeys()).get();
+ tcpClient = tcpClient.secure(ssl-> ssl.sslContext(ctx));
+ }
+
+ return new HvVesProducerImpl(tcpClient);
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
index 25128d1e..15038c3a 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
@@ -19,6 +19,8 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
import org.onap.ves.VesEventOuterClass.VesEvent;
@@ -27,18 +29,39 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.netty.NettyOutbound;
+import reactor.netty.tcp.TcpClient;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
*/
public class HvVesProducerImpl implements HvVesProducer {
+
private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerImpl.class);
+ private final TcpClient tcpClient;
+
+ HvVesProducerImpl(TcpClient tcpClient) {
+ this.tcpClient = tcpClient;
+ }
@Override
public @NotNull Mono<Void> send(Publisher<VesEvent> messages) {
- return Flux.from(messages)
- .doOnNext(msg -> LOGGER.info("Not-so-dummy sending: {}", msg.toString()))
- .then();
+ return tcpClient
+ .handle((inbound, outbound) -> handle(outbound, messages))
+ .connect().then();
+ }
+
+ private Publisher<Void> handle(NettyOutbound outbound, Publisher<VesEvent> messages) {
+ final Flux<ByteBuf> encodedMessages = Flux.from(messages)
+ .map(msg -> {
+ LOGGER.debug("Encoding VesEvent '{}'", msg);
+ final ByteBuf encodedMessage = outbound.alloc().buffer();
+ encodedMessage.writeCharSequence(msg.getCommonEventHeader().getDomain(), StandardCharsets.UTF_8);
+ encodedMessage.writeByte(0x0a);
+ return encodedMessage;
+ });
+
+ return outbound.send(encodedMessages).then();
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java
new file mode 100644
index 00000000..4661f595
--- /dev/null
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.vavr.Tuple;
+import io.vavr.control.Try;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Password;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.SecurityKeys;
+
+/*
+ * TODO: To be merged with org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory
+ */
+public class SslFactory {
+
+ /**
+ * Function for creating secure ssl context.
+ *
+ * @param keys - Security keys to be used
+ * @return configured SSL context
+ */
+ public Try<SslContext> createSecureContext(final SecurityKeys keys) {
+ final Try<KeyManagerFactory> keyManagerFactory =
+ keyManagerFactory(keys.keyStore(), keys.keyStorePassword());
+ final Try<TrustManagerFactory> trustManagerFactory =
+ trustManagerFactory(keys.trustStore(), keys.trustStorePassword());
+
+ return Try.success(SslContextBuilder.forClient())
+ .flatMap(ctx -> keyManagerFactory.map(ctx::keyManager))
+ .flatMap(ctx -> trustManagerFactory.map(ctx::trustManager))
+ .mapTry(SslContextBuilder::build);
+ }
+
+ private Try<KeyManagerFactory> keyManagerFactory(Path path, Password password) {
+ return password.useChecked(passwordChars -> {
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(loadKeyStoreFromFile(path, passwordChars), passwordChars);
+ return kmf;
+ });
+ }
+
+ private Try<TrustManagerFactory> trustManagerFactory(Path path, Password password) {
+ return password.useChecked(passwordChars -> {
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(loadKeyStoreFromFile(path, passwordChars));
+ return tmf;
+ });
+ }
+
+ private KeyStore loadKeyStoreFromFile(Path path, char[] keyStorePassword)
+ throws GeneralSecurityException, IOException {
+ KeyStore ks = KeyStore.getInstance("pkcs12");
+ ks.load(Files.newInputStream(path, StandardOpenOption.READ), keyStorePassword);
+ return ks;
+ }
+} \ No newline at end of file