root/owl/trunk/tools/DataDistributer.java
Revision: 102
Committed: Fri Aug 4 14:34:00 2006 UTC by duarte
File size: 22303 byte(s)
Log Message:
Method finalizeDirs now prompts by default whether to delete temp dirs, unless new boolean field "force" is true.
package tools;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;

public class DataDistributer {

    public final static String GLOBALDIR="/project/snow/global/tmp";
    public final static String MASTER = DataDistribution.MASTER;
    public final static String KEYMASTERDB = DataDistribution.KEYMASTERDB;

    // The following parameters were tuned by testing; I found these to be good values.
    // They represent the number of nodes reading/writing from/to nfs concurrently when loading/dumping:
    // 1 means no concurrency at all; higher numbers mean more nodes writing/reading to/from nfs concurrently.
    final static int NUM_CONCURRENT_READ_QUERIES = 9;
    final static int NUM_CONCURRENT_WRITE_QUERIES = 6;
    // The following parameter applies only to concurrent write queries on the same host. It has to be low,
    // as the only limitation is the i/o capacity of that host.
    final static int NUM_CONCURRENT_SAMEHOST_QUERIES = 3;

    String dumpdir;
    String srcDb;
    String destDb;
    String user;
    String pwd;
    boolean debug = false;
    boolean force = false;

    /**
     * Construct a DataDistributer object passing only the source db. The destination db will be the same as the source.
     * @param srcDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String user, String pwd) {
        this.srcDb=srcDb;
        this.destDb=srcDb;
        this.user=user;
        this.pwd=pwd;
        initializeDirs();
    }

    /**
     * Construct a DataDistributer object passing both source db and destination db
     * @param srcDb
     * @param destDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String destDb, String user, String pwd) {
        this.srcDb=srcDb;
        this.destDb=destDb;
        this.user=user;
        this.pwd=pwd;
        initializeDirs();
    }

    public void setDebug(boolean debug){
        this.debug=debug;
    }

    public void setForce(boolean force){
        this.force=force;
    }

    public void setDumpDir(String dumpdir){
        this.dumpdir=dumpdir;
    }

    public void initializeDirs() {
        dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
        if (!((new File(dumpdir)).mkdir())) {
            System.err.println("Couldn't create directory "+dumpdir);
            System.exit(1);
        }
        SystemCmd.exec("chmod go+rw "+dumpdir);
    }

    public void initializeDirs(String[] hosts){
        for (String host: hosts) {
            if (!((new File(dumpdir+"/"+host+"/"+srcDb)).mkdirs())) {
                System.err.println("Couldn't create directory "+dumpdir+"/"+host+"/"+srcDb);
                System.exit(1);
            }
            SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+host);
        }
    }

    public void finalizeDirs() {
        if (debug) {
            System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually");
        } else {
            if (force) {
                System.out.println("Removing temporary directory "+dumpdir);
                //TODO must capture exit state and print to error if problems deleting dir
                SystemCmd.exec("rm -rf "+dumpdir);
            } else {
                System.out.println("Would you like to remove the temporary data directory '"+dumpdir+"' ? (y/n)");
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                try {
                    String answer = br.readLine();
                    // answer can be null if stdin is closed (EOF); treat that as "no"
                    if (answer!=null && (answer.equalsIgnoreCase("y") || answer.equalsIgnoreCase("yes"))) {
                        System.out.println("Removing temporary directory "+dumpdir);
                        SystemCmd.exec("rm -rf "+dumpdir);
                    } else {
                        System.out.println("Temporary directory "+dumpdir+" was not removed.");
                    }
                } catch (IOException e) {
                    System.err.println("I/O error while reading user input.");
                    e.printStackTrace();
                    System.exit(2);
                }
            }
        }
    }

    private MySQLConnection getConnectionToNode(String node) {
        MySQLConnection conn=new MySQLConnection(node,user,pwd,srcDb);
        return conn;
    }

    private MySQLConnection getConnectionToMaster() {
        MySQLConnection conn=this.getConnectionToNode(MASTER);
        return conn;
    }

    private MySQLConnection getConnectionToMasterKeyDb() {
        MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
        return conn;
    }

    public void dumpData(String[] srchosts, String[] tables) {
        int concurrentQueries = 1;
        if (srchosts.length>1) {
            concurrentQueries = NUM_CONCURRENT_WRITE_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        initializeDirs(srchosts);
        for (String tbl: tables) {
            for (String srchost: srchosts) {
                String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
                String wherestr="";
                String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
                if (debug) {System.out.println("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
                else {
                    if (i!=0 && i%(concurrentQueries) == 0) {
                        try {
                            for (int j = 0;j<qtGroup.length;j++){
                                qtGroup[j].join(); // wait until previous thread group is finished
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        i = 0;
                        qtGroup = new QueryThread[concurrentQueries];
                    }
                    qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb);
                } // end else if debug
                i++;
            } // end foreach srchost
        } // end foreach table
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Dump finished.");
    }
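
    // Note on the batching pattern used by dumpData above (and by the load methods below): queries are
    // issued through QueryThread objects in groups of `concurrentQueries`; whenever a group fills up we
    // join() all of its threads before starting the next group, so at most `concurrentQueries` nodes hit
    // nfs at the same time. For example (hypothetical numbers): dumping 4 tables from 3 source hosts
    // issues 12 queries, which with NUM_CONCURRENT_WRITE_QUERIES=6 run as two groups of 6.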

    public <T> void dumpSplitData(String srchost, String[] tables, String key, HashMap<String,T[]> idSets) {
        int concurrentQueries = NUM_CONCURRENT_SAMEHOST_QUERIES;
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        String[] srchosts = {srchost};
        initializeDirs(srchosts);
        for (String tbl: tables) {
            for (String node:idSets.keySet()) {
                String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+"_split_"+node+".txt";
                String wherestr="";
                // if the number of ids for the key was less than the number of nodes, some of the node slots
                // will be empty; we check for those before trying to access the array
                if (idSets.get(node).length>0) {
                    T idmin= idSets.get(node)[0];
                    T idmax= idSets.get(node)[idSets.get(node).length-1];
                    wherestr="WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"'";
                    String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
                    if (debug) {System.out.println("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
                    else {
                        if (i!=0 && i%(concurrentQueries) == 0) {
                            try {
                                for (int j = 0;j<qtGroup.length;j++){
                                    qtGroup[j].join(); // wait until previous thread group is finished
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            i = 0;
                            qtGroup = new QueryThread[concurrentQueries];
                        }
                        qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb);
                    } // end else if debug
                    i++;
                }
            } // end foreach node
        } // end foreach table
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Dump finished.");
    }

    public void loadData(String[] srchosts, String[] desthosts, String[] tables) {
        int concurrentQueries = 1;
        if (srchosts.length==1 && desthosts.length>1) {
            concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        for (String srchost:srchosts){
            for (String tbl:tables) {
                String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
                String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
                for (String desthost: desthosts) {
                    if (debug) {System.out.println("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
                    else {
                        if (i!=0 && i%(concurrentQueries) == 0) {
                            try {
                                for (int j = 0;j<qtGroup.length;j++){
                                    qtGroup[j].join(); // wait until previous thread group is finished
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            i = 0;
                            qtGroup = new QueryThread[concurrentQueries];
                        }
                        qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb);
                    }
                    i++;
                } // end foreach desthost
            } // end foreach tbl
        } // end foreach srchost
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Load finished.");
        finalizeDirs();
    }

    public void loadSplitData(String srchost, String[] desthosts, String tableName) {
        int concurrentQueries = 1;
        if (desthosts.length>1) {
            concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        for (String desthost:desthosts) {
            String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tableName+"_split_"+desthost+".txt";
            // we first test whether the dumpfile exists: if not all the slots (nodes) were filled, some nodes
            // won't have anything to load and their files won't be there, because dumpSplitData didn't create them
            if ((new File(dumpfile)).exists()) {
                String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
                if (debug) {System.out.println("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
                else {
                    if (i!=0 && i%(concurrentQueries) == 0) {
                        try {
                            for (int j = 0;j<qtGroup.length;j++){
                                qtGroup[j].join(); // wait until previous thread group is finished
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        i = 0;
                        qtGroup = new QueryThread[concurrentQueries];
                    }
                    qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb);
                } // end else if debug
                i++;
            }
        } // end foreach desthosts
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Load finished.");
        finalizeDirs();
    }

    /**
     * For a given key (text or numeric) and table, returns a "data distribution": a roughly even assignment of the data to the nodes.
     * To be used when we have a table that we are going to split across the nodes.
     * TODO eventually the code could be cleverer so that the data is actually evenly distributed; right now it is only evenly distributed on key ids
     * @param key
     * @param table
     * @return idSets HashMap; keys are node names, values are Integer/String arrays with the ids for each node
     */
    public HashMap<String,Object[]> splitIdsIntoSets(String key, String table){
        HashMap<String,Object[]> idSets = new HashMap<String,Object[]>();
        String[] nodes=DataDistribution.getNodes();
        int numNodes=nodes.length;
        MySQLConnection conn = this.getConnectionToMaster();
        Object[] allIds=conn.getAllIds4KeyAndTable(key,table);
        conn.close();
        int numIds=allIds.length;
        int setSize=numIds/numNodes;
        int remainder=numIds%numNodes;
        for (int i=0;i<numNodes;i++){
            if (i<remainder){ // the first "remainder" nodes get setSize+1 ids each
                Object[] thisnodeidset=new Object[setSize+1];
                for (int j=0;j<thisnodeidset.length;j++){
                    thisnodeidset[j]=allIds[j+i*(setSize+1)];
                }
                idSets.put(nodes[i],thisnodeidset);
            } else { // the rest get only setSize ids each
                Object[] thisnodeidset=new Object[setSize];
                for (int j=0;j<thisnodeidset.length;j++){
                    thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
                }
                idSets.put(nodes[i],thisnodeidset);
            }
        }
        return idSets;
    }
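
    // Worked example (hypothetical numbers) for splitIdsIntoSets: with numIds=11 and numNodes=3 we get
    // setSize=11/3=3 and remainder=11%3=2, so the first 2 nodes receive setSize+1=4 ids and the last one 3:
    // node 0 -> allIds[0..3], node 1 -> allIds[4..7], node 2 -> allIds[8..10]. Only the min and max of each
    // set are used later to build WHERE ranges, which assumes allIds comes back sorted by the key.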

    /**
     * Splits a given table into chunks based on a key; the split tables remain in the same database and server
     * @param key
     * @param table
     */
    public void splitTable (String key,String table){
        String[] nodes=DataDistribution.getNodes();
        MySQLConnection conn=this.getConnectionToMaster();
        String[] splitTables = new String[nodes.length]; // array that will contain the names of all split tables
        String[] indexes=conn.getAllIndexes4Table(table);
        try {
            // we create the split tables and drop indexes before inserting
            for (int i=0;i<nodes.length;i++) {
                String splitTbl=table+"_split_"+nodes[i];
                splitTables[i]=splitTbl; // will be used later while looping over splitTables
                // we create permanent tables
                String query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
                conn.executeSql(query);
                // drop the indexes if there were any; indexes would slow down the population of the split tables
                for (String index:indexes) {
                    conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
                }
            }
            HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table);
            for (int i=0;i<nodes.length;i++) {
                Object idmin=idSets.get(nodes[i])[0];
                Object idmax=idSets.get(nodes[i])[idSets.get(nodes[i]).length-1];
                String query="INSERT INTO "+splitTables[i]+" SELECT * FROM "+table+" WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"';";
                conn.executeSql(query);
                //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
            }
        }
        catch (SQLException e){
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Splits a given table into chunks based on a key; the split tables go to different nodes of the cluster
     * @param key
     * @param table
     */
    public DataDistribution splitTableToCluster (String key,String table){
        System.out.println("Splitting table "+table+" to cluster based on key "+key+"...");
        String[] tables={table};
        String[] desthosts=DataDistribution.getNodes();
        HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table);
        // dump the data with the dumpSplitData method, a modified version of dumpData
        dumpSplitData(MASTER,tables,key,idSets);
        // put the ids into the key_master database so we keep track of where everything is
        insertIdsToKeyMaster(key,table,idSets);
        // we use loadSplitData here rather than loadData because table names are not the same on source and destination,
        // i.e. source: table_split_tla01, dest: table
        loadSplitData(MASTER,desthosts,table);
        DataDistribution dataDist = new DataDistribution(destDb,user,pwd);
        System.out.println("Done with splitting.");
        return dataDist;
    }

    /**
     * Inserts all ids into the key_master database, creating a new table for this destDb/table combination if it doesn't exist
     * @param key name of the key on which the distribution of the table is based
     * @param table name of the table that we are distributing
     * @param idSets as returned from splitIdsIntoSets, or from getIdSetsFromNodes of a DataDistribution object
     */
    public <T> void insertIdsToKeyMaster(String key,String table,HashMap<String,T[]> idSets) {
        System.out.println("Updating key_master database with ids to nodes mapping...");
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        String keyMasterTbl = createNewKeyMasterTbl(key,table);
        removePK(keyMasterTbl,key); // attention: we are removing the primary key, so duplicates won't be checked!!!
        // first get the mapping between node names and node ids
        HashMap<String,Integer> nodes2nodeids = new HashMap<String,Integer>();
        for (String node:idSets.keySet()){
            String query="SELECT client_id FROM clients_names WHERE client_name='"+node+"';";
            int id = conn.getIntFromDb(query);
            nodes2nodeids.put(node,id);
        }
        for (String node:idSets.keySet()){
            T[] thisNodeIds=idSets.get(node);
            for (T id:thisNodeIds){
                String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) VALUES ('"+id+"',"+nodes2nodeids.get(node)+");";
                try {
                    conn.executeSql(query);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        conn.close();
        removeZeros(keyMasterTbl,key); // we only inserted 0s for records that we didn't want, so it's safe now to get rid of them
        addPK(keyMasterTbl,key); // if there were duplicates, this should barf
        System.out.println("Done with updating key_master database.");
    }
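
    // Example of the resulting key_master content (hypothetical values): for a numeric key distributed over
    // two nodes with client_ids 1 and 2, the table destDb__table would end up with one row per id, e.g.
    // (1,1), (2,1), (3,2), (4,2), mapping each key value to the node that holds its data.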

    /**
     * Creates a new key master table for the destination db in the key_master database, given a key and table. Used by insertIdsToKeyMaster.
     * Eventually this method and the other key_master-related ones should go into their own class, shouldn't they?
     * @param key
     * @param table
     * @return the name of the key master table created
     */
    public String createNewKeyMasterTbl(String key,String table) {
        // find out whether the key is numeric or text and set the query strings accordingly
        String nodes[] = DataDistribution.getNodes(); // we need the list of nodes only to pick one of them, no matter which
        MySQLConnection conn=new MySQLConnection(nodes[0],user,pwd,destDb); // here we connect to destDb on one node, needed for getColumnType
        String colType = conn.getColumnType(table,key);
        String autoIncr = "";
        if (colType.contains("int")){
            autoIncr = "auto_increment";
        }
        conn.close();
        // key master table name and connection to key master db
        String keyMasterTbl=destDb+"__"+table;
        conn=this.getConnectionToMasterKeyDb();
        try {
            String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+
                key+" "+colType+" NOT NULL "+autoIncr+", " +
                "client_id smallint(6) NOT NULL default '0', " +
                "PRIMARY KEY (`"+key+"`) " +
                ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
            Statement S=conn.createStatement();
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Couldn't create table "+keyMasterTbl);
            e.printStackTrace();
        }
        try {
            Statement S=conn.createStatement();
            String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+destDb+"\',\'"+keyMasterTbl+"\');";
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!");
            System.err.println("SQLException: " + e.getMessage());
        }
        conn.close();
        return keyMasterTbl;
    }
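
    // Example (hypothetical names) of the statement generated above, for a numeric key "num" of type int(11)
    // in table "mytable" with destDb "mydb":
    //   CREATE TABLE IF NOT EXISTS mydb__mytable (num int(11) NOT NULL auto_increment,
    //   client_id smallint(6) NOT NULL default '0', PRIMARY KEY (`num`))
    //   ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;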

    public void removePK (String keyMasterTbl,String key){
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key);
        String colType = conn.getColumnType(keyMasterTbl,key);
        try {
            if (isNumeric){ // removing the auto_increment, only for numeric keys
                String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL;";
                conn.executeSql(query);
            }
            // removing the primary key (same sql code for both numeric and text keys)
            String query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    public void addPK (String keyMasterTbl, String key){
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key);
        String colType = conn.getColumnType(keyMasterTbl,key);
        try {
            // adding the primary key (same sql code for both numeric and text keys)
            String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");";
            conn.executeSql(query);
            if (isNumeric){ // adding the auto_increment, only for numeric keys
                query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL auto_increment;";
                conn.executeSql(query);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    public void removeZeros (String keyMasterTbl, String key){
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        try {
            // attention! the quotes around 0 are very important: otherwise, if the key is text-based, all records
            // get deleted, since mysql considers all text records equal to the number 0. Using '0' is ok, as it is
            // treated as a text literal for char fields and as a number for int fields
            String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"='0';";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Executes a query on all nodes in the cluster.
     * Not in use at the moment
     * TODO Right now it is serial, must parallelize this with threads
     * @param query
     */
    public void clusterExecuteQuery(String query){
        String[] nodes = DataDistribution.getNodes();
        for (String node: nodes){
            try {
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                System.err.println("Couldn't execute query="+query+", in node="+node);
                System.exit(1);
            }
        }
    }

    /**
     * Executes a query on all nodes in the cluster, given a HashMap containing one query per node
     * Not in use at the moment
     * TODO Right now it is serial, must parallelize this with threads
     * TODO This can be used in the load/dump methods in this class where queries are different for each node
     * @param queries a HashMap containing a query per node
     */
    public void clusterExecuteQuery(HashMap<String,String> queries){
        String[] nodes = DataDistribution.getNodes();
        for (String node: nodes){
            String query="";
            try {
                query=queries.get(node);
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                System.err.println("Couldn't execute query="+query+", in node="+node);
                System.exit(1);
            }
        }
    }

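    // Hypothetical usage sketch (not part of the original class): the database name, table, key and
    // credentials below are made-up examples. splitTableToCluster dumps "mytable" split by "mykey",
    // records the id-to-node mapping in key_master and loads each chunk into its node.
    public static void main(String[] args) {
        DataDistributer dd = new DataDistributer("mydb", "myuser", "mypwd"); // destDb defaults to srcDb
        dd.setForce(true); // remove the temporary dump directory without prompting
        DataDistribution dataDist = dd.splitTableToCluster("mykey", "mytable"); // handle to the new distribution
    }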
}
