ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/gclib/scripts/jgridx
Revision: 24
Committed: Tue Jul 26 21:46:39 2011 UTC (13 years, 1 month ago) by gpertea
File size: 68505 byte(s)
Log Message:
Line File contents
1 #!/usr/bin/perl
2 use strict;
3 use Getopt::Std;
4 use File::Basename; #for basename, dirname
5 use POSIX "sys_wait_h";
6 use Net::Domain qw(hostname hostdomain);
7 use Net::SMTP; # for send_mail function
8 use Cwd qw(abs_path cwd);
9 use Fcntl qw(:DEFAULT :seek); #mostly for the SEEK constants
10 use FindBin; use lib "$FindBin::Bin";
11
#--- run state and identity of the submitting user -------------------------
my $NORMAL_ENDING = 1;   # 0 while working; 1 again right before a normal exit (i.e. not via die())
my $USER = $ENV{USER} || POSIX::cuserid();   # USER may be missing on condor workers
my $PWD = cwd();                              # Cwd module
my $PERL_BIN = '/usr/bin/perl';
my $MAX_RETRIES = 3;                          # how many times a failed task is tried

#--- bookkeeping files kept in <GRID_JOBDIR> --------------------------------
my $F_WRKCOUNT   = '.wrkCount';       # count of currently running workers
my $F_ALLDONE    = '.all.Done.';
my $F_WRKSTART   = '.wrkStarted';     # count of workers ever started
my $F_LASTTASK   = '.lastTask';       # highest task# ever started
my $F_TASKSDONE  = 'tasksDone';       # number of tasks finished successfully
my $F_ERRTASKS   = 'err.tasks';       # LIST of task#s still failing (non-zero
                                      # status) after MAX_RETRIES attempts
my $F_RETRYTASKS = 'retry.tasks';     # stack of task#s that failed fewer than
                                      # MAX_RETRIES times (to be retried)
my $F_WRKRUNNING = '.running-worker'; # semaphore file in each worker directory
my $F_ENDCMD     = 'epilogue';
my $F_WRKDIR     = 'workdir';
my $F_NOTIFY     = 'notify';
my $F_TASKDB     = 'taskDb';

my ($GRID_DEBUG, $starting_dir, $STARTED_GRID_TASK);
my $SMPChildren = 0;   # smp engine: number of child workers still running
my %SMPChildren;       # smp engine: set of child worker PIDs
my %Locks;             # filehandle -> [lockfilelink, hostlockfile]

#--- host identity -----------------------------------------------------------
# Neither uname() nor the HOST env var can be trusted under condor: with
# getenv=TRUE the submitter's HOST/HOSTNAME are inherited, with getenv=FALSE
# nothing is set at all -- so ask Net::Domain instead.
my $HOSTNAME = do { my $h = hostname(); chomp($h); lc($h) };
my ($HOST) = ($HOSTNAME =~ m/^([\w\-]+)/);    # short (first label) host name
my $DOMAIN = hostdomain();                    # from Net::Domain
@ENV{qw(HOST MACHINE HOSTNAME USER)} = ($HOST, $HOST, $HOSTNAME, $USER);

#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! - site specific -
#---------- modify the SITE-SPECIFIC paths here if needed:
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&!
# ===== PATH management ====
# default: assume architecture/PATH/LIB uniformity across nodes;
# add your own worker/server paths here to change this
my ($binpath, $libpath, $perllib) = @ENV{qw(PATH LD_LIBRARY_PATH PERLLIB)};

# monitoring symlinks go under the parent of the home directory (the last
# path component of $HOME is stripped). HOME may be undefined inside a grid
# job; use some globally mounted path instead in that case.
(my $homebase = $ENV{HOME}) =~ s{/[^/]+$}{};

