diff options
-rw-r--r-- | pom.xml | 28 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java | 43 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java (renamed from src/main/java/org/onap/aai/event/EventBusComponent.java) | 14 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java (renamed from src/main/java/org/onap/aai/event/EventBusEndpoint.java) | 87 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusConsumer.java | 52 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusProducer.java | 4 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/KafkaEventBusComponent.java | 49 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java | 129 | ||||
-rw-r--r-- | src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus | 1 | ||||
-rw-r--r-- | src/main/resources/META-INF/services/org/apache/camel/component/event-bus | 1 | ||||
-rw-r--r-- | src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus | 1 | ||||
-rw-r--r-- | src/test/java/org/onap/aai/event/EventBusTest.java | 39 |
12 files changed, 332 insertions, 116 deletions
@@ -34,6 +34,7 @@ limitations under the License. <name>aai-router-core</name> <properties> <checkstyle.config.location>google_checks.xml</checkstyle.config.location> + <event.client.version>1.3.0-SNAPSHOT</event.client.version> <!-- Sonar Properties --> <sonar.language>java</sonar.language> <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin> @@ -60,20 +61,21 @@ limitations under the License. <artifactId>common-logging</artifactId> <version>1.2.2</version> </dependency> + + <dependency> + <groupId>org.onap.aai.event-client</groupId> + <artifactId>event-client-api</artifactId> + <version>${event.client.version}</version> + </dependency> + <dependency> + <groupId>org.onap.aai.event-client</groupId> + <artifactId>event-client-dmaap</artifactId> + <version>${event.client.version}</version> + </dependency> <dependency> - <groupId>com.att.nsa</groupId> - <artifactId>cambriaClient</artifactId> - <version>0.0.1</version> - <exclusions> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient-cache</artifactId> - </exclusion> - </exclusions> + <groupId>org.onap.aai.event-client</groupId> + <artifactId>event-client-kafka</artifactId> + <version>${event.client.version}</version> </dependency> <dependency> diff --git a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java new file mode 100644 index 0000000..abbfe63 --- /dev/null +++ b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java @@ -0,0 +1,43 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.Component; +import org.apache.camel.impl.DefaultEndpoint; + +public abstract class AbstractEventBusEndpoint extends DefaultEndpoint { + public AbstractEventBusEndpoint() { + } + + public AbstractEventBusEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + public AbstractEventBusEndpoint(String endpointUri) { + super(endpointUri); + } + + abstract void close(); + abstract int getPollingDelay(); + abstract int getPoolSize(); + abstract String getEventTopic(); + +} diff --git a/src/main/java/org/onap/aai/event/EventBusComponent.java b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java index b257c7a..696e0f4 100644 --- a/src/main/java/org/onap/aai/event/EventBusComponent.java +++ b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java @@ -27,22 +27,22 @@ import org.apache.camel.impl.UriEndpointComponent; import java.util.Map; /** - * Represents the component that manages {@link EventBusEndpoint}. + * Represents the component that manages {@link DMaaPEventBusEndpoint}. */ -public class EventBusComponent extends UriEndpointComponent { +public class DMaaPEventBusComponent extends UriEndpointComponent { - public EventBusComponent() { - super(EventBusEndpoint.class); + public DMaaPEventBusComponent() { + super(DMaaPEventBusEndpoint.class); } - public EventBusComponent(CamelContext context) { - super(context, EventBusEndpoint.class); + public DMaaPEventBusComponent(CamelContext context) { + super(context, DMaaPEventBusEndpoint.class); } @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Endpoint endpoint = new EventBusEndpoint(uri, this); + Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this); setProperties(endpoint, parameters); return endpoint; } diff --git a/src/main/java/org/onap/aai/event/EventBusEndpoint.java b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java index ee7cba8..36e930a 100644 --- a/src/main/java/org/onap/aai/event/EventBusEndpoint.java +++ b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java @@ -24,19 +24,19 @@ package org.onap.aai.event; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.eclipse.jetty.util.security.Password; +import org.onap.aai.event.client.DMaaPEventConsumer; /** * Represents a EventBus endpoint. */ -@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name", - consumerClass = EventBusConsumer.class, title = "event-bus") -public class EventBusEndpoint extends DefaultEndpoint { +@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name", + consumerClass = EventBusConsumer.class, title = "dmaap-event-bus") +public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint { @UriPath @Metadata(required = "true") private String name; @@ -44,16 +44,16 @@ public class EventBusEndpoint extends DefaultEndpoint { @UriParam(label = "eventTopic") @Metadata(required = "true") private String eventTopic; - @UriParam(label = "groupName") + @UriParam(label = "consumerGroup") @Metadata(required = "true") - private String groupName; - @UriParam(label = "groupId") + private String consumerGroup; + @UriParam(label = "consumerId") @Metadata(required = "true") - private String groupId; - @UriParam(label = "apiKey") - private String apiKey; - @UriParam(label = "apiSecret") - private String apiSecret; + private String consumerId; + @UriParam(label = "username") + private String username; + @UriParam(label = "password") + private String password; @UriParam(label = "url") @Metadata(required = "true") private String url; @@ -63,23 +63,36 @@ public class EventBusEndpoint extends DefaultEndpoint { @UriParam(label = "pollingDelay") @Metadata(required = "true", defaultValue="30000") private int pollingDelay = 30000; + @UriParam(label = "transportType") + @Metadata(required = "true", defaultValue="HTTPAUTH") + private String transportType; + + private DMaaPEventConsumer dmaapConsumer; - public EventBusEndpoint() {} + public DMaaPEventBusEndpoint() {} - public EventBusEndpoint(String uri, EventBusComponent component) { + public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) { super(uri, component); } - public EventBusEndpoint(String endpointUri) { + public DMaaPEventBusEndpoint(String endpointUri) { super(endpointUri); } + + @Override + void close() { + // Don't have to do anything for DMaaP + } + @Override public Producer createProducer() throws Exception { return new EventBusProducer(this); } @Override public Consumer createConsumer(Processor processor) throws Exception { - return new EventBusConsumer(this, processor); + // TODO: other overloads based on filled-in properties + dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, Password.deobfuscate(username), Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType); + return new EventBusConsumer(this, processor, dmaapConsumer); } @Override public boolean isSingleton() { @@ -102,36 +115,38 @@ public class EventBusEndpoint extends DefaultEndpoint { this.eventTopic = eventTopic; } - public String getGroupName() { - return groupName; + public String getConsumerGroup() { + return consumerGroup; } - public void setGroupName(String groupName) { - this.groupName = groupName; + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; } - public String getGroupId() { - return groupId; + public String getConsumerId() { + return consumerId; } - public void setGroupId(String groupId) { - this.groupId = groupId; + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; } - public String getApiKey() { - return apiKey == null ? null : Password.deobfuscate(apiKey); + public String getUsername() { + return username == null ? null : Password.deobfuscate(username); + //return username; } - public void setApiKey(String apiKey) { - this.apiKey = apiKey; + public void setUsername(String username) { + this.username = username; } - public String getApiSecret() { - return apiSecret == null ? null : Password.deobfuscate(apiSecret); + public String getPassword() { + return password == null ? null : Password.deobfuscate(password); + //return password; } - public void setApiSecret(String apiSecret) { - this.apiSecret = apiSecret; + public void setPassword(String password) { + this.password = password; } public int getPoolSize() { @@ -157,5 +172,13 @@ public class EventBusEndpoint extends DefaultEndpoint { public void setUrl(String url) { this.url = url; } + + public String getTransportType() { + return transportType; + } + + public void setTransportType(String transportType) { + this.transportType = transportType; + } } diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index a9d5478..e795e2c 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -20,9 +20,7 @@ */ package org.onap.aai.event; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; +import org.onap.aai.event.api.EventConsumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -33,10 +31,6 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -49,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer { private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class); private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class); - private final EventBusEndpoint endpoint; + private final AbstractEventBusEndpoint endpoint; - private CambriaConsumer consumer; + private EventConsumer consumer; /** * EventBusConsumer Constructor. */ - public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) { + public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) { super(endpoint, processor); super.setDelay(endpoint.getPollingDelay()); this.endpoint = endpoint; setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize())); - String[] urls = endpoint.getUrl().split(","); - - List<String> urlList = null; - - if (urls != null) { - urlList = Arrays.asList(urls); - } - - try { - - ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder() - .usingHosts(urlList).onTopic(endpoint.getEventTopic()) - .knownAs(endpoint.getGroupName(), endpoint.getGroupId()); - - String apiKey = endpoint.getApiKey(); - String apiSecret = endpoint.getApiSecret(); - - if (apiKey != null && apiSecret != null) { - consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret()); - } - - consumer = consumerBuilder.build(); - - } catch (MalformedURLException | GeneralSecurityException e) { - logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage()); - } + this.consumer = consumer; } /** @@ -104,7 +73,8 @@ public class EventBusConsumer extends ScheduledPollConsumer { int processCount = 0; - Iterable<String> messages = consumer.fetch(); + //Iterable<String> messages = consumer.fetch(); + Iterable<String> messages = consumer.consumeAndCommit(); String topic = endpoint.getEventTopic(); @@ -119,15 +89,15 @@ public class EventBusConsumer extends ScheduledPollConsumer { @Override protected void doStop() throws Exception { super.doStop(); - if (consumer != null) { - consumer.close(); + if (endpoint != null) { + endpoint.close(); } } @Override protected void doShutdown() throws Exception { super.doShutdown(); - if (consumer != null) { - consumer.close(); + if (endpoint != null) { + endpoint.close(); } } diff --git a/src/main/java/org/onap/aai/event/EventBusProducer.java b/src/main/java/org/onap/aai/event/EventBusProducer.java index ec2a0da..dd8dbed 100644 --- a/src/main/java/org/onap/aai/event/EventBusProducer.java +++ b/src/main/java/org/onap/aai/event/EventBusProducer.java @@ -27,9 +27,9 @@ import org.apache.camel.impl.DefaultProducer; * The EventBus producer. */ public class EventBusProducer extends DefaultProducer { - private EventBusEndpoint endpoint; + private AbstractEventBusEndpoint endpoint; - public EventBusProducer(EventBusEndpoint endpoint) { + public EventBusProducer(AbstractEventBusEndpoint endpoint) { super(endpoint); this.endpoint = endpoint; } diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java new file mode 100644 index 0000000..07b4c82 --- /dev/null +++ b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java @@ -0,0 +1,49 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; + +import java.util.Map; + +/** + * Represents the component that manages {@link KafkaEventBusEndpoint}. + */ +public class KafkaEventBusComponent extends UriEndpointComponent { + + public KafkaEventBusComponent() { + super(KafkaEventBusEndpoint.class); + } + + public KafkaEventBusComponent(CamelContext context) { + super(context, KafkaEventBusEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) + throws Exception { + Endpoint endpoint = new KafkaEventBusEndpoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java new file mode 100644 index 0000000..4194c89 --- /dev/null +++ b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java @@ -0,0 +1,129 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.onap.aai.event.client.KafkaEventConsumer; + +/** + * Represents a EventBus endpoint. + */ +@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name", + consumerClass = EventBusConsumer.class, title = "kafka-event-bus") +public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint { + @UriParam(label = "url") + @Metadata(required = "true") + private String url; + @UriParam(label = "eventTopic") + @Metadata(required = "true") + private String eventTopic; + @UriParam(label = "consumerGroup") + @Metadata(required = "true") + private String consumerGroup; + @UriParam(label = "poolSize") + @Metadata(required = "true", defaultValue="20") + private int poolSize = 20; + @UriParam(label = "pollingDelay") + @Metadata(required = "true", defaultValue="30000") + private int pollingDelay = 30000; + + private KafkaEventConsumer consumer; + + public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) { + super(uri, component); + } + + @Override + public Producer createProducer() throws Exception { + return new EventBusProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup); + return new EventBusConsumer(this, processor, consumer); + } + + @Override + public boolean isSingleton() { + return false; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + @Override + String getEventTopic() { + return eventTopic; + } + + public void setEventTopic(String eventTopic) { + this.eventTopic = eventTopic; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + @Override + int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + @Override + void close() { + consumer.close(); + } + + @Override + int getPollingDelay() { + return pollingDelay; + } + + public void setPollingDelay(int pollingDelay) { + this.pollingDelay = pollingDelay; + } + + + + + + +} diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus new file mode 100644 index 0000000..f1fee02 --- /dev/null +++ b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus @@ -0,0 +1 @@ +class=org.onap.aai.event.DMaaPEventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus deleted file mode 100644 index f795067..0000000 --- a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus +++ /dev/null @@ -1 +0,0 @@ -class=org.onap.aai.event.EventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus new file mode 100644 index 0000000..89f243d --- /dev/null +++ b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus @@ -0,0 +1 @@ +class=org.onap.aai.event.KafkaEventBusComponent diff --git a/src/test/java/org/onap/aai/event/EventBusTest.java b/src/test/java/org/onap/aai/event/EventBusTest.java index 8d294cd..1add1c0 100644 --- a/src/test/java/org/onap/aai/event/EventBusTest.java +++ b/src/test/java/org/onap/aai/event/EventBusTest.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashMap; -import java.util.Map; import org.apache.camel.Endpoint; import org.junit.Before; @@ -46,23 +45,23 @@ public class EventBusTest { @Test public void validateProducer() throws Exception { try { - EventBusComponent rc = new EventBusComponent(); - EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc); - endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + DMaaPEventBusComponent rc = new DMaaPEventBusComponent(); + DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc); + endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); endpoint.setEventTopic("eventTopic"); - endpoint.setGroupId("groupId"); - endpoint.setGroupName("gn"); + endpoint.setConsumerId("groupId"); + endpoint.setConsumerGroup("gn"); endpoint.setName("name"); endpoint.setPoolSize(45); endpoint.setPollingDelay(10); endpoint.setUrl("url"); - assertTrue(endpoint.getApiSecret().compareTo("onapSecret") == 0); - assertTrue(endpoint.getApiKey().compareTo("onapSecret") == 0); + assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0); + assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0); assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0); - assertTrue(endpoint.getGroupId().compareTo("groupId") == 0); - assertTrue(endpoint.getGroupName().compareTo("gn") == 0); + assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0); + assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0); assertTrue(endpoint.getName().compareTo("name") == 0); assertTrue(endpoint.getPoolSize() == 45); assertTrue(endpoint.getPollingDelay() == 10); @@ -84,7 +83,7 @@ public class EventBusTest { @Test public void validateEventBusComponent() throws Exception { - EventBusComponent rc = new EventBusComponent(new TestCamelContext()); + DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext()); Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap<String, Object>()); assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint")); } @@ -92,22 +91,22 @@ public class EventBusTest { @Test public void validateConsumer() throws Exception { try { - EventBusComponent rc = new EventBusComponent(); - EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc); + DMaaPEventBusComponent rc = new DMaaPEventBusComponent(); + DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc); - endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); + endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); endpoint.setEventTopic("eventTopic"); - endpoint.setGroupId("groupId"); - endpoint.setGroupName("gn"); + endpoint.setConsumerId("groupId"); + endpoint.setConsumerGroup("gn"); endpoint.setName("name"); endpoint.setPoolSize(45); endpoint.setPollingDelay(10); endpoint.setUrl("url"); TestProcessor processor = new TestProcessor(); - EventBusConsumer consumer = new EventBusConsumer(endpoint, processor); - + EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor); + } catch (Exception ex) { StringWriter writer = new StringWriter(); |