diff options
author | Bogumil Zebek <bogumil.zebek@nokia.com> | 2021-04-19 12:03:25 +0200 |
---|---|---|
committer | Zebek Bogumil <bogumil.zebek@nokia.com> | 2021-04-21 08:12:46 +0200 |
commit | 9dde0643293fa5c06e845b2a8121486353e8e81c (patch) | |
tree | 1e07f076a7fa15d5fb295b44108a1b732ffa9257 /src/main/java/org/onap/avcnmanager/config | |
parent | 7e2c97281cf40738ff471a49ca26f044d65f105c (diff) |
Add healthcheck endpoint1.0.1
Issue-ID: INT-1869
Signed-off-by: Zebek Bogumil <bogumil.zebek@nokia.com>
Change-Id: Icfc40347deca1dc21233e84732cbd0d7e97b5836
Diffstat (limited to 'src/main/java/org/onap/avcnmanager/config')
-rw-r--r-- | src/main/java/org/onap/avcnmanager/config/AVCNConfig.java | 27 |
1 files changed, 11 insertions, 16 deletions
diff --git a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java index 35273ff..a2682b5 100644 --- a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java +++ b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java @@ -21,16 +21,15 @@ package org.onap.avcnmanager.config; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.kstream.KStream; -import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.data.ChangePack; +import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.serializers.ChangePackDeserializer; import org.onap.avcnmanager.message.serializers.ChangePackSerializer; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -41,6 +40,9 @@ import java.util.Properties; @Configuration public class AVCNConfig { + private static final int RETRIES_CONFIG = 5; + private static final int RETRY_BACKOFF_MS = 60000; + @Bean(name = "AVCNProperties") public Properties getKafkaStreamProperties( @Value("${kafka.bootstrap-servers}") String bootstrapServer, @@ -51,6 +53,10 @@ public class AVCNConfig { properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ChangePackSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + properties.put(StreamsConfig.RETRIES_CONFIG, RETRIES_CONFIG); + properties.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + return properties; } @@ -65,25 +71,14 @@ public class AVCNConfig { return builder.build(); } - - @Bean - public KafkaStreams getKafkaStream( - @Qualifier("AVCNTopology") Topology topology, - @Qualifier("AVCNProperties") Properties properties - ) throws InterruptedException { - Thread.sleep(10000); - return new KafkaStreams(topology, properties); - } - - @Bean public RestTemplate getRestTemplate() { return new RestTemplate(); } - static public final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> { + public static final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> { public ChangePackSerde() { super(new ChangePackSerializer(), new ChangePackDeserializer()); } } -}
\ No newline at end of file +} |