diff options
5 files changed, 125 insertions, 130 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 6010f1a..f5551ce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,11 @@ services: KAFKA_SOURCE_TOPIC: config KAFKA_APPLICATION_ID: avcn-manager REST_CLIENT_PNFSIMULATOR_ENDPOINT: http://ves-client:5000/simulator/start + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:9090/healthcheck" ] + interval: 5s + timeout: 5s + retries: 5 depends_on: - avcn-kafka diff --git a/src/main/java/org/onap/avcnmanager/AvcnManagerController.java b/src/main/java/org/onap/avcnmanager/AvcnManagerController.java new file mode 100644 index 0000000..0f90c1f --- /dev/null +++ b/src/main/java/org/onap/avcnmanager/AvcnManagerController.java @@ -0,0 +1,109 @@ +/*- + * ============LICENSE_START======================================================= + * Simulator + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.avcnmanager; + + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; + +@RestController +@EnableScheduling +public class AvcnManagerController { + + private static final Logger LOGGER = LoggerFactory.getLogger(AvcnManagerController.class); + + private final Topology topology; + private final Properties properties; + + private final ReentrantLock lock = new ReentrantLock(); + private KafkaStreams kafkaStreams; + + public AvcnManagerController(@Qualifier("AVCNTopology") Topology topology, + @Qualifier("AVCNProperties") Properties properties) { + this.topology = topology; + this.properties = properties; + } + + @GetMapping("/healthcheck") + public ResponseEntity healthcheck() { + if (kafkaStreams != null && kafkaStreams.state().isRunning()) { + return new ResponseEntity("Up", HttpStatus.OK); + } else { + return new ResponseEntity("Down", HttpStatus.EXPECTATION_FAILED); + } + } + + @Scheduled(fixedDelay = 10000) + public void configureKafkaStream() { + try { + lock.lock(); + + if (kafkaStreams == null) { + startKafkaStream(); + } else if (!kafkaStreams.state().isRunning()) { + LOGGER.info("KafkaStream is not running. Trying to restart ..."); + kafkaStreams.close(Duration.ZERO); + startKafkaStream(); + } + } catch (Exception e) { + LOGGER.info("Unable to configure and start KafkaStream.",e); + this.kafkaStreams = null; + } finally { + lock.unlock(); + } + } + + private void startKafkaStream() { + LOGGER.info("Starting KafkaStream ..."); + kafkaStreams = new KafkaStreams(topology, properties); + kafkaStreams.setUncaughtExceptionHandler( + (thread, throwable) -> System.out.printf( + "Error occurs during data KafkaStream processing. Thread: %s, Throwable: %s%n", + thread, throwable) + ); + kafkaStreams.start(); + } + + @PreDestroy + public void preDestroy() { + try { + lock.lock(); + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ZERO); + } + } finally { + lock.unlock(); + } + } +} diff --git a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java index 35273ff..a2682b5 100644 --- a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java +++ b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java @@ -21,16 +21,15 @@ package org.onap.avcnmanager.config; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.kstream.KStream; -import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.data.ChangePack; +import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.serializers.ChangePackDeserializer; import org.onap.avcnmanager.message.serializers.ChangePackSerializer; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -41,6 +40,9 @@ import java.util.Properties; @Configuration public class AVCNConfig { + private static final int RETRIES_CONFIG = 5; + private static final int RETRY_BACKOFF_MS = 60000; + @Bean(name = "AVCNProperties") public Properties getKafkaStreamProperties( @Value("${kafka.bootstrap-servers}") String bootstrapServer, @@ -51,6 +53,10 @@ public class AVCNConfig { properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ChangePackSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + properties.put(StreamsConfig.RETRIES_CONFIG, RETRIES_CONFIG); + properties.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + return properties; } @@ -65,25 +71,14 @@ public class AVCNConfig { return builder.build(); } - - @Bean - public KafkaStreams getKafkaStream( - @Qualifier("AVCNTopology") Topology topology, - @Qualifier("AVCNProperties") Properties properties - ) throws InterruptedException { - Thread.sleep(10000); - return new KafkaStreams(topology, properties); - } - - @Bean public RestTemplate getRestTemplate() { return new RestTemplate(); } - static public final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> { + public static final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> { public ChangePackSerde() { super(new ChangePackSerializer(), new ChangePackDeserializer()); } } -}
\ No newline at end of file +} diff --git a/src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java b/src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java deleted file mode 100644 index dfc7912..0000000 --- a/src/main/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStream.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2021 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.avcnmanager.kafka.stream; - -import org.apache.kafka.streams.KafkaStreams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - - -@Component -public class AVCNKafkaStream { - - private static final Logger LOGGER = LoggerFactory.getLogger(AVCNKafkaStream.class); - - private final KafkaStreams streams; - - @Autowired - AVCNKafkaStream(KafkaStreams streams) { - this.streams = streams; - streams.setUncaughtExceptionHandler(this::handleExceptionInStreams); - } - - @PostConstruct - void startKafkaStream() { - streams.start(); - } - - private void handleExceptionInStreams(Thread thread,Throwable throwable) { - LOGGER.warn("Exception occurred int kafka stream: " + thread); - LOGGER.debug(throwable.getMessage()); - if(!streams.state().isRunning()) { - LOGGER.error("Kafka stream stop running, state: " + streams.state()); - streams.close(); - System.exit(1); - } - } -} - - diff --git a/src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java b/src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java deleted file mode 100644 index 2d30f92..0000000 --- a/src/test/java/org/onap/avcnmanager/kafka/stream/AVCNKafkaStreamTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2021 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.avcnmanager.kafka.stream; - -import org.apache.kafka.streams.KafkaStreams; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -class AVCNKafkaStreamTest { - - - @Mock - private KafkaStreams streams; - - @BeforeEach - void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - void validateAVCNKafkaStream() { - AVCNKafkaStream stream = new AVCNKafkaStream(streams); - stream.startKafkaStream(); - - verify(streams, times(1) ).setUncaughtExceptionHandler(any(Thread.UncaughtExceptionHandler.class)); - verify(streams, times(1) ).start(); - } - -} |