From c9b99347c7c425fce7a1f5f3c7e2ac500f2f0c5c Mon Sep 17 00:00:00 2001 From: Bruno Sakoto Date: Fri, 4 Jun 2021 07:49:14 -0400 Subject: Add kafka listener for data updated events See "Running via Docker Compose" section from README.md file to have an example of event processing Issue-ID: CPS-371 Signed-off-by: Bruno Sakoto Change-Id: Id3abfa32fb04e07102a5f28e6e43a9b533391188 --- .../cps/temporal/controller/QueryController.java | 35 ------ .../listener/exception/EventListenerException.java | 34 ++++++ .../exception/InvalidEventEnvelopException.java | 73 ++++++++++++ .../listener/kafka/DataUpdatedEventListener.java | 129 +++++++++++++++++++++ .../DataUpdatedEventListenerErrorHandler.java | 44 +++++++ .../event/model/CpsDataUpdatedEventMapper.java | 56 +++++++++ .../temporal/controller/web/QueryController.java | 35 ++++++ .../org/onap/cps/temporal/domain/NetworkData.java | 9 +- .../onap/cps/temporal/domain/NetworkDataId.java | 6 +- .../temporal/service/NetworkDataServiceImpl.java | 27 ++++- .../cps/temporal/service/ServiceException.java | 34 ++++++ src/main/resources/application-sasl-ssl-kafka.yml | 31 +++++ src/main/resources/application.yml | 28 ++++- src/main/resources/logback.xml | 43 +++++++ 14 files changed, 531 insertions(+), 53 deletions(-) delete mode 100644 src/main/java/org/onap/cps/temporal/controller/QueryController.java create mode 100644 src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java create mode 100644 src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java create mode 100644 src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java create mode 100644 src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java create mode 100644 src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java create mode 100644 src/main/java/org/onap/cps/temporal/controller/web/QueryController.java create mode 100644 src/main/java/org/onap/cps/temporal/service/ServiceException.java create mode 100644 src/main/resources/application-sasl-ssl-kafka.yml create mode 100644 src/main/resources/logback.xml (limited to 'src/main') diff --git a/src/main/java/org/onap/cps/temporal/controller/QueryController.java b/src/main/java/org/onap/cps/temporal/controller/QueryController.java deleted file mode 100644 index d083dc9..0000000 --- a/src/main/java/org/onap/cps/temporal/controller/QueryController.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2021 Bell Canada. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.temporal.controller; - -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; - -/** - * Controller for query requests. - */ -@RestController -public class QueryController { - - @GetMapping("/") - public String home() { - return "Welcome to CPS Temporal Service!"; - } - -} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java new file mode 100644 index 0000000..a9d1ce2 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.exception; + +/** + * Class representing a listener exception related to system event error. + */ +public class EventListenerException extends RuntimeException { + + public EventListenerException(final String message) { + super(message); + } + + public EventListenerException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java new file mode 100644 index 0000000..df4e756 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.exception; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** + * Class representing an invalid event envelop exception. + */ +@Getter +public class InvalidEventEnvelopException extends EventListenerException { + + private final List invalidFields = new ArrayList<>(); + + public InvalidEventEnvelopException(final String message) { + super(message); + } + + public void addInvalidField(final InvalidField invalidField) { + this.invalidFields.add(invalidField); + } + + public boolean hasInvalidFields() { + return ! this.invalidFields.isEmpty(); + } + + @Override + public String getMessage() { + return String.format("%s. invalidFields: %s", super.getMessage(), this.invalidFields.toString()); + } + + @AllArgsConstructor + @Getter + @EqualsAndHashCode + @ToString + public static class InvalidField implements Serializable { + + private static final long serialVersionUID = -7118283787669377391L; + + private final ErrorType errorType; + private final String fieldName; + private final String actualValue; + private final String expectedValue; + + public enum ErrorType { + UNEXPECTED, MISSING + } + + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java new file mode 100644 index 0000000..79c9d92 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java @@ -0,0 +1,129 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.kafka; + +import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING; +import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED; + +import java.net.URI; +import java.net.URISyntaxException; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.event.model.CpsDataUpdatedEvent; +import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException; +import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException; +import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper; +import org.onap.cps.temporal.service.NetworkDataService; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +/** + * Listener for data updated events. + */ +@Component +@Slf4j +public class DataUpdatedEventListener { + + private static final URI EVENT_SOURCE; + + static { + try { + EVENT_SOURCE = new URI("urn:cps:org.onap.cps"); + } catch (final URISyntaxException e) { + throw new EventListenerException("Invalid URI for event source.", e); + } + } + + private static final String EVENT_TYPE = "org.onap.cps.data-updated-event"; + + private final NetworkDataService networkDataService; + private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper; + + /** + * Constructor. + */ + public DataUpdatedEventListener( + final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) { + this.networkDataService = networkDataService; + this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper; + } + + /** + * Consume the specified event. + * + * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted. + */ + @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler") + public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) { + + log.debug("Receiving {} ...", cpsDataUpdatedEvent); + + // Validate event envelop + validateEventEnvelop(cpsDataUpdatedEvent); + + // Map event to entity + final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent); + log.debug("Persisting {} ...", networkData); + + // Persist entity + final var persistedNetworkData = this.networkDataService.addNetworkData(networkData); + log.debug("Persisted {}", persistedNetworkData); + + } + + private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) { + + final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure"); + + // Schema + if (cpsDataUpdatedEvent.getSchema() == null) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + MISSING, "schema", null, + CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT + .value())); + } + // Id + if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + MISSING, "id", null, null)); + } + // Source + if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "source", + cpsDataUpdatedEvent.getSource() != null + ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString())); + } + // Type + if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) { + invalidEventEnvelopException.addInvalidField( + new InvalidEventEnvelopException.InvalidField( + UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE)); + } + + if (invalidEventEnvelopException.hasInvalidFields()) { + throw invalidEventEnvelopException; + } + + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java new file mode 100644 index 0000000..7a4ee7f --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.listener.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +/** + * Class responsible to handle errors for data updated event listener. + */ +@Component +@Slf4j +class DataUpdatedEventListenerErrorHandler implements KafkaListenerErrorHandler { + + @Override + public Object handleError(final Message message, final ListenerExecutionFailedException exception) { + log.error( + "Failed to process message {}. Error cause is {}.", + message, + exception.getCause() != null ? exception.getCause().toString() : null, + exception); + return exception; + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java new file mode 100644 index 0000000..9ef25d5 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.event.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.onap.cps.event.model.CpsDataUpdatedEvent; +import org.onap.cps.event.model.Data; +import org.onap.cps.temporal.domain.NetworkData; + +/** + * Mapper for data updated event schema. + */ +@Mapper(componentModel = "spring") +public abstract class CpsDataUpdatedEventMapper { + + private static final DateTimeFormatter ISO_TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + @Mapping(source = "content.observedTimestamp", target = "observedTimestamp") + @Mapping(source = "content.dataspaceName", target = "dataspace") + @Mapping(source = "content.schemaSetName", target = "schemaSet") + @Mapping(source = "content.anchorName", target = "anchor") + @Mapping(source = "content.data", target = "payload") + @Mapping(expression = "java(null)", target = "createdTimestamp") + public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent); + + String map(final Data data) throws JsonProcessingException { + return data != null ? new ObjectMapper().writeValueAsString(data) : null; + } + + OffsetDateTime map(final String timestamp) { + return timestamp != null ? OffsetDateTime.parse(timestamp, ISO_TIMESTAMP_FORMATTER) : null; + } + +} diff --git a/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java new file mode 100644 index 0000000..fae95cc --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.controller.web; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * Controller for query requests. + */ +@RestController +public class QueryController { + + @GetMapping("/") + public String home() { + return "Welcome to CPS Temporal Service!"; + } + +} diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java index aa2ce95..1537e4a 100644 --- a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java +++ b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java @@ -48,27 +48,30 @@ import org.hibernate.annotations.TypeDef; @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class) public class NetworkData implements Serializable { - private static final long serialVersionUID = -8032810412816532433L; + private static final long serialVersionUID = 8886477871334560919L; @Id + @NotNull @Column private OffsetDateTime observedTimestamp; @Id + @NotNull @Column private String dataspace; @Id + @NotNull @Column private String anchor; @NotNull - @Column + @Column(updatable = false) private String schemaSet; @NotNull @Type(type = "jsonb") - @Column(columnDefinition = "jsonb") + @Column(columnDefinition = "jsonb", updatable = false) private String payload; @CreationTimestamp diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java index e9742e2..18c4dcf 100644 --- a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java +++ b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java @@ -32,10 +32,10 @@ import lombok.NoArgsConstructor; @EqualsAndHashCode public class NetworkDataId implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -1039604338648260766L; + private OffsetDateTime observedTimestamp; private String dataspace; private String anchor; - private OffsetDateTime observedTimestamp; -} \ No newline at end of file +} diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java index 2e7afb2..687ba85 100644 --- a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java +++ b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java @@ -18,24 +18,39 @@ package org.onap.cps.temporal.service; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.onap.cps.temporal.domain.NetworkData; +import org.onap.cps.temporal.domain.NetworkDataId; import org.onap.cps.temporal.repository.NetworkDataRepository; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; /** * Service implementation for Network Data. */ -@Component +@Service @Slf4j public class NetworkDataServiceImpl implements NetworkDataService { - @Autowired - NetworkDataRepository networkDataRepository; + private final NetworkDataRepository networkDataRepository; + + public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository) { + this.networkDataRepository = networkDataRepository; + } @Override public NetworkData addNetworkData(final NetworkData networkData) { - return networkDataRepository.save(networkData); + final var savedNetworkData = networkDataRepository.save(networkData); + if (savedNetworkData.getCreatedTimestamp() == null) { + // Data already exists and can not be inserted + final var id = + new NetworkDataId( + networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor()); + final Optional existingNetworkData = networkDataRepository.findById(id); + throw new ServiceException( + "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null))); + } + return savedNetworkData; } + } diff --git a/src/main/java/org/onap/cps/temporal/service/ServiceException.java b/src/main/java/org/onap/cps/temporal/service/ServiceException.java new file mode 100644 index 0000000..b9d7184 --- /dev/null +++ b/src/main/java/org/onap/cps/temporal/service/ServiceException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.temporal.service; + +/** + * Class representing a service exception related to business error. + */ +public class ServiceException extends RuntimeException { + + /** + * Instantiate a service exception with the specified message. + * @param message the exception message + */ + public ServiceException(final String message) { + super(message); + } + +} diff --git a/src/main/resources/application-sasl-ssl-kafka.yml b/src/main/resources/application-sasl-ssl-kafka.yml new file mode 100644 index 0000000..fdc6458 --- /dev/null +++ b/src/main/resources/application-sasl-ssl-kafka.yml @@ -0,0 +1,31 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 Bell Canada. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +# Spring profile configuration for sasl ssl Kafka + +spring: + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} + security: + protocol: SASL_SSL + ssl: + trust-store-type: JKS + trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION} + trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD} + properties: + sasl.mechanism: SCRAM-SHA-512 + sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG} + ssl.endpoint.identification.algorithm: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d4c799c..5fe30b0 100755 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -27,10 +27,26 @@ spring: change-log: classpath:/db/changelog/changelog-master.xml jpa: properties: - hibernate: - dialect: org.hibernate.dialect.PostgreSQLDialect + hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect + hibernate.format_sql: true + hibernate.generate_statistics: false + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} + security: + protocol: PLAINTEXT + consumer: + group-id: ${KAFKA_CONSUMER_GROUP_ID:cps-temporal-group} + # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers + # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer + # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/ + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + properties: + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent -logging: - level: - org: - springframework: INFO +app: + listener: + data-updated: + topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..a75b7aa --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,43 @@ + + + + + + + %d - %highlight(%-5level) [%-20.20thread] %cyan(%logger{36}) - %msg%n + + + + + + + + + + + + + + + + + + + + -- cgit 1.2.3-korg