summaryrefslogtreecommitdiffstats
path: root/authz-cass/src/main/java/com/att/dao/CassAccess.java
blob: 00802c983f763bb80b3b5a5d203e09ded9f54ab0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/*******************************************************************************
 * ============LICENSE_START====================================================
 * * org.onap.aaf
 * * ===========================================================================
 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * * ===========================================================================
 * * Licensed under the Apache License, Version 2.0 (the "License");
 * * you may not use this file except in compliance with the License.
 * * You may obtain a copy of the License at
 * * 
 *  *      http://www.apache.org/licenses/LICENSE-2.0
 * * 
 *  * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 * * ============LICENSE_END====================================================
 * *
 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * *
 ******************************************************************************/
package com.att.dao;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.att.authz.env.AuthzEnv;
import com.att.cadi.routing.GreatCircle;
import com.att.inno.env.APIException;
import com.att.inno.env.Env;
import com.att.inno.env.util.Split;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

public class CassAccess {
	public static final String KEYSPACE = "authz";
	public static final String CASSANDRA_CLUSTERS = "cassandra.clusters";
	public static final String CASSANDRA_CLUSTERS_PORT = "cassandra.clusters.port";
	public static final String CASSANDRA_CLUSTERS_USER_NAME = "cassandra.clusters.user";
	public static final String CASSANDRA_CLUSTERS_PASSWORD = "cassandra.clusters.password";
	public static final String CASSANDRA_RESET_EXCEPTIONS = "cassandra.reset.exceptions";
	public static final String LATITUDE = "LATITUDE";
	public static final String LONGITUDE = "LONGITUDE";
	private static final List<Resettable> resetExceptions = new ArrayList<Resettable>();
	public static final String ERR_ACCESS_MSG = "Accessing Backend";
	private static Builder cb = null;

	/**
	 * To create DCAwareRoundRobing Policy:
	 * 	 Need Properties
	 * 		LATITUDE (or AFT_LATITUDE)
	 * 		LONGITUDE (or AFT_LONGITUDE)
	 * 		CASSANDRA CLUSTERS with additional information:
	 * 			machine:DC:lat:long,machine:DC:lat:long
	 * @param env
	 * @param prefix
	 * @return
	 * @throws APIException
	 * @throws IOException
	 */