# default grid engine and monitoring base
my $GRID_MONHOME = $homebase;
my $GRID_ENGINE  = 'pbs';
my $sysopen_mode = O_CREAT | O_RDWR;
# $sysopen_mode |= O_DIRECT if $DOMAIN!~/umiacs/;
80
81 #&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! -
82 #
# Extended help (printed with -H): the files/directories gridx maintains
# inside <GRID_JOBDIR>.
my $morehints=qq{
 The following entries will be created by gridx:
 <GRID_JOBDIR>/epilogue     - only if -e option was given, a file containing the
                              <end_cmd> parameter
 <GRID_JOBDIR>/notify       - a file containing the e-mail address to notify
                              if -m was given
 <GRID_JOBDIR>/.lastTask    - a file keeping track of the last task# scheduled
 <GRID_JOBDIR>/.wrkStarted  - a file keeping track of the number of workers
                              ever started
 <GRID_JOBDIR>/.wrkCount    - a file keeping track of the number of
                              currently running workers
 <GRID_JOBDIR>/tasksDone    - a file keeping track of the number of tasks
                              finished successfully
 <GRID_JOBDIR>/.all.Done.   - created by the last worker once all the tasks
                              have been processed
 <GRID_JOBDIR>/err.tasks    - list of all error-terminated tasks, after retry
                              (should be zero or empty if all tasks
                              finished successfully)
 <GRID_JOBDIR>/retry.tasks  - list of all error-terminated tasks that will
                              still be retried
 <GRID_JOBDIR>/taskDb       - pseudo-fasta db with the info & status
                              of each task
 <GRID_JOBDIR>/taskDb.cidx  - the cdbfasta index of the above db file
 <GRID_JOBDIR>/locks/       - lockfiles/links are placed here
 <GRID_JOBDIR>/running/     - while a task is processed, a file entry
                              called task_<GRID_TASK> will be created in
                              here for locking purposes; such a file will have
                              a line with processing info for the current
                              task: <hostname> <pid> <CPU#>
 <GRID_JOBDIR>/wrk_<CPU#>/  - working directory exclusive to each worker
                              process, one per CPU (1 <= CPU <= maxCPUs);
                              also contains stderr and stdout log files
                              for each worker process (wrk_stderr.log and
                              wrk_stdout.log)
 * if -i option is used, a slice db file is created for <fastadb> with the
   positions of all slices of <numseqs> fasta records in the <fastadb> file.
   This file is called <GRID_JOBDIR>/taskDb
 * if -b option was given: executes the <begin_script> in the current submit
   directory (the one above <GRID_JOBDIR>)
 * if all tasks completed successfully and if -e option was given
   the <end_script> is executed in the <GRID_JOBDIR> subdirectory
};
120
# Main usage text (printed on option errors and with -H). Interpolates
# $GRID_ENGINE (the default engine) and $USER.
my $usage=qq{
 Submit a list of commands to the grid. Can also automatically
 slice a multi-fasta file into chunks and pass the slice file names as
 parameters to a <cmd>:

 Usage:

 jgridx \{-f <cmds_file> | -i <fastadb> | -r <num_iterations> \} -p <num_workers>
   [-w <#threads_per_worker>] [-M <req_mem> ]\{-U|-u <slot>\} [-n <numseqs>]
   [-s <skipseqs>] [-t <totalseqs>] [-g <engine>] [-d <dir_prefix>] [-a]
   [-L <local_wrk_dir>] [-O <log_files_dir>] [-q] [-m <e-mail>]
   [-S] [-T] [-v] [-D] [-x <node_exclude_list>] [-y <node_targets_list>]
   [-b <begin_script>] [-e <end_script>] [-c] <cmd> [-C] [<cmd_args>..]


 Unless -J option is given, gridx will submit the job to the grid,
 creating a <GRID_JOBDIR> subdirectory (which on Condor has the format:
 gridx-<hostname>_<job_id>). A file called 'cmdline-<numtasks>.cmd'
 will also be created there containing the current gridx command line.

 Options:
 -g  grid engine to use, can be 'pbs', 'smp', 'condor' or 'sge' (default: $GRID_ENGINE)
 -p  maximum number of worker processes to request for this job
 -w  (pbs engine only) request number of threads (CPUs) for each worker process
 -M  (pbs engine only) request node memory (in GB) for each worker process;
     MUST use this option if your program needs more than 2GB memory!
 -f  provide a file with a list of commands to be run on the grid (one
     command per line per CPU)
 -i  grid processing of slices of the given multi-fasta file <fastadb>;
     the total number of tasks/iterations is given by the number of slices
     in <fastadb> (and the -n <numseqs> option); the script will
     automatically create the slice file in the worker directory before
     <cmd> is run with these parameters:
     <fastaslice> <numseqs> <slice#> <is_last> <skipped> <total> <cmd_args>
 -n  slice size: for -i option, how many fasta records from <fastadb>
     should be placed into a slice file for each iteration (default: 1)
 -s  for -i option, skip the first <skipseqs> fasta records of <fastadb>
 -t  for -i option, only process <totalseqs> fasta records (after the
     skipped ones); by default all the remaining records are processed
 -C  legacy option for psx-like usage (to pass <cmd_args>)
 -r  submit an array job with <numtasks> iterations/tasks (default: 1)

 -S  switch to (stay in) the current directory (where gridx was launched from)
     before each task execution (especially useful for -f option); by default
     worker processes are executed in their own worker subdirectory
 -a  (psx legacy): inherit the submitter environment (default)
 -b  prologue script to be executed BEFORE the grid job is submitted
 -e  epilogue script to be executed AFTER all the grid tasks were completed,
     and only if ALL the tasks terminated successfully.
 -m  send an e-mail to the given address when all tasks are finished
 -v  do not try to validate <cmd> (assume it is going to be valid on the grid
     nodes)
 -D  debug mode: print the grid submission command and other diagnostics
 -x  provide a list of machines (host names) that should be excluded
     from the condor pool for this submitted job ('condor' engine only)
 -y  the submitted job is only allowed to run on the machines on this list
     ('condor' engine only)
 -d  create <dir_prefix> prefixed symbolic links to worker directories
     in the current directory
 -O  place all Condor log files (condor stderr/stdout redirects)
     into <log_files_dir>
 -T  do not exit after the job is submitted, wait around instead
     until the last worker terminates and prints some job completion stats
 -U  force one worker per machine (only for 'condor' engine)
 -u  force each worker to run only on CPU <slot#> (between 1 and 12) of each
     machine (only for 'condor' engine)
 For -r option, the provided <cmd> could make use of the environment variables
 GRID_TASK, GRID_TASKLAST to determine the current iteration being executed.

 Unless -S option was used, each <cmd> iteration will be executed in its own
 worker subdirectory <GRID_JOBDIR>/wrk_<CPU#>

 Job monitoring/resuming/stopping (-J mode):

 gridx [-m e-mail] [-e <end_script>] [-K|-R] -J <jobID>

 If -J option is provided, gridx will report the status of the job <jobID>
 which must have been submitted by a previous, "regular" use of gridx; it relies
 on the symbolic link <monitor_basedir>/$USER/gridx-<jobID> which must
 be valid (<monitor_basedir> is the parent directory of your home directory);
 if that link is missing, a gridx-<jobID> subdirectory of the current
 directory is used instead. Without a <jobID>, the jobs found are listed.

 Additional/alternate actions for -J mode:
 -e  update the <end_script> for the *running* <jobID> or for the <jobID>
     rerun (if -R option is also given); does not work with -K option
 -m  update the e-mail notification option for job <jobID>
 -K  kill/abandon/terminate ALL pending tasks for grid job jobID
     (trying to remove all running/pending tasks on the grid)
 -R  rerun/resubmit all the unfinished/unprocessed or unsuccessful tasks
     for <GRID_JOB>; this assumes -K -J <JOB_ID> was given first (so there
     are no pending tasks) and then will submit a new job in the same working
     directory, renaming the GRID_JOBDIR accordingly while workers
     will now *skip* all the tasks found with a "Done" status ('.') in the
     <GRID_JOBDIR>/taskDb file
}; #'
214
215
# Option parsing and engine selection. A "-J -R" (resume) request rewrites
# @ARGV and jumps back to this label (goto RESUME_JOBID) to re-parse the
# original job's command line.
RESUME_JOBID:
my @ar=@ARGV;
# keep the internal "-Z <jobid>" pair(s) out of the recorded command line
while (@ar && $ar[0] eq '-Z') { splice(@ar, 0, 2); }
my $CMDLINE="$FindBin::Script\t".join("\t",@ar);
# parse script options
print STDERR "Running on $HOST (domain: $DOMAIN): $0 ".join(' ',@ARGV)."\n";
getopts('USWHM:O:NKBRDTqvaJZ:L:u:r:x:y:i:Ff:n:t:d:s:p:w:m:g:b:e:c:C:') || die($usage."\n");
umask 0002;
if ($Getopt::Std::opt_H) {
  print STDERR $usage;
  die($morehints."\n");
}
my $SwitchDir=$Getopt::Std::opt_S;
$NORMAL_ENDING=0;
$GRID_DEBUG=$Getopt::Std::opt_D;
#print STDERR "Host=$HOST, Domain=$DOMAIN\n" if $GRID_DEBUG;

my $GRID_DIRPREFIX=$Getopt::Std::opt_d;
my ($submitJob, $removeJob);
$GRID_ENGINE=lc($Getopt::Std::opt_g) if $Getopt::Std::opt_g;
if ($GRID_ENGINE eq 'pbs') {
  # pbs jobs are cancelled with qdel, like sge ones
  ($submitJob, $removeJob)=(\&submitJob_pbs, \&removeJob_sge);
}
elsif ($GRID_ENGINE eq 'sge') {
  ($submitJob, $removeJob)=(\&submitJob_sge, \&removeJob_sge);
}
elsif ($GRID_ENGINE eq 'condor') {
  ($submitJob, $removeJob)=(\&submitJob_condor, \&removeJob_condor);
}
elsif ($GRID_ENGINE eq 'smp') {
  ($submitJob, $removeJob)=(\&submitJob_smp, \&removeJob_smp);
}
else {
  die("Error: invalid grid engine given ('$GRID_ENGINE'); only 'pbs', 'sge', 'condor' or 'smp' are accepted!\n");
}
my $UniqueVM=$Getopt::Std::opt_U;

my $UniqVMreq=$Getopt::Std::opt_u;
$UniqVMreq=int($UniqVMreq) if $UniqVMreq;
die("Error: use either -U or -u option, not both!\n") if $UniqueVM && $UniqVMreq>0;
my $worker_threads=$Getopt::Std::opt_w || 1;
my $node_mem_req = $Getopt::Std::opt_M;

# condor machine restrictions
my @xmachinelist=split(/\,/,$Getopt::Std::opt_x); #do NOT allow jobs to run on these machines
my @ymachinelist=split(/\,/,$Getopt::Std::opt_y); #only allow jobs to run on these machines, not others
#submitJob MUST use the globals:
# GRID_CMD, GRID_TASKLAST, GRID_MONHOME and update GRID_JOB

# monitoring links live under <GRID_MONHOME>/<user>; the guard keeps a
# second pass (after goto RESUME_JOBID) from appending the user twice
$GRID_MONHOME.='/'.$USER unless $GRID_MONHOME=~m/\Q$USER\E$/;

die("Error: directory $GRID_MONHOME should already be created! Aborting..\n")
  unless (-d $GRID_MONHOME);
271
my $mailnotify=$Getopt::Std::opt_m;
#-------- GLOBALs ---------------------------
my $GRID_JOBDIR;
my $GRID_JOB;
my $GRID_LOCKDIR;
my $GRID_TASKLAST;
my $GRID_CMD; # user's command and arguments
my $GRID_CMDLIST=$Getopt::Std::opt_f; # -f: file with one command line per task
# Assigned unconditionally: "my $x = ... if COND" has undefined behavior in
# Perl (the variable can keep a stale value when COND is false), and this
# code is re-executed after a "goto RESUME_JOBID".
my $GRID_USECMDLIST=($GRID_CMDLIST || $Getopt::Std::opt_F) ? 1 : undef;

my $GRID_PSXFASTA; #if -i was used
my $GRID_PSXSTEP; #if -i was used (-n value)
my $GRID_PSXSKIP=0; # -s option
my $GRID_PSXTOTAL=0; # -t option

my $GRID_NUMPROCS=0; # -p option
my $GRID_RESUME=$Getopt::Std::opt_R || $Getopt::Std::opt_Z;
my $GRID_LOCAL_JOBDIR=$Getopt::Std::opt_L;
#---------- worker side global vars:
my $GRID_ENVSET=0; #was the environment set?
my $GRID_WORKER=0; # worker#
#my $GRID_NOWRKLINKS=1;
my $GRID_LOGDIR=$Getopt::Std::opt_O;
my $GRID_TASK; # dynamic -- task iteration#
my $TASK_ERRCOUNT; # dynamic -- current task error (retry) counter
my $TASK_DATA; # current task's user data as stored in taskDb
my $TASK_LOCKH; #file handle for the current task lock file
my $TASK_LOCKF; #file name for the current task lock file
my $GRID_WRKDIR; #only for the worker case, it's the current worker's subdirectory
301
if ($Getopt::Std::opt_J) {
  #################################################################
  # jgridx job monitoring use:
  #   gridx -J [-m <e-mail>] [-R | -K] [<jobid>]
  # Without <jobid>: list the gridx-* job links found in $GRID_MONHOME.
  # With <jobid>: print the job summary, then optionally kill (-K) the job
  # or resubmit (-R) its unfinished tasks.
  #################################################################
  my ($jobid)=shift(@ARGV);
  $jobid=~tr/ //d;
  unless ($jobid) { # list mode
    my @jdirs=glob("$GRID_MONHOME/gridx-*");
    if (@jdirs>0) {
      print STDOUT "The following jobs were found (in $GRID_MONHOME):\n";
      foreach my $jdir (@jdirs) {
        my ($listed_id)=($jdir=~m/gridx\-(\w[\w\-]+)$/);
        print " $listed_id\n";
      }
    }
    else {
      print STDOUT "No jobs were found (in $GRID_MONHOME).\n";
    }
    $NORMAL_ENDING=1;
    exit(0);
  }
  #a valid $jobid was given, hopefully
  $jobid=lc($jobid);
  my $subdir='gridx-'.$jobid;
  my $jobdir="$GRID_MONHOME/$subdir";
  unless (-d $jobdir) { #fall back to the current directory
    die "No such job found($jobid) - neither $jobdir nor $subdir exist!\n"
      unless -d $subdir;
    $jobdir=$subdir;
  }
  chdir($jobdir) || die("Error at chdir($jobdir)!\n");
  my $msg=jobSummary();
  print STDOUT $msg."\n";
  if ($Getopt::Std::opt_K) {
    $removeJob->($jobid);
  }
  elsif ($Getopt::Std::opt_R) { #resume/rerun!
    my @cmdfile=glob('cmdline-*.cmd');
    die "Error getting the cmdline-*.cmd from current directory!\n"
      unless @cmdfile;
    my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
    die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
    my $cmdline=readFile($cmdfile[0]);
    chomp($cmdline);
    my @args=split(/\t/,$cmdline);
    die("$jobdir/$F_TASKDB and/or index not valid - cannot resume!\n")
      unless -s $F_TASKDB && -s "$F_TASKDB.cidx";
    shift(@args); #discard gridx command itself
    # back to the original submit directory (the job dir's parent)
    chdir('..') || die("Error at chdir(..) from $jobdir!\n");
    $PWD=cwd(); #from Cwd module
    $CMDLINE="$FindBin::Script\t".join("\t",@args);
    # re-run option parsing on the job's original command line, in resume mode
    @ARGV=('-Z',$jobid, @args);
    undef($Getopt::Std::opt_J);
    undef($Getopt::Std::opt_R);
    goto RESUME_JOBID;
  }
  $NORMAL_ENDING=1;
  exit(0);
}
elsif ($Getopt::Std::opt_B) {
  #################################################################
  # jgridx setup job:
  #   gridx -B <jobid>
  # intended to prepare the directory and taskDb for <jobid>, which was
  # submitted but placed on hold until this setup job finishes
  # (no implementation here)
  #################################################################
}
elsif ($Getopt::Std::opt_W) {
  ##########################################################
  # jgridx Worker Mode:
  # This is the actual job that gets submitted:
  #   gridx -W <cmd> <cmd_args>
  # At runtime the environment should be set accordingly.
  ##########################################################
  beginWorker(); #set up environment and worker directory
                 #(wrk_NNNNN) and chdir() to it
  $GRID_USECMDLIST=$Getopt::Std::opt_F;
  my @cmd=@ARGV; # cmd and cmdargs
  while (my $taskId = getNextTask()) {
    runTask($taskId, @cmd);
  }
  my $runningworkers=endWorker();
  if ($runningworkers==0) { #this was the last worker!
    #run any JOB finishing/clean up code
    chdir($GRID_JOBDIR);
    # explicit assignments: "my $x = ... if COND" has undefined behavior
    my $epilogue=(-s $F_ENDCMD) ? readFile($F_ENDCMD) : undef;
    chomp($epilogue) if defined($epilogue); # a bare "\n" must not run as a command
    if ($epilogue) {
      #only run it if ALL tasks succeeded
      my $tasksdone=readFile($F_TASKSDONE); chomp($tasksdone);
      if ($tasksdone==$GRID_TASKLAST && !(-s $F_ERRTASKS)) {
        runCmd($epilogue);
      }
    }
    my $notify;
    if (-s $F_NOTIFY) {
      $notify=readFile($F_NOTIFY);
      chomp($notify);
    }
    jobSummary($notify);
    open(my $fdone, '>', $GRID_JOBDIR.'/'.$F_ALLDONE)
      || die("Error creating $GRID_JOBDIR/$F_ALLDONE\n");
    print $fdone "done.\n";
    close($fdone);
  } #last worker ending
  $NORMAL_ENDING=1;
  exit(0);
}# -- worker use case
420
421
422 ##########################################################
423 #
424 # gridx Submit use:
425 #
426 ##########################################################
427
# Submit side: collect the job settings, submit through the engine-specific
# $submitJob (which also creates <GRID_JOBDIR>) and, with -T, wait for the
# job to finish.
my $gridBeginCmd=$Getopt::Std::opt_b;
my $gridEndCmd=$Getopt::Std::opt_e;
$GRID_RESUME=$Getopt::Std::opt_Z; #caused by an initial -J -R request
$GRID_TASKLAST=$Getopt::Std::opt_r;
#--submit specific globals -- temporary
my $JOBID;
#--
unless ($GRID_CMDLIST) {
  $GRID_CMD=$Getopt::Std::opt_c || shift(@ARGV);
  die "$usage\nError: no command given!\n" unless $GRID_CMD;
  unless ($Getopt::Std::opt_v) {
    $GRID_CMD = getCmdPath($GRID_CMD) ||
       die "Error: command $GRID_CMD not found in PATH!\n";
  }
  $GRID_CMD.=' '.$Getopt::Std::opt_C if $Getopt::Std::opt_C;
  $GRID_CMD.=' '.join(' ',@ARGV) if @ARGV;
}
else {
  $GRID_CMD='.';
}
$GRID_NUMPROCS=$Getopt::Std::opt_p || 1; #number of workers requested

if ($GRID_PSXFASTA=$Getopt::Std::opt_i) { #psx emulation case
  die "Error: -r and -i are mutually exclusive options!\n" if $GRID_TASKLAST;
  $GRID_PSXSTEP=$Getopt::Std::opt_n || 1;
  $GRID_PSXSKIP=$Getopt::Std::opt_s || 0;
  $GRID_PSXTOTAL=$Getopt::Std::opt_t || 0;
  $GRID_PSXFASTA=getFullPath($GRID_PSXFASTA, 1);
}
else { $GRID_TASKLAST=1 unless $GRID_TASKLAST; } #one shot run

$JOBID = $submitJob->({PATH=> $binpath,
            LD_LIBRARY_PATH=>$libpath, PERLLIB=>$perllib} );
#-- submitJob also calls setupJobDir() to create the gridx-<JOBID> subdirectory
#-- and move/rename the taskDb in there.
die("Error: no job ID has been obtained!\n") unless $JOBID;
if ($Getopt::Std::opt_T) { #wait for all workers to finish
  chdir($GRID_JOBDIR);
  do {
    sleep(5);
  } until (-e $F_ALLDONE);
  my $msg=jobSummary($mailnotify);
  # exit status = number of failed tasks (lines in err.tasks), counted here
  # instead of parsing `wc -l` output (which also contains the file name)
  my $exitcode=0;
  if (-s $F_ERRTASKS) {
    open(my $errfh, '<', $F_ERRTASKS) || die("Error opening $F_ERRTASKS!\n");
    while (defined(my $line=<$errfh>)) { $exitcode++; }
    close($errfh);
  }
  # exit statuses are 8 bits wide: clamp, or e.g. 256 failures would exit 0
  $exitcode=255 if $exitcode>255;
  print STDOUT $msg."\n";
  chdir($PWD); #this should be the original working directory
  $NORMAL_ENDING=1;
  exit($exitcode);
}
$NORMAL_ENDING=1;
exit(0);
480
481 #**********************************************************
482 #******************* SUBROUTINES **************************
483
484
485
486 #******************** SUBMIT SIDE *************************
487
=head2 ----------- taskDbRec -----------

  taskDbRec($fh, $taskId, @userdata)

Writes one task record (a single line) to the taskDb file that is open for
writing on filehandle $fh. The record is a fixed-format fasta-like defline:

  ><taskId> TAB {-|-|----|----|--------} TAB {<25 dashes>} [TAB <userdata>]

The two brace groups are fixed-width placeholder fields (presumably rewritten
later with the task status). The @userdata values are joined with single
spaces; the last field is omitted when @userdata is empty.

Only the program that creates the taskDb should call this. Once all records
have been written, the taskDb should be indexed with cdbfasta.

=cut

sub taskDbRec {
  my ($fh, $taskId, @userdata)=@_;
  my @fields=('>'.$taskId, '{-|-|----|----|--------}', '{'.('-' x 25).'}');
  push(@fields, join(' ', @userdata)) if @userdata;
  print {$fh} join("\t", @fields)."\n";
}
508
509
# Build the taskDb for the job (or, on a resume request, just re-read its
# record count) and set $GRID_TASKLAST to the number of tasks.
# Must be called with the current directory already set to GRID_JOBDIR.
#  -i (psx): one record per slice of $GRID_PSXSTEP fasta records, holding the
#            byte offset of the slice's first record in $GRID_PSXFASTA
#  -f:       one record per non-comment line of the command file
#  -r:       $GRID_TASKLAST identical records holding $GRID_CMD
# The new taskDb is then indexed with cdbfasta. Dies on I/O errors.
sub prepareTaskDb {
  if (-s $F_TASKDB && $GRID_RESUME) {
    #taskDb already there, it's a Resume request: only count its records
    my $r=`cdbyank -s $F_TASKDB.cidx`;
    ($GRID_TASKLAST)=($r=~m/\nNumber of records:\s+(\d+)\n/s);
    die "Error: couldn't determine number of records in $F_TASKDB\n"
       unless $GRID_TASKLAST>0;
    return;
  }
  if ($GRID_PSXFASTA) { #-i given, psx mode
    open(my $fasta, '<', $GRID_PSXFASTA) || die "Error opening file $GRID_PSXFASTA\n";
    binmode($fasta);
    open(my $taskdb, '>', $F_TASKDB) || die "Error creating file $F_TASKDB ($!)\n";
    my $foffset=0;  # byte offset of the current line
    my $rcount=0;   # fasta records seen so far
    my $numrecs=0;  # records taken after the -s skip
    $GRID_TASKLAST=0;
    RECORD: while (my $line=<$fasta>) {
      if ($line=~m/^>/) {
        $rcount++;
        if ($rcount>$GRID_PSXSKIP) {
          # -t limit: stop *before* taking a record past it, so that no
          # task is created for a slice starting beyond the limit
          last RECORD if ($GRID_PSXTOTAL>0 && $numrecs>=$GRID_PSXTOTAL);
          $numrecs++;
          if (($numrecs-1) % $GRID_PSXSTEP == 0) { # first record of a slice
            $GRID_TASKLAST++;
            taskDbRec($taskdb, $GRID_TASKLAST, $foffset);
          }
        }
      }
      $foffset+=length($line);
    }
    close($fasta);
    close($taskdb) || die "Error writing file $F_TASKDB ($!)\n";
  }
  elsif ($GRID_CMDLIST) { # -f option given
    open(my $cmdfile, '<', $GRID_CMDLIST) || die "Error opening file $GRID_CMDLIST\n";
    open(my $taskdb, '>', $F_TASKDB) || die "Error creating file $F_TASKDB ($!)\n";
    my $ntasks=0;
    while (my $cmd=<$cmdfile>) {
      next if $cmd=~m/^\s*#/; # comment lines are not tasks
      chomp($cmd);
      $ntasks++;
      taskDbRec($taskdb, $ntasks, $cmd);
    }
    close($cmdfile);
    close($taskdb) || die "Error writing file $F_TASKDB ($!)\n";
    $GRID_TASKLAST=$ntasks;
  }
  elsif ($GRID_TASKLAST) { # -r option given
    open(my $taskdb, '>', $F_TASKDB) || die "Error creating file $F_TASKDB ($!)\n";
    for my $task (1 .. $GRID_TASKLAST) {
      taskDbRec($taskdb, $task, $GRID_CMD);
    }
    close($taskdb) || die "Error writing file $F_TASKDB ($!)\n";
  }
  else { return; }
  runCmd("cdbfasta $F_TASKDB");
}
580
# Return the last component (file name) of a path, via File::Basename.
sub getFName {
  my ($path)=@_;
  my $name=basename($path);
  return $name;
}
584
# Return the directory part of a path, via File::Basename.
sub getFDir {
  my ($path)=@_;
  my $dir=dirname($path);
  return $dir;
}
588
# Return the absolute, symlink-resolved form of $fname (Cwd::abs_path).
# When $check is true, die first unless $fname is readable.
sub getFullPath {
  my ($fname, $check)=@_;
  if ($check && ! -r $fname) {
    die("Error: file $fname does not exist!\n");
  }
  my $resolved=abs_path($fname);
  return $resolved;
}
594
#== getCmdPath -- locate an executable command.
# A command starting with '/' is taken as given, one starting with './' or
# '../' is resolved to an absolute path, anything else is searched for in
# $ENV{PATH}. Only regular files with execute permission qualify
# (directories are "-x" too, so they are rejected explicitly).
#
# Scalar context: returns the full path of the executable, or '' if not found.
# List context:   returns ($fullpath, $interpreter), where $interpreter is the
#                 "#!" line of a text script (without "#!", trailing blanks
#                 removed; '' if none), or () if the command is not found.
sub getCmdPath {
  my $cmd=$_[0];
  my $checkBinary=wantarray();
  my $fullpath='';
  if ($cmd =~ m/^\//) { #absolute path given
    $fullpath=$cmd if _cmdIsExecFile($cmd);
  }
  elsif ($cmd =~ m/^\.+\//) { #relative path given
    $fullpath=abs_path($cmd) if _cmdIsExecFile($cmd);
  }
  else { #search in the PATH
    foreach my $dir (split(/:/, $ENV{'PATH'})) {
      my $candidate=$dir.'/'.$cmd;
      if (_cmdIsExecFile($candidate)) {
        $fullpath=$candidate;
        last;
      }
    }
  }
  return $fullpath unless $checkBinary;
  return () unless $fullpath; #cmd not found
  return ($fullpath, _cmdInterpreter($fullpath));
}

# True if $path is a regular file (symlinks followed) the user may execute.
sub _cmdIsExecFile {
  my ($path)=@_;
  return (-f $path && -x _);
}

# Return the "#!" interpreter line found within the first 10 lines of a
# readable text file, or '' if there is none. Uses a lexical filehandle and
# does not touch the global $_.
sub _cmdInterpreter {
  my ($path)=@_;
  return '' unless (-r $path && -T $path);
  open(my $fh, '<', $path) || return '';
  my $interpreter='';
  for my $lineno (1 .. 10) {
    my $line=<$fh>;
    last unless defined($line);
    chomp($line);
    if ($line=~m/^\s*#\!\s*(\S.+)/) {
      $interpreter=$1;
      last;
    }
  }
  close($fh);
  $interpreter=~s/\s+$//;
  return $interpreter;
}
639
# Run a shell command. On a non-zero system() status, first cancel grid job
# $jobid (when one is given) through the engine-specific $removeJob, then die.
sub runCmd {
  my ($command, $jobid)=@_;
  my $rc=system($command);
  if ($rc != 0) {
    $removeJob->($jobid) if $jobid;
    die("Error at running system command: $command\n");
  }
}
650
651
# Cancel a qsub-submitted job (used for both the sge and pbs engines) with qdel.
sub removeJob_sge {
  my ($jobid)=@_;
  return runCmd('qdel '.$jobid);
}
656
# Cancel a condor job. $jobid has the form "<submit host>_<cluster#>";
# condor_rm must address the host the job was submitted from, so "-name <host>"
# is added when that is not the current host.
sub removeJob_condor {
  my ($jobid)=@_;
  my ($hostname, $job)=($jobid=~m/([\w\-]+)_(\d+)$/);
  unless ($hostname && $job>0) {
    die("Error parsing hostname, job# from $jobid!\n");
  }
  my $remote_opt=(lc($hostname) eq lc($HOST)) ? '' : "-name $hostname ";
  runCmd("condor_rm $remote_opt$job");
}
671
672
# SIGCHLD handler for the smp engine: reap every finished worker without
# blocking, update the running count and PID set, then re-install itself.
sub smpTaskReaper {
  while (1) {
    my $pid=waitpid(-1, WNOHANG);
    last unless $pid > 0;
    $SMPChildren--;
    delete($SMPChildren{$pid});
  }
  $SIG{CHLD}=\&smpTaskReaper;
}
681
# SIGINT/SIGTERM handler for the smp engine: send SIGINT to every worker PID
# recorded in %SMPChildren, with SIGCHLD set to 'IGNORE' while doing so.
# Returns the number of processes signalled.
sub smpTaskKiller {
  local $SIG{CHLD}='IGNORE';
  my @worker_pids=keys(%SMPChildren);
  return kill('INT', @worker_pids);
}
686
# Cancel an smp "job", i.e. the local worker processes forked by
# submitJob_smp, by signalling them through smpTaskKiller (the handler that
# was meant here -- taskKiller() is not defined among the smp helpers).
# NOTE(review): %SMPChildren is only filled in the submitting process, so
# from a separate "-J -K" invocation there are no PIDs to signal.
sub removeJob_smp {
  return smpTaskKiller();
}
691
# Submit the job as a qsub array job on an SGE grid.
# $envhash: hash ref of extra NAME=>value pairs exported to the workers.
# Returns the job ID parsed from qsub's output.
sub submitJob_sge {
  my ($envhash)=@_;
  # NOTE(review): SGE submissions have always exported GRID_ENGINE=pbs to the
  # workers (this submitter was a verbatim copy of the pbs one). Kept as-is
  # because the worker-side handling of GRID_ENGINE is not visible here;
  # confirm whether 'sge' is intended.
  return _submitJob_qsub('pbs', $envhash);
}

# Submit the job as a qsub array job on a PBS grid.
# $envhash: hash ref of extra NAME=>value pairs exported to the workers.
# Returns the job ID parsed from qsub's output.
sub submitJob_pbs {
  my ($envhash)=@_;
  return _submitJob_qsub('pbs', $envhash);
}

# Shared qsub submission used by submitJob_sge and submitJob_pbs:
# submits "$PERL_BIN $0 -W $GRID_CMD" as an array job of $GRID_NUMPROCS
# workers (the -t range is only added when more than one worker is asked for),
# then calls setupJobDir() with the new job ID and returns it.
# $engine is the value exported to the workers as GRID_ENGINE.
sub _submitJob_qsub {
  my ($engine, $envhash)=@_;
  my $array=($GRID_NUMPROCS > 1) ? "-t 1-$GRID_NUMPROCS" : '';
  my $sub="qsub -cwd -b y "._qsubEnvParam($engine, $envhash);
  my $logdir=_qsubLogDir();
  # -O: put the workers' stdout/stderr files into the log directory
  # (previously the directory was created but never passed to qsub)
  $sub.=" -o $logdir -e $logdir" if $logdir;
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;
  my $subcmd= "$sub $array $PERL_BIN $0 $otherflags -W $GRID_CMD";
  print STDERR "$subcmd\n" if $GRID_DEBUG;
  my $subout = `$subcmd`;
  my ($jobid)=($subout=~m/^(\d+)/);
  die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid>0);

  setupJobDir($jobid);

  return $jobid;
}

