aboutsummaryrefslogtreecommitdiffstats
path: root/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java
blob: d71e562b64afda56593bf7253be7d25c000c7a02 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package org.onap.so.client.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(SystemStubsExtension.class)
public class KafkaConsumerImplTest {
    private KafkaConsumerImpl consumer;
    private static MockConsumer<String, String> mockConsumer;
    @SystemStub
    EnvironmentVariables environmentVariables = new EnvironmentVariables();

    @Before
    public void setup() {
        environmentVariables.set("JAAS_CONFIG", "jaas.config");
        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        configureMockConsumer();
    }

    @Test
    public void consumerShouldConsumeMessages() throws Exception {
        consumer = new KafkaConsumerImpl("localhost:9092");
        consumer.setConsumer(mockConsumer);
        List<String> response = consumer.get("TOPIC", "CG1", "C1");
        assertThat(response).contains("I", "like", "pizza");
    }

    private void configureMockConsumer() {
        mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));

        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
        mockConsumer.updateBeginningOffsets(beginningOffsets);
        mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I"));
        mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like"));
        mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza"));

    }
}