diff options
author | efiacor <fiachra.corcoran@est.tech> | 2022-06-16 09:38:26 +0100 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2022-10-10 17:40:51 +0100 |
commit | cff56489f774f937654cb6eb198d3d5ef41418a2 (patch) | |
tree | 3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-ci | |
parent | 1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff) |
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details
Add kafka config to IConfiguration interface
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f
Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-ci')
-rw-r--r-- | sdc-distribution-ci/etc/sdc-client.jks (renamed from sdc-distribution-ci/etc/asdc-client.jks) | bin | 1177 -> 1177 bytes | |||
-rw-r--r-- | sdc-distribution-ci/etc/sdcclientstore.jks (renamed from sdc-distribution-ci/etc/asdcclientstore.jks) | bin | 907 -> 907 bytes | |||
-rw-r--r-- | sdc-distribution-ci/pom.xml | 22 | ||||
-rw-r--r-- | sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java | 75 | ||||
-rw-r--r-- | sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java | 6 | ||||
-rw-r--r-- | sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java (renamed from sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java) | 19 | ||||
-rw-r--r-- | sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java | 131 | ||||
-rw-r--r-- | sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java | 94 | ||||
-rw-r--r-- | sdc-distribution-ci/src/test/resources/logback-test.xml | 14 |
9 files changed, 241 insertions, 120 deletions
diff --git a/sdc-distribution-ci/etc/asdc-client.jks b/sdc-distribution-ci/etc/sdc-client.jks Binary files differindex eb0a0d3..eb0a0d3 100644 --- a/sdc-distribution-ci/etc/asdc-client.jks +++ b/sdc-distribution-ci/etc/sdc-client.jks diff --git a/sdc-distribution-ci/etc/asdcclientstore.jks b/sdc-distribution-ci/etc/sdcclientstore.jks Binary files differindex 5dc006d..5dc006d 100644 --- a/sdc-distribution-ci/etc/asdcclientstore.jks +++ b/sdc-distribution-ci/etc/sdcclientstore.jks diff --git a/sdc-distribution-ci/pom.xml b/sdc-distribution-ci/pom.xml index e11d428..707de83 100644 --- a/sdc-distribution-ci/pom.xml +++ b/sdc-distribution-ci/pom.xml @@ -7,7 +7,7 @@ <parent> <groupId>org.onap.sdc.sdc-distribution-client</groupId> <artifactId>sdc-main-distribution-client</artifactId> - <version>1.4.5-SNAPSHOT</version> + <version>2.0.0-SNAPSHOT</version> </parent> <artifactId>sdc-distribution-ci</artifactId> @@ -25,6 +25,13 @@ </properties> <dependencies> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.onap.sdc.sdc-distribution-client</groupId> <artifactId>sdc-distribution-client</artifactId> @@ -136,6 +143,11 @@ </exclusions> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>2.8.2</version> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <version>${mockito-core.version}</version> @@ -181,6 +193,12 @@ <version>${httpclient.version}</version> <scope>runtime</scope> </dependency> + <dependency> + <groupId>org.junit-pioneer</groupId> + <artifactId>junit-pioneer</artifactId> + <version>1.4.2</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -195,7 +213,7 @@ <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib</classpathPrefix> - <mainClass>org.onap.test.it.RegisterToAsdcTopicIT</mainClass> + <mainClass>org.onap.test.it.RegisterToSdcTopicIT</mainClass> </manifest> <manifestEntries> <Class-Path>lib/</Class-Path> diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java index ad18969..5a902c8 100644 --- a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java +++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java @@ -26,21 +26,23 @@ import java.util.List; public class DistributionClientConfig implements IConfiguration { - public static final String DEFAULT_ASDC_ADDRESS = "localhost:30206"; + public static final String DEFAULT_SDC_ADDRESS = "localhost:30206"; public static final String DEFAULT_COMSUMER_ID = "dcae-openapi-manager"; public static final String DEFAULT_CONSUMER_GROUP = "noapp"; public static final String DEFAULT_ENVIRONMENT_NAME = "AUTO"; public static final String DEFAULT_PASSWORD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"; public static final int DEFAULT_POLLING_INTERVAL = 20; public static final int DEFAULT_POLLING_TIMEOUT = 20; + public static final String DEFAULT_STATUS_TOPIC = "STATUS-TOPIC"; + public static final String DEFAULT_NOTIF_TOPIC = "NOTIF-TOPIC"; public static final String DEFAULT_USER = "dcae"; - public static final String DEFAULT_KEY_STORE_PATH = "etc/asdc-client.jks"; + public static final String DEFAULT_KEY_STORE_PATH = "etc/sdc-client.jks"; public static final String DEFAULT_KEY_STORE_PASSWORD = "Aa123456"; public static final boolean DEFAULT_ACTIVATE_SERVER_TLS_AUTH = false; public static final boolean DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES = true; public static final boolean DEFAULT_USE_HTTPS_WITH_SDC = false; - public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost"; - private String asdcAddress; + public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost:9092"; + private String sdcAddress; private String user; private String password; private int pollingInterval; @@ -53,38 +55,23 @@ public class DistributionClientConfig implements IConfiguration { private String keyStorePassword; private boolean activateServerTLSAuth; private boolean isFilterInEmptyResources; - private boolean useHttpsWithDmaap; private boolean useHttpsWithSDC; private List<String> msgBusAddress; + private String sdcStatusTopicName; + private String sdcNotificationTopicName; + private String kafkaSecurityProtocolConfig; + private String kafkaSaslMechanism; + private String kafkaSaslJaasConfig; private String httpProxyHost; private int httpProxyPort; private String httpsProxyHost; private int httpsProxyPort; private boolean useSystemProxy; - public DistributionClientConfig(IConfiguration other) { - this.asdcAddress = other.getAsdcAddress(); - this.comsumerID = other.getConsumerID(); - this.consumerGroup = other.getConsumerGroup(); - this.environmentName = other.getEnvironmentName(); - this.password = other.getPassword(); - this.pollingInterval = other.getPollingInterval(); - this.pollingTimeout = other.getPollingTimeout(); - this.relevantArtifactTypes = other.getRelevantArtifactTypes(); - this.user = other.getUser(); - this.keyStorePath = other.getKeyStorePath(); - this.keyStorePassword = other.getKeyStorePassword(); - this.activateServerTLSAuth = other.activateServerTLSAuth(); - this.isFilterInEmptyResources = other.isFilterInEmptyResources(); - this.httpProxyHost = other.getHttpProxyHost(); - this.httpProxyPort = other.getHttpProxyPort(); - this.httpsProxyHost = other.getHttpsProxyHost(); - this.httpsProxyPort = other.getHttpsProxyPort(); - this.useSystemProxy = other.isUseSystemProxy(); - } - public DistributionClientConfig() { - this.asdcAddress = DEFAULT_ASDC_ADDRESS; + this.sdcAddress = DEFAULT_SDC_ADDRESS; + this.sdcStatusTopicName = DEFAULT_STATUS_TOPIC; + this.sdcNotificationTopicName = DEFAULT_NOTIF_TOPIC; this.comsumerID = DEFAULT_COMSUMER_ID; this.consumerGroup = DEFAULT_CONSUMER_GROUP; this.environmentName = DEFAULT_ENVIRONMENT_NAME; @@ -99,22 +86,23 @@ public class DistributionClientConfig implements IConfiguration { this.activateServerTLSAuth = DEFAULT_ACTIVATE_SERVER_TLS_AUTH; this.isFilterInEmptyResources = DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES; this.useHttpsWithSDC = DEFAULT_USE_HTTPS_WITH_SDC; - msgBusAddress = new ArrayList<>(); - msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); - msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); - msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); + this.msgBusAddress = new ArrayList<>(); + this.msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); } @Override - public String getAsdcAddress() { - return asdcAddress; + public String getSdcAddress() { + return sdcAddress; } - @Override public List<String> getMsgBusAddress() { return msgBusAddress; } + public void setMsgBusAddress(List<String> msgBusAddress) { + this.msgBusAddress = msgBusAddress; + } + @Override public String getUser() { return user; @@ -173,8 +161,8 @@ public class DistributionClientConfig implements IConfiguration { this.comsumerID = comsumerID; } - public void setAsdcAddress(String asdcAddress) { - this.asdcAddress = asdcAddress; + public void setSdcAddress(String sdcAddress) { + this.sdcAddress = sdcAddress; } public void setUser(String user) { @@ -217,7 +205,7 @@ public class DistributionClientConfig implements IConfiguration { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((asdcAddress == null) ? 0 : asdcAddress.hashCode()); + result = prime * result + ((sdcAddress == null) ? 0 : sdcAddress.hashCode()); result = prime * result + ((comsumerID == null) ? 0 : comsumerID.hashCode()); result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode()); result = prime * result + ((environmentName == null) ? 0 : environmentName.hashCode()); @@ -251,11 +239,11 @@ public class DistributionClientConfig implements IConfiguration { return false; } DistributionClientConfig other = (DistributionClientConfig) obj; - if (asdcAddress == null) { - if (other.asdcAddress != null) { + if (sdcAddress == null) { + if (other.sdcAddress != null) { return false; } - } else if (!asdcAddress.equals(other.asdcAddress)) { + } else if (!sdcAddress.equals(other.sdcAddress)) { return false; } if (comsumerID == null) { @@ -322,7 +310,7 @@ public class DistributionClientConfig implements IConfiguration { @Override public String toString() { - return "TestConfiguration [asdcAddress=" + asdcAddress + ", user=" + user + ", password=" + password + return "TestConfiguration [sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password + ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout + ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup + ", environmentName=" + environmentName + ", comsumerID=" + comsumerID + "]"; @@ -337,11 +325,6 @@ public class DistributionClientConfig implements IConfiguration { this.isFilterInEmptyResources = isFilterInEmptyResources; } - @Override - public Boolean isUseHttpsWithDmaap() { - return this.useHttpsWithDmaap; - } - public Boolean isUseHttpsWithSDC() { return this.useHttpsWithSDC; } diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java index 4dfe388..4cee4cf 100644 --- a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java +++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java @@ -23,7 +23,7 @@ import org.onap.sdc.api.consumer.IDistributionStatusMessage; import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.api.notification.IResourceInstance; -import org.onap.sdc.http.HttpAsdcClient; +import org.onap.sdc.http.HttpSdcClient; import org.onap.sdc.http.SdcConnectorClient; import org.onap.sdc.impl.DistributionClientDownloadResultImpl; import org.onap.sdc.impl.DistributionClientImpl; @@ -46,8 +46,8 @@ public class ClientNotifyCallback implements INotificationCallback { private final DistributionClientImpl distributionClient; private final List<DistributionClientDownloadResultImpl> pulledArtifacts = new ArrayList<>(); DistributionClientConfig config = new DistributionClientConfig(); - HttpAsdcClient asdcClient = new HttpAsdcClient(config); - SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, asdcClient); + HttpSdcClient sdcClient = new HttpSdcClient(config); + SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, sdcClient); ArtifactsDownloader artifactsDownloader = new ArtifactsDownloader("/app/path", sdcConnectorClient); public List<DistributionClientDownloadResultImpl> getPulledArtifacts() { diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java index 58baec7..c89ecd4 100644 --- a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java +++ b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java @@ -19,18 +19,15 @@ */ package org.onap.test.it; +import java.util.ArrayList; +import java.util.List; import org.onap.sdc.impl.DistributionClientImpl; import org.onap.test.core.config.DistributionClientConfig; import org.onap.test.core.service.ArtifactsValidator; import org.onap.test.core.service.ClientInitializer; import org.onap.test.core.service.ClientNotifyCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -public class RegisterToAsdcTopicIT { +public class RegisterToSdcTopicIT { public static void main(String[] args) { @@ -40,12 +37,10 @@ public class RegisterToAsdcTopicIT { ClientNotifyCallback callback = new ClientNotifyCallback(validators, client); ClientInitializer clientInitializer = new ClientInitializer(clientConfig, callback, client); clientInitializer.initialize(); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - client.stop(); - System.out.println("Shutdown Hook is running !"); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + client.stop(); + System.out.println("Shutdown Hook is running !"); + })); } } diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java index ba118c0..1abef1f 100644 --- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java @@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.file.Paths; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; -import org.awaitility.Durations; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; - +import org.testcontainers.shaded.org.awaitility.Durations; +import org.testcontainers.utility.DockerImageName; @Testcontainers @ExtendWith(MockitoExtension.class) +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';") +@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT") +@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN") class ClientInitializerTest { - private static final int SUCCESSFUL_STOP_MSG_INDEX = 2; - private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3; - private static final int SUCCESSFUL_INIT_MSG_INDEX = 0; - private static final int SUCCESSFUL_DIST_MSG_INDEX = 3; private static final int EXPECTED_HEAT_ARTIFACTS = 4; + private static final DistributionClientConfig clientConfig = new DistributionClientConfig(); private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class); @Container - public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest") - .withNetworkMode("host"); + CustomKafkaContainer kafka = buildBrokerInstance(); @Container - public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest") - .withNetworkMode("host"); - @Mock - private Logger log; + public GenericContainer<?> mockSdc = + new GenericContainer<>( + "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop") + .withExposedPorts(30206); @Mock private Logger distClientLog; private ClientInitializer clientInitializer; private ClientNotifyCallback clientNotifyCallback; @BeforeEach - public void initializeClient() { - DistributionClientConfig clientConfig = new DistributionClientConfig(); + public void initializeClient() throws InterruptedException { + clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort()); List<ArtifactsValidator> validators = new ArrayList<>(); DistributionClientImpl client = new DistributionClientImpl(distClientLog); clientNotifyCallback = new ClientNotifyCallback(validators, client); clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client); - } - - @Test - void shouldRegisterToDmaapAfterClientInitialization() { - //given - final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when - clientInitializer.log = log; - clientInitializer.initialize(); - verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List<String> allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly"); - assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly"); - } - - @Test - void shouldUnregisterAndStopClient() { - //given - final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when clientInitializer.initialize(); - clientInitializer.stop(); - verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List<String> allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient"); - assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully"); + Thread.sleep(1000); + setUpTopicsAndSendData(); } @Test - void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException { - - //given - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:3904/events/testName/add")) - .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json"))) - .build(); - HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); - //when - clientInitializer.initialize(); + void shouldDownloadArtifactsWithArtifactTypeHeat() { waitForArtifacts(); List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts(); - //then Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size()); + clientInitializer.stop(); } private void waitForArtifacts() { @@ -132,4 +117,36 @@ class ClientInitializerTest { .atMost(Durations.ONE_MINUTE) .until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty()); } + + private CustomKafkaContainer buildBrokerInstance() { + final Map<String, String> env = new LinkedHashMap<>(); + env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT"); + env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN"); + env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"admin-secret\" " + + "user_admin=\"admin-secret\" " + + "user_producer=\"producer-secret\" " + + "user_consumer=\"consumer-secret\";"); + + return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withEmbeddedZookeeper() + .withStartupAttempts(1) + .withEnv(env) + .withFixedExposedPort(43219, 9093); + } + + @SneakyThrows + private void setUpTopicsAndSendData() { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer<String, String> producer = new KafkaProducer<>(props); + String content = Files.readString(Path.of("src/test/resources/artifacts.json")); + producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get(); + } } diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java new file mode 100644 index 0000000..e2eabc1 --- /dev/null +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.test.core.service; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class CustomKafkaContainer extends FixedHostPortGenericContainer<CustomKafkaContainer> { + protected String externalZookeeperConnect; + + public CustomKafkaContainer(DockerImageName dockerImageName) { + super(String.valueOf(dockerImageName)); + this.externalZookeeperConnect = null; + this.withExposedPorts(9093); + this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"); + this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + this.withEnv("KAFKA_BROKER_ID", "1"); + this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); + this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); + this.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"); + this.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1"); + this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807"); + this.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public CustomKafkaContainer withEmbeddedZookeeper() { + this.externalZookeeperConnect = null; + return this.self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093)); + } + + protected void configure() { + this.withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("BROKER://%s:9092", this.getNetwork() != null ? this.getNetworkAliases().get(0) : "localhost")); + String command = ""; + if (this.externalZookeeperConnect != null) { + this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect); + } else { + this.addExposedPort(2181); + this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); + command = command + "echo 'clientPort=2181' > zookeeper.properties\n"; + command = command + "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command = command + "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command = command + "zookeeper-server-start zookeeper.properties &\n"; + } + + command = command + "echo '' > /etc/confluent/docker/ensure \n"; + command = command + "/etc/confluent/docker/run \n"; + this.withCommand("sh", "-c", command); + } + + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + try { + String brokerAdvertisedListener = this.brokerAdvertisedListener(containerInfo); + ExecResult result = this.execInContainer("kafka-configs", "--alter", "--bootstrap-server", + brokerAdvertisedListener, "--entity-type", "brokers", "--entity-name", + this.getEnvMap().get("KAFKA_BROKER_ID"), "--add-config", + "advertised.listeners=[" + String.join(",", this.getBootstrapServers(), brokerAdvertisedListener) + "]"); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } catch (Throwable var4) { + throw var4; + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } +} diff --git a/sdc-distribution-ci/src/test/resources/logback-test.xml b/sdc-distribution-ci/src/test/resources/logback-test.xml new file mode 100644 index 0000000..78c5b05 --- /dev/null +++ b/sdc-distribution-ci/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + + <logger name="org.testcontainers" level="INFO"/> + <logger name="com.github.dockerjava" level="WARN"/> +</configuration>
\ No newline at end of file |