ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/owl/trunk/tools/DataDistribution.java
Revision: 113
Committed: Thu Oct 19 13:22:32 2006 UTC (17 years, 11 months ago) by duarte
File size: 14427 byte(s)
Log Message:
Modified method getIdFromMaster to create keyTable if it didn't exist
Line File contents
1 package tools;
2
3 import java.io.*;
4 import java.sql.*;
5 import java.util.ArrayList;
6 import java.util.HashMap;
7 import java.util.Collections;
8
9 public class DataDistribution {
10
11 final static String ADMINDIR="/project/StruPPi/Cluster/admin";
12 final static String HOSTSFILE=ADMINDIR+"/hosts_mysql_server.txt";
13 public final static String MASTER="white";
14 public final static String KEYMASTERDB="key_master";
15
16 final static long TIME_OUT = 3000;
17
18 public String db;
19 private String user;
20 private String pwd;
21
22 public String[] nodes;
23
24 //TODO fields that we might want to implement in future:
25 //public boolean isSplit; //whether data distribution is a split distribution or an all data in all distribution
26 //public String splitTable;
27 //public String splitKey;
28 //public String keyTable; // this is normally dbOnNodes__splitTable (as stored on dbs_keys table in key_master db)
29 //public String dbOnMaster; // use it in checkCountsAllTables in cases master and nodes db have different names
30 //public String dbOnNodes; // use it in checkCountsAllTables in cases master and nodes db have different names
31
32
33 public DataDistribution(String db,String user,String pwd) {
34 this.db=db;
35 this.user=user;
36 this.pwd=pwd;
37 // check that mysql server running in node and that database db exists on it
38 nodes = nodesAlive(getMySQLNodes());
39 //TODO implement method tablesCoincide
40 //nodes = tablesCoincide(nodes);
41 //TODO implement compareNodesList2MasterKeyTable, see half-way implemented code below
42 //compareNodesList2MasterKeyTable();
43 }
44
45 private MySQLConnection getConnectionToMaster() {
46 MySQLConnection conn=this.getConnectionToNode(MASTER);
47 return conn;
48 }
49
50 private MySQLConnection getConnectionToMasterKeyDb() {
51 MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
52 return conn;
53 }
54
55 private MySQLConnection getConnectionToNode(String node) {
56 MySQLConnection conn=new MySQLConnection(node,user,pwd,db);
57 return conn;
58 }
59
60 public String[] getNodes(){
61 return nodes;
62 }
63
64 public String[] nodesAlive(String[] testNodes){
65 HashMap<String,MySQLConnectionCheck> allchecks = new HashMap<String,MySQLConnectionCheck>();
66 ArrayList<String> nodesAL = new ArrayList<String>();
67 for (String node:testNodes){
68 MySQLConnectionCheck check = new MySQLConnectionCheck(node,user,pwd,db);
69 allchecks.put(node, check);
70 check.start();
71 }
72 for (String node:testNodes){
73 try {
74 MySQLConnectionCheck check = allchecks.get(node);
75 check.join(TIME_OUT);
76 if (check.connTestPassed()) {
77 nodesAL.add(check.dbServer);
78 }
79 } catch (InterruptedException e) {
80 e.printStackTrace();
81 }
82 }
83 Collections.sort(nodesAL);
84 String[] passedNodes=new String[nodesAL.size()];
85 nodesAL.toArray(passedNodes);
86
87 return passedNodes;
88 }
89
90 // public String[] compareNodesList2MasterKeyTable() {
91 // ClusterConnection cconn = new ClusterConnection(db,key,user,pwd); // where do we get key from?
92 // String keyTable=cconn.getTableOnNode();
93 // String masterKeyTable=cconn.getKeyTable();
94 // cconn.close();
95 // // getting distinct client_names from masterKeyTable
96 // ArrayList<String> nodesAL = new ArrayList<String>();
97 // for (String node:nodes){
98 // String query="SELECT client_name FROM clients_names AS c WHERE c.client_id IN (SELECT DISTINCT client_id FROM "+masterKeyTable+") ORDER BY client_name;";
99 // try {
100 // MySQLConnection conn = this.getConnectionToMasterKeyDb();
101 // Statement S=conn.createStatement();
102 // ResultSet R=S.executeQuery(query);
103 // while (R.next()){
104 // nodesAL.add(R.getString(1));
105 // }
106 // S.close();
107 // R.close();
108 // conn.close();
109 // }
110 // catch(SQLException e){
111 // e.printStackTrace();
112 // System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB);
113 // }
114 // }
115 // for (String node:nodes){
116 // //TODO compare nodesAL and nodes
117 // }
118 // //TODO return a nodes array string with non matching nodes
119 // //return ;
120 // }
121
122 /**
123 * Gets all potentially working MySQL server nodes that we have by reading the file HOSTSFILE
124 * which contains all of our working MySQL server nodes (checked with script check-nodes.py)
125 * @return String array with node names
126 */
127 public static String[] getMySQLNodes() {
128 String[] mysqlnodes=null;
129 try {
130 File inputFile = new File(HOSTSFILE);
131 BufferedReader hostsFile = new BufferedReader(new FileReader(inputFile)); // open BufferedReader to the file
132 String nodesstr=hostsFile.readLine();
133 mysqlnodes=nodesstr.split(" ");
134 hostsFile.close();
135 }
136 catch (IOException e){
137 e.printStackTrace();
138 System.err.println("Couldn't read from file "+HOSTSFILE);
139 }
140 return mysqlnodes;
141 }
142
143 public static int getIdFromMaster(MySQLConnection conn, String key, String keyTable, String host) {
144 int id=0;
145 try {
146 String query="CREATE TABLE IF NOT EXISTS "+keyTable+" ("+
147 key+" int(11) NOT NULL auto_increment, " +
148 "client_id smallint(6) NOT NULL default '0', " +
149 "PRIMARY KEY (`"+key+"`) " +
150 ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
151 conn.executeSql(query);
152 query = "SELECT client_id FROM clients_names WHERE client_name='"+host+"';";
153 int clientId=conn.getIntFromDb(query);
154 query="INSERT INTO "+keyTable+" (client_id) VALUES ("+clientId+");";
155 conn.executeSql(query);
156 query = "SELECT LAST_INSERT_ID() FROM "+keyTable+" LIMIT 1;";
157 id=conn.getIntFromDb(query);
158 }
159 catch (SQLException E) {
160 System.err.println("SQLException: " + E.getMessage());
161 System.err.println("SQLState: " + E.getSQLState());
162 System.err.println("Couldn't insert record in master table "+keyTable+" in host "+conn.getHost());
163 }
164 return id;
165 }
166
167
168 public boolean checkCountsAllTables (){
169 boolean checkResult=true;
170 MySQLConnection mconn = this.getConnectionToMaster();
171 String[] tables = mconn.getTables4Db();
172 mconn.close();
173 // getting hashmap of all counts from all tables from nodes
174 HashMap<String,HashMap<String,Integer>> countsNodes=new HashMap<String,HashMap<String,Integer>>();
175 for (String node:nodes){
176 HashMap<String,Integer> tableCounts = new HashMap<String,Integer>();
177 for (String tbl:tables){
178 String query="SELECT count(*) FROM "+tbl+";";
179 try {
180 MySQLConnection conn = this.getConnectionToNode(node);
181 Statement S=conn.createStatement();
182 ResultSet R=S.executeQuery(query);
183 if (R.next()){
184 tableCounts.put(tbl,R.getInt(1));
185 }
186 S.close();
187 R.close();
188 conn.close();
189 }
190 catch(SQLException e){
191 e.printStackTrace();
192 System.err.println("Couldn't execute query in host="+node+", database="+db);
193 System.exit(1);
194 }
195 }
196 countsNodes.put(node,tableCounts);
197 }
198 // getting hashmap of all counts of all tables from master
199 HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
200 for (String tbl:tables){
201 String query="SELECT count(*) FROM "+tbl+";";
202 try {
203 MySQLConnection conn = this.getConnectionToMaster();
204 Statement S=conn.createStatement();
205 ResultSet R=S.executeQuery(query);
206 if (R.next()){
207 countsMaster.put(tbl,R.getInt(1));
208 }
209 S.close();
210 R.close();
211 conn.close();
212 }
213 catch(SQLException e){
214 e.printStackTrace();
215 System.err.println("Couldn't execute query in host="+MASTER+", database="+db);
216 System.exit(1);
217 }
218 }
219 // comparing the nodes counts with the master counts
220 for (String tbl:countsMaster.keySet()){
221 int masterCount=countsMaster.get(tbl);
222 for (String node:countsNodes.keySet()){
223 int thisNodeCount=countsNodes.get(node).get(tbl);
224 if (masterCount!=thisNodeCount) {
225 System.out.println("Count difers for table "+tbl+" in database "+db+" of node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount);
226 checkResult=false; // if one count difers then the check fails, we return false for checkResult. If no difers at all we return true
227 }
228 else {
229 System.out.println("Count check passed for node "+node+", table "+tbl);
230 }
231
232 }
233 }
234 return checkResult;
235 } // end checkCounts
236
237 /**
238 * To check the key counts in master and nodes for a certain key.
239 * @param key the name of the key
240 * @return boolean: true if check passed, false if not passed
241 */
242 public boolean checkKeyCounts(String key) {
243 boolean checkResult=true;
244 ClusterConnection cconn = new ClusterConnection(db,key,user,pwd);
245 String keyTable=cconn.getTableOnNode();
246 String masterKeyTable=cconn.getKeyTable();
247 cconn.close();
248 // getting hashmap of counts of keys from nodes
249 HashMap<String,int[]> countsNodes=new HashMap<String,int[]>();
250 String query="SELECT count("+key+"),count(DISTINCT "+key+") FROM "+keyTable+";";
251 for (String node:nodes){
252 try {
253 MySQLConnection conn = this.getConnectionToNode(node);
254 Statement S=conn.createStatement();
255 ResultSet R=S.executeQuery(query);
256 int[] thisNodeKeyCount=new int[2];
257 if (R.next()){
258 thisNodeKeyCount[0]=R.getInt(1);
259 thisNodeKeyCount[1]=R.getInt(2);
260 }
261 countsNodes.put(node,thisNodeKeyCount);
262 S.close();
263 R.close();
264 conn.close();
265 }
266 catch(SQLException e){
267 e.printStackTrace();
268 System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
269 System.exit(1);
270 }
271 }
272 // getting hashmap of counts of keys from master
273 HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
274 for (String node:nodes){
275 String queryM="SELECT count(*) FROM "+masterKeyTable+" AS a,clients_names as c WHERE a.client_id=c.client_id AND c.client_name='"+node+"';";
276 try {
277 MySQLConnection conn = this.getConnectionToMasterKeyDb();
278 Statement S=conn.createStatement();
279 ResultSet R=S.executeQuery(queryM);
280 if (R.next()){
281 countsMaster.put(node,R.getInt(1));
282 }
283 S.close();
284 R.close();
285 conn.close();
286 }
287 catch(SQLException e){
288 e.printStackTrace();
289 System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB);
290 System.exit(1);
291 }
292 }
293 //compare the two hashmaps of key counts
294 for (String node:countsMaster.keySet()){
295 int masterCount=countsMaster.get(node);
296 int[] thisNodeCount=countsNodes.get(node);
297 if (thisNodeCount[0]!=thisNodeCount[1]) {
298 System.out.println("Key count and distinct key count do not coincide for key "+key+" in node "+node+". Key count="+thisNodeCount[0]+", distinct key count="+thisNodeCount[1]);
299 checkResult=false;
300 }
301 else if (thisNodeCount[0]!=masterCount) {
302 System.out.println("Key counts do not coincide for key "+key+" in master and node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount[0]);
303 System.out.print("Differing "+key+"'s are: ");
304 String[] diffKeys = getDifferingKeys(key,node);
305 for (String k:diffKeys){
306 System.out.print(k+" ");
307 }
308 System.out.println();
309 checkResult=false;
310 }
311 else {
312 System.out.println("Key counts check passed for key "+key+" in node "+node+". The count is: "+masterCount);
313 }
314 }
315 return checkResult;
316 }
317
318 /**
319 * Method to get the differing keys for a certain key and node. Used by checkKeycounts method. Shouldn't be used out of this class.
320 * @param key the key name
321 * @param node the host name of the cluster node
322 * @return array of ints with all differing keys for this key and node
323 */
324 public String[] getDifferingKeys (String key,String node) {
325 ArrayList<String> diffKeys = new ArrayList<String>();
326 String[] diffKeysAr;
327 ClusterConnection cconn = new ClusterConnection(db,key,user,pwd);
328 String keyTable=cconn.getTableOnNode();
329 String masterKeyTable=cconn.getKeyTable();
330 cconn.close();
331 String query="SELECT DISTINCT "+key+" FROM "+keyTable+" ORDER BY "+key+";";
332 MySQLConnection mconn=null;
333 try {
334 MySQLConnection nconn = this.getConnectionToNode(node);
335 String colType = nconn.getColumnType(keyTable,key);
336 mconn = this.getConnectionToMasterKeyDb();
337 Statement S=nconn.createStatement();
338 ResultSet R=S.executeQuery(query);
339 mconn.executeSql("CREATE TEMPORARY TABLE tmp_keys ("+key+" "+colType+" default NULL) ENGINE=MEMORY;");
340 String thisKey=null;
341 while (R.next()){
342 thisKey=R.getString(1);
343 query="INSERT INTO tmp_keys VALUES ('"+thisKey+"');";
344 mconn.executeSql(query);
345 }
346 S.close();
347 R.close();
348 nconn.close();
349 }
350 catch(SQLException e){
351 e.printStackTrace();
352 System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
353 System.exit(1);
354 }
355 try {
356 query="SELECT c.k " +
357 "FROM " +
358 "(SELECT u.id AS k,count(u.id) AS cnt " +
359 "FROM " +
360 "(SELECT "+key+" AS id FROM tmp_keys UNION ALL SELECT kt."+key+" AS id FROM "+masterKeyTable+" AS kt LEFT JOIN clients_names AS cn ON kt.client_id=cn.client_id WHERE cn.client_name='"+node+"') AS u GROUP BY u.id) " +
361 "AS c " +
362 "WHERE c.cnt=1;";
363 Statement S=mconn.createStatement();
364 ResultSet R=S.executeQuery(query);
365 while (R.next()){
366 diffKeys.add(R.getString(1));
367 }
368 S.close();
369 R.close();
370 }
371 catch(SQLException e){
372 e.printStackTrace();
373 System.err.println("Couldn't execute query: "+query+"in host="+MASTER+", database="+KEYMASTERDB);
374 System.exit(1);
375 }
376 diffKeysAr= new String[diffKeys.size()];
377 for (int i=0;i<diffKeys.size();i++) {
378 diffKeysAr[i]=diffKeys.get(i);
379 }
380 return diffKeysAr;
381 }
382
383 /**
384 * For a certain key (numeric or text based) and table finds out how the table is splitted in nodes and returns the "data distribution"
385 * Take care when using this method as an argument of insertIdsToKeyMaster: if table is not in chunks (but rather all data in all)
386 * then ids can't be inserted in key_master as there will be duplication, i.e. for key_id=1 as data is in all nodes there
387 * will be 40 copies of it and thus 40 equal ids will try to be inserted in key_master, which a) makes no sense and
388 * b) mysql won't do it anyway
389 * @param key text-based key (char/varchar)
390 * @param table
391 * @return idSets HashMap, keys are node names, values: Integer/String array with the ids for each node
392 */
393 public HashMap<String,Object[]> getIdSetsFromNodes(String key, String table){
394 HashMap<String,Object[]> idSets =new HashMap<String,Object[]>();
395 for (String node:nodes){
396 MySQLConnection conn = this.getConnectionToNode(node);
397 idSets.put(node,conn.getAllIds4KeyAndTable(key,table));
398 conn.close();
399 }
400 return idSets;
401 }
402
403 }

Properties

Name Value
svn:executable *