aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-02-04 15:18:28 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-02-08 16:45:00 +0100
commit244b070d680eaac727091193b0998c76c78cc230 (patch)
tree186f12361b03710f92254015bda035c8b750e739 /services
parentb84f6dd1e5ce8b425fcf7ccb4978d89159b35943 (diff)
Add send with raw payload method to HvVesProducer
Change-Id: I430b176373c8c351105c1d10047aace63319dd7c Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1164
Diffstat (limited to 'services')
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java34
-rw-r--r--services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java (renamed from services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java)2
-rw-r--r--services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java9
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java37
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java11
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java3
-rw-r--r--services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java20
-rw-r--r--services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java57
-rw-r--r--services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java9
9 files changed, 128 insertions, 54 deletions
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
index 3359e54b..9e9ed393 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
@@ -19,7 +19,9 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
+import java.nio.ByteBuffer;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.reactivestreams.Publisher;
@@ -37,7 +39,7 @@ import org.reactivestreams.Publisher;
* HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options);
*
* Flux.just(msg1, msg2, msg3)
- * .transform(hvVes::send)
+ * .transform(hvVes::sendRaw)
* .subscribe();
* </pre>
*
@@ -45,10 +47,10 @@ import org.reactivestreams.Publisher;
* @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
* @since 1.1.1
*/
-@FunctionalInterface
public interface HvVesProducer {
+
/**
- * Send the messages to the collector.
+ * Send ves events to the collector.
*
* Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
* case of any problem with sending the messages.
@@ -56,9 +58,31 @@ public interface HvVesProducer {
* Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once
* feeding it with a stream of consecutive events.
*
- * @param messages source of the messages to be sent
- * @return empty publisher which completes after messages are sent or error occurs
+ * @param messages source of ves events to be sent
+ * @return empty publisher which completes after ves events are sent or error occurs
* @since 1.1.1
*/
@NotNull Publisher<Void> send(Publisher<VesEvent> messages);
+
+ /**
+ * Send the specific type of messages as raw bytes to the collector.
+ *
+ * This is more generic version of @{@link #send(Publisher)},
+ * that accepts raw payload and explicit message type.
+ *
+ * Should be used when sending messages in format different from VES Common Event Format.
+ * As currently High-Volume VES Collector supports only VesEvent messages it is recommended to use the @{@link #send(Publisher)} method directly.
+ *
+ * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
+ * case of any problem with sending the messages.
+ *
+ * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once
+ * feeding it with a stream of consecutive events.
+ *
+ * @param messages source of raw messages to be sent
+ * @param payloadType type of messages to be sent
+ * @return empty publisher which completes after messages are sent or error occurs
+ * @since 1.1.1
+ */
+ @NotNull Publisher<Void> sendRaw(Publisher<ByteBuffer> messages, PayloadType payloadType);
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java
index ad10c04d..e59cd987 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
import java.nio.ByteBuffer;
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
index 746aae72..247cfad5 100644
--- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
@@ -19,18 +19,18 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
+import static org.assertj.core.api.Assertions.assertThat;
+
import io.netty.buffer.ByteBuf;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.PayloadType;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
import org.onap.ves.MeasDataCollectionOuterClass;
import org.onap.ves.VesEventOuterClass.CommonEventHeader;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
-import static org.assertj.core.api.Assertions.assertThat;
-
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
*/
@@ -71,7 +71,8 @@ class HvVesProducerIT {
}
private VesEvent createSimpleVesEvent() {
- final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection.newBuilder()
+ final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection
+ .newBuilder()
.addMeasInfo(MeasDataCollectionOuterClass.MeasInfo.newBuilder()
.addMeasValues(MeasDataCollectionOuterClass.MeasValue.newBuilder()
.addMeasResults(MeasDataCollectionOuterClass.MeasResult.newBuilder()
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
index 05873f6f..b4a209f6 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
@@ -19,11 +19,15 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
+import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.tcp.TcpClient;
@@ -36,23 +40,36 @@ public class HvVesProducerImpl implements HvVesProducer {
private final TcpClient tcpClient;
private final ProducerCore producerCore;
-
HvVesProducerImpl(TcpClient tcpClient, ProducerCore producerCore) {
this.tcpClient = tcpClient;
this.producerCore = producerCore;
}
@Override
- public @NotNull Mono<Void> send(Publisher<VesEvent> messages) {
- return tcpClient
- .handle((in, out) -> handle(messages, out))
- .connect()
- .then();
+ public @NotNull Publisher<Void> send(Publisher<VesEvent> messages) {
+ return handleConnection((in, out) -> handle(messages, out));
+ }
+
+ @Override
+ public @NotNull Publisher<Void> sendRaw(Publisher<ByteBuffer> messages, PayloadType payloadType) {
+ return handleConnection((in, out) -> handleRaw(messages, payloadType, out));
+ }
+
+ private Publisher<Void> handleConnection(
+ BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> handler) {
+ return tcpClient.handle(handler).connect().then();
}
private Publisher<Void> handle(Publisher<VesEvent> messages, NettyOutbound outbound) {
- return outbound
- .send(producerCore.encode(messages, outbound.alloc()))
- .then();
+ return push(producerCore.encode(messages, outbound.alloc()), outbound);
+ }
+
+ private Publisher<Void> handleRaw(Publisher<ByteBuffer> messages, PayloadType payloadType,
+ NettyOutbound outbound) {
+ return push(producerCore.encode(messages, payloadType, outbound.alloc()), outbound);
+ }
+
+ private Publisher<Void> push(Publisher<ByteBuf> messages, NettyOutbound outbound) {
+ return outbound.send(messages).then();
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
index 49d54fe8..3f065771 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
@@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.vavr.control.Try;
+import java.nio.ByteBuffer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
@@ -45,11 +47,16 @@ public class ProducerCore {
}
public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
- final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion);
final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
return Flux.from(messages)
.map(protobufEncoder::encode)
- .map(wireFrameEncoder::encode)
+ .transform(payload -> encode(payload, PayloadType.PROTOBUF, allocator));
+ }
+
+ public Flux<ByteBuf> encode(Publisher<ByteBuffer> messages, PayloadType payloadType, ByteBufAllocator allocator) {
+ final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion);
+ return Flux.from(messages)
+ .map(payload -> wireFrameEncoder.encode(payload, payloadType))
.map(Try::get);
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
index 5bee8461..73b09452 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
@@ -19,12 +19,11 @@
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
+import java.nio.ByteBuffer;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
index 29e3347f..af33f433 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
@@ -24,14 +24,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.vavr.control.Try;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
+import java.nio.ByteBuffer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
- */
public class WireFrameEncoder {
private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
private static final short MARKER_BYTE = 0xAA;
@@ -50,9 +47,10 @@ public class WireFrameEncoder {
this.wireFrameVersion = wireFrameVersion;
}
- public Try<ByteBuf> encode(ByteBuffer payload) {
- return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF))
- .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex));
+
+ public Try<ByteBuf> encode(ByteBuffer payload, PayloadType payloadType) {
+ return Try.of(() -> encodeMessageAs(payload, payloadType))
+ .onFailure(ex -> LOGGER.warn("Failed to encode payload", ex));
}
private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException {
@@ -79,9 +77,9 @@ public class WireFrameEncoder {
}
private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage,
- PayloadType payloadType,
- ByteBuffer payload,
- int payloadSize) {
+ PayloadType payloadType,
+ ByteBuffer payload,
+ int payloadSize) {
encodedMessage.writeBytes(payloadType.getPayloadTypeBytes());
encodedMessage.writeInt(payloadSize);
encodedMessage.writeBytes(payload);
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
index b79b0cf9..86c67f06 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
@@ -20,8 +20,8 @@
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -31,58 +31,85 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.vavr.control.Try;
+import java.nio.ByteBuffer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
-import java.nio.ByteBuffer;
-
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
-public class ProducerCoreTest {
+class ProducerCoreTest {
private ProducerCore producerCore;
private EncodersFactory encodersFactoryMock;
private WireFrameVersion wireFrameVersion;
@BeforeEach
- public void setUp() {
+ void setUp() {
encodersFactoryMock = mock(EncodersFactory.class);
wireFrameVersion = mock(WireFrameVersion.class);
producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion);
}
@Test
- public void encode_should_encode_message_stream_to_wire_frame() {
+ void encode_should_encode_raw_message_stream_to_wire_frame() {
+ final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
+ final ByteBuffer payload = ByteBuffer.wrap(new byte[3]);
+ final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
+
+ when(wireFrameEncoder.encode(payload, PayloadType.UNDEFINED)).thenReturn(wireFrameBuffer);
+ when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion))
+ .thenReturn(wireFrameEncoder);
+
+ // given
+ final int messageStreamSize = 2;
+ final Flux<ByteBuffer> rawMessageStream = Flux.just(payload).repeat(messageStreamSize - 1);
+
+ // when
+ final ByteBuf lastMessage = producerCore
+ .encode(rawMessageStream, PayloadType.UNDEFINED, ByteBufAllocator.DEFAULT)
+ .blockLast();
+
+ // then
+ verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
+ verify(wireFrameEncoder, times(messageStreamSize)).encode(payload, PayloadType.UNDEFINED);
+
+ assertThat(lastMessage).isNotNull();
+ assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
+ }
+
+ @Test
+ void encode_should_encode_ves_event_stream_to_wire_frame() {
final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class);
+
+ final VesEvent vesEvent = defaultVesEvent();
final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]);
final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
- when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
- when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
+ when(protobufEncoder.encode(vesEvent)).thenReturn(protoBuffer);
+ when(wireFrameEncoder.encode(protoBuffer, PayloadType.PROTOBUF)).thenReturn(wireFrameBuffer);
when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
- when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)).
- thenReturn(wireFrameEncoder);
+ when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion))
+ .thenReturn(wireFrameEncoder);
// given
final int messageStreamSize = 2;
- final Flux<VesEvent> messages = Flux.just(defaultVesEvent()).repeat(messageStreamSize - 1);
+ final Flux<VesEvent> messages = Flux.just(vesEvent).repeat(messageStreamSize - 1);
// when
final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast();
// then
verify(encodersFactoryMock).createProtobufEncoder();
- verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
- verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
- verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
+ verify(protobufEncoder, times(messageStreamSize)).encode(vesEvent);
assertThat(lastMessage).isNotNull();
assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
index d79d0dce..7352fbf5 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import java.nio.ByteBuffer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
public class WireFrameEncoderTest {
private static final byte MARKER_BYTE = (byte) 0xAA;
@@ -51,7 +52,7 @@ public class WireFrameEncoderTest {
void encode_givenNullPayload_shouldThrowEncodingException() {
final ByteBuffer buffer = null;
- Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+ Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF);
assertThat(encodedBuffer.isFailure()).isTrue();
assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
@@ -61,7 +62,7 @@ public class WireFrameEncoderTest {
void encode_givenEmptyPayload_shouldThrowEncodingException() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(0);
- Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+ Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF);
assertThat(encodedBuffer.isFailure()).isTrue();
assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
@@ -73,7 +74,7 @@ public class WireFrameEncoderTest {
final int bufferSize = payloadBytes.length;
final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
- final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+ final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF);
assertThat(encodedBuffer.isSuccess()).isTrue();
final ByteBuf actualEncodedBuffer = encodedBuffer.get();
@@ -95,7 +96,7 @@ public class WireFrameEncoderTest {
final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
// when
- final Try<ByteBuf> encodedBuffer = encoder.encode(buffer);
+ final Try<ByteBuf> encodedBuffer = encoder.encode(buffer, PayloadType.PROTOBUF);
// then
assertThat(encodedBuffer.isSuccess()).isTrue();