root/owl/trunk/tools/DataDistributer.java
Revision: 85
Committed: Mon Apr 24 12:41:27 2006 UTC by duarte
File size: 16265 bytes
Log Message:
MAJOR change.
Split DataDistribution into 2 classes: DataDistributer and DataDistribution.
I haven't actually changed or added any functionality.
DataDistributer deals with the distribution of the data, while DataDistribution deals with things to do once the data is already distributed; right now that is only a few data checks.
Note that DataDistributer now has two db fields: srcDb and destDb. This is different from before, when destDb was a parameter passed as an argument to the methods.
Methods in DataDistributer have been tidied up a little (especially the load and dump ones).
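
For orientation, a minimal, hypothetical usage sketch of the class as committed here (the database, user, password, key and table names are made up for illustration):

    // distribute table "edges" of database "mydb" across the cluster,
    // splitting it on its "graph_id" key
    DataDistributer dd = new DataDistributer("mydb", "myuser", "mypwd");
    dd.setDebug(false);
    DataDistribution dataDist = dd.splitTableToCluster("graph_id", "edges");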
File contents:
package tools;

import java.io.File;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;

public class DataDistributer {

    public final static String GLOBALDIR="/project/snow/global/tmp";
    public final static String MASTER = DataDistribution.MASTER;
    public final static String KEYMASTERDB = DataDistribution.KEYMASTERDB;

    String dumpdir;
    String srcDb;
    String destDb;
    String user;
    String pwd;
    boolean debug = false;

    /**
     * Construct a DataDistributer object passing only the source db. The destination db will be the same as the source.
     * @param srcDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String user, String pwd) {
        this.srcDb=srcDb;
        this.destDb=srcDb;
        this.user=user;
        this.pwd=pwd;
        initializeDirs();
    }

    /**
     * Construct a DataDistributer object passing both the source db and the destination db.
     * @param srcDb
     * @param destDb
     * @param user
     * @param pwd
     */
    public DataDistributer(String srcDb, String destDb, String user, String pwd) {
        this.srcDb=srcDb;
        this.destDb=destDb;
        this.user=user;
        this.pwd=pwd;
        initializeDirs();
    }

    public void setDebug(boolean debug){
        this.debug=debug;
    }

    public void initializeDirs() {
        dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
        if (!((new File(dumpdir)).mkdir())) {
            System.err.println("Couldn't create directory "+dumpdir);
            System.exit(1);
        }
        SystemCmd.exec("chmod go+rw "+dumpdir);
    }

    public void finalizeDirs() {
        if (debug) {
            System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually");
        } else {
            System.out.println("Removing temporary directory "+dumpdir);
            //TODO must capture exit state and print to error if problems deleting dir
            SystemCmd.exec("rm -rf "+dumpdir);
        }
    }

    private MySQLConnection getConnectionToNode(String node) {
        MySQLConnection conn=new MySQLConnection(node,user,pwd,srcDb);
        return conn;
    }

    private MySQLConnection getConnectionToNode(String node, String dbName){
        MySQLConnection conn=new MySQLConnection(node,user,pwd,dbName);
        return conn;
    }

    private MySQLConnection getConnectionToMaster() {
        MySQLConnection conn=this.getConnectionToNode(MASTER);
        return conn;
    }

    private MySQLConnection getConnectionToMasterKeyDb() {
        MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
        return conn;
    }

    /**
     * Dumps the given tables of srcDb from each given source host to text files under dumpdir.
     * @param srchosts
     * @param tables
     */
    public void dumpData(String[] srchosts, String[] tables) {
        for (String srchost: srchosts) {
            if (!((new File(dumpdir+"/"+srchost+"/"+srcDb)).mkdirs())) {
                System.err.println("Couldn't create directory "+dumpdir+"/"+srchost+"/"+srcDb);
                System.exit(1);
            }
            SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+srchost);
            for (String tbl: tables) {
                String outfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
                String wherestr="";
                String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
                if (debug) {System.out.println ("HOST="+srchost+", database="+srcDb+", sqldumpstr="+sqldumpstr);}
                else {
                    try {
                        MySQLConnection conn = this.getConnectionToNode(srchost);
                        Statement S=conn.createStatement();
                        // SELECT ... INTO OUTFILE produces no result set, so use execute() rather than executeQuery()
                        S.execute(sqldumpstr);
                        System.out.println ("Dumped from host="+srchost+", database="+srcDb+", table="+tbl+" to outfile="+outfile);
                        S.close();
                        conn.close();
                    }
                    catch(SQLException e){
                        e.printStackTrace();
                        System.err.println("Couldn't dump from host="+srchost+", database="+srcDb+", table="+tbl+" to outfile="+outfile);
                        System.exit(1);
                    }
                } //end else if debug
            } // end foreach tbl
        }
        System.out.println ("Dump finished.");
    }

    /**
     * Loads the dump files produced by dumpData from all given source hosts into destDb on all given destination hosts.
     * @param srchosts
     * @param desthosts
     * @param tables
     */
    public void loadData(String[] srchosts, String[] desthosts, String[] tables) {
        for (String desthost: desthosts) {
            for (String srchost: srchosts) {
                for (String tbl: tables) {
                    String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tbl+".txt";
                    String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
                    if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
                    else {
                        try {
                            MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
                            conn.executeSql(sqlloadstr);
                            System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", into database="+destDb+", table="+tbl);
                            conn.close();
                        }
                        catch(SQLException e){
                            e.printStackTrace();
                            System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tbl);
                            System.exit(1);
                        }
                    }
                } // end foreach tbl
            }
        }
        System.out.println ("Load finished.");
        finalizeDirs();
    }

    /**
     * Loads the per-node dump files of a split table (named <table>_split_<desthost>.txt, as produced in splitTableToCluster)
     * from one source host into table tableName of destDb on each destination host.
     * @param srchost
     * @param desthosts
     * @param tableName
     */
    public void loadSplitData(String srchost, String[] desthosts, String tableName) {
        for (String desthost: desthosts) {
            String dumpfile=dumpdir+"/"+srchost+"/"+srcDb+"/"+tableName+"_split_"+desthost+".txt";
            String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
            if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", DESTDB="+destDb+", sqlloadstr="+sqlloadstr);}
            else {
                try {
                    MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
                    conn.executeSql(sqlloadstr);
                    // note: we load into destDb (not srcDb), so report destDb here
                    System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", database="+destDb+", table="+tableName);
                    conn.close();
                }
                catch(SQLException e){
                    e.printStackTrace();
                    System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tableName);
                    System.exit(1);
                }
            }
        } // end foreach desthosts
        System.out.println ("Load finished.");
        finalizeDirs();
    }

    /**
     * For a given key and table, returns a "data distribution": a (roughly even) assignment of the data to the nodes.
     * To be used when we have a table that we are going to split across the nodes.
     * TODO eventually the code could be cleverer so that the data itself is evenly distributed; right now only the key ids are evenly distributed
     * @param key
     * @param table
     * @return idSets HashMap: keys are node names, values are int arrays with the ids for each node
     */
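    // Worked example with hypothetical numbers: for numIds=10 and numNodes=3 below,
    // setSize=3 and remainder=1, so the first node gets setSize+1=4 ids (allIds[0..3])
    // and the other two nodes get 3 ids each (allIds[4..6] and allIds[7..9]);
    // every id is assigned to exactly one node.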
    public HashMap<String,int[]> splitIdsIntoSets(String key, String table){
        HashMap<String,int[]> idSets =new HashMap<String,int[]>();
        String[] nodes=DataDistribution.getNodes();
        int numNodes=nodes.length;
        MySQLConnection conn = this.getConnectionToMaster();
        int[] allIds=conn.getAllIds4KeyAndTable(key,table);
        conn.close();
        int numIds=allIds.length;
        int setSize=numIds/numNodes;
        int remainder=numIds%numNodes;
        for (int i=0;i<numNodes;i++){
            if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node
                int[] thisnodeidset=new int[setSize+1];
                for (int j=0;j<thisnodeidset.length;j++){
                    thisnodeidset[j]=allIds[j+i*(setSize+1)];
                }
                idSets.put(nodes[i],thisnodeidset);
            } else { // for the rest we put only setSize ids
                int[] thisnodeidset=new int[setSize];
                for (int j=0;j<thisnodeidset.length;j++){
                    thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
                }
                idSets.put(nodes[i],thisnodeidset);
            }
        }
        return idSets;
    }

    /**
     * Splits a given table into chunks based on a key; the split tables remain in the same database and server.
     * @param key
     * @param table
     */
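    // Naming illustration (node name is made up): for table "edges" and a node named
    // "node01", the statements below create a table "edges_split_node01"; this is the
    // same <table>_split_<node> naming that loadSplitData expects in splitTableToCluster.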
    public void splitTable (String key,String table){
        String query;
        HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
        String[] splitTables=new String[idSets.size()];
        try {
            MySQLConnection conn=this.getConnectionToMaster();
            int i=0;
            for (String node:idSets.keySet()) {
                String splitTbl=table+"_split_"+node;
                splitTables[i]=splitTbl;
                i++;
                // we create permanent tables
                query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
                conn.executeSql(query);
                // drop the indexes if there were any; indexes will slow down the creation of the split tables
                String[] indexes=conn.getAllIndexes4Table(table);
                for (String index:indexes) {
                    conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
                }
                int idmin=idSets.get(node)[0];
                int idmax=idSets.get(node)[idSets.get(node).length-1];
                query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+key+">="+idmin+" AND "+key+"<="+idmax+";";
                conn.executeSql(query);
                //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
            }
            conn.close();
        }
        catch (SQLException e){
            e.printStackTrace();
        }
    }

    /**
     * Splits a given table into chunks based on a key; the split tables go to the different nodes of the cluster.
     * @param key
     * @param table
     * @return a DataDistribution object for the destination db
     */
    public DataDistribution splitTableToCluster (String key,String table){
        String query;
        HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
        String[] splitTables=new String[idSets.size()];
        try {
            MySQLConnection conn=this.getConnectionToMaster();
            int i=0;
            for (String node:idSets.keySet()) {
                String splitTbl=table+"_split_"+node;
                splitTables[i]=splitTbl;
                i++;
                // we create permanent tables and drop them later; they can't be temporary as we use another connection for dumpData
                query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
                conn.executeSql(query);
                // drop the indexes if there were any; indexes will slow down the creation of the split tables
                String[] indexes=conn.getAllIndexes4Table(table);
                for (String index:indexes) {
                    conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
                }
                // make the table a memory table (won't be feasible in the general case where tables can be VERY big, even white won't cope)
                //query="ALTER TABLE "+splitTbl+" TYPE=MEMORY;";
                //conn.executeSql(query);
                int idmin=idSets.get(node)[0];
                int idmax=idSets.get(node)[idSets.get(node).length-1];
                query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+key+">="+idmin+" AND "+key+"<="+idmax+";";
                conn.executeSql(query);
            }
            // transferring the data across
            String[] srchosts={MASTER};
            String[] desthosts=DataDistribution.getNodes();
            dumpData(srchosts,splitTables);
            // using loadSplitData rather than loadData here because table names are not the same on source and destination, i.e. source: table_split_tla01, dest: table
            loadSplitData(MASTER,desthosts,table);
            // dropping the split tables, we don't want them anymore after loading the data to the nodes
            for (String tbl:splitTables){
                query="DROP TABLE "+tbl+";";
                conn.executeSql(query);
            }
            conn.close();
        }
        catch (SQLException e){
            e.printStackTrace();
        }
        // putting the ids in the key_master database so we keep track of where everything is
        insertIdsToKeyMaster(key,table,idSets);
        DataDistribution dataDist = new DataDistribution(destDb,user,pwd);
        return dataDist;
    }

    /**
     * Inserts all ids into the key_master database, creating a new table for this destDb/table combination if it doesn't exist yet.
     * @param key name of the key on which the distribution of the table is based
     * @param table name of the table that we are distributing
     * @param idSets as returned from splitIdsIntoSets, or from getIdSetsFromNodes of a DataDistribution object
     */
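    // Illustration with hypothetical values: for key "graph_id", the INSERT ... SELECT
    // below turns (id=17, node="node01") into a row (graph_id=17, client_id=<node01's
    // client_id from clients_names>) in the key master table, recording where id 17 lives.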
    public void insertIdsToKeyMaster(String key,String table,HashMap<String,int[]> idSets) {
        MySQLConnection conn = this.getConnectionToMasterKeyDb();
        String keyMasterTbl = createNewKeyMasterTbl(key,table);
        removePK(keyMasterTbl,key); // attention: we are removing the primary key, duplicates won't be checked!!!
        for (String node:idSets.keySet()){
            int[] thisNodeIds=idSets.get(node);
            for (int id:thisNodeIds){
                String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) " +
                        "SELECT "+id+",c.client_id FROM clients_names AS c WHERE client_name='"+node+"';";
                try {
                    conn.executeSql(query);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        conn.close();
        removeZeros(keyMasterTbl,key); // we have only inserted 0s for records that we didn't want, so it's safe now to get rid of them
        addPK(keyMasterTbl,key); // if there were duplicates, this should barf
    }

    /**
     * Creates a new key master table for the destination db in the key_master database, given a key and table. Used by insertIdsToKeyMaster.
     * Eventually this method and the other key_master-related ones should go into their own class, shouldn't they?
     * @param key
     * @param table
     * @return the name of the key master table created
     */
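    // Naming illustration (names are made up): with destDb "mydb" and table "edges",
    // the table created below in the key_master database is "mydb__edges".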
    public String createNewKeyMasterTbl(String key,String table) {
        String keyMasterTbl=destDb+"__"+table;
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        try {
            String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+
                    key+" int(11) NOT NULL auto_increment, " +
                    "client_id smallint(6) NOT NULL default '0', " +
                    "PRIMARY KEY (`"+key+"`) " +
                    ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
            Statement S=conn.createStatement();
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Couldn't create table "+keyMasterTbl);
            e.printStackTrace();
        }
        try {
            Statement S=conn.createStatement();
            String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+srcDb+"\',\'"+keyMasterTbl+"\');";
            S.executeUpdate(query);
            S.close();
        } catch (SQLException e) {
            System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!");
            System.err.println("SQLException: " + e.getMessage());
        }
        conn.close();
        return keyMasterTbl;
    }

    /**
     * Drops the primary key (and auto_increment) on the given key column of a key master table.
     */
    public void removePK (String keyMasterTbl,String key){
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        try {
            String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" int(11) NOT NULL default '0';";
            conn.executeSql(query);
            query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Re-adds the primary key (and auto_increment) on the given key column of a key master table.
     */
    public void addPK (String keyMasterTbl, String key){
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        try {
            String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");";
            conn.executeSql(query);
            query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" int(11) NOT NULL auto_increment;";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Deletes the records with key=0 from a key master table.
     */
    public void removeZeros (String keyMasterTbl, String key){
        MySQLConnection conn=this.getConnectionToMasterKeyDb();
        try {
            String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"=0;";
            conn.executeSql(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        conn.close();
    }

    /**
     * Executes a query in all nodes of the cluster.
     * Not in use at the moment.
     * TODO right now this is serial, must parallelize it with threads
     * @param query
     */
    public void clusterExecuteQuery(String query){
        String[] nodes = DataDistribution.getNodes();
        for (String node: nodes){
            try {
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                System.err.println("Couldn't execute query="+query+", in node="+node);
                System.exit(1);
            }
        }
    }

    /**
     * Executes a query in each node of the cluster, given a HashMap containing one query per node.
     * Not in use at the moment.
     * TODO right now this is serial, must parallelize it with threads
     * TODO this could be used in the load/dump methods of this class where queries differ for each node
     * @param queries a HashMap containing one query per node
     */
    public void clusterExecuteQuery(HashMap<String,String> queries){
        String[] nodes = DataDistribution.getNodes();
        for (String node: nodes){
            String query="";
            try {
                query=queries.get(node);
                MySQLConnection conn = this.getConnectionToNode(node);
                conn.executeSql(query);
                conn.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                System.err.println("Couldn't execute query="+query+", in node="+node);
                System.exit(1);
            }
        }
    }

}

Properties

Name Value
svn:executable *