1 |
package tools; |
2 |
|
3 |
import java.io.*; |
4 |
import java.sql.*; |
5 |
import java.util.ArrayList; |
6 |
import java.util.HashMap; |
7 |
import java.util.Collections; |
8 |
|
9 |
public class DataDistribution { |
10 |
|
11 |
final static String ADMINDIR="/project/StruPPi/Cluster/admin"; |
12 |
final static String HOSTSFILE=ADMINDIR+"/hosts_mysql_server.txt"; |
13 |
public final static String MASTER="white"; |
14 |
public final static String KEYMASTERDB="key_master"; |
15 |
|
16 |
final static long TIME_OUT = 3000; |
17 |
|
18 |
public String db; |
19 |
private String user; |
20 |
private String pwd; |
21 |
|
22 |
public String[] nodes; |
23 |
|
24 |
//TODO fields that we might want to implement in future: |
25 |
//public boolean isSplit; //whether data distribution is a split distribution or an all data in all distribution |
26 |
//public String splitTable; |
27 |
//public String splitKey; |
28 |
//public String keyTable; // this is normally dbOnNodes__splitTable (as stored on dbs_keys table in key_master db) |
29 |
//public String dbOnMaster; // use it in checkCountsAllTables in cases master and nodes db have different names |
30 |
//public String dbOnNodes; // use it in checkCountsAllTables in cases master and nodes db have different names |
31 |
|
32 |
|
33 |
public DataDistribution(String db,String user,String pwd) { |
34 |
this.db=db; |
35 |
this.user=user; |
36 |
this.pwd=pwd; |
37 |
// check that mysql server running in node and that database db exists on it |
38 |
nodes = nodesAlive(getMySQLNodes()); |
39 |
//TODO implement method tablesCoincide |
40 |
//nodes = tablesCoincide(nodes); |
41 |
//TODO implement compareNodesList2MasterKeyTable, see half-way implemented code below |
42 |
//compareNodesList2MasterKeyTable(); |
43 |
} |
44 |
|
45 |
private MySQLConnection getConnectionToMaster() { |
46 |
MySQLConnection conn=this.getConnectionToNode(MASTER); |
47 |
return conn; |
48 |
} |
49 |
|
50 |
private MySQLConnection getConnectionToMasterKeyDb() { |
51 |
MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB); |
52 |
return conn; |
53 |
} |
54 |
|
55 |
private MySQLConnection getConnectionToNode(String node) { |
56 |
MySQLConnection conn=new MySQLConnection(node,user,pwd,db); |
57 |
return conn; |
58 |
} |
59 |
|
60 |
public String[] getNodes(){ |
61 |
return nodes; |
62 |
} |
63 |
|
64 |
public String[] nodesAlive(String[] testNodes){ |
65 |
HashMap<String,MySQLConnectionCheck> allchecks = new HashMap<String,MySQLConnectionCheck>(); |
66 |
ArrayList<String> nodesAL = new ArrayList<String>(); |
67 |
for (String node:testNodes){ |
68 |
MySQLConnectionCheck check = new MySQLConnectionCheck(node,user,pwd,db); |
69 |
allchecks.put(node, check); |
70 |
check.start(); |
71 |
} |
72 |
for (String node:testNodes){ |
73 |
try { |
74 |
MySQLConnectionCheck check = allchecks.get(node); |
75 |
check.join(TIME_OUT); |
76 |
if (check.connTestPassed()) { |
77 |
nodesAL.add(check.dbServer); |
78 |
} |
79 |
} catch (InterruptedException e) { |
80 |
e.printStackTrace(); |
81 |
} |
82 |
} |
83 |
Collections.sort(nodesAL); |
84 |
String[] passedNodes=new String[nodesAL.size()]; |
85 |
nodesAL.toArray(passedNodes); |
86 |
|
87 |
return passedNodes; |
88 |
} |
89 |
|
90 |
// public String[] compareNodesList2MasterKeyTable() { |
91 |
// ClusterConnection cconn = new ClusterConnection(db,key,user,pwd); // where do we get key from? |
92 |
// String keyTable=cconn.getTableOnNode(); |
93 |
// String masterKeyTable=cconn.getKeyTable(); |
94 |
// cconn.close(); |
95 |
// // getting distinct client_names from masterKeyTable |
96 |
// ArrayList<String> nodesAL = new ArrayList<String>(); |
97 |
// for (String node:nodes){ |
98 |
// String query="SELECT client_name FROM clients_names AS c WHERE c.client_id IN (SELECT DISTINCT client_id FROM "+masterKeyTable+") ORDER BY client_name;"; |
99 |
// try { |
100 |
// MySQLConnection conn = this.getConnectionToMasterKeyDb(); |
101 |
// Statement S=conn.createStatement(); |
102 |
// ResultSet R=S.executeQuery(query); |
103 |
// while (R.next()){ |
104 |
// nodesAL.add(R.getString(1)); |
105 |
// } |
106 |
// S.close(); |
107 |
// R.close(); |
108 |
// conn.close(); |
109 |
// } |
110 |
// catch(SQLException e){ |
111 |
// e.printStackTrace(); |
112 |
// System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB); |
113 |
// } |
114 |
// } |
115 |
// for (String node:nodes){ |
116 |
// //TODO compare nodesAL and nodes |
117 |
// } |
118 |
// //TODO return a nodes array string with non matching nodes |
119 |
// //return ; |
120 |
// } |
121 |
|
122 |
/** |
123 |
* Gets all potentially working MySQL server nodes that we have by reading the file HOSTSFILE |
124 |
* which contains all of our working MySQL server nodes (checked with script check-nodes.py) |
125 |
* @return String array with node names |
126 |
*/ |
127 |
public static String[] getMySQLNodes() { |
128 |
String[] mysqlnodes=null; |
129 |
try { |
130 |
File inputFile = new File(HOSTSFILE); |
131 |
BufferedReader hostsFile = new BufferedReader(new FileReader(inputFile)); // open BufferedReader to the file |
132 |
String nodesstr=hostsFile.readLine(); |
133 |
mysqlnodes=nodesstr.split(" "); |
134 |
hostsFile.close(); |
135 |
} |
136 |
catch (IOException e){ |
137 |
e.printStackTrace(); |
138 |
System.err.println("Couldn't read from file "+HOSTSFILE); |
139 |
} |
140 |
return mysqlnodes; |
141 |
} |
142 |
|
143 |
public static int getIdFromMaster(MySQLConnection conn, String key, String keyTable, String host) { |
144 |
int id=0; |
145 |
try { |
146 |
String query="CREATE TABLE IF NOT EXISTS "+keyTable+" ("+ |
147 |
key+" int(11) NOT NULL auto_increment, " + |
148 |
"client_id smallint(6) NOT NULL default '0', " + |
149 |
"PRIMARY KEY (`"+key+"`) " + |
150 |
") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;"; |
151 |
conn.executeSql(query); |
152 |
query = "SELECT client_id FROM clients_names WHERE client_name='"+host+"';"; |
153 |
int clientId=conn.getIntFromDb(query); |
154 |
query="INSERT INTO "+keyTable+" (client_id) VALUES ("+clientId+");"; |
155 |
conn.executeSql(query); |
156 |
query = "SELECT LAST_INSERT_ID() FROM "+keyTable+" LIMIT 1;"; |
157 |
id=conn.getIntFromDb(query); |
158 |
} |
159 |
catch (SQLException E) { |
160 |
System.err.println("SQLException: " + E.getMessage()); |
161 |
System.err.println("SQLState: " + E.getSQLState()); |
162 |
System.err.println("Couldn't insert record in master table "+keyTable+" in host "+conn.getHost()); |
163 |
} |
164 |
return id; |
165 |
} |
166 |
|
167 |
|
168 |
public boolean checkCountsAllTables (){ |
169 |
boolean checkResult=true; |
170 |
MySQLConnection mconn = this.getConnectionToMaster(); |
171 |
String[] tables = mconn.getTables4Db(); |
172 |
mconn.close(); |
173 |
// getting hashmap of all counts from all tables from nodes |
174 |
HashMap<String,HashMap<String,Integer>> countsNodes=new HashMap<String,HashMap<String,Integer>>(); |
175 |
for (String node:nodes){ |
176 |
HashMap<String,Integer> tableCounts = new HashMap<String,Integer>(); |
177 |
for (String tbl:tables){ |
178 |
String query="SELECT count(*) FROM "+tbl+";"; |
179 |
try { |
180 |
MySQLConnection conn = this.getConnectionToNode(node); |
181 |
Statement S=conn.createStatement(); |
182 |
ResultSet R=S.executeQuery(query); |
183 |
if (R.next()){ |
184 |
tableCounts.put(tbl,R.getInt(1)); |
185 |
} |
186 |
S.close(); |
187 |
R.close(); |
188 |
conn.close(); |
189 |
} |
190 |
catch(SQLException e){ |
191 |
e.printStackTrace(); |
192 |
System.err.println("Couldn't execute query in host="+node+", database="+db); |
193 |
System.exit(1); |
194 |
} |
195 |
} |
196 |
countsNodes.put(node,tableCounts); |
197 |
} |
198 |
// getting hashmap of all counts of all tables from master |
199 |
HashMap<String,Integer> countsMaster= new HashMap<String,Integer>(); |
200 |
for (String tbl:tables){ |
201 |
String query="SELECT count(*) FROM "+tbl+";"; |
202 |
try { |
203 |
MySQLConnection conn = this.getConnectionToMaster(); |
204 |
Statement S=conn.createStatement(); |
205 |
ResultSet R=S.executeQuery(query); |
206 |
if (R.next()){ |
207 |
countsMaster.put(tbl,R.getInt(1)); |
208 |
} |
209 |
S.close(); |
210 |
R.close(); |
211 |
conn.close(); |
212 |
} |
213 |
catch(SQLException e){ |
214 |
e.printStackTrace(); |
215 |
System.err.println("Couldn't execute query in host="+MASTER+", database="+db); |
216 |
System.exit(1); |
217 |
} |
218 |
} |
219 |
// comparing the nodes counts with the master counts |
220 |
for (String tbl:countsMaster.keySet()){ |
221 |
int masterCount=countsMaster.get(tbl); |
222 |
for (String node:countsNodes.keySet()){ |
223 |
int thisNodeCount=countsNodes.get(node).get(tbl); |
224 |
if (masterCount!=thisNodeCount) { |
225 |
System.out.println("Count difers for table "+tbl+" in database "+db+" of node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount); |
226 |
checkResult=false; // if one count difers then the check fails, we return false for checkResult. If no difers at all we return true |
227 |
} |
228 |
else { |
229 |
System.out.println("Count check passed for node "+node+", table "+tbl); |
230 |
} |
231 |
|
232 |
} |
233 |
} |
234 |
return checkResult; |
235 |
} // end checkCounts |
236 |
|
237 |
/** |
238 |
* To check the key counts in master and nodes for a certain key. |
239 |
* @param key the name of the key |
240 |
* @return boolean: true if check passed, false if not passed |
241 |
*/ |
242 |
public boolean checkKeyCounts(String key) { |
243 |
boolean checkResult=true; |
244 |
ClusterConnection cconn = new ClusterConnection(db,key,user,pwd); |
245 |
String keyTable=cconn.getTableOnNode(); |
246 |
String masterKeyTable=cconn.getKeyTable(); |
247 |
cconn.close(); |
248 |
// getting hashmap of counts of keys from nodes |
249 |
HashMap<String,int[]> countsNodes=new HashMap<String,int[]>(); |
250 |
String query="SELECT count("+key+"),count(DISTINCT "+key+") FROM "+keyTable+";"; |
251 |
for (String node:nodes){ |
252 |
try { |
253 |
MySQLConnection conn = this.getConnectionToNode(node); |
254 |
Statement S=conn.createStatement(); |
255 |
ResultSet R=S.executeQuery(query); |
256 |
int[] thisNodeKeyCount=new int[2]; |
257 |
if (R.next()){ |
258 |
thisNodeKeyCount[0]=R.getInt(1); |
259 |
thisNodeKeyCount[1]=R.getInt(2); |
260 |
} |
261 |
countsNodes.put(node,thisNodeKeyCount); |
262 |
S.close(); |
263 |
R.close(); |
264 |
conn.close(); |
265 |
} |
266 |
catch(SQLException e){ |
267 |
e.printStackTrace(); |
268 |
System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db); |
269 |
System.exit(1); |
270 |
} |
271 |
} |
272 |
// getting hashmap of counts of keys from master |
273 |
HashMap<String,Integer> countsMaster= new HashMap<String,Integer>(); |
274 |
for (String node:nodes){ |
275 |
String queryM="SELECT count(*) FROM "+masterKeyTable+" AS a,clients_names as c WHERE a.client_id=c.client_id AND c.client_name='"+node+"';"; |
276 |
try { |
277 |
MySQLConnection conn = this.getConnectionToMasterKeyDb(); |
278 |
Statement S=conn.createStatement(); |
279 |
ResultSet R=S.executeQuery(queryM); |
280 |
if (R.next()){ |
281 |
countsMaster.put(node,R.getInt(1)); |
282 |
} |
283 |
S.close(); |
284 |
R.close(); |
285 |
conn.close(); |
286 |
} |
287 |
catch(SQLException e){ |
288 |
e.printStackTrace(); |
289 |
System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB); |
290 |
System.exit(1); |
291 |
} |
292 |
} |
293 |
//compare the two hashmaps of key counts |
294 |
for (String node:countsMaster.keySet()){ |
295 |
int masterCount=countsMaster.get(node); |
296 |
int[] thisNodeCount=countsNodes.get(node); |
297 |
if (thisNodeCount[0]!=thisNodeCount[1]) { |
298 |
System.out.println("Key count and distinct key count do not coincide for key "+key+" in node "+node+". Key count="+thisNodeCount[0]+", distinct key count="+thisNodeCount[1]); |
299 |
checkResult=false; |
300 |
} |
301 |
else if (thisNodeCount[0]!=masterCount) { |
302 |
System.out.println("Key counts do not coincide for key "+key+" in master and node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount[0]); |
303 |
System.out.print("Differing "+key+"'s are: "); |
304 |
String[] diffKeys = getDifferingKeys(key,node); |
305 |
for (String k:diffKeys){ |
306 |
System.out.print(k+" "); |
307 |
} |
308 |
System.out.println(); |
309 |
checkResult=false; |
310 |
} |
311 |
else { |
312 |
System.out.println("Key counts check passed for key "+key+" in node "+node+". The count is: "+masterCount); |
313 |
} |
314 |
} |
315 |
return checkResult; |
316 |
} |
317 |
|
318 |
/** |
319 |
* Method to get the differing keys for a certain key and node. Used by checkKeycounts method. Shouldn't be used out of this class. |
320 |
* @param key the key name |
321 |
* @param node the host name of the cluster node |
322 |
* @return array of ints with all differing keys for this key and node |
323 |
*/ |
324 |
public String[] getDifferingKeys (String key,String node) { |
325 |
ArrayList<String> diffKeys = new ArrayList<String>(); |
326 |
String[] diffKeysAr; |
327 |
ClusterConnection cconn = new ClusterConnection(db,key,user,pwd); |
328 |
String keyTable=cconn.getTableOnNode(); |
329 |
String masterKeyTable=cconn.getKeyTable(); |
330 |
cconn.close(); |
331 |
String query="SELECT DISTINCT "+key+" FROM "+keyTable+" ORDER BY "+key+";"; |
332 |
MySQLConnection mconn=null; |
333 |
try { |
334 |
MySQLConnection nconn = this.getConnectionToNode(node); |
335 |
String colType = nconn.getColumnType(keyTable,key); |
336 |
mconn = this.getConnectionToMasterKeyDb(); |
337 |
Statement S=nconn.createStatement(); |
338 |
ResultSet R=S.executeQuery(query); |
339 |
mconn.executeSql("CREATE TEMPORARY TABLE tmp_keys ("+key+" "+colType+" default NULL) ENGINE=MEMORY;"); |
340 |
String thisKey=null; |
341 |
while (R.next()){ |
342 |
thisKey=R.getString(1); |
343 |
query="INSERT INTO tmp_keys VALUES ('"+thisKey+"');"; |
344 |
mconn.executeSql(query); |
345 |
} |
346 |
S.close(); |
347 |
R.close(); |
348 |
nconn.close(); |
349 |
} |
350 |
catch(SQLException e){ |
351 |
e.printStackTrace(); |
352 |
System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db); |
353 |
System.exit(1); |
354 |
} |
355 |
try { |
356 |
query="SELECT c.k " + |
357 |
"FROM " + |
358 |
"(SELECT u.id AS k,count(u.id) AS cnt " + |
359 |
"FROM " + |
360 |
"(SELECT "+key+" AS id FROM tmp_keys UNION ALL SELECT kt."+key+" AS id FROM "+masterKeyTable+" AS kt LEFT JOIN clients_names AS cn ON kt.client_id=cn.client_id WHERE cn.client_name='"+node+"') AS u GROUP BY u.id) " + |
361 |
"AS c " + |
362 |
"WHERE c.cnt=1;"; |
363 |
Statement S=mconn.createStatement(); |
364 |
ResultSet R=S.executeQuery(query); |
365 |
while (R.next()){ |
366 |
diffKeys.add(R.getString(1)); |
367 |
} |
368 |
S.close(); |
369 |
R.close(); |
370 |
} |
371 |
catch(SQLException e){ |
372 |
e.printStackTrace(); |
373 |
System.err.println("Couldn't execute query: "+query+"in host="+MASTER+", database="+KEYMASTERDB); |
374 |
System.exit(1); |
375 |
} |
376 |
diffKeysAr= new String[diffKeys.size()]; |
377 |
for (int i=0;i<diffKeys.size();i++) { |
378 |
diffKeysAr[i]=diffKeys.get(i); |
379 |
} |
380 |
return diffKeysAr; |
381 |
} |
382 |
|
383 |
/** |
384 |
* For a certain key (numeric or text based) and table finds out how the table is splitted in nodes and returns the "data distribution" |
385 |
* Take care when using this method as an argument of insertIdsToKeyMaster: if table is not in chunks (but rather all data in all) |
386 |
* then ids can't be inserted in key_master as there will be duplication, i.e. for key_id=1 as data is in all nodes there |
387 |
* will be 40 copies of it and thus 40 equal ids will try to be inserted in key_master, which a) makes no sense and |
388 |
* b) mysql won't do it anyway |
389 |
* @param key text-based key (char/varchar) |
390 |
* @param table |
391 |
* @return idSets HashMap, keys are node names, values: Integer/String array with the ids for each node |
392 |
*/ |
393 |
public HashMap<String,Object[]> getIdSetsFromNodes(String key, String table){ |
394 |
HashMap<String,Object[]> idSets =new HashMap<String,Object[]>(); |
395 |
for (String node:nodes){ |
396 |
MySQLConnection conn = this.getConnectionToNode(node); |
397 |
idSets.put(node,conn.getAllIds4KeyAndTable(key,table)); |
398 |
conn.close(); |
399 |
} |
400 |
return idSets; |
401 |
} |
402 |
|
403 |
} |