diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-02-07 11:38:09 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-02-07 11:38:09 +0000 |
commit | a334dcdb591729addd1d5a091e9474a7e76c2f6f (patch) | |
tree | d994918ee3bc0531475de22e2b980f5aa7833958 | |
parent | 6b50f21a75f76ebad011188c42b6406d7c097537 (diff) | |
parent | f3ccafac1af5be273bc0cffb97f1416c57f04599 (diff) |
Merge "Simple OK case for HV-VES"
3 files changed, 81 insertions, 13 deletions
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java index 65088702..46aeacc6 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/DummyCollector.java @@ -82,6 +82,7 @@ public class DummyCollector { private Publisher<Void> handleConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound) { nettyInbound.receive() .aggregate() + .retain() .log() .doOnNext(this::collect) .subscribe(); diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java index 213e9766..746aae72 100644 --- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java @@ -19,22 +19,29 @@ */ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.PayloadType; +import org.onap.ves.MeasDataCollectionOuterClass; import org.onap.ves.VesEventOuterClass.CommonEventHeader; import org.onap.ves.VesEventOuterClass.VesEvent; import reactor.core.publisher.Flux; +import static org.assertj.core.api.Assertions.assertThat; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> */ class HvVesProducerIT { + private static final int INFO_ID = 17; + private static final long VALUE = 5l; + private static final int MEAS_TYPE = 3; + private static final int PERIOD = 1000; + private static final String OBJECT_INSTANCE_ID = "DH-1"; + private final SystemUnderTestWrapper sut = new SystemUnderTestWrapper(); @BeforeEach @@ -48,23 +55,40 @@ class HvVesProducerIT { } @Test - void todo() { + void singleMessageTest() throws Exception { // given - final VesEvent sampleEvent = VesEvent.newBuilder() - .setCommonEventHeader(CommonEventHeader.newBuilder() - .setDomain("dummy") - .build()) - .setEventFields(ByteString.copyFrom(new byte[]{0, 1, 2, 3})) - .build(); + final VesEvent sampleEvent = createSimpleVesEvent(); final Flux<VesEvent> input = Flux.just(sampleEvent); // when final ByteBuf receivedData = sut.blockingSend(input); // then - assertThat(receivedData.readableBytes()) - .describedAs("data length") - .isGreaterThan(0); + WireProtocolDecoder decoded = WireProtocolDecoder.decode(receivedData); + assertThat(decoded.type).isEqualTo(PayloadType.PROTOBUF.getPayloadTypeBytes().getShort()); + assertThat(decoded.event).isEqualTo(sampleEvent); + } + + private VesEvent createSimpleVesEvent() { + final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection.newBuilder() + .addMeasInfo(MeasDataCollectionOuterClass.MeasInfo.newBuilder() + .addMeasValues(MeasDataCollectionOuterClass.MeasValue.newBuilder() + .addMeasResults(MeasDataCollectionOuterClass.MeasResult.newBuilder() + .setIValue(VALUE).build()) + .build()) + .setIMeasInfoId(INFO_ID) + .setIMeasTypes(MeasDataCollectionOuterClass.MeasInfo.IMeasTypes.newBuilder() + .addIMeasType(MEAS_TYPE)) + .build()) + .setGranularityPeriod(PERIOD) + .addMeasObjInstIdList(OBJECT_INSTANCE_ID) + .build(); + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setDomain("RTPM") + .build()) + .setEventFields(content.toByteString()) + .build(); } } diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/WireProtocolDecoder.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/WireProtocolDecoder.java new file mode 100644 index 00000000..fb00a348 --- /dev/null +++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/WireProtocolDecoder.java @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * DCAEGEN2-SERVICES-SDK + * ================================================================================ + * Copyright (C) 2019 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.buffer.ByteBuf; +import org.onap.ves.VesEventOuterClass; + +public class WireProtocolDecoder { + byte[] wireProtocolHeader; + short type; + int payloadSize; + VesEventOuterClass.VesEvent event; + + public static WireProtocolDecoder decode(ByteBuf message) throws InvalidProtocolBufferException { + WireProtocolDecoder wpu = new WireProtocolDecoder(); + wpu.wireProtocolHeader = new byte[6]; + message.readBytes(wpu.wireProtocolHeader); + wpu.type = message.readShort(); + wpu.payloadSize = message.readInt(); + byte[] payload = new byte[wpu.payloadSize]; + message.readBytes(payload); + wpu.event = VesEventOuterClass.VesEvent.parseFrom(payload); + return wpu; + } +} |