diff options
10 files changed, 138 insertions, 19 deletions
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java index 921db52b..98c77c58 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java @@ -51,10 +51,25 @@ public interface ProducerOptions { @Nullable SecurityKeys securityKeys(); + /** + * Version of Wire Transfer Protocol interface frame + * + * @return Version of interface frame + * @since 1.1.1 + */ + @NotNull + @Value.Default + default WireFrameVersion wireFrameVersion() { + return ImmutableWireFrameVersion.builder().build(); + } + + @Value.Check default void validate() { if (collectorAddresses().isEmpty()) { throw new IllegalArgumentException("address list cannot be empty"); } + } + } diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java new file mode 100644 index 00000000..98a03861 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * DCAEGEN2-SERVICES-SDK + * ================================================================================ + * Copyright (C) 2019 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import org.immutables.value.Value; + +@Value.Immutable +public interface WireFrameVersion { + short SUPPORTED_VERSION_MAJOR = 0x01; + short SUPPORTED_VERSION_MINOR = 0x00; + /*** + * Major version of Wire Transfer Protocol interface frame + * @return major version of interface frame + * @since 1.1.1 + */ + @Value.Default + @Value.Parameter + default short major() { + return SUPPORTED_VERSION_MAJOR; + } + + /*** + * Minor version of Wire Transfer Protocol interface frame + * @return minor version of interface frame + * @since 1.1.1 + */ + @Value.Default + @Value.Parameter + default short minor() { + return SUPPORTED_VERSION_MINOR; + } + + @Value.Check + default void validate() { + if (!(major() > 0 && minor() >=0)) { + throw new IllegalArgumentException("Invalid version"); + } + } + +} + diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java index ddc87bc4..121b940a 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java @@ -22,16 +22,20 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; import io.netty.buffer.ByteBuf; import io.vavr.collection.HashSet; import io.vavr.control.Try; + import java.net.InetSocketAddress; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; + import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; import org.onap.dcaegen2.services.sdk.security.ssl.Passwords; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; @@ -69,8 +73,10 @@ public class SystemUnderTestWrapper { public void start(ImmutableProducerOptions.Builder optionsBuilder) { InetSocketAddress collectorAddress = collector.start(); + WireFrameVersion WTPVersion = ImmutableWireFrameVersion.builder().build(); cut = HvVesProducerFactory.create( - optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build()); + optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)) + .wireFrameVersion(WTPVersion).build()); } public void stop() { diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java index ab10088b..e65f246a 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java @@ -41,8 +41,8 @@ public class HvVesProducerFactoryImpl extends HvVesProducerFactory { @Override protected @NotNull HvVesProducer createProducer(ProducerOptions options) { TcpClient tcpClient = TcpClient.create() - .addressSupplier(() -> options.collectorAddresses().head()); - ProducerCore producerCore = new ProducerCore(new EncodersFactory()); + .addressSupplier(() -> options.collectorAddresses().head()); + ProducerCore producerCore = new ProducerCore(new EncodersFactory(), options.wireFrameVersion()); if (options.securityKeys() == null) { LOGGER.warn("Using insecure connection"); diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java index 3000e3d6..49d54fe8 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java @@ -25,6 +25,7 @@ import io.vavr.control.Try; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -36,13 +37,15 @@ import reactor.core.publisher.Flux; public class ProducerCore { private final EncodersFactory encodersFactory; + private final WireFrameVersion wireFrameVersion; - public ProducerCore(EncodersFactory encodersFactory) { + public ProducerCore(EncodersFactory encodersFactory, WireFrameVersion wireFrameVersion) { this.encodersFactory = encodersFactory; + this.wireFrameVersion = wireFrameVersion; } public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) { - final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator); + final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion); final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(); return Flux.from(messages) .map(protobufEncoder::encode) diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java index fbe8ea3c..24da3b1e 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; import io.netty.buffer.ByteBufAllocator; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; public class EncodersFactory { @@ -27,7 +28,8 @@ public class EncodersFactory { return new ProtobufEncoder(); } - public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) { - return new WireFrameEncoder(allocator); + public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator, + WireFrameVersion wireFrameVersion) { + return new WireFrameEncoder(allocator, wireFrameVersion); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java index a946ceac..29e3347f 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encod import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.vavr.control.Try; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +35,6 @@ import java.nio.ByteBuffer; public class WireFrameEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class); private static final short MARKER_BYTE = 0xAA; - private static final short SUPPORTED_VERSION_MAJOR = 0x01; - private static final short SUPPORTED_VERSION_MINOR = 0x00; private static final int RESERVED_BYTES_COUNT = 3; private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker 2 * Byte.BYTES + // single byte fields (versions) @@ -44,9 +43,11 @@ public class WireFrameEncoder { 1 * Integer.BYTES; // payload length private final ByteBufAllocator allocator; + private final WireFrameVersion wireFrameVersion; - public WireFrameEncoder(ByteBufAllocator allocator) { + public WireFrameEncoder(ByteBufAllocator allocator, WireFrameVersion wireFrameVersion) { this.allocator = allocator; + this.wireFrameVersion = wireFrameVersion; } public Try<ByteBuf> encode(ByteBuffer payload) { @@ -72,8 +73,8 @@ public class WireFrameEncoder { private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) { encodedMessage.writeByte(MARKER_BYTE); - encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR); - encodedMessage.writeByte(SUPPORTED_VERSION_MINOR); + encodedMessage.writeByte(wireFrameVersion.major()); + encodedMessage.writeByte(wireFrameVersion.minor()); encodedMessage.writeZero(RESERVED_BYTES_COUNT); } diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java index 02cc6e5f..b79b0cf9 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java @@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; @@ -48,11 +49,13 @@ public class ProducerCoreTest { private ProducerCore producerCore; private EncodersFactory encodersFactoryMock; + private WireFrameVersion wireFrameVersion; @BeforeEach public void setUp() { encodersFactoryMock = mock(EncodersFactory.class); - producerCore = new ProducerCore(encodersFactoryMock); + wireFrameVersion = mock(WireFrameVersion.class); + producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion); } @Test @@ -65,7 +68,8 @@ public class ProducerCoreTest { when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer); when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer); when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder); - when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder); + when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)). + thenReturn(wireFrameEncoder); // given final int messageStreamSize = 2; @@ -76,7 +80,7 @@ public class ProducerCoreTest { // then verify(encodersFactoryMock).createProtobufEncoder(); - verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT); + verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class)); verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer); diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java index 3065db28..81b5cdd6 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java @@ -24,6 +24,8 @@ import static org.assertj.core.api.Assertions.assertThat; import io.netty.buffer.ByteBufAllocator; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; /** * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> @@ -36,7 +38,9 @@ public class EncodersFactoryTest { public void factory_methods_should_create_non_null_encoders_objects() { // when final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(); - final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT); + final WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.builder().build(); + final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT, + wireFrameVersion); // then assertThat(protobufEncoder).isNotNull(); diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java index a0a67d9b..d79d0dce 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java @@ -26,6 +26,8 @@ import io.vavr.control.Try; import org.junit.jupiter.api.Test; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import java.nio.ByteBuffer; @@ -33,6 +35,8 @@ public class WireFrameEncoderTest { private static final byte MARKER_BYTE = (byte) 0xAA; private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01; private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00; + private static final byte SAMPLE_VERSION_MAJOR = (byte) 0x02; + private static final byte SAMPLE_VERSION_MINOR = (byte) 0x01; private static final int RESERVED_BYTES_COUNT = 3; private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker 2 * Byte.BYTES + // single byte fields (versions) @@ -40,7 +44,8 @@ public class WireFrameEncoderTest { 1 * Short.BYTES + // paylaod type 1 * Integer.BYTES; // payload length - private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT); + private final WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.builder().build(); + private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); @Test void encode_givenNullPayload_shouldThrowEncodingException() { @@ -81,6 +86,23 @@ public class WireFrameEncoderTest { assertAllBytesVerified(actualEncodedBuffer); } + @Test + void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithSpecifiedWTPVersion() { + // given + WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.of(SAMPLE_VERSION_MAJOR, SAMPLE_VERSION_MINOR); + final WireFrameEncoder encoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); + final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C}; + final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes); + + // when + final Try<ByteBuf> encodedBuffer = encoder.encode(buffer); + + // then + assertThat(encodedBuffer.isSuccess()).isTrue(); + final ByteBuf versionBuffer = encodedBuffer.get(); + assertValidHeaderBeggining(versionBuffer, SAMPLE_VERSION_MAJOR, SAMPLE_VERSION_MINOR); + } + private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) { for (int i = 0; i < bytes.length; i++) { assertThat(encodedBuffer.readByte()) @@ -90,10 +112,14 @@ public class WireFrameEncoderTest { } private void assertValidHeaderBeggining(ByteBuf encodedBuffer) { + assertValidHeaderBeggining(encodedBuffer, SUPPORTED_VERSION_MAJOR, SUPPORTED_VERSION_MINOR); + } + + private void assertValidHeaderBeggining(ByteBuf encodedBuffer, byte majorWTPVersion, byte minorWTPVersion) { assertNextBytesAreInOrder(encodedBuffer, MARKER_BYTE, - SUPPORTED_VERSION_MAJOR, - SUPPORTED_VERSION_MINOR); + majorWTPVersion, + minorWTPVersion); } private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) { |