summaryrefslogtreecommitdiffstats
path: root/sdc-distribution-ci/src/test/java
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2022-06-16 09:38:26 +0100
committerefiacor <fiachra.corcoran@est.tech>2022-10-10 17:40:51 +0100
commitcff56489f774f937654cb6eb198d3d5ef41418a2 (patch)
tree3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-ci/src/test/java
parent1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff)
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details Add kafka config to IConfiguration interface Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-ci/src/test/java')
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java131
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java94
2 files changed, 168 insertions, 57 deletions
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
index ba118c0..1abef1f 100644
--- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
@@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.nio.file.Paths;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
-import org.awaitility.Durations;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-
+import org.testcontainers.shaded.org.awaitility.Durations;
+import org.testcontainers.utility.DockerImageName;
@Testcontainers
@ExtendWith(MockitoExtension.class)
+@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';")
+@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT")
+@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN")
class ClientInitializerTest {
- private static final int SUCCESSFUL_STOP_MSG_INDEX = 2;
- private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3;
- private static final int SUCCESSFUL_INIT_MSG_INDEX = 0;
- private static final int SUCCESSFUL_DIST_MSG_INDEX = 3;
private static final int EXPECTED_HEAT_ARTIFACTS = 4;
+ private static final DistributionClientConfig clientConfig = new DistributionClientConfig();
private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class);
@Container
- public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest")
- .withNetworkMode("host");
+ CustomKafkaContainer kafka = buildBrokerInstance();
@Container
- public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest")
- .withNetworkMode("host");
- @Mock
- private Logger log;
+ public GenericContainer<?> mockSdc =
+ new GenericContainer<>(
+ "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop")
+ .withExposedPorts(30206);
@Mock
private Logger distClientLog;
private ClientInitializer clientInitializer;
private ClientNotifyCallback clientNotifyCallback;
@BeforeEach
- public void initializeClient() {
- DistributionClientConfig clientConfig = new DistributionClientConfig();
+ public void initializeClient() throws InterruptedException {
+ clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort());
List<ArtifactsValidator> validators = new ArrayList<>();
DistributionClientImpl client = new DistributionClientImpl(distClientLog);
clientNotifyCallback = new ClientNotifyCallback(validators, client);
clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client);
- }
-
- @Test
- void shouldRegisterToDmaapAfterClientInitialization() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
- clientInitializer.log = log;
- clientInitializer.initialize();
- verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly");
- assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly");
- }
-
- @Test
- void shouldUnregisterAndStopClient() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
clientInitializer.initialize();
- clientInitializer.stop();
- verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient");
- assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully");
+ Thread.sleep(1000);
+ setUpTopicsAndSendData();
}
@Test
- void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException {
-
- //given
- HttpRequest request = HttpRequest.newBuilder()
- .uri(URI.create("http://localhost:3904/events/testName/add"))
- .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json")))
- .build();
- HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
- //when
- clientInitializer.initialize();
+ void shouldDownloadArtifactsWithArtifactTypeHeat() {
waitForArtifacts();
List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts();
- //then
Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size());
+ clientInitializer.stop();
}
private void waitForArtifacts() {
@@ -132,4 +117,36 @@ class ClientInitializerTest {
.atMost(Durations.ONE_MINUTE)
.until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty());
}
+
+ private CustomKafkaContainer buildBrokerInstance() {
+ final Map<String, String> env = new LinkedHashMap<>();
+ env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
+ env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN");
+ env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"admin\" " +
+ "password=\"admin-secret\" " +
+ "user_admin=\"admin-secret\" " +
+ "user_producer=\"producer-secret\" " +
+ "user_consumer=\"consumer-secret\";");
+
+ return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withEmbeddedZookeeper()
+ .withStartupAttempts(1)
+ .withEnv(env)
+ .withFixedExposedPort(43219, 9093);
+ }
+
+ @SneakyThrows
+ private void setUpTopicsAndSendData() {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+ String content = Files.readString(Path.of("src/test/resources/artifacts.json"));
+ producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get();
+ }
}
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java
new file mode 100644
index 0000000..e2eabc1
--- /dev/null
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.test.core.service;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class CustomKafkaContainer extends FixedHostPortGenericContainer<CustomKafkaContainer> {
+ protected String externalZookeeperConnect;
+
+ public CustomKafkaContainer(DockerImageName dockerImageName) {
+ super(String.valueOf(dockerImageName));
+ this.externalZookeeperConnect = null;
+ this.withExposedPorts(9093);
+ this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092");
+ this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+ this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+ this.withEnv("KAFKA_BROKER_ID", "1");
+ this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
+ this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+ this.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
+ this.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
+ this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807");
+ this.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+ }
+
+ public CustomKafkaContainer withEmbeddedZookeeper() {
+ this.externalZookeeperConnect = null;
+ return this.self();
+ }
+
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093));
+ }
+
+ protected void configure() {
+ this.withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("BROKER://%s:9092", this.getNetwork() != null ? this.getNetworkAliases().get(0) : "localhost"));
+ String command = "";
+ if (this.externalZookeeperConnect != null) {
+ this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect);
+ } else {
+ this.addExposedPort(2181);
+ this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
+ command = command + "echo 'clientPort=2181' > zookeeper.properties\n";
+ command = command + "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+ command = command + "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+ command = command + "zookeeper-server-start zookeeper.properties &\n";
+ }
+
+ command = command + "echo '' > /etc/confluent/docker/ensure \n";
+ command = command + "/etc/confluent/docker/run \n";
+ this.withCommand("sh", "-c", command);
+ }
+
+ @SneakyThrows
+ protected void containerIsStarted(InspectContainerResponse containerInfo) {
+ try {
+ String brokerAdvertisedListener = this.brokerAdvertisedListener(containerInfo);
+ ExecResult result = this.execInContainer("kafka-configs", "--alter", "--bootstrap-server",
+ brokerAdvertisedListener, "--entity-type", "brokers", "--entity-name",
+ this.getEnvMap().get("KAFKA_BROKER_ID"), "--add-config",
+ "advertised.listeners=[" + String.join(",", this.getBootstrapServers(), brokerAdvertisedListener) + "]");
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(result.toString());
+ }
+ } catch (Throwable var4) {
+ throw var4;
+ }
+ }
+
+ protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+ return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+ }
+}