# Build qsub's "-v '...'" option: the GRID_* job settings, BLASTMAT and
# BOWTIE_INDEXES when set, and every entry of %$envhash, comma-separated.
# NOTE(review): values containing commas or single quotes are not escaped.
sub _qsubEnvParam {
  my ($engine, $envhash)=@_;
  my @vars=("GRID_ENGINE=$engine",
            "GRID_JOBDIR=$PWD",
            "GRID_TASKLAST=$GRID_TASKLAST",
            "GRID_MONHOME=$GRID_MONHOME",
            "GRID_CMD=$GRID_CMD",
            "GRID_DIRPREFIX=$GRID_DIRPREFIX");
  push(@vars, "GRID_RESUME=$GRID_RESUME") if $GRID_RESUME;
  push(@vars, "GRID_CMDLIST=$GRID_CMDLIST") if $GRID_CMDLIST;
  push(@vars, "GRID_PSXFASTA=$GRID_PSXFASTA", "GRID_PSXSTEP=$GRID_PSXSTEP")
    if $GRID_PSXFASTA;
  push(@vars, "GRID_PSXSKIP=$GRID_PSXSKIP") if $GRID_PSXSKIP;
  push(@vars, "GRID_PSXTOTAL=$GRID_PSXTOTAL") if $GRID_PSXTOTAL;
  push(@vars, "GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR") if $GRID_LOCAL_JOBDIR;
  foreach my $toolvar ('BLASTMAT', 'BOWTIE_INDEXES') {
    push(@vars, "$toolvar=$ENV{$toolvar}") if $ENV{$toolvar};
  }
  push(@vars, map { $_.'='.$envhash->{$_} } keys(%$envhash));
  return "-v '".join(',', @vars)."'";
}

