aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java')
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java131
1 files changed, 74 insertions, 57 deletions
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
index ba118c0..1abef1f 100644
--- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
@@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.nio.file.Paths;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
-import org.awaitility.Durations;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-
+import org.testcontainers.shaded.org.awaitility.Durations;
+import org.testcontainers.utility.DockerImageName;
@Testcontainers
@ExtendWith(MockitoExtension.class)
+@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';")
+@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT")
+@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN")
class ClientInitializerTest {
- private static final int SUCCESSFUL_STOP_MSG_INDEX = 2;
- private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3;
- private static final int SUCCESSFUL_INIT_MSG_INDEX = 0;
- private static final int SUCCESSFUL_DIST_MSG_INDEX = 3;
private static final int EXPECTED_HEAT_ARTIFACTS = 4;
+ private static final DistributionClientConfig clientConfig = new DistributionClientConfig();
private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class);
@Container
- public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest")
- .withNetworkMode("host");
+ CustomKafkaContainer kafka = buildBrokerInstance();
@Container
- public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest")
- .withNetworkMode("host");
- @Mock
- private Logger log;
+ public GenericContainer<?> mockSdc =
+ new GenericContainer<>(
+ "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop")
+ .withExposedPorts(30206);
@Mock
private Logger distClientLog;
private ClientInitializer clientInitializer;
private ClientNotifyCallback clientNotifyCallback;
@BeforeEach
- public void initializeClient() {
- DistributionClientConfig clientConfig = new DistributionClientConfig();
+ public void initializeClient() throws InterruptedException {
+ clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort());
List<ArtifactsValidator> validators = new ArrayList<>();
DistributionClientImpl client = new DistributionClientImpl(distClientLog);
clientNotifyCallback = new ClientNotifyCallback(validators, client);
clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client);
- }
-
- @Test
- void shouldRegisterToDmaapAfterClientInitialization() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
- clientInitializer.log = log;
- clientInitializer.initialize();
- verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly");
- assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly");
- }
-
- @Test
- void shouldUnregisterAndStopClient() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
clientInitializer.initialize();
- clientInitializer.stop();
- verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient");
- assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully");
+ Thread.sleep(1000);
+ setUpTopicsAndSendData();
}
@Test
- void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException {
-
- //given
- HttpRequest request = HttpRequest.newBuilder()
- .uri(URI.create("http://localhost:3904/events/testName/add"))
- .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json")))
- .build();
- HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
- //when
- clientInitializer.initialize();
+ void shouldDownloadArtifactsWithArtifactTypeHeat() {
waitForArtifacts();
List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts();
- //then
Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size());
+ clientInitializer.stop();
}
private void waitForArtifacts() {
@@ -132,4 +117,36 @@ class ClientInitializerTest {
.atMost(Durations.ONE_MINUTE)
.until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty());
}
+
+ private CustomKafkaContainer buildBrokerInstance() {
+ final Map<String, String> env = new LinkedHashMap<>();
+ env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
+ env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN");
+ env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"admin\" " +
+ "password=\"admin-secret\" " +
+ "user_admin=\"admin-secret\" " +
+ "user_producer=\"producer-secret\" " +
+ "user_consumer=\"consumer-secret\";");
+
+ return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withEmbeddedZookeeper()
+ .withStartupAttempts(1)
+ .withEnv(env)
+ .withFixedExposedPort(43219, 9093);
+ }
+
+ @SneakyThrows
+ private void setUpTopicsAndSendData() {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+ String content = Files.readString(Path.of("src/test/resources/artifacts.json"));
+ producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get();
+ }
}