aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
blob: 94586e9bd735acc5da50329c8303ce4d8e447aab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//
// ============LICENSE_START=======================================================
//  Copyright (C) 2016-2018 Ericsson. All rights reserved.
//  Modifications Copyright (C) 2019 Nordix Foundation.
// ================================================================================
// This file is licensed under the CREATIVE COMMONS ATTRIBUTION 4.0 INTERNATIONAL LICENSE
// Full license text at https://creativecommons.org/licenses/by/4.0/legalcode
// 
// SPDX-License-Identifier: CC-BY-4.0
// ============LICENSE_END=========================================================
//
// @author Sven van der Meer (sven.van.der.meer@ericsson.com)
//

== Kafka IO

Kafka IO is supported by the APEX Kafka plugin.
The configurations below are examples.
APEX will take any configuration inside the parameter object and forward it to Kafka.
More information on Kafka specific configuration parameters can be found in the Kafka documentation:

* link:https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html[Kafka Consumer Class]
* link:https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html[Kafka Producer Class]



=== Kafka Input

APEX will receive events from the Apache Kafka messaging system.
The input is uni-directional, an engine will only receive events from the input but not send any event to the input.

[source%nowrap,json]
----
"carrierTechnologyParameters" : {
  "carrierTechnology" : "KAFKA", <1>
  "parameterClassName" :
    "org.onap.policy.apex.plugins.event.carrier.kafka.KAFKACarrierTechnologyParameters",
  "parameters" : {
    "bootstrapServers"  : "localhost:49092", <2>
    "groupId"           : "apex-group-id", <3>
    "enableAutoCommit"  : true, <4>
    "autoCommitTime"    : 1000, <5>
    "sessionTimeout"    : 30000, <6>
    "consumerPollTime"  : 100, <7>
    "consumerTopicList" : ["apex-in-0", "apex-in-1"], <8>
    "keyDeserializer"   :
        "org.apache.kafka.common.serialization.StringDeserializer", <9>
    "valueDeserializer" :
        "org.apache.kafka.common.serialization.StringDeserializer", <10>
    "kafkaProperties"   : [] <11>
  }
}
----

<1> set Kafka as carrier technology
<2> bootstrap server and port
<3> a group identifier
<4> flag for auto-commit
<5> auto-commit timeout in milliseconds
<6> session timeout in milliseconds
<7> consumer poll time in milliseconds
<8> consumer topic list
<9> key for the Kafka de-serializer
<10> value for the Kafka de-serializer
<11> an optional list of name value pairs of properties to be passed transparently to Kafka.
This field need not be specified, can be set to null, or to an empty list as here.


=== Kafka Output

APEX will send events to the Apache Kafka messaging system.
The output is uni-directional, an engine will send events to the output but not receive any event from the output.


[source%nowrap,json]
----
"carrierTechnologyParameters" : {
  "carrierTechnology" : "KAFKA", <1>
  "parameterClassName" :
    "org.onap.policy.apex.plugins.event.carrier.kafka.KAFKACarrierTechnologyParameters",
  "parameters" : {
    "bootstrapServers"  : "localhost:49092", <2>
    "acks"              : "all", <3>
    "retries"           : 0, <4>
    "batchSize"         : 16384, <5>
    "lingerTime"        : 1, <6>
    "bufferMemory"      : 33554432, <7>
    "producerTopic"     : "apex-out", <8>
    "keySerializer"     :
        "org.apache.kafka.common.serialization.StringSerializer", <9>
    "valueSerializer"   :
        "org.apache.kafka.common.serialization.StringSerializer", <10>
    "kafkaProperties": [ <11>
        ["message.max.bytes", 1000000],
        ["compression.codec", "none"]
    ]
  }
}
----

<1> set Kafka as carrier technology
<2> bootstrap server and port
<3> acknowledgement strategy
<4> number of retries
<5> batch size
<6> time to linger in milliseconds
<7> buffer memory in byte
<8> producer topic
<9> key for the Kafka serializer
<10> value for the Kafka serializer
<11> an optional list of name value pairs of properties to be passed transparently to Kafka. If a property appears in
the _kafkaProperties_ field and is also explicitly specified to a non-default value (such as _lingerTime_
and _linger.ms_) the explictly specified value of the property is used rather than the value specified in the
_kafkaProperties_ list.