aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org
diff options
context:
space:
mode:
authorBruno Sakoto <bruno.sakoto@bell.ca>2021-06-04 07:49:14 -0400
committerBruno Sakoto <bruno.sakoto@bell.ca>2021-07-05 18:37:15 -0400
commitc9b99347c7c425fce7a1f5f3c7e2ac500f2f0c5c (patch)
tree199b49d2db987f2c630de124356a48b1bd97657a /src/main/java/org
parent8b2193b7ed06e2ee6a90f7986921e72ca70ad90f (diff)
Add kafka listener for data updated events
See "Running via Docker Compose" section from README.md file to have an example of event processing Issue-ID: CPS-371 Signed-off-by: Bruno Sakoto <bruno.sakoto@bell.ca> Change-Id: Id3abfa32fb04e07102a5f28e6e43a9b533391188
Diffstat (limited to 'src/main/java/org')
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java34
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java73
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java129
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java44
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java56
-rw-r--r--src/main/java/org/onap/cps/temporal/controller/web/QueryController.java (renamed from src/main/java/org/onap/cps/temporal/controller/QueryController.java)2
-rw-r--r--src/main/java/org/onap/cps/temporal/domain/NetworkData.java9
-rw-r--r--src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java6
-rw-r--r--src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java27
-rw-r--r--src/main/java/org/onap/cps/temporal/service/ServiceException.java34
10 files changed, 401 insertions, 13 deletions
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java
new file mode 100644
index 0000000..a9d1ce2
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.exception;
+
+/**
+ * Class representing a listener exception related to system event error.
+ */
+public class EventListenerException extends RuntimeException {
+
+ public EventListenerException(final String message) {
+ super(message);
+ }
+
+ public EventListenerException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java
new file mode 100644
index 0000000..df4e756
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.exception;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Class representing an invalid event envelop exception.
+ */
+@Getter
+public class InvalidEventEnvelopException extends EventListenerException {
+
+ private final List<InvalidField> invalidFields = new ArrayList<>();
+
+ public InvalidEventEnvelopException(final String message) {
+ super(message);
+ }
+
+ public void addInvalidField(final InvalidField invalidField) {
+ this.invalidFields.add(invalidField);
+ }
+
+ public boolean hasInvalidFields() {
+ return ! this.invalidFields.isEmpty();
+ }
+
+ @Override
+ public String getMessage() {
+ return String.format("%s. invalidFields: %s", super.getMessage(), this.invalidFields.toString());
+ }
+
+ @AllArgsConstructor
+ @Getter
+ @EqualsAndHashCode
+ @ToString
+ public static class InvalidField implements Serializable {
+
+ private static final long serialVersionUID = -7118283787669377391L;
+
+ private final ErrorType errorType;
+ private final String fieldName;
+ private final String actualValue;
+ private final String expectedValue;
+
+ public enum ErrorType {
+ UNEXPECTED, MISSING
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
new file mode 100644
index 0000000..79c9d92
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
@@ -0,0 +1,129 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka;
+
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING;
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.event.model.CpsDataUpdatedEvent;
+import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException;
+import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException;
+import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper;
+import org.onap.cps.temporal.service.NetworkDataService;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+/**
+ * Listener for data updated events.
+ */
+@Component
+@Slf4j
+public class DataUpdatedEventListener {
+
+ private static final URI EVENT_SOURCE;
+
+ static {
+ try {
+ EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
+ } catch (final URISyntaxException e) {
+ throw new EventListenerException("Invalid URI for event source.", e);
+ }
+ }
+
+ private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
+
+ private final NetworkDataService networkDataService;
+ private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper;
+
+ /**
+ * Constructor.
+ */
+ public DataUpdatedEventListener(
+ final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) {
+ this.networkDataService = networkDataService;
+ this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper;
+ }
+
+ /**
+ * Consume the specified event.
+ *
+ * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
+ */
+ @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
+ public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+
+ log.debug("Receiving {} ...", cpsDataUpdatedEvent);
+
+ // Validate event envelop
+ validateEventEnvelop(cpsDataUpdatedEvent);
+
+ // Map event to entity
+ final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
+ log.debug("Persisting {} ...", networkData);
+
+ // Persist entity
+ final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
+ log.debug("Persisted {}", persistedNetworkData);
+
+ }
+
+ private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+
+ final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure");
+
+ // Schema
+ if (cpsDataUpdatedEvent.getSchema() == null) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ MISSING, "schema", null,
+ CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
+ .value()));
+ }
+ // Id
+ if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ MISSING, "id", null, null));
+ }
+ // Source
+ if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "source",
+ cpsDataUpdatedEvent.getSource() != null
+ ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString()));
+ }
+ // Type
+ if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) {
+ invalidEventEnvelopException.addInvalidField(
+ new InvalidEventEnvelopException.InvalidField(
+ UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE));
+ }
+
+ if (invalidEventEnvelopException.hasInvalidFields()) {
+ throw invalidEventEnvelopException;
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java
new file mode 100644
index 0000000..7a4ee7f
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.listener.KafkaListenerErrorHandler;
+import org.springframework.kafka.listener.ListenerExecutionFailedException;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+/**
+ * Class responsible to handle errors for data updated event listener.
+ */
+@Component
+@Slf4j
+class DataUpdatedEventListenerErrorHandler implements KafkaListenerErrorHandler {
+
+ @Override
+ public Object handleError(final Message<?> message, final ListenerExecutionFailedException exception) {
+ log.error(
+ "Failed to process message {}. Error cause is {}.",
+ message,
+ exception.getCause() != null ? exception.getCause().toString() : null,
+ exception);
+ return exception;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
new file mode 100644
index 0000000..9ef25d5
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.onap.cps.event.model.CpsDataUpdatedEvent;
+import org.onap.cps.event.model.Data;
+import org.onap.cps.temporal.domain.NetworkData;
+
+/**
+ * Mapper for data updated event schema.
+ */
+@Mapper(componentModel = "spring")
+public abstract class CpsDataUpdatedEventMapper {
+
+ private static final DateTimeFormatter ISO_TIMESTAMP_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ @Mapping(source = "content.observedTimestamp", target = "observedTimestamp")
+ @Mapping(source = "content.dataspaceName", target = "dataspace")
+ @Mapping(source = "content.schemaSetName", target = "schemaSet")
+ @Mapping(source = "content.anchorName", target = "anchor")
+ @Mapping(source = "content.data", target = "payload")
+ @Mapping(expression = "java(null)", target = "createdTimestamp")
+ public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent);
+
+ String map(final Data data) throws JsonProcessingException {
+ return data != null ? new ObjectMapper().writeValueAsString(data) : null;
+ }
+
+ OffsetDateTime map(final String timestamp) {
+ return timestamp != null ? OffsetDateTime.parse(timestamp, ISO_TIMESTAMP_FORMATTER) : null;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/QueryController.java b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java
index d083dc9..fae95cc 100644
--- a/src/main/java/org/onap/cps/temporal/controller/QueryController.java
+++ b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java
@@ -16,7 +16,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.temporal.controller;
+package org.onap.cps.temporal.controller.web;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
index aa2ce95..1537e4a 100644
--- a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
+++ b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
@@ -48,27 +48,30 @@ import org.hibernate.annotations.TypeDef;
@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
public class NetworkData implements Serializable {
- private static final long serialVersionUID = -8032810412816532433L;
+ private static final long serialVersionUID = 8886477871334560919L;
@Id
+ @NotNull
@Column
private OffsetDateTime observedTimestamp;
@Id
+ @NotNull
@Column
private String dataspace;
@Id
+ @NotNull
@Column
private String anchor;
@NotNull
- @Column
+ @Column(updatable = false)
private String schemaSet;
@NotNull
@Type(type = "jsonb")
- @Column(columnDefinition = "jsonb")
+ @Column(columnDefinition = "jsonb", updatable = false)
private String payload;
@CreationTimestamp
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
index e9742e2..18c4dcf 100644
--- a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
+++ b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
@@ -32,10 +32,10 @@ import lombok.NoArgsConstructor;
@EqualsAndHashCode
public class NetworkDataId implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -1039604338648260766L;
+ private OffsetDateTime observedTimestamp;
private String dataspace;
private String anchor;
- private OffsetDateTime observedTimestamp;
-} \ No newline at end of file
+}
diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
index 2e7afb2..687ba85 100644
--- a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
+++ b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
@@ -18,24 +18,39 @@
package org.onap.cps.temporal.service;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.temporal.domain.NetworkData;
+import org.onap.cps.temporal.domain.NetworkDataId;
import org.onap.cps.temporal.repository.NetworkDataRepository;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
/**
* Service implementation for Network Data.
*/
-@Component
+@Service
@Slf4j
public class NetworkDataServiceImpl implements NetworkDataService {
- @Autowired
- NetworkDataRepository networkDataRepository;
+ private final NetworkDataRepository networkDataRepository;
+
+ public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository) {
+ this.networkDataRepository = networkDataRepository;
+ }
@Override
public NetworkData addNetworkData(final NetworkData networkData) {
- return networkDataRepository.save(networkData);
+ final var savedNetworkData = networkDataRepository.save(networkData);
+ if (savedNetworkData.getCreatedTimestamp() == null) {
+ // Data already exists and can not be inserted
+ final var id =
+ new NetworkDataId(
+ networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
+ final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id);
+ throw new ServiceException(
+ "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
+ }
+ return savedNetworkData;
}
+
}
diff --git a/src/main/java/org/onap/cps/temporal/service/ServiceException.java b/src/main/java/org/onap/cps/temporal/service/ServiceException.java
new file mode 100644
index 0000000..b9d7184
--- /dev/null
+++ b/src/main/java/org/onap/cps/temporal/service/ServiceException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.service;
+
+/**
+ * Class representing a service exception related to business error.
+ */
+public class ServiceException extends RuntimeException {
+
+ /**
+ * Instantiate a service exception with the specified message.
+ * @param message the exception message
+ */
+ public ServiceException(final String message) {
+ super(message);
+ }
+
+}