summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java')
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java210
1 files changed, 210 insertions, 0 deletions
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java
new file mode 100644
index 0000000..1c60cfd
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java
@@ -0,0 +1,210 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.thrift.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A factory compatible with Apache commons-pool for Cassandra Thrift API
+ * connections.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class CTConnectionFactory implements KeyedPoolableObjectFactory<String, CTConnection> {
+
+ private static final Logger log = LoggerFactory.getLogger(CTConnectionFactory.class);
+ private static final long SCHEMA_WAIT_MAX = 5000L;
+ private static final long SCHEMA_WAIT_INCREMENT = 25L;
+
+ private final AtomicReference<Config> cfgRef;
+
+ private CTConnectionFactory(Config config) {
+ this.cfgRef = new AtomicReference<Config>(config);
+ }
+
+ @Override
+ public void activateObject(String key, CTConnection c) throws Exception {
+ // Do nothing, as in passivateObject
+ }
+
+ @Override
+ public void destroyObject(String key, CTConnection c) throws Exception {
+ TTransport t = c.getTransport();
+
+ if (t.isOpen()) {
+ t.close();
+ log.trace("Closed transport {}", t);
+ } else {
+ log.trace("Not closing transport {} (already closed)", t);
+ }
+ }
+
+ @Override
+ public CTConnection makeObject(String key) throws Exception {
+ CTConnection conn = makeRawConnection();
+ Cassandra.Client client = conn.getClient();
+ client.set_keyspace(key);
+
+ return conn;
+ }
+
+ /**
+ * Create a Cassandra-Thrift connection, but do not attempt to
+ * set a keyspace on the connection.
+ *
+ * @return A CTConnection ready to talk to a Cassandra cluster
+ * @throws TTransportException on any Thrift transport failure
+ */
+ public CTConnection makeRawConnection() throws TTransportException {
+ final Config cfg = cfgRef.get();
+
+ String hostname = cfg.getRandomHost();
+
+ log.debug("Creating TSocket({}, {}, {}, {}, {})", hostname, cfg.port, cfg.username, cfg.password, cfg.timeoutMS);
+
+ TSocket socket;
+ if (null != cfg.sslTruststoreLocation && !cfg.sslTruststoreLocation.isEmpty()) {
+ TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters() {{
+ setTrustStore(cfg.sslTruststoreLocation, cfg.sslTruststorePassword);
+ }};
+ socket = TSSLTransportFactory.getClientSocket(hostname, cfg.port, cfg.timeoutMS, params);
+ } else {
+ socket = new TSocket(hostname, cfg.port, cfg.timeoutMS);
+ }
+
+ TTransport transport = new TFramedTransport(socket, cfg.frameSize);
+ log.trace("Created transport {}", transport);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ Cassandra.Client client = new Cassandra.Client(protocol);
+ if (!transport.isOpen()) {
+ transport.open();
+ }
+
+ if (cfg.username != null) {
+ Map<String, String> credentials = new HashMap<String, String>() {{
+ put(IAuthenticator.USERNAME_KEY, cfg.username);
+ put(IAuthenticator.PASSWORD_KEY, cfg.password);
+ }};
+
+ try {
+ client.login(new AuthenticationRequest(credentials));
+ } catch (Exception e) { // TTransportException will propagate authentication/authorization failure
+ throw new TTransportException(e);
+ }
+ }
+ return new CTConnection(transport, client, cfg);
+ }
+
+ @Override
+ public void passivateObject(String key, CTConnection o) throws Exception {
+ // Do nothing, as in activateObject
+ }
+
+ @Override
+ public boolean validateObject(String key, CTConnection c) {
+ Config curCfg = cfgRef.get();
+
+ boolean isSameConfig = c.getConfig().equals(curCfg);
+ if (log.isDebugEnabled()) {
+ if (isSameConfig) {
+ log.trace("Validated {} by configuration {}", c, curCfg);
+ } else {
+ log.trace("Rejected {}; current config is {}; rejected connection config is {}",
+ c, curCfg, c.getConfig());
+ }
+ }
+
+ return isSameConfig && c.isOpen();
+ }
+
+ public static class Config {
+
+ private final String[] hostnames;
+ private final int port;
+ private final String username;
+ private final String password;
+ private final Random random;
+
+ private int timeoutMS;
+ private int frameSize;
+
+ private String sslTruststoreLocation;
+ private String sslTruststorePassword;
+
+ private boolean isBuilt;
+
+ public Config(String[] hostnames, int port, String username, String password) {
+ this.hostnames = hostnames;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.random = new Random();
+ }
+
+ // TODO: we don't really need getters/setters here as all of the fields are final and immutable
+
+ public String getHostname() {
+ return hostnames[0];
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getRandomHost() {
+ return hostnames.length == 1 ? hostnames[0] : hostnames[random.nextInt(hostnames.length)];
+ }
+
+ public Config setTimeoutMS(int timeoutMS) {
+ checkIfAlreadyBuilt();
+ this.timeoutMS = timeoutMS;
+ return this;
+ }
+
+ public Config setFrameSize(int frameSize) {
+ checkIfAlreadyBuilt();
+ this.frameSize = frameSize;
+ return this;
+ }
+
+ public Config setSSLTruststoreLocation(String location) {
+ checkIfAlreadyBuilt();
+ this.sslTruststoreLocation = location;
+ return this;
+ }
+
+ public Config setSSLTruststorePassword(String password) {
+ checkIfAlreadyBuilt();
+ this.sslTruststorePassword = password;
+ return this;
+ }
+
+ public CTConnectionFactory build() {
+ isBuilt = true;
+ return new CTConnectionFactory(this);
+ }
+
+
+ public void checkIfAlreadyBuilt() {
+ if (isBuilt)
+ throw new IllegalStateException("Can't accept modifications when used with built factory.");
+ }
+
+ @Override
+ public String toString() {
+ return "Config[hostnames=" + StringUtils.join(hostnames, ',') + ", port=" + port
+ + ", timeoutMS=" + timeoutMS + ", frameSize=" + frameSize
+ + "]";
+ }
+ }
+
+}
+