aboutsummaryrefslogtreecommitdiffstats
path: root/common/src/main/java/org/onap/so/client/cds/CDSProcessingClient.java
blob: e40b936daa45b3dfc6d02170c8baa088e67e3330 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*-
 * ============LICENSE_START=======================================================
 * ONAP - SO
 * ================================================================================
 * Copyright (C) 2017 - 2019 Bell Canada, Deutsche Telekom.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.so.client.cds;

import io.grpc.ManagedChannel;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.so.client.KeyStoreLoader;
import org.onap.so.client.PreconditionFailedException;
import org.onap.so.client.RestPropertiesLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * The CDS processing client uses gRPC for communication between SO and CDS. That communication is configured to use a
 * streaming approach, meaning that the client can send an event to which the server can reply with multiple
 * sub-responses, until full completion of the processing.
 * </p>
 * <p>
 * In order for the caller to manage the callback, it is the responsibility of the caller to implement and provide a
 * {@link CDSProcessingListener} so received messages can be handled appropriately.
 * </p>
 *
 * Here is an example of implementation of such listener:
 * 
 * <pre>
 * new CDSProcessingListener() {
 *
 *     &#64;Override
 *     public void onMessage(ExecutionServiceOutput message) {
 *         log.info("Received notification from CDS: {}", message);
 *     }
 *
 *     &#64;Override
 *     public void onError(Throwable t) {
 *         Status status = Status.fromThrowable(t);
 *         log.error("Failed processing blueprint {}", status, t);
 *     }
 * }
 * </pre>
 *
 * <p>
 * The client implements {@link AutoCloseable}; closing it shuts down the underlying gRPC channel.
 * </p>
 */
public class CDSProcessingClient implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(CDSProcessingClient.class);

    private final ManagedChannel channel;
    private final CDSProcessingHandler handler;

    /**
     * Builds a client from the {@link CDSProperties} implementation found through {@link RestPropertiesLoader}.
     *
     * <p>
     * When SSL is enabled in the properties, the channel is configured with a trust manager backed by the key store
     * returned by {@link KeyStoreLoader}. When Basic authentication is enabled, a {@link BasicAuthClientInterceptor}
     * is installed; the channel is switched to plaintext only if SSL is <em>not</em> enabled, so that enabling both
     * options never downgrades the transport and never sends credentials unencrypted.
     * </p>
     *
     * @param listener callback receiving the messages and errors produced while processing a request
     * @throws PreconditionFailedException if no {@link CDSProperties} implementation is available
     * @throws RuntimeException if SSL is enabled and the key store cannot be loaded or the SSL context cannot be
     *         built
     */
    public CDSProcessingClient(final CDSProcessingListener listener) {
        CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
        if (props == null) {
            throw new PreconditionFailedException(
                    "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
        }
        NettyChannelBuilder builder = NettyChannelBuilder.forAddress(props.getHost(), props.getPort())
                .nameResolverFactory(new DnsNameResolverProvider());
        final boolean useSsl = props.getUseSSL();
        if (useSsl) {
            log.info("Configure SSL connection");
            configureSsl(builder);
        }
        if (props.getUseBasicAuth()) {
            log.info("Configure Basic authentication");
            builder.intercept(new BasicAuthClientInterceptor(props));
            if (!useSsl) {
                // Calling usePlaintext() after sslContext() would override the TLS configuration,
                // so only fall back to plaintext when SSL has not been requested.
                builder.usePlaintext();
            }
        }
        this.channel = builder.build();
        this.handler = new CDSProcessingHandler(listener);
        log.info("CDSProcessingClient started");
    }

    /**
     * Builds a client around an already created channel and handler.
     *
     * @param channel gRPC channel used to send requests
     * @param handler handler that performs the actual processing of requests
     */
    CDSProcessingClient(final ManagedChannel channel, final CDSProcessingHandler handler) {
        this.channel = channel;
        this.handler = handler;
    }

    /**
     * Configures the given builder with an SSL context trusting the key store provided by {@link KeyStoreLoader}.
     *
     * @param builder channel builder to configure
     * @throws RuntimeException if the key store is unavailable or the SSL context cannot be created
     */
    private static void configureSsl(final NettyChannelBuilder builder) {
        KeyStore ks = KeyStoreLoader.getKeyStore();
        if (ks == null) {
            log.error("Can't load KeyStore");
            throw new RuntimeException("Can't load KeyStore to create secure channel");
        }
        try {
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(ks);
            builder.sslContext(GrpcSslContexts.forClient().trustManager(tmf).build());
        } catch (NoSuchAlgorithmException e) {
            log.error("Can't get default TrustManager algorithm");
            throw new RuntimeException(e);
        } catch (KeyStoreException e) {
            log.error("TrustManagerFactory initialization failed");
            throw new RuntimeException(e);
        } catch (SSLException e) {
            log.error("SslContext build error");
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends a request to the CDS backend micro-service.
     *
     * The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The
     * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter
     * will decrement.
     *
     * It is the user's responsibility to close the client.
     *
     * @param input request to send
     * @return CountDownLatch instance that can be used to #await for completeness of processing
     */
    public CountDownLatch sendRequest(ExecutionServiceInput input) {
        return handler.process(input, channel);
    }

    /**
     * Initiates shutdown of the underlying channel, if there is one. This method does not wait for the channel to
     * terminate.
     */
    @Override
    public void close() {
        if (channel != null) {
            channel.shutdown();
        }
        log.info("CDSProcessingClient stopped");
    }
}