diff options
author | Bogumil Zebek <bogumil.zebek@nokia.com> | 2021-04-19 12:03:25 +0200 |
---|---|---|
committer | Zebek Bogumil <bogumil.zebek@nokia.com> | 2021-04-21 08:12:46 +0200 |
commit | 9dde0643293fa5c06e845b2a8121486353e8e81c (patch) | |
tree | 1e07f076a7fa15d5fb295b44108a1b732ffa9257 /src/main/java/org/onap/avcnmanager/AvcnManagerController.java | |
parent | 7e2c97281cf40738ff471a49ca26f044d65f105c (diff) |
Add healthcheck endpoint1.0.1
Issue-ID: INT-1869
Signed-off-by: Zebek Bogumil <bogumil.zebek@nokia.com>
Change-Id: Icfc40347deca1dc21233e84732cbd0d7e97b5836
Diffstat (limited to 'src/main/java/org/onap/avcnmanager/AvcnManagerController.java')
-rw-r--r-- | src/main/java/org/onap/avcnmanager/AvcnManagerController.java | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/src/main/java/org/onap/avcnmanager/AvcnManagerController.java b/src/main/java/org/onap/avcnmanager/AvcnManagerController.java new file mode 100644 index 0000000..0f90c1f --- /dev/null +++ b/src/main/java/org/onap/avcnmanager/AvcnManagerController.java @@ -0,0 +1,109 @@ +/*- + * ============LICENSE_START======================================================= + * Simulator + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.avcnmanager; + + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; + +@RestController +@EnableScheduling +public class AvcnManagerController { + + private static final Logger LOGGER = LoggerFactory.getLogger(AvcnManagerController.class); + + private final Topology topology; + private final Properties properties; + + private final ReentrantLock lock = new ReentrantLock(); + private KafkaStreams kafkaStreams; + + public AvcnManagerController(@Qualifier("AVCNTopology") Topology topology, + @Qualifier("AVCNProperties") Properties properties) { + this.topology = topology; + this.properties = properties; + } + + @GetMapping("/healthcheck") + public ResponseEntity healthcheck() { + if (kafkaStreams != null && kafkaStreams.state().isRunning()) { + return new ResponseEntity("Up", HttpStatus.OK); + } else { + return new ResponseEntity("Down", HttpStatus.EXPECTATION_FAILED); + } + } + + @Scheduled(fixedDelay = 10000) + public void configureKafkaStream() { + try { + lock.lock(); + + if (kafkaStreams == null) { + startKafkaStream(); + } else if (!kafkaStreams.state().isRunning()) { + LOGGER.info("KafkaStream is not running. Trying to restart ..."); + kafkaStreams.close(Duration.ZERO); + startKafkaStream(); + } + } catch (Exception e) { + LOGGER.info("Unable to configure and start KafkaStream.",e); + this.kafkaStreams = null; + } finally { + lock.unlock(); + } + } + + private void startKafkaStream() { + LOGGER.info("Starting KafkaStream ..."); + kafkaStreams = new KafkaStreams(topology, properties); + kafkaStreams.setUncaughtExceptionHandler( + (thread, throwable) -> System.out.printf( + "Error occurs during data KafkaStream processing. Thread: %s, Throwable: %s%n", + thread, throwable) + ); + kafkaStreams.start(); + } + + @PreDestroy + public void preDestroy() { + try { + lock.lock(); + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ZERO); + } + } finally { + lock.unlock(); + } + } +} |