From 7a4a5d0fef06c29478f594cd6e82e5cb69cc70d2 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 17 Jan 2019 11:42:35 +0100 Subject: HV-VES Client - ProducerOptions * ProducerOptions written * very basic client implementation * added vavr dependency so it's easier to handle Java Issue-ID: DCAEGEN2-1098 Change-Id: I680948c61174f60cd78c8ee39b6f92419f913d36 Signed-off-by: Piotr Jaszczyk --- pom.xml | 5 + services/hv-ves-client/producer/api/pom.xml | 4 + .../hvves/client/producer/api/FactoryLoader.java | 21 ++-- .../hvves/client/producer/api/HvVesProducer.java | 7 +- .../client/producer/api/HvVesProducerFactory.java | 4 +- .../hvves/client/producer/api/ProducerOptions.java | 41 -------- .../client/producer/api/options/Password.java | 73 ++++++++++++++ .../client/producer/api/options/Passwords.java | 87 ++++++++++++++++ .../producer/api/options/ProducerOptions.java | 60 ++++++++++++ .../client/producer/api/options/SecurityKeys.java | 37 +++++++ .../client/producer/api/options/PasswordTest.java | 109 +++++++++++++++++++++ .../client/producer/api/options/PasswordsTest.java | 100 +++++++++++++++++++ .../producer/api/src/test/resources/password.txt | 2 + .../hvves/client/producer/ct/HvVesProducerIT.java | 16 ++- .../client/producer/ct/SystemUnderTestWrapper.java | 32 ++++-- .../producer/ct/src/test/resources/client.p12 | Bin 0 -> 5183 bytes .../producer/ct/src/test/resources/client.pass | 1 + .../producer/ct/src/test/resources/trust.p12 | Bin 0 -> 1514 bytes .../producer/ct/src/test/resources/trust.pass | 1 + .../producer/impl/HvVesProducerFactoryImpl.java | 22 ++++- .../client/producer/impl/HvVesProducerImpl.java | 29 +++++- .../hvves/client/producer/impl/SslFactory.java | 83 ++++++++++++++++ 22 files changed, 659 insertions(+), 75 deletions(-) delete mode 100644 services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java create mode 100644 services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java create mode 100644 services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java create mode 100644 services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java create mode 100644 services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java create mode 100644 services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java create mode 100644 services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java create mode 100644 services/hv-ves-client/producer/api/src/test/resources/password.txt create mode 100644 services/hv-ves-client/producer/ct/src/test/resources/client.p12 create mode 100644 services/hv-ves-client/producer/ct/src/test/resources/client.pass create mode 100644 services/hv-ves-client/producer/ct/src/test/resources/trust.p12 create mode 100644 services/hv-ves-client/producer/ct/src/test/resources/trust.pass create mode 100644 services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java diff --git a/pom.xml b/pom.xml index fe2684ed..038fc313 100644 --- a/pom.xml +++ b/pom.xml @@ -180,6 +180,11 @@ value ${immutables.version} + + io.vavr + vavr + 0.9.3 + org.immutables gson diff --git a/services/hv-ves-client/producer/api/pom.xml b/services/hv-ves-client/producer/api/pom.xml index 44e15c9d..1804b162 100644 --- a/services/hv-ves-client/producer/api/pom.xml +++ b/services/hv-ves-client/producer/api/pom.xml @@ -48,6 +48,10 @@ org.reactivestreams reactive-streams + + io.vavr + vavr + org.immutables value diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java index f90867d6..a7ad8361 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java @@ -19,7 +19,7 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; -import java.util.Iterator; +import io.vavr.collection.Stream; import java.util.ServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,17 +35,12 @@ final class FactoryLoader { private static final Logger LOGGER = LoggerFactory.getLogger(FactoryLoader.class); static T findInstance(Class clazz) { - Iterator instances = ServiceLoader.load(clazz).iterator(); - if (instances.hasNext()) { - final T head = instances.next(); - if (instances.hasNext()) { - LOGGER.warn("Found more than one implementation of {} on the class path. Using {}.", - clazz.getSimpleName(), head.getClass().getName()); - } - return head; - } else { - throw new IllegalStateException( - "No " + clazz.getSimpleName() + " instances were configured. Are you sure you have runtime dependency on an implementation module?"); - } + return Stream.ofAll(ServiceLoader.load(clazz)) + .headOption() + .peek(head -> LOGGER.info( + " Using {} as a {} implementation.", head.getClass().getName(), clazz.getSimpleName())) + .getOrElseThrow(() -> new IllegalStateException( + "No " + clazz.getSimpleName() + " instances were configured. " + + "Are you sure you have runtime dependency on an implementation module?")); } } diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java index 43cd3feb..3359e54b 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; @@ -32,7 +33,7 @@ import org.reactivestreams.Publisher; *

