root/owl/tags/aglappe-0.6/tools/DataDistributer.java
Revision: 197
Committed: Thu Jun 14 14:44:44 2007 UTC by duarte
File size: 22855 byte(s)
Log Message:
Tagging version 0.6 for release of cmview 0.6
package tools;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;

public class DataDistributer {

    public final static String GLOBALDIR = "/project/snow/global/tmp";
    public final static String MASTER = DataDistribution.MASTER;
    public final static String KEYMASTERDB = DataDistribution.KEYMASTERDB;

    // The following parameters were optimized after some testing; I found these to be good values.
    // They represent the number of nodes reading/writing from/to nfs concurrently when loading/dumping.
    // 1 means no concurrency at all. Higher numbers mean more concurrency, i.e. more nodes writing/reading to/from nfs concurrently.
    final static int NUM_CONCURRENT_READ_QUERIES = 9;
    final static int NUM_CONCURRENT_WRITE_QUERIES = 6;
    // The following parameter applies only to concurrent write queries on the same host. This number has to be low,
    // as the only limitation is the i/o capacity of that host.
    final static int NUM_CONCURRENT_SAMEHOST_QUERIES = 3;

    String dumpdir;
    String srcDb;
    String destDb;
    String user;
    String pwd;
    boolean debug = false;
    String rmvtmp = "force"; // 3 possible values: force, noremove and prompt

    /**
     * Constructs a DataDistributer object passing only the source db. The destination db will be the same as the source.
     * @param srcDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String user, String pwd) {
        this.srcDb = srcDb;
        this.destDb = srcDb;
        this.user = user;
        this.pwd = pwd;
        initializeDirs();
    }

    /**
     * Constructs a DataDistributer object passing both the source db and the destination db.
     * @param srcDb
     * @param destDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String destDb, String user, String pwd) {
        this.srcDb = srcDb;
        this.destDb = destDb;
        this.user = user;
        this.pwd = pwd;
        initializeDirs();
    }

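    // A minimal usage sketch (hypothetical db, user and table names; MySQLConnection,
    // QueryThread, SystemCmd and DataDistribution are companion classes of this package):
    //
    //   DataDistributer dd = new DataDistributer("mydb", "myuser", "mypwd");
    //   dd.setDebug(true);                       // just print the SQL instead of executing it
    //   String[] tables = {"mytable"};
    //   String[] nodes = DataDistribution.getMySQLNodes();
    //   dd.dumpData(new String[]{MASTER}, tables);       // SELECT ... INTO OUTFILE on the master
    //   dd.loadData(new String[]{MASTER}, nodes, tables); // LOAD DATA INFILE on every node
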
    public void setDebug(boolean debug) {
        this.debug = debug;
    }

    public void setRmv(String rmvtmp) {
        this.rmvtmp = rmvtmp;
    }

    public void setDumpDir(String dumpdir) {
        // Before resetting the dumpdir we get rid of the existing dumpdir created when this DataDistributer object was constructed:
        // when the dumpdir is set explicitly through setDumpDir, the randomly named dumpdir created at construction time is clearly not needed.
        SystemCmd.exec("rmdir " + this.dumpdir);
        this.dumpdir = dumpdir;
    }

    public void initializeDirs() {
        dumpdir = GLOBALDIR + "/dumps_tmp_" + System.currentTimeMillis();
        if (!((new File(dumpdir)).mkdir())) {
            System.err.println("Couldn't create directory " + dumpdir);
            System.exit(1);
        }
        SystemCmd.exec("chmod go+rw " + dumpdir);
    }

    public void initializeDirs(String[] hosts) {
        for (String host : hosts) {
            if (!((new File(dumpdir + "/" + host + "/" + srcDb)).mkdirs())) {
                System.err.println("Couldn't create directory " + dumpdir + "/" + host + "/" + srcDb);
                System.exit(1);
            }
            SystemCmd.exec("chmod -R go+rw " + dumpdir + "/" + host);
        }
    }

    public void finalizeDirs() {
        if (debug) {
            System.out.println("Temporary directory " + dumpdir + " was not removed. You must remove it manually");
        } else {
            if (rmvtmp.equals("force")) {
                System.out.println("Removing temporary directory " + dumpdir);
                //TODO must capture exit state and print to error if problems deleting dir
                SystemCmd.exec("rm -rf " + dumpdir);
            } else if (rmvtmp.equals("prompt")) {
                System.out.println("Would you like to remove the temporary data directory '" + dumpdir + "' ? (y/n)");
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                try {
                    String answer = br.readLine();
                    if (answer.equals("y") || answer.equals("yes") || answer.equals("Y") || answer.equals("YES") || answer.equals("Yes")) {
                        System.out.println("Removing temporary directory " + dumpdir);
                        SystemCmd.exec("rm -rf " + dumpdir);
                    } else {
                        System.out.println("Temporary directory " + dumpdir + " was not removed.");
                    }
                } catch (IOException e) {
                    System.err.println("I/O error while reading user input.");
                    e.printStackTrace();
                    System.exit(2);
                }
            } else if (rmvtmp.equals("noremove")) {
                System.out.println("Temporary directory " + dumpdir + " was not removed.");
            }
        }
    }

    private MySQLConnection getConnectionToNode(String node) {
        MySQLConnection conn = new MySQLConnection(node, user, pwd, srcDb);
        return conn;
    }

    private MySQLConnection getConnectionToMaster() {
        MySQLConnection conn = this.getConnectionToNode(MASTER);
        return conn;
    }

    private MySQLConnection getConnectionToMasterKeyDb() {
        MySQLConnection conn = new MySQLConnection(MASTER, user, pwd, KEYMASTERDB);
        return conn;
    }

    public void dumpData(String[] srchosts, String[] tables) {
        int concurrentQueries = 1;
        if (srchosts.length > 1) {
            concurrentQueries = NUM_CONCURRENT_WRITE_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        initializeDirs(srchosts);
        for (String tbl : tables) {
            for (String srchost : srchosts) {
                String outfile = dumpdir + "/" + srchost + "/" + srcDb + "/" + tbl + ".txt";
                String wherestr = "";
                String sqldumpstr = "SELECT * FROM `" + tbl + "` " + wherestr + " INTO OUTFILE '" + outfile + "';";
                if (debug) {
                    System.out.println("HOST=" + srchost + ", database=" + srcDb + ", sqldumpstr=" + sqldumpstr);
                } else {
                    if (i != 0 && i % concurrentQueries == 0) {
                        try {
                            for (int j = 0; j < qtGroup.length; j++) {
                                qtGroup[j].join(); // wait until previous thread group is finished
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        i = 0;
                        qtGroup = new QueryThread[concurrentQueries];
                    }
                    qtGroup[i] = new QueryThread(sqldumpstr, srchost, user, pwd, srcDb);
                } // end else (not debug)
                i++;
            } // end foreach srchost
        } // end foreach table
        try {
            for (int j = 0; j < qtGroup.length; j++) {
                if (qtGroup[j] != null) { // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Dump finished.");
    }

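    // Sketch of the statement dumpData generates per host/table (hypothetical host 'node01',
    // db 'mydb', table 'mytable'; <millis> stands for the timestamp in the dumpdir name):
    //
    //   SELECT * FROM `mytable` INTO OUTFILE '/project/snow/global/tmp/dumps_tmp_<millis>/node01/mydb/mytable.txt';
    //
    // i.e. the MySQL server on each source host writes its dump into the shared nfs dump directory.
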
    public <T> void dumpSplitData(String srchost, String[] tables, String key, HashMap<String, T[]> idSets) {
        int concurrentQueries = NUM_CONCURRENT_SAMEHOST_QUERIES;
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        String[] srchosts = {srchost};
        initializeDirs(srchosts);
        for (String tbl : tables) {
            for (String node : idSets.keySet()) {
                String outfile = dumpdir + "/" + srchost + "/" + srcDb + "/" + tbl + "_split_" + node + ".txt";
                String wherestr = "";
                // If the number of ids for the key was smaller than the number of nodes, some of the node slots will be empty.
                // We check for those before trying to access the array.
                if (idSets.get(node).length > 0) {
                    T idmin = idSets.get(node)[0];
                    T idmax = idSets.get(node)[idSets.get(node).length - 1];
                    wherestr = "WHERE " + key + ">='" + idmin + "' AND " + key + "<='" + idmax + "'";
                    String sqldumpstr = "SELECT * FROM `" + tbl + "` " + wherestr + " INTO OUTFILE '" + outfile + "';";
                    if (debug) {
                        System.out.println("HOST=" + srchost + ", database=" + srcDb + ", sqldumpstr=" + sqldumpstr);
                    } else {
                        if (i != 0 && i % concurrentQueries == 0) {
                            try {
                                for (int j = 0; j < qtGroup.length; j++) {
                                    qtGroup[j].join(); // wait until previous thread group is finished
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            i = 0;
                            qtGroup = new QueryThread[concurrentQueries];
                        }
                        qtGroup[i] = new QueryThread(sqldumpstr, srchost, user, pwd, srcDb);
                    } // end else (not debug)
                    i++;
                }
            } // end foreach node
        } // end foreach table
        try {
            for (int j = 0; j < qtGroup.length; j++) {
                if (qtGroup[j] != null) { // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Dump finished.");
    }

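    // Sketch of the per-node statements dumpSplitData generates (hypothetical nodes, ids,
    // key 'id' and table 'mytable'): each node gets its contiguous id range, e.g.
    //
    //   SELECT * FROM `mytable` WHERE id>='1' AND id<='400' INTO OUTFILE '.../mytable_split_node01.txt';
    //   SELECT * FROM `mytable` WHERE id>='401' AND id<='800' INTO OUTFILE '.../mytable_split_node02.txt';
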
    public void loadData(String[] srchosts, String[] desthosts, String[] tables) {
        int concurrentQueries = 1;
        if (srchosts.length == 1 && desthosts.length > 1) {
            concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        for (String srchost : srchosts) {
            for (String tbl : tables) {
                String dumpfile = dumpdir + "/" + srchost + "/" + srcDb + "/" + tbl + ".txt";
                String sqlloadstr = "LOAD DATA INFILE '" + dumpfile + "' INTO TABLE `" + tbl + "`;";
                for (String desthost : desthosts) {
                    if (debug) {
                        System.out.println("SRCHOST=" + srchost + ", DESTHOST=" + desthost + ", DESTDB=" + destDb + ", sqlloadstr=" + sqlloadstr);
                    } else {
                        if (i != 0 && i % concurrentQueries == 0) {
                            try {
                                for (int j = 0; j < qtGroup.length; j++) {
                                    qtGroup[j].join(); // wait until previous thread group is finished
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            i = 0;
                            qtGroup = new QueryThread[concurrentQueries];
                        }
                        qtGroup[i] = new QueryThread(sqlloadstr, desthost, user, pwd, destDb);
                    }
                    i++;
                } // end foreach desthost
            } // end foreach tbl
        } // end foreach srchost
        try {
            for (int j = 0; j < qtGroup.length; j++) {
                if (qtGroup[j] != null) { // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Load finished.");
        finalizeDirs();
    }

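    // Sketch of the statement loadData issues on each destination host (same hypothetical
    // names as above); the file path is the one written by dumpData on the shared filesystem:
    //
    //   LOAD DATA INFILE '/project/snow/global/tmp/dumps_tmp_<millis>/node01/mydb/mytable.txt' INTO TABLE `mytable`;
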
    public void loadSplitData(String srchost, String[] desthosts, String tableName) {
        int concurrentQueries = 1;
        if (desthosts.length > 1) {
            concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        for (String desthost : desthosts) {
            String dumpfile = dumpdir + "/" + srchost + "/" + srcDb + "/" + tableName + "_split_" + desthost + ".txt";
            // We first test whether the dump file exists: if not all the slots (nodes) were filled, some nodes won't have
            // anything to load and the files won't be there, because dumpSplitData didn't create them.
            if ((new File(dumpfile)).exists()) {
                String sqlloadstr = "LOAD DATA INFILE '" + dumpfile + "' INTO TABLE `" + tableName + "`;";
                if (debug) {
                    System.out.println("SRCHOST=" + srchost + ", DESTHOST=" + desthost + ", DESTDB=" + destDb + ", sqlloadstr=" + sqlloadstr);
                } else {
                    if (i != 0 && i % concurrentQueries == 0) {
                        try {
                            for (int j = 0; j < qtGroup.length; j++) {
                                qtGroup[j].join(); // wait until previous thread group is finished
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        i = 0;
                        qtGroup = new QueryThread[concurrentQueries];
                    }
                    qtGroup[i] = new QueryThread(sqlloadstr, desthost, user, pwd, destDb);
                } // end else (not debug)
                i++;
            }
        } // end foreach desthosts
        try {
            for (int j = 0; j < qtGroup.length; j++) {
                if (qtGroup[j] != null) { // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Load finished.");
        finalizeDirs();
    }

    /**
     * For a certain key (text or numeric) and table, returns a "data distribution" (kind of evenly distributed) of the data to the nodes.
     * To be used when we have a table that we are going to split to the nodes.
     * TODO eventually the code could be cleverer so that the data is actually evenly distributed; right now it is only evenly distributed on key ids
     * @param key
     * @param table
     * @return idSets HashMap, keys are node names, values: Integer/String array with the ids for each node
     */
    public HashMap<String, Object[]> splitIdsIntoSets(String key, String table) {
        HashMap<String, Object[]> idSets = new HashMap<String, Object[]>();
        String[] nodes = DataDistribution.getMySQLNodes();
        int numNodes = nodes.length;
        MySQLConnection conn = this.getConnectionToMaster();
        Object[] allIds = conn.getAllIds4KeyAndTable(key, table);
        conn.close();
        int numIds = allIds.length;
        int setSize = numIds / numNodes;
        int remainder = numIds % numNodes;
        for (int i = 0; i < numNodes; i++) {
            if (i < remainder) { // for the first "remainder" number of nodes we put setSize+1 ids in the node
                Object[] thisnodeidset = new Object[setSize + 1];
                for (int j = 0; j < thisnodeidset.length; j++) {
                    thisnodeidset[j] = allIds[j + i * (setSize + 1)];
                }
                idSets.put(nodes[i], thisnodeidset);
            } else { // for the rest we put only setSize ids
                Object[] thisnodeidset = new Object[setSize];
                for (int j = 0; j < thisnodeidset.length; j++) {
                    thisnodeidset[j] = allIds[j + remainder * (setSize + 1) + (i - remainder) * setSize];
                }
                idSets.put(nodes[i], thisnodeidset);
            }
        }
        return idSets;
    }

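    // A worked example of the arithmetic above (hypothetical sizes): with numIds=10 and numNodes=3,
    // setSize=10/3=3 and remainder=10%3=1, so the first node gets 4 ids (allIds[0..3]) and the
    // remaining two nodes get 3 ids each (allIds[4..6] and allIds[7..9]); every id lands on exactly one node.
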
    /**
     * Splits a given table into chunks based on a key; the split tables remain in the same database and server.
     * @param key
     * @param table
     */
    public void splitTable(String key, String table) {
        String[] nodes = DataDistribution.getMySQLNodes();
        MySQLConnection conn = this.getConnectionToMaster();
        String[] splitTables = new String[nodes.length]; // we create an array that will contain the names of all split tables
        String[] indexes = conn.getAllIndexes4Table(table);
        try {
            // we create the split tables and drop indexes before inserting
            for (int i = 0; i < nodes.length; i++) {
                String splitTbl = table + "_split_" + nodes[i];
                splitTables[i] = splitTbl; // will be used later while looping over splitTables
                // we create permanent tables
                String query = "CREATE TABLE " + splitTbl + " LIKE " + table + ";";
                conn.executeSql(query);
                // drop the indexes if there were any; indexes would slow down the population of the split tables
                for (String index : indexes) {
                    conn.executeSql("DROP INDEX " + index + " ON " + splitTbl + ";");
                }
            }
            HashMap<String, Object[]> idSets = this.splitIdsIntoSets(key, table);
            for (int i = 0; i < nodes.length; i++) {
                Object idmin = idSets.get(nodes[i])[0];
                Object idmax = idSets.get(nodes[i])[idSets.get(nodes[i]).length - 1];
                String query = "INSERT INTO " + splitTables[i] + " SELECT * FROM " + table + " WHERE " + key + ">='" + idmin + "' AND " + key + "<='" + idmax + "';";
                conn.executeSql(query);
                //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

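    // Sketch of the statements splitTable issues per node (hypothetical node 'node01', key 'id', table 'mytable'):
    //
    //   CREATE TABLE mytable_split_node01 LIKE mytable;
    //   INSERT INTO mytable_split_node01 SELECT * FROM mytable WHERE id>='1' AND id<='400';
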
    /**
     * Splits a given table into chunks based on a key; the split tables go to the different nodes of the cluster.
     * @param key
     * @param table
     * @return a DataDistribution object for the distributed data in destDb
     */
    public DataDistribution splitTableToCluster(String key, String table) {
        System.out.println("Splitting table " + table + " to cluster based on key " + key + "...");
        String[] tables = {table};
        String[] desthosts = DataDistribution.getMySQLNodes();
        HashMap<String, Object[]> idSets = this.splitIdsIntoSets(key, table);
        // dumping data with the dumpSplitData method, a modified version of dumpData
        dumpSplitData(MASTER, tables, key, idSets);
        // putting the ids in the key_master database so we keep track of where everything is
        insertIdsToKeyMaster(key, table, idSets);
        // using here loadSplitData rather than loadData because table names are not the same on source and destination,
        // i.e. source: table_split_tla01, dest: table
        loadSplitData(MASTER, desthosts, table);
        DataDistribution dataDist = new DataDistribution(destDb, user, pwd);
        System.out.println("Done with splitting.");
        return dataDist;
    }

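    // A minimal end-to-end sketch of distributing one table over the cluster
    // (hypothetical db/user/key/table names):
    //
    //   DataDistributer dd = new DataDistributer("mydb", "mydb_distributed", "myuser", "mypwd");
    //   DataDistribution dist = dd.splitTableToCluster("id", "mytable");
    //   // dist can then be used to look up the distribution (e.g. via its getIdSetsFromNodes method, mentioned below)
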
    /**
     * Inserts all ids into the key_master database, creating a new table for this destDb/table combination if it doesn't exist.
     * @param key name of key on which the distribution of the table is based
     * @param table name of table that we are distributing
     * @param idSets as returned from splitIdsIntoSets or getIdSetsFromNodes from a DataDistribution object
     */
    public <T> void insertIdsToKeyMaster(String key, String table, HashMap<String, T[]> idSets) {
        System.out.println("Updating key_master database with ids to nodes mapping...");
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        String keyMasterTbl = createNewKeyMasterTbl(key, table);
        removePK(keyMasterTbl, key); // attention: we are removing the primary key, so duplicates won't be checked!!!
        // first get the mapping between node names and node ids
        HashMap<String, Integer> nodes2nodeids = new HashMap<String, Integer>();
        for (String node : idSets.keySet()) {
            String query = "SELECT client_id FROM clients_names WHERE client_name='" + node + "';";
            int id = conn.getIntFromDb(query);
            nodes2nodeids.put(node, id);
        }
        for (String node : idSets.keySet()) {
            T[] thisNodeIds = idSets.get(node);
            for (T id : thisNodeIds) {
                String query = "INSERT INTO " + keyMasterTbl + " (" + key + ",client_id) VALUES ('" + id + "'," + nodes2nodeids.get(node) + ");";
                try {
                    conn.executeSql(query);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        conn.close();
        removeZeros(keyMasterTbl, key); // we have only inserted 0s for records that we didn't want, so it's safe now to get rid of them
        addPK(keyMasterTbl, key); // if there were duplicates, this should barf
        System.out.println("Done with updating key_master database.");
    }

    /**
     * Creates a new key master table for the destination db in the key_master database, given a key and table. Used by insertIdsToKeyMaster.
     * Eventually this method and the other key_master-related ones should go into their own class, shouldn't they?
     * @param key
     * @param table
     * @return the name of the key master table created
     */
    public String createNewKeyMasterTbl(String key, String table) {
        // find out whether the key is numeric or text and set the query strings accordingly
        String[] nodes = DataDistribution.getMySQLNodes(); // we need the list of nodes only to get one of them, no matter which
        MySQLConnection conn = new MySQLConnection(nodes[0], user, pwd, destDb); // here we connect to destDb on one node, needed for getColumnType
        String colType = conn.getColumnType(table, key);
        String autoIncr = "";
        if (colType.contains("int")) {
            autoIncr = "auto_increment";
        }
        conn.close();
        // key master table name and connection to key master db
        String keyMasterTbl = destDb + "__" + table;
        conn = this.getConnectionToMasterKeyDb();
        try {
            String query = "CREATE TABLE IF NOT EXISTS " + keyMasterTbl + " (" +
                    key + " " + colType + " NOT NULL " + autoIncr + ", " +
                    "client_id smallint(6) NOT NULL default '0', " +
                    "PRIMARY KEY (`" + key + "`) " +
                    ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
            Statement S = conn.createStatement();
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Couldn't create table " + keyMasterTbl);
            e.printStackTrace();
        }
        try {
            Statement S = conn.createStatement();
            String query = "INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES ('" + key + "','" + destDb + "','" + keyMasterTbl + "');";
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Didn't insert new record into table dbs_keys of database: " + KEYMASTERDB + ". The record for key: " + key + ", table: " + table + " existed already. This is usually a harmless error!");
            System.err.println("SQLException: " + e.getMessage());
        }
        conn.close();
        return keyMasterTbl;
    }

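    // For a numeric key 'id' on table 'mytable' with destDb 'mydb' (hypothetical names), the
    // generated key master table would look roughly like:
    //
    //   CREATE TABLE IF NOT EXISTS mydb__mytable (
    //     id int(11) NOT NULL auto_increment,
    //     client_id smallint(6) NOT NULL default '0',
    //     PRIMARY KEY (`id`)
    //   ) ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;
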
    public void removePK(String keyMasterTbl, String key) {
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        boolean isNumeric = conn.isKeyNumeric(keyMasterTbl, key);
        String colType = conn.getColumnType(keyMasterTbl, key);
        try {
            if (isNumeric) { // removing the auto_increment, only for numeric keys
                String query = "ALTER TABLE " + keyMasterTbl + " MODIFY " + key + " " + colType + " NOT NULL;";
                conn.executeSql(query);
            }
            // removing the primary key (same sql code for both numeric and text keys)
            String query = "ALTER TABLE " + keyMasterTbl + " DROP PRIMARY KEY;";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    public void addPK(String keyMasterTbl, String key) {
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        boolean isNumeric = conn.isKeyNumeric(keyMasterTbl, key);
        String colType = conn.getColumnType(keyMasterTbl, key);
        try {
            // adding the primary key (same sql code for both numeric and text keys)
            String query = "ALTER TABLE " + keyMasterTbl + " ADD PRIMARY KEY(" + key + ");";
            conn.executeSql(query);
            if (isNumeric) { // adding auto_increment, only for numeric keys
                query = "ALTER TABLE " + keyMasterTbl + " MODIFY " + key + " " + colType + " NOT NULL auto_increment;";
                conn.executeSql(query);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    public void removeZeros(String keyMasterTbl, String key) {
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        try {
            // Attention! The quotes around 0 are very important: otherwise, if the key is text-based, all records get deleted,
            // as mysql somehow considers all text records = 0. Using '0' is ok, as it is treated as a text literal for char fields and as a number for int fields.
            String query = "DELETE FROM " + keyMasterTbl + " WHERE " + key + "='0';";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Executes a query on all nodes of the cluster.
     * Not in use at the moment.
     * TODO Right now it is serial, must parallelize this with threads
     * @param query
     */
    public void clusterExecuteQuery(String query) {
        String[] nodes = DataDistribution.getMySQLNodes();
        for (String node : nodes) {
            try {
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
                System.err.println("Couldn't execute query=" + query + ", in node=" + node);
                System.exit(1);
            }
        }
    }

    /**
     * Executes a query on all nodes of the cluster, given a HashMap containing a set of queries (one per node).
     * Not in use at the moment.
     * TODO Right now it is serial, must parallelize this with threads
     * TODO This could be used in the load/dump methods of this class, where queries are different for each node
     * @param queries a HashMap containing a query per node
     */
    public void clusterExecuteQuery(HashMap<String, String> queries) {
        String[] nodes = DataDistribution.getMySQLNodes();
        for (String node : nodes) {
            String query = "";
            try {
                query = queries.get(node);
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
                System.err.println("Couldn't execute query=" + query + ", in node=" + node);
                System.exit(1);
            }
        }
    }

}