# For -O: create the log directory if needed (warning when it already exists)
# and return it with a trailing '/'; return '' when -O was not given.
sub _qsubLogDir {
  return '' unless $GRID_LOGDIR;
  my $logdir=$GRID_LOGDIR;
  if (-d $GRID_LOGDIR) {
    print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
  }
  else {
    mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
  }
  $logdir.='/' unless $logdir=~m/\/$/;
  return $logdir;
}
821
822
# "Submit" the job on the local machine: export the job settings through
# %ENV, create the job directory, then fork $GRID_NUMPROCS children that each
# exec "$PERL_BIN $0 -W $GRID_CMD". Installs SIGCHLD (reaper) and
# SIGINT/SIGTERM (killer) handlers. Returns the job ID "smp_<pid>".
sub submitJob_smp {
  my ($envhash)=@_;
  my $jobid='smp_'.$$;
  $ENV{GRID_ENGINE}='smp';
  @ENV{'GRID_JOBDIR', 'GRID_TASKLAST', 'GRID_MONHOME', 'GRID_CMD','GRID_DIRPREFIX'}=
     ($PWD."/gridx-$jobid", $GRID_TASKLAST, $GRID_MONHOME, $GRID_CMD,$GRID_DIRPREFIX);
  $ENV{GRID_RESUME}=$GRID_RESUME;
  $ENV{GRID_JOB}=$jobid;
  $ENV{GRID_CMDLIST}=$GRID_CMDLIST if $GRID_CMDLIST;

  @ENV{'GRID_PSXFASTA','GRID_PSXSTEP'}=($GRID_PSXFASTA, $GRID_PSXSTEP) if $GRID_PSXFASTA;
  $ENV{GRID_PSXSKIP}=$GRID_PSXSKIP if $GRID_PSXSKIP;
  $ENV{GRID_PSXTOTAL}=$GRID_PSXTOTAL if $GRID_PSXTOTAL;

  $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR."/gridx-$jobid" if $GRID_LOCAL_JOBDIR;
  foreach my $env (keys(%$envhash)) {
    $ENV{$env}=$envhash->{$env};
  }
  setupJobDir($jobid); #done in advance, before the workers start
  $SIG{CHLD}=\&smpTaskReaper;
  print STDERR "..forking $GRID_NUMPROCS workers..\n" if $GRID_DEBUG;
  if ($GRID_LOGDIR) {
    # -O: the log dir is created as for the other engines.
    # NOTE(review): worker output is not redirected into it for smp.
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($GRID_LOGDIR) || die("Error creating log dir '$GRID_LOGDIR'!\n");
    }
  }
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;
  my $wrkcmd="$PERL_BIN $0 $otherflags -W $GRID_CMD";

  for (1 .. $GRID_NUMPROCS) {
    my $pid=fork();
    die "Error at fork: $!" unless defined($pid);
    if ($pid) { # parent: remember the worker
      $SMPChildren{$pid} = 1;
      $SMPChildren++;
      next;
    }
    # child: becomes a worker. exec() only returns if it failed; the child
    # must then exit at once instead of continuing this fork loop itself
    # (_exit also skips the parent's END blocks/buffered output).
    exec($wrkcmd) or print STDERR "Error: could not exec worker command '$wrkcmd': $!\n";
    POSIX::_exit(127);
  }
  $SIG{INT}=\&smpTaskKiller;
  $SIG{TERM}=\&smpTaskKiller;
  return $jobid;
}
882
# submitJob_condor(\%envhash)
#   Writes a Condor submit description file and submits it with condor_submit.
#   The file requests $GRID_NUMPROCS instances of this script in worker mode (-W).
#   The GRID_* settings, BLASTMAT/BOWTIE_INDEXES (if set) and the entries of
#   %$envhash are passed through the 'environment' command; $(Cluster) and
#   $(Process) are left for Condor to expand.
#   Returns the job ID ('<submit host>_<cluster id>'); dies if none is parsed.
sub submitJob_condor {
  my ($envhash)=@_;
  my $queue='queue';
  $queue.=" $GRID_NUMPROCS" if ($GRID_NUMPROCS > 1);
  #append our GRID_ environment
  my $dprefix="gridx-$HOST".'_';
  my @envlist=("GRID_ENGINE=condor",
               "GRID_JOBDIR=$PWD/$dprefix\$(Cluster)",
               "GRID_JOB=$HOST\_\$(Cluster)",
               "GRID_TASKLAST=$GRID_TASKLAST",
               "GRID_MONHOME=$GRID_MONHOME",
               "GRID_CMD=$GRID_CMD",
               "GRID_DIRPREFIX=$GRID_DIRPREFIX");
  push(@envlist, "GRID_RESUME=$GRID_RESUME") if $GRID_RESUME;
  push(@envlist, "GRID_CMDLIST=$GRID_CMDLIST") if $GRID_CMDLIST;
  push(@envlist, "GRID_PSXFASTA=$GRID_PSXFASTA", "GRID_PSXSTEP=$GRID_PSXSTEP") if $GRID_PSXFASTA;
  push(@envlist, "GRID_PSXSKIP=$GRID_PSXSKIP") if $GRID_PSXSKIP;
  push(@envlist, "GRID_PSXTOTAL=$GRID_PSXTOTAL") if $GRID_PSXTOTAL;
  push(@envlist, 'BLASTMAT='.$ENV{BLASTMAT}) if $ENV{BLASTMAT};
  push(@envlist, 'BOWTIE_INDEXES='.$ENV{BOWTIE_INDEXES}) if $ENV{BOWTIE_INDEXES};
  push(@envlist, "GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR/$dprefix\$(Cluster)") if $GRID_LOCAL_JOBDIR;
  if ($envhash) {
    push(@envlist, map { $_.'='.$envhash->{$_} } keys(%$envhash));
  }
  # joining a list avoids the empty ';;' entries produced by piecewise appending
  my $envparam=join(';', @envlist);

  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;

  #-- machine requirements
  my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL" || Arch == "x86_64")';
  my $force_slot;
  if ($UniqVMreq>0) {
    $force_slot=$UniqVMreq;
  }
  elsif ($UniqueVM) {
    $force_slot=3+int(rand(9)); #this is messed up, only works on >11 core machines
  }
  $requirements .= ' && (VirtualMachineId == '.$force_slot.')' if $force_slot;
  if (@xmachinelist>0) { # machines to exclude
    if (@xmachinelist==1 && ($xmachinelist[0]=~tr/|^?*\\//)) { # a single pattern was given
      $requirements.=' && (TRUE != regexp("'.$xmachinelist[0].'", Machine, "i"))';
    }
    else {
      $requirements.=' && (TRUE != regexp("^('.join('|',@xmachinelist).
                     ')\..*", Machine, "i"))';
    }
  }
  elsif (@ymachinelist>0) { # only run on these machines
    if (@ymachinelist==1 && ($ymachinelist[0]=~tr/|^?*\\//)>0) { # a single pattern was given
      $requirements.=' && regexp("'.$ymachinelist[0].'", Machine, "i")';
    }
    else {
      $requirements.=' && regexp("^('.join('|',@ymachinelist).
                     ')\..*", Machine, "i")';
    }
  }

  #-- write the submit description file
  my $mtime=time();
  my $cmdfile="condor.$$.t$mtime.$USER.$HOST.cmd";
  open(my $cmdfh, '>', $cmdfile) || die "Cannot create $cmdfile!\n";
  print $cmdfh "universe = vanilla\n".
               "requirements = $requirements\n".
               "notification = Never\n".
               "executable = $0\n".
               "initialdir = $PWD\n";
  if ($logdir) {
    print $cmdfh "error = ${logdir}log_$dprefix\$(Cluster).\$(Process).stderr\n".
                 "output = ${logdir}log_$dprefix\$(Cluster).\$(Process).stdout\n";
  }
  print $cmdfh "arguments = $otherflags -W $GRID_CMD\n".
               "environment = $envparam;\n".
               "$queue\n";
  close($cmdfh) || die "Error writing $cmdfile ($!)\n";

  #-- submit and parse the cluster id
  my $subcmd="condor_submit $cmdfile";
  my $subout = `$subcmd`;
  my ($jobid)=($subout=~/submitted\s+to\s+cluster\s+(\d+)\./s);
  die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);
  $jobid=$HOST.'_'.$jobid;
  setupJobDir($jobid);
  # setupJobDir() chdir()s into the new job directory, so the submit file is one level up
  system("mv ../$cmdfile condor_submit.cmd") &&
    print STDERR "Warning: could not move $cmdfile into the job directory\n";
  return $jobid;
}
992
# jobDie($jobid, @messages)
#   Reports @messages (one per line, prefixed with "Error: ") on STDERR,
#   removes job $jobid through the engine-specific $removeJob callback,
#   then dies.
sub jobDie {
  my ($jobid, @msgs)=@_;
  my $text='Error: '.join("\n", @msgs)."\n";
  print STDERR $text;
  $removeJob->($jobid);
  die();
}
999
# setupJobDir($jobid)
#   Creates the job directory gridx-<jobid> in the current directory. With
#   $GRID_RESUME it instead takes over the previous job's directory, clearing
#   its run-time state.
#   It then creates the locks/ and running/ subdirectories, symlinks the
#   directory from $GRID_MONHOME (if that differs from $PWD), chdir()s into
#   it and writes the bookkeeping files: cmdline-<N>.cmd, $F_WRKDIR,
#   $F_NOTIFY and $F_ENDCMD.
#   Sets $GRID_JOBDIR to the directory's full path. Fatal errors remove the
#   job via jobDie().
sub setupJobDir {
  my ($jobid)=@_;
  my $jobdir = "gridx-$jobid";
  jobDie($jobid, "job directory $jobdir already exists!")
    if (-d $jobdir);
  if ($GRID_RESUME) {
    my $prevjobdir='gridx-'.$GRID_RESUME;
    print STDERR " ..taking over jobdir: $prevjobdir\n";
    unlink("$GRID_MONHOME/$prevjobdir") || warn(" couldn't unlink $GRID_MONHOME/$prevjobdir");
    unlink("$prevjobdir/$F_LASTTASK");
    unlink("$prevjobdir/$F_ALLDONE");
    rename("$prevjobdir/$F_ERRTASKS", "$prevjobdir/prev_$F_ERRTASKS");
    unlink("$prevjobdir/$F_RETRYTASKS");
    system("/bin/rm -rf $prevjobdir/wrk_*/$F_WRKRUNNING");
    system("/bin/rm -rf $prevjobdir/.wrk*");
    system("/bin/rm -rf $prevjobdir/locks");
    system("/bin/rm -rf $prevjobdir/running");
    system("mv $prevjobdir $jobdir") &&
      jobDie($jobid, "cannot 'mv $prevjobdir $jobdir' - $! - Resuming failed!");
    # drop the old index; worker #1 rebuilds the task db when the index is missing
    unlink("$jobdir/$F_TASKDB.cidx");
  }
  else {
    mkdir($jobdir) || jobDie($jobid, "cannot create subdirectory $jobdir");
  }

  mkdir("$jobdir/locks") || jobDie($jobid,
        "cannot create subdirectory $jobdir/locks");
  mkdir("$jobdir/running") || jobDie($jobid,
        "cannot create subdirectory $jobdir/running");
  if ($GRID_MONHOME ne $PWD) {
    symlink("$PWD/$jobdir", "$GRID_MONHOME/$jobdir")
      || jobDie($jobid, "cannot symlink $GRID_MONHOME/$jobdir");
  }
  #- CHDIR to the new GRID_JOBDIR
  chdir($jobdir) || jobDie($jobid, "cannot chdir($jobdir) (from $PWD)!");
  # NOTE(review): the return value is discarded; presumably called for a
  # side effect of readFile() -- confirm
  readFile($F_TASKSDONE);
  my $cmdfile="cmdline-$GRID_TASKLAST.cmd";
  open(my $fh, '>', $cmdfile) || jobDie($jobid, "cannot create file $cmdfile\n");
  print $fh $CMDLINE."\n";
  close($fh);
  # informational files: a failure is reported but not fatal
  if (open($fh, '>', $F_WRKDIR)) {
    print $fh "$PWD/$jobdir\n";
    close($fh);
  }
  else {
    warn("Warning: cannot create file $jobdir/$F_WRKDIR ($!)\n");
  }

  if ($mailnotify) {
    if (open($fh, '>', $F_NOTIFY)) {
      print $fh "$mailnotify\n";
      close($fh);
    }
    else {
      warn("Warning: cannot create file $jobdir/$F_NOTIFY ($!)\n");
    }
  }

  if ($gridEndCmd) {
    open($fh, '>', $F_ENDCMD) ||
      jobDie($jobid, "cannot create file $jobdir/$F_ENDCMD ($!)");
    print $fh $gridEndCmd."\n";
    close($fh);
  }
  $GRID_JOBDIR = "$PWD/$jobdir";
  print STDOUT "Job $jobid scheduled to run with GRID_JOBDIR = $PWD/$jobdir\n";

} #setupJobDir
1062
1063
# jobSummary([$mail])
#   Builds a summary (tasks done vs. total) of the job whose GRID_JOBDIR is the
#   current directory. If $mail is given, the summary gets a signature (host,
#   worker, working directory, command line) and is e-mailed there.
#   Returns the summary text. Dies if the job's bookkeeping files are missing.
sub jobSummary {
  my ($mail) = @_;
  #assuming we're in GRID_JOBDIR directory
  #(either directly or by $GRID_MONHOME)!
  die "Error: cannot locate $F_TASKSDONE and $F_WRKDIR in current directory ($ENV{PWD})!\n"
    unless -f $F_TASKSDONE && -f $F_WRKDIR;
  my $tasksdone=readFile($F_TASKSDONE);chomp($tasksdone);
  $tasksdone=0 unless $tasksdone;
  my $wrkdir=readFile($F_WRKDIR);chomp($wrkdir);
  my ($jobid)=($wrkdir=~m/\/gridx\-(\w[\w\-]+)$/);
  die("Error: cannot parse jobid from $F_WRKDIR content ($wrkdir)\n") unless $jobid;
  # the total number of tasks is encoded in the cmdline-<N>.cmd file name
  my @cmdfile=<cmdline-*.cmd>;
  die "Error getting the cmdline-*.cmd from current directory!\n"
    unless @cmdfile;
  my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
  die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
  open(my $cmdfh, '<', $cmdfile[0]) || die "Error opening $cmdfile[0] ($!)\n";
  my $cmdline=<$cmdfh>;
  close($cmdfh);
  $cmdline='' unless defined($cmdline);
  chomp($cmdline);
  $cmdline=~s/\t/ /g;
  # NOTE(review): presumably removed so that the notification is only sent once -- confirm
  unlink($F_NOTIFY) if -s $F_NOTIFY;
  my ($msg, $subj);
  my $sig='';
  if ($mail) {
    $sig = "\n\n-------------------------\n -= mail sent from $HOST";
    $sig.=" (worker $GRID_WORKER)" if $GRID_WORKER;
    $sig.=" \nWorking directory: $wrkdir\n" if $wrkdir;
    $sig.=" \nCommand line: \n $cmdline\n" if $cmdline;
    $sig.=" =-\n";
  }
  if ($tasksdone!=$numtasks) {
    $msg="Summary of gridx job $jobid: $tasksdone tasks done out of $numtasks\n".
         "Check $wrkdir for more details.\n";
    $subj="gridx job $jobid (done $tasksdone out of $numtasks)";
  }
  else {
    $msg="gridx job $jobid - done all $numtasks tasks\n";
    $subj="gridx job $jobid Done (all $numtasks tasks)";
  }
  $msg.=$sig;
  send_mail( { to=>$mail, subj=>$subj, body=>$msg }) if $mail;
  return $msg;
}
1108
1109
1110
1111 ############## WORKER SIDE subroutines #####################
1112
1113 #=================== taskDB handling =================
1114
=head2 taskDbStat (taskdb, taskId [,tstatus, cpu#, host, retries, exitcode])

  taskDbStat($taskdb, $taskId [, $taskstatus, $CPUno,
                               $host, $retrycount, $exitcode])

gets/sets the status of a task in an existing taskdb file.
$taskdb may be either the taskdb file or its '.cidx' index; both files
must exist. The record offset is looked up with
C<cdbyank -a taskId -P taskdb.cidx>.

If $taskstatus is not given (or is false) it works as a getter and
returns the whole $taskdb entry, either as the raw record line or,
if wantarray(), as:

  ($taskstatus, $userdata, $CPUno, $host, $retrycount, $lastexitcode, $startTime)

..where $startTime is in minutes since the epoch (time/60), or the
'--------' placeholder when no start time is recorded. $CPUno and
$lastexitcode are 0 for '----' placeholders.

If $taskstatus is given, it is a setter for the task status
and the other task related data.

Valid status values (case-insensitive on input; stored lowercase):
  '-' = queued/idle/unprocessed
  'r' = running (could be a retry)
  '.' = finished successfully
  'E' = finished with error ending (after max retries), stored as 'e'

Setter details:
  -'r' stamps the current start minute and resets the exit code to '----';
  -'-' resets the start minute to '--------';
  -$exitcode (if > 0) is stored in hex, otherwise the old exit code is kept;
  -$retrycount (if defined) replaces the retry counter (clamped to 0-9);
  -$host (if given) replaces the host name;
  -$CPUno (if >= 1) replaces the worker number.
The setter only stores what it is given: retry accounting (incrementing
the counter, choosing '-' or 'E') is not done here.

Internal details:
 -Each record is assumed locked at the time of writing
  (no other processes are trying to update the same task record).
 -A taskdb record format is:

  >taskId\t{S|R|xxxx|dddd|mmmmmmmm}\t{hostname}[\t<additional data..>]

  where:
   S = running status ('-','r','.' or 'e')
   R = retry counter (single digit, 0-9)
   xxxx = last exit code, in hex
   dddd = last CPU number, in hex (the wrk_<CPU> subdirectory)
   mmmmmmmm = start minute since the epoch (time/60) in hexadecimal
   hostname = fixed 25 char length machine name
              (blank padded, longer names are truncated)
 -Records are rewritten in place, so every field must keep its width.

=cut

sub taskDbStat {
  my ($taskdb, $entry, $status, $dirno, $machine, $errcount, $exitcode)=@_;
  # accept either the db file or its .cidx index as the first argument
  my $taskdbidx=$taskdb;
  $taskdbidx.='.cidx' unless ($taskdb=~s/\.cidx$//);
  my $tdberr="taskDbStat(".join(',',@_).") Error:";
  wrkDie("$tdberr db $taskdb (and/or index) not found!".
         "(pwd = $ENV{PWD})") unless (-e $taskdb && -e $taskdbidx);
  wrkDie("$tdberr no entry given!") unless $entry;
  # byte offset of the record, from the cdb index
  my $dbpos= `cdbyank -a '$entry' -P $taskdbidx`;
  chomp($dbpos);
  wrkDie("$tdberr at retrieving pos of entry $entry\n")
    if ($? || $dbpos!~/^\d+$/);

  my $openmode = O_RDONLY | O_SYNC;
  if ($status) { # setter: validate early, open read-write
    wrkDie("$tdberr invalid update parameters ($status)")
      if (length($status)>1 || ($dirno && ($dirno>65535 || $dirno<1)));
    $openmode= O_RDWR | O_SYNC;
  }
  my $tdb;
  sysopen($tdb, $taskdb, $openmode) ||
    wrkDie("$tdberr sysopening $taskdb failed!");
  binmode($tdb);
  # buffered seek/readline/print only: mixing sysseek/syswrite with
  # buffered I/O on the same handle is unsafe (see perlfunc sysseek)
  unless (seek($tdb, $dbpos, SEEK_SET)) {
    close($tdb);
    wrkDie("$tdberr at seek() to $dbpos for $entry");
  }
  #----- read the record line and check its fixed-width layout
  my $targetstr='>'.$entry."\t{-|-|----|----|--------}\t{".('-' x 25)."}";
  local $/="\n";
  my $dbline=readline($tdb);
  $dbline='' unless defined($dbline);
  my ($pentry, $pstats, $pmachine, $userdata)=split(/\t/,$dbline,4);
  chomp($pmachine) if defined($pmachine);
  chomp($userdata) if defined($userdata);
  my $tcheck=join("\t", map { defined($_) ? $_ : '' } ($pentry,$pstats,$pmachine));
  if (!defined($pentry) || $pentry ne '>'.$entry ||
      length($tcheck)!=length($targetstr)) {
    close($tdb);
    wrkDie("$tdberr invalid record format for '$entry' ".
           "pos $dbpos, Found:\n'$tcheck'\n ..instead of:\n'$targetstr'\n");
  }

  # numeric value of a hex field; 0 for placeholders like '----'
  my $hexnum = sub {
    my ($h)=@_;
    return (defined($h) && $h=~/^[0-9a-fA-F]+$/) ? hex($h) : 0;
  };
  $pmachine=~tr/{} //d;
  $pstats=~tr/{}//d;
  my ($pstatus, $pfcount, $pxcode, $pdirno, $ptimestr)=split(/\|/,$pstats);
  $pfcount=(defined($pfcount) && $pfcount=~/^\d+$/) ? int($pfcount) : 0;

  if ($status) { #---- setter code:
    $status=lc($status);
    if ($status!~/^[\-r\.e]$/) {
      close($tdb);
      wrkDie("$tdberr invalid status provided ($status)!");
    }
    $pxcode=sprintf('%04x', $exitcode) if (defined($exitcode) && $exitcode>0);
    $pfcount=int($errcount) if defined($errcount);
    $pfcount=0 if $pfcount<0;
    $pfcount=9 if $pfcount>9; # single-character field
    if ($status eq 'r') { #mark this entry as "running"
      $ptimestr=sprintf('%08x',int(time()/60));
      $pxcode='----';
    }
    elsif ($status eq '-') { # mark this entry as "available" (idle)
      $ptimestr='--------';
    }
    # otherwise the stored (hex) start time is written back unchanged
    $pmachine=$machine if $machine;
    $pmachine=sprintf('%25.25s',$pmachine);
    $dirno=$hexnum->($pdirno) unless (defined($dirno) && $dirno>=1);
    my $hdrlen=length('>'.$entry."\t{");
    my $wstr=$status.'|'.$pfcount.
             '|'.$pxcode.
             '|'.sprintf('%04x', $dirno).
             '|'.$ptimestr."}\t{".$pmachine.'}';
    if (length($wstr)!=length($targetstr)-$hdrlen) {
      close($tdb);
      die("Error writing into taskDb record for '$entry', field widths would change when writing:\n".
          "$wstr\n");
    }
    # seek() is also required when switching from reading to writing
    my $ok=seek($tdb, $dbpos+$hdrlen, SEEK_SET) && print({$tdb} $wstr);
    $ok=close($tdb) && $ok; # buffered data is written at close()
    wrkDie("$tdberr failed writing '$wstr' for $entry!") unless $ok;
    return 1;
  } # setter
  # getter code
  close($tdb);
  if (wantarray()) { #retrieve the parsed list of values
    my $ptime=$hexnum->($ptimestr) || $ptimestr;
    return ($pstatus, $userdata, $hexnum->($pdirno), $pmachine, int($pfcount),
            $hexnum->($pxcode), $ptime);
  }
  return($dbline); #the raw taskDb line
}
1262
1263
# gridWorkerEnv([$taskId])
#   A true $taskId becomes the current $GRID_TASK (and $ENV{GRID_TASK}).
#   On the first call it also loads the worker-side GRID_* globals from the
#   environment prepared by the submitting process. For PBS the job-specific
#   values are derived from PBS_JOBID and exported back to %ENV.
#   Dies if GRID_ENGINE is not 'pbs', 'condor' or 'smp'.
sub gridWorkerEnv {
  my ($task)=@_;
  if ($task) {
    $GRID_TASK=$task;
    $ENV{GRID_TASK}=$task;
  }
  return if $GRID_ENVSET;
  my $engine=lc($ENV{GRID_ENGINE});
  my @names=('GRID_JOBDIR', 'GRID_TASKLAST', 'GRID_RESUME', 'GRID_MONHOME', 'GRID_LOCAL_JOBDIR',
             'GRID_PSXFASTA', 'GRID_PSXSTEP', 'GRID_PSXSKIP', 'GRID_PSXTOTAL', 'GRID_DIRPREFIX');
  ($GRID_JOBDIR, $GRID_TASKLAST, $GRID_RESUME, $GRID_MONHOME, $GRID_LOCAL_JOBDIR,
   $GRID_PSXFASTA, $GRID_PSXSTEP, $GRID_PSXSKIP, $GRID_PSXTOTAL, $GRID_DIRPREFIX)=@ENV{@names};
  if ($engine eq 'condor' || $engine eq 'smp') {
    # the complete GRID_* environment was passed at submit time
    $GRID_JOB=$ENV{GRID_JOB};
  }
  elsif ($engine eq 'pbs') {
    # only static variables could be passed to PBS (GRID_JOBDIR holds the
    # submit directory): derive the job-specific ones from PBS_JOBID
    $GRID_JOB=$ENV{PBS_JOBID};
    $GRID_JOBDIR.='/gridx-'.$GRID_JOB;
    $GRID_LOCAL_JOBDIR.='/gridx-'.$GRID_JOB if $GRID_LOCAL_JOBDIR;
    @ENV{'GRID_JOBDIR', 'GRID_JOB', 'GRID_LOCAL_JOBDIR'}=
      ($GRID_JOBDIR, $GRID_JOB, $GRID_LOCAL_JOBDIR);
  }
  else {
    die("Error: Invalid GRID_ENGINE (Is this a valid worker run?)\n");
  }
  $GRID_LOCKDIR=$GRID_JOBDIR.'/locks';
  $GRID_ENVSET=1;
}
1300
# beginWorker()
#   Registers this process as a new worker of the job:
#   -waits for GRID_JOBDIR and, for workers other than #1, for the taskDb index;
#   -builds the taskDb (and runs the prologue command) as worker #1;
#   -takes its worker number from the $F_WRKSTART counter;
#   -prepares the local worker directory (if $GRID_LOCAL_JOBDIR is set);
#   -creates and claims the wrk_NNNN directory and bumps the $F_WRKCOUNT
#    counter;
#   -redirects STDERR/STDOUT to wrk_err.log/wrk_log.log in wrk_NNNN;
#   -finally chdir()s to the launch directory (-S) or the local worker dir.
sub beginWorker {
  gridWorkerEnv(); #setup the worker environment
  # give the submit script some time to create GRID_JOBDIR
  # (only needed if execution of a submitted task is extraordinarily fast)
  my $maxretries=20;
  my $chdir=0;
  my $retries=0;
  while (!($chdir=chdir($GRID_JOBDIR))) {
    sleep(1);
    $retries++;
    last if $retries==$maxretries;
  }
  die("Worker error: cannot chdir to $GRID_JOBDIR") unless $chdir;
  # -- we are in GRID_JOBDIR now
  my $errmsg="Worker PID $$ ($GRID_WORKER) failed on $HOST..\n";
  if ($GRID_WORKER>1) {
    sleep(1) until (-f $F_TASKDB.'.cidx'); #wait for the first worker to build taskDB
  }
  my $fh=setXLock($F_WRKSTART,120) || die $errmsg; #update the worker counter
  if ($GRID_WORKER==1 && !-f $F_TASKDB.'.cidx') {
    print STDERR "..preparing $F_TASKDB\n";
    prepareTaskDb(); #builds a taskDb in the current directory
    if ($gridBeginCmd) {
      system($gridBeginCmd) && die("Error: exit status $? returned by prologue command: '$gridBeginCmd'\n");
    }
  }

  $GRID_WORKER=incFValue($fh, $F_WRKSTART);
  endXLock($fh);
  print STDERR "D: worker $GRID_WORKER assigned to host $HOST pid $$ at ".getTime()."\n" if $GRID_DEBUG;
  $GRID_WRKDIR=sprintf("wrk_%04d",$GRID_WORKER);
  $starting_dir=$GRID_JOBDIR;
  $starting_dir=~s/[^\/]+\/?$//;
  if ($GRID_LOCAL_JOBDIR) {
    my $localwrk="$GRID_LOCAL_JOBDIR/$GRID_WRKDIR";
    print STDERR "D: creating local dir on $HOST: $localwrk\n" if $GRID_DEBUG;
    mkdir($GRID_LOCAL_JOBDIR) unless -d $GRID_LOCAL_JOBDIR;
    system("/bin/rm -f $localwrk");
    mkdir($localwrk) unless -d $localwrk; # test the worker dir itself, not its parent
    wrkDie("Error: couldn't create local worker directory $localwrk on $HOST!\n")
      unless (-d $localwrk);
  }
  my $wrkpath="$GRID_JOBDIR/$GRID_WRKDIR";
  unless (-d $wrkpath) { #worker directory doesn't exist
    mkdir($wrkpath) || die "Error at mkdir $wrkpath ($!)!\n";
  }
  # claim the worker directory with the "worker running here" semaphore
  # (for new and pre-existing directories alike)
  my $frunning= "$GRID_WRKDIR/$F_WRKRUNNING";
  if (-f $frunning) { #weird -- should never happen..
    die "Error: another process running in $GRID_WRKDIR?!\n".`cat $frunning`."$errmsg\n";
  }
  open(my $rsem, '>', $frunning) || die "Error creating $frunning file! ($!)\n$errmsg";
  print $rsem join(" ",int(time()/60), $HOST, $$)."\n";
  close($rsem);
  if ($GRID_DIRPREFIX) { #needed by PSX emulation
    my $gwrkdir="../$GRID_DIRPREFIX".'_'.$GRID_WORKER;
    unlink($gwrkdir);
    symlink("gridx-$GRID_JOB/$GRID_WRKDIR", $gwrkdir) ||
      print STDERR "Warning: cannot symlink gridx-$GRID_JOB/$GRID_WRKDIR to $gwrkdir ($!)\n";
  }
  # update the count of running workers
  $fh=setXLock($F_WRKCOUNT,70,3) || die $errmsg;
  incFValue($fh, $F_WRKCOUNT);
  endXLock($fh);
  # descend into wrk_<GRID_WORKER>
  chdir($wrkpath) || die "Error at chdir($wrkpath) ($!)\n";

  open(STDERR, '>', 'wrk_err.log');
  print STDERR "worker $GRID_WORKER fully assigned to host $HOST PID $$\n" if $GRID_DEBUG;
  open(STDOUT, '>', 'wrk_log.log');
  if (open(my $wrkgrab, '>', ".on_$HOST")) {
    print $wrkgrab join("\t",$GRID_WRKDIR, $HOST, $$, 'start: '.getTime())."\n";
    close($wrkgrab);
  }
  $ENV{GRID_WORKER}=$GRID_WORKER;
  if ($SwitchDir) {
    chdir($starting_dir) || die "Error at chdir($starting_dir)!\n";
  }
  elsif ($GRID_LOCAL_JOBDIR) {
    chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR") ||
      die "Error at chdir($GRID_LOCAL_JOBDIR/$GRID_WRKDIR)!\n";
  }
}
1391
# endWorker()
#   Unregisters the current worker:
#   -decrements the $F_WRKCOUNT counter and removes the worker semaphore;
#   -appends the end time to the .on_<host> record;
#   -copies back and removes the local worker directory (with
#    $GRID_LOCAL_JOBDIR);
#   -clears $GRID_WORKER and $GRID_WRKDIR.
#   Returns the number of workers left running, or -1 if no worker was active.
#   Dies if the counter goes negative.
sub endWorker {
  return -1 unless $GRID_WORKER && $GRID_WRKDIR;
  my $wrkpath="$GRID_JOBDIR/$GRID_WRKDIR";
  #make sure we are back in the wrk_<GRID_WORKER> directory
  unless (chdir($wrkpath)) {
    die("ERROR: endWorker() could not change to $wrkpath\n");
  }

  my $fh=setXLock($F_WRKCOUNT,70, 3) || die "Error updating the number of running workers!\n";
  my $v=incFValue($fh, $F_WRKCOUNT, -1);
  unlink($F_WRKRUNNING); #remove the "worker here" semaphore..
  endXLock($fh);
  die("Error: invalid number of workers ($v) reported in $GRID_JOBDIR/$F_WRKCOUNT\n")
    if ($v<0);
  print STDERR join("\t","D. worker $GRID_WRKDIR", $HOST, $$, 'finished at:',getTime())."\n" if $GRID_DEBUG;

  # replace the final newline of the .on_<host> record with the end time;
  # buffered seek()+print() only (sysseek() must not be mixed with print())
  if (sysopen(my $wrkgrab, ".on_$HOST", O_RDWR | O_SYNC)) {
    seek($wrkgrab, -1, SEEK_END);
    print $wrkgrab "\tend: ".getTime()."\n";
    close($wrkgrab);
  }
  else {
    print STDERR "Warning: cannot open .on_$HOST in $wrkpath ($!)\n";
  }
  if ($GRID_LOCAL_JOBDIR) {
    my $localwrk="$GRID_LOCAL_JOBDIR/$GRID_WRKDIR";
    if (system("cp -pr $localwrk/* $wrkpath/")) {
      print STDERR "Error at copying $HOST local files back to $wrkpath!\n";
    }
    system("/bin/rm -rf $localwrk");
    rmdir($GRID_LOCAL_JOBDIR); #it'll fail if there are other subdirs there, but that's OK
  }
  undef($GRID_WORKER);
  undef($GRID_WRKDIR);
  return $v; #returns the number of workers left running
}
1427
# writeFValue($fh, $value)
#   Replaces the whole content of the read-write handle $fh with
#   int($value) and a newline. Returns the integer that was written.
sub writeFValue {
  my ($fh, $value)=@_;
  my $intval=int($value);
  truncate($fh, 0);
  seek($fh, 0, SEEK_SET);
  print $fh "$intval\n";
  return $intval;
}
1436
# writeFList($fh, \@items [, $delim])
#   Replaces the content of the read-write handle $fh with @items joined by
#   $delim (no delimiter when $delim is false). Returns print()'s result.
sub writeFList {
  my ($fh, $items, $sep)=@_;
  $sep ||= '';
  seek($fh, 0, SEEK_SET);
  truncate($fh, 0);
  return print $fh join($sep, @$items);
}
#
# incFValue($fh, $finfo [, $incv]) => reads an integer from the start of
#   the (read-write) handle $fh, adds $incv to it (default 1; a false $incv,
#   including 0, also means 1), writes the new value back over the old
#   content and returns it. $finfo is only passed on, as a description,
#   to readFile().
#
sub incFValue {
  my ($fh, $finfo, $incv)=@_;
  $incv ||= 1;
  seek($fh, 0, SEEK_SET);
  my $current=readFile($fh, 0, "incFValue(,,$incv) in $finfo");
  chomp($current);
  return writeFValue($fh, int($current)+$incv);
}
1458
1459
#******************** worker side:
# getNextTask() -- returns a taskId for the next task to be processed
#                  or undef if there are no more
#  -tasks in the "retry pool" ($F_RETRYTASKS) are taken first, oldest
#   entry first, and the entry is removed from the pool
#  -otherwise the next never-started task number is taken from $F_LASTTASK
#  -the task is locked via running/task_<taskId>; if that lock fails the task
#   is left to its current owner and another one is picked
#  -when resuming, tasks already marked done in taskDb are skipped
#--------------- resources used:
#  update $F_RETRYTASKS
#  update $F_LASTTASK
#  update ENV{GRID_TASK} and $GRID_TASK (plus $TASK_LOCKF, $TASK_LOCKH,
#  $TASK_ERRCOUNT, $TASK_DATA and $STARTED_GRID_TASK)
#
sub getNextTask {
  while (1) {
    my ($taskID, $retries, $sourcemsg)=(undef, 0, undef);
    #-- check for any tasks in the "retry" queue
    if (-s "$GRID_JOBDIR/$F_RETRYTASKS") {
      my $hr=setXLock($F_RETRYTASKS,120,5)
        || wrkDie("Error locking $F_RETRYTASKS ($HOST, $GRID_WORKER)");
      my @errstack=readFile($hr,0,$F_RETRYTASKS); #format: taskid <space> #retries
      my $head=shift(@errstack); #fetch the first (oldest) entry
      my ($tid, $tretries)=split(/\s+/, defined($head) ? $head : '');
      if (defined($tid) && $tid=~/^\d+$/ && $tid>0) { #valid taskID to retry
        chomp($tretries) if defined($tretries);
        ($taskID, $retries)=($tid, $tretries);
        writeFList($hr, \@errstack); #write the rest of the list back
        $sourcemsg=' from the retry pool.';
      }
      endXLock($hr);
    }
    unless ($taskID) { #no retry task: get the next task not processed yet
      my $fh=setXLock($F_LASTTASK, 70, 3) ||
        wrkDie("Error locking $F_LASTTASK ($HOST, $GRID_WORKER)");
      my $last=readFile($fh, 0, $F_LASTTASK);chomp($last);
      if ($last<$GRID_TASKLAST) { # valid one, take the next
        $taskID=writeFValue($fh,int($last)+1);
      }
      $sourcemsg='';
      endXLock($fh);
    }
    return undef unless $taskID; # (kept as 'return undef' for existing callers)
    $GRID_TASK=$taskID;
    #-- lock this taskID
    $TASK_LOCKF="$GRID_JOBDIR/running/task_$taskID";
    catchSigs(1); #install signal handler
    $TASK_LOCKH=setXLock($TASK_LOCKF, 70, 3);
    unless ($TASK_LOCKH) { #this SHOULD be available immediately..
      my $lockedby=`cat $TASK_LOCKF`;chomp($lockedby);
      print STDERR "WARNING: lock-fail on task $taskID $sourcemsg (previously locked in $TASK_LOCKF by [$lockedby])\n";
      # the task belongs to another worker: forget it, so that a later
      # endTask() can neither re-queue it nor unlink the other worker's lock file
      undef($GRID_TASK);
      undef($TASK_LOCKF);
      catchSigs(0);
      next; # don't kill the worker, just move on
    }
    print $TASK_LOCKH "$HOST $$ ".sprintf("wrk_%04d",$GRID_WORKER)."\n";
    #-- check the status of this task
    my ($tstatus, $tuserdata, $tdirno, $thost, $terrcount,
        $texcode, $tstartmin)=taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID);
    print STDERR ">task-$taskID assigned to worker $GRID_WORKER (on $HOST) $sourcemsg\n";
    if ($GRID_RESUME && $tstatus eq '.') {
      #skip this one, it's finished (according to taskDb!)
      print STDERR ">SKIP-done:$taskID \{$tstatus|$terrcount|$texcode|$tdirno|$tstartmin\}\t\{$thost\}\n";
      undef $GRID_TASK;
      endXLock($TASK_LOCKH);
      catchSigs(0);
      unlink($TASK_LOCKF);
      undef($TASK_LOCKH);undef($TASK_LOCKF);
      next;
    }
    #-- update status of this task to 'running'
    $TASK_ERRCOUNT=$retries;
    taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID, 'r', $GRID_WORKER, $HOST, $TASK_ERRCOUNT);
    $GRID_TASK=$taskID;
    $TASK_DATA=$tuserdata;
    $ENV{'GRID_TASK'}=$taskID;
    $STARTED_GRID_TASK=$taskID;
    return $taskID;
  }
}
1541
##############################################
# runTask($taskID, @cmd)
#------------------------
# *runs the task command with system(). The command runs from the local worker
#  directory when GRID_LOCAL_JOBDIR is set, otherwise from the current directory.
# *uses $GRID_TASK (not the $taskID argument) and $TASK_DATA
# *with GRID_PSXFASTA, it uses TASK_DATA (offset in the fasta db) and the
#  other GRID_PSX.. settings to prepare the fasta slice and pass it to the cmd
# *with GRID_USECMDLIST, TASK_DATA is the command itself
# *on exit it calls endTask() with the command's exit status, which should:
#  - release the lock on the ../running/task_$taskID file
#  - IF error exit of cmd (non-zero exit status), use $TASK_ERRCOUNT+1 and $MAX_RETRIES
#    to determine if the job should be put in the $F_RETRYTASKS file
#    or if status should be set to 'E' in taskDb and the entry added
#    to $F_ERRTASKS
#  - IF successful exit : increment $F_TASKSDONE
#  - update taskDb with the current status
#  - remove the lock on ../running/task_$taskID and delete this file!
#---------------------------------------------
sub runTask {
  my ($taskID, @cmd)=@_;
  my $runcmd; #the actual command run using system()

  catchSigs(1); #install signal handler (should have been done already in getNextTask())
  if ($GRID_LOCAL_JOBDIR) {
    wrkDie("Fatal: cannot chdir to local dir $GRID_LOCAL_JOBDIR/$GRID_WRKDIR!")
      unless chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
  }

  if ($GRID_PSXFASTA) { #psx emulation
    #-- write the fasta slice for this task
    my $fslice=sprintf('%s.slice-%08d',getFName($GRID_PSXFASTA), $GRID_TASK);
    open(my $fsl, '>', $fslice)
      || wrkDie("Cannot create fasta slice $fslice");
    open(my $fdb, '<', $GRID_PSXFASTA) || wrkDie("Cannot open fasta db $GRID_PSXFASTA");
    seek($fdb, $TASK_DATA, SEEK_SET)
      || wrkDie("Cannot seek to offset $TASK_DATA in fasta db $GRID_PSXFASTA");
    local $/="\n";
    my $seqnext=$GRID_PSXSTEP*($GRID_TASK-1); #+GRID_PSXSKIP
    my $seqcount=0;
    my $seqmax=($GRID_PSXTOTAL<=0) ? 0 : $GRID_PSXTOTAL;
    while (my $line=<$fdb>) {
      if ($line=~/^>/) { #record start
        $seqnext++;
        last if ($seqcount>=$GRID_PSXSTEP);
        last if $seqmax && ($seqnext>$seqmax);
        $seqcount++;
      }
      print $fsl $line;
    }
    close($fsl) || wrkDie("Error writing fasta slice $fslice ($!)");
    close($fdb);

    $runcmd=shift(@cmd);
    $GRID_PSXSKIP=0 unless ($GRID_PSXSKIP);
    $GRID_PSXTOTAL=-1 unless ($GRID_PSXTOTAL>0);
    my $islast=($GRID_TASK==$GRID_TASKLAST) ? 1 : 0;
    # psx arguments: 1=slice file, 2=#seqs in slice, 3=task#, 4=is-last flag,
    #                5=skip count, 6=total count, 7..=remaining user arguments
    $runcmd.=' '.join(' ',$fslice, $seqcount, $GRID_TASK, $islast,
                      $GRID_PSXSKIP, $GRID_PSXTOTAL, @cmd);
  }
  elsif ($GRID_USECMDLIST) {
    # $TASK_DATA is the actual command to run
    $runcmd=$TASK_DATA;
  }
  else { #normal repeat cmd -- let the cmd use the ENV accordingly
    $runcmd=join(" ",@cmd);
  }

  print STDERR ">starting-task-$GRID_TASK by worker $GRID_WORKER (on $HOST): '$runcmd'\n" if $GRID_DEBUG;
  my $exitstatus=system($runcmd);
  if ($SwitchDir) {
    chdir("$GRID_JOBDIR/$GRID_WRKDIR");
  }
  endTask($exitstatus); #taskID is taken from $GRID_TASK
}
1628
1629
# catchSigs($enable)
#   A true $enable routes SIGINT and SIGTERM to sigHandler(); a false one
#   restores their default disposition.
sub catchSigs {
  my ($enable)=@_;
  my $handler=$enable ? \&sigHandler : 'DEFAULT';
  $SIG{INT}=$handler;
  $SIG{TERM}=$handler;
}
1640
# sigHandler($signame)
#   INT/TERM handler installed by catchSigs(): aborts the worker via wrkDie().
sub sigHandler {
  my ($sig)=@_;
  wrkDie("Signal $sig caught for worker $$ on $HOST, aborting..\n");
}
1645
# toRetry($taskID [, $taskErrCount])
#   Appends "taskID<TAB>errcount" to the retry pool ($F_RETRYTASKS) unless the
#   task is already queued there. $taskErrCount defaults to 1.
#   If the pool cannot be locked, a warning is printed and the task is not queued.
sub toRetry {
  my ($taskID, $taskErrCount)=@_;
  return unless $taskID;
  $taskErrCount=1 unless $taskErrCount;
  my $hr=setXLock($F_RETRYTASKS, 70, 3);
  unless ($hr) {
    print STDERR "Warning: cannot lock $F_RETRYTASKS, task $taskID was not rescheduled!\n";
    return;
  }
  seek($hr, 0, SEEK_SET);
  local $/="\n";
  # a lexical loop variable: do not clobber the caller's $_
  while (my $line=<$hr>) {
    my ($tid)=split(/\s+/, $line);
    if (defined($tid) && $tid==$taskID) { #double retry, don't bother
      endXLock($hr);
      return;
    }
  }
  fappend($hr, "$taskID\t$taskErrCount\n");
  endXLock($hr);
}
1662
# endTask([$exitstatus])
#   Finishes the current task ($GRID_TASK) and does nothing if there is none.
#   -exit status 0: counts it in $F_TASKSDONE.
#   -non-zero exit status: increments $TASK_ERRCOUNT, then either re-queues the
#    task or, once $MAX_RETRIES is exceeded, lists it in $F_ERRTASKS.
#   -in both of these cases taskDb is updated before the task lock is released.
#   -no exit status (interrupted/failed worker), or a task that never started:
#    the task is put back into the retry pool.
#   Clears $GRID_TASK and $STARTED_GRID_TASK.
sub endTask {
  return unless $GRID_TASK;
  # we MUST be in GRID_WRKDIR
  chdir("$GRID_JOBDIR/$GRID_WRKDIR") || die("Error: failed to chdir to $GRID_JOBDIR/$GRID_WRKDIR!");
  my ($exitstatus)=@_;

  if (!$exitstatus && !$STARTED_GRID_TASK) { # could be a premature failure, like cdbyank not found, etc.
    print STDERR "scheduling $GRID_TASK for retry..\n";
    toRetry($GRID_TASK, $TASK_ERRCOUNT);
    endXLock($TASK_LOCKH) if $TASK_LOCKH;
    unlink($TASK_LOCKF) if $TASK_LOCKF;
    undef($TASK_LOCKH);undef($TASK_LOCKF);
    unlink($F_WRKRUNNING);
    catchSigs(0);
    undef $GRID_TASK;
    return;
  }
  if (defined($exitstatus)) {
    my $dbstatus;
    if ($exitstatus==0) { #success
      $dbstatus='.';
      # update $F_TASKSDONE
      my $fh=setXLock($F_TASKSDONE, 110, 5);
      if ($fh) {
        incFValue($fh, $F_TASKSDONE);
        endXLock($fh);
      }
    }
    else { #error status
      $TASK_ERRCOUNT++;
      print STDERR "task $GRID_TASK failed on $HOST (PID=$$, status='$exitstatus') (worker $GRID_WORKER); error count=$TASK_ERRCOUNT\n" if $GRID_DEBUG;
      if ($TASK_ERRCOUNT>$MAX_RETRIES) { #trash it
        my $he=setXLock($F_ERRTASKS, 70, 3);
        if ($he) {
          fappend($he, join("\t",$GRID_TASK,$exitstatus,$HOST,$GRID_WRKDIR)."\n");
          endXLock($he);
        }
        else {
          print STDERR "Warning: cannot lock $F_ERRTASKS to record failed task $GRID_TASK!\n";
        }
        $dbstatus='E';
      }
      else {#give it another retry chance
        toRetry($GRID_TASK, $TASK_ERRCOUNT);
        $dbstatus='-';
      }
    }
    print STDERR "]task $GRID_TASK ended - updating taskDb with status code '$dbstatus'\n" if $GRID_DEBUG;
    # update taskDb while still holding the task lock, so that a worker picking
    # this task from the retry pool cannot mark it 'r' before this update lands
    taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $GRID_TASK, $dbstatus,
               $GRID_WORKER, $HOST, $TASK_ERRCOUNT, $exitstatus);
    endXLock($TASK_LOCKH) if $TASK_LOCKH;
    unlink($TASK_LOCKF) if $TASK_LOCKF;
    undef($TASK_LOCKH);undef($TASK_LOCKF);
  } # defined $exitstatus
  else { #no exitstatus given, it was an interrupt signal or otherwise failed task
    print STDERR "task $GRID_TASK ended with undefined exit status\n" if $GRID_DEBUG;
    toRetry($GRID_TASK, $TASK_ERRCOUNT);
    endXLock($TASK_LOCKH) if $TASK_LOCKH;
    unlink($TASK_LOCKF) if $TASK_LOCKF;
    undef($TASK_LOCKH);undef($TASK_LOCKF);
    unlink($F_WRKRUNNING);
  }
  catchSigs(0);
  undef($STARTED_GRID_TASK);
  # the task is over: forget it, so that a later endTask() call (from
  # wrkDie() or END) cannot re-queue an already finished task
  undef($GRID_TASK);
}
1723
# wrkDie($msg)
#   Fatal worker error: lets endTask() re-queue the current task (if any),
#   unregisters the worker with endWorker(), then dies with $msg.
sub wrkDie {
  my ($msg)=@_;
  endTask();
  my $worker=$GRID_WORKER; # saved before endWorker() clears it
  endWorker();
  die("Error at worker $worker on $HOST:\n$msg\n");
}
1732
#--onExit, onEnd trigger
# END block: runs when the interpreter exits. Cleanup only happens when the
# script did not reach its normal ending ($NORMAL_ENDING false, e.g. after
# die()). In that case it:
#  - finishes the current task and worker;
#  - releases every lock still registered in %Locks.
sub END {
  # endTask()/endWorker() are defined outside this excerpt. Localize $? so
  # nothing they do (e.g. a system() call) can overwrite the exit status the
  # program is about to return.
  local $?;
  unless ($NORMAL_ENDING) {
    $NORMAL_ENDING=1; # do not run this cleanup a second time
    endTask();
    endWorker();
    # %Locks is keyed by *stringified* file handles ("GLOB(0x...)").
    # A key string cannot be closed as a handle, so passing it to
    # endXLock() would not close anything. Release the lock link and node
    # file recorded in each entry directly instead.
    foreach my $key (keys(%Locks)) {
      my $entry=delete($Locks{$key});
      next unless ref($entry) eq 'ARRAY';
      my ($locklink, $nodefile)=@$entry;
      unlink($locklink);
      unlink($nodefile);
    }
  }
}
1747
# fappend($fh, @chunks)
# Appends @chunks, joined with "\n", at the current end of the open handle
# $fh. No trailing newline is added; callers include one themselves.
# Returns print()'s result (true on success).
sub fappend {
  my ($out, @chunks) = @_;
  seek($out, 0, SEEK_END);
  return print {$out} join("\n", @chunks);
}
1753
# getLockDir($fname)
# Returns ($path, $tag):
#  $path - $fname rebuilt as "dirname/basename". File::Basename's dirname()
#          yields "." for a bare name, so "x" becomes "./x".
#  $tag  - "by.<host>.<pid>", identifying this process.
sub getLockDir {
  my $fname=shift;
  my $basename=basename($fname);
  my $dirname=dirname($fname);
  # Test length, not truth: a directory literally named "0" is false in Perl
  # and would otherwise be dropped from the path.
  my $lockdir=(defined($dirname) && length($dirname))
              ? "$dirname/$basename"
              : $basename;
  return ($lockdir, "by.$HOST.$$");
}
1762
# makeLockFile($fname)
# Called by a node just before it requests a lock on $fname. $fname must be
# relative to $GRID_JOBDIR; a leading "$GRID_JOBDIR/" is stripped if present.
# It writes a per-process "node" lock file in $GRID_LOCKDIR whose single line
# is "<host> <pid> <minute> <worker>".
# Returns ($lockfile, $nodefile):
#  $lockfile - the shared lock name
#  $nodefile - the node file just written
# Dies if the node file cannot be created or written.
sub makeLockFile {
  my ($fname)=@_;
  if (index($fname, $GRID_JOBDIR.'/')==0) {
    $fname=substr($fname, length($GRID_JOBDIR)+1);
  }
  # Map '/' and '\' to '-' (squeezing runs) so the lock name is a single
  # file inside $GRID_LOCKDIR.
  (my $fnlock=$fname)=~tr/\/\\/--/s;
  my $nodelockf="$GRID_LOCKDIR/$fnlock-lock.by.$HOST.$$";
  # Append the last 4 digits of the current epoch time.
  $nodelockf.='-'.substr(time(),-4);
  # Use a 3-arg open: the path embeds a caller-supplied name, which must
  # never be parsed as an open mode.
  open(my $fh, '>', $nodelockf)
    || die("Error creating lockfile $nodelockf! ($!)\n");
  my $thismin=int(time()/60); # minute at which the lock was first requested
  print $fh "$HOST $$ $thismin $GRID_WORKER\n";
  # setXLock reads this line back for stale-lock detection, so a write
  # error (reported at close on buffered/NFS I/O) must not go unnoticed.
  close($fh) || die("Error writing lockfile $nodelockf! ($!)\n");
  return ($GRID_LOCKDIR.'/'.$fnlock, $nodelockf);
}
1783
# getFileLock($lockfile, $nodefile)
# Tries to take a lock by making $lockfile a hard link to this process's
# $nodefile.
# Success is decided by the link count of $nodefile (2 means our link
# exists), not only by link()'s return value. Over NFS the reply to link()
# can be lost; the retransmitted request then reports failure (typically
# EEXIST) even though the link was created. Trusting the return value alone
# would leave a lock that nobody thinks it owns.
# Returns 1 if the lock is held, undef otherwise.
sub getFileLock {
  my ($lockfile, $nodefile)=@_;
  link($nodefile, $lockfile); # result deliberately not trusted, see above
  my @stat=stat($nodefile);
  my $linkcount=@stat ? $stat[3] : 0;
  if ($linkcount>2) { # should never be more than 2
    print STDERR "WARNING: weird lnkcount=$linkcount ($GRID_WORKER, $HOST, task $GRID_TASK)!\n";
  }
  return ($linkcount>1) ? 1 : undef;
}
1799
# setXLock($fname, $maxretries, $stalemin)
# Acquires an exclusive lock on $fname, then opens the file with sysopen()
# and returns that handle.
#  - $fname is relative to $GRID_JOBDIR, or absolute.
#  - Locking uses the hard-link scheme of makeLockFile()/getFileLock().
#  - Release the lock with endXLock().
# Parameters:
#  $maxretries - retries (3 seconds apart) before giving up; default 80
#  $stalemin   - once retries are exhausted, a lock older than this many
#                minutes is treated as stale and removed; default 7200
# Returns the open file handle, or undef if the lock could not be obtained
# (an error is printed to STDERR).
sub setXLock {
  my ($fname, $maxretries, $stalemin)=@_;
  $maxretries=80 unless $maxretries;
  $stalemin=7200 unless $stalemin;
  my ($lockfile, $nodefile)=makeLockFile($fname);
  my $retries=0;
  my ($lock_age, $prevlock, $haveLock);

  while (!($haveLock=getFileLock($lockfile, $nodefile))) {
    if ($retries>$maxretries) {
      # Out of retries: inspect the current holder for a forgotten (stale) lock.
      if (-f $lockfile) {
        my $l=readFile($lockfile);
        my ($lhost, $lpid, $ltime, $worker)=split(/\s+/,$l);
        $prevlock="$lhost (pid $lpid, worker $worker)" if $lpid;
        $lock_age=int(time()/60)-$ltime;
        if ($lock_age>$stalemin) { # held longer than $stalemin minutes
          print STDERR "WARNING: removing stale $fname lock from $lhost PID $lpid (worker $worker), age $lock_age minutes.\n";
          unlink($lockfile);
          next; # retry at once; another node may still grab it first
        }
      }
      last; # too many retries
    }
    $retries++;
    sleep(3); # pause 3 seconds between attempts
  }

  unless ($haveLock) {
    print STDERR "ERROR getting a lock on $fname ($lockfile -> $nodefile) after $retries attempts!\n";
    print STDERR " (blocked by: $prevlock of $lock_age minutes)\n" if $prevlock;
    unlink($nodefile);
    return undef;
  }

  # Refresh the holder line. $nodefile and $lockfile are now the same inode,
  # so this also updates the timestamp that stale-lock checks read.
  open(my $nfh, '>', $nodefile)
    || die "Error re-creating host lock file $nodefile ?! ($!)\n"; #should never happen, really
  my $thismin=int(time()/60);
  print $nfh "$HOST $$ $thismin $GRID_WORKER\n";
  close($nfh);
  # Now it is safe to open the file being locked; relative paths are taken
  # relative to $GRID_JOBDIR.
  $fname="$GRID_JOBDIR/$fname" unless $fname=~m/^[\/\\]/;
  # NOTE(review): $sysopen_mode is not defined in this excerpt; presumably a
  # file-level O_RDWR|O_CREAT style mode (add O_DIRECT only if you notice
  # syncing/flushing issues for small files) -- confirm.
  my $fh;
  sysopen($fh, $fname, $sysopen_mode) || die "setXLock($fname) failed at open ($fname): ($!)\n";
  $Locks{$fh}=[$lockfile, $nodefile];
  return $fh;
}
1869
# endXLock($fh)
# Releases a lock taken with setXLock().
#  - $fh is closed first, flushing any buffered writes, so the data is on
#    disk before the lock disappears.
#  - Then the %Locks entry is dropped and both the shared lock link and this
#    process's node file are unlinked.
#  - If $fh has no %Locks entry, a warning goes to STDERR and nothing is
#    unlinked.
sub endXLock {
  my ($fh)=@_;
  close($fh);
  my $entry=delete($Locks{$fh});
  if (!$entry) {
    print STDERR "WARNING at endXLock(): no Locks entry found for file handle $fh!\n";
    return;
  }
  unlink($entry->[0]); # shared lock link
  # To keep node files around for debugging, skip this unlink.
  unlink($entry->[1]); # this process's node file
}
1883
# readFile($f, $nonfatal, $context)
# Reads lines from $f, which is either a file name or an already-open
# filehandle (GLOB reference).
#  - List context: returns all lines.
#  - Scalar context: returns only the first line, or '' if there is none.
#  - A named file that does not exist is created empty.
#  - A named file opened here is closed again; a handle passed in is left
#    open.
# If a named file cannot be opened:
#  - returns undef when $nonfatal is true;
#  - otherwise dies, adding $context (if given) to the message.
sub readFile {
  my ($f, $nonfatal, $context)=@_;
  $context=" ($context)" if $context;
  my ($fh, $open);
  if (ref($f) eq 'GLOB') { # file handle or glob reference
    $fh=$f;
  }
  else { # scalar: a file name
    # Read-only is enough for an existing file (no write permission needed).
    # '+>' creates the file when it is missing.
    my $mode=(-f $f) ? '<' : '+>';
    # 3-arg open: the name is never parsed as a mode or trimmed of spaces.
    unless (open($fh, $mode, $f)) {
      return undef if $nonfatal;
      die "readFile($mode $f)$context task $GRID_TASK on $HOST, open error: $!\n";
    }
    $open=1;
  }
  local $/="\n";
  if (wantarray()) {
    my @r=<$fh>;
    close($fh) if $open;
    return @r;
  }
  # First line only. Test definedness so a line of "0" is not turned into ''.
  my $line=<$fh>;
  close($fh) if $open;
  return defined($line) ? $line : '';
}
1916
#=================================================================
# Mailer subroutine - relies on Net::SMTP to be installed properly
# --> recognized hash keys: from, to, subj, body, file
#  from - sender; defaults to $USER. A bare name gets '@' plus the SMTP
#         server's domain.
#  to   - recipient; defaults to $USER. A bare name gets '@' plus $DOMAIN.
#  subj - optional Subject header.
#  body - message text.
#  file - optional path; its whole content is appended to the body.
# Dies if the SMTP host cannot be reached or 'file' cannot be opened.
#-----------------------------------------------------------------
sub send_mail {
  my $hash=shift;
  my $smtp=Net::SMTP->new(Host=>'mailhost');
  die("Error: send_mail() cannot connect to SMTP host 'mailhost'\n") unless $smtp;
  my $from=$hash->{'from'};
  if ($from) {
    $from.='@'.$smtp->domain() unless $from=~m/@/;
  }
  else {
    $from=$USER.'@'.$smtp->domain();
  }
  my $to=$hash->{'to'};
  if ($to) {
    $to.='@'.$DOMAIN unless $to =~ m/@/;
  }
  else {
    $to=$USER.'@'.$DOMAIN;
  }
  my $subj=$hash->{'subj'};
  my $body=$hash->{'body'};
  $body='' unless defined($body);
  # Read the attachment before starting the SMTP transaction, so a bad path
  # does not abort halfway through DATA.
  if (defined($hash->{file})) {
    #warning: assumes it's a decent, short text file!
    open(my $addfh, '<', $hash->{file})
      || die("Error: cannot open file ".$hash->{file}." ($!)\n");
    my $file=do { local $/; <$addfh> }; # read whole file
    close($addfh);
    $file='' unless defined($file);
    $body.="\n\n".$file;
  }
  $smtp->mail($from);
  $smtp->to($to);
  $smtp->data();
  $smtp->datasend("From: $from\n");
  $smtp->datasend("To: $to\n");
  $smtp->datasend("Subject: $subj\n") if $subj;
  # A blank line must always end the header block. Without it, a message
  # with no subject would have its body parsed as headers.
  $smtp->datasend("\n");
  $smtp->datasend("$body\n");
  $smtp->dataend();
  $smtp->quit;
}
1961
# getTime()
# Returns the scalar localtime() string with its leading weekday token
# removed, e.g. "Jul 26 21:46:39 2011". The weekday is stripped so Sybase
# will accept the value.
sub getTime {
  (my $stamp=localtime()) =~ s/^\S+\s+//;
  return $stamp;
}

Properties

Name Value
svn:executable *