aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml5
-rw-r--r--src/main/java/org/onap/avcnmanager/AvcnManagerController.java109
-rw-r--r--src/main/java/org/onap/avcnmanager/config/AVCNConfig.java27
-rw-r--r--src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java61
-rw-r--r--src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java53
5 files changed, 125 insertions, 130 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 6010f1a..f5551ce 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -16,6 +16,11 @@ services:
KAFKA_SOURCE_TOPIC: config
KAFKA_APPLICATION_ID: avcn-manager
REST_CLIENT_PNFSIMULATOR_ENDPOINT: http://ves-client:5000/simulator/start
+ healthcheck:
+ test: [ "CMD", "curl", "-f", "http://localhost:9090/healthcheck" ]
+ interval: 5s
+ timeout: 5s
+ retries: 5
depends_on:
- avcn-kafka
diff --git a/src/main/java/org/onap/avcnmanager/AvcnManagerController.java b/src/main/java/org/onap/avcnmanager/AvcnManagerController.java
new file mode 100644
index 0000000..0f90c1f
--- /dev/null
+++ b/src/main/java/org/onap/avcnmanager/AvcnManagerController.java
@@ -0,0 +1,109 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Simulator
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.avcnmanager;
+
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+
+@RestController
+@EnableScheduling
+public class AvcnManagerController {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvcnManagerController.class);
+
+ private final Topology topology;
+ private final Properties properties;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private KafkaStreams kafkaStreams;
+
+ public AvcnManagerController(@Qualifier("AVCNTopology") Topology topology,
+ @Qualifier("AVCNProperties") Properties properties) {
+ this.topology = topology;
+ this.properties = properties;
+ }
+
+ @GetMapping("/healthcheck")
+ public ResponseEntity healthcheck() {
+ if (kafkaStreams != null && kafkaStreams.state().isRunning()) {
+ return new ResponseEntity("Up", HttpStatus.OK);
+ } else {
+ return new ResponseEntity("Down", HttpStatus.EXPECTATION_FAILED);
+ }
+ }
+
+ @Scheduled(fixedDelay = 10000)
+ public void configureKafkaStream() {
+ try {
+ lock.lock();
+
+ if (kafkaStreams == null) {
+ startKafkaStream();
+ } else if (!kafkaStreams.state().isRunning()) {
+ LOGGER.info("KafkaStream is not running. Trying to restart ...");
+ kafkaStreams.close(Duration.ZERO);
+ startKafkaStream();
+ }
+ } catch (Exception e) {
+ LOGGER.info("Unable to configure and start KafkaStream.",e);
+ this.kafkaStreams = null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void startKafkaStream() {
+ LOGGER.info("Starting KafkaStream ...");
+ kafkaStreams = new KafkaStreams(topology, properties);
+ kafkaStreams.setUncaughtExceptionHandler(
+ (thread, throwable) -> System.out.printf(
+ "Error occurs during data KafkaStream processing. Thread: %s, Throwable: %s%n",
+ thread, throwable)
+ );
+ kafkaStreams.start();
+ }
+
+ @PreDestroy
+ public void preDestroy() {
+ try {
+ lock.lock();
+ if (kafkaStreams != null) {
+ kafkaStreams.close(Duration.ZERO);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
index 35273ff..a2682b5 100644
--- a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
+++ b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
@@ -21,16 +21,15 @@
package org.onap.avcnmanager.config;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
-import org.onap.avcnmanager.message.handlers.MessageHandler;
import org.onap.avcnmanager.message.data.ChangePack;
+import org.onap.avcnmanager.message.handlers.MessageHandler;
import org.onap.avcnmanager.message.serializers.ChangePackDeserializer;
import org.onap.avcnmanager.message.serializers.ChangePackSerializer;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -41,6 +40,9 @@ import java.util.Properties;
@Configuration
public class AVCNConfig {
+ private static final int RETRIES_CONFIG = 5;
+ private static final int RETRY_BACKOFF_MS = 60000;
+
@Bean(name = "AVCNProperties")
public Properties getKafkaStreamProperties(
@Value("${kafka.bootstrap-servers}") String bootstrapServer,
@@ -51,6 +53,10 @@ public class AVCNConfig {
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ChangePackSerde.class.getName());
+ properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
+ properties.put(StreamsConfig.RETRIES_CONFIG, RETRIES_CONFIG);
+ properties.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
+
return properties;
}
@@ -65,25 +71,14 @@ public class AVCNConfig {
return builder.build();
}
-
- @Bean
- public KafkaStreams getKafkaStream(
- @Qualifier("AVCNTopology") Topology topology,
- @Qualifier("AVCNProperties") Properties properties
- ) throws InterruptedException {
- Thread.sleep(10000);
- return new KafkaStreams(topology, properties);
- }
-
-
@Bean
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
- static public final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> {
+ public static final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> {
public ChangePackSerde() {
super(new ChangePackSerializer(), new ChangePackDeserializer());
}
}
-} \ No newline at end of file
+}
diff --git a/src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java b/src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java
deleted file mode 100644
index dfc7912..0000000
--- a/src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Simulator
- * ================================================================================
- * Copyright (C) 2021 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.avcnmanager.kafka.stream;
-
-import org.apache.kafka.streams.KafkaStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-
-
-@Component
-public class AVCNKafkaStream {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AVCNKafkaStream.class);
-
- private final KafkaStreams streams;
-
- @Autowired
- AVCNKafkaStream(KafkaStreams streams) {
- this.streams = streams;
- streams.setUncaughtExceptionHandler(this::handleExceptionInStreams);
- }
-
- @PostConstruct
- void startKafkaStream() {
- streams.start();
- }
-
- private void handleExceptionInStreams(Thread thread,Throwable throwable) {
- LOGGER.warn("Exception occurred int kafka stream: " + thread);
- LOGGER.debug(throwable.getMessage());
- if(!streams.state().isRunning()) {
- LOGGER.error("Kafka stream stop running, state: " + streams.state());
- streams.close();
- System.exit(1);
- }
- }
-}
-
-
diff --git a/src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java b/src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java
deleted file mode 100644
index 2d30f92..0000000
--- a/src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Simulator
- * ================================================================================
- * Copyright (C) 2021 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.avcnmanager.kafka.stream;
-
-import org.apache.kafka.streams.KafkaStreams;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-class AVCNKafkaStreamTest {
-
-
- @Mock
- private KafkaStreams streams;
-
- @BeforeEach
- void setUp() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- void validateAVCNKafkaStream() {
- AVCNKafkaStream stream = new AVCNKafkaStream(streams);
- stream.startKafkaStream();
-
- verify(streams, times(1) ).setUncaughtExceptionHandler(any(Thread.UncaughtExceptionHandler.class));
- verify(streams, times(1) ).start();
- }
-
-}