diff options
Diffstat (limited to 'policy-endpoints/src/main')
3 files changed, 161 insertions, 30 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java new file mode 100644 index 00000000..e44d1f76 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + +import java.util.List; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for sending messages to a Topic using TopicSink. + */ +public class TopicSinkClient { + private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class); + + /** + * Coder used to encode messages being sent to the topic. + */ + private static final Coder CODER = new StandardCoder(); + + /** + * Topic to which messages are published. + */ + @Getter + private final String topic; + + /** + * Where messages are published. + */ + private final TopicSink sink; + + /** + * Constructs the object. + * + * @param topic topic to which messages should be published + * @throws TopicSinkClientException if the topic does not exist + */ + public TopicSinkClient(final String topic) throws TopicSinkClientException { + this.topic = topic; + + final List<TopicSink> lst = getTopicSinks(topic); + if (lst.isEmpty()) { + throw new TopicSinkClientException("no sinks for topic: " + topic); + } + + this.sink = lst.get(0); + } + + /** + * Sends a message to the topic, after encoding the message as json. + * + * @param message message to be encoded and sent + * @return {@code true} if the message was successfully sent/enqueued, {@code false} otherwise + */ + public boolean send(final Object message) { + try { + final String json = CODER.encode(message); + return sink.send(json); + + } catch (RuntimeException | CoderException e) { + logger.warn("send to {} failed because of {}", topic, e.getMessage(), e); + return false; + } + } + + // the remaining methods are wrappers that can be overridden by junit tests + + /** + * Gets the sinks for a given topic. + * + * @param topic the topic of interest + * @return the sinks for the topic + */ + protected List<TopicSink> getTopicSinks(final String topic) { + return TopicEndpoint.manager.getTopicSinks(topic); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java new file mode 100644 index 00000000..608393b3 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + +/** + * Exception thrown by TopicSink client classes. + */ +public class TopicSinkClientException extends Exception { + private static final long serialVersionUID = 1L; + + public TopicSinkClientException() { + super(); + } + + public TopicSinkClientException(final String message) { + super(message); + } + + public TopicSinkClientException(final Throwable cause) { + super(cause); + } + + public TopicSinkClientException(final String message, final Throwable cause) { + super(message, cause); + } + + public TopicSinkClientException(final String message, final Throwable cause, final boolean enableSuppression, + final boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java index ca0611cb..d4ccc494 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java @@ -8,9 +8,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,13 +25,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; import java.util.Map; import java.util.Map.Entry; import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; @@ -42,6 +38,7 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; +import org.onap.policy.common.utils.network.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +51,7 @@ public class JerseyClient implements HttpClient { * Logger. */ private static Logger logger = LoggerFactory.getLogger(JerseyClient.class); - + protected static final String JERSEY_DEFAULT_SERIALIZATION_PROVIDER = "com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider"; @@ -74,11 +71,11 @@ public class JerseyClient implements HttpClient { /** * Constructor. - * + * * <p>name the name https is it https or not selfSignedCerts are there self signed certs * hostname the hostname port port being used basePath base context userName user * password password - * + * * @param busTopicParams Input parameters object * @throws KeyManagementException key exception * @throws NoSuchAlgorithmException no algorithm exception @@ -116,25 +113,7 @@ public class JerseyClient implements HttpClient { ClientBuilder clientBuilder; SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); if (this.selfSignedCerts) { - sslContext.init(null, new TrustManager[] {new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) - throws CertificateException { - // always trusted - } - - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) - throws CertificateException { - // always trusted - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - - } }, new SecureRandom()); + sslContext.init(null, NetworkUtil.getAlwaysTrustingManager(), new SecureRandom()); clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier((host, session) -> true); } else { @@ -153,7 +132,7 @@ public class JerseyClient implements HttpClient { } registerSerProviders(busTopicParams.getSerializationProvider()); - + this.client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true"); this.baseUrl = tmpBaseUrl.append(this.hostname).append(":").append(this.port).append("/") @@ -162,7 +141,7 @@ public class JerseyClient implements HttpClient { /** * Registers the serialization provider(s) with the client. - * + * * @param serializationProvider comma-separated list of serialization providers * @throws ClassNotFoundException if the serialization provider cannot be found */ |