summaryrefslogtreecommitdiffstats
path: root/sdc-distribution-ci
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2022-06-16 09:38:26 +0100
committerefiacor <fiachra.corcoran@est.tech>2022-10-10 17:40:51 +0100
commitcff56489f774f937654cb6eb198d3d5ef41418a2 (patch)
tree3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-ci
parent1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff)
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details Add kafka config to IConfiguration interface Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-ci')
-rw-r--r--sdc-distribution-ci/etc/sdc-client.jks (renamed from sdc-distribution-ci/etc/asdc-client.jks)bin1177 -> 1177 bytes
-rw-r--r--sdc-distribution-ci/etc/sdcclientstore.jks (renamed from sdc-distribution-ci/etc/asdcclientstore.jks)bin907 -> 907 bytes
-rw-r--r--sdc-distribution-ci/pom.xml22
-rw-r--r--sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java75
-rw-r--r--sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java6
-rw-r--r--sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java (renamed from sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java)19
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java131
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java94
-rw-r--r--sdc-distribution-ci/src/test/resources/logback-test.xml14
9 files changed, 241 insertions, 120 deletions
diff --git a/sdc-distribution-ci/etc/asdc-client.jks b/sdc-distribution-ci/etc/sdc-client.jks
index eb0a0d3..eb0a0d3 100644
--- a/sdc-distribution-ci/etc/asdc-client.jks
+++ b/sdc-distribution-ci/etc/sdc-client.jks
Binary files differ
diff --git a/sdc-distribution-ci/etc/asdcclientstore.jks b/sdc-distribution-ci/etc/sdcclientstore.jks
index 5dc006d..5dc006d 100644
--- a/sdc-distribution-ci/etc/asdcclientstore.jks
+++ b/sdc-distribution-ci/etc/sdcclientstore.jks
Binary files differ
diff --git a/sdc-distribution-ci/pom.xml b/sdc-distribution-ci/pom.xml
index e11d428..707de83 100644
--- a/sdc-distribution-ci/pom.xml
+++ b/sdc-distribution-ci/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>1.4.5-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-ci</artifactId>
@@ -25,6 +25,13 @@
</properties>
<dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-distribution-client</artifactId>
@@ -136,6 +143,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.8.2</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito-core.version}</version>
@@ -181,6 +193,12 @@
<version>${httpclient.version}</version>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit-pioneer</groupId>
+ <artifactId>junit-pioneer</artifactId>
+ <version>1.4.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -195,7 +213,7 @@
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib</classpathPrefix>
- <mainClass>org.onap.test.it.RegisterToAsdcTopicIT</mainClass>
+ <mainClass>org.onap.test.it.RegisterToSdcTopicIT</mainClass>
</manifest>
<manifestEntries>
<Class-Path>lib/</Class-Path>
diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java
index ad18969..5a902c8 100644
--- a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java
+++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java
@@ -26,21 +26,23 @@ import java.util.List;
public class DistributionClientConfig implements IConfiguration {
- public static final String DEFAULT_ASDC_ADDRESS = "localhost:30206";
+ public static final String DEFAULT_SDC_ADDRESS = "localhost:30206";
public static final String DEFAULT_COMSUMER_ID = "dcae-openapi-manager";
public static final String DEFAULT_CONSUMER_GROUP = "noapp";
public static final String DEFAULT_ENVIRONMENT_NAME = "AUTO";
public static final String DEFAULT_PASSWORD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U";
public static final int DEFAULT_POLLING_INTERVAL = 20;
public static final int DEFAULT_POLLING_TIMEOUT = 20;
+ public static final String DEFAULT_STATUS_TOPIC = "STATUS-TOPIC";
+ public static final String DEFAULT_NOTIF_TOPIC = "NOTIF-TOPIC";
public static final String DEFAULT_USER = "dcae";
- public static final String DEFAULT_KEY_STORE_PATH = "etc/asdc-client.jks";
+ public static final String DEFAULT_KEY_STORE_PATH = "etc/sdc-client.jks";
public static final String DEFAULT_KEY_STORE_PASSWORD = "Aa123456";
public static final boolean DEFAULT_ACTIVATE_SERVER_TLS_AUTH = false;
public static final boolean DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES = true;
public static final boolean DEFAULT_USE_HTTPS_WITH_SDC = false;
- public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost";
- private String asdcAddress;
+ public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost:9092";
+ private String sdcAddress;
private String user;
private String password;
private int pollingInterval;
@@ -53,38 +55,23 @@ public class DistributionClientConfig implements IConfiguration {
private String keyStorePassword;
private boolean activateServerTLSAuth;
private boolean isFilterInEmptyResources;
- private boolean useHttpsWithDmaap;
private boolean useHttpsWithSDC;
private List<String> msgBusAddress;
+ private String sdcStatusTopicName;
+ private String sdcNotificationTopicName;
+ private String kafkaSecurityProtocolConfig;
+ private String kafkaSaslMechanism;
+ private String kafkaSaslJaasConfig;
private String httpProxyHost;
private int httpProxyPort;
private String httpsProxyHost;
private int httpsProxyPort;
private boolean useSystemProxy;
- public DistributionClientConfig(IConfiguration other) {
- this.asdcAddress = other.getAsdcAddress();
- this.comsumerID = other.getConsumerID();
- this.consumerGroup = other.getConsumerGroup();
- this.environmentName = other.getEnvironmentName();
- this.password = other.getPassword();
- this.pollingInterval = other.getPollingInterval();
- this.pollingTimeout = other.getPollingTimeout();
- this.relevantArtifactTypes = other.getRelevantArtifactTypes();
- this.user = other.getUser();
- this.keyStorePath = other.getKeyStorePath();
- this.keyStorePassword = other.getKeyStorePassword();
- this.activateServerTLSAuth = other.activateServerTLSAuth();
- this.isFilterInEmptyResources = other.isFilterInEmptyResources();
- this.httpProxyHost = other.getHttpProxyHost();
- this.httpProxyPort = other.getHttpProxyPort();
- this.httpsProxyHost = other.getHttpsProxyHost();
- this.httpsProxyPort = other.getHttpsProxyPort();
- this.useSystemProxy = other.isUseSystemProxy();
- }
-
public DistributionClientConfig() {
- this.asdcAddress = DEFAULT_ASDC_ADDRESS;
+ this.sdcAddress = DEFAULT_SDC_ADDRESS;
+ this.sdcStatusTopicName = DEFAULT_STATUS_TOPIC;
+ this.sdcNotificationTopicName = DEFAULT_NOTIF_TOPIC;
this.comsumerID = DEFAULT_COMSUMER_ID;
this.consumerGroup = DEFAULT_CONSUMER_GROUP;
this.environmentName = DEFAULT_ENVIRONMENT_NAME;
@@ -99,22 +86,23 @@ public class DistributionClientConfig implements IConfiguration {
this.activateServerTLSAuth = DEFAULT_ACTIVATE_SERVER_TLS_AUTH;
this.isFilterInEmptyResources = DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES;
this.useHttpsWithSDC = DEFAULT_USE_HTTPS_WITH_SDC;
- msgBusAddress = new ArrayList<>();
- msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
- msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
- msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
+ this.msgBusAddress = new ArrayList<>();
+ this.msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
}
@Override
- public String getAsdcAddress() {
- return asdcAddress;
+ public String getSdcAddress() {
+ return sdcAddress;
}
- @Override
public List<String> getMsgBusAddress() {
return msgBusAddress;
}
+ public void setMsgBusAddress(List<String> msgBusAddress) {
+ this.msgBusAddress = msgBusAddress;
+ }
+
@Override
public String getUser() {
return user;
@@ -173,8 +161,8 @@ public class DistributionClientConfig implements IConfiguration {
this.comsumerID = comsumerID;
}
- public void setAsdcAddress(String asdcAddress) {
- this.asdcAddress = asdcAddress;
+ public void setSdcAddress(String sdcAddress) {
+ this.sdcAddress = sdcAddress;
}
public void setUser(String user) {
@@ -217,7 +205,7 @@ public class DistributionClientConfig implements IConfiguration {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((asdcAddress == null) ? 0 : asdcAddress.hashCode());
+ result = prime * result + ((sdcAddress == null) ? 0 : sdcAddress.hashCode());
result = prime * result + ((comsumerID == null) ? 0 : comsumerID.hashCode());
result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
result = prime * result + ((environmentName == null) ? 0 : environmentName.hashCode());
@@ -251,11 +239,11 @@ public class DistributionClientConfig implements IConfiguration {
return false;
}
DistributionClientConfig other = (DistributionClientConfig) obj;
- if (asdcAddress == null) {
- if (other.asdcAddress != null) {
+ if (sdcAddress == null) {
+ if (other.sdcAddress != null) {
return false;
}
- } else if (!asdcAddress.equals(other.asdcAddress)) {
+ } else if (!sdcAddress.equals(other.sdcAddress)) {
return false;
}
if (comsumerID == null) {
@@ -322,7 +310,7 @@ public class DistributionClientConfig implements IConfiguration {
@Override
public String toString() {
- return "TestConfiguration [asdcAddress=" + asdcAddress + ", user=" + user + ", password=" + password
+ return "TestConfiguration [sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password
+ ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout
+ ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup
+ ", environmentName=" + environmentName + ", comsumerID=" + comsumerID + "]";
@@ -337,11 +325,6 @@ public class DistributionClientConfig implements IConfiguration {
this.isFilterInEmptyResources = isFilterInEmptyResources;
}
- @Override
- public Boolean isUseHttpsWithDmaap() {
- return this.useHttpsWithDmaap;
- }
-
public Boolean isUseHttpsWithSDC() {
return this.useHttpsWithSDC;
}
diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java
index 4dfe388..4cee4cf 100644
--- a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java
+++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java
@@ -23,7 +23,7 @@ import org.onap.sdc.api.consumer.IDistributionStatusMessage;
import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.api.notification.IResourceInstance;
-import org.onap.sdc.http.HttpAsdcClient;
+import org.onap.sdc.http.HttpSdcClient;
import org.onap.sdc.http.SdcConnectorClient;
import org.onap.sdc.impl.DistributionClientDownloadResultImpl;
import org.onap.sdc.impl.DistributionClientImpl;
@@ -46,8 +46,8 @@ public class ClientNotifyCallback implements INotificationCallback {
private final DistributionClientImpl distributionClient;
private final List<DistributionClientDownloadResultImpl> pulledArtifacts = new ArrayList<>();
DistributionClientConfig config = new DistributionClientConfig();
- HttpAsdcClient asdcClient = new HttpAsdcClient(config);
- SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, asdcClient);
+ HttpSdcClient sdcClient = new HttpSdcClient(config);
+ SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, sdcClient);
ArtifactsDownloader artifactsDownloader = new ArtifactsDownloader("/app/path", sdcConnectorClient);
public List<DistributionClientDownloadResultImpl> getPulledArtifacts() {
diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java
index 58baec7..c89ecd4 100644
--- a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java
+++ b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java
@@ -19,18 +19,15 @@
*/
package org.onap.test.it;
+import java.util.ArrayList;
+import java.util.List;
import org.onap.sdc.impl.DistributionClientImpl;
import org.onap.test.core.config.DistributionClientConfig;
import org.onap.test.core.service.ArtifactsValidator;
import org.onap.test.core.service.ClientInitializer;
import org.onap.test.core.service.ClientNotifyCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-public class RegisterToAsdcTopicIT {
+public class RegisterToSdcTopicIT {
public static void main(String[] args) {
@@ -40,12 +37,10 @@ public class RegisterToAsdcTopicIT {
ClientNotifyCallback callback = new ClientNotifyCallback(validators, client);
ClientInitializer clientInitializer = new ClientInitializer(clientConfig, callback, client);
clientInitializer.initialize();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- client.stop();
- System.out.println("Shutdown Hook is running !");
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ client.stop();
+ System.out.println("Shutdown Hook is running !");
+ }));
}
}
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
index ba118c0..1abef1f 100644
--- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
@@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.nio.file.Paths;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
-import org.awaitility.Durations;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-
+import org.testcontainers.shaded.org.awaitility.Durations;
+import org.testcontainers.utility.DockerImageName;
@Testcontainers
@ExtendWith(MockitoExtension.class)
+@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';")
+@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT")
+@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN")
class ClientInitializerTest {
- private static final int SUCCESSFUL_STOP_MSG_INDEX = 2;
- private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3;
- private static final int SUCCESSFUL_INIT_MSG_INDEX = 0;
- private static final int SUCCESSFUL_DIST_MSG_INDEX = 3;
private static final int EXPECTED_HEAT_ARTIFACTS = 4;
+ private static final DistributionClientConfig clientConfig = new DistributionClientConfig();
private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class);
@Container
- public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest")
- .withNetworkMode("host");
+ CustomKafkaContainer kafka = buildBrokerInstance();
@Container
- public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest")
- .withNetworkMode("host");
- @Mock
- private Logger log;
+ public GenericContainer<?> mockSdc =
+ new GenericContainer<>(
+ "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop")
+ .withExposedPorts(30206);
@Mock
private Logger distClientLog;
private ClientInitializer clientInitializer;
private ClientNotifyCallback clientNotifyCallback;
@BeforeEach
- public void initializeClient() {
- DistributionClientConfig clientConfig = new DistributionClientConfig();
+ public void initializeClient() throws InterruptedException {
+ clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort());
List<ArtifactsValidator> validators = new ArrayList<>();
DistributionClientImpl client = new DistributionClientImpl(distClientLog);
clientNotifyCallback = new ClientNotifyCallback(validators, client);
clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client);
- }
-
- @Test
- void shouldRegisterToDmaapAfterClientInitialization() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
- clientInitializer.log = log;
- clientInitializer.initialize();
- verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly");
- assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly");
- }
-
- @Test
- void shouldUnregisterAndStopClient() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
clientInitializer.initialize();
- clientInitializer.stop();
- verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient");
- assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully");
+ Thread.sleep(1000);
+ setUpTopicsAndSendData();
}
@Test
- void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException {
-
- //given
- HttpRequest request = HttpRequest.newBuilder()
- .uri(URI.create("http://localhost:3904/events/testName/add"))
- .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json")))
- .build();
- HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
- //when
- clientInitializer.initialize();
+ void shouldDownloadArtifactsWithArtifactTypeHeat() {
waitForArtifacts();
List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts();
- //then
Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size());
+ clientInitializer.stop();
}
private void waitForArtifacts() {
@@ -132,4 +117,36 @@ class ClientInitializerTest {
.atMost(Durations.ONE_MINUTE)
.until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty());
}
+
+ private CustomKafkaContainer buildBrokerInstance() {
+ final Map<String, String> env = new LinkedHashMap<>();
+ env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
+ env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN");
+ env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"admin\" " +
+ "password=\"admin-secret\" " +
+ "user_admin=\"admin-secret\" " +
+ "user_producer=\"producer-secret\" " +
+ "user_consumer=\"consumer-secret\";");
+
+ return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withEmbeddedZookeeper()
+ .withStartupAttempts(1)
+ .withEnv(env)
+ .withFixedExposedPort(43219, 9093);
+ }
+
+ @SneakyThrows
+ private void setUpTopicsAndSendData() {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+ String content = Files.readString(Path.of("src/test/resources/artifacts.json"));
+ producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get();
+ }
}
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java
new file mode 100644
index 0000000..e2eabc1
--- /dev/null
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.test.core.service;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class CustomKafkaContainer extends FixedHostPortGenericContainer<CustomKafkaContainer> {
+ protected String externalZookeeperConnect;
+
+ public CustomKafkaContainer(DockerImageName dockerImageName) {
+ super(String.valueOf(dockerImageName));
+ this.externalZookeeperConnect = null;
+ this.withExposedPorts(9093);
+ this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092");
+ this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+ this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+ this.withEnv("KAFKA_BROKER_ID", "1");
+ this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
+ this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+ this.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
+ this.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
+ this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807");
+ this.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+ }
+
+ public CustomKafkaContainer withEmbeddedZookeeper() {
+ this.externalZookeeperConnect = null;
+ return this.self();
+ }
+
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093));
+ }
+
+ protected void configure() {
+ this.withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("BROKER://%s:9092", this.getNetwork() != null ? this.getNetworkAliases().get(0) : "localhost"));
+ String command = "";
+ if (this.externalZookeeperConnect != null) {
+ this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect);
+ } else {
+ this.addExposedPort(2181);
+ this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
+ command = command + "echo 'clientPort=2181' > zookeeper.properties\n";
+ command = command + "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+ command = command + "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+ command = command + "zookeeper-server-start zookeeper.properties &\n";
+ }
+
+ command = command + "echo '' > /etc/confluent/docker/ensure \n";
+ command = command + "/etc/confluent/docker/run \n";
+ this.withCommand("sh", "-c", command);
+ }
+
+ @SneakyThrows
+ protected void containerIsStarted(InspectContainerResponse containerInfo) {
+ try {
+ String brokerAdvertisedListener = this.brokerAdvertisedListener(containerInfo);
+ ExecResult result = this.execInContainer("kafka-configs", "--alter", "--bootstrap-server",
+ brokerAdvertisedListener, "--entity-type", "brokers", "--entity-name",
+ this.getEnvMap().get("KAFKA_BROKER_ID"), "--add-config",
+ "advertised.listeners=[" + String.join(",", this.getBootstrapServers(), brokerAdvertisedListener) + "]");
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(result.toString());
+ }
+ } catch (Throwable var4) {
+ throw var4;
+ }
+ }
+
+ protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+ return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+ }
+}
diff --git a/sdc-distribution-ci/src/test/resources/logback-test.xml b/sdc-distribution-ci/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..78c5b05
--- /dev/null
+++ b/sdc-distribution-ci/src/test/resources/logback-test.xml
@@ -0,0 +1,14 @@
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <logger name="org.testcontainers" level="INFO"/>
+ <logger name="com.github.dockerjava" level="WARN"/>
+</configuration> \ No newline at end of file