aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/avcnmanager/config/AVCNConfig.java')
-rw-r--r--src/main/java/org/onap/avcnmanager/config/AVCNConfig.java27
1 files changed, 11 insertions, 16 deletions
diff --git a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
index 35273ff..a2682b5 100644
--- a/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
+++ b/src/main/java/org/onap/avcnmanager/config/AVCNConfig.java
@@ -21,16 +21,15 @@
package org.onap.avcnmanager.config;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
-import org.onap.avcnmanager.message.handlers.MessageHandler;
import org.onap.avcnmanager.message.data.ChangePack;
+import org.onap.avcnmanager.message.handlers.MessageHandler;
import org.onap.avcnmanager.message.serializers.ChangePackDeserializer;
import org.onap.avcnmanager.message.serializers.ChangePackSerializer;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -41,6 +40,9 @@ import java.util.Properties;
@Configuration
public class AVCNConfig {
+ private static final int RETRIES_CONFIG = 5;
+ private static final int RETRY_BACKOFF_MS = 60000;
+
@Bean(name = "AVCNProperties")
public Properties getKafkaStreamProperties(
@Value("${kafka.bootstrap-servers}") String bootstrapServer,
@@ -51,6 +53,10 @@ public class AVCNConfig {
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ChangePackSerde.class.getName());
+ properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
+ properties.put(StreamsConfig.RETRIES_CONFIG, RETRIES_CONFIG);
+ properties.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
+
return properties;
}
@@ -65,25 +71,14 @@ public class AVCNConfig {
return builder.build();
}
-
- @Bean
- public KafkaStreams getKafkaStream(
- @Qualifier("AVCNTopology") Topology topology,
- @Qualifier("AVCNProperties") Properties properties
- ) throws InterruptedException {
- Thread.sleep(10000);
- return new KafkaStreams(topology, properties);
- }
-
-
@Bean
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
- static public final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> {
+ public static final class ChangePackSerde extends Serdes.WrapperSerde<ChangePack> {
public ChangePackSerde() {
super(new ChangePackSerializer(), new ChangePackDeserializer());
}
}
-} \ No newline at end of file
+}