root/owl/trunk/tools/DataDistributer.java
Revision: 202
Committed: Thu Jun 21 17:18:11 2007 UTC by duarte
File size: 23205 byte(s)
Log Message:
MySQLConnection now throwing SQLException on connect
Many files changed following this: all calling classes now re-throwing or catching the SQLException
package tools;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;

public class DataDistributer {

    public final static String GLOBALDIR="/project/snow/global/tmp";
    public final static String MASTER = DataDistribution.MASTER;
    public final static String KEYMASTERDB = DataDistribution.KEYMASTERDB;

    // The following parameters were tuned through testing; these values proved good.
    // They set the number of nodes reading/writing from/to nfs concurrently when loading/dumping:
    // 1 means no concurrency at all; higher numbers mean more nodes reading/writing to/from nfs at once.
    final static int NUM_CONCURRENT_READ_QUERIES = 9;
    final static int NUM_CONCURRENT_WRITE_QUERIES = 6;
    // The following parameter applies only to concurrent write queries on a single host. It has to be low,
    // as the limiting factor is the i/o capacity of that one host.
    final static int NUM_CONCURRENT_SAMEHOST_QUERIES = 3;
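
    // Example: loadData from one source host to many destination hosts runs up to
    // NUM_CONCURRENT_READ_QUERIES (9) LOAD DATA statements in parallel, while
    // dumpSplitData on a single host runs at most NUM_CONCURRENT_SAMEHOST_QUERIES (3).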

    String dumpdir;
    String srcDb;
    String destDb;
    String user;
    String pwd;
    boolean debug = false;
    String rmvtmp = "force"; // 3 possible values: force, noremove and prompt

    /**
     * Constructs a DataDistributer object passing only the source db. The destination db will be the same as the source.
     * @param srcDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String user, String pwd) {
        this.srcDb=srcDb;
        this.destDb=srcDb;
        this.user=user;
        this.pwd=pwd;
        initializeDirs();
    }

    /**
     * Constructs a DataDistributer object passing both the source db and the destination db.
     * @param srcDb
     * @param destDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String destDb, String user, String pwd) {
        this.srcDb=srcDb;
        this.destDb=destDb;
        this.user=user;
        this.pwd=pwd;
        initializeDirs();
    }

    public void setDebug(boolean debug){
        this.debug=debug;
    }

    public void setRmv(String rmvtmp){
        this.rmvtmp=rmvtmp;
    }

    public void setDumpDir(String dumpdir){
        // Before resetting the dumpdir we get rid of the existing dumpdir created when this DataDistributer object was constructed:
        // when the dumpdir is set explicitly through setDumpDir, the randomly named dumpdir created at construction time is clearly not needed.
        SystemCmd.exec("rmdir "+this.dumpdir);
        this.dumpdir=dumpdir;
    }

    public void initializeDirs() {
        dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
        if (!((new File(dumpdir)).mkdir())) {
            System.err.println("Couldn't create directory "+dumpdir);
            System.exit(1);
        }
        SystemCmd.exec("chmod go+rw "+dumpdir);
    }

    public void initializeDirs(String[] hosts){
        for (String host: hosts) {
            if (!((new File(dumpdir+"/"+host+"/"+srcDb)).mkdirs())) {
                System.err.println("Couldn't create directory "+dumpdir+"/"+host+"/"+srcDb);
                System.exit(1);
            }
            SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+host);
        }
    }

    public void finalizeDirs() {
        if (debug) {
            System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually.");
        } else {
            if (rmvtmp.equals("force")) {
                System.out.println("Removing temporary directory "+dumpdir);
                //TODO must capture exit state and print to error if problems deleting dir
                SystemCmd.exec("rm -rf "+dumpdir);
            } else if (rmvtmp.equals("prompt")){
                System.out.println("Would you like to remove the temporary data directory '"+dumpdir+"'? (y/n)");
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                try {
                    String answer = br.readLine();
                    if (answer.equals("y") || answer.equals("yes") || answer.equals("Y") || answer.equals("YES") || answer.equals("Yes")) {
                        System.out.println("Removing temporary directory "+dumpdir);
                        SystemCmd.exec("rm -rf "+dumpdir);
                    } else {
                        System.out.println("Temporary directory "+dumpdir+" was not removed.");
                    }
                } catch (IOException e) {
                    System.err.println("I/O error while reading user input.");
                    e.printStackTrace();
                    System.exit(2);
                }
            } else if (rmvtmp.equals("noremove")){
                System.out.println("Temporary directory "+dumpdir+" was not removed.");
            }
        }
    }

    private MySQLConnection getConnectionToNode(String node) throws SQLException {
        MySQLConnection conn=new MySQLConnection(node,user,pwd,srcDb);
        return conn;
    }

    private MySQLConnection getConnectionToMaster() throws SQLException {
        MySQLConnection conn=this.getConnectionToNode(MASTER);
        return conn;
    }

    private MySQLConnection getConnectionToMasterKeyDb() throws SQLException {
        MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
        return conn;
    }

    public void dumpData(String[] srchosts, String[] tables) {
        int concurrentQueries = 1;
        if (srchosts.length>1) {
            concurrentQueries = NUM_CONCURRENT_WRITE_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        initializeDirs(srchosts);
        for (String tbl: tables) {
            for (String srchost: srchosts) {
                String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
                String wherestr="";
                String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
                if (debug) {System.out.println("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
                else {
                    if (i!=0 && i%(concurrentQueries) == 0) {
                        try {
                            for (int j = 0;j<qtGroup.length;j++){
                                qtGroup[j].join(); // wait until previous thread group is finished
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        i = 0;
                        qtGroup = new QueryThread[concurrentQueries];
                    }
                    qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb);
                } // end if debug/else
                i++;
            } // end foreach srchost
        } // end foreach table
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Dump finished.");
    }
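
    // Note on the batching pattern above (it recurs in all dump/load methods of this class):
    // threads are started in groups of 'concurrentQueries'; once a group is full, all of its
    // threads are joined before a fresh group is started. E.g. with concurrentQueries=6 and
    // 15 queries, they run as batches of 6, 6 and 3.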

    public <T> void dumpSplitData(String srchost, String[] tables, String key, HashMap<String,T[]> idSets) {
        int concurrentQueries = NUM_CONCURRENT_SAMEHOST_QUERIES;
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        String[] srchosts = {srchost};
        initializeDirs(srchosts);
        for (String tbl: tables) {
            for (String node:idSets.keySet()) {
                String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+"_split_"+node+".txt";
                String wherestr="";
                // If the number of ids for the key was less than the number of nodes, some of the node slots will be empty.
                // We check for those before trying to access the array.
                if (idSets.get(node).length>0) {
                    T idmin= idSets.get(node)[0];
                    T idmax= idSets.get(node)[idSets.get(node).length-1];
                    wherestr="WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"'";
                    String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
                    if (debug) {System.out.println("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
                    else {
                        if (i!=0 && i%(concurrentQueries) == 0) {
                            try {
                                for (int j = 0;j<qtGroup.length;j++){
                                    qtGroup[j].join(); // wait until previous thread group is finished
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            i = 0;
                            qtGroup = new QueryThread[concurrentQueries];
                        }
                        qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb);
                    } // end if debug/else
                    i++;
                }
            } // end foreach node
        } // end foreach table
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Dump finished.");
    }
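
    // Example (hypothetical names): dumping table "structures" for node "tla01" writes
    // <dumpdir>/<masterhost>/<srcDb>/structures_split_tla01.txt, containing only the rows
    // whose key value falls in tla01's [idmin,idmax] range.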

    public void loadData(String[] srchosts, String[] desthosts,String[] tables) {
        int concurrentQueries = 1;
        if (srchosts.length==1 && desthosts.length>1) {
            concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        for (String srchost:srchosts){
            for (String tbl:tables) {
                String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
                String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
                for (String desthost: desthosts) {
                    if (debug) {System.out.println("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
                    else {
                        if (i!=0 && i%(concurrentQueries) == 0) {
                            try {
                                for (int j = 0;j<qtGroup.length;j++){
                                    qtGroup[j].join(); // wait until previous thread group is finished
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            i = 0;
                            qtGroup = new QueryThread[concurrentQueries];
                        }
                        qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb);
                    }
                    i++;
                } // end foreach desthost
            } // end foreach tbl
        } // end foreach srchost
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Load finished.");
        finalizeDirs();
    }

    public void loadSplitData(String srchost, String[] desthosts, String tableName) {
        int concurrentQueries = 1;
        if (desthosts.length>1) {
            concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
        }
        int i = 0;
        QueryThread[] qtGroup = new QueryThread[concurrentQueries];
        for (String desthost:desthosts) {
            String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tableName+"_split_"+desthost+".txt";
            // We first test whether the dumpfile exists: if not all the slots (nodes) were filled, some nodes
            // won't have anything to load and their files won't be there, because dumpSplitData didn't create them.
            if ((new File(dumpfile)).exists()) {
                String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
                if (debug) {System.out.println("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
                else {
                    if (i!=0 && i%(concurrentQueries) == 0) {
                        try {
                            for (int j = 0;j<qtGroup.length;j++){
                                qtGroup[j].join(); // wait until previous thread group is finished
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        i = 0;
                        qtGroup = new QueryThread[concurrentQueries];
                    }
                    qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb);
                } // end if debug/else
                i++;
            }
        } // end foreach desthosts
        try {
            for (int j = 0;j<qtGroup.length;j++){
                if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
                    qtGroup[j].join(); // wait until the last thread group is finished
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Load finished.");
        finalizeDirs();
    }

    /**
     * For a given key (text or numeric) and table, returns a "data distribution": a roughly even assignment of the data to the nodes.
     * To be used when we have a table that we are going to split across the nodes.
     * TODO eventually the code could be cleverer so that the data is actually evenly distributed; right now it is only evenly distributed on key ids
     * @param key
     * @param table
     * @return idSets HashMap, keys are node names, values: Integer/String array with the ids for each node
     * @throws SQLException
     */
    public HashMap<String,Object[]> splitIdsIntoSets(String key, String table) throws SQLException{
        HashMap<String,Object[]> idSets =new HashMap<String,Object[]>();
        String[] nodes=DataDistribution.getMySQLNodes();
        int numNodes=nodes.length;
        MySQLConnection conn = this.getConnectionToMaster();
        Object[] allIds=conn.getAllIds4KeyAndTable(key,table);
        conn.close();
        int numIds=allIds.length;
        int setSize=numIds/numNodes;
        int remainder=numIds%numNodes;
        for (int i=0;i<numNodes;i++){
            if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node
                Object[] thisnodeidset=new Object[setSize+1];
                for (int j=0;j<thisnodeidset.length;j++){
                    thisnodeidset[j]=allIds[j+i*(setSize+1)];
                }
                idSets.put(nodes[i],thisnodeidset);
            } else { // for the rest we put only setSize ids
                Object[] thisnodeidset=new Object[setSize];
                for (int j=0;j<thisnodeidset.length;j++){
                    thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
                }
                idSets.put(nodes[i],thisnodeidset);
            }
        }
        return idSets;
    }
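
    // Worked example of the arithmetic above (hypothetical numbers): with numIds=10 and
    // numNodes=3, setSize=3 and remainder=1, so node 0 gets allIds[0..3] (4 ids),
    // node 1 gets allIds[4..6] and node 2 gets allIds[7..9] (3 ids each).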

    /**
     * Splits a given table into chunks based on a key; the split tables remain in the same database and server.
     * @param key
     * @param table
     * @throws SQLException
     */
    public void splitTable (String key,String table) throws SQLException{
        String[] nodes=DataDistribution.getMySQLNodes();
        MySQLConnection conn=this.getConnectionToMaster();
        String[] splitTables = new String[nodes.length]; // we create an array that will contain the names of all split tables
        String[] indexes=conn.getAllIndexes4Table(table);
        try {
            // we create the split tables and drop indexes before inserting
            for (int i=0;i<nodes.length;i++) {
                String splitTbl=table+"_split_"+nodes[i];
                splitTables[i]=splitTbl; // will be used later while looping over splitTables
                // we create permanent tables
                String query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
                conn.executeSql(query);
                // drop the indexes if there were any; indexes would slow down the population of the split tables
                for (String index:indexes) {
                    conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
                }
            }
            HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table);
            for (int i=0;i<nodes.length;i++) {
                Object idmin=idSets.get(nodes[i])[0];
                Object idmax=idSets.get(nodes[i])[idSets.get(nodes[i]).length-1];
                String query="INSERT INTO "+splitTables[i]+" SELECT * FROM "+table+" WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"';";
                conn.executeSql(query);
                //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
            }
        }
        catch (SQLException e){
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Splits a given table into chunks based on a key; the split tables go to different nodes of the cluster.
     * @param key
     * @param table
     * @throws SQLException
     */
    public DataDistribution splitTableToCluster (String key,String table) throws SQLException{
        System.out.println("Splitting table "+table+" to cluster based on key "+key+"...");
        String[] tables={table};
        String[] desthosts=DataDistribution.getMySQLNodes();
        HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table);
        // dumping data with the dumpSplitData method, a modified version of dumpData
        dumpSplitData(MASTER,tables,key,idSets);
        // putting the ids in the key_master database so we keep track of where everything is
        insertIdsToKeyMaster(key,table,idSets);
        // using loadSplitData here rather than loadData because the table names are not the same on source and destination,
        // i.e. source: table_split_tla01, dest: table
        loadSplitData(MASTER,desthosts,table);
        DataDistribution dataDist = new DataDistribution(destDb,user,pwd);
        System.out.println("Done with splitting.");
        return dataDist;
    }

    /**
     * Inserts all ids into the key_master database, creating a new table for this destDb/given table combination if it doesn't exist.
     * @param key name of key on which distribution of table is based
     * @param table name of table that we are distributing
     * @param idSets as returned from splitIdsIntoSets or getIdSetsFromNodes from a DataDistribution object
     * @throws SQLException
     */
    public <T> void insertIdsToKeyMaster(String key,String table,HashMap<String,T[]> idSets) throws SQLException {
        System.out.println("Updating key_master database with ids to nodes mapping...");
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        String keyMasterTbl = createNewKeyMasterTbl(key,table);
        removePK(keyMasterTbl,key); // attention: removing primary keys, duplicates won't be checked!!!
        // first get the mapping between node names and node ids
        HashMap<String,Integer> nodes2nodeids = new HashMap<String,Integer>();
        for (String node:idSets.keySet()){
            String query="SELECT client_id FROM clients_names WHERE client_name='"+node+"';";
            int id = conn.getIntFromDb(query);
            nodes2nodeids.put(node,id);
        }
        for (String node:idSets.keySet()){
            T[] thisNodeIds=idSets.get(node);
            for (T id:thisNodeIds){
                String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) VALUES ('"+id+"',"+nodes2nodeids.get(node)+");";
                try {
                    conn.executeSql(query);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        conn.close();
        removeZeros(keyMasterTbl,key); // we have only inserted 0s for records that we didn't want, so it's safe now to get rid of them
        addPK(keyMasterTbl,key); // if there were duplicates, this should barf
        System.out.println("Done with updating key_master database.");
    }
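
    // Example of the resulting mapping (hypothetical values): a row ('1abc', 17) in the
    // key master table means that the record with key value '1abc' lives on the node
    // whose client_id in clients_names is 17.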

    /**
     * Creates a new key master table for the destination db in the key_master database, given a key and table. Used by insertIdsToKeyMaster.
     * Eventually this method and the other key_master-related ones should go into their own class, shouldn't they?
     * @param key
     * @param table
     * @return the name of the key master table created
     * @throws SQLException
     */
    public String createNewKeyMasterTbl(String key,String table) throws SQLException {
        // find out whether the key is numeric or text and set the query strings accordingly
        String nodes[] = DataDistribution.getMySQLNodes(); // we need the list of nodes only to get one of them, no matter which
        MySQLConnection conn=new MySQLConnection(nodes[0],user,pwd,destDb); // here we connect to destDb in one node, needed to getColumnType
        String colType = conn.getColumnType(table,key);
        String autoIncr = "";
        if (colType.contains("int")){
            autoIncr = "auto_increment";
        }
        conn.close();
        // key master table name and connection to key master db
        String keyMasterTbl=destDb+"__"+table;
        conn=this.getConnectionToMasterKeyDb();
        try {
            String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+
                    key+" "+colType+" NOT NULL "+autoIncr+", " +
                    "client_id smallint(6) NOT NULL default '0', " +
                    "PRIMARY KEY (`"+key+"`) " +
                    ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
            Statement S=conn.createStatement();
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Couldn't create table "+keyMasterTbl);
            e.printStackTrace();
        }
        try {
            Statement S=conn.createStatement();
            String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+destDb+"\',\'"+keyMasterTbl+"\');";
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!");
            System.err.println("SQLException: " + e.getMessage());
        }
        conn.close();
        return keyMasterTbl;
    }
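
    // Example (hypothetical names): with destDb "mydb" and table "structures", this creates
    // a key master table "mydb__structures" with two columns, the key itself and client_id,
    // plus a registration row in dbs_keys.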

    public void removePK (String keyMasterTbl,String key) throws SQLException{
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key);
        String colType = conn.getColumnType(keyMasterTbl,key);
        try {
            if (isNumeric){ // removing the auto_increment, only for numeric keys
                String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL;";
                conn.executeSql(query);
            }
            // removing the primary key (same sql code for both numeric and text keys)
            String query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    public void addPK (String keyMasterTbl, String key) throws SQLException{
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key);
        String colType = conn.getColumnType(keyMasterTbl,key);
        try {
            // adding the primary key (same sql code for both numeric and text keys)
            String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");";
            conn.executeSql(query);
            if (isNumeric){ // adding auto_increment, only for numeric keys
                query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL auto_increment;";
                conn.executeSql(query);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    public void removeZeros (String keyMasterTbl, String key) throws SQLException{
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        try {
            // Attention! The quotes around the 0 are very important; otherwise, if the key is text-based, all records get deleted:
            // mysql somehow considers all text records = 0. Using '0' is ok, as it is treated as a text literal for char fields and as a number for int fields.
            String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"='0';";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Executes a query on all nodes in the cluster.
     * Not in use at the moment.
     * TODO Right now it is serial, must parallelize this with threads
     * @param query
     */
    public void clusterExecuteQuery(String query){
        String[] nodes = DataDistribution.getMySQLNodes();
        for (String node: nodes){
            try {
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                System.err.println("Couldn't execute query="+query+", in node="+node);
                System.exit(1);
            }
        }
    }

    /**
     * Executes a query on all nodes in the cluster, given a HashMap containing a set of queries (one per node).
     * Not in use at the moment.
     * TODO Right now it is serial, must parallelize this with threads
     * TODO This could be used in the load/dump methods in this class where queries are different for each node
     * @param queries a HashMap containing a query per node
     */
    public void clusterExecuteQuery(HashMap<String,String> queries){
        String[] nodes = DataDistribution.getMySQLNodes();
        for (String node: nodes){
            String query="";
            try {
                query=queries.get(node);
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                System.err.println("Couldn't execute query="+query+", in node="+node);
                System.exit(1);
            }
        }
    }

}
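
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original file): a minimal, hypothetical driver
// showing the intended call sequence for splitting a table across the cluster.
// The database, user, password, key and table names below are made up;
// MySQLConnection, QueryThread and DataDistribution come from this same package.
// ---------------------------------------------------------------------------
class DataDistributerDemo {
    public static void main(String[] args) throws SQLException {
        DataDistributer dd = new DataDistributer("mydb", "myuser", "mypwd"); // destDb defaults to srcDb
        dd.setDebug(true);      // print the dump/load SQL statements instead of running them
        dd.setRmv("noremove");  // keep the temporary dump directory for inspection
        // dump, record the id-to-node mapping in key_master, then load to the nodes
        DataDistribution dist = dd.splitTableToCluster("structure_id", "structures");
    }
}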
