diff options
Diffstat (limited to 'sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java')
-rw-r--r-- | sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java | 131 |
1 files changed, 74 insertions, 57 deletions
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java index ba118c0..1abef1f 100644 --- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java @@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.file.Paths; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; -import org.awaitility.Durations; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; - +import org.testcontainers.shaded.org.awaitility.Durations; +import org.testcontainers.utility.DockerImageName; @Testcontainers @ExtendWith(MockitoExtension.class) +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';") +@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT") +@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN") class ClientInitializerTest { - private static final int SUCCESSFUL_STOP_MSG_INDEX = 2; - private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3; - private static final int SUCCESSFUL_INIT_MSG_INDEX = 0; - private static final int SUCCESSFUL_DIST_MSG_INDEX = 3; private static final int EXPECTED_HEAT_ARTIFACTS = 4; + private static final DistributionClientConfig clientConfig = new DistributionClientConfig(); private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class); @Container - public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest") - .withNetworkMode("host"); + CustomKafkaContainer kafka = buildBrokerInstance(); @Container - public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest") - .withNetworkMode("host"); - @Mock - private Logger log; + public GenericContainer<?> mockSdc = + new GenericContainer<>( + "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop") + .withExposedPorts(30206); @Mock private Logger distClientLog; private ClientInitializer clientInitializer; private ClientNotifyCallback clientNotifyCallback; @BeforeEach - public void initializeClient() { - DistributionClientConfig clientConfig = new DistributionClientConfig(); + public void initializeClient() throws InterruptedException { + clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort()); List<ArtifactsValidator> validators = new ArrayList<>(); DistributionClientImpl client = new DistributionClientImpl(distClientLog); clientNotifyCallback = new ClientNotifyCallback(validators, client); clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client); - } - - @Test - void shouldRegisterToDmaapAfterClientInitialization() { - //given - final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when - clientInitializer.log = log; - clientInitializer.initialize(); - verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List<String> allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly"); - assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly"); - } - - @Test - void shouldUnregisterAndStopClient() { - //given - final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when clientInitializer.initialize(); - clientInitializer.stop(); - verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List<String> allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient"); - assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully"); + Thread.sleep(1000); + setUpTopicsAndSendData(); } @Test - void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException { - - //given - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:3904/events/testName/add")) - .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json"))) - .build(); - HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); - //when - clientInitializer.initialize(); + void shouldDownloadArtifactsWithArtifactTypeHeat() { waitForArtifacts(); List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts(); - //then Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size()); + clientInitializer.stop(); } private void waitForArtifacts() { @@ -132,4 +117,36 @@ class ClientInitializerTest { .atMost(Durations.ONE_MINUTE) .until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty()); } + + private CustomKafkaContainer buildBrokerInstance() { + final Map<String, String> env = new LinkedHashMap<>(); + env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT"); + env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN"); + env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"admin-secret\" " + + "user_admin=\"admin-secret\" " + + "user_producer=\"producer-secret\" " + + "user_consumer=\"consumer-secret\";"); + + return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withEmbeddedZookeeper() + .withStartupAttempts(1) + .withEnv(env) + .withFixedExposedPort(43219, 9093); + } + + @SneakyThrows + private void setUpTopicsAndSendData() { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer<String, String> producer = new KafkaProducer<>(props); + String content = Files.readString(Path.of("src/test/resources/artifacts.json")); + producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get(); + } } |