summaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java222
1 files changed, 104 insertions, 118 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
index 52e3d2dc..8ee0f2d2 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
@@ -62,7 +62,6 @@ import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Date;
@@ -81,15 +80,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.servlet.ServletException;
-import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.eclipse.jetty.server.ServerConnector;
import org.glassfish.jersey.client.ClientProperties;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.HttpClient;
@@ -97,7 +93,6 @@ import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
-import org.onap.policy.drools.system.PolicyEngineConstants;
import org.onap.policy.drools.utils.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,10 +121,6 @@ public class Server implements Comparable<Server> {
// the current REST server
private static HttpServletServer restServer;
- // incoming packets from HTTP
- private static LinkedTransferQueue<byte[]> incomingPackets =
- new LinkedTransferQueue<>();
-
/*==================================================*/
/* Some properties extracted at initialization time */
/*==================================================*/
@@ -212,6 +203,9 @@ public class Server implements Comparable<Server> {
static final int SOCKET_ADDRESS_TAG = 1;
static final int SITE_SOCKET_ADDRESS_TAG = 2;
+ // 'pingHosts' error
+ static final String PINGHOSTS_ERROR = "Server.pingHosts error";
+
/*==============================*/
/* Comparable<Server> interface */
/*==============================*/
@@ -311,6 +305,7 @@ public class Server implements Comparable<Server> {
InetAddress address = InetAddress.getByName(ipAddressString);
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+ possibleError = "HTTP server initialization error";
restServer = HttpServletServerFactoryInstance.getServerFactory().build(
"SERVER-POOL", // name
useHttps, // https
@@ -332,7 +327,9 @@ public class Server implements Comparable<Server> {
}
// we may not know the port until after the server is started
+ possibleError = "HTTP server start error";
restServer.start();
+ possibleError = null;
// determine the address to use
if (DEFAULT_SERVER_IP_ADDRESS.contentEquals(address.getHostAddress())) {
@@ -346,13 +343,10 @@ public class Server implements Comparable<Server> {
// start background thread
MainLoop.startThread();
- MainLoop.queueWork(new Runnable() {
- @Override
- public void run() {
- // run this in the 'MainLoop' thread
- Leader.startup();
- Bucket.startup();
- }
+ MainLoop.queueWork(() -> {
+ // run this in the 'MainLoop' thread
+ Leader.startup();
+ Bucket.startup();
});
logger.info("Listening on port {}", port);
@@ -491,14 +485,12 @@ public class Server implements Comparable<Server> {
int tag;
while ((tag = is.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
switch (tag) {
- case SOCKET_ADDRESS_TAG: {
+ case SOCKET_ADDRESS_TAG:
socketAddress = readSocketAddress(is);
break;
- }
- case SITE_SOCKET_ADDRESS_TAG: {
+ case SITE_SOCKET_ADDRESS_TAG:
siteSocketAddress = readSocketAddress(is);
break;
- }
default:
// ignore tag
logger.error("Illegal tag: {}", tag);
@@ -513,8 +505,7 @@ public class Server implements Comparable<Server> {
* @param is the 'DataInputStream'
* @return the 'InetSocketAddress'
*/
- private static InetSocketAddress readSocketAddress(DataInputStream is)
- throws IOException, UnknownHostException {
+ private static InetSocketAddress readSocketAddress(DataInputStream is) throws IOException {
byte[] ipAddress = new byte[4];
is.read(ipAddress, 0, 4);
@@ -926,47 +917,45 @@ public class Server implements Comparable<Server> {
return;
}
- getThreadPool().execute(new Runnable() {
+ getThreadPool().execute(() -> {
/**
* This method is running within the 'MainLoop' thread.
*/
- @Override
- public void run() {
- try {
- WebTarget webTarget = target.path(path);
- if (responseCallback != null) {
- // give callback a chance to modify 'WebTarget'
- webTarget = responseCallback.webTarget(webTarget);
-
- // send the response to the callback
- Response response;
- if (entity == null) {
- response = webTarget.request().get();
- } else {
- response = webTarget.request().post(entity);
- }
- responseCallback.response(response);
+ try {
+ WebTarget webTarget = target.path(path);
+ if (responseCallback != null) {
+ // give callback a chance to modify 'WebTarget'
+ webTarget = responseCallback.webTarget(webTarget);
+
+ // send the response to the callback
+ Response response;
+ if (entity == null) {
+ response = webTarget.request().get();
} else {
- // just do the invoke, and ignore the response
- if (entity == null) {
- webTarget.request().get();
- } else {
- webTarget.request().post(entity);
- }
+ response = webTarget.request().post(entity);
}
- } catch (Exception e) {
- logger.error("Failed to send to {} ({}, {})",
- uuid, destSocketAddress, destName);
+ responseCallback.response(response);
+ } else {
+ // just do the invoke, and ignore the response
+ if (entity == null) {
+ webTarget.request().get();
+ } else {
+ webTarget.request().post(entity);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Failed to send to {} ({}, {})",
+ uuid, destSocketAddress, destName);
+ if (responseCallback != null) {
responseCallback.exceptionResponse(e);
- MainLoop.queueWork(new Runnable() {
- @Override
- public void run() {
- // the DNS cache may have been out-of-date when this server
- // was first contacted -- fix the problem, if needed
- checkServer();
- }
- });
}
+ MainLoop.queueWork(() -> {
+ // this runs in the 'MainLoop' thread
+
+ // the DNS cache may have been out-of-date when this server
+ // was first contacted -- fix the problem, if needed
+ checkServer();
+ });
}
});
}
@@ -1037,18 +1026,20 @@ public class Server implements Comparable<Server> {
* in 'notifyList' (may need to build or rebuild 'notifyList').
*/
static void sendOutData() throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
// include 'thisServer' in the data -- first, advance the count
- if ((thisServer.count += 1) == 0) {
+ thisServer.count += 1;
+ if (thisServer.count == 0) {
/*
- * counter wrapped (0 is a special case);
+ * counter wrapped (0 is a special case) --
* actually, we could probably leave this out, because it would take
* more than a century to wrap if the increment is 1 second
*/
thisServer.count = 1;
}
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
thisServer.lastUpdateTime = System.currentTimeMillis();
thisServer.writeServerData(dos);
@@ -1129,11 +1120,11 @@ public class Server implements Comparable<Server> {
}
} catch (NumberFormatException e) {
out.println(host + ": Invalid port value");
- logger.error("Server.pingHosts error", e);
+ logger.error(PINGHOSTS_ERROR, e);
error = true;
} catch (UnknownHostException e) {
out.println(host + ": Unknown host");
- logger.error("Server.pingHosts error", e);
+ logger.error(PINGHOSTS_ERROR, e);
error = true;
}
}
@@ -1152,58 +1143,58 @@ public class Server implements Comparable<Server> {
*/
static void pingHosts(final PrintStream out,
final Collection<InetSocketAddress> hosts) {
- FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() {
- @Override
- public Integer call() {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // add information for this server only
- try {
- thisServer.writeServerData(dos);
-
- // create an 'Entity' that can be sent out to all hosts
- Entity<String> entity = Entity.entity(
- new String(Base64.getEncoder().encode(bos.toByteArray()),
- StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
-
- // loop through hosts
- for (InetSocketAddress host : hosts) {
- HttpClient client = null;
-
- try {
- client = buildClient(host.toString(), host,
+ FutureTask<Integer> ft = new FutureTask<>(() -> {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // add information for this server only
+ try {
+ thisServer.writeServerData(dos);
+
+ // create an 'Entity' that can be sent out to all hosts
+ Entity<String> entity = Entity.entity(
+ new String(Base64.getEncoder().encode(bos.toByteArray()),
+ StandardCharsets.UTF_8),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+
+ // loop through hosts
+ for (InetSocketAddress host : hosts) {
+ HttpClient httpClient = null;
+
+ try {
+ httpClient = buildClient(host.toString(), host,
socketAddressToName(host));
- getTarget(client).path("admin").request().post(entity);
- client.shutdown();
- client = null;
- } catch (KeyManagementException | NoSuchAlgorithmException e) {
- out.println(host + ": Unable to create client connection");
- logger.error("Server.pingHosts error", e);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- out.println(host + ": Unable to get link to target");
- logger.error("Server.pingHosts error", e);
- } catch (Exception e) {
- out.println(host + ": " + e);
- logger.error("Server.pingHosts error", e);
- }
- if (client != null) {
- client.shutdown();
- }
+ getTarget(httpClient).path("admin").request().post(entity);
+ httpClient.shutdown();
+ httpClient = null;
+ } catch (KeyManagementException | NoSuchAlgorithmException e) {
+ out.println(host + ": Unable to create client connection");
+ logger.error(PINGHOSTS_ERROR, e);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ out.println(host + ": Unable to get link to target");
+ logger.error(PINGHOSTS_ERROR, e);
+ } catch (Exception e) {
+ out.println(host + ": " + e);
+ logger.error(PINGHOSTS_ERROR, e);
+ }
+ if (httpClient != null) {
+ httpClient.shutdown();
}
- } catch (IOException e) {
- out.println("Unable to generate 'ping' data: " + e);
- logger.error("Server.pingHosts error", e);
}
- return 0;
+ } catch (IOException e) {
+ out.println("Unable to generate 'ping' data: " + e);
+ logger.error(PINGHOSTS_ERROR, e);
}
+ return 0;
});
MainLoop.queueWork(ft);
try {
ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ logger.error("Server.pingHosts: interrupted waiting for queued work", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException e) {
logger.error("Server.pingHosts: error waiting for queued work", e);
}
}
@@ -1215,16 +1206,17 @@ public class Server implements Comparable<Server> {
* @param out the 'PrintStream' to dump the table to
*/
public static void dumpHosts(final PrintStream out) {
- FutureTask<Integer> ft = new FutureTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- dumpHostsInternal(out);
- return 0;
- }
+ FutureTask<Integer> ft = new FutureTask<>(() -> {
+ dumpHostsInternal(out);
+ return 0;
});
MainLoop.queueWork(ft);
try {
ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ logger.error("Server.dumpHosts: interrupted waiting for queued work", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException e) {
logger.error("Server.dumpHosts: error waiting for queued work", e);
}
}
@@ -1278,12 +1270,6 @@ public class Server implements Comparable<Server> {
} else if (localNotifyList.contains(server)) {
thisOne = "n";
}
- /*
- else if (newHosts.contains(server))
- {
- thisOne = "N";
- }
- */
if (siteData) {
String siteIp = "";