Sample usage with Project Reactor:

* *
- *     ProducerOptions options = {@link ImmutableProducerOptions}.builder(). ... .build()
+ *     ProducerOptions options = ImmutableProducerOptions.builder(). ... .build()
  *     HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options);
  *
  *     Flux.just(msg1, msg2, msg3)
@@ -41,6 +42,7 @@ import org.reactivestreams.Publisher;
  * 
* * @author Piotr Jaszczyk + * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions * @since 1.1.1 */ @FunctionalInterface @@ -51,6 +53,9 @@ public interface HvVesProducer { * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in * case of any problem with sending the messages. * + * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once + * feeding it with a stream of consecutive events. + * * @param messages source of the messages to be sent * @return empty publisher which completes after messages are sent or error occurs * @since 1.1.1 diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java index 3b3f91a6..1e28fbd9 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; /** *

@@ -32,12 +33,13 @@ import org.jetbrains.annotations.NotNull; * *

  *     {@link HvVesProducer} producer = HvVesProducerFactory.create(
- *          {@link ImmutableProducerOptions}.builder().
+ *          ImmutableProducerOptions.builder().
  *              ...
  *              .build())
  * 
* * @author Piotr Jaszczyk + * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions * @since 1.1.1 */ public abstract class HvVesProducerFactory { diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java deleted file mode 100644 index d6f4151d..00000000 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * DCAEGEN2-SERVICES-SDK - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; - -import java.net.InetSocketAddress; -import java.util.Set; -import org.immutables.value.Value; - -/** - * @author Piotr Jaszczyk - * @since 1.1.1 - */ -@Value.Immutable -@Value.Style(depluralize = true, depluralizeDictionary = "address:addresses") -public interface ProducerOptions { - - /** - * A set of available collector endpoints. - * - * @return configured collector endpoints - * @since 1.1.1 - */ - Set collectorAddresses(); -} diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java new file mode 100644 index 00000000..79ae32a8 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import io.vavr.CheckedFunction1; +import io.vavr.Function1; +import io.vavr.control.Try; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; + +/** + * Simple password representation. + * + * A password can be used only once. After it the corresponding memory is zeroed. + * + * @author Piotr Jaszczyk + * @since 1.1.1 + */ +public class Password { + + private char[] value; + + public Password(@NotNull char[] value) { + this.value = value; + } + + /** + * Consume the password. + * + * After consumption following uses of this method will return Failure(GeneralSecurityException). + * + * @param user of the password + */ + public Try use(Function1> user) { + if (value == null) + return Try.failure(new GeneralSecurityException("Password had been already used so it is in cleared state")); + + try { + return user.apply(value); + } finally { + clear(); + } + } + + public Try useChecked(CheckedFunction1 user) { + return use(CheckedFunction1.liftTry(user)); + } + + public void clear() { + Arrays.fill(value, (char) 0); + value = null; + } +} diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java new file mode 100644 index 00000000..cbadfea9 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import io.vavr.control.Try; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; + +/** + * Utility functions for loading passwords. + * + * @author Piotr Jaszczyk + * @since 1.1.1 + */ +public final class Passwords { + + private Passwords() { + } + + public static @NotNull Try fromFile(File file) { + return fromPath(file.toPath()); + } + + public static @NotNull Try fromPath(Path path) { + return Try.of(() -> { + final byte[] bytes = Files.readAllBytes(path); + final CharBuffer password = decodeChars(bytes); + final char[] result = convertToCharArray(password); + return new Password(result); + }); + } + + public static @NotNull Try fromResource(String resource) { + return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI())) + .flatMap(Passwords::fromPath); + } + + private static @NotNull CharBuffer decodeChars(byte[] bytes) { + try { + return Charset.defaultCharset().decode(ByteBuffer.wrap(bytes)); + } finally { + Arrays.fill(bytes, (byte) 0); + } + } + + private static char[] convertToCharArray(CharBuffer password) { + try { + final char[] result = new char[password.limit()]; + password.get(result); + return result; + } finally { + password.flip(); + clearBuffer(password); + } + } + + private static void clearBuffer(CharBuffer password) { + while (password.remaining() > 0) { + password.put((char) 0); + } + } +} diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java new file mode 100644 index 00000000..aead5253 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import io.vavr.collection.Set; +import java.net.InetSocketAddress; +import org.immutables.value.Value; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * @author Piotr Jaszczyk + * @since 1.1.1 + */ +@Value.Immutable +public interface ProducerOptions { + + /** + * A set of available collector endpoints. + * + * @return configured collector endpoints + * @since 1.1.1 + */ + @NotNull + Set collectorAddresses(); + + /** + * Security keys definition used when connecting to the collector. + + * + * @return security keys definition or null when plain TCP sockets are to be used. + * @since 1.1.1 + */ + @Nullable + SecurityKeys securityKeys(); + + @Value.Check + default void validate() { + if (collectorAddresses().isEmpty()) { + throw new IllegalArgumentException("address list cannot be empty"); + } + } +} diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java new file mode 100644 index 00000000..66af32fa --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import java.nio.file.Path; +import org.immutables.value.Value; + +/** + * @author Piotr Jaszczyk + * @since 1.1.1 + */ +@Value.Immutable +public interface SecurityKeys { + Path keyStore(); + Password keyStorePassword(); + + Path trustStore(); + Password trustStorePassword(); +} diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java new file mode 100644 index 00000000..fbfeb5d5 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import io.vavr.collection.Array; +import io.vavr.control.Try; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import org.junit.jupiter.api.Test; + +/** + * @author Piotr Jaszczyk + */ +class PasswordTest { + + @Test + void use_shouldInvokeConsumerWithStoredPassword() { + // given + final String password = "hej ho"; + final Password cut = new Password(password.toCharArray()); + + // when + String result = cut.useChecked(String::new).get(); + + // then + assertThat(result).isEqualTo(password); + } + + @Test + void use_shouldClearPasswordAfterUse() { + // given + final char[] passwordChars = "hej ho".toCharArray(); + final Password cut = new Password(passwordChars); + + // when + useThePassword(cut); + + // then + assertAllCharsAreNull(passwordChars); + } + + @Test + void use_shouldFail_whenItWasAlreadyCalled() { + // given + final Password cut = new Password("ala ma kota".toCharArray()); + + // when & then + useThePassword(cut).get(); + + assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() -> + useThePassword(cut).get()); + } + + @Test + void use_shouldFail_whenItWasCleared() { + // given + final Password cut = new Password("ala ma kota".toCharArray()); + + // when & then + cut.clear(); + + assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() -> + useThePassword(cut).get()); + } + + @Test + void clear_shouldClearThePassword() { + // given + final char[] passwordChars = "hej ho".toCharArray(); + final Password cut = new Password(passwordChars); + + // when + cut.clear(); + + // then + assertAllCharsAreNull(passwordChars); + } + + private Try useThePassword(Password cut) { + return cut.use((pass) -> Try.success(42)); + } + + private void assertAllCharsAreNull(char[] passwordChars) { + assertThat(Array.ofAll(passwordChars).forAll(ch -> ch == '\0')) + .describedAs("all characters in " + Arrays.toString(passwordChars) + " should be == '\\0'") + .isTrue(); + } +} \ No newline at end of file diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java new file mode 100644 index 00000000..9f91afb7 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.fail; + +import io.vavr.control.Try; +import java.io.File; +import java.net.URISyntaxException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +/** + * @author Piotr Jaszczyk + * @since January 2019 + */ +class PasswordsTest { + + @Test + void fromFile() { + // given + final File file = new File("./src/test/resources/password.txt"); + + // when + final Try result = Passwords.fromFile(file); + + // then + assertSuccessful(result); + assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line"); + } + + @Test + void fromPath() throws URISyntaxException { + // given + final Path path = Paths.get(PasswordsTest.class.getResource("/password.txt").toURI()); + + // when + final Try result = Passwords.fromPath(path); + + // then + assertSuccessful(result); + assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line"); + } + + @Test + void fromPath_shouldFail_whenNotFound() { + // given + final Path path = Paths.get("/", UUID.randomUUID().toString()); + + // when + final Try result = Passwords.fromPath(path); + + // then + assertThat(result.isFailure()).describedAs("Try.failure?").isTrue(); + assertThat(result.getCause()).isInstanceOf(NoSuchFileException.class); + } + + @Test + void fromResource() { + // given + final String resource = "/password.txt"; + + // when + final Try result = Passwords.fromResource(resource); + + // then + assertSuccessful(result); + assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line"); + } + + private void assertSuccessful(Try result) { + assertThat(result.isSuccess()).describedAs("Try.success?").isTrue(); + } + + private String extractPassword(Try result) { + return result.flatMap(pass -> pass.useChecked(String::new)).get(); + } +} \ No newline at end of file diff --git a/services/hv-ves-client/producer/api/src/test/resources/password.txt b/services/hv-ves-client/producer/api/src/test/resources/password.txt new file mode 100644 index 00000000..93e4a005 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/test/resources/password.txt @@ -0,0 +1,2 @@ +ja baczewski +2nd line \ No newline at end of file diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java index 0aa51aa9..213e9766 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java @@ -21,11 +21,12 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; import static org.assertj.core.api.Assertions.assertThat; +import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.ves.VesEventOuterClass.CommonEventHeader; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; @@ -49,12 +50,17 @@ class HvVesProducerIT { @Test void todo() { // given - final Flux input = Flux.just(VesEvent.getDefaultInstance()); + final VesEvent sampleEvent = VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setDomain("dummy") + .build()) + .setEventFields(ByteString.copyFrom(new byte[]{0, 1, 2, 3})) + .build(); + + final Flux input = Flux.just(sampleEvent); // when - // This will currently fail - //final ByteBuf receivedData = sut.blockingSend(input); - final ByteBuf receivedData = ByteBufAllocator.DEFAULT.buffer().writeByte(8); + final ByteBuf receivedData = sut.blockingSend(input); // then assertThat(receivedData.readableBytes()) diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java index f459c98f..2cc2c0b2 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java @@ -20,17 +20,20 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; import io.netty.buffer.ByteBuf; - +import io.vavr.collection.HashSet; +import io.vavr.control.Try; import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; - import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions.Builder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Passwords; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author Piotr Jaszczyk @@ -50,6 +53,16 @@ public class SystemUnderTestWrapper { this(DEFAULT_TIMEOUT); } + public void startSecure() { + start(ImmutableProducerOptions.builder() + .securityKeys(ImmutableSecurityKeys.builder() + .keyStore(resource("/client.p12").get()) + .keyStorePassword(Passwords.fromResource("/client.pass").get()) + .trustStore(resource("/trust.p12").get()) + .trustStorePassword(Passwords.fromResource("/trust.pass").get()) + .build())); + } + public void start() { start(createDefaultOptions()); } @@ -57,7 +70,7 @@ public class SystemUnderTestWrapper { public void start(ImmutableProducerOptions.Builder optionsBuilder) { InetSocketAddress collectorAddress = collector.start(); cut = HvVesProducerFactory.create( - optionsBuilder.addCollectorAddress(collectorAddress).build()); + optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build()); } public void stop() { @@ -66,9 +79,6 @@ public class SystemUnderTestWrapper { public ByteBuf blockingSend(Flux events) { events.transform(cut::send).subscribe(); - - - Mono.from(cut.send(events)).block(); collector.blockUntilFirstClientIsHandled(timeout); return collector.dataFromFirstClient(); } @@ -77,4 +87,8 @@ public class SystemUnderTestWrapper { return ImmutableProducerOptions.builder(); } + private Try resource(String resource) { + return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI())); + } + } diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.p12 b/services/hv-ves-client/producer/ct/src/test/resources/client.p12 new file mode 100644 index 00000000..68a0fb25 Binary files /dev/null and b/services/hv-ves-client/producer/ct/src/test/resources/client.p12 differ diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.pass b/services/hv-ves-client/producer/ct/src/test/resources/client.pass new file mode 100644 index 00000000..e69c2de9 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/resources/client.pass @@ -0,0 +1 @@ +onaponap \ No newline at end of file diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 new file mode 100644 index 00000000..ed7f62d4 Binary files /dev/null and b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 differ diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.pass b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass new file mode 100644 index 00000000..e69c2de9 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass @@ -0,0 +1 @@ +onaponap \ No newline at end of file diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java index f50206ff..ad402f93 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java @@ -19,18 +19,36 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; +import io.netty.handler.ssl.SslContext; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ProducerOptions; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.netty.tcp.TcpClient; /** * @author Piotr Jaszczyk */ public class HvVesProducerFactoryImpl extends HvVesProducerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerFactoryImpl.class); + private final SslFactory sslFactory = new SslFactory(); + @Override protected @NotNull HvVesProducer createProducer(ProducerOptions options) { - return new HvVesProducerImpl(); + TcpClient tcpClient = TcpClient.create() + .addressSupplier(() -> options.collectorAddresses().head()); + + if (options.securityKeys() == null) { + LOGGER.warn("Using insecure connection"); + } else { + LOGGER.info("Using secure tunnel"); + final SslContext ctx = sslFactory.createSecureContext(options.securityKeys()).get(); + tcpClient = tcpClient.secure(ssl-> ssl.sslContext(ctx)); + } + + return new HvVesProducerImpl(tcpClient); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java index 25128d1e..15038c3a 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java @@ -19,6 +19,8 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; +import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.ves.VesEventOuterClass.VesEvent; @@ -27,18 +29,39 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.NettyOutbound; +import reactor.netty.tcp.TcpClient; /** * @author Piotr Jaszczyk */ public class HvVesProducerImpl implements HvVesProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerImpl.class); + private final TcpClient tcpClient; + + HvVesProducerImpl(TcpClient tcpClient) { + this.tcpClient = tcpClient; + } @Override public @NotNull Mono send(Publisher messages) { - return Flux.from(messages) - .doOnNext(msg -> LOGGER.info("Not-so-dummy sending: {}", msg.toString())) - .then(); + return tcpClient + .handle((inbound, outbound) -> handle(outbound, messages)) + .connect().then(); + } + + private Publisher handle(NettyOutbound outbound, Publisher messages) { + final Flux encodedMessages = Flux.from(messages) + .map(msg -> { + LOGGER.debug("Encoding VesEvent '{}'", msg); + final ByteBuf encodedMessage = outbound.alloc().buffer(); + encodedMessage.writeCharSequence(msg.getCommonEventHeader().getDomain(), StandardCharsets.UTF_8); + encodedMessage.writeByte(0x0a); + return encodedMessage; + }); + + return outbound.send(encodedMessages).then(); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java new file mode 100644 index 00000000..4661f595 --- /dev/null +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.vavr.Tuple; +import io.vavr.control.Try; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Password; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.SecurityKeys; + +/* + * TODO: To be merged with org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory + */ +public class SslFactory { + + /** + * Function for creating secure ssl context. + * + * @param keys - Security keys to be used + * @return configured SSL context + */ + public Try createSecureContext(final SecurityKeys keys) { + final Try keyManagerFactory = + keyManagerFactory(keys.keyStore(), keys.keyStorePassword()); + final Try trustManagerFactory = + trustManagerFactory(keys.trustStore(), keys.trustStorePassword()); + + return Try.success(SslContextBuilder.forClient()) + .flatMap(ctx -> keyManagerFactory.map(ctx::keyManager)) + .flatMap(ctx -> trustManagerFactory.map(ctx::trustManager)) + .mapTry(SslContextBuilder::build); + } + + private Try keyManagerFactory(Path path, Password password) { + return password.useChecked(passwordChars -> { + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(loadKeyStoreFromFile(path, passwordChars), passwordChars); + return kmf; + }); + } + + private Try trustManagerFactory(Path path, Password password) { + return password.useChecked(passwordChars -> { + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(loadKeyStoreFromFile(path, passwordChars)); + return tmf; + }); + } + + private KeyStore loadKeyStoreFromFile(Path path, char[] keyStorePassword) + throws GeneralSecurityException, IOException { + KeyStore ks = KeyStore.getInstance("pkcs12"); + ks.load(Files.newInputStream(path, StandardOpenOption.READ), keyStorePassword); + return ks; + } +} \ No newline at end of file -- cgit 1.2.3-korg