1 |
package tools; |
2 |
|
3 |
import java.io.BufferedReader; |
4 |
import java.io.File; |
5 |
import java.io.IOException; |
6 |
import java.io.InputStreamReader; |
7 |
import java.sql.SQLException; |
8 |
import java.sql.Statement; |
9 |
import java.util.HashMap; |
10 |
|
11 |
public class DataDistributer { |
12 |
|
13 |
public final static String GLOBALDIR="/project/snow/global/tmp"; |
14 |
public final static String MASTER = DataDistribution.MASTER; |
15 |
public final static String KEYMASTERDB = DataDistribution.KEYMASTERDB; |
16 |
|
17 |
// Following parameters are optimized after some testing. I found these to be good values |
18 |
// They represent the number of nodes reading/writing from/to nfs concurrently when loading/dumping |
19 |
// 1 means no concurrency at all. Higher numbers mean more concurrency i.e. more nodes writing/reading to/from nfs concurrently |
20 |
final static int NUM_CONCURRENT_READ_QUERIES = 9; |
21 |
final static int NUM_CONCURRENT_WRITE_QUERIES = 6; |
22 |
// Following parameter only for concurrent writing queries in same host. This number has to be low |
23 |
// as the only limitation is the i/o capacity of the host. |
24 |
final static int NUM_CONCURRENT_SAMEHOST_QUERIES = 3; |
25 |
|
26 |
String dumpdir; |
27 |
String srcDb; |
28 |
String destDb; |
29 |
String user; |
30 |
String pwd; |
31 |
boolean debug = false; |
32 |
String rmvtmp = "force"; // 3 possible values: force, noremove and prompt |
33 |
|
34 |
/** |
35 |
* Construct DataDistributer object passing only source db. The destination db will be same as source. |
36 |
* @param srcDb |
37 |
* @param user |
38 |
* @param pwd |
39 |
*/ |
40 |
public DataDistributer(String srcDb, String user, String pwd) { |
41 |
this.srcDb=srcDb; |
42 |
this.destDb=srcDb; |
43 |
this.user=user; |
44 |
this.pwd=pwd; |
45 |
initializeDirs(); |
46 |
} |
47 |
|
48 |
/** |
49 |
* Construct DataDistributer object passing both source db and destination db |
50 |
* @param srcDb |
51 |
* @param destDb |
52 |
* @param user |
53 |
* @param pwd |
54 |
*/ |
55 |
public DataDistributer(String srcDb, String destDb, String user, String pwd) { |
56 |
this.srcDb=srcDb; |
57 |
this.destDb=destDb; |
58 |
this.user=user; |
59 |
this.pwd=pwd; |
60 |
initializeDirs(); |
61 |
} |
62 |
|
63 |
public void setDebug(boolean debug){ |
64 |
this.debug=debug; |
65 |
} |
66 |
|
67 |
public void setRmv(String rmvtmp){ |
68 |
this.rmvtmp=rmvtmp; |
69 |
} |
70 |
|
71 |
public void setDumpDir(String dumpdir){ |
72 |
// before reseting the dumpdir, we get rid of the existent dumpdir created when this DataDistributer object was constructed |
73 |
// in the case where we set the dumpdir using setDumpDir clearly the randomly named dumpdir created while constructing is not needed |
74 |
SystemCmd.exec("rmdir "+this.dumpdir); |
75 |
this.dumpdir=dumpdir; |
76 |
} |
77 |
|
78 |
public void initializeDirs() { |
79 |
dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis(); |
80 |
if (!((new File(dumpdir)).mkdir())) { |
81 |
System.err.println("Couldn't create directory "+dumpdir); |
82 |
System.exit(1); |
83 |
} |
84 |
SystemCmd.exec("chmod go+rw "+dumpdir); |
85 |
} |
86 |
|
87 |
public void initializeDirs(String[] hosts){ |
88 |
for (String host: hosts) { |
89 |
if (!((new File(dumpdir+"/"+host+"/"+srcDb)).mkdirs())) { |
90 |
System.err.println("Couldn't create directory "+dumpdir+"/"+host+"/"+srcDb); |
91 |
System.exit(1); |
92 |
} |
93 |
SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+host); |
94 |
} |
95 |
} |
96 |
|
97 |
public void finalizeDirs() { |
98 |
if (debug) { |
99 |
System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually"); |
100 |
} else { |
101 |
if (rmvtmp.equals("force")) { |
102 |
System.out.println("Removing temporary directory "+dumpdir); |
103 |
//TODO must capture exit state and print to error if problems deleting dir |
104 |
SystemCmd.exec("rm -rf "+dumpdir); |
105 |
} else if (rmvtmp.equals("prompt")){ |
106 |
System.out.println("Would you like to remove the temporary data directory '"+dumpdir+"' ? (y/n)"); |
107 |
BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); |
108 |
try { |
109 |
String answer = br.readLine(); |
110 |
if (answer.equals("y") || answer.equals("yes") || answer.equals("Y") || answer.equals("YES") || answer.equals("Yes")) { |
111 |
System.out.println("Removing temporary directory "+dumpdir); |
112 |
SystemCmd.exec("rm -rf "+dumpdir); |
113 |
} else { |
114 |
System.out.println("Temporary directory "+dumpdir+" was not removed."); |
115 |
} |
116 |
} catch (IOException e) { |
117 |
System.err.println("I/O error while reading user input."); |
118 |
e.printStackTrace(); |
119 |
System.exit(2); |
120 |
} |
121 |
} else if (rmvtmp.equals("noremove")){ |
122 |
System.out.println("Temporary directory "+dumpdir+" was not removed."); |
123 |
} |
124 |
} |
125 |
} |
126 |
|
127 |
private MySQLConnection getConnectionToNode(String node) { |
128 |
MySQLConnection conn=new MySQLConnection(node,user,pwd,srcDb); |
129 |
return conn; |
130 |
} |
131 |
|
132 |
private MySQLConnection getConnectionToMaster() { |
133 |
MySQLConnection conn=this.getConnectionToNode(MASTER); |
134 |
return conn; |
135 |
} |
136 |
|
137 |
private MySQLConnection getConnectionToMasterKeyDb() { |
138 |
MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB); |
139 |
return conn; |
140 |
} |
141 |
|
142 |
public void dumpData(String[] srchosts, String[] tables) { |
143 |
int concurrentQueries = 1; |
144 |
if (srchosts.length>1) { |
145 |
concurrentQueries = NUM_CONCURRENT_WRITE_QUERIES; |
146 |
} |
147 |
int i = 0; |
148 |
QueryThread[] qtGroup = new QueryThread[concurrentQueries]; |
149 |
initializeDirs(srchosts); |
150 |
for ( String tbl: tables) { |
151 |
for (String srchost: srchosts) { |
152 |
String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt"; |
153 |
String wherestr=""; |
154 |
String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';"; |
155 |
if (debug) {System.out.println ("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);} |
156 |
else { |
157 |
if (i!=0 && i%(concurrentQueries) == 0) { |
158 |
try { |
159 |
for (int j = 0;j<qtGroup.length;j++){ |
160 |
qtGroup[j].join(); // wait until previous thread group is finished |
161 |
} |
162 |
} catch (InterruptedException e) { |
163 |
e.printStackTrace(); |
164 |
} |
165 |
i = 0; |
166 |
qtGroup = new QueryThread[concurrentQueries]; |
167 |
} |
168 |
qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb); |
169 |
} //end else if debug |
170 |
i++; |
171 |
} // end foreach srchost |
172 |
} // end foreach table |
173 |
try { |
174 |
for (int j = 0;j<qtGroup.length;j++){ |
175 |
if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join |
176 |
qtGroup[j].join(); // wait until the last thread group is finished |
177 |
} |
178 |
} |
179 |
} catch (InterruptedException e) { |
180 |
e.printStackTrace(); |
181 |
} |
182 |
System.out.println ("Dump finished."); |
183 |
} |
184 |
|
185 |
public <T> void dumpSplitData(String srchost, String[] tables, String key, HashMap<String,T[]> idSets) { |
186 |
int concurrentQueries = NUM_CONCURRENT_SAMEHOST_QUERIES; |
187 |
int i = 0; |
188 |
QueryThread[] qtGroup = new QueryThread[concurrentQueries]; |
189 |
String[] srchosts = {srchost}; |
190 |
initializeDirs(srchosts); |
191 |
for ( String tbl: tables) { |
192 |
for (String node:idSets.keySet()) { |
193 |
String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+"_split_"+node+".txt"; |
194 |
String wherestr=""; |
195 |
// if number of ids for the key was less than number of nodes, some of the node slots will be empty. We check for those before trying to access the array |
196 |
if (idSets.get(node).length>0) { |
197 |
T idmin= idSets.get(node)[0]; |
198 |
T idmax= idSets.get(node)[idSets.get(node).length-1]; |
199 |
wherestr="WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"'"; |
200 |
String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';"; |
201 |
if (debug) {System.out.println ("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);} |
202 |
else { |
203 |
if (i!=0 && i%(concurrentQueries) == 0) { |
204 |
try { |
205 |
for (int j = 0;j<qtGroup.length;j++){ |
206 |
qtGroup[j].join(); // wait until previous thread group is finished |
207 |
} |
208 |
} catch (InterruptedException e) { |
209 |
e.printStackTrace(); |
210 |
} |
211 |
i = 0; |
212 |
qtGroup = new QueryThread[concurrentQueries]; |
213 |
} |
214 |
qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb); |
215 |
} //end else if debug |
216 |
i++; |
217 |
} |
218 |
} // end foreach node |
219 |
} // end foreach table |
220 |
try { |
221 |
for (int j = 0;j<qtGroup.length;j++){ |
222 |
if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join |
223 |
qtGroup[j].join(); // wait until the last thread group is finished |
224 |
} |
225 |
} |
226 |
} catch (InterruptedException e) { |
227 |
e.printStackTrace(); |
228 |
} |
229 |
System.out.println ("Dump finished."); |
230 |
} |
231 |
|
232 |
public void loadData(String[] srchosts, String[] desthosts,String[] tables) { |
233 |
int concurrentQueries = 1; |
234 |
if (srchosts.length==1 && desthosts.length>1) { |
235 |
concurrentQueries = NUM_CONCURRENT_READ_QUERIES; |
236 |
} |
237 |
int i = 0; |
238 |
QueryThread[] qtGroup = new QueryThread[concurrentQueries]; |
239 |
for (String srchost:srchosts){ |
240 |
for (String tbl:tables) { |
241 |
String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt"; |
242 |
String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;"; |
243 |
for (String desthost: desthosts) { |
244 |
if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);} |
245 |
else { |
246 |
if (i!=0 && i%(concurrentQueries) == 0) { |
247 |
try { |
248 |
for (int j = 0;j<qtGroup.length;j++){ |
249 |
qtGroup[j].join(); // wait until previous thread group is finished |
250 |
} |
251 |
} catch (InterruptedException e) { |
252 |
e.printStackTrace(); |
253 |
} |
254 |
i = 0; |
255 |
qtGroup = new QueryThread[concurrentQueries]; |
256 |
} |
257 |
qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb); |
258 |
} |
259 |
i++; |
260 |
} // end foreach desthost |
261 |
} // end foreach tbl |
262 |
} // end foreach srchost |
263 |
try { |
264 |
for (int j = 0;j<qtGroup.length;j++){ |
265 |
if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join |
266 |
qtGroup[j].join(); // wait until the last thread group is finished |
267 |
} |
268 |
} |
269 |
} catch (InterruptedException e) { |
270 |
e.printStackTrace(); |
271 |
} |
272 |
System.out.println ("Load finished."); |
273 |
finalizeDirs(); |
274 |
} |
275 |
|
276 |
public void loadSplitData(String srchost, String[] desthosts, String tableName) { |
277 |
int concurrentQueries = 1; |
278 |
if (desthosts.length>1) { |
279 |
concurrentQueries = NUM_CONCURRENT_READ_QUERIES; |
280 |
} |
281 |
int i = 0; |
282 |
QueryThread[] qtGroup = new QueryThread[concurrentQueries]; |
283 |
for (String desthost:desthosts) { |
284 |
String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tableName+"_split_"+desthost+".txt"; |
285 |
// we test first if dumpfile exists, if not all the slots (nodes) are filled then some nodes won't have to load anything and the files won't be there because dumpSplitData didn't create them |
286 |
if ((new File(dumpfile)).exists()) { |
287 |
String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;"; |
288 |
if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);} |
289 |
else { |
290 |
if (i!=0 && i%(concurrentQueries) == 0) { |
291 |
try { |
292 |
for (int j = 0;j<qtGroup.length;j++){ |
293 |
qtGroup[j].join(); // wait until previous thread group is finished |
294 |
} |
295 |
} catch (InterruptedException e) { |
296 |
e.printStackTrace(); |
297 |
} |
298 |
i = 0; |
299 |
qtGroup = new QueryThread[concurrentQueries]; |
300 |
} |
301 |
qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb); |
302 |
} // end else if debug |
303 |
i++; |
304 |
} |
305 |
} // end foreach desthosts |
306 |
try { |
307 |
for (int j = 0;j<qtGroup.length;j++){ |
308 |
if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join |
309 |
qtGroup[j].join(); // wait until the last thread group is finished |
310 |
} |
311 |
} |
312 |
} catch (InterruptedException e) { |
313 |
e.printStackTrace(); |
314 |
} |
315 |
System.out.println ("Load finished."); |
316 |
finalizeDirs(); |
317 |
} |
318 |
|
319 |
/** |
320 |
* For a certain key (text or numeric) and table returns a "data distribution" (kind of evenly distributed) of the data to the nodes |
321 |
* To be used when we have a table that we are going to split to the nodes |
322 |
* TODO eventually the code could be cleverer so that the data is actually evenly distributed, right now is only evenly distributed on key ids |
323 |
* @param key |
324 |
* @param table |
325 |
* @return idSets HashMap, keys are node names, values: Integer/String array with the ids for each node |
326 |
*/ |
327 |
public HashMap<String,Object[]> splitIdsIntoSets(String key, String table){ |
328 |
HashMap<String,Object[]> idSets =new HashMap<String,Object[]>(); |
329 |
String[] nodes=DataDistribution.getMySQLNodes(); |
330 |
int numNodes=nodes.length; |
331 |
MySQLConnection conn = this.getConnectionToMaster(); |
332 |
Object[] allIds=conn.getAllIds4KeyAndTable(key,table); |
333 |
conn.close(); |
334 |
int numIds=allIds.length; |
335 |
int setSize=numIds/numNodes; |
336 |
int remainder=numIds%numNodes; |
337 |
for (int i=0;i<numNodes;i++){ |
338 |
if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node |
339 |
Object[] thisnodeidset=new Object[setSize+1]; |
340 |
for (int j=0;j<thisnodeidset.length;j++){ |
341 |
thisnodeidset[j]=allIds[j+i*(setSize+1)]; |
342 |
} |
343 |
idSets.put(nodes[i],thisnodeidset); |
344 |
} else { // for the rest we put only setSize ids |
345 |
Object[] thisnodeidset=new Object[setSize]; |
346 |
for (int j=0;j<thisnodeidset.length;j++){ |
347 |
thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize]; |
348 |
} |
349 |
idSets.put(nodes[i],thisnodeidset); |
350 |
} |
351 |
} |
352 |
return idSets; |
353 |
} |
354 |
|
355 |
/** |
356 |
* To split a given table in chunks based on a key, split tables remain in same database and server |
357 |
* @param key |
358 |
* @param table |
359 |
*/ |
360 |
public void splitTable (String key,String table){ |
361 |
String[] nodes=DataDistribution.getMySQLNodes(); |
362 |
MySQLConnection conn=this.getConnectionToMaster(); |
363 |
String[] splitTables = new String[nodes.length]; // we create an array that will contain the name of all split tables |
364 |
String[] indexes=conn.getAllIndexes4Table(table); |
365 |
try { |
366 |
// we create split tables and drop indexes before inserting |
367 |
for (int i=0;i<nodes.length;i++) { |
368 |
String splitTbl=table+"_split_"+nodes[i]; |
369 |
splitTables[i]=splitTbl; // will be used later while looping over splitTables |
370 |
// we create permanent tables |
371 |
String query="CREATE TABLE "+splitTbl+" LIKE "+table+";"; |
372 |
conn.executeSql(query); |
373 |
// drop the indexes if there was any, indexes will slow down the creation of split tables |
374 |
for (String index:indexes) { |
375 |
conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";"); |
376 |
} |
377 |
} |
378 |
HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table); |
379 |
for (int i=0;i<nodes.length;i++) { |
380 |
Object idmin=idSets.get(nodes[i])[0]; |
381 |
Object idmax=idSets.get(nodes[i])[idSets.get(nodes[i]).length-1]; |
382 |
String query="INSERT INTO "+splitTables[i]+" SELECT * FROM "+table+" WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"';"; |
383 |
conn.executeSql(query); |
384 |
//TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection |
385 |
} |
386 |
} |
387 |
catch (SQLException e){ |
388 |
e.printStackTrace(); |
389 |
} |
390 |
conn.close(); |
391 |
} |
392 |
|
393 |
/** |
394 |
* To split a given table in chunks based on a key, split tables go to different nodes of cluster |
395 |
* @param key |
396 |
* @param table |
397 |
*/ |
398 |
public DataDistribution splitTableToCluster (String key,String table){ |
399 |
System.out.println("Splitting table "+table+" to cluster based on key "+key+"..."); |
400 |
String[] tables={table}; |
401 |
String[] desthosts=DataDistribution.getMySQLNodes(); |
402 |
HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table); |
403 |
// dumping data with the dumpSplitData method, a modified version of dumpData |
404 |
dumpSplitData(MASTER,tables,key,idSets); |
405 |
// putting the ids in the key_master database so we keep track of where everything is |
406 |
insertIdsToKeyMaster(key,table,idSets); |
407 |
// using here loadSplitData rather than loadData because table names are not the same on source and destination, |
408 |
// i.e. source: table_split_tla01, dest: table |
409 |
loadSplitData(MASTER,desthosts,table); |
410 |
DataDistribution dataDist = new DataDistribution(destDb,user,pwd); |
411 |
System.out.println("Done with splitting."); |
412 |
return dataDist; |
413 |
} |
414 |
|
415 |
/** |
416 |
* Insert all ids to the key_master database creating a new table for this destDb/given table combination if not exists |
417 |
* @param key name of key on which distribution of table is based |
418 |
* @param table name of table that we are distributing |
419 |
* @param idSets as returned from splitIdsIntoSets or getIdSetsFromNodes from a DataDistribution object |
420 |
*/ |
421 |
public <T> void insertIdsToKeyMaster(String key,String table,HashMap<String,T[]> idSets) { |
422 |
System.out.println("Updating key_master database with ids to nodes mapping..."); |
423 |
MySQLConnection conn = this.getConnectionToMasterKeyDb(); |
424 |
String keyMasterTbl = createNewKeyMasterTbl(key,table); |
425 |
removePK(keyMasterTbl,key); // attention removing primary keys, duplicates won't be checked!!! |
426 |
// getting first mapping between nodes names and node ids |
427 |
HashMap<String,Integer> nodes2nodeids = new HashMap<String,Integer>(); |
428 |
for (String node:idSets.keySet()){ |
429 |
String query="SELECT client_id FROM clients_names WHERE client_name='"+node+"';"; |
430 |
int id = conn.getIntFromDb(query); |
431 |
nodes2nodeids.put(node,id); |
432 |
} |
433 |
for (String node:idSets.keySet()){ |
434 |
T[] thisNodeIds=idSets.get(node); |
435 |
for (T id:thisNodeIds){ |
436 |
String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) VALUES ('"+id+"',"+nodes2nodeids.get(node)+");"; |
437 |
try { |
438 |
conn.executeSql(query); |
439 |
} catch (SQLException e) { |
440 |
e.printStackTrace(); |
441 |
} |
442 |
} |
443 |
} |
444 |
conn.close(); |
445 |
removeZeros(keyMasterTbl,key); // we only have inserted 0s for records that we didn't want, it's safe now to get rid of them |
446 |
addPK(keyMasterTbl,key); // if there were duplicates, this should barf |
447 |
System.out.println("Done with updating key_master database."); |
448 |
} |
449 |
|
450 |
/** |
451 |
* To create a new key master table for destination db in the key_master database given a key and table. Used by insertIdsToKeyMaster |
452 |
* Eventually this method on other key_master related should go into their own class, shouldn't they? |
453 |
* @param key |
454 |
* @param table |
455 |
* @return the name of the key master table created |
456 |
*/ |
457 |
public String createNewKeyMasterTbl(String key,String table) { |
458 |
// find out whether key is numeric or text and setting accordingly query strings |
459 |
String nodes[] = DataDistribution.getMySQLNodes(); // we need the list of nodes only to get one of them no matter which |
460 |
MySQLConnection conn=new MySQLConnection(nodes[0],user,pwd,destDb); // here we connect to destDb in one node, needed to getColumnType |
461 |
String colType = conn.getColumnType(table,key); |
462 |
String autoIncr = ""; |
463 |
if (colType.contains("int")){ |
464 |
autoIncr = "auto_increment"; |
465 |
} |
466 |
conn.close(); |
467 |
// key master table name and connection to key master db |
468 |
String keyMasterTbl=destDb+"__"+table; |
469 |
conn=this.getConnectionToMasterKeyDb(); |
470 |
try { |
471 |
String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+ |
472 |
key+" "+colType+" NOT NULL "+autoIncr+", " + |
473 |
"client_id smallint(6) NOT NULL default '0', " + |
474 |
"PRIMARY KEY (`"+key+"`) " + |
475 |
") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;"; |
476 |
Statement S=conn.createStatement(); |
477 |
S.executeUpdate(query); |
478 |
S.close(); |
479 |
} catch (SQLException e) { |
480 |
System.err.println("Couldn't create table "+keyMasterTbl); |
481 |
e.printStackTrace(); |
482 |
} |
483 |
try { |
484 |
Statement S=conn.createStatement(); |
485 |
String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+destDb+"\',\'"+keyMasterTbl+"\');"; |
486 |
S.executeUpdate(query); |
487 |
S.close(); |
488 |
} catch (SQLException e) { |
489 |
System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!"); |
490 |
System.err.println("SQLException: " + e.getMessage()); |
491 |
} |
492 |
conn.close(); |
493 |
return keyMasterTbl; |
494 |
} |
495 |
|
496 |
public void removePK (String keyMasterTbl,String key){ |
497 |
MySQLConnection conn=this.getConnectionToMasterKeyDb(); |
498 |
boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key); |
499 |
String colType = conn.getColumnType(keyMasterTbl,key); |
500 |
try { |
501 |
if (isNumeric){ // removing the auto_increment, only in numeric keys |
502 |
String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL;"; |
503 |
conn.executeSql(query); |
504 |
} |
505 |
// removing primary key (same sql code for both numeric or text keys |
506 |
String query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;"; |
507 |
conn.executeSql(query); |
508 |
} catch (SQLException e) { |
509 |
e.printStackTrace(); |
510 |
} |
511 |
conn.close(); |
512 |
} |
513 |
|
514 |
public void addPK (String keyMasterTbl, String key){ |
515 |
MySQLConnection conn=this.getConnectionToMasterKeyDb(); |
516 |
boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key); |
517 |
String colType = conn.getColumnType(keyMasterTbl,key); |
518 |
try { |
519 |
// adding primary key (same sql code for both numeric or text keys |
520 |
String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");"; |
521 |
conn.executeSql(query); |
522 |
if (isNumeric){ // adding auto_increment, only in numeric keys |
523 |
query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL auto_increment;"; |
524 |
conn.executeSql(query); |
525 |
} |
526 |
} catch (SQLException e) { |
527 |
e.printStackTrace(); |
528 |
} |
529 |
conn.close(); |
530 |
} |
531 |
|
532 |
public void removeZeros (String keyMasterTbl, String key){ |
533 |
MySQLConnection conn=this.getConnectionToMasterKeyDb(); |
534 |
try { |
535 |
// attention! the quotes around 0 are very important, otherwise if key is text-based all records get deleted |
536 |
// mysql somehow considers all text records = 0, using '0' is ok as is considered as a text literal for char fields and as a number for int fields |
537 |
String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"='0';"; |
538 |
conn.executeSql(query); |
539 |
} catch (SQLException e) { |
540 |
e.printStackTrace(); |
541 |
} |
542 |
conn.close(); |
543 |
} |
544 |
|
545 |
/** |
546 |
* Executes a query in all nodes in cluster. |
547 |
* Not in use at the moment |
548 |
* TODO Right now it is serial, must parallelize this with threads |
549 |
* @param query |
550 |
*/ |
551 |
public void clusterExecuteQuery(String query){ |
552 |
String[] nodes = DataDistribution.getMySQLNodes(); |
553 |
for (String node: nodes){ |
554 |
try { |
555 |
MySQLConnection conn = this.getConnectionToNode(node); |
556 |
conn.executeSql(query); |
557 |
conn.close(); |
558 |
} |
559 |
catch(SQLException e){ |
560 |
e.printStackTrace(); |
561 |
System.err.println("Couldn't execute query="+query+", in node="+node); |
562 |
System.exit(1); |
563 |
} |
564 |
} |
565 |
} |
566 |
|
567 |
/** |
568 |
* Executes a query in all nodes in cluster given a HashMap containing a set of queries (one per node) |
569 |
* Not in use at the moment |
570 |
* TODO Right now it is serial, must parallelize this with threads |
571 |
* TODO This can be used in the load/dump methods in this class where queries are different for each node |
572 |
* @param queries a HashMap containing a query per node |
573 |
*/ |
574 |
public void clusterExecuteQuery(HashMap<String,String> queries){ |
575 |
String[] nodes = DataDistribution.getMySQLNodes(); |
576 |
for (String node: nodes){ |
577 |
String query=""; |
578 |
try { |
579 |
query=queries.get(node); |
580 |
MySQLConnection conn = this.getConnectionToNode(node); |
581 |
conn.executeSql(query); |
582 |
conn.close(); |
583 |
} |
584 |
catch(SQLException e){ |
585 |
e.printStackTrace(); |
586 |
System.err.println("Couldn't execute query="+query+", in node="+node); |
587 |
System.exit(1); |
588 |
} |
589 |
} |
590 |
} |
591 |
|
592 |
} |