From 6891c2008b93e09f5a67e45b859696cd13466f8a Mon Sep 17 00:00:00 2001 From: Daniel Silverthorn Date: Fri, 25 May 2018 16:09:38 -0400 Subject: Remove cambria client dependency Change-Id: I9760839ae44df851640b271d032a39f4bb3691c2 Issue-ID: AAI-1182 Signed-off-by: Daniel Silverthorn --- pom.xml | 28 ++-- .../onap/aai/event/AbstractEventBusEndpoint.java | 43 +++++ .../org/onap/aai/event/DMaaPEventBusComponent.java | 49 ++++++ .../org/onap/aai/event/DMaaPEventBusEndpoint.java | 184 +++++++++++++++++++++ .../java/org/onap/aai/event/EventBusComponent.java | 49 ------ .../java/org/onap/aai/event/EventBusConsumer.java | 52 ++---- .../java/org/onap/aai/event/EventBusEndpoint.java | 161 ------------------ .../java/org/onap/aai/event/EventBusProducer.java | 4 +- .../org/onap/aai/event/KafkaEventBusComponent.java | 49 ++++++ .../org/onap/aai/event/KafkaEventBusEndpoint.java | 129 +++++++++++++++ .../org/apache/camel/component/dmaap-event-bus | 1 + .../services/org/apache/camel/component/event-bus | 1 - .../org/apache/camel/component/kafka-event-bus | 1 + src/test/java/org/onap/aai/event/EventBusTest.java | 39 +++-- 14 files changed, 503 insertions(+), 287 deletions(-) create mode 100644 src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java create mode 100644 src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java create mode 100644 src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java delete mode 100644 src/main/java/org/onap/aai/event/EventBusComponent.java delete mode 100644 src/main/java/org/onap/aai/event/EventBusEndpoint.java create mode 100644 src/main/java/org/onap/aai/event/KafkaEventBusComponent.java create mode 100644 src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java create mode 100644 src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus delete mode 100644 src/main/resources/META-INF/services/org/apache/camel/component/event-bus create mode 100644 src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus diff --git a/pom.xml b/pom.xml index 5ddef51..1cc8a5a 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,7 @@ limitations under the License. aai-router-core google_checks.xml + 1.3.0-SNAPSHOT java jacoco @@ -60,20 +61,21 @@ limitations under the License. common-logging 1.2.2 + + + org.onap.aai.event-client + event-client-api + ${event.client.version} + + + org.onap.aai.event-client + event-client-dmaap + ${event.client.version} + - com.att.nsa - cambriaClient - 0.0.1 - - - org.apache.httpcomponents - httpclient - - - org.apache.httpcomponents - httpclient-cache - - + org.onap.aai.event-client + event-client-kafka + ${event.client.version} diff --git a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java new file mode 100644 index 0000000..abbfe63 --- /dev/null +++ b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java @@ -0,0 +1,43 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.Component; +import org.apache.camel.impl.DefaultEndpoint; + +public abstract class AbstractEventBusEndpoint extends DefaultEndpoint { + public AbstractEventBusEndpoint() { + } + + public AbstractEventBusEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + public AbstractEventBusEndpoint(String endpointUri) { + super(endpointUri); + } + + abstract void close(); + abstract int getPollingDelay(); + abstract int getPoolSize(); + abstract String getEventTopic(); + +} diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java new file mode 100644 index 0000000..696e0f4 --- /dev/null +++ b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java @@ -0,0 +1,49 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; + +import java.util.Map; + +/** + * Represents the component that manages {@link DMaaPEventBusEndpoint}. + */ +public class DMaaPEventBusComponent extends UriEndpointComponent { + + public DMaaPEventBusComponent() { + super(DMaaPEventBusEndpoint.class); + } + + public DMaaPEventBusComponent(CamelContext context) { + super(context, DMaaPEventBusEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) + throws Exception { + Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java new file mode 100644 index 0000000..36e930a --- /dev/null +++ b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java @@ -0,0 +1,184 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.eclipse.jetty.util.security.Password; +import org.onap.aai.event.client.DMaaPEventConsumer; + +/** + * Represents a EventBus endpoint. + */ +@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name", + consumerClass = EventBusConsumer.class, title = "dmaap-event-bus") +public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint { + @UriPath + @Metadata(required = "true") + private String name; + + @UriParam(label = "eventTopic") + @Metadata(required = "true") + private String eventTopic; + @UriParam(label = "consumerGroup") + @Metadata(required = "true") + private String consumerGroup; + @UriParam(label = "consumerId") + @Metadata(required = "true") + private String consumerId; + @UriParam(label = "username") + private String username; + @UriParam(label = "password") + private String password; + @UriParam(label = "url") + @Metadata(required = "true") + private String url; + @UriParam(label = "poolSize") + @Metadata(required = "true", defaultValue="20") + private int poolSize = 20; + @UriParam(label = "pollingDelay") + @Metadata(required = "true", defaultValue="30000") + private int pollingDelay = 30000; + @UriParam(label = "transportType") + @Metadata(required = "true", defaultValue="HTTPAUTH") + private String transportType; + + private DMaaPEventConsumer dmaapConsumer; + + public DMaaPEventBusEndpoint() {} + + public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) { + super(uri, component); + } + + public DMaaPEventBusEndpoint(String endpointUri) { + super(endpointUri); + } + + @Override + void close() { + // Don't have to do anything for DMaaP + } + + @Override + public Producer createProducer() throws Exception { + return new EventBusProducer(this); + } + @Override + public Consumer createConsumer(Processor processor) throws Exception { + // TODO: other overloads based on filled-in properties + dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, Password.deobfuscate(username), Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType); + return new EventBusConsumer(this, processor, dmaapConsumer); + } + @Override + public boolean isSingleton() { + return false; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public String getEventTopic() { + return eventTopic; + } + + public void setEventTopic(String eventTopic) { + this.eventTopic = eventTopic; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getConsumerId() { + return consumerId; + } + + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; + } + + public String getUsername() { + return username == null ? null : Password.deobfuscate(username); + //return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password == null ? null : Password.deobfuscate(password); + //return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolsize) { + this.poolSize = poolsize; + } + + public int getPollingDelay() { + return pollingDelay; + } + + public void setPollingDelay(int pollingDelay) { + this.pollingDelay = pollingDelay; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getTransportType() { + return transportType; + } + + public void setTransportType(String transportType) { + this.transportType = transportType; + } +} + diff --git a/src/main/java/org/onap/aai/event/EventBusComponent.java b/src/main/java/org/onap/aai/event/EventBusComponent.java deleted file mode 100644 index b257c7a..0000000 --- a/src/main/java/org/onap/aai/event/EventBusComponent.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.UriEndpointComponent; - -import java.util.Map; - -/** - * Represents the component that manages {@link EventBusEndpoint}. - */ -public class EventBusComponent extends UriEndpointComponent { - - public EventBusComponent() { - super(EventBusEndpoint.class); - } - - public EventBusComponent(CamelContext context) { - super(context, EventBusEndpoint.class); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map parameters) - throws Exception { - Endpoint endpoint = new EventBusEndpoint(uri, this); - setProperties(endpoint, parameters); - return endpoint; - } -} diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index a9d5478..e795e2c 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -20,9 +20,7 @@ */ package org.onap.aai.event; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; +import org.onap.aai.event.api.EventConsumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -33,10 +31,6 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -49,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer { private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class); private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class); - private final EventBusEndpoint endpoint; + private final AbstractEventBusEndpoint endpoint; - private CambriaConsumer consumer; + private EventConsumer consumer; /** * EventBusConsumer Constructor. */ - public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) { + public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) { super(endpoint, processor); super.setDelay(endpoint.getPollingDelay()); this.endpoint = endpoint; setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize())); - String[] urls = endpoint.getUrl().split(","); - - List urlList = null; - - if (urls != null) { - urlList = Arrays.asList(urls); - } - - try { - - ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder() - .usingHosts(urlList).onTopic(endpoint.getEventTopic()) - .knownAs(endpoint.getGroupName(), endpoint.getGroupId()); - - String apiKey = endpoint.getApiKey(); - String apiSecret = endpoint.getApiSecret(); - - if (apiKey != null && apiSecret != null) { - consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret()); - } - - consumer = consumerBuilder.build(); - - } catch (MalformedURLException | GeneralSecurityException e) { - logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage()); - } + this.consumer = consumer; } /** @@ -104,7 +73,8 @@ public class EventBusConsumer extends ScheduledPollConsumer { int processCount = 0; - Iterable messages = consumer.fetch(); + //Iterable messages = consumer.fetch(); + Iterable messages = consumer.consumeAndCommit(); String topic = endpoint.getEventTopic(); @@ -119,15 +89,15 @@ public class EventBusConsumer extends ScheduledPollConsumer { @Override protected void doStop() throws Exception { super.doStop(); - if (consumer != null) { - consumer.close(); + if (endpoint != null) { + endpoint.close(); } } @Override protected void doShutdown() throws Exception { super.doShutdown(); - if (consumer != null) { - consumer.close(); + if (endpoint != null) { + endpoint.close(); } } diff --git a/src/main/java/org/onap/aai/event/EventBusEndpoint.java b/src/main/java/org/onap/aai/event/EventBusEndpoint.java deleted file mode 100644 index ee7cba8..0000000 --- a/src/main/java/org/onap/aai/event/EventBusEndpoint.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - - -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriPath; -import org.eclipse.jetty.util.security.Password; - -/** - * Represents a EventBus endpoint. - */ -@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name", - consumerClass = EventBusConsumer.class, title = "event-bus") -public class EventBusEndpoint extends DefaultEndpoint { - @UriPath - @Metadata(required = "true") - private String name; - - @UriParam(label = "eventTopic") - @Metadata(required = "true") - private String eventTopic; - @UriParam(label = "groupName") - @Metadata(required = "true") - private String groupName; - @UriParam(label = "groupId") - @Metadata(required = "true") - private String groupId; - @UriParam(label = "apiKey") - private String apiKey; - @UriParam(label = "apiSecret") - private String apiSecret; - @UriParam(label = "url") - @Metadata(required = "true") - private String url; - @UriParam(label = "poolSize") - @Metadata(required = "true", defaultValue="20") - private int poolSize = 20; - @UriParam(label = "pollingDelay") - @Metadata(required = "true", defaultValue="30000") - private int pollingDelay = 30000; - - public EventBusEndpoint() {} - - public EventBusEndpoint(String uri, EventBusComponent component) { - super(uri, component); - } - - public EventBusEndpoint(String endpointUri) { - super(endpointUri); - } - @Override - public Producer createProducer() throws Exception { - return new EventBusProducer(this); - } - @Override - public Consumer createConsumer(Processor processor) throws Exception { - return new EventBusConsumer(this, processor); - } - @Override - public boolean isSingleton() { - return false; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public String getEventTopic() { - return eventTopic; - } - - public void setEventTopic(String eventTopic) { - this.eventTopic = eventTopic; - } - - public String getGroupName() { - return groupName; - } - - public void setGroupName(String groupName) { - this.groupName = groupName; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public String getApiKey() { - return apiKey == null ? null : Password.deobfuscate(apiKey); - } - - public void setApiKey(String apiKey) { - this.apiKey = apiKey; - } - - public String getApiSecret() { - return apiSecret == null ? null : Password.deobfuscate(apiSecret); - } - - public void setApiSecret(String apiSecret) { - this.apiSecret = apiSecret; - } - - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolsize) { - this.poolSize = poolsize; - } - - public int getPollingDelay() { - return pollingDelay; - } - - public void setPollingDelay(int pollingDelay) { - this.pollingDelay = pollingDelay; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } -} - diff --git a/src/main/java/org/onap/aai/event/EventBusProducer.java b/src/main/java/org/onap/aai/event/EventBusProducer.java index ec2a0da..dd8dbed 100644 --- a/src/main/java/org/onap/aai/event/EventBusProducer.java +++ b/src/main/java/org/onap/aai/event/EventBusProducer.java @@ -27,9 +27,9 @@ import org.apache.camel.impl.DefaultProducer; * The EventBus producer. */ public class EventBusProducer extends DefaultProducer { - private EventBusEndpoint endpoint; + private AbstractEventBusEndpoint endpoint; - public EventBusProducer(EventBusEndpoint endpoint) { + public EventBusProducer(AbstractEventBusEndpoint endpoint) { super(endpoint); this.endpoint = endpoint; } diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java new file mode 100644 index 0000000..07b4c82 --- /dev/null +++ b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java @@ -0,0 +1,49 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; + +import java.util.Map; + +/** + * Represents the component that manages {@link KafkaEventBusEndpoint}. + */ +public class KafkaEventBusComponent extends UriEndpointComponent { + + public KafkaEventBusComponent() { + super(KafkaEventBusEndpoint.class); + } + + public KafkaEventBusComponent(CamelContext context) { + super(context, KafkaEventBusEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) + throws Exception { + Endpoint endpoint = new KafkaEventBusEndpoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java new file mode 100644 index 0000000..4194c89 --- /dev/null +++ b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java @@ -0,0 +1,129 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.onap.aai.event.client.KafkaEventConsumer; + +/** + * Represents a EventBus endpoint. + */ +@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name", + consumerClass = EventBusConsumer.class, title = "kafka-event-bus") +public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint { + @UriParam(label = "url") + @Metadata(required = "true") + private String url; + @UriParam(label = "eventTopic") + @Metadata(required = "true") + private String eventTopic; + @UriParam(label = "consumerGroup") + @Metadata(required = "true") + private String consumerGroup; + @UriParam(label = "poolSize") + @Metadata(required = "true", defaultValue="20") + private int poolSize = 20; + @UriParam(label = "pollingDelay") + @Metadata(required = "true", defaultValue="30000") + private int pollingDelay = 30000; + + private KafkaEventConsumer consumer; + + public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) { + super(uri, component); + } + + @Override + public Producer createProducer() throws Exception { + return new EventBusProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup); + return new EventBusConsumer(this, processor, consumer); + } + + @Override + public boolean isSingleton() { + return false; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + @Override + String getEventTopic() { + return eventTopic; + } + + public void setEventTopic(String eventTopic) { + this.eventTopic = eventTopic; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + @Override + int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + @Override + void close() { + consumer.close(); + } + + @Override + int getPollingDelay() { + return pollingDelay; + } + + public void setPollingDelay(int pollingDelay) { + this.pollingDelay = pollingDelay; + } + + + + + + +} diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus new file mode 100644 index 0000000..f1fee02 --- /dev/null +++ b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus @@ -0,0 +1 @@ +class=org.onap.aai.event.DMaaPEventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus deleted file mode 100644 index f795067..0000000 --- a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus +++ /dev/null @@ -1 +0,0 @@ -class=org.onap.aai.event.EventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus new file mode 100644 index 0000000..89f243d --- /dev/null +++ b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus @@ -0,0 +1 @@ +class=org.onap.aai.event.KafkaEventBusComponent diff --git a/src/test/java/org/onap/aai/event/EventBusTest.java b/src/test/java/org/onap/aai/event/EventBusTest.java index 8d294cd..1add1c0 100644 --- a/src/test/java/org/onap/aai/event/EventBusTest.java +++ b/src/test/java/org/onap/aai/event/EventBusTest.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashMap; -import java.util.Map; import org.apache.camel.Endpoint; import org.junit.Before; @@ -46,23 +45,23 @@ public class EventBusTest { @Test public void validateProducer() throws Exception { try { - EventBusComponent rc = new EventBusComponent(); - EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc); - endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + DMaaPEventBusComponent rc = new DMaaPEventBusComponent(); + DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc); + endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); endpoint.setEventTopic("eventTopic"); - endpoint.setGroupId("groupId"); - endpoint.setGroupName("gn"); + endpoint.setConsumerId("groupId"); + endpoint.setConsumerGroup("gn"); endpoint.setName("name"); endpoint.setPoolSize(45); endpoint.setPollingDelay(10); endpoint.setUrl("url"); - assertTrue(endpoint.getApiSecret().compareTo("onapSecret") == 0); - assertTrue(endpoint.getApiKey().compareTo("onapSecret") == 0); + assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0); + assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0); assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0); - assertTrue(endpoint.getGroupId().compareTo("groupId") == 0); - assertTrue(endpoint.getGroupName().compareTo("gn") == 0); + assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0); + assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0); assertTrue(endpoint.getName().compareTo("name") == 0); assertTrue(endpoint.getPoolSize() == 45); assertTrue(endpoint.getPollingDelay() == 10); @@ -84,7 +83,7 @@ public class EventBusTest { @Test public void validateEventBusComponent() throws Exception { - EventBusComponent rc = new EventBusComponent(new TestCamelContext()); + DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext()); Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap()); assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint")); } @@ -92,22 +91,22 @@ public class EventBusTest { @Test public void validateConsumer() throws Exception { try { - EventBusComponent rc = new EventBusComponent(); - EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc); + DMaaPEventBusComponent rc = new DMaaPEventBusComponent(); + DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc); - endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); endpoint.setEventTopic("eventTopic"); - endpoint.setGroupId("groupId"); - endpoint.setGroupName("gn"); + endpoint.setConsumerId("groupId"); + endpoint.setConsumerGroup("gn"); endpoint.setName("name"); endpoint.setPoolSize(45); endpoint.setPollingDelay(10); endpoint.setUrl("url"); TestProcessor processor = new TestProcessor(); - EventBusConsumer consumer = new EventBusConsumer(endpoint, processor); - + EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor); + } catch (Exception ex) { StringWriter writer = new StringWriter(); -- cgit 1.2.3-korg