diff options
9 files changed, 128 insertions, 54 deletions
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java index 3359e54b..9e9ed393 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java @@ -19,7 +19,9 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; +import java.nio.ByteBuffer; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; @@ -37,7 +39,7 @@ import org.reactivestreams.Publisher; * HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options); * * Flux.just(msg1, msg2, msg3) - * .transform(hvVes::send) + * .transform(hvVes::sendRaw) * .subscribe(); * </pre> * @@ -45,10 +47,10 @@ import org.reactivestreams.Publisher; * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions * @since 1.1.1 */ -@FunctionalInterface public interface HvVesProducer { + /** - * Send the messages to the collector. + * Send ves events to the collector. * * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in * case of any problem with sending the messages. @@ -56,9 +58,31 @@ public interface HvVesProducer { * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once * feeding it with a stream of consecutive events. * - * @param messages source of the messages to be sent - * @return empty publisher which completes after messages are sent or error occurs + * @param messages source of ves events to be sent + * @return empty publisher which completes after ves events are sent or error occurs * @since 1.1.1 */ @NotNull Publisher<Void> send(Publisher<VesEvent> messages); + + /** + * Send the specific type of messages as raw bytes to the collector. + * + * This is more generic version of @{@link #send(Publisher)}, + * that accepts raw payload and explicit message type. + * + * Should be used when sending messages in format different from VES Common Event Format. + * As currently High-Volume VES Collector supports only VesEvent messages it is recommended to use the @{@link #send(Publisher)} method directly. + * + * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in + * case of any problem with sending the messages. + * + * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once + * feeding it with a stream of consecutive events. + * + * @param messages source of raw messages to be sent + * @param payloadType type of messages to be sent + * @return empty publisher which completes after messages are sent or error occurs + * @since 1.1.1 + */ + @NotNull Publisher<Void> sendRaw(Publisher<ByteBuffer> messages, PayloadType payloadType); } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java index ad10c04d..e59cd987 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; import java.nio.ByteBuffer; diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java index 746aae72..247cfad5 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java @@ -19,18 +19,18 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; +import static org.assertj.core.api.Assertions.assertThat; + import io.netty.buffer.ByteBuf; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.PayloadType; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.ves.MeasDataCollectionOuterClass; import org.onap.ves.VesEventOuterClass.CommonEventHeader; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; -import static org.assertj.core.api.Assertions.assertThat; - /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> */ @@ -71,7 +71,8 @@ class HvVesProducerIT { } private VesEvent createSimpleVesEvent() { - final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection.newBuilder() + final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection + .newBuilder() .addMeasInfo(MeasDataCollectionOuterClass.MeasInfo.newBuilder() .addMeasValues(MeasDataCollectionOuterClass.MeasValue.newBuilder() .addMeasResults(MeasDataCollectionOuterClass.MeasResult.newBuilder() diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java index 05873f6f..b4a209f6 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java @@ -19,11 +19,15 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import java.util.function.BiFunction; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; +import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.tcp.TcpClient; @@ -36,23 +40,36 @@ public class HvVesProducerImpl implements HvVesProducer { private final TcpClient tcpClient; private final ProducerCore producerCore; - HvVesProducerImpl(TcpClient tcpClient, ProducerCore producerCore) { this.tcpClient = tcpClient; this.producerCore = producerCore; } @Override - public @NotNull Mono<Void> send(Publisher<VesEvent> messages) { - return tcpClient - .handle((in, out) -> handle(messages, out)) - .connect() - .then(); + public @NotNull Publisher<Void> send(Publisher<VesEvent> messages) { + return handleConnection((in, out) -> handle(messages, out)); + } + + @Override + public @NotNull Publisher<Void> sendRaw(Publisher<ByteBuffer> messages, PayloadType payloadType) { + return handleConnection((in, out) -> handleRaw(messages, payloadType, out)); + } + + private Publisher<Void> handleConnection( + BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> handler) { + return tcpClient.handle(handler).connect().then(); } private Publisher<Void> handle(Publisher<VesEvent> messages, NettyOutbound outbound) { - return outbound - .send(producerCore.encode(messages, outbound.alloc())) - .then(); + return push(producerCore.encode(messages, outbound.alloc()), outbound); + } + + private Publisher<Void> handleRaw(Publisher<ByteBuffer> messages, PayloadType payloadType, + NettyOutbound outbound) { + return push(producerCore.encode(messages, payloadType, outbound.alloc()), outbound); + } + + private Publisher<Void> push(Publisher<ByteBuf> messages, NettyOutbound outbound) { + return outbound.send(messages).then(); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java index 49d54fe8..3f065771 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java @@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.vavr.control.Try; +import java.nio.ByteBuffer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; @@ -45,11 +47,16 @@ public class ProducerCore { } public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) { - final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion); final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(); return Flux.from(messages) .map(protobufEncoder::encode) - .map(wireFrameEncoder::encode) + .transform(payload -> encode(payload, PayloadType.PROTOBUF, allocator)); + } + + public Flux<ByteBuf> encode(Publisher<ByteBuffer> messages, PayloadType payloadType, ByteBufAllocator allocator) { + final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion); + return Flux.from(messages) + .map(payload -> wireFrameEncoder.encode(payload, payloadType)) .map(Try::get); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java index 5bee8461..73b09452 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java @@ -19,12 +19,11 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; +import java.nio.ByteBuffer; import org.onap.ves.VesEventOuterClass.VesEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - /** * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> */ diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java index 29e3347f..af33f433 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java @@ -24,14 +24,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.vavr.control.Try; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; +import java.nio.ByteBuffer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - -/** - * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> - */ public class WireFrameEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class); private static final short MARKER_BYTE = 0xAA; @@ -50,9 +47,10 @@ public class WireFrameEncoder { this.wireFrameVersion = wireFrameVersion; } - public Try<ByteBuf> encode(ByteBuffer payload) { - return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF)) - .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex)); + + public Try<ByteBuf> encode(ByteBuffer payload, PayloadType payloadType) { + return Try.of(() -> encodeMessageAs(payload, payloadType)) + .onFailure(ex -> LOGGER.warn("Failed to encode payload", ex)); } private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException { @@ -79,9 +77,9 @@ public class WireFrameEncoder { } private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage, - PayloadType payloadType, - ByteBuffer payload, - int payloadSize) { + PayloadType payloadType, + ByteBuffer payload, + int payloadSize) { encodedMessage.writeBytes(payloadType.getPayloadTypeBytes()); encodedMessage.writeInt(payloadSize); encodedMessage.writeBytes(payload); diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java index b79b0cf9..86c67f06 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java @@ -20,8 +20,8 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -31,58 +31,85 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.vavr.control.Try; +import java.nio.ByteBuffer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; -import java.nio.ByteBuffer; - /** * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a> */ -public class ProducerCoreTest { +class ProducerCoreTest { private ProducerCore producerCore; private EncodersFactory encodersFactoryMock; private WireFrameVersion wireFrameVersion; @BeforeEach - public void setUp() { + void setUp() { encodersFactoryMock = mock(EncodersFactory.class); wireFrameVersion = mock(WireFrameVersion.class); producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion); } @Test - public void encode_should_encode_message_stream_to_wire_frame() { + void encode_should_encode_raw_message_stream_to_wire_frame() { + final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class); + final ByteBuffer payload = ByteBuffer.wrap(new byte[3]); + final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5])); + + when(wireFrameEncoder.encode(payload, PayloadType.UNDEFINED)).thenReturn(wireFrameBuffer); + when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)) + .thenReturn(wireFrameEncoder); + + // given + final int messageStreamSize = 2; + final Flux<ByteBuffer> rawMessageStream = Flux.just(payload).repeat(messageStreamSize - 1); + + // when + final ByteBuf lastMessage = producerCore + .encode(rawMessageStream, PayloadType.UNDEFINED, ByteBufAllocator.DEFAULT) + .blockLast(); + + // then + verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); + verify(wireFrameEncoder, times(messageStreamSize)).encode(payload, PayloadType.UNDEFINED); + + assertThat(lastMessage).isNotNull(); + assertThat(lastMessage).isEqualTo(wireFrameBuffer.get()); + } + + @Test + void encode_should_encode_ves_event_stream_to_wire_frame() { final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class); final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class); + + final VesEvent vesEvent = defaultVesEvent(); final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]); final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5])); - when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer); - when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer); + when(protobufEncoder.encode(vesEvent)).thenReturn(protoBuffer); + when(wireFrameEncoder.encode(protoBuffer, PayloadType.PROTOBUF)).thenReturn(wireFrameBuffer); when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder); - when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)). - thenReturn(wireFrameEncoder); + when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)) + .thenReturn(wireFrameEncoder); // given final int messageStreamSize = 2; - final Flux<VesEvent> messages = Flux.just(defaultVesEvent()).repeat(messageStreamSize - 1); + final Flux<VesEvent> messages = Flux.just(vesEvent).repeat(messageStreamSize - 1); // when final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast(); // then verify(encodersFactoryMock).createProtobufEncoder(); - verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); - verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class)); - verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer); + verify(protobufEncoder, times(messageStreamSize)).encode(vesEvent); assertThat(lastMessage).isNotNull(); assertThat(lastMessage).isEqualTo(wireFrameBuffer.get()); diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java index d79d0dce..7352fbf5 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java @@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import java.nio.ByteBuffer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; public class WireFrameEncoderTest { private static final byte MARKER_BYTE = (byte) 0xAA; @@ -51,7 +52,7 @@ public class WireFrameEncoderTest { void encode_givenNullPayload_shouldThrowEncodingException() { final ByteBuffer buffer = null; - Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer); + Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF); assertThat(encodedBuffer.isFailure()).isTrue(); assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class); @@ -61,7 +62,7 @@ public class WireFrameEncoderTest { void encode_givenEmptyPayload_shouldThrowEncodingException() { final ByteBuffer buffer = ByteBuffer.allocateDirect(0); - Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer); + Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF); assertThat(encodedBuffer.isFailure()).isTrue(); assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class); @@ -73,7 +74,7 @@ public class WireFrameEncoderTest { final int bufferSize = payloadBytes.length; final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes); - final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer); + final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF); assertThat(encodedBuffer.isSuccess()).isTrue(); final ByteBuf actualEncodedBuffer = encodedBuffer.get(); @@ -95,7 +96,7 @@ public class WireFrameEncoderTest { final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes); // when - final Try<ByteBuf> encodedBuffer = encoder.encode(buffer); + final Try<ByteBuf> encodedBuffer = encoder.encode(buffer, PayloadType.PROTOBUF); // then assertThat(encodedBuffer.isSuccess()).isTrue(); |