	@SuppressWarnings("deprecation")
	public static synchronized Cluster cluster(Env env, String prefix) throws APIException, IOException {
		if(cb == null) {
			String pre;
			if(prefix==null) {
				pre="";
			} else {
				env.info().log("Cassandra Connection for ",prefix);
				pre = prefix+'.';
			}
			cb = Cluster.builder();
			String str = env.getProperty(pre+CASSANDRA_CLUSTERS_PORT,"9042");
			if(str!=null) {
				env.init().log("Cass Port = ",str );
				cb.withPort(Integer.parseInt(str));
			}
			str = env.getProperty(pre+CASSANDRA_CLUSTERS_USER_NAME,null);
			if(str!=null) {
				env.init().log("Cass User = ",str );
				String epass = env.getProperty(pre + CASSANDRA_CLUSTERS_PASSWORD,null);
				if(epass==null) {
					throw new APIException("No Password configured for " + str);
				}
				//TODO Figure out way to ensure Decryptor setting in AuthzEnv
				if(env instanceof AuthzEnv) {
					cb.withCredentials(str,((AuthzEnv)env).decrypt(epass,true));
				} else {
					cb.withCredentials(str, env.decryptor().decrypt(epass));
				}
			}
	
			str = env.getProperty(pre+CASSANDRA_RESET_EXCEPTIONS,null);
			if(str!=null) {
				env.init().log("Cass ResetExceptions = ",str );
				for(String ex : Split.split(',', str)) {
					resetExceptions.add(new Resettable(env,ex));
				}
			}
	
			str = env.getProperty(LATITUDE,env.getProperty("AFT_LATITUDE",null));
			Double lat = str!=null?Double.parseDouble(str):null;
			str = env.getProperty(LONGITUDE,env.getProperty("AFT_LONGITUDE",null));
			Double lon = str!=null?Double.parseDouble(str):null;
			if(lat == null || lon == null) {
				throw new APIException("LATITUDE(or AFT_LATITUDE) and/or LONGITUDE(or AFT_LATITUDE) are not set");
			}
			
			env.init().printf("Service Latitude,Longitude = %f,%f",lat,lon);
			
			str = env.getProperty(pre+CASSANDRA_CLUSTERS,"localhost");
			env.init().log("Cass Clusters = ",str );
			String[] machs = Split.split(',', str);
			String[] cpoints = new String[machs.length];
			String bestDC = null;
			int numInBestDC = 1;
			double mlat, mlon,temp,distance = -1.0;
			for(int i=0;i<machs.length;++i) {
				String[] minfo = Split.split(':',machs[i]);
				if(minfo.length>0) {
					cpoints[i]=minfo[0];
				}
			
				// Calc closest DC with Great Circle
				if(minfo.length>3) {
					mlat = Double.parseDouble(minfo[2]);
					mlon = Double.parseDouble(minfo[3]);
					if((temp=GreatCircle.calc(lat, lon, mlat, mlon)) > distance) {
						distance = temp;
						if(bestDC!=null && bestDC.equals(minfo[1])) {
							++numInBestDC;
						} else {
							bestDC = minfo[1];
							numInBestDC = 1;
						}
					} else {
						if(bestDC!=null && bestDC.equals(minfo[1])) {
							++numInBestDC;
						}
					}
				}
			}
			
			cb.addContactPoints(cpoints);
			
			if(bestDC!=null) {
				// 8/26/2016 Management has determined that Accuracy is preferred over speed in bad situations
				// Local DC Aware Load Balancing appears to have the highest normal performance, with the best
				// Degraded Accuracy
				cb.withLoadBalancingPolicy(new DCAwareRoundRobinPolicy(
						bestDC, numInBestDC, true /*allow LocalDC to look at other DCs for LOCAL_QUORUM */));
				env.init().printf("Cassandra configured for DCAwareRoundRobinPolicy at %s with emergency remote of up to %d node(s)"
					,bestDC, numInBestDC);
			} else {
				env.init().printf("Cassandra is using Default Policy, which is not DC aware");
			}
		}
		return cb.build();
	}
	
	private static class Resettable {
		private Class<? extends Exception> cls;
		private List<String> messages;
		
		@SuppressWarnings("unchecked")
		public Resettable(Env env, String propData) throws APIException {
			if(propData!=null && propData.length()>1) {
				String[] split = Split.split(':', propData);
				if(split.length>0) {
					try {
						cls = (Class<? extends Exception>)Class.forName(split[0]);
					} catch (ClassNotFoundException e) {
						throw new APIException("Declared Cassandra Reset Exception, " + propData + ", cannot be ClassLoaded");
					}
				}
				if(split.length>1) {
					messages=new ArrayList<String>();
					for(int i=1;i<split.length;++i) {
						String str = split[i];
						int start = str.startsWith("\"")?1:0;
						int end = str.length()-(str.endsWith("\"")?1:0);
						messages.add(split[i].substring(start, end));
					}
				} else {
					messages = null;
				}
			}
		}
		
		public boolean matches(Exception ex) {
			if(ex.getClass().equals(cls)) {
				if(messages!=null) {
					String msg = ex.getMessage();
					for(String m : messages) {
						if(msg.contains(m)) {
							return true;
						}
					}
				}
			}
			return false;
		}
	}
	
	public static final boolean isResetException(Exception e) {
		if(e==null) {
			return true;
		}
		for(Resettable re : resetExceptions) {
			if(re.matches(e)) {
				return true;
			}
		}
		return false;
	}
}