/*
* ============LICENSE_START=======================================================
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.services.prh.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.jayway.jsonpath.JsonPath;
import reactor.core.publisher.Flux;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.MainApp;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.client.WireMock.patch;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import static java.lang.ClassLoader.getSystemResource;
import static org.mockito.Mockito.when;
/**
 * Integration test of the PRH workflow running under the {@code autoCommitDisabled} Spring profile.
 *
 * <p>External HTTP endpoints (AAI and the DMaaP events endpoint) are stubbed with WireMock, and the
 * Kafka consumer task is replaced by a mock so that each test decides which events the workflow sees.
 *
 * @author Pranit Kapdule on 24/08/23
 */
@SpringBootTest
@AutoConfigureWireMock(port = 0)
@ActiveProfiles(value = "autoCommitDisabled")
class PrhWorkflowIntegrationForAutoCommitDisabledTest {

    @Autowired
    private ScheduledTasksWithCommit scheduledTasksWithCommit;

    // Mocked just to disable scheduling - some configurability in ScheduledTaskRunner
    // not to start tasks at app startup would be welcome.
    @MockBean
    private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;

    // Mocked so that each test controls the stream of events handed to the workflow.
    @MockBean
    private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;

    @Autowired
    private DmaapConsumerJsonParser dmaapConsumerJsonParser;

    /**
     * Test configuration that imports the application and supplies a CBS configuration bean
     * whose DMaaP-MR and AAI URLs point at the WireMock server.
     */
    @Configuration
    @Import(MainApp.class)
    static class CbsConfigTestConfig {

        @Value("http://localhost:${wiremock.server.port}")
        private String wiremockServerAddress;

        protected KafkaConfiguration kafkaConfiguration;

        /**
         * Builds the CBS configuration from {@code autoCommitDisabledConfigurationFromCbs2.json},
         * replacing every DMaaP-MR and AAI base URL with the WireMock address.
         *
         * @return the parsed configuration; when parsing fails with the message
         *         {@code "kafkaJaasConfig"}, a fixed dummy Kafka configuration is set on it instead
         */
        @Bean
        public CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode() {
            String cbsConfigContent = getResourceContent("autoCommitDisabledConfigurationFromCbs2.json")
                    .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
                    .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress);
            JsonObject cbsConfigJson = new Gson().fromJson(cbsConfigContent, JsonObject.class);

            CbsConfigurationForAutoCommitDisabledMode cbsConfiguration =
                    new CbsConfigurationForAutoCommitDisabledMode();
            try {
                cbsConfiguration.parseCBSConfig(cbsConfigJson);
            } catch (Exception e) {
                // Exception is expected as environment variable for JAAS_CONFIG is not available.
                // Compare by value (null-safe): reference comparison with == only matches when both
                // strings happen to be the same interned instance.
                if ("kafkaJaasConfig".equals(e.getMessage())) {
                    kafkaConfiguration = new ImmutableKafkaConfiguration.Builder()
                            .kafkaBoostrapServerConfig("0.0.0.0")
                            .groupIdConfig("CG1")
                            .kafkaSaslMechanism("SASL_MECHANISM")
                            .kafkaSecurityProtocol("SEC-PROTOCOL")
                            .kafkaJaasConfig("JAAS_CONFIG")
                            .build();
                    cbsConfiguration.setKafkaConfiguration(kafkaConfiguration);
                }
                // NOTE(review): any other failure is still ignored here, as in the original code;
                // consider rethrowing so that a broken fixture fails the context start-up.
            }
            return cbsConfiguration;
        }
    }

    /** Clears WireMock stubs and the recorded request journal before every test. */
    @BeforeEach
    void resetWireMock() {
        WireMock.reset();
    }

    /** Runs the "no events" scenario with {@code JAAS_CONFIG} set in the environment. */
    @Test
    void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
            this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
        });
    }

    /**
     * With an empty event stream the workflow must not call AAI and must not publish any event.
     *
     * @throws JSONException declared by the mocked {@code execute()} call
     */
    void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() throws JSONException {
        when(kafkaConsumerTaskImpl.execute()).thenReturn(Flux.empty());

        scheduledTasksWithCommit.scheduleKafkaPrhEventTask();

        verify(0, anyRequestedFor(urlPathMatching("/aai.*")));
        verify(0, postRequestedFor(urlPathMatching("/events.*")));
    }

    /** Runs the "single event" scenario with {@code JAAS_CONFIG} set in the environment. */
    @Test
    void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
            this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
        });
    }

    /**
     * Feeds the event from {@code integration/event.json} into the workflow and checks that exactly
     * one PNF_READY notification is posted whose first element carries the event's source name as
     * {@code correlationId}.
     */
    void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification()
            throws JSONException, JsonMappingException, JsonProcessingException {
        String event = getResourceContent("integration/event.json");
        String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");

        // Parameterized types instead of raw List/Flux so the compiler checks the element types.
        java.util.List<String> eventList = new ArrayList<>();
        eventList.add(event);
        Flux<ConsumerDmaapModel> consumedEvents =
                dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(eventList);

        String pnfUrl = "/aai/v23/network/pnfs/pnf/" + pnfName;
        stubFor(get(urlEqualTo(pnfUrl)).willReturn(ok().withBody("{}")));
        stubFor(patch(urlEqualTo(pnfUrl)));
        stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
        when(kafkaConsumerTaskImpl.execute()).thenReturn(consumedEvents);

        scheduledTasksWithCommit.scheduleKafkaPrhEventTask();

        verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
                .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
    }

    /**
     * Reads a resource from the system class path into a string.
     *
     * @param resourceName resource path as understood by {@link ClassLoader#getSystemResource(String)}
     * @return the resource content decoded as UTF-8
     * @throws RuntimeException wrapping the original cause when the resource is missing or unreadable
     */
    private static String getResourceContent(String resourceName) {
        try {
            byte[] content = Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI()));
            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
            return new String(content, java.nio.charset.StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
        }
    }
}