diff options
author | efiacor <fiachra.corcoran@est.tech> | 2022-06-16 09:38:26 +0100 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2022-10-10 17:40:51 +0100 |
commit | cff56489f774f937654cb6eb198d3d5ef41418a2 (patch) | |
tree | 3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-ci/src/test/java | |
parent | 1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff) |
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details
Add kafka config to IConfiguration interface
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f
Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-ci/src/test/java')
-rw-r--r-- | sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java | 131 | ||||
-rw-r--r-- | sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java | 94 |
2 files changed, 168 insertions, 57 deletions
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java index ba118c0..1abef1f 100644 --- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java @@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.file.Paths; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; -import org.awaitility.Durations; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; - +import org.testcontainers.shaded.org.awaitility.Durations; +import org.testcontainers.utility.DockerImageName; @Testcontainers @ExtendWith(MockitoExtension.class) +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';") +@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT") +@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN") class ClientInitializerTest { - private static final int SUCCESSFUL_STOP_MSG_INDEX = 2; - private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3; - private static final int SUCCESSFUL_INIT_MSG_INDEX = 0; - private static final int SUCCESSFUL_DIST_MSG_INDEX = 3; private static final int EXPECTED_HEAT_ARTIFACTS = 4; + private static final DistributionClientConfig clientConfig = new DistributionClientConfig(); private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class); @Container - public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest") - .withNetworkMode("host"); + CustomKafkaContainer kafka = buildBrokerInstance(); @Container - public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest") - .withNetworkMode("host"); - @Mock - private Logger log; + public GenericContainer<?> mockSdc = + new GenericContainer<>( + "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop") + .withExposedPorts(30206); @Mock private Logger distClientLog; private ClientInitializer clientInitializer; private ClientNotifyCallback clientNotifyCallback; @BeforeEach - public void initializeClient() { - DistributionClientConfig clientConfig = new DistributionClientConfig(); + public void initializeClient() throws InterruptedException { + clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort()); List<ArtifactsValidator> validators = new ArrayList<>(); DistributionClientImpl client = new DistributionClientImpl(distClientLog); clientNotifyCallback = new ClientNotifyCallback(validators, client); clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client); - } - - @Test - void shouldRegisterToDmaapAfterClientInitialization() { - //given - final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when - clientInitializer.log = log; - clientInitializer.initialize(); - verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List<String> allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly"); - assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly"); - } - - @Test - void shouldUnregisterAndStopClient() { - //given - final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when clientInitializer.initialize(); - clientInitializer.stop(); - verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List<String> allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient"); - assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully"); + Thread.sleep(1000); + setUpTopicsAndSendData(); } @Test - void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException { - - //given - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:3904/events/testName/add")) - .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json"))) - .build(); - HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); - //when - clientInitializer.initialize(); + void shouldDownloadArtifactsWithArtifactTypeHeat() { waitForArtifacts(); List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts(); - //then Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size()); + clientInitializer.stop(); } private void waitForArtifacts() { @@ -132,4 +117,36 @@ class ClientInitializerTest { .atMost(Durations.ONE_MINUTE) .until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty()); } + + private CustomKafkaContainer buildBrokerInstance() { + final Map<String, String> env = new LinkedHashMap<>(); + env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT"); + env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN"); + env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"admin-secret\" " + + "user_admin=\"admin-secret\" " + + "user_producer=\"producer-secret\" " + + "user_consumer=\"consumer-secret\";"); + + return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withEmbeddedZookeeper() + .withStartupAttempts(1) + .withEnv(env) + .withFixedExposedPort(43219, 9093); + } + + @SneakyThrows + private void setUpTopicsAndSendData() { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer<String, String> producer = new KafkaProducer<>(props); + String content = Files.readString(Path.of("src/test/resources/artifacts.json")); + producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get(); + } } diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java new file mode 100644 index 0000000..e2eabc1 --- /dev/null +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.test.core.service; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class CustomKafkaContainer extends FixedHostPortGenericContainer<CustomKafkaContainer> { + protected String externalZookeeperConnect; + + public CustomKafkaContainer(DockerImageName dockerImageName) { + super(String.valueOf(dockerImageName)); + this.externalZookeeperConnect = null; + this.withExposedPorts(9093); + this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"); + this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + this.withEnv("KAFKA_BROKER_ID", "1"); + this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); + this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); + this.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"); + this.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1"); + this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807"); + this.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public CustomKafkaContainer withEmbeddedZookeeper() { + this.externalZookeeperConnect = null; + return this.self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093)); + } + + protected void configure() { + this.withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("BROKER://%s:9092", this.getNetwork() != null ? this.getNetworkAliases().get(0) : "localhost")); + String command = ""; + if (this.externalZookeeperConnect != null) { + this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect); + } else { + this.addExposedPort(2181); + this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); + command = command + "echo 'clientPort=2181' > zookeeper.properties\n"; + command = command + "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command = command + "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command = command + "zookeeper-server-start zookeeper.properties &\n"; + } + + command = command + "echo '' > /etc/confluent/docker/ensure \n"; + command = command + "/etc/confluent/docker/run \n"; + this.withCommand("sh", "-c", command); + } + + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + try { + String brokerAdvertisedListener = this.brokerAdvertisedListener(containerInfo); + ExecResult result = this.execInContainer("kafka-configs", "--alter", "--bootstrap-server", + brokerAdvertisedListener, "--entity-type", "brokers", "--entity-name", + this.getEnvMap().get("KAFKA_BROKER_ID"), "--add-config", + "advertised.listeners=[" + String.join(",", this.getBootstrapServers(), brokerAdvertisedListener) + "]"); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } catch (Throwable var4) { + throw var4; + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } +} |