summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-01-28 10:24:55 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-01-28 14:56:35 +0100
commitc2343da9dda10cad1cea5a7b3e4995a43a4796c5 (patch)
treea62670fc112128ee0500f9fe1aff9bbb1f114745
parent556b783cd4d083d234649ca52bb820af5d1fbcde (diff)
Encode outgoing packages using WTP
Change-Id: I5a2c14846168bcc1a77bec6a96ecdd3114c5016a Issue-ID: DCAEGEN2-1069 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java8
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java4
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java37
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java13
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java62
-rw-r--r--services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java13
-rw-r--r--services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java3
-rw-r--r--services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java8
-rw-r--r--services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java89
9 files changed, 200 insertions, 37 deletions
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
index baa6d6b3..3000e3d6 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
@@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.vavr.control.Try;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
@@ -42,9 +43,10 @@ public class ProducerCore {
public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator);
- final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(allocator);
+ final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
return Flux.from(messages)
- .map(protobufEncoder::encode)
- .map(wireFrameEncoder::encode);
+ .map(protobufEncoder::encode)
+ .map(wireFrameEncoder::encode)
+ .map(Try::get);
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
index 1d7b8b62..fbe8ea3c 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
@@ -23,8 +23,8 @@ import io.netty.buffer.ByteBufAllocator;
public class EncodersFactory {
- public ProtobufEncoder createProtobufEncoder(ByteBufAllocator allocator) {
- return new ProtobufEncoder(allocator);
+ public ProtobufEncoder createProtobufEncoder() {
+ return new ProtobufEncoder();
}
public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) {
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java
new file mode 100644
index 00000000..ad10c04d
--- /dev/null
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
+
+import java.nio.ByteBuffer;
+
+public enum PayloadType {
+ UNDEFINED(new byte[]{0x00, 0x00}),
+ PROTOBUF(new byte[]{0x00, 0x01});
+
+ private final byte[] payloadTypeBytes;
+
+ PayloadType(byte[] payloadTypeBytes) {
+ this.payloadTypeBytes = payloadTypeBytes;
+ }
+
+ public ByteBuffer getPayloadTypeBytes() {
+ return ByteBuffer.wrap(payloadTypeBytes).asReadOnlyBuffer();
+ }
+}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
index bb861c28..305716fe 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
@@ -19,26 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
public class ProtobufEncoder {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufEncoder.class);
- private final ByteBufAllocator allocator;
-
- public ProtobufEncoder(ByteBufAllocator allocator) {
- this.allocator = allocator;
- }
- public ByteBuf encode(VesEvent event) {
+ public ByteBuffer encode(VesEvent event) {
LOGGER.debug("Encoding VesEvent '{}'", event);
- return allocator.buffer().writeBytes(event.toByteArray());
+ return event.toByteString().asReadOnlyByteBuffer();
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
index a0807c68..a946ceac 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
@@ -17,15 +17,31 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.vavr.control.Try;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
public class WireFrameEncoder {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
+ private static final short MARKER_BYTE = 0xAA;
+ private static final short SUPPORTED_VERSION_MAJOR = 0x01;
+ private static final short SUPPORTED_VERSION_MINOR = 0x00;
+ private static final int RESERVED_BYTES_COUNT = 3;
+ private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker
+ 2 * Byte.BYTES + // single byte fields (versions)
+ RESERVED_BYTES_COUNT * Byte.BYTES + // reserved bytes
+ 1 * Short.BYTES + // paylaod type
+ 1 * Integer.BYTES; // payload length
private final ByteBufAllocator allocator;
@@ -33,11 +49,47 @@ public class WireFrameEncoder {
this.allocator = allocator;
}
- public ByteBuf encode(ByteBuf payload) {
- final ByteBuf encodedMessage = allocator.buffer();
- encodedMessage.writeByte(0xAA);
- encodedMessage.writeBytes(payload);
- encodedMessage.writeByte(0x0a);
+ public Try<ByteBuf> encode(ByteBuffer payload) {
+ return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF))
+ .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex));
+ }
+
+ private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException {
+ if (payload == null) {
+ throw new WTPEncodingException("Payload is null");
+ }
+
+ final int payloadSize = payload.remaining();
+ if (payloadSize == 0) {
+ throw new WTPEncodingException("Payload is empty");
+ }
+
+ final ByteBuf encodedMessage = allocator.buffer(HEADER_SIZE + payloadSize);
+ writeBasicWTPFrameHeaderBeginning(encodedMessage);
+ writePayloadMessageHeaderEnding(encodedMessage, payloadType, payload, payloadSize);
return encodedMessage;
}
+
+ private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) {
+ encodedMessage.writeByte(MARKER_BYTE);
+ encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR);
+ encodedMessage.writeByte(SUPPORTED_VERSION_MINOR);
+ encodedMessage.writeZero(RESERVED_BYTES_COUNT);
+ }
+
+ private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage,
+ PayloadType payloadType,
+ ByteBuffer payload,
+ int payloadSize) {
+ encodedMessage.writeBytes(payloadType.getPayloadTypeBytes());
+ encodedMessage.writeInt(payloadSize);
+ encodedMessage.writeBytes(payload);
+ }
}
+
+
+class WTPEncodingException extends RuntimeException {
+ WTPEncodingException(String message) {
+ super(message);
+ }
+} \ No newline at end of file
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
index c4211ff4..02cc6e5f 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
@@ -30,6 +30,7 @@ import static org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.vavr.control.Try;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
@@ -38,6 +39,8 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encode
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
+import java.nio.ByteBuffer;
+
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
@@ -56,12 +59,12 @@ public class ProducerCoreTest {
public void encode_should_encode_message_stream_to_wire_frame() {
final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class);
- final ByteBuf protoBuffer = Unpooled.copiedBuffer(new byte[3]);
- final ByteBuf wireFrameBuffer = Unpooled.copiedBuffer(new byte[5]);
+ final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]);
+ final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
- when(encodersFactoryMock.createProtobufEncoder(ByteBufAllocator.DEFAULT)).thenReturn(protobufEncoder);
+ when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder);
// given
@@ -72,12 +75,12 @@ public class ProducerCoreTest {
final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast();
// then
- verify(encodersFactoryMock).createProtobufEncoder(ByteBufAllocator.DEFAULT);
+ verify(encodersFactoryMock).createProtobufEncoder();
verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT);
verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
assertThat(lastMessage).isNotNull();
- assertThat(lastMessage).isEqualTo(wireFrameBuffer);
+ assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
}
}
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
index c7439ced..3065db28 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
@@ -17,6 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +35,7 @@ public class EncodersFactoryTest {
@Test
public void factory_methods_should_create_non_null_encoders_objects() {
// when
- final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(ByteBufAllocator.DEFAULT);
+ final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT);
// then
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java
index 042874c4..ba5514ee 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java
@@ -27,9 +27,11 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.utils.VesEvents;
import org.onap.ves.VesEventOuterClass.VesEvent;
+import java.nio.ByteBuffer;
+
public class ProtobufEncoderTest {
- private final ProtobufEncoder protobufEncoder = new ProtobufEncoder(ByteBufAllocator.DEFAULT);
+ private final ProtobufEncoder protobufEncoder = new ProtobufEncoder();
@Test
void todo() {
@@ -37,9 +39,9 @@ public class ProtobufEncoderTest {
final VesEvent message = VesEvents.defaultVesEvent();
// when
- final ByteBuf encodedMessage = protobufEncoder.encode(message);
+ final ByteBuffer encodedMessage = protobufEncoder.encode(message);
// then
- assertThat(encodedMessage.readableBytes()).isGreaterThan(0);
+ assertThat(encodedMessage.remaining()).isGreaterThan(0);
}
}
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
index 97c38cf2..a0a67d9b 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
@@ -17,29 +17,100 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import static org.assertj.core.api.Assertions.assertThat;
+import io.vavr.control.Try;
+import org.junit.jupiter.api.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
public class WireFrameEncoderTest {
+ private static final byte MARKER_BYTE = (byte) 0xAA;
+ private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01;
+ private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00;
+ private static final int RESERVED_BYTES_COUNT = 3;
+ private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker
+ 2 * Byte.BYTES + // single byte fields (versions)
+ RESERVED_BYTES_COUNT * java.lang.Byte.BYTES + // reserved bytes
+ 1 * Short.BYTES + // paylaod type
+ 1 * Integer.BYTES; // payload length
private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT);
@Test
- void todo() {
- // given
- final ByteBuf buffer = Unpooled.buffer(0);
+ void encode_givenNullPayload_shouldThrowEncodingException() {
+ final ByteBuffer buffer = null;
- // when
- final ByteBuf encodedBuffer = wireFrameEncoder.encode(buffer);
+ Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
- // then
- assertThat(encodedBuffer.readableBytes()).isGreaterThan(0);
+ assertThat(encodedBuffer.isFailure()).isTrue();
+ assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
}
+ @Test
+ void encode_givenEmptyPayload_shouldThrowEncodingException() {
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(0);
+
+ Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+
+ assertThat(encodedBuffer.isFailure()).isTrue();
+ assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
+ }
+
+ @Test
+ void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithPayloadAtTheEnd() {
+ final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C};
+ final int bufferSize = payloadBytes.length;
+ final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
+
+ final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+
+ assertThat(encodedBuffer.isSuccess()).isTrue();
+ final ByteBuf actualEncodedBuffer = encodedBuffer.get();
+ assertBufferSizeIs(actualEncodedBuffer, HEADER_SIZE + bufferSize);
+ assertValidHeaderBeggining(actualEncodedBuffer);
+ skipReservedBytes(actualEncodedBuffer);
+ assertNextBytesAreInOrder(actualEncodedBuffer, (byte) 0x00, (byte) 0x01);
+ assertNextBytesAreInOrder(actualEncodedBuffer, intToBytes(bufferSize));
+ assertNextBytesAreInOrder(actualEncodedBuffer, payloadBytes);
+ assertAllBytesVerified(actualEncodedBuffer);
+ }
+
+ private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) {
+ for (int i = 0; i < bytes.length; i++) {
+ assertThat(encodedBuffer.readByte())
+ .describedAs("byte in " + (i + 1) + " assertion")
+ .isEqualTo(bytes[i]);
+ }
+ }
+
+ private void assertValidHeaderBeggining(ByteBuf encodedBuffer) {
+ assertNextBytesAreInOrder(encodedBuffer,
+ MARKER_BYTE,
+ SUPPORTED_VERSION_MAJOR,
+ SUPPORTED_VERSION_MINOR);
+ }
+
+ private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) {
+ assertThat(encodedBuffer.readableBytes()).describedAs("buffer's readable bytes").isEqualTo(headerSize);
+ }
+
+ private void skipReservedBytes(ByteBuf encodedBuffer) {
+ encodedBuffer.readBytes(RESERVED_BYTES_COUNT);
+ }
+
+ private void assertAllBytesVerified(ByteBuf encodedBuffer) {
+ assertThat(encodedBuffer.readableBytes())
+ .describedAs("all bytes should've been asserted")
+ .isEqualTo(0);
+ }
+
+ private byte[] intToBytes(int integer) {
+ return ByteBuffer.allocate(4).putInt(integer).array();
+ }
}