diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src')
27 files changed, 1450 insertions, 559 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java deleted file mode 100644 index 7c282911d..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java +++ /dev/null @@ -1,58 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.List; -import java.util.concurrent.Callable; - -public class CallableConsumer implements Callable<List<String>> { - - private Consumer consumer; - - private int timeout = 15000; - private int limit = 1000; - - public CallableConsumer(Consumer c) { - this.consumer = c; - } - - public CallableConsumer(Consumer c, int waitMs, int fetchSize) { - this.consumer = c; - this.timeout = waitMs; - this.limit = fetchSize; - } - - @Override - public List<String> call() { - return consumer.fetch(timeout, limit); - } - - /** - * The maximum amount of time to keep a connection alive. Currently is set to waitMs + 10s - * - * @return An integer representing the maximum amount of time to keep this thread alive - */ - public int getMaxLife() { - return 10000 + timeout; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java deleted file mode 100644 index 32034e5fb..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.List; - -public interface Consumer { - - /** - * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. - * - * @return A list of strings representing the messages pulled from the topic. - */ - public List<String> fetch(); - - /** - * Gets a batch of messages from the topic. - * - * @param waitMs - * The amount of time to wait in milliseconds if the topic is empty for data to be written. Should be no - * less than 15000ms to prevent too many requests - * @param limit - * The amount of messages to fetch - * @return A list of strings representing the messages pulled from the topic. - */ - public List<String> fetch(int waitMs, int limit); - - /** - * Updates the api credentials for making authenticated requests - * - * @param apiKey - * The public key to authenticate with - * @param apiSecret - * The secret key to authenticate with - */ - public void updateCredentials(String apiKey, String apiSecret); - - // TODO - Implement once Cambria allows you to set outside of constructor - // public void setFilter(String filter); - - /** - * Creates a dmaap client using a https connection - * - * @param yes - * True if https should be used, false otherwise - */ - public void useHttps(boolean yes); - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java deleted file mode 100644 index efbe194ba..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java +++ /dev/null @@ -1,26 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -public enum DmaapDestination { - DCAE -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java deleted file mode 100644 index 7d4a7c090..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java +++ /dev/null @@ -1,35 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.Map; - -import org.openecomp.appc.adapter.dmaap.event.EventMessage; -import org.openecomp.appc.exceptions.APPCException; -import org.openecomp.sdnc.sli.SvcLogicContext; -import org.openecomp.sdnc.sli.SvcLogicJavaPlugin; - - -public interface EventSender extends SvcLogicJavaPlugin{ - boolean sendEvent(DmaapDestination destination, EventMessage msg); - boolean sendEvent(DmaapDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException; -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java deleted file mode 100644 index 183e618ba..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.Set; - -public interface Manager { - - /** - * Updates the api credentials for making authenticated requests - * - * @param apiKey - * The public key to authenticate with - * @param apiSecret - * The secret key to authenticate with - */ - public void updateCredentials(String apiKey, String apiSecret); - - /** - * Return a set of strings representing topics that the user can see - * - * @return A set of strings with topic names or an empty set if no topics are visible - */ - public Set<String> getTopics(); - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java deleted file mode 100644 index f19c516be..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java +++ /dev/null @@ -1,46 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -public interface Producer { - - public boolean post(String partition, String data); - - /** - * Updates the api credentials for making authenticated requests - * - * @param apiKey - * The public key to authenticate with - * @param apiSecret - * The secret key to authenticate with - */ - public void updateCredentials(String apiKey, String apiSecret); - - /** - * Creates a dmaap client using a https connection - * - * @param yes - * True if https should be used, false otherwise - */ - public void useHttps(boolean yes); - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java deleted file mode 100644 index dd951fe37..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java +++ /dev/null @@ -1,64 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap.event; - -import com.fasterxml.jackson.annotation.JsonProperty; - - -public class EventHeader { - - @JsonProperty("eventTime") - private final String eventTime; - - @JsonProperty("apiVer") - private final String apiVer; - - @JsonProperty("eventId") - private final String eventId; - - public EventHeader(String eventTime, String apiVer, String eventId) { - this.eventTime = eventTime; - this.apiVer = apiVer; - this.eventId = eventId; - } - - public String getEventTime() { - return eventTime; - } - - public String getApiVer() { - return apiVer; - } - - public String getEventId() { - return eventId; - } - - @Override - public String toString() { - return "EventHeader{" + - "eventTime='" + eventTime + '\'' + - ", apiVer='" + apiVer + '\'' + - ", eventId='" + eventId + '\'' + - '}'; - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java deleted file mode 100644 index af5cff2f9..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java +++ /dev/null @@ -1,99 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap.event; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize.Inclusion; - -import java.io.IOException; -import java.io.Serializable; - -/* - { - "EventHeader": { - "eventTime": "2016-03-15T10:59:33.79Z", - "apiVer": "1.01", - "EventId": "<ECOMP_EVENT_ID>", - }, - "EventStatus": { - "code": "NNN", - "reason": "A reason" - } - } -*/ - - - - -@JsonSerialize(include = Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class EventMessage implements Serializable { - - private static final long serialVersionUID = 1L; - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - @JsonProperty("eventHeader") - private EventHeader eventHeader; - @JsonProperty("eventStatus") - private EventStatus eventStatus; - - public EventMessage(EventHeader eventHeader, EventStatus eventStatus) { - this.eventHeader = eventHeader; - this.eventStatus = eventStatus; - } - - public EventHeader getEventHeader() { - return eventHeader; - } - - public void setEventHeader(EventHeader eventHeader) { - this.eventHeader = eventHeader; - } - - public EventStatus getEventStatus() { - return eventStatus; - } - - public void setEventStatus(EventStatus eventStatus) { - this.eventStatus = eventStatus; - } - - public String toJson() { - try { - return OBJECT_MAPPER.writeValueAsString(this); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public String toString() { - return "EventMessage{" + - "eventHeader=" + eventHeader + - ", eventStatus=" + eventStatus + - '}'; - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java deleted file mode 100644 index f5d7a59d4..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java +++ /dev/null @@ -1,56 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap.event; - -import com.fasterxml.jackson.annotation.JsonProperty; - - -public class EventStatus { - - @JsonProperty("code") - private final Integer code; - - @JsonProperty("reason") - private final String reason; - - public EventStatus(Integer code, String aReason) { - this.code = code; - reason = aReason; - } - - - public Integer getCode() { - return code; - } - - public String getReason() { - return reason; - } - - @Override - public String toString() { - return "EventStatus{" + - "code=" + code + - ", reason='" + reason + '\'' + - '}'; - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java index c02553dfe..c7be330cf 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap; import org.openecomp.appc.configuration.ConfigurationFactory; import com.att.eelf.configuration.EELFLogger; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java index 654ec6f7f..0d0450681 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index 6e16d896b..2145eaa70 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -19,14 +19,13 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -38,10 +37,11 @@ import org.apache.http.client.utils.URIBuilder; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.json.JSONArray; +import org.openecomp.appc.adapter.message.Consumer; -public class DmaapConsumer extends CommonHttpClient implements Consumer { +public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; @@ -54,17 +54,17 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer { private boolean useHttps = false; - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) { this(hosts, topicName, consumerName, consumerId, null); } - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter) { this(hosts, topicName, consumerName, consumerId, filter, null, null); } - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter, String user, String password) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter, String user, String password) { urls = new ArrayList<String>(); for (String host : hosts) { urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); @@ -155,9 +155,10 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer { LOG.error("Interrupted while sleeping"); } } - - public void close(){ - //not used yet - } + + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java index 6845177b1..85e446dca 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; import java.util.ArrayList; @@ -34,10 +34,11 @@ import com.att.eelf.configuration.EELFManager; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; +import org.openecomp.appc.adapter.message.Producer; -public class DmaapProducer extends CommonHttpClient implements Producer { +public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); private static final String CONTENT_TYPE = "application/cambria"; private static final String URL_TEMPLATE = "%s/events/%s"; @@ -47,7 +48,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer { private boolean useHttps = false; - public DmaapProducer(Collection<String> urls, String topicName) { + public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { hosts = new ArrayList<String>(); topics = new HashSet<String>(); topics.add(topicName); @@ -57,7 +58,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer { } } - public DmaapProducer(Collection<String> urls, Set<String> topicNames) { + public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) { hosts = new ArrayList<String>(); topics = topicNames; @@ -126,8 +127,9 @@ public class DmaapProducer extends CommonHttpClient implements Producer { String m = (msg == null) ? "" : msg; return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); } - - public void close(){ - //not used yet - } + + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java new file mode 100644 index 000000000..342d52448 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -0,0 +1,231 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.IOException; +import java.util.*; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +//import com.att.nsa.cambria.client.CambriaConsumer; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.metricservice.MetricRegistry; +import org.openecomp.appc.metricservice.MetricService; +import org.openecomp.appc.metricservice.impl.MetricServiceImpl; +import org.openecomp.appc.metricservice.metric.Metric; +import org.openecomp.appc.metricservice.metric.MetricType; +import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.openecomp.appc.metricservice.policy.PublishingPolicy; +import org.openecomp.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapConsumerImpl implements Consumer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + // Default values + private static final int DEFAULT_TIMEOUT_MS = 60000; + private static final int DEFAULT_LIMIT = 1000; + private static MetricRegistry metricRegistry; + private String topic; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + private boolean useHttps = false; + private MRConsumer client = null; + private Properties props = null; + + + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { + this(urls, topicName, consumerGroupName, consumerId,user, password,null); + + } + + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { + this.topic = topicName; + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("group",consumerGroupName); + props.setProperty("id",consumerId); + props.setProperty("username",user); + props.setProperty("password",password); + if(filter != null) { + props.setProperty("filter", filter); + } + } + + + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry = metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if (metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + } + private MRConsumer getClient() { + return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + /** + * @return An instance of MRConsumer created from our class variables + */ + private synchronized MRConsumer getClient(int waitMs, int limit) { + try { + props.setProperty("timeout",String.valueOf(waitMs)); + props.setProperty("limit",String.valueOf(limit)); + String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); + return MRClientFactory.createConsumer ( topicProducerPropFileName); + } catch (IOException e1) { + LOG.error("failed to createConsumer",e1); + return null; + } + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + client = null; + } + + @Override + public List<String> fetch(int waitMs, int limit) { + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + List<String> out = new ArrayList<String>(); + + // Create client once and reuse it on subsequent fetches. This is + // to support failover to other servers in the DMaaP cluster. + if (client == null) { + LOG.info("Getting DMaaP Client ..."); + client = getClient(waitMs, limit); + } + try { + for (String s : client.fetch(waitMs, limit)) { + out.add(s); + if(isMetricEnabled){ + ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); + } + } + LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); + } catch (Exception e) { + // Connection exception + LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); + e.printStackTrace(); + try { + LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); + Thread.sleep(waitMs); + } catch (InterruptedException e2) { + LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); + } + } + + + return out; + } + + /** + * Close consumer Dmaap client + */ + @Override + public void close() { + LOG.debug("Closing Dmaap consumer client...."); + if (client != null) { + client.close(); + } + } + + @Override + public List<String> fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override + public String toString() { + String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); + String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); + String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); + return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + + private MetricService getMetricservice() { + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + // Get AAIadapter reference + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricServiceImpl) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java new file mode 100644 index 000000000..79d6b3db7 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -0,0 +1,220 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.*; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaBatchingPublisher; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.metricservice.MetricRegistry; +import org.openecomp.appc.metricservice.MetricService; +import org.openecomp.appc.metricservice.metric.Metric; +import org.openecomp.appc.metricservice.metric.MetricType; +import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.openecomp.appc.metricservice.policy.PublishingPolicy; +import org.openecomp.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapProducerImpl implements Producer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + + private Set<String> topics = new HashSet<String>(); + + private Properties props = null; + private static MetricRegistry metricRegistry; + private boolean useHttps = false; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + + private Set<MRBatchingPublisher> clients; + + + public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) { + this(urls, (Set<String>)null, user, password); + this.topics = new HashSet<>(); + if (topicName != null) { + for (String topic : topicName.split(",")) { + topics.add(topic); + } + } + } + + public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { + topics = topicNames; + if(urls == null || user == null || password == null){ + throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); + } + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("id", UUID.randomUUID().toString()); + props.setProperty("username",user); + props.setProperty("password",password); + } + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry=metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if(metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + + } + private Set<MRBatchingPublisher> getClients() { + Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>(); + for (String topic : topics) { + try { + String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); + final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); + out.add(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return out; + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + clients = null; + } + + @Override + public boolean post(String partition, String data) { + boolean success = true; + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + + // Create clients once and reuse them on subsequent posts. This is + // to support failover to other servers in the Dmaap cluster. + if ((clients == null) || (clients.isEmpty())) { + LOG.info("Getting CambriaBatchingPublisher Clients ..."); + clients = getClients(); + } + + for (MRBatchingPublisher client : clients) { + try { + LOG.debug(String.format("Posting %s to %s", data, client)); + client.send(partition, data); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + if(isMetricEnabled){ + ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); + } + return success; + } + + /** + * Close producer Dmaap client + */ + @Override + public void close() { + if ((clients == null) || (clients.isEmpty())) { + return; + } + + LOG.debug("Closing Dmaap producer clients...."); + for (MRBatchingPublisher client : clients) { + try { + client.close(1, TimeUnit.SECONDS); + } catch (IOException | InterruptedException e) { + LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); + e.printStackTrace(); + } + } + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + private MetricService getMetricservice() { +/* + return AppcDmaapAdapterActivator.getMetricService(); +*/ + + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricService) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java new file mode 100644 index 000000000..39857b856 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class DmaapUtil { + private final static String delimiter = "_"; + private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { + String topicPreferredRouteFileName = null; + topicPreferredRouteFileName = topic+"preferredRoute.properties"; + File fo= new File(topicPreferredRouteFileName); + if(!fo.exists()) { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); + Properties props = new Properties(); + props.load(inputStream); + String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1"; + props.setProperty("preferredRouteKey", fileName); + topicPreferredRouteFileName = topic + "preferredRoute.properties"; + props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); + } + return topicPreferredRouteFileName; + } + + public static String createConsumerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "consumer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + public static String createProducerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "producer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); + Properties defaultProps = new Properties(); + defaultProps.load(inputStream); + defaultProps.setProperty("topic",topic); + + String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); + if(props != null && !props.isEmpty()){ + defaultProps.putAll(props); + } + defaultProps.setProperty("topic",topic); + defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); + String id = defaultProps.getProperty("id"); + String topicConsumerPropFileName = defaultProfFileName; + topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + + defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); + return topicConsumerPropFileName; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java index 0f7d40f5b..c671c6fdb 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java @@ -19,33 +19,33 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap.impl; +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.openecomp.sdnc.sli.SvcLogicContext; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.openecomp.appc.adapter.dmaap.EventSender; -import org.openecomp.appc.adapter.dmaap.Producer; -import org.openecomp.appc.adapter.dmaap.DmaapDestination; -import org.openecomp.appc.adapter.dmaap.event.EventHeader; -import org.openecomp.appc.adapter.dmaap.event.EventMessage; -import org.openecomp.appc.adapter.dmaap.event.EventStatus; -import org.openecomp.appc.adapter.dmaap.DmaapProducer; +import org.openecomp.appc.adapter.message.EventSender; +import org.openecomp.appc.adapter.message.MessageDestination; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.message.event.EventHeader; +import org.openecomp.appc.adapter.message.event.EventMessage; +import org.openecomp.appc.adapter.message.event.EventStatus; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; import org.openecomp.appc.exceptions.APPCException; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.openecomp.sdnc.sli.SvcLogicContext; - -public class EventSenderImpl implements EventSender +public class EventSenderDmaapImpl implements EventSender { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderImpl.class); - public static final String EVENT_TOPIC_WRITE = "event.topic.write"; - public static final String EVENT_CLIENT_KEY = "event.client.key"; - public static final String EVENT_CLIENT_SECRET = "event.client.secret"; - public static final String EVENT_POOL_MEMBERS = "event.pool.members"; + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); + public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; + public static final String DMAAP_USERNAME = "dmaap.appc.username"; + public static final String DMAAP_PASSWORD = "dmaap.appc.password"; + public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members"; private static Configuration configuration = ConfigurationFactory.getConfiguration(); @@ -59,21 +59,21 @@ public class EventSenderImpl implements EventSender this.producerMap = producerMap; } - public EventSenderImpl(){ + public EventSenderDmaapImpl(){ } public void initialize(){ Properties properties = configuration.getProperties(); String writeTopic; - String apiKey; - String apiSecret; + String username; + String password; final List<String> pool = new ArrayList<>(); - for(DmaapDestination destination:DmaapDestination.values()){ + for(MessageDestination destination: MessageDestination.values()){ writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); - apiKey = properties.getProperty(destination + "." + EVENT_CLIENT_KEY); - apiSecret = properties.getProperty(destination + "." + EVENT_CLIENT_SECRET); + username = properties.getProperty(destination + "." + DMAAP_USERNAME); + password = properties.getProperty(destination + "." + DMAAP_PASSWORD); String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); if (hostNames != null && !hostNames.isEmpty()) { @@ -83,12 +83,8 @@ public class EventSenderImpl implements EventSender LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("apiKey = %s, taken from property: %s", apiKey, destination + "." + EVENT_CLIENT_KEY)); - Producer producer = new DmaapProducer(pool, writeTopic); - - if (apiKey != null && apiSecret != null) { - producer.updateCredentials(apiKey, apiSecret); - } + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); for (String url : pool) { if (url.contains("3905") || url.contains("https")) { @@ -103,7 +99,7 @@ public class EventSenderImpl implements EventSender } @Override - public boolean sendEvent(DmaapDestination destination,EventMessage msg) { + public boolean sendEvent(MessageDestination destination, EventMessage msg) { String jsonStr = msg.toJson(); String id = msg.getEventHeader().getEventId(); LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); @@ -112,7 +108,43 @@ public class EventSenderImpl implements EventSender } @Override - public boolean sendEvent(DmaapDestination destination,Map<String, String> params, SvcLogicContext ctx) throws APPCException { + public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) { + String jsonStr = msg.toJson(); + String id = msg.getEventHeader().getEventId(); + LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); + Producer producer = createProducer(destination, eventTopicName); + return producer.post(id, jsonStr); + } + + private Producer createProducer(MessageDestination destination, String eventTopicName) { + Properties properties = configuration.getProperties(); + final List<String> pool = new ArrayList<>(); + String username = properties.getProperty(destination + "." + DMAAP_USERNAME); + String password = properties.getProperty(destination + "." + DMAAP_PASSWORD); + String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); + + if (hostNames != null && !hostNames.isEmpty()) { + LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); + Collections.addAll(pool, hostNames.split(",")); + } + + LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); + LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password); + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + LOG.debug("Producer should use HTTPS"); + producer.useHttps(true); + break; + } + } + return producer; + } + + @Override + public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException { if (params == null) { String message = "Parameters map is empty (null)"; @@ -134,10 +166,10 @@ public class EventSenderImpl implements EventSender LOG.error(message); throw new APPCException(message); } - EventMessage dmaapEventMessage = new EventMessage( + EventMessage eventMessage = new EventMessage( new EventHeader(eventTime, apiVer, eventId), new EventStatus(code, reason)); - return sendEvent(destination,dmaapEventMessage); + return sendEvent(destination,eventMessage); } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml index a8caf666f..eefe8e504 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -24,10 +24,10 @@ <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> - <bean id="eventSenderBean" class="org.openecomp.appc.adapter.dmaap.impl.EventSenderImpl" + <bean id="eventSenderBean" class="org.openecomp.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl" init-method="initialize" scope="singleton"> </bean> - <service id="eventSenderService" interface="org.openecomp.appc.adapter.dmaap.EventSender" ref="eventSenderBean" /> + <service id="eventSenderService" interface="org.openecomp.appc.adapter.message.EventSender" ref="eventSenderBean" /> </blueprint> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties new file mode 100644 index 000000000..facd33e4d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties @@ -0,0 +1,57 @@ +### +# ============LICENSE_START======================================================= +# openECOMP : APP-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH > +TransportType=HTTPAAF +Latitude =50.000000 +Longitude =-100.000000 +Version =1.0 +ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events +Environment =TEST +Partner=BOT_R +routeOffer=MR1 +SubContextPath =/ +Protocol =http +MethodType =GET +username =admin +password =admin +contenttype =application/json +authKey=01234567890abcde:01234567890abcdefghijklmn +authDate=2016-02-18T13:57:37-0800 +host=127.0.0.1 +topic=org.openecomp.appc.UNIT-TEST +group=jmsgrp +id=2 +timeout=15000 +limit=1000 +filter= +AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler +AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler +AFT_DME2_REQ_TRACE_ON=true +AFT_ENVIRONMENT=AFTUAT +AFT_DME2_EP_CONN_TIMEOUT=15000 +AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000 +AFT_DME2_EP_READ_TIMEOUT_MS=50000 +sessionstickinessrequired=NO +DME2preferredRouterFilePath=preferredRoute.txt + + + diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt new file mode 100644 index 000000000..662b0aa7d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt @@ -0,0 +1 @@ +preferredRouteKey=MR1
\ No newline at end of file diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties new file mode 100644 index 000000000..4901d517e --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties @@ -0,0 +1,52 @@ +### +# ============LICENSE_START======================================================= +# openECOMP : APP-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH > +TransportType=HTTPAAF +Latitude =50.000000 +Longitude =-100.000000 +Version =1.0 +ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events +Environment =TEST +Partner=BOT_R +SubContextPath =/ +Protocol =http +MethodType =POST +username =admin +password =admin +contenttype = application/json +authKey=01234567890abcde:01234567890abcdefghijklmn +authDate=2016-07-20T11:30:56-0700 +host=127.0.0.1 +topic=org.openecomp.appc.UNIT-TEST +partition=2 +maxBatchSize=100 +maxAgeMs=250 +AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler +AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler +AFT_DME2_REQ_TRACE_ON=true +AFT_ENVIRONMENT=AFTUAT +AFT_DME2_EP_CONN_TIMEOUT=15000 +AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000 +AFT_DME2_EP_READ_TIMEOUT_MS=50000 +sessionstickinessrequired=NO +DME2preferredRouterFilePath=preferredRoute.txt +MessageSentThreadOccurance=50 diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java new file mode 100644 index 000000000..a0ae92ea8 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java @@ -0,0 +1,134 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + +import java.io.*; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.mr.client.MRConsumer; +import org.json.JSONObject; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + + +/** + *An example of how to use the Java publisher. + */ +public class SimpleExamplePublisher +{ + + public static void main(String []args) throws InterruptedException, Exception{ + int msgCount = 1; + SimpleExamplePublisher publisher = new SimpleExamplePublisher(); + + int i=0; + + String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.openecomp.appc.UNIT-TEST", null); + while (i< msgCount) + { + publisher.publishMessage(topicProducerPropFileName,i); + i++; + } + + fetchMessage(); + } + + + public void publishMessage( String producerFilePath,int count ) throws IOException, InterruptedException, Exception + { + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "Partition:2", "Message:" +count); + //msg1.put ( "greeting", "Hello .." ); + + pub.send ( "2", msg1.toString()); + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + System.err.println ( stuck.size() + " messages unsent" ); + } + else + { + System.out.println ( "Clean exit; all messages sent." ); + } + } + + + public static void fetchMessage() + { + int count = 0; + + try + { + String topic = "org.openecomp.appc.UNIT-TEST"; + Properties props = new Properties(); + props.put("id", "1"); + props.put("group", "group1"); + String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props); + final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1); + + props = new Properties(); + props.put("id", "2"); + props.put("group", "group2"); + String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props); + final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2); + + for ( String msg : consumer1.fetch () ) + { + count++; + System.out.println ( "consumer1 "+count + ": " + msg ); + } + for ( String msg : consumer2.fetch () ) + { + count++; + System.out.println ( "consumer1 "+count + ": " + msg ); + } + + + } + catch ( Exception x ) + { + System.out.println("inside cons exc"); + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} + + + + + + + + + diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java index e97014198..6626938a7 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java @@ -19,13 +19,13 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import org.junit.Test; -import org.openecomp.appc.adapter.dmaap.AppcDmaapAdapterActivator; +import org.openecomp.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator; public class TestAppcDmaapAdapterActivator { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java new file mode 100644 index 000000000..23647d61a --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.junit.Ignore; + +import java.util.Arrays; +import java.util.List; + +/** + * Must have a DMaaP cluster or simulator up and running + * Update the hostname, topic, client properties in + * resources/org/openecomp/appc/default.properties + * + */ +public class TestDmaapConsuming { + + private static Consumer dmaapConsumer; + private static Consumer httpConsumer; + + @BeforeClass + public static void setUp() { + + Configuration configuration = ConfigurationFactory.getConfiguration(); + + List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); + String topic = configuration.getProperty("topic.read"); + String consumerName = configuration.getProperty("client.name"); + String consumerId = configuration.getProperty("client.name.id"); + String msgFilter = configuration.getProperty("message.filter"); + String user = configuration.getProperty("dmaap.appc.username"); + String password = configuration.getProperty("dmaap.appc.password"); + + httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter); + dmaapConsumer = new DmaapConsumerImpl(hosts, topic, consumerName, consumerId,user,password,msgFilter); + } + + @Test + @Ignore + public void testHttpFetchMessages() { + testFetchMessages(httpConsumer); + } + + @Test + @Ignore + public void testFetchMessages() { + testFetchMessages(dmaapConsumer); + } + + private void testFetchMessages(Consumer consumer) { + List<String> messages = consumer.fetch(1000, 100); + Assert.assertNotNull(messages); + Assert.assertFalse(messages.isEmpty()); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java new file mode 100644 index 000000000..b35f9871d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java @@ -0,0 +1,169 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + +import org.openecomp.sdnc.sli.SvcLogicContext; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.openecomp.appc.adapter.message.MessageDestination; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.message.event.EventHeader; +import org.openecomp.appc.adapter.message.event.EventMessage; +import org.openecomp.appc.adapter.message.event.EventStatus; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.exceptions.APPCException; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +public class TestDmaapEventSender { + + private static Properties props; + private static Map<String,Producer> producerMap = new HashMap<>(); + private static EventMessage eventMessage; + + @BeforeClass + public static void setUp() { + + Configuration configuration = ConfigurationFactory.getConfiguration(); // test.properties file placed in home dir. + + props = new Properties(); + props.setProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS, + configuration.getProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS) != null ? + configuration.getProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS) : "member1,member2,member3"); + props.setProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE, + configuration.getProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE) != null ? + configuration.getProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE) : "topic1"); + + String eventClientKey = configuration.getProperty(EventSenderDmaapImpl.DMAAP_USERNAME); + if (eventClientKey != null) { + props.setProperty(EventSenderDmaapImpl.DMAAP_USERNAME,eventClientKey); + } + String eventClientSecret = configuration.getProperty(EventSenderDmaapImpl.DMAAP_PASSWORD); + if (eventClientSecret != null) { + props.setProperty(EventSenderDmaapImpl.DMAAP_PASSWORD, eventClientSecret); + } + + Producer producer = Mockito.mock(DmaapProducerImpl.class); + producerMap.put(MessageDestination.DCAE.toString(),producer); + Mockito.when(producer.post(Matchers.anyString(), Matchers.anyString())).thenReturn(true); + + eventMessage = new EventMessage( + new EventHeader("2016-03-15T10:59:33.79Z", "1.01", "17"), + new EventStatus(404, "No krokodil found")); + } + + @Test + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderWithProperties() { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage)); + } + + @Test + public void testDmaapEventSenderWithNullProperties() { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); +// eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage)); + } + + /* + * This test runs agains a real Dmaap (or a simulator) that should be cofigured in test.properties file. + */ + @Test + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderWithDmaapSim() { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage)); + } + + + @Test + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDG() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = new HashMap<>(); + + params.put("eventTime", eventMessage.getEventHeader().getEventTime()); + params.put("apiVer", eventMessage.getEventHeader().getApiVer()); + params.put("eventId", eventMessage.getEventHeader().getEventId()); + params.put("reason", eventMessage.getEventStatus().getReason()); + params.put("code", "200"); + + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext())); + } + + @Test(expected = APPCException.class) + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDGNoParams() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = new HashMap<>(); + + Assert.assertFalse(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext())); + } + + + @Test(expected = APPCException.class) + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDGNullParam() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = null; + + Assert.assertFalse(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext())); + } + + @Test(expected = APPCException.class) + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDGNoParam() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = new HashMap<>(); + +// params.put("apiVer", eventMessage.getEventHeader().getApiVer()); + params.put("eventId", eventMessage.getEventHeader().getEventId()); + params.put("reason", eventMessage.getEventStatus().getReason()); + params.put("code", "200"); + + eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext()); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java new file mode 100644 index 000000000..b5b7c9538 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java @@ -0,0 +1,80 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; + +import java.util.Arrays; +import java.util.List; + +/** + * Must have a DMaaP cluster or simulator up and running + * Update the hostname, topic, client properties in + * resources/org/openecomp/appc/default.properties + * + */ +public class TestDmaapProducing { + + private static Producer httpProducer; + private static Producer dmaapProducer; + + @BeforeClass + public static void setUp() { + + Configuration configuration = ConfigurationFactory.getConfiguration(); + + List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); + String topic = configuration.getProperty("topic.write"); + String user = configuration.getProperty("dmaap.appc.username"); + String password = configuration.getProperty("dmaap.appc.password"); + + dmaapProducer = new DmaapProducerImpl(hosts, topic,user,password); + httpProducer = new HttpDmaapProducerImpl(hosts, topic); + httpProducer.updateCredentials(user,password); + } + + @Test + @Ignore + public void testHttpPostMessage() { + testPostMessage(httpProducer); + } + + @Test + @Ignore + public void testPostMessages() { + testPostMessage(dmaapProducer); + } + + private void testPostMessage(Producer producer) { + Assert.assertTrue(producer.post("partition", "{\"message\": \"Hello, world!\"}")); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java new file mode 100644 index 000000000..4b936351a --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java @@ -0,0 +1,242 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.*; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; + +public class TestConsumerProducerImpl { + + private Collection<String> urls; + private String topicRead; + private String topicWrite; + private String group; + private String groupId; + private String user; + private String password; + + @Before + public void setup() { + System.out.println("setup entry..."); +// urls = new HashSet<String>(); +// urls.add("dmaaphost1"); +// urls.add("dmaaphost2"); +// //remove unavailable dmaap instance for build +// //urls.add("dmaaphost3"); +// +// topicRead = "APPC-UNIT-TEST"; +// topicWrite = "APPC-UNIT-TEST"; +// group = "APPC-CLIENT"; +// groupId = "0"; + Configuration configuration = ConfigurationFactory.getConfiguration(); + List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); + urls = new HashSet<String>(hosts); + topicRead = configuration.getProperty("topic.read"); + topicWrite = configuration.getProperty("topic.write"); + user = configuration.getProperty("dmaap.appc.username"); + password = configuration.getProperty("dmaap.appc.password"); + group = "APPC-CLIENT"; + groupId = "0"; + + + runoff(); + } + + /** + * Test that we can read and write and that the messages come back in order + */ + @Ignore + @Test + public void testWriteRead() { + System.out.println("testWriteRead entry..."); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + + String s1 = UUID.randomUUID().toString(); + String s2 = UUID.randomUUID().toString(); + if (p.post("TEST", s1) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s1); + } + if (p.post("TEST", s2) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s2); + } + + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> out = c.fetch(); + // if fetch is empty, try again - a 2nd attempt may succeed if + // cambria client has failed over + if ((out == null) || out.isEmpty()) { + out = c.fetch(); + } + + assertNotNull(out); + assertEquals(2, out.size()); + assertEquals(s1, out.get(0)); + assertEquals(s2, out.get(1)); + + } + + /** + * Test that we can read and write and that the messages come back in order + */ + @Test + @Ignore // Https Not support on jenkins server + public void testWriteReadHttps() { + System.out.println("testWriteReadHttps entry..."); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + p.useHttps(true); + + String s1 = UUID.randomUUID().toString(); + String s2 = UUID.randomUUID().toString(); + if (p.post("TEST", s1) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s1); + } + if (p.post("TEST", s2) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s2); + } + + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + c.useHttps(true); + + List<String> out = c.fetch(); + // if fetch is empty, try again - a 2nd attempt may succeed if + // cambria client has failed over + if ((out == null) || out.isEmpty()) { + out = c.fetch(); + } + + assertNotNull(out); + assertEquals(2, out.size()); + assertEquals(s1, out.get(0)); + assertEquals(s2, out.get(1)); + + } + + @Test + @Ignore // requires connection to a live DMaaP server + public void testBadUrl() { + System.out.println("testBadUrl entry..."); + urls.clear(); + urls.add("something.local"); + + // Producer p = new DmaapProducerImpl(urls, topicWrite); + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> result = c.fetch(1000, 1000); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + @Ignore // requires connection to a live DMaaP server + public void testAuth() { + System.out.println("testAuth entry..."); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + + p.updateCredentials("key", "secret"); + c.updateCredentials("key", "secret"); + + // TODO - Do some protected dmaap queries when the apis are updated + } + + /** + * Test DMaaP client failover to another server when a bad url is encountered + + */ + @Ignore + @Test + public void testFailover() { + System.out.println("testFailover entry..."); + urls.clear(); + urls.add("openecomp2.org"); // bad url + urls.add("dmaaphost2"); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + + String s1 = UUID.randomUUID().toString(); + if (p.post("TEST", s1) == false) { + // try again - cambria client should have failed over + p.post("TEST", s1); + } + + urls.clear(); + urls.add("openecomp3.org"); // bad url + urls.add("dmaaphost3"); + + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> out = c.fetch(1000, 1000); + // if fetch is empty, try again - cambria client should have failed over + if ((out == null) || out.isEmpty()) { + out = c.fetch(); + } + + assertNotNull(out); + assertEquals(1, out.size()); + assertEquals(s1, out.get(0)); + } + + /** + * Reads through the entire topic so it is clean for testing. WARNING - ONLY USE ON TOPICS WHERE YOU ARE THE ONLY + * WRITER. Could end in an infinite loop otherwise. + */ + private void runoff() { + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> data; + do { + data = c.fetch(1000, 10000); + } while (!data.isEmpty() && data.size()!=1); + } + + @Test + @Ignore + public void testFilter() { + System.out.println("testFilter entry..."); + List<String> res; + String filter = "{\"class\":\"Assigned\",\"field\":\"request\"}"; + Consumer c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password,filter); + res = c.fetch(2000, 10); + assertFalse(res.isEmpty()); + + res.clear(); + filter = "{\"class\":\"Assigned\",\"field\":\"response\"}"; + c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password, filter); + res = c.fetch(2000, 10); + assertTrue(res.isEmpty()); + } +} |