1 |
duarte |
65 |
package tools; |
2 |
|
|
|
3 |
|
|
import java.io.*; |
4 |
|
|
import java.sql.*; |
5 |
|
|
import java.util.ArrayList; |
6 |
|
|
import java.util.HashMap; |
7 |
|
|
|
8 |
|
|
public class DataDistribution { |
9 |
|
|
|
10 |
|
|
	// root of the shared (network-visible) scratch space where temporary dump directories are created
	final static String GLOBALDIR="/project/snow/global/tmp";
	// admin directory containing cluster configuration files
	final static String ADMINDIR="/project/StruPPi/Cluster/admin";
	// host name of the master database server
	public final static String MASTER="white";
	// file whose first line lists all cluster node names, separated by spaces
	final static String HFILE=ADMINDIR+"/hosts_ss.txt";
	// name of the key master database on the master server
	final static String KEYMASTERDB="key_master";

	boolean debug=false; // if set to true only mysql commands written, no actual dump or load, also dump directory not removed. Use setDebug method to change it
	// per-instance temporary dump directory (timestamped), created by initializeDirs and removed by finalizeDirs
	String dumpdir;
	private String db;   // database the distribution operations work on
	private String user; // MySQL user name
	private String pwd;  // MySQL password
21 |
|
|
|
22 |
|
|
|
23 |
|
|
public DataDistribution(String db,String user,String pwd) { |
24 |
|
|
this.dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis(); |
25 |
|
|
this.db=db; |
26 |
|
|
this.user=user; |
27 |
|
|
this.pwd=pwd; |
28 |
|
|
} |
29 |
|
|
|
30 |
|
|
private MySQLConnection getConnectionToMaster() { |
31 |
|
|
MySQLConnection conn=this.getConnectionToNode(MASTER); |
32 |
|
|
return conn; |
33 |
|
|
} |
34 |
|
|
|
35 |
|
|
private MySQLConnection getConnectionToMasterKeyDb() { |
36 |
|
|
MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB); |
37 |
|
|
return conn; |
38 |
|
|
} |
39 |
|
|
|
40 |
|
|
private MySQLConnection getConnectionToNode(String node) { |
41 |
|
|
MySQLConnection conn=new MySQLConnection(node,user,pwd,db); |
42 |
|
|
return conn; |
43 |
|
|
} |
44 |
|
|
|
45 |
|
|
private MySQLConnection getConnectionToNode(String node, String dbName){ |
46 |
|
|
MySQLConnection conn=new MySQLConnection(node,user,pwd,dbName); |
47 |
|
|
return conn; |
48 |
|
|
} |
49 |
|
|
|
50 |
|
|
	/**
	 * Switches debug mode on/off. In debug mode SQL statements are only printed,
	 * no dumps or loads are executed, and the temporary dump directory is kept.
	 * @param debug true to enable debug mode
	 */
	public void setDebug(boolean debug) {
		this.debug=debug;
	}
