diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java new file mode 100644 index 00000000..b564229b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation publishes events for the associated KAFKA topic, inline with the calling + * thread. + */ +public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink { + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + + /** + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned + * attributes. + * + * <p>servers list of KAFKA servers available for publishing + * topic the topic to publish to + * partitionId the partition key (optional, autogenerated if not provided) + * useHttps does connection use HTTPS? + * @param busTopicParams contains attributes needed + * @throws IllegalArgumentException if invalid arguments are detected + */ + public InlineKafkaTopicSink(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + /** + * Instantiation of internal resources. + */ + @Override + public void init() { + + this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.effectiveTopic) + .useHttps(this.useHttps) + .build()); + logger.info("{}: KAFKA SINK created", this); + } + + @Override + public String toString() { + return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + + super.toString() + "]"; + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.KAFKA; + } +} |