diff options
Diffstat (limited to 'ms/blueprintsprocessor')
71 files changed, 2875 insertions, 490 deletions
diff --git a/ms/blueprintsprocessor/application/pom.xml b/ms/blueprintsprocessor/application/pom.xml index ed1b67dfd..0ee6ac339 100755 --- a/ms/blueprintsprocessor/application/pom.xml +++ b/ms/blueprintsprocessor/application/pom.xml @@ -24,7 +24,7 @@ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>parent</artifactId> <version>0.7.0-SNAPSHOT</version> - <relativePath>../parent</relativePath> + <relativePath>..</relativePath> </parent> <artifactId>application</artifactId> @@ -38,7 +38,6 @@ <name.space>org.onap.ccsdk.cds</name.space> <serviceArtifactName>blueprintsprocessor</serviceArtifactName> <image.name>onap/ccsdk-blueprintsprocessor</image.name> - <docker.buildArg.https_proxy>${https_proxy}</docker.buildArg.https_proxy> <docker.push.phase>deploy</docker.push.phase> <docker.verbose>true</docker.verbose> <ccsdk.project.version>${project.version}</ccsdk.project.version> @@ -141,23 +140,42 @@ <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> + <!-- BEGIN UAT --> + <dependency> + <groupId>org.skyscreamer</groupId> + <artifactId>jsonassert</artifactId> + </dependency> <dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>com.nhaarman.mockitokotlin2</groupId> <artifactId>mockito-kotlin</artifactId> - <version>2.1.0</version> - <scope>test</scope> + <version>2.2.0</version> </dependency> <dependency> <groupId>com.schibsted.spt.data</groupId> <artifactId>jslt</artifactId> <version>0.1.8</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${apache.httpcomponents.client.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>${apache.httpcomponents.client.version}</version> + </dependency> + <dependency> + <groupId>com.github.tomakehurst</groupId> + <artifactId>wiremock-jre8</artifactId> + <version>2.25.0</version> <scope>test</scope> </dependency> + <!-- END UAT --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml index e4bb00773..27f72b59b 100755 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml @@ -58,7 +58,11 @@ services: STICKYSELECTORKEY: ENVCONTEXT: dev APP_PORT: 50052 - BASIC_AUTH: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw== + #AUTH_TYPE: basic-auth + #AUTH_TOKEN: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw== + AUTH_TYPE: tls-auth + AUTH_CERT_CHAIN: /opt/app/onap/python/py-executor-chain.pem + AUTH_PRIVATE_KEY: /opt/app/onap/python/py-executor-key.pem LOG_FILE: /opt/app/onap/logs/application.log volumes: diff --git a/ms/blueprintsprocessor/application/src/main/docker/Dockerfile b/ms/blueprintsprocessor/application/src/main/docker/Dockerfile index dab0a4c01..207cec5cb 100755 --- a/ms/blueprintsprocessor/application/src/main/docker/Dockerfile +++ b/ms/blueprintsprocessor/application/src/main/docker/Dockerfile @@ -1,8 +1,5 @@ FROM omahoco1/alpine-java-python -ENV HTTP_PROXY ${HTTP_PROXY} -ENV HTTPS_PROXY ${HTTPS_PROXY} - # add entrypoint COPY run.source /etc/run.source COPY startService.sh /startService.sh @@ -15,4 +12,4 @@ RUN tar -xzf /source.tar.gz -C /tmp \ && rm -rf /source.tar.gz \ && rm -rf /tmp/@project.build.finalName@ -ENTRYPOINT /startService.sh
\ No newline at end of file +ENTRYPOINT /startService.sh diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintGRPCServer.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintGRPCServer.kt index 160a1b1b4..2d39eaa1f 100644 --- a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintGRPCServer.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintGRPCServer.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor import io.grpc.ServerBuilder import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.BluePrintManagementGRPCHandler +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor import org.onap.ccsdk.cds.blueprintsprocessor.security.BasicAuthServerInterceptor import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.BluePrintProcessingGRPCHandler import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -44,6 +45,7 @@ open class BlueprintGRPCServer(private val bluePrintProcessingGRPCHandler: BlueP log.info("Starting Blueprint Processor GRPC Starting..") val server = ServerBuilder .forPort(grpcPort!!) + .intercept(GrpcServerLoggingInterceptor()) .intercept(authInterceptor) .addService(bluePrintProcessingGRPCHandler) .addService(bluePrintManagementGRPCHandler) diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt index 5ed5ff450..68fbf256c 100644 --- a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt @@ -16,7 +16,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.LoggingService +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.RestLoggerService import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.springframework.stereotype.Component import org.springframework.web.server.ServerWebExchange @@ -29,7 +29,7 @@ import reactor.util.context.Context open class LoggingWebFilter : WebFilter { override fun filter(serverWebExchange: ServerWebExchange, webFilterChain: WebFilterChain): Mono<Void> { - val loggingService = LoggingService() + val loggingService = RestLoggerService() loggingService.entering(serverWebExchange.request) val filterChain = webFilterChain.filter(serverWebExchange).subscriberContext( Context.of(MDCContext, MDCContext())) diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/JsonNormalizer.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/JsonNormalizer.kt index 69673f931..1a625c279 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/JsonNormalizer.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/JsonNormalizer.kt @@ -17,21 +17,20 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ContainerNode -import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.ObjectNode import com.schibsted.spt.data.jslt.Parser -class JsonNormalizer { +internal class JsonNormalizer { companion object { - fun getNormalizer(mapper: ObjectMapper, jsltSpec: JsonNode): (String) -> String { - if (jsltSpec is MissingNode) { + fun getNormalizer(mapper: ObjectMapper, jsltSpec: JsonNode?): (String) -> String { + if (jsltSpec == null) { return { it } } return { s: String -> diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/MoreMatchers.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/MoreMatchers.kt index 71e07ab4c..163544fc9 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/MoreMatchers.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/MoreMatchers.kt @@ -17,7 +17,7 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat import com.google.common.collect.Maps import org.mockito.ArgumentMatcher diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/PathDeserializer.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/PathDeserializer.kt index 1a232f2d3..6b1b0c676 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/PathDeserializer.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/PathDeserializer.kt @@ -17,13 +17,13 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.DeserializationContext import com.fasterxml.jackson.databind.deser.std.StdDeserializer -class PathDeserializer : StdDeserializer<String>(String::class.java) { +internal class PathDeserializer : StdDeserializer<String>(String::class.java) { override fun deserialize(jp: JsonParser, ctxt: DeserializationContext?): String { val path = jp.codec.readValue(jp, Any::class.java) return flatJoin(path) diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/UatDefinition.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatDefinition.kt index abb1dfcd1..3046f1041 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/UatDefinition.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatDefinition.kt @@ -17,45 +17,76 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat import com.fasterxml.jackson.annotation.JsonAlias +import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import com.fasterxml.jackson.databind.node.MissingNode +import com.fasterxml.jackson.module.kotlin.convertValue +import org.yaml.snakeyaml.DumperOptions import org.yaml.snakeyaml.Yaml -import java.nio.file.Path +import org.yaml.snakeyaml.nodes.Tag -data class ProcessDefinition(val name: String, val request: JsonNode, val expectedResponse: JsonNode, - val responseNormalizerSpec: JsonNode = MissingNode.getInstance()) +@JsonInclude(JsonInclude.Include.NON_EMPTY) +data class ProcessDefinition(val name: String, val request: JsonNode, val expectedResponse: JsonNode? = null, + val responseNormalizerSpec: JsonNode? = null) +@JsonInclude(JsonInclude.Include.NON_EMPTY) data class RequestDefinition(val method: String, @JsonDeserialize(using = PathDeserializer::class) val path: String, val headers: Map<String, String> = emptyMap(), - val body: JsonNode = MissingNode.getInstance()) + val body: JsonNode? = null) -data class ResponseDefinition(val status: Int = 200, val body: JsonNode = MissingNode.getInstance()) { +@JsonInclude(JsonInclude.Include.NON_EMPTY) +data class ResponseDefinition(val status: Int = 200, val body: JsonNode? = null) { companion object { val DEFAULT_RESPONSE = ResponseDefinition() } } +@JsonInclude(JsonInclude.Include.NON_EMPTY) data class ExpectationDefinition(val request: RequestDefinition, val response: ResponseDefinition = ResponseDefinition.DEFAULT_RESPONSE) +@JsonInclude(JsonInclude.Include.NON_EMPTY) data class ServiceDefinition(val selector: String, val expectations: List<ExpectationDefinition>) +@JsonInclude(JsonInclude.Include.NON_EMPTY) data class UatDefinition(val processes: List<ProcessDefinition>, @JsonAlias("external-services") val externalServices: List<ServiceDefinition> = emptyList()) { - companion object { - fun load(mapper: ObjectMapper, path: Path): UatDefinition { - return path.toFile().reader().use { reader -> - mapper.convertValue(Yaml().load(reader), UatDefinition::class.java) + fun dump(mapper: ObjectMapper, excludedProperties: List<String> = emptyList()): String { + val uatAsMap: Map<String, Any> = mapper.convertValue(this) + if (excludedProperties.isNotEmpty()) { + pruneTree(uatAsMap, excludedProperties) + } + return Yaml().dumpAs(uatAsMap, Tag.MAP, DumperOptions.FlowStyle.BLOCK) + } + + fun toBare(): UatDefinition { + val newProcesses = processes.map { p -> + ProcessDefinition(p.name, p.request, null, p.responseNormalizerSpec) + } + return UatDefinition(newProcesses) + } + + private fun pruneTree(node: Any?, excludedProperties: List<String>) { + when (node) { + is MutableMap<*, *> -> { + excludedProperties.forEach { key -> node.remove(key) } + node.forEach { (_, value) -> pruneTree(value, excludedProperties) } } + is List<*> -> node.forEach { value -> pruneTree(value, excludedProperties) } } } + + companion object { + fun load(mapper: ObjectMapper, spec: String): UatDefinition = + mapper.convertValue(Yaml().load(spec), UatDefinition::class.java) + + } } diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatExecutor.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatExecutor.kt new file mode 100644 index 000000000..6678075bd --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatExecutor.kt @@ -0,0 +1,324 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.argThat +import com.nhaarman.mockitokotlin2.atLeast +import com.nhaarman.mockitokotlin2.atLeastOnce +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.verifyNoMoreInteractions +import com.nhaarman.mockitokotlin2.whenever +import org.apache.http.HttpHeaders +import org.apache.http.HttpStatus +import org.apache.http.client.HttpClient +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.entity.mime.HttpMultipartMode +import org.apache.http.entity.mime.MultipartEntityBuilder +import org.apache.http.impl.client.HttpClientBuilder +import org.apache.http.message.BasicHeader +import org.hamcrest.CoreMatchers.equalTo +import org.hamcrest.CoreMatchers.notNullValue +import org.hamcrest.MatcherAssert.assertThat +import org.mockito.Answers +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService.WebClientResponse +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.COLOR_MOCKITO +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.markerOf +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.MockInvocationLogger +import org.skyscreamer.jsonassert.JSONAssert +import org.skyscreamer.jsonassert.JSONCompareMode +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.core.env.ConfigurableEnvironment +import org.springframework.http.MediaType +import org.springframework.stereotype.Component +import org.springframework.util.Base64Utils +import java.util.concurrent.ConcurrentHashMap + +/** + * Assumptions: + * + * - Application HTTP service is bound to loopback interface; + * - Password is either defined in plain (with "{noop}" prefix), or it's the same of username. + * + * @author Eliezio Oliveira + */ +@Component +class UatExecutor( + private val environment: ConfigurableEnvironment, + private val restClientFactory: BluePrintRestLibPropertyService, + private val mapper: ObjectMapper +) { + + companion object { + private const val NOOP_PASSWORD_PREFIX = "{noop}" + + private val log: Logger = LoggerFactory.getLogger(UatExecutor::class.java) + private val mockLoggingListener = MockInvocationLogger(markerOf(COLOR_MOCKITO)) + } + + // use lazy evaluation to postpone until localServerPort is injected by Spring + private val baseUrl: String by lazy { + "http://127.0.0.1:${localServerPort()}" + } + + @Throws(AssertionError::class) + fun execute(uatSpec: String, cbaBytes: ByteArray) { + val uat = UatDefinition.load(mapper, uatSpec) + execute(uat, cbaBytes) + } + + /** + * + * The UAT can range from minimum to completely defined. + * + * @return an updated UAT with all NB and SB messages. + */ + @Throws(AssertionError::class) + fun execute(uat: UatDefinition, cbaBytes: ByteArray): UatDefinition { + val defaultHeaders = listOf(BasicHeader(HttpHeaders.AUTHORIZATION, clientAuthToken())) + val httpClient = HttpClientBuilder.create() + .setDefaultHeaders(defaultHeaders) + .build() + // Only if externalServices are defined + val mockInterceptor = MockPreInterceptor() + // Always defined and used, whatever the case + val spyInterceptor = SpyPostInterceptor(mapper) + restClientFactory.setInterceptors(mockInterceptor, spyInterceptor) + try { + // Configure mocked external services and save their expected requests for further validation + val requestsPerClient = uat.externalServices.associateBy( + { service -> + createRestClientMock(service.expectations).also { restClient -> + // side-effect: register restClient to override real instance + mockInterceptor.registerMock(service.selector, restClient) + } + }, + { service -> service.expectations.map { it.request } } + ) + + val newProcesses = httpClient.use { client -> + uploadBlueprint(client, cbaBytes) + + // Run processes + uat.processes.map { process -> + log.info("Executing process '${process.name}'") + val responseNormalizer = JsonNormalizer.getNormalizer(mapper, process.responseNormalizerSpec) + val actualResponse = processBlueprint(client, process.request, + process.expectedResponse, responseNormalizer) + ProcessDefinition(process.name, process.request, actualResponse, process.responseNormalizerSpec) + } + } + + // Validate requests to external services + for ((mockClient, requests) in requestsPerClient) { + requests.forEach { request -> + verify(mockClient, atLeastOnce()).exchangeResource( + eq(request.method), + eq(request.path), + argThat { assertJsonEquals(request.body, this) }, + argThat(RequiredMapEntriesMatcher(request.headers))) + } + // Don't mind the invocations to the overloaded exchangeResource(String, String, String) + verify(mockClient, atLeast(0)).exchangeResource(any(), any(), any()) + verifyNoMoreInteractions(mockClient) + } + + val newExternalServices = spyInterceptor.getSpies() + .map(SpyService::asServiceDefinition) + + return UatDefinition(newProcesses, newExternalServices) + } finally { + restClientFactory.clearInterceptors() + } + } + + private fun createRestClientMock(restExpectations: List<ExpectationDefinition>) + : BlueprintWebClientService { + val restClient = mock<BlueprintWebClientService>( + defaultAnswer = Answers.RETURNS_SMART_NULLS, + // our custom verboseLogging handler + invocationListeners = arrayOf(mockLoggingListener) + ) + + // Delegates to overloaded exchangeResource(String, String, String, Map<String, String>) + whenever(restClient.exchangeResource(any(), any(), any())) + .thenAnswer { invocation -> + val method = invocation.arguments[0] as String + val path = invocation.arguments[1] as String + val request = invocation.arguments[2] as String + restClient.exchangeResource(method, path, request, emptyMap()) + } + for (expectation in restExpectations) { + whenever(restClient.exchangeResource( + eq(expectation.request.method), + eq(expectation.request.path), + any(), + any())) + .thenReturn(WebClientResponse(expectation.response.status, expectation.response.body.toString())) + } + return restClient + } + + @Throws(AssertionError::class) + private fun uploadBlueprint(client: HttpClient, cbaBytes: ByteArray) { + val multipartEntity = MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("file", cbaBytes, ContentType.DEFAULT_BINARY, "cba.zip") + .build() + val request = HttpPost("$baseUrl/api/v1/blueprint-model/publish").apply { + entity = multipartEntity + } + client.execute(request) { response -> + val statusLine = response.statusLine + assertThat(statusLine.statusCode, equalTo(HttpStatus.SC_OK)) + } + } + + @Throws(AssertionError::class) + private fun processBlueprint(client: HttpClient, requestBody: JsonNode, + expectedResponse: JsonNode?, responseNormalizer: (String) -> String): JsonNode { + val stringEntity = StringEntity(mapper.writeValueAsString(requestBody), ContentType.APPLICATION_JSON) + val request = HttpPost("$baseUrl/api/v1/execution-service/process").apply { + entity = stringEntity + } + val response = client.execute(request) { response -> + val statusLine = response.statusLine + assertThat(statusLine.statusCode, equalTo(HttpStatus.SC_OK)) + val entity = response.entity + assertThat("Response contains no content", entity, notNullValue()) + entity.content.bufferedReader().use { it.readText() } + } + val actualResponse = responseNormalizer(response) + if (expectedResponse != null) { + assertJsonEquals(expectedResponse, actualResponse) + } + return mapper.readTree(actualResponse)!! + } + + @Throws(AssertionError::class) + private fun assertJsonEquals(expected: JsonNode?, actual: String): Boolean { + // special case + if ((expected == null) && actual.isBlank()) { + return true + } + // general case + JSONAssert.assertEquals(expected?.toString(), actual, JSONCompareMode.LENIENT) + // assertEquals throws an exception whenever match fails + return true + } + + private fun localServerPort(): Int = + (environment.getProperty("local.server.port") + ?: environment.getRequiredProperty("blueprint.httpPort")).toInt() + + private fun clientAuthToken(): String { + val username = environment.getRequiredProperty("security.user.name") + val password = environment.getRequiredProperty("security.user.password") + val plainPassword = when { + password.startsWith(NOOP_PASSWORD_PREFIX) -> password.substring(NOOP_PASSWORD_PREFIX.length) + else -> username + } + return "Basic " + Base64Utils.encodeToString("$username:$plainPassword".toByteArray()) + } + + private class MockPreInterceptor : BluePrintRestLibPropertyService.PreInterceptor { + private val mocks = ConcurrentHashMap<String, BlueprintWebClientService>() + + override fun getInstance(jsonNode: JsonNode): BlueprintWebClientService? { + TODO("jsonNode-keyed services not yet supported") + } + + override fun getInstance(selector: String): BlueprintWebClientService? = + mocks[selector] + + fun registerMock(selector: String, client: BlueprintWebClientService) { + mocks[selector] = client + } + } + + private class SpyPostInterceptor(private val mapper: ObjectMapper) : BluePrintRestLibPropertyService.PostInterceptor { + + private val spies = ConcurrentHashMap<String, SpyService>() + + override fun getInstance(jsonNode: JsonNode, service: BlueprintWebClientService): BlueprintWebClientService { + TODO("jsonNode-keyed services not yet supported") + } + + override fun getInstance(selector: String, service: BlueprintWebClientService): BlueprintWebClientService { + val spiedService = SpyService(mapper, selector, service) + spies[selector] = spiedService + return spiedService + } + + fun getSpies(): List<SpyService> = + spies.values.toList() + } + + private class SpyService(private val mapper: ObjectMapper, + val selector: String, + private val realService: BlueprintWebClientService) : + BlueprintWebClientService by realService { + + private val expectations: MutableList<ExpectationDefinition> = mutableListOf() + + override fun exchangeResource(methodType: String, path: String, request: String): WebClientResponse<String> = + exchangeResource(methodType, path, request, DEFAULT_HEADERS) + + override fun exchangeResource(methodType: String, path: String, request: String, + headers: Map<String, String>): WebClientResponse<String> { + val requestDefinition = RequestDefinition(methodType, path, headers, toJson(request)) + val realAnswer = realService.exchangeResource(methodType, path, request, headers) + val responseBody = when { + // TODO: confirm if we need to normalize the response here + realAnswer.status == HttpStatus.SC_OK -> toJson(realAnswer.body) + else -> null + } + val responseDefinition = ResponseDefinition(realAnswer.status, responseBody) + expectations.add(ExpectationDefinition(requestDefinition, responseDefinition)) + return realAnswer + } + + fun asServiceDefinition() = + ServiceDefinition(selector, expectations) + + private fun toJson(str: String): JsonNode? { + return when { + str.isNotBlank() -> mapper.readTree(str) + else -> null + } + } + + companion object { + private val DEFAULT_HEADERS = mapOf( + HttpHeaders.CONTENT_TYPE to MediaType.APPLICATION_JSON_VALUE, + HttpHeaders.ACCEPT to MediaType.APPLICATION_JSON_VALUE + ) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatServices.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatServices.kt new file mode 100644 index 000000000..f133fd7c7 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatServices.kt @@ -0,0 +1,121 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat + +import com.fasterxml.jackson.databind.ObjectMapper +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.COLOR_SERVICES +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.resetContextColor +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.setContextColor +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.UAT_SPECIFICATION_FILE +import org.springframework.context.annotation.Profile +import org.springframework.http.HttpStatus +import org.springframework.http.MediaType +import org.springframework.http.codec.multipart.FilePart +import org.springframework.security.access.prepost.PreAuthorize +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestPart +import org.springframework.web.bind.annotation.RestController +import org.springframework.web.server.ResponseStatusException +import java.io.File +import java.util.zip.ZipFile + +/** + * Supporting services to help creating UAT specifications. + * + * @author Eliezio Oliveira + */ +@RestController +@RequestMapping("/api/v1/uat") +@Profile("uat") +open class UatServices(private val uatExecutor: UatExecutor, private val mapper: ObjectMapper) { + + @PostMapping("/verify", consumes = [MediaType.MULTIPART_FORM_DATA_VALUE]) + @PreAuthorize("hasRole('USER')") + @Suppress("BlockingMethodInNonBlockingContext") + open fun verify(@RequestPart("cba") cbaFile: FilePart) = runBlocking { + setContextColor(COLOR_SERVICES) + val tempFile = createTempFile() + try { + cbaFile.transferTo(tempFile) + val uatSpec = readZipEntryAsText(tempFile, UAT_SPECIFICATION_FILE) + val cbaBytes = tempFile.readBytes() + uatExecutor.execute(uatSpec, cbaBytes) + } catch (e: AssertionError) { + throw ResponseStatusException(HttpStatus.BAD_REQUEST, e.message) + } catch (t: Throwable) { + throw ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, t.message, t) + } finally { + tempFile.delete() + resetContextColor() + } + } + + @PostMapping("/spy", consumes = [MediaType.MULTIPART_FORM_DATA_VALUE], produces = ["text/vnd.yaml"]) + @PreAuthorize("hasRole('USER')") + @Suppress("BlockingMethodInNonBlockingContext") + open fun spy(@RequestPart("cba") cbaFile: FilePart, + @RequestPart("uat", required = false) uatFile: FilePart?): String = runBlocking { + val tempFile = createTempFile() + setContextColor(COLOR_SERVICES) + try { + cbaFile.transferTo(tempFile) + val uatSpec = when { + uatFile != null -> uatFile.readText() + else -> readZipEntryAsText(tempFile, UAT_SPECIFICATION_FILE) + } + val uat = UatDefinition.load(mapper, uatSpec) + val cbaBytes = tempFile.readBytes() + val updatedUat = uatExecutor.execute(uat, cbaBytes) + return@runBlocking updatedUat.dump(mapper, FIELDS_TO_EXCLUDE) + } catch (t: Throwable) { + throw ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, t.message, t) + } finally { + tempFile.delete() + resetContextColor() + } + } + + private fun FilePart.readText(): String { + val tempFile = createTempFile() + try { + transferTo(tempFile).block() + return tempFile.readText() + } finally { + tempFile.delete() + } + } + + @Suppress("SameParameterValue") + private fun readZipEntryAsText(file: File, entryName: String): String { + return ZipFile(file).use { zipFile -> zipFile.readEntryAsText(entryName) } + } + + private fun ZipFile.readEntryAsText(entryName: String): String { + val zipEntry = getEntry(entryName) + return getInputStream(zipEntry).readBytes().toString(Charsets.UTF_8) + } + + companion object { + // Fields that can be safely ignored from BPP response, and can be omitted on the UAT specification. + private val FIELDS_TO_EXCLUDE = listOf("timestamp") + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/CollectionUtils2.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/ColorMarker.kt index 63d64cae4..10139c839 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/CollectionUtils2.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/ColorMarker.kt @@ -17,15 +17,8 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat.logging -import org.springframework.util.CollectionUtils -import org.springframework.util.MultiValueMap +import org.slf4j.Marker - -/** - * Convenient method to create a single-entry MultiValueMap. - */ -fun <K, V> toMultiValueMap(key: K, vararg values: V): MultiValueMap<K, V> { - return CollectionUtils.toMultiValueMap(mapOf(key to values.asList())) -} +class ColorMarker internal constructor(private val dlg: Marker) : Marker by dlg
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/LogColor.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/LogColor.kt new file mode 100644 index 000000000..dce516933 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/LogColor.kt @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat.logging + +import org.slf4j.MDC +import org.slf4j.MarkerFactory + +object LogColor { + + const val COLOR_SERVICES = "green" + const val COLOR_TEST_CLIENT = "yellow" + const val COLOR_MOCKITO = "cyan" + const val COLOR_WIREMOCK = "blue" + + // The Slf4j MDC key that will hold the global color + const val MDC_COLOR_KEY = "color" + + fun setContextColor(color: String) { + MDC.put(MDC_COLOR_KEY, color) + } + + fun resetContextColor() { + MDC.remove(MDC_COLOR_KEY) + } + + fun markerOf(color: String): ColorMarker = + ColorMarker(MarkerFactory.getMarker(color)) +} diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/MockInvocationLogger.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/MockInvocationLogger.kt new file mode 100644 index 000000000..f8e6bd486 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/MockInvocationLogger.kt @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat.logging + +import org.mockito.listeners.InvocationListener +import org.mockito.listeners.MethodInvocationReport +import org.slf4j.LoggerFactory +import org.slf4j.Marker +import java.util.concurrent.atomic.AtomicInteger + +/** + * Logs all Mockito's mock/spy invocations. + * + * Used for debugging interactions with a mock. + */ +class MockInvocationLogger(private val marker: Marker) : InvocationListener { + + private val mockInvocationsCounter = AtomicInteger() + + override fun reportInvocation(report: MethodInvocationReport) { + val sb = StringBuilder() + sb.appendln("Method invocation #${mockInvocationsCounter.incrementAndGet()} on mock/spy") + report.locationOfStubbing?.let { location -> + sb.append(INDENT).append("stubbed ").appendln(location) + } + sb.appendln(report.invocation) + sb.append(INDENT).append("invoked ").appendln(report.invocation.location) + if (report.threwException()) { + sb.append(INDENT).append("has thrown -> ").append(report.throwable.javaClass.name) + report.throwable.message?.let { message -> + sb.append(" with message ").append(message) + } + sb.appendln() + } else { + sb.append(INDENT).append("has returned -> \"").append(report.returnedValue).append('"') + report.returnedValue?.let { value -> + sb.append(" (").append(value.javaClass.name).append(')') + } + sb.appendln() + } + log.info(marker, sb.toString()) + } + + companion object { + private const val INDENT = " " + private val log = LoggerFactory.getLogger(MockInvocationLogger::class.java) + } +} diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/SmartColorDiscriminator.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/SmartColorDiscriminator.kt new file mode 100644 index 000000000..d7b38d3fa --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/logging/SmartColorDiscriminator.kt @@ -0,0 +1,41 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat.logging + +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.sift.AbstractDiscriminator +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.MDC_COLOR_KEY + +class SmartColorDiscriminator : AbstractDiscriminator<ILoggingEvent>() { + var defaultValue: String = "white" + + override fun getKey(): String { + return MDC_COLOR_KEY + } + + fun setKey() { + throw UnsupportedOperationException("Key not settable. Using $MDC_COLOR_KEY") + } + + override fun getDiscriminatingValue(e: ILoggingEvent): String = + (e.marker as? ColorMarker)?.name + ?: e.mdcPropertyMap?.get(MDC_COLOR_KEY) + ?: defaultValue +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-local.yml b/ms/blueprintsprocessor/application/src/main/resources/application-local.yml new file mode 100644 index 000000000..de2cf4e52 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/resources/application-local.yml @@ -0,0 +1,65 @@ +appName: ControllerBluePrints +appVersion: 1.0.0 +blueprints: + processor: + functions: + python: + executor: + executionPath: ./components/scripts/python/ccsdk_blueprints + modulePaths: ./components/scripts/python/ccsdk_blueprints,./components/scripts/python/ccsdk_netconf,./components/scripts/python/ccsdk_restconf +blueprintsprocessor: + blueprintArchivePath: /tmp/cds/archive + blueprintDeployPath: /tmp/cds/deploy + blueprintWorkingPath: /tmp/cds/work + db: + primary: + driverClassName: org.mariadb.jdbc.Driver + hibernateDDLAuto: none + hibernateDialect: org.hibernate.dialect.MySQL5InnoDBDialect + hibernateHbm2ddlAuto: update + hibernateNamingStrategy: org.hibernate.cfg.ImprovedNamingStrategy + password: sdnctl + url: jdbc:mysql://localhost:3306/sdnctl + username: sdnctl + grpcEnable: false + grpcPort: 9111 + httpPort: 8080 + loadModelType: false + loadResourceDictionary: false + messageclient: + self-service-api: + bootstrapServers: 127.0.0.1:9092 + clientId: default-client-id + consumerTopic: receiver.t + groupId: receiver-id + kafkaEnable: false + topic: producer.t + type: kafka-basic-auth + remoteScriptCommand: + enabled: true + restclient: + sdncodl: + password: Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U + type: basic-auth + url: http://localhost:8282/ + username: admin + restconfEnabled: true +controllerblueprints: + loadInitialData: true +logging: + level: + org: + springframework: + boot: + context: + config: debug +ms_name: org.onap.ccsdk.apps.controllerblueprints +spring: + datasource: + password: sdnctl + url: jdbc:mysql://localhost:3306/sdnctl + username: sdnctl + security: + user: + name: ccsdkapps + password: '{bcrypt}$2a$10$duaUzVUVW0YPQCSIbGEkQOXwafZGwQ/b32/Ys4R1iwSSawFgz7QNu' diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-uat.yml b/ms/blueprintsprocessor/application/src/main/resources/application-uat.yml new file mode 100644 index 000000000..f00d62b0f --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/resources/application-uat.yml @@ -0,0 +1,4 @@ +server: + error: + include-exception: true + include-stacktrace: always diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintsAcceptanceTest.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintsAcceptanceTest.kt deleted file mode 100644 index ce7434f8e..000000000 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintsAcceptanceTest.kt +++ /dev/null @@ -1,242 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ -package org.onap.ccsdk.cds.blueprintsprocessor - -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.node.MissingNode -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.argThat -import com.nhaarman.mockitokotlin2.atLeast -import com.nhaarman.mockitokotlin2.atLeastOnce -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import com.nhaarman.mockitokotlin2.verifyNoMoreInteractions -import com.nhaarman.mockitokotlin2.whenever -import org.junit.ClassRule -import org.junit.Rule -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.mockito.Answers -import org.onap.ccsdk.cds.blueprintsprocessor.rest.RestLibConstants -import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService -import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService.WebClientResponse -import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintArchiveUtils.Companion.compressToBytes -import org.skyscreamer.jsonassert.JSONAssert -import org.skyscreamer.jsonassert.JSONCompareMode -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.boot.test.mock.mockito.MockBean -import org.springframework.core.io.ByteArrayResource -import org.springframework.core.io.Resource -import org.springframework.http.MediaType -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.TestPropertySource -import org.springframework.test.context.junit4.rules.SpringClassRule -import org.springframework.test.context.junit4.rules.SpringMethodRule -import org.springframework.test.web.reactive.server.EntityExchangeResult -import org.springframework.test.web.reactive.server.WebTestClient -import reactor.core.publisher.Mono -import java.io.File -import java.nio.charset.StandardCharsets -import java.nio.file.Paths -import kotlin.test.BeforeTest -import kotlin.test.Test - -// Only one runner can be configured with jUnit 4. We had to replace the SpringRunner by equivalent jUnit rules. -// See more on https://docs.spring.io/autorepo/docs/spring-framework/current/spring-framework-reference/testing.html#testcontext-junit4-rules -@RunWith(Parameterized::class) -// Set blueprintsprocessor.httpPort=0 to trigger a random port selection -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) -@AutoConfigureWebTestClient(timeout = "PT10S") -@ContextConfiguration(initializers = [ - WorkingFoldersInitializer::class, - TestSecuritySettings.ServerContextInitializer::class -]) -@TestPropertySource(locations = ["classpath:application-test.properties"]) -class BlueprintsAcceptanceTest(private val blueprintName: String, private val filename: String) { - - companion object { - const val UAT_BLUEPRINTS_BASE_DIR = "../../../components/model-catalog/blueprint-model/uat-blueprints" - const val EMBEDDED_UAT_FILE = "Tests/uat.yaml" - - @ClassRule - @JvmField - val springClassRule = SpringClassRule() - - val log: Logger = LoggerFactory.getLogger(BlueprintsAcceptanceTest::class.java) - - /** - * Generates the parameters to create a test instance for every blueprint found under UAT_BLUEPRINTS_BASE_DIR - * that contains the proper UAT definition file. - */ - @Parameterized.Parameters(name = "{index} {0}") - @JvmStatic - fun testParameters(): List<Array<String>> { - return File(UAT_BLUEPRINTS_BASE_DIR) - .listFiles { file -> file.isDirectory && File(file, EMBEDDED_UAT_FILE).isFile } - ?.map { file -> arrayOf(file.nameWithoutExtension, file.canonicalPath) } - ?: emptyList() - } - } - - @Rule - @JvmField - val springMethodRule = SpringMethodRule() - - @MockBean(name = RestLibConstants.SERVICE_BLUEPRINT_REST_LIB_PROPERTY, answer = Answers.RETURNS_SMART_NULLS) - lateinit var restClientFactory: BluePrintRestLibPropertyService - - @Autowired - // Bean is created programmatically by {@link WorkingFoldersInitializer#initialize(String)} - @Suppress("SpringJavaInjectionPointsAutowiringInspection") - lateinit var tempFolder: ExtendedTemporaryFolder - - @Autowired - lateinit var webTestClient: WebTestClient - - @Autowired - lateinit var mapper: ObjectMapper - - @BeforeTest - fun cleanupTemporaryFolder() { - tempFolder.deleteAllFiles() - } - - @Test - fun testBlueprint() { - val uat = UatDefinition.load(mapper, Paths.get(filename, EMBEDDED_UAT_FILE)) - - uploadBlueprint(blueprintName) - - // Configure mocked external services and save their expected requests for further validation - val requestsPerClient = uat.externalServices.associateBy( - { service -> createRestClientMock(service.selector, service.expectations) }, - { service -> service.expectations.map { it.request } } - ) - - // Run processes - for (process in uat.processes) { - log.info("Executing process '${process.name}'") - processBlueprint(process.request, process.expectedResponse, - JsonNormalizer.getNormalizer(mapper, process.responseNormalizerSpec)) - } - - // Validate requests to external services - for ((mockClient, requests) in requestsPerClient) { - requests.forEach { request -> - verify(mockClient, atLeastOnce()).exchangeResource( - eq(request.method), - eq(request.path), - argThat { assertJsonEqual(request.body, this) }, - argThat(RequiredMapEntriesMatcher(request.headers))) - } - // Don't mind the invocations to the overloaded exchangeResource(String, String, String) - verify(mockClient, atLeast(0)).exchangeResource(any(), any(), any()) - verifyNoMoreInteractions(mockClient) - } - } - - private fun createRestClientMock(selector: String, restExpectations: List<ExpectationDefinition>) - : BlueprintWebClientService { - val restClient = mock<BlueprintWebClientService>(verboseLogging = true, - defaultAnswer = Answers.RETURNS_SMART_NULLS) - - // Delegates to overloaded exchangeResource(String, String, String, Map<String, String>) - whenever(restClient.exchangeResource(any(), any(), any())) - .thenAnswer { invocation -> - val method = invocation.arguments[0] as String - val path = invocation.arguments[1] as String - val request = invocation.arguments[2] as String - restClient.exchangeResource(method, path, request, emptyMap()) - } - for (expectation in restExpectations) { - whenever(restClient.exchangeResource( - eq(expectation.request.method), - eq(expectation.request.path), - any(), - any())) - .thenReturn(WebClientResponse(expectation.response.status, expectation.response.body.toString())) - } - - whenever(restClientFactory.blueprintWebClientService(selector)) - .thenReturn(restClient) - return restClient - } - - private fun uploadBlueprint(blueprintName: String) { - val body = toMultiValueMap("file", getBlueprintAsResource(blueprintName)) - webTestClient - .post() - .uri("/api/v1/blueprint-model/publish") - .header("Authorization", TestSecuritySettings.clientAuthToken()) - .syncBody(body) - .exchange() - .expectStatus().isOk - } - - private fun processBlueprint(request: JsonNode, expectedResponse: JsonNode, - responseNormalizer: (String) -> String) { - webTestClient - .post() - .uri("/api/v1/execution-service/process") - .header("Authorization", TestSecuritySettings.clientAuthToken()) - .contentType(MediaType.APPLICATION_JSON_UTF8) - .body(Mono.just(request.toString()), String::class.java) - .exchange() - .expectStatus().isOk - .expectBody() - .consumeWith { response -> - assertJsonEqual(expectedResponse, responseNormalizer(getBodyAsString(response))) - } - } - - private fun getBlueprintAsResource(blueprintName: String): Resource { - val baseDir = Paths.get(UAT_BLUEPRINTS_BASE_DIR, blueprintName) - val zipBytes = compressToBytes(baseDir) - return object : ByteArrayResource(zipBytes) { - // Filename has to be returned in order to be able to post - override fun getFilename() = "$blueprintName.zip" - } - } - - private fun assertJsonEqual(expected: JsonNode, actual: String): Boolean { - if ((actual == "") && (expected is MissingNode)) { - return true - } - JSONAssert.assertEquals(expected.toString(), actual, JSONCompareMode.LENIENT) - // assertEquals throws an exception whenever match fails - return true - } - - private fun getBodyAsString(result: EntityExchangeResult<ByteArray>): String { - val body = result.responseBody - if ((body == null) || body.isEmpty()) { - return "" - } - val charset = result.responseHeaders.contentType?.charset ?: StandardCharsets.UTF_8 - return String(body, charset) - } -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/BaseUatTest.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/BaseUatTest.kt new file mode 100644 index 000000000..ec338f274 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/BaseUatTest.kt @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat + +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.COLOR_TEST_CLIENT +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.resetContextColor +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.setContextColor +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.AfterTest +import kotlin.test.BeforeTest + +@RunWith(SpringRunner::class) +// Also set blueprintsprocessor.httpPort=0 +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ContextConfiguration(initializers = [ + WorkingFoldersInitializer::class, + TestSecuritySettings.ServerContextInitializer::class +]) +@TestPropertySource(locations = ["classpath:application-test.properties"]) +abstract class BaseUatTest { + + @BeforeTest + fun setScope() { + setContextColor(COLOR_TEST_CLIENT) + } + + @AfterTest + fun clearScope() { + resetContextColor() + } + + companion object { + const val UAT_BLUEPRINTS_BASE_DIR = "../../../components/model-catalog/blueprint-model/uat-blueprints" + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/BlueprintsAcceptanceTest.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/BlueprintsAcceptanceTest.kt new file mode 100644 index 000000000..4fed0ce67 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/BlueprintsAcceptanceTest.kt @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat + +import org.junit.ClassRule +import org.junit.Rule +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.UAT_SPECIFICATION_FILE +import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintArchiveUtils.Companion.compressToBytes +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.context.junit4.rules.SpringClassRule +import org.springframework.test.context.junit4.rules.SpringMethodRule +import java.io.File +import java.nio.file.FileSystem +import java.nio.file.FileSystems +import kotlin.test.BeforeTest +import kotlin.test.Test + +// Only one runner can be configured with jUnit 4. We had to replace the SpringRunner by equivalent jUnit rules. +// See more on https://docs.spring.io/autorepo/docs/spring-framework/current/spring-framework-reference/testing.html#testcontext-junit4-rules +@RunWith(Parameterized::class) +class BlueprintsAcceptanceTest(@Suppress("unused") private val blueprintName: String, // readable test description + private val rootFs: FileSystem): BaseUatTest() { + + companion object { + + @ClassRule + @JvmField + val springClassRule = SpringClassRule() + + /** + * Generates the parameters to create a test instance for every blueprint found under UAT_BLUEPRINTS_BASE_DIR + * that contains the proper UAT definition file. + */ + @Parameterized.Parameters(name = "{index} {0}") + @JvmStatic + fun scanUatEmpoweredBlueprints(): List<Array<Any>> { + return (File(UAT_BLUEPRINTS_BASE_DIR) + .listFiles { file -> file.isDirectory && File(file, UAT_SPECIFICATION_FILE).isFile } + ?: throw RuntimeException("Failed to scan $UAT_BLUEPRINTS_BASE_DIR")) + .map { file -> + arrayOf( + file.nameWithoutExtension, + FileSystems.newFileSystem(file.canonicalFile.toPath(), null) + ) + } + } + } + + @Rule + @JvmField + val springMethodRule = SpringMethodRule() + + @Autowired + // Bean is created programmatically by {@link WorkingFoldersInitializer#initialize(String)} + @Suppress("SpringJavaInjectionPointsAutowiringInspection") + lateinit var tempFolder: ExtendedTemporaryFolder + + @Autowired + lateinit var uatExecutor: UatExecutor + + @BeforeTest + fun cleanupTemporaryFolder() { + tempFolder.deleteAllFiles() + } + + @Test + fun runUat() { + val uatSpec = rootFs.getPath(UAT_SPECIFICATION_FILE).toFile().readText() + val cbaBytes = compressToBytes(rootFs.getPath("/")) + uatExecutor.execute(uatSpec, cbaBytes) + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/ExtendedTemporaryFolder.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/ExtendedTemporaryFolder.kt index 57b4573ef..1c0067c36 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/ExtendedTemporaryFolder.kt +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/ExtendedTemporaryFolder.kt @@ -17,9 +17,8 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat -import org.junit.rules.TemporaryFolder import java.io.File import java.io.IOException import java.nio.file.* @@ -27,25 +26,27 @@ import java.nio.file.attribute.* import javax.annotation.PreDestroy class ExtendedTemporaryFolder { - private val tempFolder = TemporaryFolder() - - init { - tempFolder.create() - } + private val tempFolder = createTempDir("uat") @PreDestroy - fun delete() = tempFolder.delete() + fun delete() = tempFolder.deleteRecursively() /** * A delegate to org.junit.rules.TemporaryFolder.TemporaryFolder.newFolder(String). */ - fun newFolder(folder: String): File = tempFolder.newFolder(folder) + fun newFolder(folderName: String): File { + val dir = File(tempFolder, folderName) + if (!dir.mkdir()) { + throw IOException("Unable to create temporary directory $dir.") + } + return dir + } /** * Delete all files under the root temporary folder recursively. The folders are preserved. */ fun deleteAllFiles() { - Files.walkFileTree(tempFolder.root.toPath(), object : SimpleFileVisitor<Path>() { + Files.walkFileTree(tempFolder.toPath(), object : SimpleFileVisitor<Path>() { @Throws(IOException::class) override fun visitFile(file: Path?, attrs: BasicFileAttributes?): FileVisitResult { file?.toFile()?.delete() diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/MarkedSlf4jNotifier.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/MarkedSlf4jNotifier.kt new file mode 100644 index 000000000..13ebd9e4b --- /dev/null +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/MarkedSlf4jNotifier.kt @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat + +import com.github.tomakehurst.wiremock.common.Notifier +import org.slf4j.LoggerFactory +import org.slf4j.Marker + +class MarkedSlf4jNotifier(private val marker: Marker) : Notifier { + + override fun info(message: String) { + log.info(marker, message) + } + + override fun error(message: String) { + log.error(marker, message) + } + + override fun error(message: String, t: Throwable) { + log.error(marker, message, t) + } + + companion object { + private val log = LoggerFactory.getLogger("uat.WireMock") + } +} diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/TestSecuritySettings.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/TestSecuritySettings.kt index f7ab2554c..216df9aef 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/TestSecuritySettings.kt +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/TestSecuritySettings.kt @@ -17,13 +17,12 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat import org.springframework.context.ApplicationContextInitializer import org.springframework.context.ConfigurableApplicationContext import org.springframework.test.context.support.TestPropertySourceUtils import org.springframework.util.Base64Utils -import java.nio.charset.StandardCharsets class TestSecuritySettings { companion object { @@ -31,7 +30,7 @@ class TestSecuritySettings { private const val authPassword = "Heisenberg" fun clientAuthToken() = - "Basic " + Base64Utils.encodeToString("$authUsername:$authPassword".toByteArray(StandardCharsets.UTF_8)) + "Basic " + Base64Utils.encodeToString("$authUsername:$authPassword".toByteArray()) } class ServerContextInitializer : ApplicationContextInitializer<ConfigurableApplicationContext> { diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatServicesTest.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatServicesTest.kt new file mode 100644 index 000000000..78dc7099c --- /dev/null +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/UatServicesTest.kt @@ -0,0 +1,260 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.cds.blueprintsprocessor.uat + +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.MappingBuilder +import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder +import com.github.tomakehurst.wiremock.client.VerificationException +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.equalTo +import com.github.tomakehurst.wiremock.client.WireMock.equalToJson +import com.github.tomakehurst.wiremock.client.WireMock.request +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import org.apache.http.HttpStatus +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.ContentType +import org.apache.http.entity.mime.HttpMultipartMode +import org.apache.http.entity.mime.MultipartEntityBuilder +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClientBuilder +import org.apache.http.message.BasicHeader +import org.hamcrest.CoreMatchers +import org.hamcrest.MatcherAssert.assertThat +import org.hamcrest.Matchers.equalToIgnoringCase +import org.jetbrains.kotlin.konan.util.prefixIfNot +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.COLOR_WIREMOCK +import org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.LogColor.markerOf +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.UAT_SPECIFICATION_FILE +import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintArchiveUtils.Companion.compressToBytes +import org.skyscreamer.jsonassert.JSONAssert +import org.skyscreamer.jsonassert.JSONCompareMode +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.web.server.LocalServerPort +import org.springframework.core.env.ConfigurableEnvironment +import org.springframework.core.env.MapPropertySource +import org.springframework.http.HttpHeaders +import org.springframework.http.MediaType +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.support.TestPropertySourceUtils.INLINED_PROPERTIES_PROPERTY_SOURCE_NAME +import org.yaml.snakeyaml.Yaml +import java.nio.file.Paths +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertNotNull + +@ActiveProfiles("uat") +@Suppress("MemberVisibilityCanBePrivate") +class UatServicesTest : BaseUatTest() { + + companion object { + private const val BLUEPRINT_NAME = "pnf_config" + private val BLUEPRINT_BASE_DIR = Paths.get(UAT_BLUEPRINTS_BASE_DIR, BLUEPRINT_NAME) + private val UAT_PATH = BLUEPRINT_BASE_DIR.resolve(UAT_SPECIFICATION_FILE) + private val wireMockMarker = markerOf(COLOR_WIREMOCK) + } + + @Autowired + lateinit var mapper: ObjectMapper + + @Autowired + lateinit var environment: ConfigurableEnvironment + + private val ephemeralProperties = mutableSetOf<String>() + private val startedMockServers = mutableListOf<WireMockServer>() + + private fun setProperties(properties: Map<String, String>) { + inlinedPropertySource().putAll(properties) + ephemeralProperties += properties.keys + } + + @AfterTest + fun resetProperties() { + val source = inlinedPropertySource() + ephemeralProperties.forEach { key -> source.remove(key) } + ephemeralProperties.clear() + } + + @AfterTest + fun stopMockServers() { + startedMockServers.forEach { mockServer -> + try { + mockServer.checkForUnmatchedRequests() + } finally { + mockServer.stop() + } + } + startedMockServers.clear() + } + + private fun inlinedPropertySource(): MutableMap<String, Any> = + (environment.propertySources[INLINED_PROPERTIES_PROPERTY_SOURCE_NAME] as MapPropertySource).source + + @LocalServerPort + var localServerPort: Int = 0 + + // use lazy evaluation to postpone until localServerPort is injected by Spring + val baseUrl: String by lazy { + "http://127.0.0.1:$localServerPort" + } + + lateinit var httpClient: CloseableHttpClient + + @BeforeTest + fun setupHttpClient() { + val defaultHeaders = listOf(BasicHeader(org.apache.http.HttpHeaders.AUTHORIZATION, + TestSecuritySettings.clientAuthToken())) + httpClient = HttpClientBuilder.create() + .setDefaultHeaders(defaultHeaders) + .build() + } + + @Test + fun `verify service validates candidate UAT`() { + // GIVEN + val cbaBytes = compressToBytes(BLUEPRINT_BASE_DIR) + val multipartEntity = MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("cba", cbaBytes, ContentType.DEFAULT_BINARY, "cba.zip") + .build() + val request = HttpPost("$baseUrl/api/v1/uat/verify").apply { + entity = multipartEntity + } + + // WHEN + httpClient.execute(request) { response -> + + // THEN + val statusLine = response.statusLine + assertThat(statusLine.statusCode, CoreMatchers.equalTo(HttpStatus.SC_OK)) + } + } + + @Test + fun `spy service generates complete UAT from bare UAT`() { + // GIVEN + val uatSpec = UAT_PATH.toFile().readText() + val fullUat = UatDefinition.load(mapper, uatSpec) + val expectedJson = mapper.writeValueAsString(fullUat) + + val bareUatBytes = fullUat.toBare().dump(mapper).toByteArray() + + fullUat.externalServices.forEach { service -> + val mockServer = createMockServer(service) + mockServer.start() + startedMockServers += mockServer + setPropertiesForMockServer(service, mockServer) + } + + val cbaBytes = compressToBytes(BLUEPRINT_BASE_DIR) + val multipartEntity = MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("cba", cbaBytes, ContentType.DEFAULT_BINARY, "cba.zip") + .addBinaryBody("uat", bareUatBytes, ContentType.DEFAULT_BINARY, "uat.yaml") + .build() + val request = HttpPost("$baseUrl/api/v1/uat/spy").apply { + entity = multipartEntity + } + + // WHEN + httpClient.execute(request) { response -> + + // THEN + val statusLine = response.statusLine + assertThat(statusLine.statusCode, CoreMatchers.equalTo(HttpStatus.SC_OK)) + val entity = response.entity + assertNotNull(entity) + val contentType = ContentType.get(entity) + assertThat(contentType.mimeType, equalToIgnoringCase("text/vnd.yaml")) + val yamlResponse = entity.content.bufferedReader().readText() + val jsonResponse = yamlToJson(yamlResponse) + JSONAssert.assertEquals(expectedJson, jsonResponse, JSONCompareMode.LENIENT) + } + } + + private fun createMockServer(service: ServiceDefinition): WireMockServer { + val mockServer = WireMockServer(wireMockConfig() + .dynamicPort() + .notifier(MarkedSlf4jNotifier(wireMockMarker)) + ) + service.expectations.forEach { expectation -> + + val request = expectation.request + val response = expectation.response + // WebTestClient always use absolute path, prefixing with "/" if necessary + val urlPattern = urlEqualTo(request.path.prefixIfNot("/")) + val mappingBuilder: MappingBuilder = request(request.method, urlPattern) + request.headers.forEach { (key, value) -> + mappingBuilder.withHeader(key, equalTo(value)) + } + if (request.body != null) { + mappingBuilder.withRequestBody(equalToJson(mapper.writeValueAsString(request.body), true, true)) + } + + val responseDefinitionBuilder: ResponseDefinitionBuilder = aResponse() + .withStatus(response.status) + if (response.body != null) { + responseDefinitionBuilder.withBody(mapper.writeValueAsBytes(response.body)) + .withHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + } + + mappingBuilder.willReturn(responseDefinitionBuilder) + + mockServer.stubFor(mappingBuilder) + } + return mockServer + } + + private fun setPropertiesForMockServer(service: ServiceDefinition, mockServer: WireMockServer) { + val selector = service.selector + val httpPort = mockServer.port() + val properties = mapOf( + "blueprintsprocessor.restclient.$selector.type" to "basic-auth", + "blueprintsprocessor.restclient.$selector.url" to "http://localhost:$httpPort/", + // TODO credentials should be validated + "blueprintsprocessor.restclient.$selector.username" to "admin", + "blueprintsprocessor.restclient.$selector.password" to "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U" + ) + setProperties(properties) + } + + /** + * Borrowed from com.github.tomakehurst.wiremock.junit.WireMockRule.checkForUnmatchedRequests + */ + private fun WireMockServer.checkForUnmatchedRequests() { + val unmatchedRequests = findAllUnmatchedRequests() + if (unmatchedRequests.isNotEmpty()) { + val nearMisses = findNearMissesForAllUnmatchedRequests() + if (nearMisses.isEmpty()) { + throw VerificationException.forUnmatchedRequests(unmatchedRequests) + } else { + throw VerificationException.forUnmatchedNearMisses(nearMisses) + } + } + } + + private fun yamlToJson(yaml: String): String { + val map: Map<String, Any> = Yaml().load(yaml) + return mapper.writeValueAsString(map) + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/WorkingFoldersInitializer.kt b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/WorkingFoldersInitializer.kt index 37615cb1a..ab9ae31a0 100644 --- a/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/WorkingFoldersInitializer.kt +++ b/ms/blueprintsprocessor/application/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/uat/WorkingFoldersInitializer.kt @@ -17,7 +17,7 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ -package org.onap.ccsdk.cds.blueprintsprocessor +package org.onap.ccsdk.cds.blueprintsprocessor.uat import org.springframework.beans.factory.support.BeanDefinitionBuilder import org.springframework.beans.factory.support.BeanDefinitionRegistry diff --git a/ms/blueprintsprocessor/application/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/application/src/test/resources/logback-test.xml index 70d94f5a7..f635e7925 100644 --- a/ms/blueprintsprocessor/application/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/application/src/test/resources/logback-test.xml @@ -16,10 +16,17 @@ --> <configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %-40.40logger{39} : %msg%n</pattern> - </encoder> + <appender name="SIFT" class="ch.qos.logback.classic.sift.SiftingAppender"> + <discriminator class="org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.SmartColorDiscriminator"> + <defaultValue>white</defaultValue> + </discriminator> + <sift> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%${color}(%d{HH:mm:ss.SSS} %-5level %-40.40logger{39} : %msg%n)</pattern> + </encoder> + </appender> + </sift> </appender> <logger name="org.springframework.web.HttpLogging" level="trace"/> @@ -34,8 +41,11 @@ <logger name="org.hibernate.SQL" level="debug"/> <logger name="org.hibernate.type.descriptor.sql" level="trace"/> + <logger name="org.apache.http" level="debug"/> + <logger name="org.apache.http.wire" level="error"/> + <root level="info"> - <appender-ref ref="STDOUT"/> + <appender-ref ref="SIFT"/> </root> </configuration> diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index 2a227ebe1..6b1f186c9 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -46,6 +46,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic const val INPUT_PACKAGES = "packages" const val DEFAULT_SELECTOR = "remote-python" + const val ATTRIBUTE_EXEC_CMD_STATUS = "status" const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs" const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs" const val ATTRIBUTE_RESPONSE_DATA = "response-data" @@ -53,7 +54,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic override suspend fun processNB(executionRequest: ExecutionServiceInput) { - log.info("Processing : $operationInputs") + log.debug("Processing : $operationInputs") val bluePrintContext = bluePrintRuntimeService.bluePrintContext() val blueprintName = bluePrintContext.name() @@ -109,12 +110,17 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic ) val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput) log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}") - setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response)) + val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response) + setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logs) setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive()) - check(prepareEnvOutput.status == StatusType.SUCCESS) { - "failed to get prepare remote env response status for requestId(${prepareEnvInput.requestId})" + + if (prepareEnvOutput.status != StatusType.SUCCESS) { + setNodeOutputErrors(prepareEnvOutput.status.name, logs) + } else { + setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logs, "".asJsonPrimitive()) } } + // Populate command execution properties and pass it to the remote server val properties = dynamicProperties?.returnNullIfMissing()?.rootFieldsToMap() ?: hashMapOf() @@ -124,10 +130,13 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic command = scriptCommand, properties = properties) val remoteExecutionOutput = remoteScriptExecutionService.executeCommand(remoteExecutionInput) - log.info("$ATTRIBUTE_EXEC_CMD_LOG - ${remoteExecutionOutput.response}") - setAttribute(ATTRIBUTE_EXEC_CMD_LOG, JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)) - check(remoteExecutionOutput.status == StatusType.SUCCESS) { - "failed to get prepare remote command response status for requestId(${remoteExecutionOutput.requestId})" + + val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response) + if (remoteExecutionOutput.status != StatusType.SUCCESS) { + setNodeOutputErrors(remoteExecutionOutput.status.name,logs, remoteExecutionOutput.payload) + } else { + setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs, + remoteExecutionOutput.payload) } } catch (e: Exception) { @@ -139,7 +148,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) { bluePrintRuntimeService.getBluePrintError() - .addError("Failed in ComponentJythonExecutor : ${runtimeException.message}") + .addError("Failed in ComponentRemotePythonExecutor : ${runtimeException.message}") } private fun formatNestedJsonNode(node: JsonNode): String { @@ -151,4 +160,27 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic } return sb.toString() } + + /** + * Utility function to set the output properties of the executor node + */ + private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) { + setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status) + log.info("Executor status : $status") + setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) + log.info("Executor artifacts: $artifacts") + setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) + log.info("Executor message : $message") + } + + /** + * Utility function to set the output properties and errors of the executor node, in cas of errors + */ + private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive() ) { + setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive()) + setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) + setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) + + addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.asText()) + } } diff --git a/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt b/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt index d103bbf08..89af42579 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt @@ -194,6 +194,7 @@ class MockRemoteScriptExecutionService : RemoteScriptExecutionService { assertNotNull(prepareEnvInput.packages, "failed to get packages") val remoteScriptExecutionOutput = mockk<RemoteScriptExecutionOutput>() + every { remoteScriptExecutionOutput.payload } returns "payload".asJsonPrimitive() every { remoteScriptExecutionOutput.response } returns listOf("prepared successfully") every { remoteScriptExecutionOutput.status } returns StatusType.SUCCESS return remoteScriptExecutionOutput @@ -203,6 +204,7 @@ class MockRemoteScriptExecutionService : RemoteScriptExecutionService { assertEquals(remoteExecutionInput.requestId, "123456-1000", "failed to match request id") val remoteScriptExecutionOutput = mockk<RemoteScriptExecutionOutput>() + every { remoteScriptExecutionOutput.payload } returns "payload".asJsonPrimitive() every { remoteScriptExecutionOutput.response } returns listOf("processed successfully") every { remoteScriptExecutionOutput.status } returns StatusType.SUCCESS return remoteScriptExecutionOutput diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml index 5945e29fd..15cabb260 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- ~ Copyright © 2019 IBM. + ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property. ~ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -42,5 +43,9 @@ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>processor-core</artifactId> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + </dependency> </dependencies> </project> diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt new file mode 100644 index 000000000..55cf0941d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt @@ -0,0 +1,27 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc + +import io.grpc.Metadata + +fun Metadata.getStringKey(key: String): String? { + return this.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)) +} + +fun Metadata.putStringKeyValue(key: String, value: String) { + this.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt index 1bef3a0f2..0ec049a3c 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration @@ -23,10 +28,25 @@ import org.springframework.context.annotation.Configuration @ComponentScan open class BluePrintGrpcLibConfiguration +/** + * Exposed Dependency Service by this GRPC Lib Module + */ +fun BluePrintDependencyService.grpcLibPropertyService(): BluePrintGrpcLibPropertyService = + instance(GRPCLibConstants.SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY) + +fun BluePrintDependencyService.grpcClientService(selector: String): BluePrintGrpcClientService { + return grpcLibPropertyService().blueprintGrpcClientService(selector) +} + +fun BluePrintDependencyService.grpcClientService(jsonNode: JsonNode): BluePrintGrpcClientService { + return grpcLibPropertyService().blueprintGrpcClientService(jsonNode) +} + class GRPCLibConstants { companion object { const val SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY = "blueprint-grpc-lib-property-service" const val TYPE_TOKEN_AUTH = "token-auth" const val TYPE_BASIC_AUTH = "basic-auth" + const val TYPE_TLS_AUTH = "tls-auth" } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt index 76e60bd0d..47d16fbc7 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,24 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc +/** GRPC Server Properties */ +open class GrpcServerProperties { + lateinit var type: String + var port: Int = -1 +} + +open class TokenAuthGrpcServerProperties : GrpcServerProperties() { + lateinit var token: String +} + +open class TLSAuthGrpcServerProperties : GrpcServerProperties() { + lateinit var certChain: String + lateinit var privateKey: String + /** Below Used only for Mutual TLS */ + var trustCertCollection: String? = null +} + +/** GRPC Client Properties */ open class GrpcClientProperties { lateinit var type: String lateinit var host: String @@ -26,6 +45,13 @@ open class TokenAuthGrpcClientProperties : GrpcClientProperties() { lateinit var token: String } +open class TLSAuthGrpcClientProperties : GrpcClientProperties() { + var trustCertCollection: String? = null + /** Below Used only for Mutual TLS */ + var clientCertChain: String? = null + var clientPrivateKey: String? = null +} + open class BasicAuthGrpcClientProperties : GrpcClientProperties() { lateinit var username: String lateinit var password: String diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt new file mode 100644 index 000000000..f3b14b59f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor + +import io.grpc.* +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +class GrpcClientLoggingInterceptor : ClientInterceptor { + val log = logger(GrpcClientLoggingInterceptor::class) + + val loggingService = GrpcLoggerService() + + override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>, + callOptions: CallOptions, channel: Channel): ClientCall<ReqT, RespT> { + + return object : ForwardingClientCall + .SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) { + + override fun start(responseListener: Listener<RespT>, headers: Metadata) { + val listener = object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) { + override fun onMessage(message: RespT) { + loggingService.grpcInvoking(headers) + super.onMessage(message) + } + } + super.start(listener, headers) + } + } + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt new file mode 100644 index 000000000..e21d5d3ce --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt @@ -0,0 +1,92 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor + +import io.grpc.* +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintDownloadInput +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintRemoveInput +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintUploadInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.slf4j.MDC + +class GrpcServerLoggingInterceptor : ServerInterceptor { + val log = logger(GrpcServerLoggingInterceptor::class) + val loggingService = GrpcLoggerService() + + override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>, + requestHeaders: Metadata, next: ServerCallHandler<ReqT, RespT>) + : ServerCall.Listener<ReqT> { + + val forwardingServerCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) { + override fun sendHeaders(responseHeaders: Metadata) { + loggingService.grpResponding(requestHeaders, responseHeaders) + super.sendHeaders(responseHeaders) + } + } + + return object + : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>( + next.startCall(forwardingServerCall, requestHeaders)) { + + override fun onMessage(message: ReqT) { + /** Get the requestId, SubRequestId and Originator Id and set in MDS context + * If you are using other GRPC services, Implement own Logging Interceptors to get tracing. + * */ + when (message) { + is ExecutionServiceInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintUploadInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintDownloadInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintRemoveInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + else -> { + loggingService.grpcRequesting(call, requestHeaders, next) + } + } + super.onMessage(message) + } + + override fun onComplete() { + MDC.clear() + super.onComplete() + } + + override fun onCancel() { + MDC.clear() + super.onCancel() + } + + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt index a1d2188ab..f4933a3ad 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,17 +19,56 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BasicAuthGrpcClientProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.* import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.returnNullIfMissing import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @Service(GRPCLibConstants.SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY) open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: BluePrintProperties) { + /** GRPC Server Lib Property Service */ + fun grpcServerProperties(jsonNode: JsonNode): GrpcServerProperties { + return when (val type = jsonNode.get("type").textValue()) { + GRPCLibConstants.TYPE_TOKEN_AUTH -> { + JacksonUtils.readValue(jsonNode, TokenAuthGrpcServerProperties::class.java)!! + } + GRPCLibConstants.TYPE_TLS_AUTH -> { + JacksonUtils.readValue(jsonNode, TLSAuthGrpcServerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Grpc type($type) not supported") + } + } + } + + fun grpcServerProperties(prefix: String): GrpcServerProperties { + val type = bluePrintProperties.propertyBeanType( + "$prefix.type", String::class.java) + return when (type) { + GRPCLibConstants.TYPE_TOKEN_AUTH -> { + tokenAuthGrpcServerProperties(prefix) + } + GRPCLibConstants.TYPE_TLS_AUTH -> { + tlsAuthGrpcServerProperties(prefix) + } + else -> { + throw BluePrintProcessorException("Grpc type($type) not supported") + } + } + } + + private fun tokenAuthGrpcServerProperties(prefix: String): TokenAuthGrpcServerProperties { + return bluePrintProperties.propertyBeanType(prefix, TokenAuthGrpcServerProperties::class.java) + } + + private fun tlsAuthGrpcServerProperties(prefix: String): TLSAuthGrpcServerProperties { + return bluePrintProperties.propertyBeanType(prefix, TLSAuthGrpcServerProperties::class.java) + } + + /** GRPC Client Lib Property Service */ + fun blueprintGrpcClientService(jsonNode: JsonNode): BluePrintGrpcClientService { val restClientProperties = grpcClientProperties(jsonNode) return blueprintGrpcClientService(restClientProperties) @@ -42,11 +82,15 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue fun grpcClientProperties(jsonNode: JsonNode): GrpcClientProperties { - val type = jsonNode.get("type").textValue() + val type = jsonNode.get("type").returnNullIfMissing()?.textValue() + ?: BluePrintProcessorException("missing type property") return when (type) { GRPCLibConstants.TYPE_TOKEN_AUTH -> { JacksonUtils.readValue(jsonNode, TokenAuthGrpcClientProperties::class.java)!! } + GRPCLibConstants.TYPE_TLS_AUTH -> { + JacksonUtils.readValue(jsonNode, TLSAuthGrpcClientProperties::class.java)!! + } GRPCLibConstants.TYPE_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, BasicAuthGrpcClientProperties::class.java)!! } @@ -63,6 +107,9 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue GRPCLibConstants.TYPE_TOKEN_AUTH -> { tokenAuthGrpcClientProperties(prefix) } + GRPCLibConstants.TYPE_TLS_AUTH -> { + tlsAuthGrpcClientProperties(prefix) + } GRPCLibConstants.TYPE_BASIC_AUTH -> { basicAuthGrpcClientProperties(prefix) } @@ -75,12 +122,15 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties): BluePrintGrpcClientService { - when (grpcClientProperties) { + return when (grpcClientProperties) { is TokenAuthGrpcClientProperties -> { - return TokenAuthGrpcClientService(grpcClientProperties) + TokenAuthGrpcClientService(grpcClientProperties) + } + is TLSAuthGrpcClientProperties -> { + TLSAuthGrpcClientService(grpcClientProperties) } is BasicAuthGrpcClientProperties -> { - return BasicAuthGrpcClientService(grpcClientProperties) + BasicAuthGrpcClientService(grpcClientProperties) } else -> { throw BluePrintProcessorException("couldn't get grpc service for type(${grpcClientProperties.type})") @@ -92,6 +142,10 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue return bluePrintProperties.propertyBeanType(prefix, TokenAuthGrpcClientProperties::class.java) } + private fun tlsAuthGrpcClientProperties(prefix: String): TLSAuthGrpcClientProperties { + return bluePrintProperties.propertyBeanType(prefix, TLSAuthGrpcClientProperties::class.java) + } + private fun basicAuthGrpcClientProperties(prefix: String): BasicAuthGrpcClientProperties { return bluePrintProperties.propertyBeanType(prefix, BasicAuthGrpcClientProperties::class.java) } diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt index 016c05035..0d9291615 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +18,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service import io.grpc.ManagedChannel +import io.grpc.netty.NettyServerBuilder + +interface BluePrintGrpcServerService { + fun serverBuilder(): NettyServerBuilder +} interface BluePrintGrpcClientService { suspend fun channel(): ManagedChannel diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt new file mode 100644 index 000000000..6d2ba43cc --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt @@ -0,0 +1,101 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.Grpc +import io.grpc.Metadata +import io.grpc.ServerCall +import io.grpc.ServerCallHandler +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.net.InetSocketAddress +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class GrpcLoggerService { + + private val log = logger(GrpcLoggerService::class) + + /** Used when server receives request */ + fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>, + headers: Metadata, next: ServerCallHandler<ReqT, RespT>) { + val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID() + val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID() + val partnerName = headers.getStringKey(ONAP_PARTNER_NAME) ?: "UNKNOWN" + grpcRequesting(requestID, invocationID, partnerName, call) + } + + fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>, + headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) { + val requestID = headers.requestId.defaultToUUID() + val invocationID = headers.subRequestId.defaultToUUID() + val partnerName = headers.originatorId ?: "UNKNOWN" + grpcRequesting(requestID, invocationID, partnerName, call) + } + + fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String, + call: ServerCall<ReqT, RespT>) { + val localhost = InetAddress.getLocalHost() + + val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress + ?: throw BluePrintProcessorException("failed to get client address") + val serviceName = call.methodDescriptor.fullMethodName + + MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", serviceName) + log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + + /** Used before invoking any GRPC outbound request, Inbound Invocation ID is used as request Id + * for outbound Request, If invocation Id is missing then default Request Id will be generated. + */ + fun grpcInvoking(requestHeader: Metadata) { + requestHeader.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.putStringKeyValue(ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.putStringKeyValue(ONAP_PARTNER_NAME, partnerName) + } + + /** Used when server returns response */ + fun grpResponding(requestHeaders: Metadata, responseHeaders: Metadata) { + try { + responseHeaders.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("RequestID").defaultToEmpty()) + responseHeaders.putStringKeyValue(ONAP_INVOCATION_ID, MDC.get("InvocationID").defaultToEmpty()) + responseHeaders.putStringKeyValue(ONAP_PARTNER_NAME, MDC.get("PartnerName").defaultToEmpty()) + } catch (e: Exception) { + log.warn("couldn't set grpc response headers", e) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt new file mode 100644 index 000000000..a70cbbce0 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt @@ -0,0 +1,54 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.ManagedChannel +import io.grpc.internal.DnsNameResolverProvider +import io.grpc.internal.PickFirstLoadBalancerProvider +import io.grpc.netty.GrpcSslContexts +import io.grpc.netty.NettyChannelBuilder +import io.netty.handler.ssl.SslContext +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + +class TLSAuthGrpcClientService(private val tlsAuthGrpcClientProperties: TLSAuthGrpcClientProperties) + : BluePrintGrpcClientService { + + override suspend fun channel(): ManagedChannel { + return NettyChannelBuilder + .forAddress(tlsAuthGrpcClientProperties.host, tlsAuthGrpcClientProperties.port) + .nameResolverFactory(DnsNameResolverProvider()) + .loadBalancerFactory(PickFirstLoadBalancerProvider()) + .intercept(GrpcClientLoggingInterceptor()) + .sslContext(sslContext()) + .build() + } + + fun sslContext(): SslContext { + val builder = GrpcSslContexts.forClient() + if (tlsAuthGrpcClientProperties.trustCertCollection != null) { + builder.trustManager(normalizedFile(tlsAuthGrpcClientProperties.trustCertCollection!!)) + } + if (tlsAuthGrpcClientProperties.clientCertChain != null + && tlsAuthGrpcClientProperties.clientPrivateKey != null) { + builder.keyManager(normalizedFile(tlsAuthGrpcClientProperties.clientCertChain!!), + normalizedFile(tlsAuthGrpcClientProperties.clientPrivateKey!!)) + } + return builder.build() + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt new file mode 100644 index 000000000..fc73d43f9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt @@ -0,0 +1,49 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.netty.GrpcSslContexts +import io.grpc.netty.NettyServerBuilder +import io.netty.handler.ssl.ClientAuth +import io.netty.handler.ssl.SslContext +import io.netty.handler.ssl.SslContextBuilder +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + + +class TLSAuthGrpcServerService(private val tlsAuthGrpcServerProperties: TLSAuthGrpcServerProperties) + : BluePrintGrpcServerService { + + override fun serverBuilder(): NettyServerBuilder { + return NettyServerBuilder + .forPort(tlsAuthGrpcServerProperties.port) + .sslContext(sslContext()) + } + + fun sslContext(): SslContext { + val sslClientContextBuilder = SslContextBuilder + .forServer(normalizedFile(tlsAuthGrpcServerProperties.certChain), + normalizedFile(tlsAuthGrpcServerProperties.privateKey)) + + tlsAuthGrpcServerProperties.trustCertCollection?.let { trustCertFile -> + sslClientContextBuilder.trustManager(normalizedFile(trustCertFile)) + sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE) + } + return GrpcSslContexts.configure(sslClientContextBuilder).build() + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt index dbff84211..601dc0e33 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt @@ -21,6 +21,7 @@ import io.grpc.internal.DnsNameResolverProvider import io.grpc.internal.PickFirstLoadBalancerProvider import io.grpc.netty.NettyChannelBuilder import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: TokenAuthGrpcClientProperties) : BluePrintGrpcClientService { @@ -30,6 +31,7 @@ class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: Toke .forAddress(tokenAuthGrpcClientProperties.host, tokenAuthGrpcClientProperties.port) .nameResolverFactory(DnsNameResolverProvider()) .loadBalancerFactory(PickFirstLoadBalancerProvider()) + .intercept(GrpcClientLoggingInterceptor()) .intercept(TokenAuthClientInterceptor(tokenAuthGrpcClientProperties)).usePlaintext().build() return managedChannel } diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt index 8df218fe9..b7ddc1569 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,9 +23,8 @@ import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BasicAuthGrpcClientProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BluePrintGrpcLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.* +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -42,11 +42,25 @@ import kotlin.test.assertTrue "blueprintsprocessor.grpcclient.sample.port=50505", "blueprintsprocessor.grpcclient.sample.username=sampleuser", "blueprintsprocessor.grpcclient.sample.password=sampleuser", + "blueprintsprocessor.grpcclient.token.type=token-auth", "blueprintsprocessor.grpcclient.token.host=127.0.0.1", "blueprintsprocessor.grpcclient.token.port=50505", "blueprintsprocessor.grpcclient.token.username=sampleuser", - "blueprintsprocessor.grpcclient.token.password=sampleuser" + "blueprintsprocessor.grpcclient.token.password=sampleuser", + + "blueprintsprocessor.grpcserver.tls-sample.type=tls-auth", + "blueprintsprocessor.grpcserver.tls-sample.port=50505", + "blueprintsprocessor.grpcserver.tls-sample.certChain=server1.pem", + "blueprintsprocessor.grpcserver.tls-sample.privateKey=server1.key", + "blueprintsprocessor.grpcserver.tls-sample.trustCertCollection=ca.pem", + + "blueprintsprocessor.grpcclient.tls-sample.type=tls-auth", + "blueprintsprocessor.grpcclient.tls-sample.host=127.0.0.1", + "blueprintsprocessor.grpcclient.tls-sample.port=50505", + "blueprintsprocessor.grpcclient.tls-sample.trustCertCollection=ca.pem", + "blueprintsprocessor.grpcclient.tls-sample.clientCertChain=client.pem", + "blueprintsprocessor.grpcclient.tls-sample.clientPrivateKey=client.key" ]) class BluePrintGrpcLibPropertyServiceTest { @@ -129,4 +143,52 @@ class BluePrintGrpcLibPropertyServiceTest { .blueprintGrpcClientService(actualObj) assertTrue(svc is BasicAuthGrpcClientService) } + + @Test + fun testGrpcClientTLSProperties() { + val properties = bluePrintGrpcLibPropertyService + .grpcClientProperties("blueprintsprocessor.grpcclient.tls-sample") as TLSAuthGrpcClientProperties + assertNotNull(properties, "failed to create property bean") + assertNotNull(properties.host, "failed to get host property in property bean") + assertNotNull(properties.port, "failed to get host property in property bean") + assertNotNull(properties.trustCertCollection, "failed to get trustCertCollection property in property bean") + assertNotNull(properties.clientCertChain, "failed to get clientCertChain property in property bean") + assertNotNull(properties.clientPrivateKey, "failed to get clientPrivateKey property in property bean") + + val configDsl = """{ + "type" : "tls-auth", + "host" : "localhost", + "port" : "50505", + "trustCertCollection" : "server1.pem", + "clientCertChain" : "server1.key", + "clientPrivateKey" : "ca.pem" + } + """.trimIndent() + val jsonProperties = bluePrintGrpcLibPropertyService + .grpcClientProperties(configDsl.jsonAsJsonType()) as TLSAuthGrpcClientProperties + assertNotNull(jsonProperties, "failed to create property bean from json") + } + + @Test + fun testGrpcServerTLSProperties() { + val properties = bluePrintGrpcLibPropertyService + .grpcServerProperties("blueprintsprocessor.grpcserver.tls-sample") as TLSAuthGrpcServerProperties + assertNotNull(properties, "failed to create property bean") + assertNotNull(properties.port, "failed to get host property in property bean") + assertNotNull(properties.trustCertCollection, "failed to get trustCertCollection property in property bean") + assertNotNull(properties.certChain, "failed to get certChain property in property bean") + assertNotNull(properties.privateKey, "failed to get privateKey property in property bean") + + val configDsl = """{ + "type" : "tls-auth", + "port" : "50505", + "certChain" : "server1.pem", + "privateKey" : "server1.key", + "trustCertCollection" : "ca.pem" + } + """.trimIndent() + val jsonProperties = bluePrintGrpcLibPropertyService + .grpcServerProperties(configDsl.jsonAsJsonType()) as TLSAuthGrpcServerProperties + assertNotNull(jsonProperties, "failed to create property bean from json") + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt new file mode 100644 index 000000000..8154d3747 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt @@ -0,0 +1,109 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming +import com.google.protobuf.util.JsonFormat +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import java.util.* +import kotlin.test.Test +import kotlin.test.assertNotNull + +class BluePrintGrpcServerTest { + + private val tlsAuthGrpcServerProperties = TLSAuthGrpcServerProperties().apply { + port = 50052 + type = GRPCLibConstants.TYPE_TLS_AUTH + certChain = "src/test/resources/tls-manual/py-executor-chain.pem" + privateKey = "src/test/resources/tls-manual/py-executor-key.pem" + } + + private val tlsAuthGrpcClientProperties = TLSAuthGrpcClientProperties().apply { + host = "localhost" + port = 50052 + type = GRPCLibConstants.TYPE_TLS_AUTH + trustCertCollection = "src/test/resources/tls-manual/py-executor-chain.pem" + } + + @Test + fun testGrpcTLSContext() { + val tlsAuthGrpcServerService = TLSAuthGrpcServerService(tlsAuthGrpcServerProperties) + val sslContext = tlsAuthGrpcServerService.sslContext() + assertNotNull(sslContext, "failed to create grpc server ssl context") + + val tlsAuthGrpcClientService = TLSAuthGrpcClientService(tlsAuthGrpcClientProperties) + val clientSslContext = tlsAuthGrpcClientService.sslContext() + assertNotNull(clientSslContext, "failed to create grpc client ssl context") + } + + /** TLS Client Integration testing, GRPC TLS Junit testing is not supported. */ + //@Test + fun testGrpcTLSServerIntegration() { + runBlocking { + val tlsAuthGrpcClientService = TLSAuthGrpcClientService(tlsAuthGrpcClientProperties) + val grpcChannel = tlsAuthGrpcClientService.channel() + /** Get Send and Receive Channel for bidirectional process method*/ + val (reqChannel, resChannel) = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), + grpcChannel) + launch { + resChannel.consumeEach { + log.info("Received Response") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + resChannel.cancel() + } + } + } + val request = getRequest("12345") + reqChannel.send(request) + } + } + + private fun getRequest(requestId: String): ExecutionServiceInput { + val commonHeader = CommonHeader.newBuilder() + .setTimestamp("2012-04-23T18:25:43.511Z") + .setOriginatorId("System") + .setRequestId(requestId) + .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build() + val actionIdentifier = ActionIdentifiers.newBuilder() + .setActionName("SampleScript") + .setBlueprintName("sample-cba") + .setBlueprintVersion("1.0.0") + .setMode(ACTION_MODE_SYNC) + .build() + val jsonContent = """{ "key1" : "value1" }""" + val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder + JsonFormat.parser().merge(jsonContent, payloadBuilder) + + return ExecutionServiceInput.newBuilder() + .setCommonHeader(commonHeader) + .setActionIdentifiers(actionIdentifier) + .setPayload(payloadBuilder.build()) + .build() + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt new file mode 100644 index 000000000..d5bc70c48 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt @@ -0,0 +1,90 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.stub.StreamObserver +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.common.api.Status +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput + + +val log = logger(MockTLSBluePrintProcessingServer::class) + +/** For Integration testing stat this server, Set the working path to run this method */ +fun main() { + try { + val tlsAuthGrpcServerProperties = TLSAuthGrpcServerProperties().apply { + port = 50052 + type = GRPCLibConstants.TYPE_TLS_AUTH + certChain = "src/test/resources/tls-manual/py-executor-chain.pem" + privateKey = "src/test/resources/tls-manual/py-executor-key.pem" + } + val server = TLSAuthGrpcServerService(tlsAuthGrpcServerProperties).serverBuilder() + .intercept(GrpcServerLoggingInterceptor()) + .addService(MockTLSBluePrintProcessingServer()) + .build() + server.start() + log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...") + server.awaitTermination() + } catch (e: Exception) { + log.error("Failed to start tls grpc integration server", e) + } + +} + +class MockTLSBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { + override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { + + return object : StreamObserver<ExecutionServiceInput> { + override fun onNext(executionServiceInput: ExecutionServiceInput) { + log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " + + "subRequestId(${executionServiceInput.commonHeader.subRequestId})") + responseObserver.onNext(buildResponse(executionServiceInput)) + responseObserver.onCompleted() + } + + override fun onError(error: Throwable) { + log.debug("Fail to process message", error) + responseObserver.onError(io.grpc.Status.INTERNAL + .withDescription(error.message) + .asException()) + } + + override fun onCompleted() { + log.info("Completed") + } + } + } + + private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput { + val status = Status.newBuilder().setCode(200) + .setEventType(EventType.EVENT_COMPONENT_EXECUTED) + .build() + return ExecutionServiceOutput.newBuilder() + .setCommonHeader(input.commonHeader) + .setActionIdentifiers(input.actionIdentifiers) + .setStatus(status) + .build() + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README new file mode 100644 index 000000000..c4e91a2fc --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README @@ -0,0 +1,5 @@ + +Generate Certifactes & Keys +---------------------------- + +openssl req -x509 -newkey rsa:4096 -keyout my-private-key.pem -out my-public-key-cert.pem -days 3650 -nodes -subj '/CN=localhost' diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem new file mode 100644 index 000000000..30f09dfea --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem @@ -0,0 +1,27 @@ +-----BEGIN CERTIFICATE----- +MIIEpDCCAowCCQDyhR+GR2RUiTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls +b2NhbGhvc3QwHhcNMTkxMDIzMDAwMTA0WhcNMjkxMDIwMDAwMTA0WjAUMRIwEAYD +VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCs +c4d6qfbW+GSMp+XURoLXtSAbbehoBXL2beSzqQNW6e+Q9IVtSPZst8VRjUXelFzM +m7VpS9jhiXOPZ5KKUOD0GVuNQc54VpwtHt7t9L5wS9OvdnLijnMIkc0iUvC6+Rcq +HSfbNC2Tb+a8jLwojmtRCeY/MyCnmqYpD+U3b6Eue89VpMOIfmDuTqSRRBYNVO72 +hq7FI3UD8+zREg7htfzjJjG14Ec5iVMDxJA1FlwtXFnZxDHgbLjEVjTTR/9Wm1eU +aJ4oWRt3gG/vnJNa+GwN4w/My+j/5/n/YpNh6GeQrHxBl/SL/SAFBshlwozr4K4K +av5MqRKyhCACV4SsdhKJUEDtvrtukJvh/ZDW8jdNbFJAljm8UucZGbJrZl6G7XB3 +WteI7rezo0mL0NMBZIT3nQSMEpefKUFZFiE5lYvIk3UuChqIM0xdgV4INwLRHZdc +1TtiGaBJV05y3Klo5gaUgNGbHP26zfub5TydiMrOA5W2mUvMkG2oit9aqnbaZBLD +t17cCKzpzcVF5uNUng3j6sQvpTt3S4L28TvKUMAfpecQqvxMoxG0/9HZuv2z+U+L +LVVsS07yJPIGMLcq1LMM++8LwD1MupcoShjNOq/lUOL6hIMfLOIfxt8Kv8WykVzv +6yjKEIurjkwMipq4kvr9J7FFi54kGr7uvXWQRHDFJwIDAQABMA0GCSqGSIb3DQEB +CwUAA4ICAQB7gJzvaOIP3/S2jrObz67g0jiz1cfb4I9KQwpwb6JUWbYm1QjBcGm4 +IhNbdPMD6dpwBc/A4JctA5E+/fArvl14UtK1jkaaE/GCumL0VUSZeAM6CK/63brt +LplqCunv8ePHmiwjJBnhu+ewe1+mDMVDMw0iot/q+pOM3vqNS1Fipja+xFK1JQZx +JmkjW/Ug3NHk/SSTfO+VNmlI5bBBApMqKmd9picsyDZ7dTBtZvbqV5eQsPZvv14G +oEvWnvvom+D5GojroSO+OMHNDR3bzK6p0Cu8AiTy9Ls6J2e4GXJz3Cg/kuF9tNlR +3X62zDT+CUipuYyTvmjbSyNMGwU7BIZTKFPuTtjh7EwT2g6S8RV9PmT98CQW6kTT +RJbL7nMIOF0WusysAT5wj1HJ0QKBQCXK+L6WTKTTovaEE7JSVrYe7wVF8Q9SyBIM +4CPVZt+GMyQKJ9SRnVgTDEMb7sj9HPaoVeDc6LQTv8Q//wFeTdZIWXQhpVJCQCEG +qkRk9r3isF60ISOXXIYhqE+hx3QXY9M2UyHDtKXPZ7X370vADi2ebBMF8MpIZYl5 +628dME9JhOhLhD5qPJeva2Nq4gLpK+rO6t7ML0Us4edoKyoScowXAh80q1GW3EO3 +IxTK123651C/S0kDqLqZ9rknEdpwSujrT2UW95jUlfo5OKDrPpdOBw== +-----END CERTIFICATE----- diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem new file mode 100644 index 000000000..830a3ae21 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCsc4d6qfbW+GSM +p+XURoLXtSAbbehoBXL2beSzqQNW6e+Q9IVtSPZst8VRjUXelFzMm7VpS9jhiXOP +Z5KKUOD0GVuNQc54VpwtHt7t9L5wS9OvdnLijnMIkc0iUvC6+RcqHSfbNC2Tb+a8 +jLwojmtRCeY/MyCnmqYpD+U3b6Eue89VpMOIfmDuTqSRRBYNVO72hq7FI3UD8+zR +Eg7htfzjJjG14Ec5iVMDxJA1FlwtXFnZxDHgbLjEVjTTR/9Wm1eUaJ4oWRt3gG/v +nJNa+GwN4w/My+j/5/n/YpNh6GeQrHxBl/SL/SAFBshlwozr4K4Kav5MqRKyhCAC +V4SsdhKJUEDtvrtukJvh/ZDW8jdNbFJAljm8UucZGbJrZl6G7XB3WteI7rezo0mL +0NMBZIT3nQSMEpefKUFZFiE5lYvIk3UuChqIM0xdgV4INwLRHZdc1TtiGaBJV05y +3Klo5gaUgNGbHP26zfub5TydiMrOA5W2mUvMkG2oit9aqnbaZBLDt17cCKzpzcVF +5uNUng3j6sQvpTt3S4L28TvKUMAfpecQqvxMoxG0/9HZuv2z+U+LLVVsS07yJPIG +MLcq1LMM++8LwD1MupcoShjNOq/lUOL6hIMfLOIfxt8Kv8WykVzv6yjKEIurjkwM +ipq4kvr9J7FFi54kGr7uvXWQRHDFJwIDAQABAoICADepPmRAMbTnDYU8t/jRHXBE +PO29htL0V0vk4nl+pt5JuZJe6iYA89DZa+3LnG6gEmfUJjSrT4BUXiE+O9U7D7CZ +8qvgPqUmx1fk6+2AHmuefd/XanNnqQduD/jxLlQbC/gC2xdsev1ok9/tyNmKRmcs +u81QUkzmpJUCVWiUNkELozswaBBJQj4I0iM1B60b6dlWVVi5/g3dkGVW38jIdaxX +apoansKaaVoA+s63vd7CPRoFsleOoAB3FqvPREIO97CmJ848HJpwsTB0qDcnkbDV +xgbDFhxrIozko09ptOvEUILXag45EDmvG8WEivmjVml0aUoTFD7cWHyJBQCpR4fU +5W9mYd4Rrzbmpb+LGYdNyrp3wo3C7dJ7/ffBMQxmXTdMZkcxorxj4BRG3oACRQ1u +Ff1iUruZzIIDtEkrC9hc5QpLlDf9b1obm8L9sxf1QmTt59o5oFG40GPwPP19GXwE +l2faHwho2jYLM9rhuSsK/5sSmUshPNQYmfMnbWzTtghMPE/g0Cfpt8qbspq+G1bk +z3M97JlFMF83ccRotDElX9E/ttjU7Lehoz+1sOyHiVW1E4oqKer4t+nI2bp6VYZm +W94qptW7kb4o0DsvPCaoTPBxLJ1ag2WBlqoFkVI0YaxZiZ8OTR55Ovi4z5xWBO1q +NkCKgdAUQvQVzVtASVGBAoIBAQDSw2nvPFN4gGZ6OI+8j2gWtPcsrhSHS9ykxBeB +mB/HExYIe8k3EvClf2rnfwzuKgKyVMp7Ev7nH2jS/PGZq37QyXrw0NBGRnvJY0Ez +YB1KTgf9xaHMGMut5efNvv/cPwYriqosgJ0pdt0vvUAIQ6EBv+iDXXqJ1lQUSRYk +wKjFABi6TeJY4t9vC474KoXTDaHlwn9+TwnuRBk85wrZzlhK90J0iVa9/Eqeddsc +Z3CuTlc+NmcP3qvniYODq8nyVc0pKw+28AVYYEd3aJfgm+dpcB21L0oz7CaxH/Rz +FNONuQRaOzJrcuJsde/KG2X+MHs6hVMXXXWciPrJ2l+Cq7dnAoIBAQDRdwZDcgem +tJHLihCRzUl9PKip4ZA5757ZyTy6WMLR3wMS2cNTK8+bTrUa0SSC4WSI28pybFA7 +QdSR08c5Nd7jXcIrtqspgZKhb0E60i8VQHhh6ba/kyQjsEz9c/G1WquPK13j2vZ0 +79bomDwFJPsFzABU+sC0/F42ZVQzy9qXkjngjtmaGfrCc7X+pV28nEGtyxHci3L4 +XXfE2dOb+GBVZPLBVXwcthdRYsFuU9GMy2GH0zVtWPOcGRnlpx53Tqg7NIeR0Nm1 +K35EaK8PH92PsAr0Xza7vQHY4cPRz+RhDzjyGQtnhKf96U6gzzt4ZVbQ/UuzDBcL +PQ2DvUH+sqxBAoIBAEW5kiUsDu0xhTVv2tVll+jTK2ZjnLT5ut/jY2djHTgtrz9V +PEb1BBmsIoC9PljYGxZGCMpYiW2KrZIHTiIpYwXNcdeTLSPik3cXV+2YIXiAghJJ +PHKZzWAVS+97/YcubmsfL5cTYWrjQN9XO4TAYtaCV3iGB1DsT9p6J1I3Tl4F3yhb +NcN0IrjI2R5uauFchC/PfYAaw81ISBUm1iciJYF/dUO6X7DwcvsjQD6QVe3ESwZw +1v2gC7zIeHKp9WAvVHUHIubBVvNavqnZN01+JjtydNGI+IJe4Jn+WU9tF2OuTqtP +JCn50sBQ7+gr0j0aatn8W3XCXHNRua3niWtgRYcCggEAT7OzfWxhPuyMYV9qiKAN +a4ruPp3mjDUCQ6pP4jQuBT+PYtfbe8U63MSpIsgb1XVAFNdVBA70xGd7I/XqY3l9 +ExS08n8yR7vW+Hhl4KTjZ3m9lLwiXmj1omLOGM7KVRBoITUGJ9JEXyB3rM9oXyjA +H2eNZMh5FSTGEHqj/IV/6paoUSrp37os8VqoEHoJ3d+zGhcf98RT/e9KyGt+GmX6 ++eNMf4YwkJg07THfmkRoguNMfCtAtBfZsjbW5MyfShRy7PxC7ZgDju06wXr3yZB9 +dNQuhufH4s27azQUl7w8ETaCm5QuA7i1V2c0FPpljZ052JHZAQsDpbIYd11HREvm +QQKCAQEA0W7xNYoFvnyikdG0t266LLv1EkWDFdgkelGx/eGe/JZ+au3uTM94EssC +ni64XX2P8vK/te+c3jItYO4MRgnDJ7GW+bRnJFu2kBE0W4chx7vga0XApVCP+Ugg +owv5yf9cOAHFulvPefsU0snYStD3gNq77XDg0CwoyUkpeq+GiupoQ8tquMSsrEwp +ve5DtDip3cLHz2oVLB3mR4kKVwVwmOgO5RKq6N/H6Jxtf/Zk1I260dKr+Dv2MnDh +dysO4zH5YEt2ML3oY4zY8lu+I5bHCBR1updSny0B31WrXAJyfZpMx+HOwETFKa3B +v9AGKz0Jc2GOIRKHrCQ/WkZePetaYQ== +-----END PRIVATE KEY----- diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt new file mode 100644 index 000000000..a817c0c74 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import java.nio.charset.Charset + + +fun <T : Headers> T?.toMap(): MutableMap<String, String> { + val map: MutableMap<String, String> = hashMapOf() + this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } + return map +} + +fun Headers.addHeader(key: String, value: String) { + this.add(RecordHeader(key, value.toByteArray())) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 1cd8a2af7..184e85b70 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,8 +34,10 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var clientId: String? = null + lateinit var clientId: String var topic: String? = null + var autoCommit: Boolean = true + var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 25f0bf44d..8bcc7580a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,47 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +/** Consumer Function Interfaces */ +interface ConsumerFunction interface BlueprintMessageConsumerService { + suspend fun subscribe(): Channel<String> { + return subscribe(null) + } + /** Subscribe to the Kafka channel with [additionalConfig] */ suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String> + /** Consume and execute dynamic function [consumerFunction] */ + suspend fun consume(consumerFunction: ConsumerFunction) { + consume(null, consumerFunction) + } + + /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + + /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + /** close the channel, consumer and other resources */ suspend fun shutDown() - +} +/** Consumer dynamic implementation interface */ +interface KafkaConsumerRecordsFunction : ConsumerFunction { + suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *>) }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index e33d41c09..7d8138639 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean = runBlocking { - sendMessageNB(message) + fun sendMessage(message: Any): Boolean { + return sendMessage(message = message, headers = null) } - fun sendMessage(topic: String, message: Any): Boolean = runBlocking { - sendMessageNB(topic, message) + fun sendMessage(topic: String, message: Any): Boolean { + return sendMessage(topic, message, null) } - suspend fun sendMessageNB(message: Any): Boolean + fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(message = message, headers = headers) + } + + fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(topic, message, headers) + } + + suspend fun sendMessageNB(message: Any): Boolean { + return sendMessageNB(message = message, headers = null) + } + + suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean + + suspend fun sendMessageNB(topic: String, message: Any): Boolean { + return sendMessageNB(topic, message, null) + } - suspend fun sendMessageNB(topic: String, message: Any): Boolean + suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b5d444a49..757846c81 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,33 +25,39 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread -class KafkaBasicAuthMessageConsumerService( +open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { - private val channel = Channel<String>() - private var kafkaConsumer: Consumer<String, String>? = null val log = logger(KafkaBasicAuthMessageConsumerService::class) + val channel = Channel<String>() + var kafkaConsumer: Consumer<String, ByteArray>? = null @Volatile var keepGoing = true - fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> { + fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> { val configProperties = hashMapOf<String, Any>() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - if (messageConsumerProperties.clientId != null) { - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! - } + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId + /** To handle Back pressure, Get only configured record for processing */ if (messageConsumerProperties.pollRecords > 0) { configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords @@ -70,7 +77,7 @@ class KafkaBasicAuthMessageConsumerService( } - override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -80,22 +87,22 @@ class KafkaBasicAuthMessageConsumerService( "topics(${messageConsumerProperties.bootstrapServers})" } - kafkaConsumer!!.subscribe(consumerTopic) - log.info("Successfully consumed topic($consumerTopic)") + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") - thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.info("Consumed Records : ${consumerRecords.count()}") + log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ consumerRecord.value()?.let { launch { if (!channel.isClosedForSend) { - channel.send(it) + channel.send(String(it, Charset.defaultCharset())) } else { log.error("Channel is closed to receive message") } @@ -110,6 +117,46 @@ class KafkaBasicAuthMessageConsumerService( return channel } + override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + override suspend fun shutDown() { /** stop the polling loop */ keepGoing = false diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 1c93bb0fc..42adcd712 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,18 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringSerializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.SendResult -import org.springframework.util.concurrent.ListenableFutureCallback +import java.nio.charset.Charset class KafkaBasicAuthMessageProducerService( private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) @@ -34,42 +37,46 @@ class KafkaBasicAuthMessageProducerService( private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - private var kafkaTemplate: KafkaTemplate<String, Any>? = null + private var kafkaProducer: KafkaProducer<String, ByteArray>? = null + + private val messageLoggerService = MessageLoggerService() override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessage(messageProducerProperties.topic!!, message) + return sendMessageNB(messageProducerProperties.topic!!, message) } - override suspend fun sendMessageNB(topic: String, message: Any): Boolean { - val serializedMessage = when (message) { - is String -> { - message - } - else -> { - message.asJsonType().toString() - } - } - val future = messageTemplate().send(topic, serializedMessage) + override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } - future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> { - override fun onSuccess(result: SendResult<String, Any>) { - log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]") - } + override suspend fun sendMessageNB(topic: String, message: Any, + headers: MutableMap<String, String>?): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } - override fun onFailure(ex: Throwable) { - log.error("Unable to send message", ex) - } - }) + val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) + headers?.let { + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.info("message published offset(${metadata.offset()}, headers :$headers )") + } + messageTemplate().send(record, callback).get() return true } - private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> { - log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { + log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = hashMapOf<String, Any>() configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } @@ -79,14 +86,11 @@ class KafkaBasicAuthMessageProducerService( if (additionalConfig != null) { configProps.putAll(additionalConfig) } - return DefaultKafkaProducerFactory(configProps) - } - fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> { - if (kafkaTemplate == null) { - kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) + if (kafkaProducer == null) { + kafkaProducer = KafkaProducer(configProps) } - return kafkaTemplate!! + return kafkaProducer!! } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt new file mode 100644 index 000000000..21bf1b76c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Headers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class MessageLoggerService { + + private val log = logger(MessageLoggerService::class) + + fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) { + messageConsuming(headers.requestId, headers.subRequestId, + headers.originatorId, consumerRecord) + } + + fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID() + val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID() + val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" + messageConsuming(requestID, invocationID, partnerName, consumerRecord) + } + + + fun messageConsuming(requestID: String, invocationID: String, partnerName: String, + consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val localhost = InetAddress.getLocalHost() + MDC.put("InvokeTimestamp", ZonedDateTime + .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) + .format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", consumerRecord.topic()) + // Custom MDC for Message Consumers + MDC.put("Offset", consumerRecord.offset().toString()) + MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty()) + log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + /** Used before producing message request, Inbound Invocation ID is used as request Id + * for produced message Request, If invocation Id is missing then default Request Id will be generated. + */ + fun messageProducing(requestHeader: Headers) { + val localhost = InetAddress.getLocalHost() + requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName) + requestHeader.addHeader("ClientIPAddress", localhost.hostAddress) + } + + fun messageConsumingExisting() { + MDC.clear() + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f4e85a94b..bdceec7d3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.MockConsumer -import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext @@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest { partitionsEndMap[partition] = records topicsCollection.add(partition.topic()) } - val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) mockKafkaConsumer.subscribe(topicsCollection) mockKafkaConsumer.rebalance(partitions) mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i", - "I am message $i") + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) mockKafkaConsumer.addRecord(record) } @@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaBasicAuthConsumerWithDynamicFunction() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList<TopicPartition> = arrayListOf() + val topicsCollection: MutableList<String> = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf() + val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + /** Test Consumer Function implementation */ + val consumerFunction = object : KafkaConsumerRecordsFunction { + override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + val count = consumerRecords.count() + log.trace("Received Message count($count)") + } + } + spyBlueprintMessageConsumerService.consume(consumerFunction) + delay(10) + spyBlueprintMessageConsumerService.shutDown() + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ //@Test fun testKafkaIntegration() { @@ -131,7 +179,10 @@ open class BlueprintMessageConsumerServiceTest { launch { repeat(5) { delay(100) - blueprintMessageProducerService.sendMessage("this is my message($it)") + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", + headers = headers) } } delay(5000) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 31bcc1517..f23624f7a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,18 +20,18 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.RecordMetadata import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.springframework.beans.factory.annotation.Autowired -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.SendResult import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner -import org.springframework.util.concurrent.SettableListenableFuture +import java.util.concurrent.Future import kotlin.test.Test import kotlin.test.assertTrue @@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest { val blueprintMessageProducerService = bluePrintMessageLibPropertyService .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService - val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>() + val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() - val future = SettableListenableFuture<SendResult<String, Any>>() - //future.setException(BluePrintException("failed sending")) + val responseMock = mockk<Future<RecordMetadata>>() + every { responseMock.get() } returns mockk() - every { mockKafkaTemplate.send(any(), any()) } returns future + every { mockKafkaTemplate.send(any(), any()) } returns responseMock val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt new file mode 100644 index 000000000..82e40efd1 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.slf4j.MDC +import kotlin.test.assertEquals + +class MessageLoggerServiceTest { + + + @Test + fun testMessagingHeaders() { + val messageLoggerService = MessageLoggerService() + val commonHeader = CommonHeader().apply { + requestId = "1234" + subRequestId = "1234-12" + originatorId = "cds-test" + } + + val consumerRecord = mockk<ConsumerRecord<*, *>>() + every { consumerRecord.headers() } returns null + every { consumerRecord.key() } returns "1234" + every { consumerRecord.offset() } returns 12345 + every { consumerRecord.topic() } returns "sample-topic" + every { consumerRecord.timestamp() } returns System.currentTimeMillis() + messageLoggerService.messageConsuming(commonHeader, consumerRecord) + assertEquals(commonHeader.requestId, MDC.get("RequestID")) + assertEquals(commonHeader.subRequestId, MDC.get("InvocationID")) + + val mockHeaders = RecordHeaders() + messageLoggerService.messageProducing(mockHeaders) + val map = mockHeaders.toMap() + assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID]) + + messageLoggerService.messageConsumingExisting() + + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 3868440c7..820041f74 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -1,5 +1,6 @@ <!-- ~ Copyright © 2019 IBM. + ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property. ~ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -15,11 +16,18 @@ --> <configuration> + + <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> + <property name="defaultPattern" + value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> + <property name="testing" + value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern> + <pattern>${localPattern}</pattern> </encoder> </appender> diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt deleted file mode 100644 index cdf6ce195..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactor.ReactorContext -import kotlinx.coroutines.reactor.asCoroutineContext -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine -import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext -import reactor.core.publisher.Mono -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext - -/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ -@UseExperimental(InternalCoroutinesApi::class) -fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> - - val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) - ?: sink.currentContext()).asCoroutineContext() - /** Populate MDC context only if present in Reactor Context */ - val newContext = if (!reactorContext.context.isEmpty - && reactorContext.context.hasKey(MDCContext)) { - val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) - GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) - } else GlobalScope.newCoroutineContext(context + reactorContext) - - val coroutine = MonoMDCCoroutine(newContext, sink) - sink.onDispose(coroutine) - coroutine.start(CoroutineStart.DEFAULT, coroutine, block) -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt index 2f9ea4a25..d63f34ced 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt @@ -39,7 +39,8 @@ data class RemoteScriptExecutionInput(var requestId: String, data class RemoteScriptExecutionOutput(var requestId: String, var response: List<String>, var status: StatusType = StatusType.SUCCESS, - var timestamp: Date = Date()) + var timestamp: Date = Date(), + var payload: JsonNode) data class PrepareRemoteEnvInput(var requestId: String, var correlationId: String? = null, diff --git a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/BluePrintRestLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/BluePrintRestLibPropertyService.kt index 9fa13bdaf..384946ae8 100644 --- a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/BluePrintRestLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/BluePrintRestLibPropertyService.kt @@ -29,16 +29,32 @@ import org.springframework.stereotype.Service open class BluePrintRestLibPropertyService(private var bluePrintProperties: BluePrintProperties) { - open fun blueprintWebClientService(jsonNode: JsonNode): - BlueprintWebClientService { - val restClientProperties = restClientProperties(jsonNode) - return blueprintWebClientService(restClientProperties) + private var preInterceptor: PreInterceptor? = null + private var postInterceptor: PostInterceptor? = null + + fun setInterceptors(preInterceptor: PreInterceptor?, postInterceptor: PostInterceptor?) { + this.preInterceptor = preInterceptor + this.postInterceptor = postInterceptor + } + + fun clearInterceptors() { + this.preInterceptor = null + this.postInterceptor = null + } + + open fun blueprintWebClientService(jsonNode: JsonNode): BlueprintWebClientService { + val service = preInterceptor?.getInstance(jsonNode) + ?: blueprintWebClientService(restClientProperties(jsonNode)) + return postInterceptor?.getInstance(jsonNode, service) ?: service } open fun blueprintWebClientService(selector: String): BlueprintWebClientService { - val prefix = "blueprintsprocessor.restclient.$selector" - val restClientProperties = restClientProperties(prefix) - return blueprintWebClientService(restClientProperties) + val service = preInterceptor?.getInstance(selector) ?: run { + val prefix = "blueprintsprocessor.restclient.$selector" + val restClientProperties = restClientProperties(prefix) + blueprintWebClientService(restClientProperties) + } + return postInterceptor?.getInstance(selector, service) ?: service } fun restClientProperties(prefix: String): RestClientProperties { @@ -182,6 +198,18 @@ open class BluePrintRestLibPropertyService(private var bluePrintProperties: return bluePrintProperties.propertyBeanType( prefix, PolicyManagerRestClientProperties::class.java) } + + interface PreInterceptor { + fun getInstance(jsonNode: JsonNode): BlueprintWebClientService? + + fun getInstance(selector: String): BlueprintWebClientService? + } + + interface PostInterceptor { + fun getInstance(jsonNode: JsonNode, service: BlueprintWebClientService): BlueprintWebClientService + + fun getInstance(selector: String, service: BlueprintWebClientService): BlueprintWebClientService + } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt index 4da7dcd0e..cec11ae3c 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt +++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt @@ -14,43 +14,47 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.core.service +package org.onap.ccsdk.cds.blueprintsprocessor.rest.service -import kotlinx.coroutines.AbstractCoroutine -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.handleCoroutineException +import kotlinx.coroutines.* +import kotlinx.coroutines.reactor.ReactorContext +import kotlinx.coroutines.reactor.asCoroutineContext +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.slf4j.MDC import org.springframework.http.server.reactive.ServerHttpRequest import org.springframework.http.server.reactive.ServerHttpResponse import reactor.core.Disposable +import reactor.core.publisher.Mono import reactor.core.publisher.MonoSink +import java.net.InetAddress import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter -import java.util.* import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext -class LoggingService { - private val log = logger(LoggingService::class) +class RestLoggerService { + private val log = logger(RestLoggerService::class) - companion object { - const val ONAP_REQUEST_ID = "X-ONAP-RequestID" - const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID" - const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName" - } fun entering(request: ServerHttpRequest) { + val localhost = InetAddress.getLocalHost() val headers = request.headers - val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID)) - val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID)) - val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME)) + val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID() + val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID() + val partnerName = headers.getFirst(ONAP_PARTNER_NAME).defaultToEmpty() MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) MDC.put("RequestID", requestID) MDC.put("InvocationID", invocationID) MDC.put("PartnerName", partnerName) - MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress)) - MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString)) + MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty()) + MDC.put("ServerFQDN",localhost.hostName.defaultToEmpty()) if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) { MDC.put("ServiceName", request.uri.path) } @@ -62,22 +66,35 @@ class LoggingService { val resHeaders = response.headers resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID") resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID") + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + resHeaders[ONAP_PARTNER_NAME] = partnerName } catch (e: Exception) { log.warn("couldn't set response headers", e) } finally { MDC.clear() } } +} - private fun defaultToEmpty(input: Any?): String { - return input?.toString() ?: "" - } - private fun defaultToUUID(input: String?): String { - return input ?: UUID.randomUUID().toString() - } -} +/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ +@UseExperimental(InternalCoroutinesApi::class) +fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> + + val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) + ?: sink.currentContext()).asCoroutineContext() + /** Populate MDC context only if present in Reactor Context */ + val newContext = if (!reactorContext.context.isEmpty + && reactorContext.context.hasKey(MDCContext)) { + val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) + GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) + } else GlobalScope.newCoroutineContext(context + reactorContext) + val coroutine = MonoMDCCoroutine(newContext, sink) + sink.onDispose(coroutine) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) +} @InternalCoroutinesApi class MonoMDCCoroutine<in T>( diff --git a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt index a6bff7051..bf251f6c3 100644 --- a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt @@ -19,9 +19,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.designer.api import io.swagger.annotations.ApiOperation import io.swagger.annotations.ApiParam -import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelSearch import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.handler.BluePrintModelHandler +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.springframework.core.io.Resource import org.springframework.http.MediaType diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index f14f61e60..345650686 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -23,7 +23,7 @@ import io.swagger.annotations.ApiParam import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput -import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.logger diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index 20af589a1..ade47cf3f 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -23,11 +23,13 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.* import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.toProto +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils import org.slf4j.LoggerFactory import org.springframework.stereotype.Service @@ -70,26 +72,45 @@ class ExecutionServiceHandler(private val bluePrintLoadConfiguration: BluePrintL val blueprintName = actionIdentifiers.blueprintName val blueprintVersion = actionIdentifiers.blueprintVersion try { - val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) - log.info("blueprint base path $basePath") + /** Check Blueprint is needed for this request */ + if (checkServiceFunction(executionServiceInput)) { + return executeServiceFunction(executionServiceInput) + } else { + val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) + log.info("blueprint base path $basePath") - val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) - val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService, - executionServiceInput, hashMapOf()) + val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService, + executionServiceInput, hashMapOf()) - val errors = blueprintRuntimeService.getBluePrintError().errors - if (errors.isNotEmpty()) { - val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", ")) - setErrorStatus(errorMessage, output.status) + val errors = blueprintRuntimeService.getBluePrintError().errors + if (errors.isNotEmpty()) { + val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", ")) + setErrorStatus(errorMessage, output.status) + } + return output } - return output } catch (e: Exception) { log.error("fail processing request id $requestId", e) return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } } + /** If the blueprint name is default, It means no blueprint is needed for the execution */ + fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean { + return executionServiceInput.actionIdentifiers.blueprintName == "default" + } + + /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */ + suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { + val actionName = executionServiceInput.actionIdentifiers.actionName + val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName) + checkNotNull(instance) { "failed to initialize service function($actionName)" } + instance.actionName = actionName + return instance.applyNB(executionServiceInput) + } + private fun setErrorStatus(errorMessage: String, status: Status) { status.errorMessage = errorMessage status.eventType = EventType.EVENT_COMPONENT_FAILURE.name diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt new file mode 100644 index 000000000..293da0da6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.ApplicationContext +import org.springframework.stereotype.Service +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.Test +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +@RunWith(SpringRunner::class) +@ContextConfiguration(classes = [MockServiceAction::class]) +class ExecutionServiceHandlerTest { + + @Autowired + lateinit var applicationContext: ApplicationContext + + @Before + fun init() { + BluePrintDependencyService.inject(applicationContext) + } + + @Test + fun testExecuteServiceFunction() { + val executionServiceInput = ExecutionServiceInput().apply { + commonHeader = CommonHeader().apply { + requestId = "1234" + subRequestId = "1234-12" + originatorId = "cds-test" + } + actionIdentifiers = ActionIdentifiers().apply { + blueprintName = "default" + blueprintVersion = "1.0.0" + actionName = "mock-service-action" + } + } + runBlocking { + val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk()) + val isServiceFunction = executionServiceHandler.checkServiceFunction(executionServiceInput) + assertTrue(isServiceFunction, "failed to checkServiceFunction") + val executionServiceOutput = executionServiceHandler.executeServiceFunction(executionServiceInput) + assertNotNull(executionServiceOutput, "failed to get executionServiceOutput") + } + } +} + +@Service("mock-service-action") +class MockServiceAction : AbstractServiceFunction() { + override suspend fun processNB(executionRequest: ExecutionServiceInput) { + val responsePayload = """{"answer" : "correct"}""".jsonAsJsonType() + setResponsePayloadForAction(responsePayload) + } + + override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) { + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractServiceFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractServiceFunction.kt new file mode 100644 index 000000000..67ab9c4de --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractServiceFunction.kt @@ -0,0 +1,112 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution + +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintFunctionNode +import org.onap.ccsdk.cds.controllerblueprints.core.jsonPathParse +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils + +/** This implementation is used to build services, which doesn't need blueprints */ +abstract class AbstractServiceFunction : BlueprintFunctionNode<ExecutionServiceInput, ExecutionServiceOutput> { + + @Transient + private val log = logger(AbstractServiceFunction::class) + + lateinit var executionServiceInput: ExecutionServiceInput + var executionServiceOutput = ExecutionServiceOutput() + lateinit var processId: String + lateinit var actionName: String + lateinit var responseActionPayload: JsonNode + + override fun getName(): String { + return actionName + } + + override suspend fun prepareRequestNB(executionRequest: ExecutionServiceInput): ExecutionServiceInput { + + this.executionServiceInput = executionRequest + + actionName = executionRequest.actionIdentifiers.actionName + check(actionName.isNotEmpty()) { "couldn't get action name" } + + processId = executionRequest.commonHeader.requestId + check(processId.isNotEmpty()) { "couldn't get process id for service action($actionName)" } + + return executionRequest + } + + override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { + try { + prepareRequestNB(executionServiceInput) + processNB(executionServiceInput) + } catch (runtimeException: RuntimeException) { + log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException) + recoverNB(runtimeException, executionServiceInput) + } + return prepareResponseNB() + } + + override suspend fun prepareResponseNB(): ExecutionServiceOutput { + log.debug("Preparing Response...") + executionServiceOutput.commonHeader = executionServiceInput.commonHeader + executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers + var status = Status() + try { + // Set the Response Payload + executionServiceOutput.payload = JacksonUtils.objectMapper.createObjectNode() + executionServiceOutput.payload.set("$actionName-response", responseActionPayload) + // Set the Default Step Status + status.eventType = EventType.EVENT_COMPONENT_EXECUTED.name + } catch (e: Exception) { + status.message = BluePrintConstants.STATUS_FAILURE + status.eventType = EventType.EVENT_COMPONENT_FAILURE.name + } + executionServiceOutput.status = status + return this.executionServiceOutput + } + + fun setResponsePayloadForAction(actionPayload: JsonNode) { + this.responseActionPayload = actionPayload + } + + /** + * Get Execution Input Payload data + */ + fun requestPayload(): JsonNode? { + return executionServiceInput.payload + } + + /** + * Get Execution Input payload action property with [expression] + * ex: requestPayloadActionProperty("data") will look for path "payload/<action-name>-request/data" + */ + fun requestPayloadActionProperty(expression: String?): JsonNode? { + val requestExpression = if (expression.isNullOrBlank()) { + "$actionName-request" + } else { + "$actionName-request.$expression" + } + return executionServiceInput.payload.jsonPathParse(".$requestExpression") + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt index b0e3e4701..d6146e111 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt @@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.* import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService import org.onap.ccsdk.cds.controllerblueprints.command.api.* +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.slf4j.LoggerFactory import org.springframework.beans.factory.config.ConfigurableBeanFactory @@ -154,7 +155,8 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi return RemoteScriptExecutionOutput( requestId = this.requestId, response = this.responseList, - status = StatusType.valueOf(this.status.name) + status = StatusType.valueOf(this.status.name), + payload = payload.jsonAsJsonType() ) } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt index e291aa78e..6bffffdb5 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt @@ -18,8 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts import io.grpc.ServerBuilder import io.grpc.stub.StreamObserver +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType import org.onap.ccsdk.cds.controllerblueprints.common.api.Status +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput @@ -36,8 +40,13 @@ class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintPr override fun onNext(executionServiceInput: ExecutionServiceInput) { log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " + "subRequestId(${executionServiceInput.commonHeader.subRequestId})") - responseObserver.onNext(buildNotification(executionServiceInput)) - responseObserver.onNext(buildResponse(executionServiceInput)) + runBlocking { + launch(MDCContext()) { + responseObserver.onNext(buildNotification(executionServiceInput)) + responseObserver.onNext(buildResponse(executionServiceInput)) + log.info("message has sent successfully...") + } + } responseObserver.onCompleted() } @@ -85,6 +94,7 @@ fun main() { try { val server = ServerBuilder .forPort(50052) + .intercept(GrpcServerLoggingInterceptor()) .addService(MockBluePrintProcessingServer()) .build() server.start() diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt index 29d24c6ad..9a5be0151 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt @@ -16,6 +16,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution +import com.google.protobuf.util.JsonFormat import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessServerBuilder import io.grpc.testing.GrpcCleanupRule @@ -26,6 +27,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import org.junit.Rule import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService @@ -132,12 +134,17 @@ class StreamingRemoteExecutionServiceTest { .setActionName("SampleScript") .setBlueprintName("sample-cba") .setBlueprintVersion("1.0.0") + .setMode(ACTION_MODE_SYNC) .build() + val jsonContent = """{ "key1" : "value1" }""" + val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder + JsonFormat.parser().merge(jsonContent, payloadBuilder) + return ExecutionServiceInput.newBuilder() .setCommonHeader(commonHeader) .setActionIdentifiers(actionIdentifier) - //.setPayload(payloadBuilder.build()) + .setPayload(payloadBuilder.build()) .build() } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml index afe10b39d..8951e1a71 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml @@ -15,11 +15,18 @@ --> <configuration> + + <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> + <property name="defaultPattern" + value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> + <property name="testing" + value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> + <pattern>${testing}</pattern> </encoder> </appender> diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml index 37a071280..a77122fe8 100755 --- a/ms/blueprintsprocessor/parent/pom.xml +++ b/ms/blueprintsprocessor/parent/pom.xml @@ -48,6 +48,7 @@ <json.unit.version>2.8.0</json.unit.version> <xmlunit.version>2.6.3</xmlunit.version> + <netty-ssl>2.0.26.Final</netty-ssl> <sshd.version>2.2.0</sshd.version> <jsch.version>0.1.55</jsch.version> <jython.version>2.7.1</jython.version> @@ -269,6 +270,11 @@ <artifactId>protobuf-java-util</artifactId> <version>${protobuff.java.utils.version}</version> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <version>${netty-ssl}</version> + </dependency> <!-- Adaptors --> <dependency> @@ -665,6 +671,10 @@ <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java-util</artifactId> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + </dependency> </dependencies> <repositories> |