diff options
9 files changed, 200 insertions, 37 deletions
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java index baa6d6b3..3000e3d6 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.vavr.control.Try; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; @@ -42,9 +43,10 @@ public class ProducerCore { public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) { final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator); - final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(allocator); + final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(); return Flux.from(messages) - .map(protobufEncoder::encode) - .map(wireFrameEncoder::encode); + .map(protobufEncoder::encode) + .map(wireFrameEncoder::encode) + .map(Try::get); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java index 1d7b8b62..fbe8ea3c 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java @@ -23,8 +23,8 @@ import io.netty.buffer.ByteBufAllocator; public class EncodersFactory { - public ProtobufEncoder createProtobufEncoder(ByteBufAllocator allocator) { - return new ProtobufEncoder(allocator); + public ProtobufEncoder createProtobufEncoder() { + return new ProtobufEncoder(); } public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) { diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java new file mode 100644 index 00000000..ad10c04d --- /dev/null +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * DCAEGEN2-SERVICES-SDK + * ================================================================================ + * Copyright (C) 2019 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; + +import java.nio.ByteBuffer; + +public enum PayloadType { + UNDEFINED(new byte[]{0x00, 0x00}), + PROTOBUF(new byte[]{0x00, 0x01}); + + private final byte[] payloadTypeBytes; + + PayloadType(byte[] payloadTypeBytes) { + this.payloadTypeBytes = payloadTypeBytes; + } + + public ByteBuffer getPayloadTypeBytes() { + return ByteBuffer.wrap(payloadTypeBytes).asReadOnlyBuffer(); + } +} diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java index bb861c28..305716fe 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java @@ -19,26 +19,21 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import org.onap.ves.VesEventOuterClass.VesEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; + /** * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> */ public class ProtobufEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufEncoder.class); - private final ByteBufAllocator allocator; - - public ProtobufEncoder(ByteBufAllocator allocator) { - this.allocator = allocator; - } - public ByteBuf encode(VesEvent event) { + public ByteBuffer encode(VesEvent event) { LOGGER.debug("Encoding VesEvent '{}'", event); - return allocator.buffer().writeBytes(event.toByteArray()); + return event.toByteString().asReadOnlyByteBuffer(); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java index a0807c68..a946ceac 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java @@ -17,15 +17,31 @@ * limitations under the License. * ============LICENSE_END========================================================= */ + package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.vavr.control.Try; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; /** * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> */ public class WireFrameEncoder { + private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class); + private static final short MARKER_BYTE = 0xAA; + private static final short SUPPORTED_VERSION_MAJOR = 0x01; + private static final short SUPPORTED_VERSION_MINOR = 0x00; + private static final int RESERVED_BYTES_COUNT = 3; + private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker + 2 * Byte.BYTES + // single byte fields (versions) + RESERVED_BYTES_COUNT * Byte.BYTES + // reserved bytes + 1 * Short.BYTES + // paylaod type + 1 * Integer.BYTES; // payload length private final ByteBufAllocator allocator; @@ -33,11 +49,47 @@ public class WireFrameEncoder { this.allocator = allocator; } - public ByteBuf encode(ByteBuf payload) { - final ByteBuf encodedMessage = allocator.buffer(); - encodedMessage.writeByte(0xAA); - encodedMessage.writeBytes(payload); - encodedMessage.writeByte(0x0a); + public Try<ByteBuf> encode(ByteBuffer payload) { + return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF)) + .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex)); + } + + private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException { + if (payload == null) { + throw new WTPEncodingException("Payload is null"); + } + + final int payloadSize = payload.remaining(); + if (payloadSize == 0) { + throw new WTPEncodingException("Payload is empty"); + } + + final ByteBuf encodedMessage = allocator.buffer(HEADER_SIZE + payloadSize); + writeBasicWTPFrameHeaderBeginning(encodedMessage); + writePayloadMessageHeaderEnding(encodedMessage, payloadType, payload, payloadSize); return encodedMessage; } + + private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) { + encodedMessage.writeByte(MARKER_BYTE); + encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR); + encodedMessage.writeByte(SUPPORTED_VERSION_MINOR); + encodedMessage.writeZero(RESERVED_BYTES_COUNT); + } + + private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage, + PayloadType payloadType, + ByteBuffer payload, + int payloadSize) { + encodedMessage.writeBytes(payloadType.getPayloadTypeBytes()); + encodedMessage.writeInt(payloadSize); + encodedMessage.writeBytes(payload); + } } + + +class WTPEncodingException extends RuntimeException { + WTPEncodingException(String message) { + super(message); + } +}
\ No newline at end of file diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java index c4211ff4..02cc6e5f 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java @@ -30,6 +30,7 @@ import static org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import io.vavr.control.Try; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; @@ -38,6 +39,8 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encode import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; +import java.nio.ByteBuffer; + /** * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> */ @@ -56,12 +59,12 @@ public class ProducerCoreTest { public void encode_should_encode_message_stream_to_wire_frame() { final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class); final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class); - final ByteBuf protoBuffer = Unpooled.copiedBuffer(new byte[3]); - final ByteBuf wireFrameBuffer = Unpooled.copiedBuffer(new byte[5]); + final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]); + final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5])); when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer); when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer); - when(encodersFactoryMock.createProtobufEncoder(ByteBufAllocator.DEFAULT)).thenReturn(protobufEncoder); + when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder); when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder); // given @@ -72,12 +75,12 @@ public class ProducerCoreTest { final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast(); // then - verify(encodersFactoryMock).createProtobufEncoder(ByteBufAllocator.DEFAULT); + verify(encodersFactoryMock).createProtobufEncoder(); verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT); verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class)); verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer); assertThat(lastMessage).isNotNull(); - assertThat(lastMessage).isEqualTo(wireFrameBuffer); + assertThat(lastMessage).isEqualTo(wireFrameBuffer.get()); } } diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java index c7439ced..3065db28 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java @@ -17,6 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ + package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; import static org.assertj.core.api.Assertions.assertThat; @@ -34,7 +35,7 @@ public class EncodersFactoryTest { @Test public void factory_methods_should_create_non_null_encoders_objects() { // when - final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(ByteBufAllocator.DEFAULT); + final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(); final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT); // then diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java index 042874c4..ba5514ee 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java @@ -27,9 +27,11 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.utils.VesEvents; import org.onap.ves.VesEventOuterClass.VesEvent; +import java.nio.ByteBuffer; + public class ProtobufEncoderTest { - private final ProtobufEncoder protobufEncoder = new ProtobufEncoder(ByteBufAllocator.DEFAULT); + private final ProtobufEncoder protobufEncoder = new ProtobufEncoder(); @Test void todo() { @@ -37,9 +39,9 @@ public class ProtobufEncoderTest { final VesEvent message = VesEvents.defaultVesEvent(); // when - final ByteBuf encodedMessage = protobufEncoder.encode(message); + final ByteBuffer encodedMessage = protobufEncoder.encode(message); // then - assertThat(encodedMessage.readableBytes()).isGreaterThan(0); + assertThat(encodedMessage.remaining()).isGreaterThan(0); } } diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java index 97c38cf2..a0a67d9b 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java @@ -17,29 +17,100 @@ * limitations under the License. * ============LICENSE_END========================================================= */ + package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; import static org.assertj.core.api.Assertions.assertThat; +import io.vavr.control.Try; +import org.junit.jupiter.api.Test; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; public class WireFrameEncoderTest { + private static final byte MARKER_BYTE = (byte) 0xAA; + private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01; + private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00; + private static final int RESERVED_BYTES_COUNT = 3; + private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker + 2 * Byte.BYTES + // single byte fields (versions) + RESERVED_BYTES_COUNT * java.lang.Byte.BYTES + // reserved bytes + 1 * Short.BYTES + // paylaod type + 1 * Integer.BYTES; // payload length private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT); @Test - void todo() { - // given - final ByteBuf buffer = Unpooled.buffer(0); + void encode_givenNullPayload_shouldThrowEncodingException() { + final ByteBuffer buffer = null; - // when - final ByteBuf encodedBuffer = wireFrameEncoder.encode(buffer); + Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer); - // then - assertThat(encodedBuffer.readableBytes()).isGreaterThan(0); + assertThat(encodedBuffer.isFailure()).isTrue(); + assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class); } + @Test + void encode_givenEmptyPayload_shouldThrowEncodingException() { + final ByteBuffer buffer = ByteBuffer.allocateDirect(0); + + Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer); + + assertThat(encodedBuffer.isFailure()).isTrue(); + assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class); + } + + @Test + void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithPayloadAtTheEnd() { + final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C}; + final int bufferSize = payloadBytes.length; + final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes); + + final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer); + + assertThat(encodedBuffer.isSuccess()).isTrue(); + final ByteBuf actualEncodedBuffer = encodedBuffer.get(); + assertBufferSizeIs(actualEncodedBuffer, HEADER_SIZE + bufferSize); + assertValidHeaderBeggining(actualEncodedBuffer); + skipReservedBytes(actualEncodedBuffer); + assertNextBytesAreInOrder(actualEncodedBuffer, (byte) 0x00, (byte) 0x01); + assertNextBytesAreInOrder(actualEncodedBuffer, intToBytes(bufferSize)); + assertNextBytesAreInOrder(actualEncodedBuffer, payloadBytes); + assertAllBytesVerified(actualEncodedBuffer); + } + + private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) { + for (int i = 0; i < bytes.length; i++) { + assertThat(encodedBuffer.readByte()) + .describedAs("byte in " + (i + 1) + " assertion") + .isEqualTo(bytes[i]); + } + } + + private void assertValidHeaderBeggining(ByteBuf encodedBuffer) { + assertNextBytesAreInOrder(encodedBuffer, + MARKER_BYTE, + SUPPORTED_VERSION_MAJOR, + SUPPORTED_VERSION_MINOR); + } + + private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) { + assertThat(encodedBuffer.readableBytes()).describedAs("buffer's readable bytes").isEqualTo(headerSize); + } + + private void skipReservedBytes(ByteBuf encodedBuffer) { + encodedBuffer.readBytes(RESERVED_BYTES_COUNT); + } + + private void assertAllBytesVerified(ByteBuf encodedBuffer) { + assertThat(encodedBuffer.readableBytes()) + .describedAs("all bytes should've been asserted") + .isEqualTo(0); + } + + private byte[] intToBytes(int integer) { + return ByteBuffer.allocate(4).putInt(integer).array(); + } } |