diff options
Diffstat (limited to 'src/main/java/org/onap/avcnmanager/config')
-rw-r--r-- | src/main/java/org/onap/avcnmanager/config/AVCNConfig.java | 27 |
1 files changed, 11 insertions, 16 deletions
diff --git a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java index 35273ff..a2682b5 100644 --- a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java +++ b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java @@ -21,16 +21,15 @@ package org.onap.avcnmanager.config; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.kstream.KStream; -import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.data.ChangePack; +import org.onap.avcnmanager.message.handlers.MessageHandler; import org.onap.avcnmanager.message.serializers.ChangePackDeserializer; import org.onap.avcnmanager.message.serializers.ChangePackSerializer; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -41,6 +40,9 @@ import java.util.Properties; @Configuration public class AVCNConfig { + private static final int RETRIES_CONFIG = 5; + private static final int RETRY_BACKOFF_MS = 60000; + @Bean(name = "AVCNProperties") public Properties getKafkaStreamProperties( @Value("${kafka.bootstrap-servers}") String bootstrapServer, @@ -51,6 +53,10 @@ public class AVCNConfig { properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ChangePackSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + properties.put(StreamsConfig.RETRIES_CONFIG, RETRIES_CONFIG); + properties.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + return properties; } @@ -65,25 +71,14 @@ public class AVCNConfig { return builder.build(); } - - @Bean - public KafkaStreams getKafkaStream( - @Qualifier("AVCNTopology") Topology topology, - @Qualifier("AVCNProperties") Properties properties - ) throws InterruptedException { - Thread.sleep(10000); - return new KafkaStreams(topology, properties); - } - - @Bean public RestTemplate getRestTemplate() { return new RestTemplate(); } - static public final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> { + public static final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> { public ChangePackSerde() { super(new ChangePackSerializer(), new ChangePackDeserializer()); } } -}
\ No newline at end of file +} |