diff options
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java | 8 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java | 184 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusComponent.java (renamed from src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java) | 44 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusConsumer.java | 1 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusEndPoint.java | 108 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusProducer.java | 11 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/KafkaEventBusComponent.java | 49 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java | 129 | ||||
-rw-r--r-- | src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus | 1 | ||||
-rw-r--r-- | src/main/resources/META-INF/services/org/apache/camel/component/event-bus | 1 | ||||
-rw-r--r-- | src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus | 1 |
11 files changed, 137 insertions, 400 deletions
diff --git a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java index abbfe63..9a5abee 100644 --- a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java +++ b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java @@ -24,18 +24,12 @@ import org.apache.camel.Component; import org.apache.camel.impl.DefaultEndpoint; public abstract class AbstractEventBusEndpoint extends DefaultEndpoint { - public AbstractEventBusEndpoint() { - } public AbstractEventBusEndpoint(String endpointUri, Component component) { super(endpointUri, component); } - public AbstractEventBusEndpoint(String endpointUri) { - super(endpointUri); - } - - abstract void close(); + abstract void close() throws Exception; abstract int getPollingDelay(); abstract int getPoolSize(); abstract String getEventTopic(); diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java deleted file mode 100644 index 3cc37e8..0000000 --- a/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - - -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriPath; -import org.eclipse.jetty.util.security.Password; -import org.onap.aai.event.client.DMaaPEventConsumer; - -/** - * Represents a EventBus endpoint. - */ -@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name", - consumerClass = EventBusConsumer.class, title = "dmaap-event-bus") -public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint { - @UriPath - @Metadata(required = "true") - private String name; - - @UriParam(label = "eventTopic") - @Metadata(required = "true") - private String eventTopic; - @UriParam(label = "consumerGroup") - @Metadata(required = "true") - private String consumerGroup; - @UriParam(label = "consumerId") - @Metadata(required = "true") - private String consumerId; - @UriParam(label = "username") - private String username; - @UriParam(label = "password") - private String password; - @UriParam(label = "url") - @Metadata(required = "true") - private String url; - @UriParam(label = "poolSize") - @Metadata(required = "true", defaultValue="20") - private int poolSize = 20; - @UriParam(label = "pollingDelay") - @Metadata(required = "true", defaultValue="30000") - private int pollingDelay = 30000; - @UriParam(label = "transportType") - @Metadata(required = "true", defaultValue="HTTPAUTH") - private String transportType = "HTTPAUTH"; - - private DMaaPEventConsumer dmaapConsumer; - - public DMaaPEventBusEndpoint() {} - - public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) { - super(uri, component); - } - - public DMaaPEventBusEndpoint(String endpointUri) { - super(endpointUri); - } - - @Override - void close() { - // Don't have to do anything for DMaaP - } - - @Override - public Producer createProducer() throws Exception { - return new EventBusProducer(this); - } - @Override - public Consumer createConsumer(Processor processor) throws Exception { - // TODO: other overloads based on filled-in properties - dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, username, Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType); - return new EventBusConsumer(this, processor, dmaapConsumer); - } - @Override - public boolean isSingleton() { - return false; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public String getEventTopic() { - return eventTopic; - } - - public void setEventTopic(String eventTopic) { - this.eventTopic = eventTopic; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - public String getConsumerId() { - return consumerId; - } - - public void setConsumerId(String consumerId) { - this.consumerId = consumerId; - } - - public String getUsername() { - return username == null ? null : Password.deobfuscate(username); - //return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password == null ? null : Password.deobfuscate(password); - //return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolsize) { - this.poolSize = poolsize; - } - - public int getPollingDelay() { - return pollingDelay; - } - - public void setPollingDelay(int pollingDelay) { - this.pollingDelay = pollingDelay; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - public String getTransportType() { - return transportType; - } - - public void setTransportType(String transportType) { - this.transportType = transportType; - } -} - diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java b/src/main/java/org/onap/aai/event/EventBusComponent.java index 696e0f4..766382b 100644 --- a/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java +++ b/src/main/java/org/onap/aai/event/EventBusComponent.java @@ -2,8 +2,8 @@ * ============LICENSE_START======================================================= * org.onap.aai * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 Amdocs * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,30 +20,24 @@ */ package org.onap.aai.event; -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.UriEndpointComponent; - import java.util.Map; -/** - * Represents the component that manages {@link DMaaPEventBusEndpoint}. - */ -public class DMaaPEventBusComponent extends UriEndpointComponent { - - public DMaaPEventBusComponent() { - super(DMaaPEventBusEndpoint.class); - } - - public DMaaPEventBusComponent(CamelContext context) { - super(context, DMaaPEventBusEndpoint.class); - } +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) - throws Exception { - Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this); - setProperties(endpoint, parameters); - return endpoint; - } +public class EventBusComponent extends DefaultComponent { + public EventBusComponent() { + super(); + } + public EventBusComponent(CamelContext context) { + super(context); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + Endpoint endpoint = new EventBusEndPoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } } diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index da8ffb5..b189cfd 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -31,7 +31,6 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import java.util.Collections; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; diff --git a/src/main/java/org/onap/aai/event/EventBusEndPoint.java b/src/main/java/org/onap/aai/event/EventBusEndPoint.java new file mode 100644 index 0000000..621b30f --- /dev/null +++ b/src/main/java/org/onap/aai/event/EventBusEndPoint.java @@ -0,0 +1,108 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; + +@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name", +consumerClass = EventBusConsumer.class, title = "event-bus") +public class EventBusEndPoint extends AbstractEventBusEndpoint { + @UriParam(label = "eventTopic") + @Metadata(required = "true") + private String eventTopic; + @UriParam(label = "poolSize") + @Metadata(required = "true", defaultValue="20") + private int poolSize = 20; + @UriParam(label = "pollingDelay") + @Metadata(required = "true", defaultValue="30000") + private int pollingDelay = 30000; + + EventConsumer consumer; //This would be injected via bean through camel route when passed with # + + EventPublisher publisher; //This would be injected via bean through camel route when passed with # + + private Logger logger = LoggerFactory.getInstance().getLogger(EventBusEndPoint.class); + + public EventBusEndPoint(String uri, EventBusComponent component) { + super(uri, component); + } + + @Override + public Producer createProducer() throws Exception { + return new EventBusProducer(this, publisher); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new EventBusConsumer(this, processor, consumer); + } + + @Override + public boolean isSingleton() { + return false; + } + + void close() throws Exception { + if(consumer != null) + consumer.close(); + if(publisher != null) + publisher.close(); + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + public void setPollingDelay(int pollingDelay) { + this.pollingDelay = pollingDelay; + } + + public int getPollingDelay() { + return pollingDelay; + } + public int getPoolSize() { + return poolSize; + } + public String getEventTopic() { + return eventTopic; + } + + public void setEventTopic(String eventTopic) { + this.eventTopic = eventTopic; + } + + public void setConsumer(EventConsumer consumer) { + this.consumer = consumer; + } + + public void setPublisher(EventPublisher publisher) { + this.publisher = publisher; + } +} diff --git a/src/main/java/org/onap/aai/event/EventBusProducer.java b/src/main/java/org/onap/aai/event/EventBusProducer.java index dd8dbed..dfd1bfe 100644 --- a/src/main/java/org/onap/aai/event/EventBusProducer.java +++ b/src/main/java/org/onap/aai/event/EventBusProducer.java @@ -22,17 +22,22 @@ package org.onap.aai.event; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.onap.aai.event.api.EventPublisher; /** * The EventBus producer. */ public class EventBusProducer extends DefaultProducer { private AbstractEventBusEndpoint endpoint; + + private EventPublisher publisher; - public EventBusProducer(AbstractEventBusEndpoint endpoint) { - super(endpoint); - this.endpoint = endpoint; + public EventBusProducer(AbstractEventBusEndpoint endpoint, EventPublisher publisher) { + super(endpoint); + this.endpoint = endpoint; + this.publisher = publisher; } + @Override public void process(Exchange exchange) throws Exception { // Publishing to event bus is currently not supported diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java deleted file mode 100644 index 07b4c82..0000000 --- a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.UriEndpointComponent; - -import java.util.Map; - -/** - * Represents the component that manages {@link KafkaEventBusEndpoint}. - */ -public class KafkaEventBusComponent extends UriEndpointComponent { - - public KafkaEventBusComponent() { - super(KafkaEventBusEndpoint.class); - } - - public KafkaEventBusComponent(CamelContext context) { - super(context, KafkaEventBusEndpoint.class); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) - throws Exception { - Endpoint endpoint = new KafkaEventBusEndpoint(uri, this); - setProperties(endpoint, parameters); - return endpoint; - } -} diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java deleted file mode 100644 index 4194c89..0000000 --- a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.onap.aai.event.client.KafkaEventConsumer; - -/** - * Represents a EventBus endpoint. - */ -@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name", - consumerClass = EventBusConsumer.class, title = "kafka-event-bus") -public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint { - @UriParam(label = "url") - @Metadata(required = "true") - private String url; - @UriParam(label = "eventTopic") - @Metadata(required = "true") - private String eventTopic; - @UriParam(label = "consumerGroup") - @Metadata(required = "true") - private String consumerGroup; - @UriParam(label = "poolSize") - @Metadata(required = "true", defaultValue="20") - private int poolSize = 20; - @UriParam(label = "pollingDelay") - @Metadata(required = "true", defaultValue="30000") - private int pollingDelay = 30000; - - private KafkaEventConsumer consumer; - - public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) { - super(uri, component); - } - - @Override - public Producer createProducer() throws Exception { - return new EventBusProducer(this); - } - - @Override - public Consumer createConsumer(Processor processor) throws Exception { - consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup); - return new EventBusConsumer(this, processor, consumer); - } - - @Override - public boolean isSingleton() { - return false; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - @Override - String getEventTopic() { - return eventTopic; - } - - public void setEventTopic(String eventTopic) { - this.eventTopic = eventTopic; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - @Override - int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - - @Override - void close() { - consumer.close(); - } - - @Override - int getPollingDelay() { - return pollingDelay; - } - - public void setPollingDelay(int pollingDelay) { - this.pollingDelay = pollingDelay; - } - - - - - - -} diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus deleted file mode 100644 index f1fee02..0000000 --- a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus +++ /dev/null @@ -1 +0,0 @@ -class=org.onap.aai.event.DMaaPEventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus new file mode 100644 index 0000000..f795067 --- /dev/null +++ b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus @@ -0,0 +1 @@ +class=org.onap.aai.event.EventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus deleted file mode 100644 index 89f243d..0000000 --- a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus +++ /dev/null @@ -1 +0,0 @@ -class=org.onap.aai.event.KafkaEventBusComponent |