diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-17 11:42:35 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-18 10:58:02 +0100 |
commit | 7a4a5d0fef06c29478f594cd6e82e5cb69cc70d2 (patch) | |
tree | f96f57bb68563ef0d5eeda85d7c477eefe76f345 | |
parent | 89ec57e95480fad586b86b8545cec5dd690eae4c (diff) |
HV-VES Client - ProducerOptions
* ProducerOptions written
* very basic client implementation
* added vavr dependency so it's easier to handle Java
Issue-ID: DCAEGEN2-1098
Change-Id: I680948c61174f60cd78c8ee39b6f92419f913d36
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
21 files changed, 626 insertions, 42 deletions
@@ -181,6 +181,11 @@ <version>${immutables.version}</version> </dependency> <dependency> + <groupId>io.vavr</groupId> + <artifactId>vavr</artifactId> + <version>0.9.3</version> + </dependency> + <dependency> <groupId>org.immutables</groupId> <artifactId>gson</artifactId> <version>${immutables.version}</version> diff --git a/services/hv-ves-client/producer/api/pom.xml b/services/hv-ves-client/producer/api/pom.xml index 44e15c9d..1804b162 100644 --- a/services/hv-ves-client/producer/api/pom.xml +++ b/services/hv-ves-client/producer/api/pom.xml @@ -49,6 +49,10 @@ <artifactId>reactive-streams</artifactId> </dependency> <dependency> + <groupId>io.vavr</groupId> + <artifactId>vavr</artifactId> + </dependency> + <dependency> <groupId>org.immutables</groupId> <artifactId>value</artifactId> <scope>provided</scope> diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java index f90867d6..a7ad8361 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java @@ -19,7 +19,7 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; -import java.util.Iterator; +import io.vavr.collection.Stream; import java.util.ServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,17 +35,12 @@ final class FactoryLoader { private static final Logger LOGGER = LoggerFactory.getLogger(FactoryLoader.class); static <T> T findInstance(Class<T> clazz) { - Iterator<T> instances = ServiceLoader.load(clazz).iterator(); - if (instances.hasNext()) { - final T head = instances.next(); - if (instances.hasNext()) { - LOGGER.warn("Found more than one implementation of {} on the class path. Using {}.", - clazz.getSimpleName(), head.getClass().getName()); - } - return head; - } else { - throw new IllegalStateException( - "No " + clazz.getSimpleName() + " instances were configured. Are you sure you have runtime dependency on an implementation module?"); - } + return Stream.ofAll(ServiceLoader.load(clazz)) + .headOption() + .peek(head -> LOGGER.info( + " Using {} as a {} implementation.", head.getClass().getName(), clazz.getSimpleName())) + .getOrElseThrow(() -> new IllegalStateException( + "No " + clazz.getSimpleName() + " instances were configured. " + + "Are you sure you have runtime dependency on an implementation module?")); } } diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java index 43cd3feb..3359e54b 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; @@ -32,7 +33,7 @@ import org.reactivestreams.Publisher; * <p>Sample usage with <a href="https://projectreactor.io/">Project Reactor</a>:</p> * * <pre> - * ProducerOptions options = {@link ImmutableProducerOptions}.builder(). ... .build() + * ProducerOptions options = ImmutableProducerOptions.builder(). ... .build() * HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options); * * Flux.just(msg1, msg2, msg3) @@ -41,6 +42,7 @@ import org.reactivestreams.Publisher; * </pre> * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions * @since 1.1.1 */ @FunctionalInterface @@ -51,6 +53,9 @@ public interface HvVesProducer { * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in * case of any problem with sending the messages. * + * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once + * feeding it with a stream of consecutive events. + * * @param messages source of the messages to be sent * @return empty publisher which completes after messages are sent or error occurs * @since 1.1.1 diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java index 3b3f91a6..1e28fbd9 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; /** * <p> @@ -32,12 +33,13 @@ import org.jetbrains.annotations.NotNull; * * <pre> * {@link HvVesProducer} producer = HvVesProducerFactory.create( - * {@link ImmutableProducerOptions}.builder(). + * ImmutableProducerOptions.builder(). * ... * .build()) * </pre> * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions * @since 1.1.1 */ public abstract class HvVesProducerFactory { diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java new file mode 100644 index 00000000..79ae32a8 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import io.vavr.CheckedFunction1; +import io.vavr.Function1; +import io.vavr.control.Try; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; + +/** + * Simple password representation. + * + * A password can be used only once. After it the corresponding memory is zeroed. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.1 + */ +public class Password { + + private char[] value; + + public Password(@NotNull char[] value) { + this.value = value; + } + + /** + * Consume the password. + * + * After consumption following uses of this method will return Failure(GeneralSecurityException). + * + * @param user of the password + */ + public <T> Try<T> use(Function1<char[], Try<T>> user) { + if (value == null) + return Try.failure(new GeneralSecurityException("Password had been already used so it is in cleared state")); + + try { + return user.apply(value); + } finally { + clear(); + } + } + + public <T> Try<T> useChecked(CheckedFunction1<char[], T> user) { + return use(CheckedFunction1.liftTry(user)); + } + + public void clear() { + Arrays.fill(value, (char) 0); + value = null; + } +} diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java new file mode 100644 index 00000000..cbadfea9 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import io.vavr.control.Try; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; + +/** + * Utility functions for loading passwords. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.1 + */ +public final class Passwords { + + private Passwords() { + } + + public static @NotNull Try<Password> fromFile(File file) { + return fromPath(file.toPath()); + } + + public static @NotNull Try<Password> fromPath(Path path) { + return Try.of(() -> { + final byte[] bytes = Files.readAllBytes(path); + final CharBuffer password = decodeChars(bytes); + final char[] result = convertToCharArray(password); + return new Password(result); + }); + } + + public static @NotNull Try<Password> fromResource(String resource) { + return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI())) + .flatMap(Passwords::fromPath); + } + + private static @NotNull CharBuffer decodeChars(byte[] bytes) { + try { + return Charset.defaultCharset().decode(ByteBuffer.wrap(bytes)); + } finally { + Arrays.fill(bytes, (byte) 0); + } + } + + private static char[] convertToCharArray(CharBuffer password) { + try { + final char[] result = new char[password.limit()]; + password.get(result); + return result; + } finally { + password.flip(); + clearBuffer(password); + } + } + + private static void clearBuffer(CharBuffer password) { + while (password.remaining() > 0) { + password.put((char) 0); + } + } +} diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java index d6f4151d..aead5253 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java @@ -1,34 +1,35 @@ /* - * ============LICENSE_START======================================================= + * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK - * ================================================================================ + * ========================================================= * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ + * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; +import io.vavr.collection.Set; import java.net.InetSocketAddress; -import java.util.Set; import org.immutables.value.Value; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.1 */ @Value.Immutable -@Value.Style(depluralize = true, depluralizeDictionary = "address:addresses") public interface ProducerOptions { /** @@ -37,5 +38,23 @@ public interface ProducerOptions { * @return configured collector endpoints * @since 1.1.1 */ + @NotNull Set<InetSocketAddress> collectorAddresses(); + + /** + * Security keys definition used when connecting to the collector. + + * + * @return security keys definition or null when plain TCP sockets are to be used. + * @since 1.1.1 + */ + @Nullable + SecurityKeys securityKeys(); + + @Value.Check + default void validate() { + if (collectorAddresses().isEmpty()) { + throw new IllegalArgumentException("address list cannot be empty"); + } + } } diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java new file mode 100644 index 00000000..66af32fa --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import java.nio.file.Path; +import org.immutables.value.Value; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.1 + */ +@Value.Immutable +public interface SecurityKeys { + Path keyStore(); + Password keyStorePassword(); + + Path trustStore(); + Password trustStorePassword(); +} diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java new file mode 100644 index 00000000..fbfeb5d5 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import io.vavr.collection.Array; +import io.vavr.control.Try; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import org.junit.jupiter.api.Test; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + */ +class PasswordTest { + + @Test + void use_shouldInvokeConsumerWithStoredPassword() { + // given + final String password = "hej ho"; + final Password cut = new Password(password.toCharArray()); + + // when + String result = cut.useChecked(String::new).get(); + + // then + assertThat(result).isEqualTo(password); + } + + @Test + void use_shouldClearPasswordAfterUse() { + // given + final char[] passwordChars = "hej ho".toCharArray(); + final Password cut = new Password(passwordChars); + + // when + useThePassword(cut); + + // then + assertAllCharsAreNull(passwordChars); + } + + @Test + void use_shouldFail_whenItWasAlreadyCalled() { + // given + final Password cut = new Password("ala ma kota".toCharArray()); + + // when & then + useThePassword(cut).get(); + + assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() -> + useThePassword(cut).get()); + } + + @Test + void use_shouldFail_whenItWasCleared() { + // given + final Password cut = new Password("ala ma kota".toCharArray()); + + // when & then + cut.clear(); + + assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() -> + useThePassword(cut).get()); + } + + @Test + void clear_shouldClearThePassword() { + // given + final char[] passwordChars = "hej ho".toCharArray(); + final Password cut = new Password(passwordChars); + + // when + cut.clear(); + + // then + assertAllCharsAreNull(passwordChars); + } + + private Try<Object> useThePassword(Password cut) { + return cut.use((pass) -> Try.success(42)); + } + + private void assertAllCharsAreNull(char[] passwordChars) { + assertThat(Array.ofAll(passwordChars).forAll(ch -> ch == '\0')) + .describedAs("all characters in " + Arrays.toString(passwordChars) + " should be == '\\0'") + .isTrue(); + } +}
\ No newline at end of file diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java new file mode 100644 index 00000000..9f91afb7 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.fail; + +import io.vavr.control.Try; +import java.io.File; +import java.net.URISyntaxException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since January 2019 + */ +class PasswordsTest { + + @Test + void fromFile() { + // given + final File file = new File("./src/test/resources/password.txt"); + + // when + final Try<Password> result = Passwords.fromFile(file); + + // then + assertSuccessful(result); + assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line"); + } + + @Test + void fromPath() throws URISyntaxException { + // given + final Path path = Paths.get(PasswordsTest.class.getResource("/password.txt").toURI()); + + // when + final Try<Password> result = Passwords.fromPath(path); + + // then + assertSuccessful(result); + assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line"); + } + + @Test + void fromPath_shouldFail_whenNotFound() { + // given + final Path path = Paths.get("/", UUID.randomUUID().toString()); + + // when + final Try<Password> result = Passwords.fromPath(path); + + // then + assertThat(result.isFailure()).describedAs("Try.failure?").isTrue(); + assertThat(result.getCause()).isInstanceOf(NoSuchFileException.class); + } + + @Test + void fromResource() { + // given + final String resource = "/password.txt"; + + // when + final Try<Password> result = Passwords.fromResource(resource); + + // then + assertSuccessful(result); + assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line"); + } + + private void assertSuccessful(Try<Password> result) { + assertThat(result.isSuccess()).describedAs("Try.success?").isTrue(); + } + + private String extractPassword(Try<Password> result) { + return result.flatMap(pass -> pass.useChecked(String::new)).get(); + } +}
\ No newline at end of file diff --git a/services/hv-ves-client/producer/api/src/test/resources/password.txt b/services/hv-ves-client/producer/api/src/test/resources/password.txt new file mode 100644 index 00000000..93e4a005 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/test/resources/password.txt @@ -0,0 +1,2 @@ +ja baczewski +2nd line
\ No newline at end of file diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java index 0aa51aa9..213e9766 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java @@ -21,11 +21,12 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; import static org.assertj.core.api.Assertions.assertThat; +import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.ves.VesEventOuterClass.CommonEventHeader; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; @@ -49,12 +50,17 @@ class HvVesProducerIT { @Test void todo() { // given - final Flux<VesEvent> input = Flux.just(VesEvent.getDefaultInstance()); + final VesEvent sampleEvent = VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setDomain("dummy") + .build()) + .setEventFields(ByteString.copyFrom(new byte[]{0, 1, 2, 3})) + .build(); + + final Flux<VesEvent> input = Flux.just(sampleEvent); // when - // This will currently fail - //final ByteBuf receivedData = sut.blockingSend(input); - final ByteBuf receivedData = ByteBufAllocator.DEFAULT.buffer().writeByte(8); + final ByteBuf receivedData = sut.blockingSend(input); // then assertThat(receivedData.readableBytes()) diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java index f459c98f..2cc2c0b2 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java @@ -20,17 +20,20 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; import io.netty.buffer.ByteBuf; - +import io.vavr.collection.HashSet; +import io.vavr.control.Try; import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; - import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions.Builder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Passwords; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -50,6 +53,16 @@ public class SystemUnderTestWrapper { this(DEFAULT_TIMEOUT); } + public void startSecure() { + start(ImmutableProducerOptions.builder() + .securityKeys(ImmutableSecurityKeys.builder() + .keyStore(resource("/client.p12").get()) + .keyStorePassword(Passwords.fromResource("/client.pass").get()) + .trustStore(resource("/trust.p12").get()) + .trustStorePassword(Passwords.fromResource("/trust.pass").get()) + .build())); + } + public void start() { start(createDefaultOptions()); } @@ -57,7 +70,7 @@ public class SystemUnderTestWrapper { public void start(ImmutableProducerOptions.Builder optionsBuilder) { InetSocketAddress collectorAddress = collector.start(); cut = HvVesProducerFactory.create( - optionsBuilder.addCollectorAddress(collectorAddress).build()); + optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build()); } public void stop() { @@ -66,9 +79,6 @@ public class SystemUnderTestWrapper { public ByteBuf blockingSend(Flux<VesEvent> events) { events.transform(cut::send).subscribe(); - - - Mono.from(cut.send(events)).block(); collector.blockUntilFirstClientIsHandled(timeout); return collector.dataFromFirstClient(); } @@ -77,4 +87,8 @@ public class SystemUnderTestWrapper { return ImmutableProducerOptions.builder(); } + private Try<Path> resource(String resource) { + return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI())); + } + } diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.p12 b/services/hv-ves-client/producer/ct/src/test/resources/client.p12 Binary files differnew file mode 100644 index 00000000..68a0fb25 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/resources/client.p12 diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.pass b/services/hv-ves-client/producer/ct/src/test/resources/client.pass new file mode 100644 index 00000000..e69c2de9 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/resources/client.pass @@ -0,0 +1 @@ +onaponap
\ No newline at end of file diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 Binary files differnew file mode 100644 index 00000000..ed7f62d4 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.pass b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass new file mode 100644 index 00000000..e69c2de9 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass @@ -0,0 +1 @@ +onaponap
\ No newline at end of file diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java index f50206ff..ad402f93 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java @@ -19,18 +19,36 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; +import io.netty.handler.ssl.SslContext; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ProducerOptions; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.netty.tcp.TcpClient; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> */ public class HvVesProducerFactoryImpl extends HvVesProducerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerFactoryImpl.class); + private final SslFactory sslFactory = new SslFactory(); + @Override protected @NotNull HvVesProducer createProducer(ProducerOptions options) { - return new HvVesProducerImpl(); + TcpClient tcpClient = TcpClient.create() + .addressSupplier(() -> options.collectorAddresses().head()); + + if (options.securityKeys() == null) { + LOGGER.warn("Using insecure connection"); + } else { + LOGGER.info("Using secure tunnel"); + final SslContext ctx = sslFactory.createSecureContext(options.securityKeys()).get(); + tcpClient = tcpClient.secure(ssl-> ssl.sslContext(ctx)); + } + + return new HvVesProducerImpl(tcpClient); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java index 25128d1e..15038c3a 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java @@ -19,6 +19,8 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; +import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.ves.VesEventOuterClass.VesEvent; @@ -27,18 +29,39 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.NettyOutbound; +import reactor.netty.tcp.TcpClient; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> */ public class HvVesProducerImpl implements HvVesProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerImpl.class); + private final TcpClient tcpClient; + + HvVesProducerImpl(TcpClient tcpClient) { + this.tcpClient = tcpClient; + } @Override public @NotNull Mono<Void> send(Publisher<VesEvent> messages) { - return Flux.from(messages) - .doOnNext(msg -> LOGGER.info("Not-so-dummy sending: {}", msg.toString())) - .then(); + return tcpClient + .handle((inbound, outbound) -> handle(outbound, messages)) + .connect().then(); + } + + private Publisher<Void> handle(NettyOutbound outbound, Publisher<VesEvent> messages) { + final Flux<ByteBuf> encodedMessages = Flux.from(messages) + .map(msg -> { + LOGGER.debug("Encoding VesEvent '{}'", msg); + final ByteBuf encodedMessage = outbound.alloc().buffer(); + encodedMessage.writeCharSequence(msg.getCommonEventHeader().getDomain(), StandardCharsets.UTF_8); + encodedMessage.writeByte(0x0a); + return encodedMessage; + }); + + return outbound.send(encodedMessages).then(); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java new file mode 100644 index 00000000..4661f595 --- /dev/null +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.vavr.Tuple; +import io.vavr.control.Try; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Password; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.SecurityKeys; + +/* + * TODO: To be merged with org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory + */ +public class SslFactory { + + /** + * Function for creating secure ssl context. + * + * @param keys - Security keys to be used + * @return configured SSL context + */ + public Try<SslContext> createSecureContext(final SecurityKeys keys) { + final Try<KeyManagerFactory> keyManagerFactory = + keyManagerFactory(keys.keyStore(), keys.keyStorePassword()); + final Try<TrustManagerFactory> trustManagerFactory = + trustManagerFactory(keys.trustStore(), keys.trustStorePassword()); + + return Try.success(SslContextBuilder.forClient()) + .flatMap(ctx -> keyManagerFactory.map(ctx::keyManager)) + .flatMap(ctx -> trustManagerFactory.map(ctx::trustManager)) + .mapTry(SslContextBuilder::build); + } + + private Try<KeyManagerFactory> keyManagerFactory(Path path, Password password) { + return password.useChecked(passwordChars -> { + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(loadKeyStoreFromFile(path, passwordChars), passwordChars); + return kmf; + }); + } + + private Try<TrustManagerFactory> trustManagerFactory(Path path, Password password) { + return password.useChecked(passwordChars -> { + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(loadKeyStoreFromFile(path, passwordChars)); + return tmf; + }); + } + + private KeyStore loadKeyStoreFromFile(Path path, char[] keyStorePassword) + throws GeneralSecurityException, IOException { + KeyStore ks = KeyStore.getInstance("pkcs12"); + ks.load(Files.newInputStream(path, StandardOpenOption.READ), keyStorePassword); + return ks; + } +}
\ No newline at end of file |