aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java')
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java119
1 files changed, 89 insertions, 30 deletions
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
index c23a1886..f33ff43e 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
@@ -18,30 +18,35 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.services.prh.tasks;
+package org.onap.dcaegen2.services.prh.tasks.commit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.tasks.commit.EpochDateTimeConversion;
-import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
-import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.kafka.support.Acknowledgment;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
import reactor.core.publisher.Flux;
-
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-
-import static org.mockito.Mockito.when;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(MockitoExtension.class)
public class KafkaConsumerTaskImplTest {
@@ -49,38 +54,92 @@ public class KafkaConsumerTaskImplTest {
@Mock
private Acknowledgment acknowledgment;
- @Mock
+ private KafkaConsumerTaskImpl kafkaConsumerTask;
+
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- @Mock
private EpochDateTimeConversion epochDateTimeConversion;
- @InjectMocks
- private KafkaConsumerTaskImpl kafkaConsumerTask;
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled;
+
+ private JsonObject cbsConfigJsonForAutoCommitDisabled;
+
+ @BeforeEach
+ void beforeEach() throws JsonSyntaxException, IOException, URISyntaxException {
+ cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
+ dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ epochDateTimeConversion = new EpochDateTimeConversion();
+
+ }
@Test
- public void onMessageTest(){
- List<ConsumerRecord<String, String>> list = new ArrayList<>();
- TimestampType timestampType = null;
- Headers headers = new RecordHeaders();
- epochDateTimeConversion.setDaysForRecords("3");
- ConsumerRecord<String, String> records = new ConsumerRecord<>
- ("test-topic", 1, 1l, 0l, timestampType, 1, 1, "test-key", "test-value", headers
- , null);
- list.add(records);
- kafkaConsumerTask.onMessage(list, acknowledgment);
+ void beforeOnMessageTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ List<ConsumerRecord<String, String>> list = new ArrayList<>();
+ TimestampType timestampType = null;
+ Headers headers = new RecordHeaders();
+ epochDateTimeConversion.setDaysForRecords("3");
+ ConsumerRecord<String, String> records = new ConsumerRecord<>("test-topic", 1, 1l, 0l, timestampType, 1, 1,
+ "test-key", "test-value", headers, null);
+ list.add(records);
+ kafkaConsumerTask.onMessage(list, acknowledgment);
+ String actualTopicInList = list.get(0).topic();
+ String expectedTopicInList = "test-topic";
+ assertEquals(expectedTopicInList, actualTopicInList, "topic is not matching");
+ assertThat(kafkaConsumerTask.getOffset().equals(acknowledgment));
+ assertThat(kafkaConsumerTask.getJsonEvent().contains("test-topic"));
+ });
}
@Test
- public void commitOffsetTest(){
- kafkaConsumerTask.commitOffset();
+ void beforeCommitOffsetTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ kafkaConsumerTask.commitOffset();
+ });
}
@Test
- public void executeTest() throws JSONException {
- List<String> jsonEvent = new ArrayList<>();
- ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().correlationId("123").build();
- when(dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent)).thenReturn(Flux.just(consumerDmaapModel));
- kafkaConsumerTask.execute();
+ void beforeExecuteTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ String event = getResourceContent("integration/event.json");
+ java.util.List<String> eventList = new ArrayList<>();
+ eventList.add(event);
+ kafkaConsumerTask.setJsonEvent(eventList);
+ Flux<ConsumerDmaapModel> flux = kafkaConsumerTask.execute();
+ String expectedSourceName = "NOK6061ZW8";
+ String actualSourceName = flux.blockFirst().getCorrelationId();
+
+ String expectedOamV4IpAddress = "val3";
+ String actualOamV4IpAddress = flux.blockFirst().getIpv4();
+
+ String expectedOamV6IpAddress = "val4";
+ String actualOamV6IpAddress = flux.blockFirst().getIpv6();
+
+ assertEquals(expectedSourceName, actualSourceName, "SourceName is not matching");
+ assertEquals(expectedOamV4IpAddress, actualOamV4IpAddress, "OamV4IpAddress is not matching");
+ assertEquals(expectedOamV6IpAddress, actualOamV6IpAddress, "OamV6IpAddress is not matching");
+ });
+ }
+
+ private static String getResourceContent(String resourceName) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
+ } catch (Exception e) {
+ throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
+ }
}
+
}