53 |
|
|
|
54 |
|
|
public static String[] getNodes() { |
55 |
|
|
String[] nodes=null; |
56 |
|
|
try { |
57 |
|
|
File inputFile = new File(HFILE); |
58 |
|
|
BufferedReader hostsFile = new BufferedReader(new FileReader(inputFile)); // open BufferedReader to the file |
59 |
|
|
String nodesstr=hostsFile.readLine(); |
60 |
|
|
nodes=nodesstr.split(" "); |
61 |
|
|
hostsFile.close(); |
62 |
|
|
} |
63 |
|
|
catch (IOException e){ |
64 |
|
|
e.printStackTrace(); |
65 |
|
|
System.err.println("Couldn't read from file "+HFILE); |
66 |
|
|
} |
67 |
|
|
return nodes; |
68 |
|
|
} |
69 |
|
|
|
70 |
|
|
public String[] getTables4Db(){ |
71 |
|
|
String[] tables=null; |
72 |
|
|
ArrayList<String> tablesAL=new ArrayList<String>(); |
73 |
|
|
String query="SELECT table_name FROM information_schema.tables WHERE table_schema='"+db+"' ORDER BY table_name DESC;"; |
74 |
|
|
try { |
75 |
|
|
MySQLConnection conn = this.getConnectionToMaster(); |
76 |
|
|
Statement S=conn.createStatement(); |
77 |
|
|
ResultSet R=S.executeQuery(query); |
78 |
|
|
while (R.next()){ |
79 |
|
|
tablesAL.add(R.getString(1)); |
80 |
|
|
} |
81 |
|
|
S.close(); |
82 |
|
|
R.close(); |
83 |
|
|
conn.close(); |
84 |
|
|
tables=new String[tablesAL.size()]; |
85 |
|
|
for (int i=0;i<tablesAL.size();i++) { |
86 |
|
|
tables[i]=tablesAL.get(i); |
87 |
|
|
} |
88 |
|
|
} |
89 |
|
|
catch(SQLException e){ |
90 |
|
|
e.printStackTrace(); |
91 |
|
|
System.err.println("Couldn't get table names from "+MASTER+" for db="+db); |
92 |
|
|
System.exit(1); |
93 |
|
|
} |
94 |
|
|
return tables; |
95 |
|
|
} |
96 |
|
|
|
97 |
|
|
public void initializeDirs() { |
98 |
|
|
if (!((new File(dumpdir)).mkdir())) { |
99 |
|
|
System.err.println("Couldn't create directory "+dumpdir); |
100 |
|
|
System.exit(1); |
101 |
|
|
} |
102 |
|
|
SystemCmd.exec("chmod go+rw "+dumpdir); |
103 |
|
|
} |
104 |
|
|
|
105 |
|
|
public void finalizeDirs() { |
106 |
|
|
if (debug) { |
107 |
|
|
System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually"); |
108 |
|
|
} else { |
109 |
|
|
System.out.println("Removing temporary directory "+dumpdir); |
110 |
|
|
//TODO must capture exit state and print to error if problems deleting dir |
111 |
|
|
SystemCmd.exec("rm -rf "+dumpdir); |
112 |
|
|
} |
113 |
|
|
} |
114 |
|
|
|
115 |
|
|
public void dumpData(String[] srchosts, String[] tables) { |
116 |
|
|
// initialising temporary dump directory |
117 |
|
|
initializeDirs(); |
118 |
|
|
for (String node: srchosts) { |
119 |
|
|
dumpData(node,tables); |
120 |
|
|
} |
121 |
|
|
System.out.println ("Dump finished."); |
122 |
|
|
} |
123 |
|
|
|
124 |
|
|
private void dumpData(String srchost, String[] tables) { |
125 |
|
|
//String quickpar="--quick"; // not to buffer to memory, needed for big tables |
126 |
|
|
if (!((new File(dumpdir+"/"+srchost+"/"+db)).mkdirs())) { |
127 |
|
|
System.err.println("Couldn't create directory "+dumpdir+"/"+srchost+"/"+db); |
128 |
|
|
System.exit(1); |
129 |
|
|
} |
130 |
|
|
SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+srchost); |
131 |
|
|
for ( String tbl: tables) { |
132 |
|
|
String outfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt"; |
133 |
|
|
String wherestr=""; |
134 |
|
|
String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';"; |
135 |
|
|
//String dumpstr="$MYSQLDIR/bin/mysql $srcconnpar $quickpar -e \"$sqldumpstr\" "; |
136 |
|
|
if (debug) {System.out.println ("HOST="+srchost+", sqldumpstr="+sqldumpstr);} |
137 |
|
|
else { |
138 |
|
|
try { |
139 |
|
|
MySQLConnection conn = this.getConnectionToNode(srchost); |
140 |
|
|
Statement S=conn.createStatement(); |
141 |
|
|
S.executeQuery(sqldumpstr); |
142 |
|
|
System.out.println ("Dumped from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile); |
143 |
|
|
S.close(); |
144 |
|
|
conn.close(); |
145 |
|
|
} |
146 |
|
|
catch(SQLException e){ |
147 |
|
|
e.printStackTrace(); |
148 |
|
|
System.err.println("Couldn't dump from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile); |
149 |
|
|
System.exit(1); |
150 |
|
|
} |
151 |
|
|
} //end else if debug |
152 |
|
|
} // end foreach tbl |
153 |
|
|
} |
154 |
|
|
|
155 |
|
|
public void loadData(String[] srchosts, String[] desthosts,String[] tables, String destDb) { |
156 |
|
|
for (String desthost: desthosts) { |
157 |
|
|
for (String srchost: srchosts) { |
158 |
|
|
loadData(srchost,desthost,tables,destDb); |
159 |
|
|
} |
160 |
|
|
} |
161 |
|
|
System.out.println ("Load finished."); |
162 |
|
|
finalizeDirs(); |
163 |
|
|
} |
164 |
|
|
|
165 |
|
|
private void loadData(String srchost, String desthost,String[] tables, String destDb) { |
166 |
|
|
for (String tbl:tables) { |
167 |
|
|
String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt"; |
168 |
|
|
String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;"; |
169 |
|
|
if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);} |
170 |
|
|
else { |
171 |
|
|
try { |
172 |
|
|
MySQLConnection conn = this.getConnectionToNode(desthost,destDb); |
173 |
|
|
conn.executeSql(sqlloadstr); |
174 |
|
|
System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", into database="+destDb+", table="+tbl); |
175 |
|
|
conn.close(); |
176 |
|
|
} |
177 |
|
|
catch(SQLException e){ |
178 |
|
|
e.printStackTrace(); |
179 |
|
|
System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tbl); |
180 |
|
|
System.exit(1); |
181 |
|
|
} |
182 |
|
|
} |
183 |
|
|
} // end foreach tbl |
184 |
|
|
} |
185 |
|
|
|
186 |
|
|
private void loadSplitData(String srchost, String[] desthosts, String tableName, String destDb) { |
187 |
|
|
for (String desthost:desthosts) { |
188 |
|
|
String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tableName+"_split_"+desthost+".txt"; |
189 |
|
|
String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;"; |
190 |
|
|
if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);} |
191 |
|
|
else { |
192 |
|
|
try { |
193 |
|
|
MySQLConnection conn = this.getConnectionToNode(desthost,destDb); |
194 |
|
|
conn.executeSql(sqlloadstr); |
195 |
|
|
System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", database="+db+", table="+tableName); |
196 |
|
|
conn.close(); |
197 |
|
|
} |
198 |
|
|
catch(SQLException e){ |
199 |
|
|
e.printStackTrace(); |
200 |
|
|
System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tableName); |
201 |
|
|
System.exit(1); |
202 |
|
|
} |
203 |
|
|
} |
204 |
|
|
} // end foreach desthosts |
205 |
|
|
System.out.println ("Load finished."); |
206 |
|
|
finalizeDirs(); |
207 |
|
|
} |
208 |
|
|
|
209 |
|
|
	/**
	 * Compares the row count of every table of database db on every cluster node
	 * against the count on the master, printing the result of each comparison.
	 * Exits the program on SQL errors.
	 * @return true if all per-node counts match the master counts, false otherwise
	 */
	public boolean checkCountsAllTables (){
		boolean checkResult=true;
		String[] nodes = getNodes();
		String[] tables = getTables4Db();
		// getting hashmap of all counts from all tables from nodes
		HashMap<String,HashMap<String,Integer>> countsNodes=new HashMap<String,HashMap<String,Integer>>();
		for (String node:nodes){
			HashMap<String,Integer> tableCounts = new HashMap<String,Integer>();
			for (String tbl:tables){
				String query="SELECT count(*) FROM "+tbl+";";
				try {
					// NOTE(review): a fresh connection is opened per node/table pair; could be hoisted out of the inner loop
					MySQLConnection conn = this.getConnectionToNode(node);
					Statement S=conn.createStatement();
					ResultSet R=S.executeQuery(query);
					if (R.next()){
						tableCounts.put(tbl,R.getInt(1));
					}
					S.close();
					R.close();
					conn.close();
				}
				catch(SQLException e){
					e.printStackTrace();
					System.err.println("Couldn't execute query in host="+node+", database="+db);
					System.exit(1);
				}
			}
			countsNodes.put(node,tableCounts);
		}
		// getting hashmap of all counts of all tables from master
		HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
		for (String tbl:tables){
			String query="SELECT count(*) FROM "+tbl+";";
			try {
				MySQLConnection conn = this.getConnectionToMaster();
				Statement S=conn.createStatement();
				ResultSet R=S.executeQuery(query);
				if (R.next()){
					countsMaster.put(tbl,R.getInt(1));
				}
				S.close();
				R.close();
				conn.close();
			}
			catch(SQLException e){
				e.printStackTrace();
				System.err.println("Couldn't execute query in host="+MASTER+", database="+db);
				System.exit(1);
			}
		}
		// comparing the nodes counts with the master counts
		for (String tbl:countsMaster.keySet()){
			int masterCount=countsMaster.get(tbl);
			for (String node:countsNodes.keySet()){
				// NOTE(review): assumes every node reported a count for every table;
				// a missing entry would unbox null here and throw NPE — confirm all nodes carry all tables
				int thisNodeCount=countsNodes.get(node).get(tbl);
				if (masterCount!=thisNodeCount) {
					System.out.println("Count difers for table "+tbl+" in database "+db+" of node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount);
					checkResult=false; // if one count difers then the check fails, we return false for checkResult. If no difers at all we return true
				}
				else {
					System.out.println("Count check passed for node "+node+", table "+tbl);
				}

			}
		}
		return checkResult;
	} // end checkCounts
276 |
|
|
|
277 |
|
|
	/**
	 * To check the key counts in master and nodes for a certain key.
	 * Per node two comparisons are made: (a) the node's key count vs its distinct
	 * key count (detects duplicates within the node), and (b) the node's key count
	 * vs the count recorded for that node in the key master database. If (b) fails,
	 * the differing key values are printed via getDifferingKeys.
	 * Exits the program on SQL errors.
	 * @param key the name of the key
	 * @return boolean: true if check passed, false if not passed
	 */
	public boolean checkKeyCounts(String key) {
		boolean checkResult=true;
		String keyTable=key+"_list";
		String masterKeyTable=db+"_"+key+"_list_master";
		String keyColumn =key+"_id";
		// getting hashmap of counts of keys from nodes
		String[] nodes = getNodes();
		HashMap<String,int[]> countsNodes=new HashMap<String,int[]>();
		String query="SELECT count("+keyColumn+"),count(DISTINCT "+keyColumn+") FROM "+keyTable+";";
		for (String node:nodes){
			try {
				MySQLConnection conn = this.getConnectionToNode(node);
				Statement S=conn.createStatement();
				ResultSet R=S.executeQuery(query);
				// element 0: total key count, element 1: distinct key count
				int[] thisNodeKeyCount=new int[2];
				if (R.next()){
					thisNodeKeyCount[0]=R.getInt(1);
					thisNodeKeyCount[1]=R.getInt(2);
				}
				countsNodes.put(node,thisNodeKeyCount);
				S.close();
				R.close();
				conn.close();
			}
			catch(SQLException e){
				e.printStackTrace();
				System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
				System.exit(1);
			}
		}
		// getting hashmap of counts of keys from master
		HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
		for (String node:nodes){
			// per-node count recorded in the key master db (keys assigned to this client)
			String queryM="SELECT count(*) FROM "+masterKeyTable+" AS a,clients_names as c WHERE a.client_id=c.client_id AND c.client_name='"+node+"';";
			try {
				MySQLConnection conn = this.getConnectionToMasterKeyDb();
				Statement S=conn.createStatement();
				ResultSet R=S.executeQuery(queryM);
				if (R.next()){
					countsMaster.put(node,R.getInt(1));
				}
				S.close();
				R.close();
				conn.close();
			}
			catch(SQLException e){
				e.printStackTrace();
				System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB);
				System.exit(1);
			}
		}
		//compare the two hashmaps of key counts
		for (String node:countsMaster.keySet()){
			int masterCount=countsMaster.get(node);
			int[] thisNodeCount=countsNodes.get(node);
			if (thisNodeCount[0]!=thisNodeCount[1]) {
				// duplicates within the node itself
				System.out.println("Key count and distinct key count do not coincide for key "+keyColumn+" in node "+node+". Key count="+thisNodeCount[0]+", distinct key count="+thisNodeCount[1]);
				checkResult=false;
			}
			else if (thisNodeCount[0]!=masterCount) {
				// node disagrees with the key master's bookkeeping: show exactly which keys differ
				System.out.println("Key counts do not coincide for key "+keyColumn+" in master and node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount[0]);
				System.out.print("Differing "+keyColumn+"'s are: ");
				int[] diffKeys = getDifferingKeys(key,node);
				for (int k:diffKeys){
					System.out.print(k+" ");
				}
				System.out.println();
				checkResult=false;
			}
			else {
				System.out.println("Key counts check passed for key "+keyColumn+" in node "+node+". The count is: "+masterCount);
			}
		}
		return checkResult;
	}
357 |
|
|
|
358 |
|
|
/** |
359 |
|
|
* Method to get the differing keys for a certain key and node. Used by checkKeycounts method. Shouldn't be used out of this class. |
360 |
|
|
* @param key the key name |
361 |
|
|
* @param node the host name of the cluster node |
362 |
|
|
* @return array of ints with all differing keys for this key and node |
363 |
|
|
*/ |
364 |
|
|
public int[] getDifferingKeys (String key,String node) { |
365 |
|
|
ArrayList<Integer> diffKeys = new ArrayList<Integer>(); |
366 |
|
|
int[] diffKeysAr; |
367 |
|
|
String keyTable=key+"_list"; |
368 |
|
|
String masterKeyTable=db+"_"+key+"_list_master"; |
369 |
|
|
String keyColumn=key+"_id"; |
370 |
|
|
String query="SELECT DISTINCT "+keyColumn+" FROM "+keyTable+" ORDER BY "+keyColumn+";"; |
371 |
|
|
MySQLConnection mconn=null; |
372 |
|
|
try { |
373 |
|
|
MySQLConnection nconn = this.getConnectionToNode(node); |
374 |
|
|
mconn = this.getConnectionToMasterKeyDb(); |
375 |
|
|
Statement S=nconn.createStatement(); |
376 |
|
|
ResultSet R=S.executeQuery(query); |
377 |
|
|
mconn.executeSql("CREATE TEMPORARY TABLE tmp_keys ("+keyColumn+" int(11) default NULL) ENGINE=MEMORY;"); |
378 |
|
|
int thisKey=0; |
379 |
|
|
while (R.next()){ |
380 |
|
|
thisKey=R.getInt(1); |
381 |
|
|
query="INSERT INTO tmp_keys VALUES ("+thisKey+");"; |
382 |
|
|
mconn.executeSql(query); |
383 |
|
|
} |
384 |
|
|
S.close(); |
385 |
|
|
R.close(); |
386 |
|
|
nconn.close(); |
387 |
|
|
} |
388 |
|
|
catch(SQLException e){ |
389 |
|
|
e.printStackTrace(); |
390 |
|
|
System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db); |
391 |
|
|
System.exit(1); |
392 |
|
|
} |
393 |
|
|
try { |
394 |
|
|
query="SELECT c.k " + |
395 |
|
|
"FROM " + |
396 |
|
|
"(SELECT u.id AS k,count(u.id) AS cnt " + |
397 |
|
|
"FROM " + |
398 |
|
|
"(SELECT "+keyColumn+" AS id FROM tmp_keys UNION ALL SELECT kt."+keyColumn+" AS id FROM "+masterKeyTable+" AS kt LEFT JOIN clients_names AS cn ON kt.client_id=cn.client_id WHERE cn.client_name='"+node+"') AS u GROUP BY u.id) " + |
399 |
|
|
"AS c " + |
400 |
|
|
"WHERE c.cnt=1;"; |
401 |
|
|
Statement S=mconn.createStatement(); |
402 |
|
|
ResultSet R=S.executeQuery(query); |
403 |
|
|
while (R.next()){ |
404 |
|
|
diffKeys.add(R.getInt(1)); |
405 |
|
|
} |
406 |
|
|
S.close(); |
407 |
|
|
R.close(); |
408 |
|
|
} |
409 |
|
|
catch(SQLException e){ |
410 |
|
|
e.printStackTrace(); |
411 |
|
|
System.err.println("Couldn't execute query: "+query+"in host="+MASTER+", database="+KEYMASTERDB); |
412 |
|
|
System.exit(1); |
413 |
|
|
} |
414 |
|
|
diffKeysAr= new int[diffKeys.size()]; |
415 |
|
|
for (int i=0;i<diffKeys.size();i++) { |
416 |
|
|
diffKeysAr[i]=diffKeys.get(i); |
417 |
|
|
} |
418 |
|
|
return diffKeysAr; |
419 |
|
|
} |
420 |
|
|
|
421 |
|
|
/** |
422 |
|
|
* Method used by splitIdsIntoSet method. To get all ordered ids from a certain key and table from this db in master server |
423 |
|
|
* @param key the key name |
424 |
|
|
* @param table the table name |
425 |
|
|
* @return int array containing all ids |
426 |
|
|
*/ |
427 |
|
|
public int[] getAllIds4KeyAndTable(String key, String table){ |
428 |
|
|
int[] allIds=null; |
429 |
|
|
String keyColumn=key+"_id"; |
430 |
|
|
try { |
431 |
|
|
MySQLConnection conn=this.getConnectionToMaster(); |
432 |
|
|
Statement S=conn.createStatement(); |
433 |
|
|
String query="SELECT DISTINCT "+keyColumn+" FROM "+table+" ORDER BY "+keyColumn+";"; |
434 |
|
|
ResultSet R=S.executeQuery(query); |
435 |
|
|
ArrayList<Integer> idsAL=new ArrayList<Integer>(); |
436 |
|
|
while (R.next()){ |
437 |
|
|
idsAL.add(R.getInt(1)); |
438 |
|
|
} |
439 |
|
|
allIds=new int[idsAL.size()]; |
440 |
|
|
for (int i=0;i<idsAL.size();i++) { |
441 |
|
|
allIds[i]=idsAL.get(i); |
442 |
|
|
} |
443 |
|
|
R.close(); |
444 |
|
|
S.close(); |
445 |
|
|
conn.close(); |
446 |
|
|
} |
447 |
|
|
catch (SQLException e){ |
448 |
|
|
e.printStackTrace(); |
449 |
|
|
} |
450 |
|
|
return allIds; |
451 |
|
|
} |
452 |
|
|
|
453 |
|
|
/** |
454 |
|
|
* For a certain key and table returns a HashMap containing an int array per cluster node |
455 |
|
|
* @param key |
456 |
|
|
* @param table |
457 |
|
|
* @return HashMap, keys are node names, values: int array with the ids for each node |
458 |
|
|
*/ |
459 |
|
|
public HashMap<String,int[]> splitIdsIntoSets(String key, String table){ |
460 |
|
|
HashMap<String,int[]> idSets =new HashMap<String,int[]>(); |
461 |
|
|
String[] nodes=DataDistribution.getNodes(); |
462 |
|
|
int numNodes=nodes.length; |
463 |
|
|
int[] allIds=this.getAllIds4KeyAndTable(key,table); |
464 |
|
|
int numIds=allIds.length; |
465 |
|
|
int setSize=numIds/numNodes; |
466 |
|
|
int remainder=numIds%numNodes; |
467 |
|
|
for (int i=0;i<numNodes;i++){ |
468 |
|
|
if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node |
469 |
|
|
int[] thisnodeidset=new int[setSize+1]; |
470 |
|
|
for (int j=0;j<thisnodeidset.length;j++){ |
471 |
|
|
thisnodeidset[j]=allIds[j+i*(setSize+1)]; |
472 |
|
|
} |
473 |
|
|
idSets.put(nodes[i],thisnodeidset); |
474 |
|
|
} else { // for the rest we put only setSize ids |
475 |
|
|
int[] thisnodeidset=new int[setSize]; |
476 |
|
|
for (int j=0;j<thisnodeidset.length;j++){ |
477 |
|
|
thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize]; |
478 |
|
|
} |
479 |
|
|
idSets.put(nodes[i],thisnodeidset); |
480 |
|
|
} |
481 |
|
|
} |
482 |
|
|
return idSets; |
483 |
|
|
} |
484 |
|
|
|
485 |
|
|
/** |
486 |
|
|
* To split a given table in chunks based on a key, split tables remain in same database and server |
487 |
|
|
* @param key |
488 |
|
|
* @param table |
489 |
|
|
*/ |
490 |
|
|
public void splitTable (String key,String table){ |
491 |
|
|
String keyColumn=key+"_id"; |
492 |
|
|
String query; |
493 |
|
|
HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table); |
494 |
|
|
String[] splitTables=new String[idSets.size()]; |
495 |
|
|
try { |
496 |
|
|
MySQLConnection conn=this.getConnectionToMaster(); |
497 |
|
|
int i=0; |
498 |
|
|
for (String node:idSets.keySet()) { |
499 |
|
|
String splitTbl=table+"_split_"+node; |
500 |
|
|
splitTables[i]=splitTbl; |
501 |
|
|
i++; |
502 |
|
|
// we create permanent tables |
503 |
|
|
query="CREATE TABLE "+splitTbl+" LIKE "+table+";"; |
504 |
|
|
conn.executeSql(query); |
505 |
|
|
// drop the indexes if there was any, indexes will slow down the creation of split tables |
506 |
|
|
String[] indexes=conn.getAllIndexes4Table(table); |
507 |
|
|
for (String index:indexes) { |
508 |
|
|
conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";"); |
509 |
|
|
} |
510 |
|
|
int idmin=idSets.get(node)[0]; |
511 |
|
|
int idmax=idSets.get(node)[idSets.get(node).length-1]; |
512 |
|
|
query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+keyColumn+">="+idmin+" AND "+keyColumn+"<="+idmax+";"; |
513 |
|
|
conn.executeSql(query); |
514 |
|
|
//TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection |
515 |
|
|
} |
516 |
|
|
conn.close(); |
517 |
|
|
} |
518 |
|
|
catch (SQLException e){ |
519 |
|
|
e.printStackTrace(); |
520 |
|
|
} |
521 |
|
|
} |
522 |
|
|
|
523 |
|
|
	/**
	 * To split a given table in chunks based on a key, split tables go to different nodes of cluster
	 * The split tables are created on the master, dumped, loaded into each node's
	 * destDb (each node receives only its own split), and finally dropped.
	 * @param key the key name
	 * @param table the table to split
	 * @param destDb name of destination db
	 */
	public void splitTableToCluster (String key,String table, String destDb){
		String keyColumn=key+"_id";
		String query;
		HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
		String[] splitTables=new String[idSets.size()];
		try {
			MySQLConnection conn=this.getConnectionToMaster();
			int i=0;
			for (String node:idSets.keySet()) {
				String splitTbl=table+"_split_"+node;
				splitTables[i]=splitTbl;
				i++;
				// we create permanent tables, later we drop them. Can't be temporary as we use another connection for dumpData
				query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
				conn.executeSql(query);
				// drop the indexes if there was any, indexes will slow down the creation of split tables
				String[] indexes=conn.getAllIndexes4Table(table);
				for (String index:indexes) {
					conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
				}
				// make the table a memory table (won't be feasible in general case where tables can be VERY big, even white won't cope)
				//query="ALTER TABLE "+splitTbl+" TYPE=MEMORY;";
				//conn.executeSql(query);
				// ids per node are consecutive in the ordered distinct ids, so a min/max range selects exactly this node's rows
				int idmin=idSets.get(node)[0];
				int idmax=idSets.get(node)[idSets.get(node).length-1];
				query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+keyColumn+">="+idmin+" AND "+keyColumn+"<="+idmax+";";
				conn.executeSql(query);
			}
			// transfering data across
			String[] srchosts={MASTER};
			String[] desthosts=getNodes();
			dumpData(srchosts,splitTables);
			// using here loadSplitData rather than loadData because table names are not the same on source and destination, i.e. source: table_split_tla01, dest: table
			loadSplitData(MASTER,desthosts,table,destDb);
			// droping table, we don't want them anymore after loading data to nodes
			for (String tbl:splitTables){
				query="DROP TABLE "+tbl+";";
				conn.executeSql(query);
			}
			conn.close();
		}
		catch (SQLException e){
			e.printStackTrace();
		}
	}
574 |
|
|
|
575 |
|
|
/** |
576 |
|
|
* Executes a query in all nodes in cluster. |
577 |
|
|
* TODO Right now it is serial, must parallelize this with threads |
578 |
|
|
* TODO This can be used in lots of methods in this class (all the loadData and dumpData ones) |
579 |
|
|
* @param query |
580 |
|
|
*/ |
581 |
|
|
public void clusterExecuteQuery(String query){ |
582 |
|
|
String[] nodes = getNodes(); |
583 |
|
|
for (String node: nodes){ |
584 |
|
|
try { |
585 |
|
|
MySQLConnection conn = this.getConnectionToNode(node); |
586 |
|
|
conn.executeSql(query); |
587 |
|
|
conn.close(); |
588 |
|
|
} |
589 |
|
|
catch(SQLException e){ |
590 |
|
|
e.printStackTrace(); |
591 |
|
|
System.err.println("Couldn't execute query="+query+", in node="+node); |
592 |
|
|
System.exit(1); |
593 |
|
|
} |
594 |
|
|
} |
595 |
|
|
|
596 |
|
|
} |
597 |
|
|
} |