ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/owl/trunk/tools/DataDistributer.java
Revision: 110
Committed: Tue Sep 19 13:04:55 2006 UTC (18 years ago) by duarte
File size: 22855 byte(s)
Log Message:
New class MySQLConnectionCheck, to check whether MySQL servers are alive, uses threads
Changes in DataDistribution:
- new field: String[] nodes, initialised in constructor
- changed constructor to check for nodes that are alive, using new method nodesAlive
- renamed getNodes to getMySQLNodes. New method getNodes just returns the new field String[] nodes
DataDistributer: renamed calls to DataDistribution's getNodes to getMySQLNodes
Line User Rev File contents
1 duarte 85 package tools;
2    
3 duarte 102 import java.io.BufferedReader;
4 duarte 85 import java.io.File;
5 duarte 102 import java.io.IOException;
6     import java.io.InputStreamReader;
7 duarte 85 import java.sql.SQLException;
8     import java.sql.Statement;
9     import java.util.HashMap;
10    
11     public class DataDistributer {
12    
// Shared nfs location where the temporary dump directories get created
13     public final static String GLOBALDIR="/project/snow/global/tmp";
// Master node and key master database names, shared with DataDistribution
14     public final static String MASTER = DataDistribution.MASTER;
15     public final static String KEYMASTERDB = DataDistribution.KEYMASTERDB;
16    
17 duarte 86 // Following parameters are optimized after some testing. I found these to be good values
18     // They represent the number of nodes reading/writing from/to nfs concurrently when loading/dumping
19     // 1 means no concurrency at all. Higher numbers mean more concurrency i.e. more nodes writing/reading to/from nfs concurrently
20     final static int NUM_CONCURRENT_READ_QUERIES = 9;
21     final static int NUM_CONCURRENT_WRITE_QUERIES = 6;
22 duarte 87 // Following parameter only for concurrent writing queries in same host. This number has to be low
23     // as the only limitation is the i/o capacity of the host.
24     final static int NUM_CONCURRENT_SAMEHOST_QUERIES = 3;
25 duarte 86
// Temporary directory where dump files are written (set in initializeDirs)
26 duarte 85 String dumpdir;
// Source and destination database names
27     String srcDb;
28     String destDb;
// MySQL credentials used for all connections
29     String user;
30     String pwd;
// When true, dump/load methods only print the SQL they would run
31     boolean debug = false;
32 duarte 104 String rmvtmp = "force"; // 3 possible values: force, noremove and prompt
33 duarte 85
/**
 * Constructs a DataDistributer passing only the source db. The destination db
 * will be the same as the source db.
 * @param srcDb source database name
 * @param user mysql user name
 * @param pwd mysql password
 */
public DataDistributer(String srcDb, String user, String pwd) {
    // delegate to the 2-db constructor: destination defaults to the source db,
    // avoiding the duplicated field assignments the two constructors shared
    this(srcDb, srcDb, user, pwd);
}
47    
/**
 * Constructs a DataDistributer with distinct source and destination databases.
 * A fresh temporary dump directory is created as part of construction.
 * @param srcDb source database name
 * @param destDb destination database name
 * @param user mysql user name
 * @param pwd mysql password
 */
public DataDistributer(String srcDb, String destDb, String user, String pwd) {
    this.user = user;
    this.pwd = pwd;
    this.srcDb = srcDb;
    this.destDb = destDb;
    initializeDirs();
}
62    
63     public void setDebug(boolean debug){
64     this.debug=debug;
65     }
66    
67 duarte 104 public void setRmv(String rmvtmp){
68     this.rmvtmp=rmvtmp;
69 duarte 102 }
70    
71 duarte 89 public void setDumpDir(String dumpdir){
72 duarte 104 // before reseting the dumpdir, we get rid of the existent dumpdir created when this DataDistributer object was constructed
73     // in the case where we set the dumpdir using setDumpDir clearly the randomly named dumpdir created while constructing is not needed
74     SystemCmd.exec("rmdir "+this.dumpdir);
75 duarte 89 this.dumpdir=dumpdir;
76     }
77    
78 duarte 85 public void initializeDirs() {
79     dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
80     if (!((new File(dumpdir)).mkdir())) {
81     System.err.println("Couldn't create directory "+dumpdir);
82     System.exit(1);
83     }
84     SystemCmd.exec("chmod go+rw "+dumpdir);
85     }
86    
87 duarte 86 public void initializeDirs(String[] hosts){
88     for (String host: hosts) {
89     if (!((new File(dumpdir+"/"+host+"/"+srcDb)).mkdirs())) {
90     System.err.println("Couldn't create directory "+dumpdir+"/"+host+"/"+srcDb);
91     System.exit(1);
92     }
93     SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+host);
94     }
95     }
96    
97 duarte 85 public void finalizeDirs() {
98     if (debug) {
99     System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually");
100     } else {
101 duarte 104 if (rmvtmp.equals("force")) {
102 duarte 102 System.out.println("Removing temporary directory "+dumpdir);
103     //TODO must capture exit state and print to error if problems deleting dir
104     SystemCmd.exec("rm -rf "+dumpdir);
105 duarte 104 } else if (rmvtmp.equals("prompt")){
106 duarte 102 System.out.println("Would you like to remove the temporary data directory '"+dumpdir+"' ? (y/n)");
107     BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
108     try {
109     String answer = br.readLine();
110     if (answer.equals("y") || answer.equals("yes") || answer.equals("Y") || answer.equals("YES") || answer.equals("Yes")) {
111     System.out.println("Removing temporary directory "+dumpdir);
112     SystemCmd.exec("rm -rf "+dumpdir);
113     } else {
114     System.out.println("Temporary directory "+dumpdir+" was not removed.");
115     }
116     } catch (IOException e) {
117     System.err.println("I/O error while reading user input.");
118     e.printStackTrace();
119     System.exit(2);
120     }
121 duarte 104 } else if (rmvtmp.equals("noremove")){
122     System.out.println("Temporary directory "+dumpdir+" was not removed.");
123 duarte 102 }
124 duarte 85 }
125     }
126    
127     private MySQLConnection getConnectionToNode(String node) {
128     MySQLConnection conn=new MySQLConnection(node,user,pwd,srcDb);
129     return conn;
130     }
131    
132     private MySQLConnection getConnectionToMaster() {
133     MySQLConnection conn=this.getConnectionToNode(MASTER);
134     return conn;
135     }
136    
137     private MySQLConnection getConnectionToMasterKeyDb() {
138     MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
139     return conn;
140     }
141    
142     public void dumpData(String[] srchosts, String[] tables) {
143 duarte 86 int concurrentQueries = 1;
144     if (srchosts.length>1) {
145     concurrentQueries = NUM_CONCURRENT_WRITE_QUERIES;
146     }
147     int i = 0;
148     QueryThread[] qtGroup = new QueryThread[concurrentQueries];
149     initializeDirs(srchosts);
150     for ( String tbl: tables) {
151     for (String srchost: srchosts) {
152 duarte 85 String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
153     String wherestr="";
154     String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
155     if (debug) {System.out.println ("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
156     else {
157 duarte 86 if (i!=0 && i%(concurrentQueries) == 0) {
158     try {
159     for (int j = 0;j<qtGroup.length;j++){
160     qtGroup[j].join(); // wait until previous thread group is finished
161     }
162     } catch (InterruptedException e) {
163     e.printStackTrace();
164     }
165     i = 0;
166     qtGroup = new QueryThread[concurrentQueries];
167     }
168     qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb);
169 duarte 85 } //end else if debug
170 duarte 86 i++;
171     } // end foreach srchost
172     } // end foreach table
173     try {
174     for (int j = 0;j<qtGroup.length;j++){
175     if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
176     qtGroup[j].join(); // wait until the last thread group is finished
177     }
178     }
179     } catch (InterruptedException e) {
180     e.printStackTrace();
181     }
182 duarte 85 System.out.println ("Dump finished.");
183     }
184 duarte 86
185 duarte 93 public <T> void dumpSplitData(String srchost, String[] tables, String key, HashMap<String,T[]> idSets) {
186 duarte 87 int concurrentQueries = NUM_CONCURRENT_SAMEHOST_QUERIES;
187     int i = 0;
188     QueryThread[] qtGroup = new QueryThread[concurrentQueries];
189     String[] srchosts = {srchost};
190     initializeDirs(srchosts);
191     for ( String tbl: tables) {
192 duarte 93 for (String node:idSets.keySet()) {
193 duarte 87 String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+"_split_"+node+".txt";
194 duarte 93 String wherestr="";
195     // if number of ids for the key was less than number of nodes, some of the node slots will be empty. We check for those before trying to access the array
196     if (idSets.get(node).length>0) {
197     T idmin= idSets.get(node)[0];
198     T idmax= idSets.get(node)[idSets.get(node).length-1];
199     wherestr="WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"'";
200     String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
201     if (debug) {System.out.println ("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
202     else {
203     if (i!=0 && i%(concurrentQueries) == 0) {
204     try {
205     for (int j = 0;j<qtGroup.length;j++){
206     qtGroup[j].join(); // wait until previous thread group is finished
207     }
208     } catch (InterruptedException e) {
209     e.printStackTrace();
210 duarte 87 }
211 duarte 93 i = 0;
212     qtGroup = new QueryThread[concurrentQueries];
213     }
214     qtGroup[i] = new QueryThread(sqldumpstr,srchost,user,pwd,srcDb);
215     } //end else if debug
216     i++;
217     }
218     } // end foreach node
219 duarte 87 } // end foreach table
220     try {
221     for (int j = 0;j<qtGroup.length;j++){
222     if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
223     qtGroup[j].join(); // wait until the last thread group is finished
224     }
225     }
226     } catch (InterruptedException e) {
227     e.printStackTrace();
228     }
229     System.out.println ("Dump finished.");
230     }
231    
232 duarte 85 public void loadData(String[] srchosts, String[] desthosts,String[] tables) {
233 duarte 86 int concurrentQueries = 1;
234     if (srchosts.length==1 && desthosts.length>1) {
235     concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
236     }
237     int i = 0;
238     QueryThread[] qtGroup = new QueryThread[concurrentQueries];
239     for (String srchost:srchosts){
240     for (String tbl:tables) {
241     String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
242     String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
243     for (String desthost: desthosts) {
244     if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
245     else {
246     if (i!=0 && i%(concurrentQueries) == 0) {
247     try {
248     for (int j = 0;j<qtGroup.length;j++){
249     qtGroup[j].join(); // wait until previous thread group is finished
250     }
251     } catch (InterruptedException e) {
252     e.printStackTrace();
253     }
254     i = 0;
255     qtGroup = new QueryThread[concurrentQueries];
256     }
257     qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb);
258     }
259     i++;
260     } // end foreach desthost
261     } // end foreach tbl
262     } // end foreach srchost
263     try {
264     for (int j = 0;j<qtGroup.length;j++){
265     if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
266     qtGroup[j].join(); // wait until the last thread group is finished
267     }
268     }
269     } catch (InterruptedException e) {
270     e.printStackTrace();
271     }
272 duarte 85 System.out.println ("Load finished.");
273     finalizeDirs();
274     }
275    
276     public void loadSplitData(String srchost, String[] desthosts, String tableName) {
277 duarte 86 int concurrentQueries = 1;
278     if (desthosts.length>1) {
279     concurrentQueries = NUM_CONCURRENT_READ_QUERIES;
280     }
281     int i = 0;
282     QueryThread[] qtGroup = new QueryThread[concurrentQueries];
283 duarte 85 for (String desthost:desthosts) {
284     String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tableName+"_split_"+desthost+".txt";
285 duarte 93 // we test first if dumpfile exists, if not all the slots (nodes) are filled then some nodes won't have to load anything and the files won't be there because dumpSplitData didn't create them
286     if ((new File(dumpfile)).exists()) {
287     String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
288     if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
289     else {
290     if (i!=0 && i%(concurrentQueries) == 0) {
291     try {
292     for (int j = 0;j<qtGroup.length;j++){
293     qtGroup[j].join(); // wait until previous thread group is finished
294     }
295     } catch (InterruptedException e) {
296     e.printStackTrace();
297 duarte 86 }
298 duarte 93 i = 0;
299     qtGroup = new QueryThread[concurrentQueries];
300     }
301     qtGroup[i] = new QueryThread(sqlloadstr,desthost,user,pwd,destDb);
302     } // end else if debug
303     i++;
304     }
305 duarte 86 } // end foreach desthosts
306     try {
307     for (int j = 0;j<qtGroup.length;j++){
308     if (qtGroup[j]!=null){ // some slots of the array may be null, check for those before trying the join
309     qtGroup[j].join(); // wait until the last thread group is finished
310 duarte 85 }
311 duarte 86 }
312     } catch (InterruptedException e) {
313     e.printStackTrace();
314     }
315 duarte 85 System.out.println ("Load finished.");
316     finalizeDirs();
317     }
318    
319     /**
320 duarte 94 * For a certain key (text or numeric) and table returns a "data distribution" (kind of evenly distributed) of the data to the nodes
321 duarte 85 * To be used when we have a table that we are going to split to the nodes
322     * TODO eventually the code could be cleverer so that the data is actually evenly distributed, right now is only evenly distributed on key ids
323 duarte 94 * @param key
324 duarte 85 * @param table
325 duarte 94 * @return idSets HashMap, keys are node names, values: Integer/String array with the ids for each node
326 duarte 85 */
327 duarte 94 public HashMap<String,Object[]> splitIdsIntoSets(String key, String table){
328     HashMap<String,Object[]> idSets =new HashMap<String,Object[]>();
329 duarte 110 String[] nodes=DataDistribution.getMySQLNodes();
330 duarte 85 int numNodes=nodes.length;
331     MySQLConnection conn = this.getConnectionToMaster();
332 duarte 94 Object[] allIds=conn.getAllIds4KeyAndTable(key,table);
333 duarte 85 conn.close();
334     int numIds=allIds.length;
335     int setSize=numIds/numNodes;
336     int remainder=numIds%numNodes;
337     for (int i=0;i<numNodes;i++){
338     if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node
339 duarte 94 Object[] thisnodeidset=new Object[setSize+1];
340 duarte 85 for (int j=0;j<thisnodeidset.length;j++){
341     thisnodeidset[j]=allIds[j+i*(setSize+1)];
342     }
343     idSets.put(nodes[i],thisnodeidset);
344     } else { // for the rest we put only setSize ids
345 duarte 94 Object[] thisnodeidset=new Object[setSize];
346 duarte 85 for (int j=0;j<thisnodeidset.length;j++){
347     thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
348     }
349     idSets.put(nodes[i],thisnodeidset);
350     }
351     }
352     return idSets;
353     }
354    
355     /**
356     * To split a given table in chunks based on a key, split tables remain in same database and server
357     * @param key
358     * @param table
359     */
360     public void splitTable (String key,String table){
361 duarte 110 String[] nodes=DataDistribution.getMySQLNodes();
362 duarte 93 MySQLConnection conn=this.getConnectionToMaster();
363     String[] splitTables = new String[nodes.length]; // we create an array that will contain the name of all split tables
364     String[] indexes=conn.getAllIndexes4Table(table);
365 duarte 85 try {
366 duarte 93 // we create split tables and drop indexes before inserting
367     for (int i=0;i<nodes.length;i++) {
368     String splitTbl=table+"_split_"+nodes[i];
369     splitTables[i]=splitTbl; // will be used later while looping over splitTables
370 duarte 85 // we create permanent tables
371 duarte 93 String query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
372 duarte 85 conn.executeSql(query);
373     // drop the indexes if there was any, indexes will slow down the creation of split tables
374     for (String index:indexes) {
375     conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
376     }
377 duarte 94 }
378     HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table);
379     for (int i=0;i<nodes.length;i++) {
380     Object idmin=idSets.get(nodes[i])[0];
381     Object idmax=idSets.get(nodes[i])[idSets.get(nodes[i]).length-1];
382     String query="INSERT INTO "+splitTables[i]+" SELECT * FROM "+table+" WHERE "+key+">='"+idmin+"' AND "+key+"<='"+idmax+"';";
383     conn.executeSql(query);
384     //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
385     }
386 duarte 85 }
387     catch (SQLException e){
388     e.printStackTrace();
389     }
390 duarte 93 conn.close();
391 duarte 85 }
392    
393     /**
394     * To split a given table in chunks based on a key, split tables go to different nodes of cluster
395     * @param key
396     * @param table
397     */
398     public DataDistribution splitTableToCluster (String key,String table){
399 duarte 93 System.out.println("Splitting table "+table+" to cluster based on key "+key+"...");
400 duarte 87 String[] tables={table};
401 duarte 110 String[] desthosts=DataDistribution.getMySQLNodes();
402 duarte 94 HashMap<String,Object[]> idSets = this.splitIdsIntoSets(key,table);
403     // dumping data with the dumpSplitData method, a modified version of dumpData
404     dumpSplitData(MASTER,tables,key,idSets);
405     // putting the ids in the key_master database so we keep track of where everything is
406     insertIdsToKeyMaster(key,table,idSets);
407 duarte 93 // using here loadSplitData rather than loadData because table names are not the same on source and destination,
408     // i.e. source: table_split_tla01, dest: table
409 duarte 87 loadSplitData(MASTER,desthosts,table);
410 duarte 85 DataDistribution dataDist = new DataDistribution(destDb,user,pwd);
411 duarte 93 System.out.println("Done with splitting.");
412 duarte 85 return dataDist;
413     }
414    
415     /**
416     * Insert all ids to the key_master database creating a new table for this destDb/given table combination if not exists
417     * @param key name of key on which distribution of table is based
418     * @param table name of table that we are distributing
419     * @param idSets as returned from splitIdsIntoSets or getIdSetsFromNodes from a DataDistribution object
420     */
421 duarte 93 public <T> void insertIdsToKeyMaster(String key,String table,HashMap<String,T[]> idSets) {
422     System.out.println("Updating key_master database with ids to nodes mapping...");
423 duarte 85 MySQLConnection conn = this.getConnectionToMasterKeyDb();
424     String keyMasterTbl = createNewKeyMasterTbl(key,table);
425     removePK(keyMasterTbl,key); // attention removing primary keys, duplicates won't be checked!!!
426 duarte 96 // getting first mapping between nodes names and node ids
427     HashMap<String,Integer> nodes2nodeids = new HashMap<String,Integer>();
428 duarte 85 for (String node:idSets.keySet()){
429 duarte 96 String query="SELECT client_id FROM clients_names WHERE client_name='"+node+"';";
430     int id = conn.getIntFromDb(query);
431     nodes2nodeids.put(node,id);
432     }
433     for (String node:idSets.keySet()){
434 duarte 93 T[] thisNodeIds=idSets.get(node);
435     for (T id:thisNodeIds){
436 duarte 96 String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) VALUES ('"+id+"',"+nodes2nodeids.get(node)+");";
437 duarte 93 try {
438     conn.executeSql(query);
439     } catch (SQLException e) {
440     e.printStackTrace();
441     }
442 duarte 85 }
443     }
444 duarte 93 conn.close();
445 duarte 85 removeZeros(keyMasterTbl,key); // we only have inserted 0s for records that we didn't want, it's safe now to get rid of them
446     addPK(keyMasterTbl,key); // if there were duplicates, this should barf
447 duarte 93 System.out.println("Done with updating key_master database.");
448 duarte 85 }
449    
450     /**
451     * To create a new key master table for destination db in the key_master database given a key and table. Used by insertIdsToKeyMaster
452     * Eventually this method on other key_master related should go into their own class, shouldn't they?
453     * @param key
454     * @param table
455     * @return the name of the key master table created
456     */
457     public String createNewKeyMasterTbl(String key,String table) {
458 duarte 93 // find out whether key is numeric or text and setting accordingly query strings
459 duarte 110 String nodes[] = DataDistribution.getMySQLNodes(); // we need the list of nodes only to get one of them no matter which
460 duarte 93 MySQLConnection conn=new MySQLConnection(nodes[0],user,pwd,destDb); // here we connect to destDb in one node, needed to getColumnType
461     String colType = conn.getColumnType(table,key);
462     String autoIncr = "";
463     if (colType.contains("int")){
464     autoIncr = "auto_increment";
465     }
466     conn.close();
467     // key master table name and connection to key master db
468 duarte 85 String keyMasterTbl=destDb+"__"+table;
469 duarte 93 conn=this.getConnectionToMasterKeyDb();
470 duarte 85 try {
471     String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+
472 duarte 93 key+" "+colType+" NOT NULL "+autoIncr+", " +
473     "client_id smallint(6) NOT NULL default '0', " +
474     "PRIMARY KEY (`"+key+"`) " +
475     ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
476 duarte 85 Statement S=conn.createStatement();
477     S.executeUpdate(query);
478     S.close();
479     } catch (SQLException e) {
480     System.err.println("Couldn't create table "+keyMasterTbl);
481     e.printStackTrace();
482     }
483     try {
484     Statement S=conn.createStatement();
485 duarte 93 String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+destDb+"\',\'"+keyMasterTbl+"\');";
486 duarte 85 S.executeUpdate(query);
487     S.close();
488     } catch (SQLException e) {
489     System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!");
490     System.err.println("SQLException: " + e.getMessage());
491     }
492     conn.close();
493     return keyMasterTbl;
494     }
495    
496     public void removePK (String keyMasterTbl,String key){
497 duarte 93 MySQLConnection conn=this.getConnectionToMasterKeyDb();
498     boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key);
499     String colType = conn.getColumnType(keyMasterTbl,key);
500 duarte 85 try {
501 duarte 93 if (isNumeric){ // removing the auto_increment, only in numeric keys
502     String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL;";
503     conn.executeSql(query);
504     }
505     // removing primary key (same sql code for both numeric or text keys
506     String query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;";
507 duarte 85 conn.executeSql(query);
508     } catch (SQLException e) {
509     e.printStackTrace();
510     }
511     conn.close();
512     }
513    
514     public void addPK (String keyMasterTbl, String key){
515 duarte 93 MySQLConnection conn=this.getConnectionToMasterKeyDb();
516     boolean isNumeric = conn.isKeyNumeric(keyMasterTbl,key);
517     String colType = conn.getColumnType(keyMasterTbl,key);
518 duarte 85 try {
519 duarte 93 // adding primary key (same sql code for both numeric or text keys
520 duarte 85 String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");";
521     conn.executeSql(query);
522 duarte 93 if (isNumeric){ // adding auto_increment, only in numeric keys
523     query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" "+colType+" NOT NULL auto_increment;";
524     conn.executeSql(query);
525     }
526 duarte 85 } catch (SQLException e) {
527     e.printStackTrace();
528     }
529     conn.close();
530     }
531    
532     public void removeZeros (String keyMasterTbl, String key){
533     MySQLConnection conn=this.getConnectionToMasterKeyDb();
534     try {
535 duarte 93 // attention! the quotes around 0 are very important, otherwise if key is text-based all records get deleted
536     // mysql somehow considers all text records = 0, using '0' is ok as is considered as a text literal for char fields and as a number for int fields
537     String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"='0';";
538 duarte 85 conn.executeSql(query);
539     } catch (SQLException e) {
540     e.printStackTrace();
541     }
542     conn.close();
543     }
544    
545     /**
546     * Executes a query in all nodes in cluster.
547     * Not in use at the moment
548     * TODO Right now it is serial, must parallelize this with threads
549     * @param query
550     */
551     public void clusterExecuteQuery(String query){
552 duarte 110 String[] nodes = DataDistribution.getMySQLNodes();
553 duarte 85 for (String node: nodes){
554     try {
555     MySQLConnection conn = this.getConnectionToNode(node);
556     conn.executeSql(query);
557     conn.close();
558     }
559     catch(SQLException e){
560     e.printStackTrace();
561     System.err.println("Couldn't execute query="+query+", in node="+node);
562     System.exit(1);
563     }
564     }
565     }
566    
567     /**
568     * Executes a query in all nodes in cluster given a HashMap containing a set of queries (one per node)
569     * Not in use at the moment
570     * TODO Right now it is serial, must parallelize this with threads
571     * TODO This can be used in the load/dump methods in this class where queries are different for each node
572     * @param queries a HashMap containing a query per node
573     */
574     public void clusterExecuteQuery(HashMap<String,String> queries){
575 duarte 110 String[] nodes = DataDistribution.getMySQLNodes();
576 duarte 85 for (String node: nodes){
577     String query="";
578     try {
579     query=queries.get(node);
580     MySQLConnection conn = this.getConnectionToNode(node);
581     conn.executeSql(query);
582     conn.close();
583     }
584     catch(SQLException e){
585     e.printStackTrace();
586     System.err.println("Couldn't execute query="+query+", in node="+node);
587     System.exit(1);
588     }
589     }
590     }
591    
592     }

Properties

Name Value
svn:executable *