diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java | 222 |
1 files changed, 104 insertions, 118 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java index 52e3d2dc..8ee0f2d2 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java @@ -62,7 +62,6 @@ import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Date; @@ -81,15 +80,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.servlet.ServletException; -import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.eclipse.jetty.server.ServerConnector; import org.glassfish.jersey.client.ClientProperties; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.HttpClient; @@ -97,7 +93,6 @@ import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; -import org.onap.policy.drools.system.PolicyEngineConstants; import org.onap.policy.drools.utils.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,10 +121,6 @@ public class Server implements Comparable<Server> { // the current REST server private static HttpServletServer restServer; - // incoming packets from HTTP - private static LinkedTransferQueue<byte[]> incomingPackets = - new LinkedTransferQueue<>(); - /*==================================================*/ /* Some properties extracted at initialization time */ /*==================================================*/ @@ -212,6 +203,9 @@ public class Server implements Comparable<Server> { static final int SOCKET_ADDRESS_TAG = 1; static final int SITE_SOCKET_ADDRESS_TAG = 2; + // 'pingHosts' error + static final String PINGHOSTS_ERROR = "Server.pingHosts error"; + /*==============================*/ /* Comparable<Server> interface */ /*==============================*/ @@ -311,6 +305,7 @@ public class Server implements Comparable<Server> { InetAddress address = InetAddress.getByName(ipAddressString); InetSocketAddress socketAddress = new InetSocketAddress(address, port); + possibleError = "HTTP server initialization error"; restServer = HttpServletServerFactoryInstance.getServerFactory().build( "SERVER-POOL", // name useHttps, // https @@ -332,7 +327,9 @@ public class Server implements Comparable<Server> { } // we may not know the port until after the server is started + possibleError = "HTTP server start error"; restServer.start(); + possibleError = null; // determine the address to use if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) { @@ -346,13 +343,10 @@ public class Server implements Comparable<Server> { // start background thread MainLoop.startThread(); - MainLoop.queueWork(new Runnable() { - @Override - public void run() { - // run this in the 'MainLoop' thread - Leader.startup(); - Bucket.startup(); - } + MainLoop.queueWork(() -> { + // run this in the 'MainLoop' thread + Leader.startup(); + Bucket.startup(); }); logger.info("Listening on port {}", port); @@ -491,14 +485,12 @@ public class Server implements Comparable<Server> { int tag; while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) { switch (tag) { - case SOCKET_ADDRESS_TAG: { + case SOCKET_ADDRESS_TAG: socketAddress = readSocketAddress(is); break; - } - case SITE_SOCKET_ADDRESS_TAG: { + case SITE_SOCKET_ADDRESS_TAG: siteSocketAddress = readSocketAddress(is); break; - } default: // ignore tag logger.error("Illegal tag: {}", tag); @@ -513,8 +505,7 @@ public class Server implements Comparable<Server> { * @param is the 'DataInputStream' * @return the 'InetSocketAddress' */ - private static InetSocketAddress readSocketAddress(DataInputStream is) - throws IOException, UnknownHostException { + private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException { byte[] ipAddress = new byte[4]; is.read(ipAddress, 0, 4); @@ -926,47 +917,45 @@ public class Server implements Comparable<Server> { return; } - getThreadPool().execute(new Runnable() { + getThreadPool().execute(() -> { /** * This method is running within the 'MainLoop' thread. */ - @Override - public void run() { - try { - WebTarget webTarget = target.path(path); - if (responseCallback != null) { - // give callback a chance to modify 'WebTarget' - webTarget = responseCallback.webTarget(webTarget); - - // send the response to the callback - Response response; - if (entity == null) { - response = webTarget.request().get(); - } else { - response = webTarget.request().post(entity); - } - responseCallback.response(response); + try { + WebTarget webTarget = target.path(path); + if (responseCallback != null) { + // give callback a chance to modify 'WebTarget' + webTarget = responseCallback.webTarget(webTarget); + + // send the response to the callback + Response response; + if (entity == null) { + response = webTarget.request().get(); } else { - // just do the invoke, and ignore the response - if (entity == null) { - webTarget.request().get(); - } else { - webTarget.request().post(entity); - } + response = webTarget.request().post(entity); } - } catch (Exception e) { - logger.error("Failed to send to {} ({}, {})", - uuid, destSocketAddress, destName); + responseCallback.response(response); + } else { + // just do the invoke, and ignore the response + if (entity == null) { + webTarget.request().get(); + } else { + webTarget.request().post(entity); + } + } + } catch (Exception e) { + logger.error("Failed to send to {} ({}, {})", + uuid, destSocketAddress, destName); + if (responseCallback != null) { responseCallback.exceptionResponse(e); - MainLoop.queueWork(new Runnable() { - @Override - public void run() { - // the DNS cache may have been out-of-date when this server - // was first contacted -- fix the problem, if needed - checkServer(); - } - }); } + MainLoop.queueWork(() -> { + // this runs in the 'MainLoop' thread + + // the DNS cache may have been out-of-date when this server + // was first contacted -- fix the problem, if needed + checkServer(); + }); } }); } @@ -1037,18 +1026,20 @@ public class Server implements Comparable<Server> { * in 'notifyList' (may need to build or rebuild 'notifyList'). */ static void sendOutData() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - // include 'thisServer' in the data -- first, advance the count - if ((thisServer.count += 1) == 0) { + thisServer.count += 1; + if (thisServer.count == 0) { /* - * counter wrapped (0 is a special case); + * counter wrapped (0 is a special case) -- * actually, we could probably leave this out, because it would take * more than a century to wrap if the increment is 1 second */ thisServer.count = 1; } + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + thisServer.lastUpdateTime = System.currentTimeMillis(); thisServer.writeServerData(dos); @@ -1129,11 +1120,11 @@ public class Server implements Comparable<Server> { } } catch (NumberFormatException e) { out.println(host + ": Invalid port value"); - logger.error("Server.pingHosts error", e); + logger.error(PINGHOSTS_ERROR, e); error = true; } catch (UnknownHostException e) { out.println(host + ": Unknown host"); - logger.error("Server.pingHosts error", e); + logger.error(PINGHOSTS_ERROR, e); error = true; } } @@ -1152,58 +1143,58 @@ public class Server implements Comparable<Server> { */ static void pingHosts(final PrintStream out, final Collection<InetSocketAddress> hosts) { - FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() { - @Override - public Integer call() { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - - // add information for this server only - try { - thisServer.writeServerData(dos); - - // create an 'Entity' that can be sent out to all hosts - Entity<String> entity = Entity.entity( - new String(Base64.getEncoder().encode(bos.toByteArray()), - StandardCharsets.UTF_8), - MediaType.APPLICATION_OCTET_STREAM_TYPE); - - // loop through hosts - for (InetSocketAddress host : hosts) { - HttpClient client = null; - - try { - client = buildClient(host.toString(), host, + FutureTask<Integer> ft = new FutureTask<>(() -> { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + // add information for this server only + try { + thisServer.writeServerData(dos); + + // create an 'Entity' that can be sent out to all hosts + Entity<String> entity = Entity.entity( + new String(Base64.getEncoder().encode(bos.toByteArray()), + StandardCharsets.UTF_8), + MediaType.APPLICATION_OCTET_STREAM_TYPE); + + // loop through hosts + for (InetSocketAddress host : hosts) { + HttpClient httpClient = null; + + try { + httpClient = buildClient(host.toString(), host, socketAddressToName(host)); - getTarget(client).path("admin").request().post(entity); - client.shutdown(); - client = null; - } catch (KeyManagementException | NoSuchAlgorithmException e) { - out.println(host + ": Unable to create client connection"); - logger.error("Server.pingHosts error", e); - } catch (NoSuchFieldException | IllegalAccessException e) { - out.println(host + ": Unable to get link to target"); - logger.error("Server.pingHosts error", e); - } catch (Exception e) { - out.println(host + ": " + e); - logger.error("Server.pingHosts error", e); - } - if (client != null) { - client.shutdown(); - } + getTarget(httpClient).path("admin").request().post(entity); + httpClient.shutdown(); + httpClient = null; + } catch (KeyManagementException | NoSuchAlgorithmException e) { + out.println(host + ": Unable to create client connection"); + logger.error(PINGHOSTS_ERROR, e); + } catch (NoSuchFieldException | IllegalAccessException e) { + out.println(host + ": Unable to get link to target"); + logger.error(PINGHOSTS_ERROR, e); + } catch (Exception e) { + out.println(host + ": " + e); + logger.error(PINGHOSTS_ERROR, e); + } + if (httpClient != null) { + httpClient.shutdown(); } - } catch (IOException e) { - out.println("Unable to generate 'ping' data: " + e); - logger.error("Server.pingHosts error", e); } - return 0; + } catch (IOException e) { + out.println("Unable to generate 'ping' data: " + e); + logger.error(PINGHOSTS_ERROR, e); } + return 0; }); MainLoop.queueWork(ft); try { ft.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + logger.error("Server.pingHosts: interrupted waiting for queued work", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException | TimeoutException e) { logger.error("Server.pingHosts: error waiting for queued work", e); } } @@ -1215,16 +1206,17 @@ public class Server implements Comparable<Server> { * @param out the 'PrintStream' to dump the table to */ public static void dumpHosts(final PrintStream out) { - FutureTask<Integer> ft = new FutureTask<Integer>(new Callable<Integer>() { - public Integer call() { - dumpHostsInternal(out); - return 0; - } + FutureTask<Integer> ft = new FutureTask<>(() -> { + dumpHostsInternal(out); + return 0; }); MainLoop.queueWork(ft); try { ft.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + logger.error("Server.dumpHosts: interrupted waiting for queued work", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException | TimeoutException e) { logger.error("Server.dumpHosts: error waiting for queued work", e); } } @@ -1278,12 +1270,6 @@ public class Server implements Comparable<Server> { } else if (localNotifyList.contains(server)) { thisOne = "n"; } - /* - else if (newHosts.contains(server)) - { - thisOne = "N"; - } - */ if (siteData) { String siteIp = ""; |