ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/owl/trunk/tools/DataDistribution.java
Revision: 66
Committed: Fri Apr 7 09:31:08 2006 UTC (18 years, 5 months ago) by duarte
File size: 20736 byte(s)
Log Message:
Made MASTER final static variable also public. It is in use by other classes.
Line User Rev File contents
1 duarte 65 package tools;
2    
3     import java.io.*;
4     import java.sql.*;
5     import java.util.ArrayList;
6     import java.util.HashMap;
7    
8     public class DataDistribution {
9    
10     final static String GLOBALDIR="/project/snow/global/tmp";
11     final static String ADMINDIR="/project/StruPPi/Cluster/admin";
12 duarte 66 public final static String MASTER="white";
13 duarte 65 final static String HFILE=ADMINDIR+"/hosts_ss.txt";
14     final static String KEYMASTERDB="key_master";
15    
16     boolean debug=false; // if set to true only mysql commands written, no actual dump or load, also dump directory not removed. Use setDebug method to change it
17     String dumpdir;
18     private String db;
19     private String user;
20     private String pwd;
21    
22    
23     public DataDistribution(String db,String user,String pwd) {
24     this.dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
25     this.db=db;
26     this.user=user;
27     this.pwd=pwd;
28     }
29    
30     private MySQLConnection getConnectionToMaster() {
31     MySQLConnection conn=this.getConnectionToNode(MASTER);
32     return conn;
33     }
34    
35     private MySQLConnection getConnectionToMasterKeyDb() {
36     MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
37     return conn;
38     }
39    
40     private MySQLConnection getConnectionToNode(String node) {
41     MySQLConnection conn=new MySQLConnection(node,user,pwd,db);
42     return conn;
43     }
44    
45     private MySQLConnection getConnectionToNode(String node, String dbName){
46     MySQLConnection conn=new MySQLConnection(node,user,pwd,dbName);
47     return conn;
48     }
49    
50     public void setDebug(boolean debug) {
51     this.debug=debug;
52     }
53    
54     public static String[] getNodes() {
55     String[] nodes=null;
56     try {
57     File inputFile = new File(HFILE);
58     BufferedReader hostsFile = new BufferedReader(new FileReader(inputFile)); // open BufferedReader to the file
59     String nodesstr=hostsFile.readLine();
60     nodes=nodesstr.split(" ");
61     hostsFile.close();
62     }
63     catch (IOException e){
64     e.printStackTrace();
65     System.err.println("Couldn't read from file "+HFILE);
66     }
67     return nodes;
68     }
69    
70     public String[] getTables4Db(){
71     String[] tables=null;
72     ArrayList<String> tablesAL=new ArrayList<String>();
73     String query="SELECT table_name FROM information_schema.tables WHERE table_schema='"+db+"' ORDER BY table_name DESC;";
74     try {
75     MySQLConnection conn = this.getConnectionToMaster();
76     Statement S=conn.createStatement();
77     ResultSet R=S.executeQuery(query);
78     while (R.next()){
79     tablesAL.add(R.getString(1));
80     }
81     S.close();
82     R.close();
83     conn.close();
84     tables=new String[tablesAL.size()];
85     for (int i=0;i<tablesAL.size();i++) {
86     tables[i]=tablesAL.get(i);
87     }
88     }
89     catch(SQLException e){
90     e.printStackTrace();
91     System.err.println("Couldn't get table names from "+MASTER+" for db="+db);
92     System.exit(1);
93     }
94     return tables;
95     }
96    
97     public void initializeDirs() {
98     if (!((new File(dumpdir)).mkdir())) {
99     System.err.println("Couldn't create directory "+dumpdir);
100     System.exit(1);
101     }
102     SystemCmd.exec("chmod go+rw "+dumpdir);
103     }
104    
105     public void finalizeDirs() {
106     if (debug) {
107     System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually");
108     } else {
109     System.out.println("Removing temporary directory "+dumpdir);
110     //TODO must capture exit state and print to error if problems deleting dir
111     SystemCmd.exec("rm -rf "+dumpdir);
112     }
113     }
114    
115     public void dumpData(String[] srchosts, String[] tables) {
116     // initialising temporary dump directory
117     initializeDirs();
118     for (String node: srchosts) {
119     dumpData(node,tables);
120     }
121     System.out.println ("Dump finished.");
122     }
123    
124     private void dumpData(String srchost, String[] tables) {
125     //String quickpar="--quick"; // not to buffer to memory, needed for big tables
126     if (!((new File(dumpdir+"/"+srchost+"/"+db)).mkdirs())) {
127     System.err.println("Couldn't create directory "+dumpdir+"/"+srchost+"/"+db);
128     System.exit(1);
129     }
130     SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+srchost);
131     for ( String tbl: tables) {
132     String outfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt";
133     String wherestr="";
134     String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
135     //String dumpstr="$MYSQLDIR/bin/mysql $srcconnpar $quickpar -e \"$sqldumpstr\" ";
136     if (debug) {System.out.println ("HOST="+srchost+", sqldumpstr="+sqldumpstr);}
137     else {
138     try {
139     MySQLConnection conn = this.getConnectionToNode(srchost);
140     Statement S=conn.createStatement();
141     S.executeQuery(sqldumpstr);
142     System.out.println ("Dumped from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile);
143     S.close();
144     conn.close();
145     }
146     catch(SQLException e){
147     e.printStackTrace();
148     System.err.println("Couldn't dump from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile);
149     System.exit(1);
150     }
151     } //end else if debug
152     } // end foreach tbl
153     }
154    
155     public void loadData(String[] srchosts, String[] desthosts,String[] tables, String destDb) {
156     for (String desthost: desthosts) {
157     for (String srchost: srchosts) {
158     loadData(srchost,desthost,tables,destDb);
159     }
160     }
161     System.out.println ("Load finished.");
162     finalizeDirs();
163     }
164    
165     private void loadData(String srchost, String desthost,String[] tables, String destDb) {
166     for (String tbl:tables) {
167     String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt";
168     String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
169     if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);}
170     else {
171     try {
172     MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
173     conn.executeSql(sqlloadstr);
174     System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", into database="+destDb+", table="+tbl);
175     conn.close();
176     }
177     catch(SQLException e){
178     e.printStackTrace();
179     System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tbl);
180     System.exit(1);
181     }
182     }
183     } // end foreach tbl
184     }
185    
186     private void loadSplitData(String srchost, String[] desthosts, String tableName, String destDb) {
187     for (String desthost:desthosts) {
188     String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tableName+"_split_"+desthost+".txt";
189     String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
190     if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);}
191     else {
192     try {
193     MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
194     conn.executeSql(sqlloadstr);
195     System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", database="+db+", table="+tableName);
196     conn.close();
197     }
198     catch(SQLException e){
199     e.printStackTrace();
200     System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tableName);
201     System.exit(1);
202     }
203     }
204     } // end foreach desthosts
205     System.out.println ("Load finished.");
206     finalizeDirs();
207     }
208    
209     public boolean checkCountsAllTables (){
210     boolean checkResult=true;
211     String[] nodes = getNodes();
212     String[] tables = getTables4Db();
213     // getting hashmap of all counts from all tables from nodes
214     HashMap<String,HashMap<String,Integer>> countsNodes=new HashMap<String,HashMap<String,Integer>>();
215     for (String node:nodes){
216     HashMap<String,Integer> tableCounts = new HashMap<String,Integer>();
217     for (String tbl:tables){
218     String query="SELECT count(*) FROM "+tbl+";";
219     try {
220     MySQLConnection conn = this.getConnectionToNode(node);
221     Statement S=conn.createStatement();
222     ResultSet R=S.executeQuery(query);
223     if (R.next()){
224     tableCounts.put(tbl,R.getInt(1));
225     }
226     S.close();
227     R.close();
228     conn.close();
229     }
230     catch(SQLException e){
231     e.printStackTrace();
232     System.err.println("Couldn't execute query in host="+node+", database="+db);
233     System.exit(1);
234     }
235     }
236     countsNodes.put(node,tableCounts);
237     }
238     // getting hashmap of all counts of all tables from master
239     HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
240     for (String tbl:tables){
241     String query="SELECT count(*) FROM "+tbl+";";
242     try {
243     MySQLConnection conn = this.getConnectionToMaster();
244     Statement S=conn.createStatement();
245     ResultSet R=S.executeQuery(query);
246     if (R.next()){
247     countsMaster.put(tbl,R.getInt(1));
248     }
249     S.close();
250     R.close();
251     conn.close();
252     }
253     catch(SQLException e){
254     e.printStackTrace();
255     System.err.println("Couldn't execute query in host="+MASTER+", database="+db);
256     System.exit(1);
257     }
258     }
259     // comparing the nodes counts with the master counts
260     for (String tbl:countsMaster.keySet()){
261     int masterCount=countsMaster.get(tbl);
262     for (String node:countsNodes.keySet()){
263     int thisNodeCount=countsNodes.get(node).get(tbl);
264     if (masterCount!=thisNodeCount) {
265     System.out.println("Count difers for table "+tbl+" in database "+db+" of node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount);
266     checkResult=false; // if one count difers then the check fails, we return false for checkResult. If no difers at all we return true
267     }
268     else {
269     System.out.println("Count check passed for node "+node+", table "+tbl);
270     }
271    
272     }
273     }
274     return checkResult;
275     } // end checkCounts
276    
277     /**
278     * To check the key counts in master and nodes for a certain key.
279     * @param key the name of the key
280     * @return boolean: true if check passed, false if not passed
281     */
282     public boolean checkKeyCounts(String key) {
283     boolean checkResult=true;
284     String keyTable=key+"_list";
285     String masterKeyTable=db+"_"+key+"_list_master";
286     String keyColumn =key+"_id";
287     // getting hashmap of counts of keys from nodes
288     String[] nodes = getNodes();
289     HashMap<String,int[]> countsNodes=new HashMap<String,int[]>();
290     String query="SELECT count("+keyColumn+"),count(DISTINCT "+keyColumn+") FROM "+keyTable+";";
291     for (String node:nodes){
292     try {
293     MySQLConnection conn = this.getConnectionToNode(node);
294     Statement S=conn.createStatement();
295     ResultSet R=S.executeQuery(query);
296     int[] thisNodeKeyCount=new int[2];
297     if (R.next()){
298     thisNodeKeyCount[0]=R.getInt(1);
299     thisNodeKeyCount[1]=R.getInt(2);
300     }
301     countsNodes.put(node,thisNodeKeyCount);
302     S.close();
303     R.close();
304     conn.close();
305     }
306     catch(SQLException e){
307     e.printStackTrace();
308     System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
309     System.exit(1);
310     }
311     }
312     // getting hashmap of counts of keys from master
313     HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
314     for (String node:nodes){
315     String queryM="SELECT count(*) FROM "+masterKeyTable+" AS a,clients_names as c WHERE a.client_id=c.client_id AND c.client_name='"+node+"';";
316     try {
317     MySQLConnection conn = this.getConnectionToMasterKeyDb();
318     Statement S=conn.createStatement();
319     ResultSet R=S.executeQuery(queryM);
320     if (R.next()){
321     countsMaster.put(node,R.getInt(1));
322     }
323     S.close();
324     R.close();
325     conn.close();
326     }
327     catch(SQLException e){
328     e.printStackTrace();
329     System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB);
330     System.exit(1);
331     }
332     }
333     //compare the two hashmaps of key counts
334     for (String node:countsMaster.keySet()){
335     int masterCount=countsMaster.get(node);
336     int[] thisNodeCount=countsNodes.get(node);
337     if (thisNodeCount[0]!=thisNodeCount[1]) {
338     System.out.println("Key count and distinct key count do not coincide for key "+keyColumn+" in node "+node+". Key count="+thisNodeCount[0]+", distinct key count="+thisNodeCount[1]);
339     checkResult=false;
340     }
341     else if (thisNodeCount[0]!=masterCount) {
342     System.out.println("Key counts do not coincide for key "+keyColumn+" in master and node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount[0]);
343     System.out.print("Differing "+keyColumn+"'s are: ");
344     int[] diffKeys = getDifferingKeys(key,node);
345     for (int k:diffKeys){
346     System.out.print(k+" ");
347     }
348     System.out.println();
349     checkResult=false;
350     }
351     else {
352     System.out.println("Key counts check passed for key "+keyColumn+" in node "+node+". The count is: "+masterCount);
353     }
354     }
355     return checkResult;
356     }
357    
358     /**
359     * Method to get the differing keys for a certain key and node. Used by checkKeycounts method. Shouldn't be used out of this class.
360     * @param key the key name
361     * @param node the host name of the cluster node
362     * @return array of ints with all differing keys for this key and node
363     */
364     public int[] getDifferingKeys (String key,String node) {
365     ArrayList<Integer> diffKeys = new ArrayList<Integer>();
366     int[] diffKeysAr;
367     String keyTable=key+"_list";
368     String masterKeyTable=db+"_"+key+"_list_master";
369     String keyColumn=key+"_id";
370     String query="SELECT DISTINCT "+keyColumn+" FROM "+keyTable+" ORDER BY "+keyColumn+";";
371     MySQLConnection mconn=null;
372     try {
373     MySQLConnection nconn = this.getConnectionToNode(node);
374     mconn = this.getConnectionToMasterKeyDb();
375     Statement S=nconn.createStatement();
376     ResultSet R=S.executeQuery(query);
377     mconn.executeSql("CREATE TEMPORARY TABLE tmp_keys ("+keyColumn+" int(11) default NULL) ENGINE=MEMORY;");
378     int thisKey=0;
379     while (R.next()){
380     thisKey=R.getInt(1);
381     query="INSERT INTO tmp_keys VALUES ("+thisKey+");";
382     mconn.executeSql(query);
383     }
384     S.close();
385     R.close();
386     nconn.close();
387     }
388     catch(SQLException e){
389     e.printStackTrace();
390     System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
391     System.exit(1);
392     }
393     try {
394     query="SELECT c.k " +
395     "FROM " +
396     "(SELECT u.id AS k,count(u.id) AS cnt " +
397     "FROM " +
398     "(SELECT "+keyColumn+" AS id FROM tmp_keys UNION ALL SELECT kt."+keyColumn+" AS id FROM "+masterKeyTable+" AS kt LEFT JOIN clients_names AS cn ON kt.client_id=cn.client_id WHERE cn.client_name='"+node+"') AS u GROUP BY u.id) " +
399     "AS c " +
400     "WHERE c.cnt=1;";
401     Statement S=mconn.createStatement();
402     ResultSet R=S.executeQuery(query);
403     while (R.next()){
404     diffKeys.add(R.getInt(1));
405     }
406     S.close();
407     R.close();
408     }
409     catch(SQLException e){
410     e.printStackTrace();
411     System.err.println("Couldn't execute query: "+query+"in host="+MASTER+", database="+KEYMASTERDB);
412     System.exit(1);
413     }
414     diffKeysAr= new int[diffKeys.size()];
415     for (int i=0;i<diffKeys.size();i++) {
416     diffKeysAr[i]=diffKeys.get(i);
417     }
418     return diffKeysAr;
419     }
420    
421     /**
422     * Method used by splitIdsIntoSet method. To get all ordered ids from a certain key and table from this db in master server
423     * @param key the key name
424     * @param table the table name
425     * @return int array containing all ids
426     */
427     public int[] getAllIds4KeyAndTable(String key, String table){
428     int[] allIds=null;
429     String keyColumn=key+"_id";
430     try {
431     MySQLConnection conn=this.getConnectionToMaster();
432     Statement S=conn.createStatement();
433     String query="SELECT DISTINCT "+keyColumn+" FROM "+table+" ORDER BY "+keyColumn+";";
434     ResultSet R=S.executeQuery(query);
435     ArrayList<Integer> idsAL=new ArrayList<Integer>();
436     while (R.next()){
437     idsAL.add(R.getInt(1));
438     }
439     allIds=new int[idsAL.size()];
440     for (int i=0;i<idsAL.size();i++) {
441     allIds[i]=idsAL.get(i);
442     }
443     R.close();
444     S.close();
445     conn.close();
446     }
447     catch (SQLException e){
448     e.printStackTrace();
449     }
450     return allIds;
451     }
452    
453     /**
454     * For a certain key and table returns a HashMap containing an int array per cluster node
455     * @param key
456     * @param table
457     * @return HashMap, keys are node names, values: int array with the ids for each node
458     */
459     public HashMap<String,int[]> splitIdsIntoSets(String key, String table){
460     HashMap<String,int[]> idSets =new HashMap<String,int[]>();
461     String[] nodes=DataDistribution.getNodes();
462     int numNodes=nodes.length;
463     int[] allIds=this.getAllIds4KeyAndTable(key,table);
464     int numIds=allIds.length;
465     int setSize=numIds/numNodes;
466     int remainder=numIds%numNodes;
467     for (int i=0;i<numNodes;i++){
468     if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node
469     int[] thisnodeidset=new int[setSize+1];
470     for (int j=0;j<thisnodeidset.length;j++){
471     thisnodeidset[j]=allIds[j+i*(setSize+1)];
472     }
473     idSets.put(nodes[i],thisnodeidset);
474     } else { // for the rest we put only setSize ids
475     int[] thisnodeidset=new int[setSize];
476     for (int j=0;j<thisnodeidset.length;j++){
477     thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
478     }
479     idSets.put(nodes[i],thisnodeidset);
480     }
481     }
482     return idSets;
483     }
484    
485     /**
486     * To split a given table in chunks based on a key, split tables remain in same database and server
487     * @param key
488     * @param table
489     */
490     public void splitTable (String key,String table){
491     String keyColumn=key+"_id";
492     String query;
493     HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
494     String[] splitTables=new String[idSets.size()];
495     try {
496     MySQLConnection conn=this.getConnectionToMaster();
497     int i=0;
498     for (String node:idSets.keySet()) {
499     String splitTbl=table+"_split_"+node;
500     splitTables[i]=splitTbl;
501     i++;
502     // we create permanent tables
503     query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
504     conn.executeSql(query);
505     // drop the indexes if there was any, indexes will slow down the creation of split tables
506     String[] indexes=conn.getAllIndexes4Table(table);
507     for (String index:indexes) {
508     conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
509     }
510     int idmin=idSets.get(node)[0];
511     int idmax=idSets.get(node)[idSets.get(node).length-1];
512     query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+keyColumn+">="+idmin+" AND "+keyColumn+"<="+idmax+";";
513     conn.executeSql(query);
514     //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
515     }
516     conn.close();
517     }
518     catch (SQLException e){
519     e.printStackTrace();
520     }
521     }
522    
523     /**
524     * To split a given table in chunks based on a key, split tables go to different nodes of cluster
525     * @param key
526     * @param table
527     * @param destDb name of destination db
528     */
529     public void splitTableToCluster (String key,String table, String destDb){
530     String keyColumn=key+"_id";
531     String query;
532     HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
533     String[] splitTables=new String[idSets.size()];
534     try {
535     MySQLConnection conn=this.getConnectionToMaster();
536     int i=0;
537     for (String node:idSets.keySet()) {
538     String splitTbl=table+"_split_"+node;
539     splitTables[i]=splitTbl;
540     i++;
541     // we create permanent tables, later we drop them. Can't be temporary as we use another connection for dumpData
542     query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
543     conn.executeSql(query);
544     // drop the indexes if there was any, indexes will slow down the creation of split tables
545     String[] indexes=conn.getAllIndexes4Table(table);
546     for (String index:indexes) {
547     conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
548     }
549     // make the table a memory table (won't be feasible in general case where tables can be VERY big, even white won't cope)
550     //query="ALTER TABLE "+splitTbl+" TYPE=MEMORY;";
551     //conn.executeSql(query);
552     int idmin=idSets.get(node)[0];
553     int idmax=idSets.get(node)[idSets.get(node).length-1];
554     query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+keyColumn+">="+idmin+" AND "+keyColumn+"<="+idmax+";";
555     conn.executeSql(query);
556     }
557     // transfering data across
558     String[] srchosts={MASTER};
559     String[] desthosts=getNodes();
560     dumpData(srchosts,splitTables);
561     // using here loadSplitData rather than loadData because table names are not the same on source and destination, i.e. source: table_split_tla01, dest: table
562     loadSplitData(MASTER,desthosts,table,destDb);
563     // droping table, we don't want them anymore after loading data to nodes
564     for (String tbl:splitTables){
565     query="DROP TABLE "+tbl+";";
566     conn.executeSql(query);
567     }
568     conn.close();
569     }
570     catch (SQLException e){
571     e.printStackTrace();
572     }
573     }
574    
575     /**
576     * Executes a query in all nodes in cluster.
577     * TODO Right now it is serial, must parallelize this with threads
578     * TODO This can be used in lots of methods in this class (all the loadData and dumpData ones)
579     * @param query
580     */
581     public void clusterExecuteQuery(String query){
582     String[] nodes = getNodes();
583     for (String node: nodes){
584     try {
585     MySQLConnection conn = this.getConnectionToNode(node);
586     conn.executeSql(query);
587     conn.close();
588     }
589     catch(SQLException e){
590     e.printStackTrace();
591     System.err.println("Couldn't execute query="+query+", in node="+node);
592     System.exit(1);
593     }
594     }
595    
596     }
597     }

Properties

Name Value
svn:executable *