From 9dde0643293fa5c06e845b2a8121486353e8e81c Mon Sep 17 00:00:00 2001 From: Bogumil Zebek Date: Mon, 19 Apr 2021 12:03:25 +0200 Subject: Add healthcheck endpoint Issue-ID: INT-1869 Signed-off-by: Zebek Bogumil Change-Id: Icfc40347deca1dc21233e84732cbd0d7e97b5836 --- .../org/onap/avcnmanager/config/AVCNConfig.java | 27 +++++++++------------- 1 file changed, 11 insertions(+), 16 deletions(-) (limited to 'src/main/java/org/onap/avcnmanager/config') diff --git a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java index 35273ff..a2682b5 100644 --- a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java +++ b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java @@ -21,16 +21,15 @@ package org.onap.avcnmanager.config; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.kstream.KStream; -import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.data.ChangePack; +import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.serializers.ChangePackDeserializer; import org.onap.avcnmanager.message.serializers.ChangePackSerializer; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -41,6 +40,9 @@ import java.util.Properties; @Configuration public class AVCNConfig { + private static final int RETRIES_CONFIG = 5; + private static final int RETRY_BACKOFF_MS = 60000; + @Bean(name = "AVCNProperties") public Properties getKafkaStreamProperties( @Value("${kafka.bootstrap-servers}") String bootstrapServer, @@ -51,6 +53,10 @@ public class AVCNConfig { properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ChangePackSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + properties.put(StreamsConfig.RETRIES_CONFIG, RETRIES_CONFIG); + properties.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + return properties; } @@ -65,25 +71,14 @@ public class AVCNConfig { return builder.build(); } - - @Bean - public KafkaStreams getKafkaStream( - @Qualifier("AVCNTopology") Topology topology, - @Qualifier("AVCNProperties") Properties properties - ) throws InterruptedException { - Thread.sleep(10000); - return new KafkaStreams(topology, properties); - } - - @Bean public RestTemplate getRestTemplate() { return new RestTemplate(); } - static public final class ChangePackSerde extends Serdes.WrapperSerde { + public static final class ChangePackSerde extends Serdes.WrapperSerde { public ChangePackSerde() { super(new ChangePackSerializer(), new ChangePackDeserializer()); } } -} \ No newline at end of file +} -- cgit 1.2.3-korg