diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java')
-rwxr-xr-x | src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java | 336 |
1 files changed, 0 insertions, 336 deletions
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java deleted file mode 100755 index cfebe3b..0000000 --- a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java +++ /dev/null @@ -1,336 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.restconf - * ================================================================================ - * Copyright (C) 2018 Huawei. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.collectors.restconf.common; - -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import org.glassfish.jersey.media.sse.EventSource; -import org.glassfish.jersey.media.sse.SseFeature; -import org.json.JSONArray; -import org.json.JSONObject; -import org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPConfigurationParser; -import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.HttpHeaders; -import java.nio.file.Paths; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.Base64; - -import static org.onap.dcae.collectors.restconf.common.RestapiCallNodeUtil.getParameters; -import static org.onap.dcae.collectors.restconf.common.RestapiCallNodeUtil.addAuthType; - -public class RestConfProc { - - private static final Logger log = LoggerFactory.getLogger(RestConfProc.class); - - public static String format; - - private static RestConfContext ctx = new RestConfContext(); - - private static final Logger oplog = LoggerFactory.getLogger("org.onap.restconf.common.output"); - - private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>(); - - private final Map<String, String> paraMap = new HashMap<>(); - private static String cambriaConfigFile; - - public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; - - public static String streamID; - private ExecutorService executor = Executors.newCachedThreadPool(); - - public RestConfProc() { - } - - private void parseInputParameters(rrNvReadable settings) { - String tempFileName; - String restApiUrl; - String httpMetthod; - String respPrefix; - String skipSending; - String sseConnectUrl; - String restapiUser; - String restapiPassword; - String trustStoreFileName; - String trustStorePassword; - String keyStoreFileName; - String keyStorePassword; - String[] currentConfigFile; - - currentConfigFile = settings.getStrings(Constants.KSETTING_DMAAPCONFIGS, Constants.KDEFAULT_DMAAPCONFIGS); - cambriaConfigFile = currentConfigFile[0]; - - tempFileName = settings.getString(Constants.KDEFAULT_TEMP_FILENAME, null); - restApiUrl = settings.getString(Constants.KSETTING_REST_API_URL, null); - httpMetthod = settings.getString(Constants.KSETTING_HTTP_METHOD, null); - respPrefix = settings.getString(Constants.KSETTING_RESP_PREFIX, null); - skipSending = settings.getString(Constants.KSETTING_SKIP_SENDING, null); - sseConnectUrl = settings.getString(Constants.KSETTING_SSE_CONNECT_URL, null); - restapiUser = settings.getString(Constants.KSETTING_UNAME, null); - restapiPassword = settings.getString(Constants.KSETTING_PASSWORD, null); - trustStoreFileName = settings.getString(Constants.KSETTING_TRUST_STORE_FILENAME, null); - trustStorePassword = settings.getString(Constants.KSETTING_TRUST_STORE_PASSWORD, null); - keyStoreFileName = settings.getString(Constants.KSETTING_KEY_STORE_FILENAME, null); - keyStorePassword = settings.getString(Constants.KSETTING_KEY_STORE_PASSWORD, null); - format = settings.getString(Constants.KSETTING_FORMAT, null); - streamID = "route=route_failure"; - - paraMap.put(Constants.KDEFAULT_TEMP_FILENAME, tempFileName); - paraMap.put(Constants.KSETTING_REST_API_URL, restApiUrl); - paraMap.put(Constants.KSETTING_HTTP_METHOD, httpMetthod); - paraMap.put(Constants.KSETTING_RESP_PREFIX, respPrefix); - paraMap.put(Constants.KSETTING_SKIP_SENDING, skipSending); - paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, sseConnectUrl); - paraMap.put(Constants.KSETTING_FORMAT, format); - paraMap.put(Constants.KSETTING_UNAME, restapiUser); - paraMap.put(Constants.KSETTING_PASSWORD, restapiPassword); - paraMap.put(Constants.KSETTING_TRUST_STORE_FILENAME, trustStoreFileName); - paraMap.put(Constants.KSETTING_TRUST_STORE_PASSWORD, trustStorePassword); - paraMap.put(Constants.KSETTING_KEY_STORE_FILENAME, keyStoreFileName); - paraMap.put(Constants.KSETTING_KEY_STORE_PASSWORD, keyStorePassword); - - ctx.setAttribute("prop.encoding-json", "encoding-json"); - ctx.setAttribute("restapi-result.response-code", "200"); - ctx.setAttribute("restapi-result.ietf-subscribed-notifications:output.identifier", "100"); - } - - public RestConfProc(rrNvReadable settings) { - - parseInputParameters(settings); - - fProcessingInputQueue = new LinkedBlockingQueue<>(Constants.KDEFAULT_MAXQUEUEDEVENTS); - - EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog, - DMaaPConfigurationParser - .parseToDomainMapping(Paths.get(cambriaConfigFile)) - .get())); - ExecutorService executor = Executors.newFixedThreadPool(20); - for (int i = 0; i < 20; ++i) { - executor.execute(ep); - } - } - - /** - * To establish a subscription with controller by sending HTTP request - * - * @param paramMap holds the input configuration - * @param ctx restconf context - * @param url url to send subscription request - * @throws Exception exception - */ - public void establishSubscription(Map<String, String> paramMap, - RestConfContext ctx, - String url) throws Exception { - - RestapiCallNode restApiCallNode = new RestapiCallNode(); - - Map<String, String> params = new HashMap<>(); - params.put("restapiUrl", "https://" + url + "/controller/v2/tokens"); - params.put("httpMethod", "post"); - params.put("templateFileName", "./etc/access-token.json"); - params.put("skipSending", "false"); - params.put("format", "json"); - params.put("restapiUser", "test123"); - params.put("restapiPassword", "Changeme_123"); - params.put(Constants.KSETTING_TRUST_STORE_FILENAME, - paramMap.get(Constants.KSETTING_TRUST_STORE_FILENAME)); - params.put(Constants.KSETTING_TRUST_STORE_PASSWORD, "adminadmin"); - params.put(Constants.KSETTING_KEY_STORE_FILENAME, - paramMap.get(Constants.KSETTING_KEY_STORE_FILENAME)); - params.put(Constants.KSETTING_KEY_STORE_PASSWORD, "adminadmin"); - - String httpResponse = null; - try { - restApiCallNode.sendRequest(params, ctx, null); - httpResponse = ctx.getAttribute("httpResponse"); - JSONObject jsonObj = new JSONObject(httpResponse); - JSONObject data = jsonObj.getJSONObject("data"); - String tokenId = data.get("token_id").toString(); - paramMap.put("customHttpHeaders", "X-ACCESS-TOKEN=" + tokenId); - paramMap.put("TokenId", tokenId); - } catch (Exception e) { - log.info("Access token is not supported" + e.getMessage()); - log.info("http response" + httpResponse); - } - - restApiCallNode.sendRequest(paramMap, ctx, null); - - establishPersistentConnection(paramMap, ctx); - } - - /** - * To establish persistent connection after receiving successful subscription response from controller - * - * @param paramMap holds the input configuration - * @param ctx restconf context - */ - public void establishPersistentConnection(Map<String, String> paramMap, RestConfContext ctx) { - - // check whether response is ok - if (ctx.getAttribute(Constants.RESPONSE_CODE).equals(Constants.RESPONSE_CODE_200)) { - - String id = ctx.getAttribute(Constants.OUTPUT_IDENTIFIER); - - String url = paramMap.get(Constants.KSETTING_SSE_CONNECT_URL); - - PersistentConnection connection = new PersistentConnection(url, ctx, paramMap); - runnableInfo.put(id, connection); - executor.execute(connection); - } else { - // error response is already updated in ctx - log.info("Failed to subscribe"); - } - } - - /** - * Get input parameter map - * - * @return input parameters map - */ - public Map<String, String> getParaMap() { - return paraMap; - } - - - /** - * Get restConf context which has information about message encoding type - * - * @return restconf context - */ - public RestConfContext getCtx() { - return ctx; - } - - public class PersistentConnection implements Runnable { - private String url; - private RestConfContext ctx; - private Map<String, String> paramMap; - private volatile boolean running = true; - - public PersistentConnection(String url, RestConfContext ctx, Map<String, String> paramMap) { - this.url = url; - this.ctx = ctx; - this.paramMap = paramMap; - } - - @Override - public void run() { - Parameters p = null; - try { - p = getParameters(paramMap); - } catch (Exception e) { - log.error("Exception occured!", e); - Thread.currentThread().interrupt(); - } - - Client client = ignoreSslClient().register(SseFeature.class); - WebTarget target = addAuthType(client, p).target(url); - String token = paramMap.get("TokenId"); - String headerName = "X-ACCESS-TOKEN"; - if (token == null) { - headerName = HttpHeaders.AUTHORIZATION; - if(null!=p) { - token = getAuthorizationToken(p.restapiUser, p.restapiPassword); - } - } - AdditionalHeaderWebTarget newTarget = new AdditionalHeaderWebTarget(target, token, headerName); - EventSource eventSource = EventSource.target(newTarget).build(); - eventSource.register(new DataChangeEventListener(ctx)); - eventSource.open(); - log.debug("Connected to SSE source"); - while (running) { - try { - log.debug("SSE state " + eventSource.isOpen()); - Thread.sleep(5000); - } catch (InterruptedException ie) { - log.debug("Exception: " + ie.getMessage()); - Thread.currentThread().interrupt(); - } - } - eventSource.close(); - log.info("Closed connection to SSE source"); - } - } - - private String getAuthorizationToken(String userName, String password) { - return "Basic " + Base64.getEncoder().encodeToString(( - userName + ":" + password).getBytes()); - } - - /** - * To process the array of events which are received from controller - * - * @param a JSONArray - * @throws Exception exception - */ - public static void handleEvents(JSONArray a) throws Exception { - for (int i = 0; i < a.length(); i++) { - if (!fProcessingInputQueue.offer(a.getJSONObject(i))) { - throw new Exception(); - } - } - log.debug("RestConfCollector.handleEvents:EVENTS has been published successfully!"); - } - - private Client ignoreSslClient() { - SSLContext sslcontext = null; - - try { - sslcontext = SSLContext.getInstance("TLS"); - sslcontext.init(null, new TrustManager[]{new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException { - } - - @Override - public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - } }, new java.security.SecureRandom()); - } catch (NoSuchAlgorithmException | KeyManagementException e) { - throw new IllegalStateException(e); - } - - return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build(); - } -} - - |