From 244b070d680eaac727091193b0998c76c78cc230 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Mon, 4 Feb 2019 15:18:28 +0100 Subject: Add send with raw payload method to HvVesProducer Change-Id: I430b176373c8c351105c1d10047aace63319dd7c Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1164 --- .../hvves/client/producer/api/HvVesProducer.java | 34 +++++++++++-- .../client/producer/api/options/PayloadType.java | 37 ++++++++++++++ .../hvves/client/producer/ct/HvVesProducerIT.java | 9 ++-- .../client/producer/impl/HvVesProducerImpl.java | 37 ++++++++++---- .../hvves/client/producer/impl/ProducerCore.java | 11 ++++- .../client/producer/impl/encoders/PayloadType.java | 37 -------------- .../producer/impl/encoders/ProtobufEncoder.java | 3 +- .../producer/impl/encoders/WireFrameEncoder.java | 20 ++++---- .../client/producer/impl/ProducerCoreTest.java | 57 ++++++++++++++++------ .../impl/encoders/WireFrameEncoderTest.java | 9 ++-- 10 files changed, 164 insertions(+), 90 deletions(-) create mode 100644 services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java delete mode 100644 services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java index 3359e54b..9e9ed393 100644 --- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java @@ -19,7 +19,9 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api; +import java.nio.ByteBuffer; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; @@ -37,7 +39,7 @@ import org.reactivestreams.Publisher; * HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options); * * Flux.just(msg1, msg2, msg3) - * .transform(hvVes::send) + * .transform(hvVes::sendRaw) * .subscribe(); * * @@ -45,10 +47,10 @@ import org.reactivestreams.Publisher; * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions * @since 1.1.1 */ -@FunctionalInterface public interface HvVesProducer { + /** - * Send the messages to the collector. + * Send ves events to the collector. * * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in * case of any problem with sending the messages. @@ -56,9 +58,31 @@ public interface HvVesProducer { * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once * feeding it with a stream of consecutive events. * - * @param messages source of the messages to be sent - * @return empty publisher which completes after messages are sent or error occurs + * @param messages source of ves events to be sent + * @return empty publisher which completes after ves events are sent or error occurs * @since 1.1.1 */ @NotNull Publisher send(Publisher messages); + + /** + * Send the specific type of messages as raw bytes to the collector. + * + * This is more generic version of @{@link #send(Publisher)}, + * that accepts raw payload and explicit message type. + * + * Should be used when sending messages in format different from VES Common Event Format. + * As currently High-Volume VES Collector supports only VesEvent messages it is recommended to use the @{@link #send(Publisher)} method directly. + * + * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in + * case of any problem with sending the messages. + * + * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once + * feeding it with a stream of consecutive events. + * + * @param messages source of raw messages to be sent + * @param payloadType type of messages to be sent + * @return empty publisher which completes after messages are sent or error occurs + * @since 1.1.1 + */ + @NotNull Publisher sendRaw(Publisher messages, PayloadType payloadType); } diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java new file mode 100644 index 00000000..e59cd987 --- /dev/null +++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * DCAEGEN2-SERVICES-SDK + * ================================================================================ + * Copyright (C) 2019 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options; + +import java.nio.ByteBuffer; + +public enum PayloadType { + UNDEFINED(new byte[]{0x00, 0x00}), + PROTOBUF(new byte[]{0x00, 0x01}); + + private final byte[] payloadTypeBytes; + + PayloadType(byte[] payloadTypeBytes) { + this.payloadTypeBytes = payloadTypeBytes; + } + + public ByteBuffer getPayloadTypeBytes() { + return ByteBuffer.wrap(payloadTypeBytes).asReadOnlyBuffer(); + } +} diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java index 746aae72..247cfad5 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java @@ -19,18 +19,18 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; +import static org.assertj.core.api.Assertions.assertThat; + import io.netty.buffer.ByteBuf; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.PayloadType; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.ves.MeasDataCollectionOuterClass; import org.onap.ves.VesEventOuterClass.CommonEventHeader; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; -import static org.assertj.core.api.Assertions.assertThat; - /** * @author Piotr Jaszczyk */ @@ -71,7 +71,8 @@ class HvVesProducerIT { } private VesEvent createSimpleVesEvent() { - final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection.newBuilder() + final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection + .newBuilder() .addMeasInfo(MeasDataCollectionOuterClass.MeasInfo.newBuilder() .addMeasValues(MeasDataCollectionOuterClass.MeasValue.newBuilder() .addMeasResults(MeasDataCollectionOuterClass.MeasResult.newBuilder() diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java index 05873f6f..b4a209f6 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java @@ -19,11 +19,15 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import java.util.function.BiFunction; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.ves.VesEventOuterClass.VesEvent; import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; +import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.tcp.TcpClient; @@ -36,23 +40,36 @@ public class HvVesProducerImpl implements HvVesProducer { private final TcpClient tcpClient; private final ProducerCore producerCore; - HvVesProducerImpl(TcpClient tcpClient, ProducerCore producerCore) { this.tcpClient = tcpClient; this.producerCore = producerCore; } @Override - public @NotNull Mono send(Publisher messages) { - return tcpClient - .handle((in, out) -> handle(messages, out)) - .connect() - .then(); + public @NotNull Publisher send(Publisher messages) { + return handleConnection((in, out) -> handle(messages, out)); + } + + @Override + public @NotNull Publisher sendRaw(Publisher messages, PayloadType payloadType) { + return handleConnection((in, out) -> handleRaw(messages, payloadType, out)); + } + + private Publisher handleConnection( + BiFunction> handler) { + return tcpClient.handle(handler).connect().then(); } private Publisher handle(Publisher messages, NettyOutbound outbound) { - return outbound - .send(producerCore.encode(messages, outbound.alloc())) - .then(); + return push(producerCore.encode(messages, outbound.alloc()), outbound); + } + + private Publisher handleRaw(Publisher messages, PayloadType payloadType, + NettyOutbound outbound) { + return push(producerCore.encode(messages, payloadType, outbound.alloc()), outbound); + } + + private Publisher push(Publisher messages, NettyOutbound outbound) { + return outbound.send(messages).then(); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java index 49d54fe8..3f065771 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java @@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.vavr.control.Try; +import java.nio.ByteBuffer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; @@ -45,11 +47,16 @@ public class ProducerCore { } public Flux encode(Publisher messages, ByteBufAllocator allocator) { - final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion); final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(); return Flux.from(messages) .map(protobufEncoder::encode) - .map(wireFrameEncoder::encode) + .transform(payload -> encode(payload, PayloadType.PROTOBUF, allocator)); + } + + public Flux encode(Publisher messages, PayloadType payloadType, ByteBufAllocator allocator) { + final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion); + return Flux.from(messages) + .map(payload -> wireFrameEncoder.encode(payload, payloadType)) .map(Try::get); } } diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java deleted file mode 100644 index ad10c04d..00000000 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * DCAEGEN2-SERVICES-SDK - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; - -import java.nio.ByteBuffer; - -public enum PayloadType { - UNDEFINED(new byte[]{0x00, 0x00}), - PROTOBUF(new byte[]{0x00, 0x01}); - - private final byte[] payloadTypeBytes; - - PayloadType(byte[] payloadTypeBytes) { - this.payloadTypeBytes = payloadTypeBytes; - } - - public ByteBuffer getPayloadTypeBytes() { - return ByteBuffer.wrap(payloadTypeBytes).asReadOnlyBuffer(); - } -} diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java index 5bee8461..73b09452 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java @@ -19,12 +19,11 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders; +import java.nio.ByteBuffer; import org.onap.ves.VesEventOuterClass.VesEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - /** * @author Jakub Dudycz */ diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java index 29e3347f..af33f433 100644 --- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java +++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java @@ -24,14 +24,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.vavr.control.Try; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; +import java.nio.ByteBuffer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - -/** - * @author Jakub Dudycz - */ public class WireFrameEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class); private static final short MARKER_BYTE = 0xAA; @@ -50,9 +47,10 @@ public class WireFrameEncoder { this.wireFrameVersion = wireFrameVersion; } - public Try encode(ByteBuffer payload) { - return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF)) - .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex)); + + public Try encode(ByteBuffer payload, PayloadType payloadType) { + return Try.of(() -> encodeMessageAs(payload, payloadType)) + .onFailure(ex -> LOGGER.warn("Failed to encode payload", ex)); } private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException { @@ -79,9 +77,9 @@ public class WireFrameEncoder { } private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage, - PayloadType payloadType, - ByteBuffer payload, - int payloadSize) { + PayloadType payloadType, + ByteBuffer payload, + int payloadSize) { encodedMessage.writeBytes(payloadType.getPayloadTypeBytes()); encodedMessage.writeInt(payloadSize); encodedMessage.writeBytes(payload); diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java index b79b0cf9..86c67f06 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java @@ -20,8 +20,8 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -31,58 +31,85 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.vavr.control.Try; +import java.nio.ByteBuffer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder; import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder; -import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; -import java.nio.ByteBuffer; - /** * @author Jakub Dudycz */ -public class ProducerCoreTest { +class ProducerCoreTest { private ProducerCore producerCore; private EncodersFactory encodersFactoryMock; private WireFrameVersion wireFrameVersion; @BeforeEach - public void setUp() { + void setUp() { encodersFactoryMock = mock(EncodersFactory.class); wireFrameVersion = mock(WireFrameVersion.class); producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion); } @Test - public void encode_should_encode_message_stream_to_wire_frame() { + void encode_should_encode_raw_message_stream_to_wire_frame() { + final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class); + final ByteBuffer payload = ByteBuffer.wrap(new byte[3]); + final Try wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5])); + + when(wireFrameEncoder.encode(payload, PayloadType.UNDEFINED)).thenReturn(wireFrameBuffer); + when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)) + .thenReturn(wireFrameEncoder); + + // given + final int messageStreamSize = 2; + final Flux rawMessageStream = Flux.just(payload).repeat(messageStreamSize - 1); + + // when + final ByteBuf lastMessage = producerCore + .encode(rawMessageStream, PayloadType.UNDEFINED, ByteBufAllocator.DEFAULT) + .blockLast(); + + // then + verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); + verify(wireFrameEncoder, times(messageStreamSize)).encode(payload, PayloadType.UNDEFINED); + + assertThat(lastMessage).isNotNull(); + assertThat(lastMessage).isEqualTo(wireFrameBuffer.get()); + } + + @Test + void encode_should_encode_ves_event_stream_to_wire_frame() { final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class); final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class); + + final VesEvent vesEvent = defaultVesEvent(); final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]); final Try wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5])); - when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer); - when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer); + when(protobufEncoder.encode(vesEvent)).thenReturn(protoBuffer); + when(wireFrameEncoder.encode(protoBuffer, PayloadType.PROTOBUF)).thenReturn(wireFrameBuffer); when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder); - when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)). - thenReturn(wireFrameEncoder); + when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)) + .thenReturn(wireFrameEncoder); // given final int messageStreamSize = 2; - final Flux messages = Flux.just(defaultVesEvent()).repeat(messageStreamSize - 1); + final Flux messages = Flux.just(vesEvent).repeat(messageStreamSize - 1); // when final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast(); // then verify(encodersFactoryMock).createProtobufEncoder(); - verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion); - verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class)); - verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer); + verify(protobufEncoder, times(messageStreamSize)).encode(vesEvent); assertThat(lastMessage).isNotNull(); assertThat(lastMessage).isEqualTo(wireFrameBuffer.get()); diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java index d79d0dce..7352fbf5 100644 --- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java +++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java @@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion; import java.nio.ByteBuffer; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType; public class WireFrameEncoderTest { private static final byte MARKER_BYTE = (byte) 0xAA; @@ -51,7 +52,7 @@ public class WireFrameEncoderTest { void encode_givenNullPayload_shouldThrowEncodingException() { final ByteBuffer buffer = null; - Try encodedBuffer = wireFrameEncoder.encode(buffer); + Try encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF); assertThat(encodedBuffer.isFailure()).isTrue(); assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class); @@ -61,7 +62,7 @@ public class WireFrameEncoderTest { void encode_givenEmptyPayload_shouldThrowEncodingException() { final ByteBuffer buffer = ByteBuffer.allocateDirect(0); - Try encodedBuffer = wireFrameEncoder.encode(buffer); + Try encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF); assertThat(encodedBuffer.isFailure()).isTrue(); assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class); @@ -73,7 +74,7 @@ public class WireFrameEncoderTest { final int bufferSize = payloadBytes.length; final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes); - final Try encodedBuffer = wireFrameEncoder.encode(buffer); + final Try encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF); assertThat(encodedBuffer.isSuccess()).isTrue(); final ByteBuf actualEncodedBuffer = encodedBuffer.get(); @@ -95,7 +96,7 @@ public class WireFrameEncoderTest { final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes); // when - final Try encodedBuffer = encoder.encode(buffer); + final Try encodedBuffer = encoder.encode(buffer, PayloadType.PROTOBUF); // then assertThat(encodedBuffer.isSuccess()).isTrue(); -- cgit 1.2.3-korg