ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/gclib/scripts/gridx
Revision: 71
Committed: Fri Sep 23 19:23:49 2011 UTC (12 years, 11 months ago) by gpertea
File size: 66008 byte(s)
Log Message:
gridx fixed to export PERL5LIB and PYTHONPATH

Line File contents
1 #!/usr/bin/perl
2 require 5.8.0;
3 use strict;
4 use Getopt::Std;
5 use File::Basename; #for basename, dirname
6 use POSIX "sys_wait_h"; # mostly for the handy uname() function
7 use Cwd qw(abs_path cwd);
8 use Fcntl qw(:DEFAULT :seek); #mostly for the SEEK constants
9 use FindBin; use lib "$FindBin::Bin";
10
11 my $NORMAL_ENDING=1; #set to 0 while working, to 1 upon normal ending of script (i.e. no die())
12 my $USER=$ENV{USER} || POSIX::cuserid(); #it may not be there for condor workers
13 my $PWD=cwd(); #from Cwd module
14 my $PERL_BIN='/usr/bin/perl';
15 my $MAX_RETRIES=3; #-- how many times to try a failed task
16 my $F_WRKCOUNT='.wrkCount'; # count of currently running workers
17 my $F_ALLDONE='.all.Done.';
18 my $F_WRKSTART='.wrkStarted'; #count of (ever) started workers
19 my $F_LASTTASK='.lastTask'; # maximum task# ever started
20 my $F_TASKSDONE='tasksDone'; # number of tasks successfully finished
21 my $F_ERRTASKS='err.tasks'; #LIST of task#s which returned non-zero status
22 # even after MAX_RETRIES
23 my $F_RETRYTASKS='retry.tasks'; #stack of task#s which returned non-zero status
24 #less then MAX_RETRIES times
25 my $F_WRKRUNNING='.running-worker'; #semaphore file in each worker directory
26 my $F_ENDCMD='epilogue';
27 my $F_WRKDIR='workdir';
28 my $F_NOTIFY='notify';
29 my $F_TASKDB='taskDb';
30 my $GRID_DEBUG;
31 my $starting_dir;
32 my $STARTED_GRID_TASK;
33 my $SMPChildren=0; #SMP case: number of children running
34 my %SMPChildren=(); #set of child PIDs
35 my %Locks = (); # filehandle -> [lockfilelink, hostlockfile]
36 my $HOSTNAME = (&POSIX::uname)[1]; # can't trust HOST envvar under condor, because
37 # it will either inherit HOST and HOSTNAME from submitter env for getenv=TRUE,
38 # OR it will not set anything for getenv=FALSE
39 chomp($HOSTNAME);
40 $HOSTNAME=lc($HOSTNAME);
41 my $HOST=$HOSTNAME;
42 my ($DOMAIN)=($HOST=~m/^[\w\-]+\.(.+)/);
43 unless ($DOMAIN) {
44 if ($HOST=~m/^flicker|^wren|^ibis/) {
45 $DOMAIN='umiacs.umd.edu';
46 }
47 unless ($DOMAIN) {
48 $DOMAIN=($PWD=~/^\/export|^\/home/) ? 'dfci.harvard.edu' : 'umiacs.umd.edu';
49 }
50 }
51 ($HOST)=($HOST=~m/^([\w\-]+)/);
52 $ENV{HOST}=$HOST;
53 $ENV{MACHINE}=$HOST;
54 $ENV{HOSTNAME}=$HOSTNAME;
55 $ENV{USER}=$USER;
56 #&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! - site specific -
57 #---------- modify the SITE-SPECIFIC paths here if needed:
58 #&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&!
59 # ===== PATH management ====
60 # default: assume architecture/PATH/LIB uniformity across nodes
61 # you can change this by adding your own worker/server paths here
62 my $binpath=$ENV{PATH};
63 my $libpath=$ENV{LD_LIBRARY_PATH};
64 my $perllib=$ENV{PERLLIB};
65 my $perl5lib=$ENV{PERL5LIB};
66 my $pythonpath=$ENV{PYTHONPATH};
67
68 # use home directory for symbolic links to working directories
69 my $homebase=$ENV{HOME};
70 $homebase=~s/(\/[^\/]+$)//;
71
72 $homebase='/fs/wrenhomes' unless $homebase; #this is just valid for UMIACS here
73 # sometimes the HOME path is not defined within the condor job,
74 # please use your own globally mounted path instead
75
76 # default grid engine used:
77 #my $GRID_ENGINE='sge'; # can also be: 'condor' and 'smp'
78 my ($GRID_MONHOME, $GRID_ENGINE) = ($DOMAIN=~/dfci/) ? ('/home', 'sge')
79 : ($homebase, 'condor');
80 my $sysopen_mode = O_RDWR | O_CREAT ;
81 $sysopen_mode |= O_DIRECT if $DOMAIN!~/umiacs/;
82
83 #print STDERR "|$GRID_MONHOME|\n";
84 #&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! -
85 #
86 my $morehints=qq{
87 The following entries will be created by gridx:
88 <GRID_JOBDIR>/epilogue - only if -e option was given, a file containing the
89 <end_cmd> parameter
90 <GRID_JOBDIR>/notify - a file containing the e-mail address to notify
91 if -m was given
92 <GRID_JOBDIR>/.lastTask - a file keeping track of the last task# scheduled
93 <GRID_JOBDIR>/.wrkStarted - a file keeping track of the number of
94 <GRID_JOBDIR>/.wrkCount - a file keeping track of the number of
95 currenly running workers
96 <GRID_JOBDIR>/err.tasks - list of all error-terminated tasks, after retry
97 (should be zero or empty if all tasks
98 finished successfully)
99 <GRID_JOBDIR>/retry.tasks - list of all error-terminated tasks that will
100 still be retried
101 <GRID_JOBDIR>/taskDb - pseudo-fasta db with the info & status
102 of each task
103 <GRID_JOBDIR>/taskDb.cidx - the cdbfasta index of the above db file
104 <GRID_JOBDIR>/locks/ - lockfiles/links are placed here
105 <GRID_JOBDIR>/running/ - while a task is processed, a file entry
106 called task_<GRID_TASK> will be created in
107 here for locking purposes; such file will have
108 a line with processing info for the current
109 task: <hostname> <pid> <CPU#>
110 <GRID_JOBDIR>/wrk_<CPU#>/ - working directory exclusive to each worker
111 process, one per CPU (1 <= CPU <= maxCPUs);
112 also contains stderr and stdout log files
113 for each worker process (wrk_stderr.log and
114 wrk_stdout.log)
115 * if -i option is used, a slice db file is created for <fastadb> with the
116 positions of all slices of <numseqs> fasta records in the <fastadb> file.
117 This file is called <GRID_JOBDIR>/taskDb
118 * if -b option was given: executes the <begin_script> in the current submit
119 directory (the one above <GRID_JOBDIR>)
120 * if all tasks completed successfully and if -e option was given
121 the <end_script> is executed in the <GRID_JOBDIR> subdirectory
122 };
123
124 my $usage=qq{
125 Submit a list of commands to the grid. Can also automatically
126 slice a multi-fasta file into chunks and pass the slice file names as
127 parameters to a <cmd>:
128
129 Usage:
130
131 gridx \{-f <cmds_file> | -i <fastadb> | -r <numtasks> \} -p <numprocs>
132 \{-U|-u <slot#>\} [-n <numseqs>] [-s <skipseqs>] [-t <totalseqs>]
133 [-g <engine>] [-d <dir_prefix>] [-a] [-M <monitoring_basedir>]
134 [-L <local_wrk_dir>] [-O <log_files_dir>] [-S] [-T] [-v]
135 [-x <node_exclude_list>] [-y <node_targets_list>]
136 [-b <begin_script>] [-e <end_script>] [-q] [-m <e-mail>]
137 [-c] <cmd> [-C] [<cmd_args>..]
138
139 Unless -J option is given, gridx will submit the job to the grid,
140 creating a <GRID_JOBDIR> subdirectory (which on Condor has the format:
141 gridx-<hostname>_<job_id>). A file called 'cmdline-<numtasks>.cmd'
142 will also be created there containing the current gridx commmand line.
143
144 Options:
145 -g grid engine to use, can be 'smp', 'condor' or 'sge' (default: $GRID_ENGINE)
146 -p maximum number of grid CPUs to allocate for this job
147 -f provide a file with a list of commands to be run on the grid (one
148 command per line per CPU)
149 -i grid processing of slices of the given multi-fasta file <fastadb>;
150 the total number of tasks/iterations is given by the number of slices
151 in <fastadb> (and the -n <numseqs> option); the script will
152 automatically create the slice file in the worker directory before
153 <cmd> is run with these parameters:
154 <fastaslice> <numseqs> <slice#> <is_last> <skipped> <total> <cmd_args>
155 -n slice size: for -i option, how many fasta records from <fastadb>
156 should be placed into a slice file for each iteration (default: 2000)
157 -C legacy option for psx-like usage (to pass <cmd_args>)
158 -r submit an array job with <numtasks> iterations/tasks (default: 1)
159
160 -S switch to (stay in) the current directory (where gridx was launched from)
161 before each task execution (especially useful for -f option)
162 -a (psx legacy): inherit the submitter environment (default)
163 -b prologue script to be executed BEFORE the grid job is submitted
164 -e epilogue script to be executed AFTER all the grid tasks were completed,
165 and only if ALL the tasks terminated successfully.
166 -m send an e-mail to the given address when all tasks are finished
167 -v do not try to validate <cmd> (assume it is going to be valid on the grid
168 nodes)
169 -x provide a list of machines (host names) that should be excluded
170 from the condor pool for this submitted job ('condor' engine only)
171 -y the submitted job is only allowed to run on the machines on this list
172 ('condor' engine only)
173 -d create <dir_prefix> prefixed symbolic links to worker directories
174 in the current directory
175 -O place all Condor log files (condor stderr/stdout redirects)
176 into <log_files_dir>
177 -M parent directory for creating a "monitoring" symlink; a symbolic link to
178 the current <GRID_JOBDIR> will be created there, under
179 <monitoring_basedir>/$USER/ (default: home directory)
180 -T do not exit after the job is submitted, wait around instead
181 until the last worker terminates and prints some job completion stats
182 -U force one worker per machine (only for 'condor' engine)
183 -u force each worker to run only on CPU <slot#> (between 1 and 12) of each
184 machine (only for 'condor' engine)
185 For -r option, the provided <cmd> could make use of the environment variables
186 GRID_TASK, GRID_TASKLAST to determine the current iteration being executed.
187
188 Unless -S option was used, each <cmd> iteration will be executed in its own
189 worker subdirectory <GRID_JOBDIR>/wrk_<CPU#>
190
191 Job monitoring/resuming/stopping (-J mode):
192
193 gridx [-m e-mail] [-e <end_script>] [-K|-R] -J <jobID>
194
195 If -J option is provided, gridx will report the status of the job <jobID>
196 which must have been submitted by a previous, "regular" use of gridx; it relies
197 on the the symbolic link <monitor_basedir>/$USER/gridx-<jobID> which must
198 be valid (<monitor_basedir> can be set by -M).
199
200 Additional/alternate actions for -J mode:
201 -e update the <end_script> for the *running* <jobID> or for the <jobID>
202 rerun (if -R option is also given); does not work with -K option
203 -m update the e-mail notification option for job <jobID>
204 -K kill/abandon/terminate ALL pending tasks for grid job jobID
205 (trying to remove all running/pending tasks on the grid)
206 -R rerun/resubmit all the unfinished/unprocessed or unsuccessful tasks
207 for <GRID_JOB>; this assumes -K -J <JOB_ID> was given first (so there
208 are no pending tasks) and then will submit a new job in the same working
209 directory, renaming the GRID_JOBDIR accordingly while workers
210 will now *skip* all the tasks found with a "Done" status ('.') in the
211 <GRID_JOBDIR>/taskDb file
212 }; #'
213
214
215 RESUME_JOBID:
216 my @ar=@ARGV;
217 while ($ar[0] eq '-Z') { shift(@ar); shift(@ar); }
218 my $CMDLINE="$FindBin::Script\t".join("\t",@ar);
219 # parse script options
220 print STDERR "Running on $HOST (domain: $DOMAIN): $0 ".join(' ',@ARGV)."\n";
221 getopts('USWHM:O:NKRDTqvaJZ:L:u:r:x:y:i:Ff:n:t:d:s:p:m:g:b:e:c:C:') || die($usage."\n");
222 umask 0002;
223 if ($Getopt::Std::opt_H) {
224 print STDERR $usage;
225 die($morehints."\n");
226 }
227 my $SwitchDir=$Getopt::Std::opt_S;
228 $NORMAL_ENDING=0;
229 $GRID_DEBUG=$Getopt::Std::opt_D;
230 #print STDERR "Host=$HOST, Domain=$DOMAIN\n" if $GRID_DEBUG;
231
232 my $GRID_DIRPREFIX=$Getopt::Std::opt_d;
233 my ($submitJob, $removeJob);
234 $GRID_ENGINE=lc($Getopt::Std::opt_g) if $Getopt::Std::opt_g;
235 if ($GRID_ENGINE eq 'sge') {
236 ($submitJob, $removeJob)=(\&submitJob_sge, \&removeJob_sge);
237 }
238 elsif ($GRID_ENGINE eq 'condor') {
239 ($submitJob, $removeJob)=(\&submitJob_condor, \&removeJob_condor);
240 }
241 elsif ($GRID_ENGINE eq 'smp') {
242 ($submitJob, $removeJob)=(\&submitJob_smp, \&removeJob_smp);
243 }
244 else {
245 die("Error: invalid grid engine given (only 'sge', 'condor' or 'smp' are accepted)!\n");
246 }
247 my $UniqueVM=$Getopt::Std::opt_U;
248
249 my $UniqVMreq=$Getopt::Std::opt_u;
250 $UniqVMreq=int($UniqVMreq) if $UniqVMreq;
251 die("Error: use either -U or -u option, not both!\n") if $UniqueVM && $UniqVMreq>0;
252 # - exclude the following machines
253 my @xmachinelist=split(/\,/,$Getopt::Std::opt_x); #do NOT allow jobs to run on these machines
254 my @ymachinelist=split(/\,/,$Getopt::Std::opt_y); #only allow jobs to run on these machines, not others
255 #submitJob MUST use the globals:
256 # GRID_CMD, GRID_TASKLAST, GRID_MONHOME and update GRID_JOB
257
258 $GRID_MONHOME=$Getopt::Std::opt_M if $Getopt::Std::opt_M;
259 #$GRID_MONHOME=$ENV{HOME} unless $GRID_MONHOME;
260 $GRID_MONHOME.='/'.$USER unless $GRID_MONHOME=~m/$USER$/;
261 #
262
263 die("Error: directory $GRID_MONHOME should already be created! Aborting..\n")
264 unless (-d $GRID_MONHOME);
265
266 my $mailnotify=$Getopt::Std::opt_m;
267 #-------- GLOBALs ---------------------------
268 my $GRID_JOBDIR;
269 my $GRID_JOB;
270 my $GRID_LOCKDIR;
271 my $GRID_TASKLAST;
272 my $GRID_CMD; # user's command and arguments
273 my $GRID_CMDFILE=$Getopt::Std::opt_f; # one line per run.. with arguments for command <cmd>
274 my $GRID_USECMDLIST=1 if $GRID_CMDFILE || $Getopt::Std::opt_F;
275
276 my $GRID_PSXFASTA; #if -i was used
277 my $GRID_PSXSTEP; #if -i was used (-n value)
278 my $GRID_PSXSKIP=0; # -s option
279 my $GRID_PSXTOTAL=0; # -t option
280
281 my $GRID_NUMPROCS=0; # -p option
282 my $GRID_RESUME=$Getopt::Std::opt_R || $Getopt::Std::opt_Z;
283 my $GRID_LOCAL_JOBDIR=$Getopt::Std::opt_L;
284 #---------- worker side global vars:
285 my $GRID_ENVSET=0; #was the environment set?
286 my $GRID_WORKER=0; # worker#
287 #my $GRID_NOWRKLINKS=1;
288 my $GRID_LOGDIR=$Getopt::Std::opt_O;
289 my $GRID_TASK; # dynamic -- task iteration#
290 my $TASK_ERRCOUNT; # dynamic -- current task error (retry) counter
291 my $TASK_DATA; # current task's user data as stored in taskDb
292 my $TASK_LOCKH; #file handle for the current task lock file
293 my $TASK_LOCKF; #file name for the current task lock file
294 my $GRID_WRKDIR; #only for the worker case, it's the current worker's subdirectory
295
296 if ($Getopt::Std::opt_J) {
297 #################################################################
298 # gridx job monitoring use:
299 #---------------------------------------------------
300 # gridx -J [-m <e-mail>] [-M <mondir>] [-R | -K] <jobid>
301 #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
302 my ($jobid)=shift(@ARGV);
303 $jobid=~tr/ //d;
304 unless ($jobid) { # list mode
305 my $fmask="$GRID_MONHOME/gridx-*";
306 my @jdirs=<${fmask}>;
307 if (@jdirs>0) {
308 print STDOUT "The following jobs were found (in $GRID_MONHOME):\n";
309 foreach (@jdirs) {
310 my ($jobid)=(m/gridx\-(\w[\w\-]+)$/);
311 print " $jobid\n";
312 }
313 }
314 else {
315 print STDOUT "No jobs were found (in $GRID_MONHOME).\n";
316 }
317 $NORMAL_ENDING=1;
318 exit(0);
319 }
320 #a valid $jobid was given, hopefully
321 $jobid=lc($jobid);
322 my $subdir='gridx-'.$jobid;
323 my $jobdir="$GRID_MONHOME/gridx-$jobid";
324 unless (-d $jobdir) { #try current directory..
325 if (-d $subdir) {
326 $jobdir=$subdir;
327 }
328 else {
329 die "No such job found($jobid) - neither $jobdir nor $subdir exist!\n";
330 }
331 }
332 #print STDERR "..found jobdir='$jobdir'\n";
333 chdir($jobdir) || die("Error at chdir($jobdir)!\n");
334 #my $msg=jobSummary($mailnotify);
335 my $msg=jobSummary();
336 print STDOUT $msg."\n";
337 if ($Getopt::Std::opt_K) {
338 &$removeJob($jobid);
339 }
340 elsif ($Getopt::Std::opt_R) { #resume/rerun!
341 #chdir($jobdir) || die("Error at chdir($jobdir)!\n");
342 my @cmdfile=<cmdline-*.cmd>;
343 die "Error getting the cmdline-*.cmd from current directory!\n"
344 unless @cmdfile;
345 my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
346 die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
347 my $cmdline=readFile($cmdfile[0]);
348 chomp($cmdline);
349 my @args=split(/\t/,$cmdline);
350 die("$jobdir/$F_TASKDB and/or index not valid - cannot resume!\n")
351 unless -s $F_TASKDB && -s "$F_TASKDB.cidx";
352 shift(@args); #discard gridx command itself
353 chdir('..'); #go in the original working dir
354 $PWD=cwd(); #from Cwd module
355 $CMDLINE="$FindBin::Script\t".join("\t",@args);
356 @ARGV=('-Z',$jobid, @args);
357 undef($Getopt::Std::opt_J);
358 undef($Getopt::Std::opt_R);
359 goto RESUME_JOBID;
360 }
361 $NORMAL_ENDING=1;
362 exit(0);
363 }
364 elsif ($Getopt::Std::opt_W) {
365 ##########################################################
366 # gridx Worker use:
367 #---------------------------------------------------------
368 # This is the actual job that gets submitted
369 # gridx -W <cmd> <cmd_args>
370 # At runtime the environment should be set accordingly
371 #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
372 beginWorker(); #set up environment and worker directory
373 #(wrk_NNNNN) and chdir() to it
374 $GRID_USECMDLIST=$Getopt::Std::opt_F;
375 my @cmd=@ARGV; # cmd and cmdargs
376 while (my $taskId = getNextTask()) {
377 runTask($taskId, @cmd);
378 }
379 my $runningworkers=endWorker();
380 my $exitcode=0;
381 if ($runningworkers==0) { #this was the last worker!
382 #run any JOB finishing/clean up code
383 chdir($GRID_JOBDIR);
384 my $epilogue=readFile($F_ENDCMD) if -s $F_ENDCMD;
385 if ($epilogue) {
386 chomp($epilogue);
387 #only run it if ALL tasks succeeded
388 my $tasksdone=readFile($F_TASKSDONE);chomp($tasksdone);
389 if ($tasksdone==$GRID_TASKLAST && !(-s $F_ERRTASKS)) {
390 runCmd($epilogue);
391 }
392 }
393 my $notify=readFile($F_NOTIFY) if -s $F_NOTIFY;
394 chomp($notify);
395 jobSummary($notify);
396 local *FDONE;
397 open(FDONE, '>'.$GRID_JOBDIR.'/'.$F_ALLDONE) || die("Error creating $GRID_JOBDIR/$F_ALLDONE\n");
398 print FDONE "done.\n";
399 close(FDONE);
400 } #last worker ending
401 $NORMAL_ENDING=1;
402 exit(0);
403 }# -- worker use case
404
405
406 ##########################################################
407 #
408 # gridx Submit use:
409 #
410 ##########################################################
411
412 my $gridBeginCmd=$Getopt::Std::opt_b;
413 my $gridEndCmd=$Getopt::Std::opt_e;
414 $GRID_RESUME=$Getopt::Std::opt_Z; #caused by an initial -J -R request
415 $GRID_TASKLAST=$Getopt::Std::opt_r;
416 #--submit specific globals -- temporary
417 my $TASKDB;
418 my $JOBID;
419 #--
420 unless ($GRID_CMDFILE) {
421 $GRID_CMD=$Getopt::Std::opt_c || shift(@ARGV);
422 die "$usage\nError: no command given!\n" unless $GRID_CMD;
423 unless ($Getopt::Std::opt_v) {
424 $GRID_CMD = getCmdPath($GRID_CMD) ||
425 die "Error: command $GRID_CMD not found in PATH!\n";
426 }
427 $GRID_CMD.=' '.$Getopt::Std::opt_C if $Getopt::Std::opt_C;
428 $GRID_CMD.=' '.join(' ',@ARGV) if @ARGV;
429 }
430 else {
431 $GRID_CMD='.';
432 }
433 $GRID_NUMPROCS=$Getopt::Std::opt_p || 1; #numer of workers submitted
434
435 if ($GRID_PSXFASTA=$Getopt::Std::opt_i) { #psx emulation case
436 die "Error: -r and -i are mutually exclusive options!\n" if $GRID_TASKLAST;
437 $GRID_PSXSTEP=$Getopt::Std::opt_n || 1;
438 #it will be moved into GRID_JOBDIR right after submit
439 $GRID_PSXSKIP=$Getopt::Std::opt_s || 0;
440 $GRID_PSXTOTAL=$Getopt::Std::opt_t || 0;
441 #$GRID_NOWRKLINKS=$Getopt::Std::opt_N;
442 }
443 else { $GRID_TASKLAST=1 unless $GRID_TASKLAST; } #one shot run
444 prepareTaskDb(); #builds a taskDb in the current directory for now
445 #checks global $GRID_PSXFASTA and $GRID_PSXSTEP
446 #sets TASKDB to the file name, and GRID_TASKLAST is updated as needed
447
448 if ($gridBeginCmd) {
449 system($gridBeginCmd) && die("Error: exit status $? returned by prologue command: '$gridBeginCmd'\n");
450 }
451 $JOBID = &$submitJob({PATH=> $binpath, PYTHONPATH=> $pythonpath,
452 LD_LIBRARY_PATH=>$libpath, PERLLIB=>$perllib, PERL5LIB=>$perl5lib} );
453 #-- submitJob should also call setupJobDir() to create the gridx-<JOBID> subdirectory
454 #-- and move/rename the taskDb in there.
455 die("Error: no job ID has been obtained!\n") unless $JOBID;
456 if ($Getopt::Std::opt_T) { #wait for all children to finish
457 my $wstat;
458 chdir($GRID_JOBDIR);
459 do {
460 sleep(5);
461 } until (-e $F_ALLDONE);
462 my $msg=jobSummary($mailnotify);
463 my $exitcode = (-s $F_ERRTASKS) ? `wc -l $F_ERRTASKS` : 0;
464 chomp($exitcode);
465 print STDOUT $msg."\n";
466 chdir($PWD); #this should be the original working directory
467 $NORMAL_ENDING=1;
468 exit($exitcode);
469 }
470 $NORMAL_ENDING=1;
471 exit(0);
472 # if ($notify) {
473 # #qsub -o /dev/null -N "$cmd-$jobid-($numtasks)-watcher" -hold_jid $jobid -b y -j y $notify sleep 0
474 # my $sgecmd=`which sgepl`;
475 # qsub -o $GRID_MONHOME/.gridjob_$jobid/watcher.log -N "$cmdname-$jobid-($numtasks)-watcher" \
476 # -hold_jid $jobid -b y -j y -V /bin/tcsh -f $sgecmd -Done $cmdname $jobid $numtasks
477 # echo "Submitted watchdog job $cmdname-$jobid-($numtasks)-watcher:"
478 # echo " $sgecmd -Done $cmdname $jobid $numtasks"
479 # }
480
481 #**********************************************************
482 #******************* SUBROUTINES **************************
483
484
485
486 #******************** SUBMIT SIDE *************************
487
488 =head2 ----------- taskDbRec -----------
489
490 taskDbRec($fhdb, $jobId, $command_line..)
491
492 Creates (writes) a record into the taskDb file open for writing
493 with the file handle $fhdb. This subroutine takes care of
494 formatting the fixed length defline (header) of the job record.
495
496 This subroutine should only be used by the program which
497 creates the taskDb. After all the records are added to the taskDb;
498 the taskDb should be eventually indexed with cdbfasta.
499
500 =cut
501
502 sub taskDbRec {
503 my ($fh, $taskId, @userdata)=@_;
504 my $rec='>'.$taskId."\t{-|-|----|----|--------}\t{".('-' x 25).'}';
505 $rec.="\t".join(' ',@userdata) if @userdata;
506 print($fh $rec."\n");
507 }
508
509
510 sub prepareTaskDb {
511 if ($GRID_PSXFASTA) { #-i given, psx mode
512 $GRID_PSXFASTA=getFullPath($GRID_PSXFASTA, 1);
513 die ("Error: prepareTaskDb() called with PSXFASTA but no PSXSTEP!\n")
514 unless $GRID_PSXSTEP;
515 my $basename=getFName($GRID_PSXFASTA);
516 $TASKDB='gridx-psx-'.$basename.
517 ".n$GRID_PSXSTEP.s$GRID_PSXSKIP.t$GRID_PSXTOTAL".'.taskDb';
518 if ($GRID_RESUME) {
519 #taskDb must already be there!
520 my $r=`cdbyank -s gridx-$GRID_RESUME/taskDb.cidx`;
521 ($GRID_TASKLAST)=($r=~m/\nNumber of records:\s+(\d+)\n/s);
522 die "Error: couldn't determine number of records in gridx-$GRID_RESUME/taskDb\n"
523 unless $GRID_TASKLAST>0;
524 return;
525 }
526
527 #-- iterate through the fasta file, create slice index
528 # and the index for the slice index..
529 local *TASKDB;
530 local *PSXFASTA;
531 open(PSXFASTA, $GRID_PSXFASTA) || die "Error opening file $GRID_PSXFASTA\n";
532 binmode(PSXFASTA);
533 open(TASKDB, '>'.$TASKDB) || die "Error creating file $TASKDB ($!)\n";
534 my $foffset=0;
535 my $rcount=0;
536 my $numrecs=0;
537 $GRID_TASKLAST=0;
538 while (<PSXFASTA>) {
539 my $linelen=length($_);
540 if (/^>/) {
541 $rcount++;
542 if ($rcount>$GRID_PSXSKIP) {
543 $numrecs++;
544 if (($numrecs-1) % $GRID_PSXSTEP == 0) {
545 $GRID_TASKLAST++;
546 taskDbRec(\*TASKDB, $GRID_TASKLAST, $foffset);
547 }
548 }
549 last if ($GRID_PSXTOTAL>0 && $numrecs>$GRID_PSXTOTAL);
550 }
551 $foffset+=$linelen;
552 }
553 close(PSXFASTA);
554 #&taskdbRec(\*TASKDB, $GRID_TASKLAST, $foffset)
555 # unless ($numrecs-1) % $GRID_PSXSTEP == 0;
556 close(TASKDB);
557 }
558 elsif ($GRID_CMDFILE) { # -f option given
559 return if ($GRID_RESUME); #taskDb must already be there!
560 $TASKDB="gridx.$GRID_CMDFILE.taskDb";
561 local *TASKDB;
562 open(CMDFILE, $GRID_CMDFILE) || die "Error opening file $GRID_CMDFILE\n";
563 open(TASKDB, '>'.$TASKDB) || die "Error creating file $TASKDB ($!)\n";
564 my $i=1;
565 while(<CMDFILE>) {
566 next if m/^\s*#/;
567 chomp;
568 taskDbRec(\*TASKDB, $i, $_);
569 $i++;
570 }
571 close(TASKDB);
572 close(CMDFILE);
573 $GRID_TASKLAST=$i-1;
574 }
575 elsif ($GRID_TASKLAST) { # -r option given
576 return if ($GRID_RESUME); #taskDb must already be there!
577 $TASKDB="gridx.r$GRID_TASKLAST.taskDb";
578
579 local *TASKDB;
580 open(TASKDB, '>'.$TASKDB) || die "Error creating file $TASKDB ($!)\n";
581 for (my $i=1;$i<=$GRID_TASKLAST;$i++) {
582 taskDbRec(\*TASKDB, $i, $GRID_CMD);
583 }
584 close(TASKDB);
585 }
586 else { return; }
587 runCmd("cdbfasta $TASKDB");
588 }
589
590 sub getFName {
591 return basename($_[0]);
592 }
593
594 sub getFDir {
595 return dirname($_[0]);
596 }
597
598 sub getFullPath {
599 my ($fname, $check)=@_;
600 die("Error: file $fname does not exist!\n") if $check && !-r $fname;
601 return abs_path($fname); #Cwd module
602 }
603
604 #== getCmdPath -- checks for executable in the PATH if no full path given
605 # tests if the executable is a text file and interpreter was requested
606 sub getCmdPath {
607 my $cmd=$_[0];
608 my $fullpath;
609 my $checkBinary=wantarray();
610 if ($cmd =~ m/^\//) {
611 $fullpath = (-x $cmd) ? $cmd : '';
612 }
613 elsif ($cmd =~ m/^\.+\//) { #relative path given
614 $fullpath= (-x $cmd) ? abs_path($cmd) : '';
615 }
616 else { #we search in the path..
617 my @paths=split(/:/, $ENV{'PATH'});
618 foreach my $p (@paths) {
619 if (-x $p.'/'.$cmd) {
620 $fullpath=$p.'/'.$cmd;
621 last;
622 }
623 }
624 }#path searching
625 if ($checkBinary) { #asked for interpreter, if any
626 if ($fullpath) {
627 my $interpreter='';
628 if (-r $fullpath && -T $fullpath) {#readable text file, look for bang line
629 open(TFILE, $fullpath);
630 my $linesread=1;
631 while ($linesread<10) {#read only the first 10 lines..
632 $_=<TFILE>;
633 chomp;
634 if (m/^\s*#\!\s*(\S.+)/) {
635 $interpreter=$1;
636 last;
637 }
638 $linesread++;
639 }
640 $interpreter=~s/\s+$//;
641 }
642 return ($fullpath, $interpreter);
643 }
644 else { return (); } #cmd not found;
645 }
646 else { return $fullpath; }
647 }
648
649 sub runCmd {
650 my ($cmd, $jobid)=@_;
651 my $exitstatus=system($cmd);
652 if ($exitstatus != 0) {
653 if ($jobid) {
654 &$removeJob($jobid);
655 }
656 die("Error at running system command: $cmd\n");
657 }
658 }
659
660
661 sub removeJob_sge {
662 my $jobid=shift(@_);
663 runCmd("qdel -f $jobid");
664 }
665
666 sub removeJob_condor {
667 my $jobid=shift(@_); #machine+'_'+job#
668 #must be on the same machine that submit was issuedlh
669 my ($hostname, $job)=($jobid=~m/([\w\-]+)_(\d+)$/);
670 die("Error parsing hostname, job# from $jobid!\n")
671 unless $hostname && ($job>0);
672 #print STDERR "$hostname, $HOST, $jobid, $job\n";
673 if (lc($hostname) eq lc($HOST)) { #local host
674 runCmd("condor_rm $job");
675 }
676 else {
677 runCmd("condor_rm -name $hostname $job");
678 }
679 }
680
681
682 sub smpTaskReaper { # takes care of dead children $SIG{CHLD} = \&taskReaper;
683 my $childpid;
684 while (($childpid = waitpid(-1, WNOHANG)) > 0) {
685 $SMPChildren --;
686 delete $SMPChildren{$childpid};
687 }
688 $SIG{CHLD}=\&smpTaskReaper;
689 }
690
691 sub smpTaskKiller { # signal handler for SIGINT
692 local($SIG{CHLD}) = 'IGNORE'; # we're going to kill our children
693 kill 'INT' => keys %SMPChildren;
694 }
695
696 sub removeJob_smp {
697 taskKiller();
698
699 }
700
701 sub submitJob_sge {
702 my ($envhash)=@_;
703 # submit and array job, SGE style
704 my $array='';
705 if ($GRID_NUMPROCS > 1) {
706 $array="-t 1-$GRID_NUMPROCS";
707 }
708 #append our GRID_ environment
709 my $envparam="-v 'GRID_ENGINE=sge,".
710 "GRID_JOBDIR=$PWD,".
711 "GRID_TASKLAST=$GRID_TASKLAST,".
712 "GRID_MONHOME=$GRID_MONHOME,".
713 "GRID_CMD=$GRID_CMD,".
714 "GRID_DIRPREFIX=$GRID_DIRPREFIX";
715 $envparam.=",GRID_RESUME=$GRID_RESUME" if $GRID_RESUME;
716 $envparam.=",GRID_PSXFASTA=$GRID_PSXFASTA".
717 ",GRID_PSXSTEP=$GRID_PSXSTEP" if $GRID_PSXFASTA;
718 $envparam.= ",GRID_PSXSKIP=$GRID_PSXSKIP" if $GRID_PSXSKIP;
719 $envparam.= ",GRID_PSXTOTAL=$GRID_PSXTOTAL" if $GRID_PSXTOTAL;
720 $envparam.= ",GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR" if $GRID_LOCAL_JOBDIR;
721 if (keys(%$envhash)>0) {
722 $envparam.=',';
723 my @envars;
724 while (my ($env, $val)= each(%$envhash)) {
725 push(@envars, $env.'='.$val);
726 }
727 #$envparam.=" '".join(',',@envars)."'";
728 $envparam.=join(',',@envars);
729 }
730 $envparam.="'";
731 #--
732
733 my $sub="qsub -cwd -b y $envparam";
734 #$sub.="-e $errout" if $errout;
735 #$sub.="-o $stdout" if $stdout;
736 my $logdir='';
737 if ($GRID_LOGDIR) {
738 #separate log dir given
739 $logdir=$GRID_LOGDIR;
740 if (-d $GRID_LOGDIR) {
741 print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
742 }
743 else {
744 mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
745 }
746 $logdir.='/' unless $logdir=~m/\/$/;
747 }
748 my $otherflags='';
749 $otherflags.=' -D' if $GRID_DEBUG;
750 #$otherflags.=' -N' if $GRID_NOWRKLINKS;
751 $otherflags.=' -F' if $GRID_USECMDLIST;
752 $otherflags.=' -S' if $SwitchDir;
753 my $subcmd= "$sub $array $PERL_BIN $0 $otherflags -W $GRID_CMD";
754 print STDERR "$subcmd\n" if $GRID_DEBUG;
755 my $subout = `$subcmd`;
756 #print STDOUT $subout;
757 #my ($jobid)=($subout=~/Your\s+job\S*\s+(\d+)/s);
758 my ($jobid)=($subout=~/Your\s+job\S*\s+(\d+)/s);
759 die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);
760
761 setupJobDir($jobid);
762
763 return $jobid;
764 }
765
766
767 sub submitJob_smp {
768 my ($envhash)=@_;
769 my $jobid='smp_'.$$;
770 $ENV{GRID_ENGINE}='smp';
771 @ENV{'GRID_JOBDIR', 'GRID_TASKLAST', 'GRID_MONHOME', 'GRID_CMD','GRID_DIRPREFIX'}=
772 ($PWD."/gridx-$jobid", $GRID_TASKLAST, $GRID_MONHOME, $GRID_CMD,$GRID_DIRPREFIX);
773 $ENV{GRID_RESUME}=$GRID_RESUME;
774 $ENV{GRID_JOB}=$jobid;
775 @ENV{'GRID_PSXFASTA','GRID_PSXSTEP'}=($GRID_PSXFASTA, $GRID_PSXSTEP) if $GRID_PSXFASTA;
776 $ENV{GRID_PSXSKIP}=$GRID_PSXSKIP if $GRID_PSXSKIP;
777 $ENV{GRID_PSXTOTAL}=$GRID_PSXTOTAL if $GRID_PSXTOTAL;
778
779 $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR."/gridx-$jobid" if $GRID_LOCAL_JOBDIR;
780 if (keys(%$envhash)>0) {
781 while (my ($env, $val)= each(%$envhash)) {
782 $ENV{$env}=$val;
783 }
784 }
785 # Fork off the children
786 my $pid;
787 setupJobDir($jobid); #we do this one in advance..
788 $SIG{CHLD}=\&smpTaskReaper;
789 print STDERR "..forking $GRID_NUMPROCS workers..\n" if $GRID_DEBUG;
790 my $logdir='';
791 if ($GRID_LOGDIR) {
792 #separate log dir given
793 $logdir=$GRID_LOGDIR;
794 if (-d $GRID_LOGDIR) {
795 print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
796 }
797 else {
798 mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
799 }
800 $logdir.='/' unless $logdir=~m/\/$/;
801 }
802 my $otherflags;
803 $otherflags=' -D' if $GRID_DEBUG;
804 #$otherflags.=' -N' if $GRID_NOWRKLINKS;
805 $otherflags.=' -F' if $GRID_USECMDLIST;
806 $otherflags.=' -S' if $SwitchDir;
807
808 for (1 .. $GRID_NUMPROCS) {
809 die "Error at fork: $!" unless defined ($pid = fork);
810 if ($pid) { # Parent here
811 $SMPChildren{$pid} = 1;
812 $SMPChildren++;
813 next;
814 } else { #Child here
815 # Child can *not* return from this subroutine.
816 #$SIG{INT} = 'DEFAULT'; # make SIGINT kill us as it did before
817 exec("$PERL_BIN $0 $otherflags -W $GRID_CMD"); #never returns
818 }
819 }
820 $SIG{INT}=\&smpTaskKiller;
821 $SIG{TERM}=\&smpTaskKiller;
822 return $jobid;
823 }
824
825 sub submitJob_condor {
826 my ($envhash)=@_;
827 my $jobid;
828 my $queue='queue';
829 $queue.=" $GRID_NUMPROCS" if ($GRID_NUMPROCS > 1);
830 #append our GRID_ environment
831 my $dprefix="gridx-$HOST".'_';
832 my $envparam="GRID_ENGINE=condor;".
833 "GRID_JOBDIR=$PWD/$dprefix\$(Cluster);".
834 "GRID_JOB=$HOST\_\$(Cluster);".
835 "GRID_TASKLAST=$GRID_TASKLAST;".
836 "GRID_MONHOME=$GRID_MONHOME;".
837 "GRID_CMD=$GRID_CMD;".
838 "GRID_DIRPREFIX=$GRID_DIRPREFIX";
839 $envparam.=";GRID_RESUME=$GRID_RESUME" if $GRID_RESUME;
840 $envparam.=";GRID_PSXFASTA=$GRID_PSXFASTA".
841 ";GRID_PSXSTEP=$GRID_PSXSTEP" if $GRID_PSXFASTA;
842 $envparam.= ";GRID_PSXSKIP=$GRID_PSXSKIP" if $GRID_PSXSKIP;
843 $envparam.= ";GRID_PSXTOTAL=$GRID_PSXTOTAL" if $GRID_PSXTOTAL;
844 $envparam.=';BLASTMAT='.$ENV{BLASTMAT} if $ENV{BLASTMAT};
845 $envparam.= ";GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR/$dprefix\$(Cluster);" if $GRID_LOCAL_JOBDIR;
846 if (keys(%$envhash)>0) {
847 $envparam.=';';
848 my @envars;
849 while (my ($env, $val)= each(%$envhash)) {
850 push(@envars, $env.'='.$val);
851 }
852 $envparam.=join(';',@envars);
853 }
854 my $logdir='';
855 if ($GRID_LOGDIR) {
856 #separate log dir given
857 $logdir=$GRID_LOGDIR;
858 if (-d $GRID_LOGDIR) {
859 print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
860 }
861 else {
862 mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
863 }
864 $logdir.='/' unless $logdir=~m/\/$/;
865 }
866 my $otherflags;
867 $otherflags=' -D' if $GRID_DEBUG;
868 #$otherflags.=' -N' if $GRID_NOWRKLINKS;
869 $otherflags.=' -F' if $GRID_USECMDLIST;
870 $otherflags.=' -S' if $SwitchDir;
871
872 my $mtime=time();
873 my $cmdfile="condor.$$.t$mtime.$USER.$HOST.cmd";
874 local *CMDFILE;
875 open(CMDFILE, '>'.$cmdfile) || die "Cannot create $cmdfile!\n";
876 my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL" || Arch == "x86_64")';
877 #my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL")';
878 my $force_slot;
879 if ($UniqVMreq>0) {
880 $force_slot=$UniqVMreq;
881 }
882 elsif ($UniqueVM) {
883 $force_slot=3+int(rand(9)); #this is messed up, only works on >11 core machines
884 }
885 $requirements .= ' && (VirtualMachineId == '.$force_slot.')' if $force_slot;
886 if (@xmachinelist>0) {
887 #map { $_='Machine != "'.$_.'.'.$DOMAIN.'"' } @xmachinelist;
888 #$requirements.= ' && ('.join(' && ',@xmachinelist).')';
889 if (@xmachinelist==1 && ($xmachinelist[0]=~tr/|^?*\\//)) {
890 $requirements.=' && (TRUE != regexp("'.$xmachinelist[0].'", Machine, "i"))';
891 }
892 else {
893 $requirements.=' && (TRUE != regexp("^('.join('|',@xmachinelist).
894 ')\..*", Machine, "i"))'
895 }
896 }
897 elsif (@ymachinelist>0) {
898 if (@ymachinelist==1 && ($ymachinelist[0]=~tr/|^?*\\//)>0) {
899 $requirements.=' && regexp("'.$ymachinelist[0].'", Machine, "i")';
900 }
901 else {
902 #map { $_='Machine == "'.$_.'.'.$DOMAIN.'"' } @ymachinelist;
903 #$requirements.= ' && ('.join(' || ',@ymachinelist).')';
904 $requirements.=' && regexp("^('.join('|',@ymachinelist).
905 ')\..*", Machine, "i")'
906 }
907 }
908 print CMDFILE qq{universe = vanilla
909 requirements = $requirements
910 notification = Never
911 executable = $0
912 initialdir = $PWD
913 };
914 if ($logdir) {
915 print CMDFILE "error = ${logdir}log_$dprefix\$(Cluster).\$(Process).stderr\n".
916 "output = ${logdir}log_$dprefix\$(Cluster).\$(Process).stdout\n";
917 }
918 print CMDFILE "arguments = $otherflags -W $GRID_CMD\n".
919 "environment = $envparam;\n".
920 "$queue\n";
921 close(CMDFILE);
922 my $subcmd="condor_submit $cmdfile";
923 my $subout = `$subcmd`;
924 ($jobid)=($subout=~/submitted\s+to\s+cluster\s+(\d+)\./s);
925 die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);
926 $jobid=$HOST.'_'.$jobid;
927 setupJobDir($jobid);
928 #setupJobDir also chdirs() in the $GRID_JOBDIR
929 system("mv ../$cmdfile condor_submit.cmd");
930 return $jobid;
931 }
932
933 sub jobDie {
934 my $jobid=shift @_;
935 print STDERR "Error: ".join("\n",@_)."\n";
936 &$removeJob($jobid);
937 die();
938 }
939
940 sub setupJobDir {
941 my ($jobid)=@_;
942 my $jobdir = "gridx-$jobid";
943 jobDie($jobid, "job directory $jobdir already exists!")
944 if (-d $jobdir);
945 if ($GRID_RESUME) {
946 my $prevjobdir='gridx-'.$GRID_RESUME;
947 print STDERR " ..taking over jobdir: $prevjobdir\n";
948 unlink("$GRID_MONHOME/$prevjobdir") || warn(" couldn't unlink $GRID_MONHOME/$prevjobdir");
949 unlink("$prevjobdir/$F_LASTTASK");
950 unlink("$prevjobdir/$F_ALLDONE");
951 rename("$prevjobdir/$F_ERRTASKS", "$prevjobdir/prev_$F_ERRTASKS");
952 unlink("$prevjobdir/$F_RETRYTASKS");
953 system("/bin/rm -rf $prevjobdir/wrk_*/$F_WRKRUNNING");
954 system("/bin/rm -rf $prevjobdir/.wrk*");
955 system("/bin/rm -rf $prevjobdir/locks");
956 system("/bin/rm -rf $prevjobdir/running");
957 system("mv $prevjobdir $jobdir") &&
958 jobDie($jobid, "cannot 'mv $prevjobdir $jobdir' - $! - Resuming failed!");
959 }
960 else {
961 mkdir($jobdir) || jobDie($jobid, "cannot create subdirectory $jobdir");
962 runCmd("mv $TASKDB $jobdir/taskDb", $jobid);
963 runCmd("mv $TASKDB.cidx $jobdir/taskDb.cidx", $jobid);
964 }
965
966 mkdir("$jobdir/locks") || jobDie($jobid,
967 "cannot create subdirectory $jobdir/locks");
968 mkdir("$jobdir/running") || jobDie($jobid,
969 "cannot create subdirectory $jobdir/running");
970 if ($GRID_MONHOME ne $PWD) {
971 symlink("$PWD/$jobdir", "$GRID_MONHOME/$jobdir")
972 || jobDie($jobid, "cannot symlink $GRID_MONHOME/$jobdir");
973 }
974 #- CHDIR to the new GRID_JOBDIR
975 chdir($jobdir) || jobDie($jobid, "cannot chdir($jobdir) (from $PWD)!");
976 readFile($F_TASKSDONE);
977 my $cmdfile="cmdline-$GRID_TASKLAST.cmd";
978 local *FHND;
979 open(FHND, ">$cmdfile") || jobDie($jobid, "cannot create file $cmdfile\n");
980 print FHND $CMDLINE."\n";
981 close(FHND);
982 open(FHND, ">$F_WRKDIR");
983 print FHND "$PWD/$jobdir\n";
984 close(FHND);
985
986 if ($mailnotify) {
987 open(FHND, ">$F_NOTIFY");
988 print FHND "$mailnotify\n";
989 close(FHND);
990 }
991
992 if ($gridEndCmd) {
993 open(FHND, ">$F_ENDCMD") || die("Error creating file $jobdir/.$F_ENDCMD ($!)\n");
994 print FHND $gridEndCmd."\n";
995 close(FHND);
996 }
997 $GRID_JOBDIR = "$PWD/$jobdir";
998 print STDOUT "Job $jobid scheduled to run with GRID_JOBDIR = $PWD/$jobdir\n";
999 } #setupJobDir
1000
1001
1002 sub jobSummary {
1003 my ($mail) = @_;
1004 #assuming we're in GRID_JOBDIR directory
1005 #(either directly or by $GRID_MONHOME)!
1006 die "Error: cannot locate $F_TASKSDONE and $F_WRKDIR in current directory ($ENV{PWD})!\n"
1007 unless -f $F_TASKSDONE && -f $F_WRKDIR;
1008 my $tasksdone=readFile($F_TASKSDONE);chomp($tasksdone);
1009 $tasksdone=0 unless $tasksdone;
1010 my $wrkdir=readFile($F_WRKDIR);chomp($wrkdir);
1011 my ($jobid)=($wrkdir=~m/\/gridx\-(\w[\w\-]+)$/);
1012 die("Error: cannot parse jobid from $F_WRKDIR content ($wrkdir)\n") unless $jobid;
1013 my @cmdfile=<cmdline-*.cmd>;
1014 die "Error getting the cmdline-*.cmd from current directory!\n"
1015 unless @cmdfile;
1016 my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
1017 die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
1018 open(CMDFILE, $cmdfile[0]);
1019 my $cmdline=<CMDFILE>;
1020 chomp($cmdline);
1021 $cmdline=~s/\t/ /g;
1022 close(CMDFILE);
1023 unlink($F_NOTIFY) if -s $F_NOTIFY;
1024 my ($msg, $subj);
1025 my $sig='';
1026 if ($mail) {
1027 $sig = "\n\n-------------------------\n -= mail sent from $HOST";
1028 $sig.=" (worker $GRID_WORKER)" if $GRID_WORKER;
1029 $sig.=" \nWorking directory: $wrkdir]\n" if $wrkdir;
1030 $sig.=" \nCommand line: \n $cmdline\n" if $cmdline;
1031 $sig.=" =-\n";
1032 }
1033 if ($tasksdone!=$numtasks) {
1034 $msg="Summary of gridx job $jobid: $tasksdone tasks done out of $numtasks\n".
1035 "Check $wrkdir for more details.\n";
1036 $subj="gridx job $jobid (done $tasksdone out of $numtasks)";
1037 }
1038 else{
1039 $msg="gridx job $jobid - done all $numtasks tasks\n";
1040 $subj="gridx job $jobid Done (all $numtasks tasks)";
1041 }
1042 $msg.=$sig;
1043 send_mail( { to=>$mail, subj=>$subj, body=>$msg }) if $mail;
1044 return $msg;
1045 }
1046
1047
1048
1049 ############## WORKER SIDE subroutines #####################
1050
1051 #=================== taskDB handling =================
1052
1053 =head2 taskDbStat (taskdb, taskId [,tstatus, cpu#, host, retries, exitcode])
1054
1055 taskDbStat($taskdb, $taskId, [, $taskstatus, $CPUno,
1056 $host, $retrycount, $exitcode])
1057
1058 gets/sets the status of a task in an existing taskdb file
1059
1060 If only $taskdb and $taskId parameters are given it works
1061 as a getter and returns the whole $taskdb entry, either as a raw
1062 string or, if wantarray(), as:
1063
1064 ($taskstatus, $userdata, $CPUno, $host, $lastexitcode, $startTime, $retrycount)
1065
1066 ..where $startTime is in minutes since the epoch (time/60)
1067
1068 If more than 2 parameters are given, it is a setter for the task status
1069 and the other task related data.
1070
1071 Valid status values:
1072 '-' = queued/idle/unprocessed
1073 'r' = running (could be a retry)
1074 '.' = finished successfully
1075 'E' = finished with error ending (after max retries)
1076
1077 Internal details:
1078 -Each record is assumed locked at the time of writing
1079 (no other processes are trying to update the same task record).
1080 -A taskdb record format is:
1081
1082 >taskId\t{S|R|xxxx|dddd|mmmmmmmm}\t{hostname}[\t<additional data..>]
1083
1084 where:
1085 S = running status ('-','r','.' or 'E')
1086 R = retry counter (0-9)
1087 xxxx = last exit code, in hex
1088 dddd = last CPU number, in hex (the wrk_<CPU> subdirectory)
1089 mmmmmmmm = start minute since the epoch (time/60) in hexadecimal
1090 hostname = fixed 25 char length machine name
1091 (blank padded as needed)
1092
1093 Error protocol for the setter: when given $status is 'E'
1094 the retry counter is incremented and the actual status
1095 would be updated to 'E' if > $maxRetries, or back to '-' otherwise.
1096
1097 =cut
1098
1099 sub taskDbStat {
1100 my ($taskdb, $entry, $status, $dirno, $machine, $errcount, $exitcode)=@_;
1101 my $taskdbidx=$taskdb;
1102 $taskdbidx.='.cidx' unless ($taskdb=~s/\.cidx$//);
1103 my $tdberr="taskDbStat(".join(',',@_).") Error:";
1104 wrkDie("$tdberr db $taskdb (and/or index) not found!".
1105 "(pwd = $ENV{PWD})") unless (-e $taskdb && -e $taskdbidx);
1106 wrkDie("$tdberr no entry given!") unless $entry;
1107 local *TASKDB;
1108 my $dbpos= `cdbyank -a '$entry' -P $taskdbidx`;
1109 chomp($dbpos);
1110 wrkDie("$tdberr at retrieving pos of entry $entry\n")
1111 if ($? || $dbpos!~/^\d+$/);
1112
1113 my $openmode = O_RDONLY | O_SYNC ;
1114 if ($status) { # setter code:
1115 wrkDie("$tdberr invalid update parameters ($status)")
1116 if (length($status)>1 || ($dirno && ($dirno>65535 || $dirno<1)));
1117 $openmode= O_RDWR | O_SYNC; # | O_DIRECT might be needed ?
1118 }
1119 sysopen(TASKDB, $taskdb, $openmode) ||
1120 wrkDie("$tdberr sysopening $taskdb failed!");
1121 binmode(TASKDB);
1122 unless (sysseek(TASKDB, $dbpos, SEEK_SET)) {
1123 close(TASKDB);
1124 wrkDie("$tdberr at sysseek() to $dbpos for $entry");
1125 }
1126 #----- read the next line and check the format
1127 my $targetstr='>'.$entry."\t{-|-|----|----|--------}\t{".('-' x 25)."}";
1128 local $/="\n";
1129 my $dbline=readline(*TASKDB);
1130 binmode(TASKDB);
1131 my ($pentry, $pstats, $pmachine, $userdata)=
1132 split(/\t/,$dbline,4);
1133 chomp($pmachine);chomp($userdata);
1134 my $tcheck=join("\t",$pentry,$pstats,$pmachine);
1135 if ($pentry ne '>'.$entry ||
1136 length($tcheck)!=length($targetstr)) {
1137 close(TASKDB);
1138 wrkDie("$tdberr invalid record format for '$entry' ".
1139 "pos $dbpos, Found:\n'$tcheck'\n ..instead of:\n'$targetstr'\n");
1140 }
1141
1142 $pmachine=~tr/{} //d;
1143 $pstats=~tr/{}//d;
1144 my ($pstatus, $pfcount, $pxcode, $pdirno, $ptime)=split(/\|/,$pstats);
1145 $ptime=hex($ptime) || $ptime;
1146 $pfcount=0 unless $pfcount>0;
1147 #----
1148 if ($status) { #---- setter code:
1149 $status=lc($status);
1150 if ($status!~/^[\-r\.e]$/) {
1151 close(TASKDB);
1152 wrkDie("$tdberr invalid status provided ($status)!");
1153 }
1154 $dbpos+=length(">$entry\t{");
1155 sysseek(TASKDB, $dbpos, SEEK_SET);
1156 if (defined($exitcode)) {
1157 $pxcode = $exitcode>0 ? sprintf('%04x', $exitcode) : $pxcode;
1158 }
1159
1160 $pfcount=$errcount if defined($errcount);
1161 if ($status eq 'r') { #mark this entry as "running"
1162 $ptime=sprintf('%08x',int(time()/60));
1163 $pxcode='----';
1164 }
1165 elsif ($status eq '-') { # mark this entry as "available" (idle)
1166 $ptime='--------';
1167 }
1168 $pmachine=$machine if $machine;
1169 $pmachine=sprintf('%25s',$pmachine);
1170 $dirno=$pdirno unless $dirno>=1;
1171 my $wstr=$status.'|'.int($pfcount).
1172 '|'.$pxcode.
1173 '|'.sprintf('%04x', $dirno).
1174 '|'.$ptime."}\t{".$pmachine.'}';
1175 if (length($ptime)>8) {
1176 die("Error writing into taskDb record, ptime='$ptime' is too long when writing:\n".
1177 "$wstr\n");
1178 }
1179 my $wlen=length($wstr);
1180 my $w=syswrite(TASKDB, $wstr, $wlen);
1181 binmode(TASKDB); # 'cause this flushes any pending I/O buffers
1182 if ($w!=$wlen) {
1183 close(TASKDB);
1184 wrkDie("$tdberr failed writing '$wstr' for $entry!");
1185 }
1186 close(TASKDB);
1187 return 1;
1188 } # setter
1189 else { # getter code
1190 close(TASKDB);
1191 if (wantarray()) { #retrieve the parsed list of values
1192 return ($pstatus, $userdata, hex($pdirno), $pmachine, int($pfcount),
1193 hex($pxcode), $ptime);
1194 }
1195 else { #return the raw taskDb line
1196 return($dbline);
1197 }
1198 } # getter
1199 }
1200
1201
1202 sub gridWorkerEnv {
1203 if ($_[0]) {
1204 $GRID_TASK=$_[0];
1205 $ENV{GRID_TASK}=$_[0];
1206 }
1207 return if $GRID_ENVSET;
1208 my $gridengine=lc($ENV{GRID_ENGINE});
1209 ( $GRID_JOBDIR, $GRID_TASKLAST, $GRID_RESUME, $GRID_MONHOME, $GRID_LOCAL_JOBDIR,
1210 $GRID_PSXFASTA, $GRID_PSXSTEP, $GRID_PSXSKIP, $GRID_PSXTOTAL, $GRID_DIRPREFIX, )=
1211 @ENV{'GRID_JOBDIR','GRID_TASKLAST','GRID_RESUME','GRID_MONHOME', 'GRID_LOCAL_JOBDIR',
1212 'GRID_PSXFASTA','GRID_PSXSTEP','GRID_PSXSKIP','GRID_PSXTOTAL', 'GRID_DIRPREFIX'};
1213 if ($gridengine eq 'sge') {
1214 #can't have dynamic environment variables in SGE
1215 $GRID_JOB=$ENV{JOB_ID};
1216 #only static ones have been prepared (GRID_ENGINE, GRID_TASKLAST)
1217 # with GRID_JOBDIR initially set to the submit working directory
1218 $GRID_JOBDIR.='/gridx-'.$GRID_JOB;
1219
1220 $GRID_LOCAL_JOBDIR.='/gridx-'.$GRID_JOB if $GRID_LOCAL_JOBDIR;
1221 $GRID_PSXFASTA=$ENV{GRID_PSXFASTA};
1222 $GRID_PSXSTEP=$ENV{GRID_PSXSTEP};
1223 #dynamic ones are built now from SGE ones
1224 $ENV{GRID_JOBDIR}=$GRID_JOBDIR;
1225 $ENV{GRID_JOB}=$GRID_JOB;
1226 $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR;
1227 }
1228 elsif ($gridengine eq 'condor' || $gridengine eq 'smp') {
1229 #condor should have all the environment in order
1230 $GRID_JOB=$ENV{GRID_JOB};
1231 }
1232 else {
1233 die("Error: Invalid GRID_ENGINE (Is this a valid worker run?)\n");
1234 }
1235 $GRID_LOCKDIR=$GRID_JOBDIR.'/locks';
1236 $GRID_ENVSET=1;
1237 }
1238
1239 sub beginWorker {
1240 gridWorkerEnv(); #setup the worker environment
1241 my $maxretries=5; #just give some time for the submit script to catch up
1242 my $chdir=0; # (only if execution of a submitted task is extraordinarily fast)
1243 my $retries=0;
1244 while (!($chdir=chdir($GRID_JOBDIR))) {
1245 sleep(1);
1246 $retries++;
1247 last if $retries==$maxretries;
1248 }
1249 die("Worker error: cannot chdir to $GRID_JOBDIR") unless $chdir;
1250 # -- we are in GRID_JOBDIR now!
1251 my $errmsg="Worker PID $$ ($GRID_WORKER) aborted on $HOST..\n";
1252 my $fh=setXLock($F_WRKSTART,120) || die $errmsg; #update the count of started workers
1253 $GRID_WORKER=incFValue($fh, $F_WRKSTART);
1254 endXLock($fh);
1255 print STDERR "D: worker $GRID_WORKER assigned to host $HOST pid $$ at ".getTime()."\n" if $GRID_DEBUG;
1256 $GRID_WRKDIR=sprintf("wrk_%04d",$GRID_WORKER);
1257 $starting_dir=$GRID_JOBDIR;
1258 $starting_dir=~s/[^\/]+\/?$//;
1259 if ($GRID_LOCAL_JOBDIR) {
1260 print STDERR "D: creating local dir on $HOST: $GRID_LOCAL_JOBDIR/$GRID_WRKDIR\n" if $GRID_DEBUG;
1261 mkdir("$GRID_LOCAL_JOBDIR") unless -d $GRID_LOCAL_JOBDIR;
1262 system("/bin/rm -f $GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
1263 mkdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR") unless -d $GRID_LOCAL_JOBDIR;
1264 wrkDie("Error: couldn't create local worker directory $GRID_LOCAL_JOBDIR/$GRID_WRKDIR on $HOST!\n")
1265 unless (-d "$GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
1266 }
1267 #-- also updates the "currently running" counter
1268 unless (-d "$GRID_JOBDIR/$GRID_WRKDIR") { #worker directory doesn't exit
1269 unless (mkdir("$GRID_JOBDIR/$GRID_WRKDIR")) {
1270 die "Error at mkdir $GRID_JOBDIR/$GRID_WRKDIR ($!)!\n";
1271 }
1272 }
1273 else { #existing working directory
1274 my $frunning= "$GRID_WRKDIR/$F_WRKRUNNING";
1275 if (-f $frunning) { #weird -- should never happen..
1276 die "Error: another process running in $GRID_WRKDIR?!\n".`cat $frunning`."$errmsg\n";
1277 }
1278 else { #normal case: no worker-running semaphore there already
1279 local *RSEM;
1280 open(RSEM, '>'.$frunning) || die "Error creating $frunning file! ($!)\n$errmsg";
1281 print RSEM join(" ",int(time()/60), $HOST, $$)."\n";
1282 close(RSEM);
1283 }
1284 }
1285 if ($GRID_DIRPREFIX) {
1286 my $gwrkdir="../$GRID_DIRPREFIX".'_'.$GRID_WORKER;
1287 unlink($gwrkdir);
1288 symlink("gridx-$GRID_JOB/$GRID_WRKDIR", $gwrkdir) ||
1289 print STDERR "Warning: cannot symlink gridx-$GRID_JOB/$GRID_WRKDIR to $gwrkdir ($!)\n";
1290 #needed by PSX emulation
1291 }
1292 # we are in GRID_JOBDIR now - descend into wrk_<GRID_WORKER>
1293 $fh=setXLock($F_WRKCOUNT,70,3) || die $errmsg; # update the count of running workers
1294 incFValue($fh, $F_WRKCOUNT);
1295 endXLock($fh);
1296 chdir("$GRID_JOBDIR/$GRID_WRKDIR") ||
1297 die "Error at chdir($GRID_JOBDIR/$GRID_WRKDIR) ($!)\n";
1298
1299 open(STDERR, ">wrk_err.log");
1300 print STDERR "worker $GRID_WORKER fully assigned to host $HOST PID $$\n" if $GRID_DEBUG;
1301 open(STDOUT, ">wrk_log.log");
1302 local *WRKGRAB;
1303 open(WRKGRAB, ">.on_$HOST");
1304 print WRKGRAB join("\t",$GRID_WRKDIR, $HOST, $$, 'start: '.getTime())."\n";
1305 close(WRKGRAB);
1306 $ENV{GRID_WORKER}=$GRID_WORKER;
1307 if ($SwitchDir) {
1308 chdir($starting_dir) || die "Error at chdir($starting_dir)!\n";
1309 }
1310 elsif ($GRID_LOCAL_JOBDIR) {
1311 chdir($GRID_LOCAL_JOBDIR/$GRID_WRKDIR) || die "Error at chdir($starting_dir)!\n";
1312 }
1313 }
1314
1315 sub endWorker {
1316 return -1 unless $GRID_WORKER && $GRID_WRKDIR;
1317 #make sure we are back in the wrk_<GRID_WORKER> directory
1318 unless (chdir("$GRID_JOBDIR/$GRID_WRKDIR")) {
1319 die("ERROR: endWorker() could not change to $GRID_JOBDIR/$GRID_WRKDIR\n");
1320 }
1321
1322 my $fh=setXLock($F_WRKCOUNT,70, 3) || die "Error updating the number of running workers!\n";
1323 # my $v=readFile($fh,0,$F_WRKCOUNT);chomp($v);
1324 my $v=incFValue($fh, $F_WRKCOUNT, -1);
1325 unlink($F_WRKRUNNING); #remove the "worker here" semaphore..
1326 if ($v<0) {
1327 endXLock($fh);
1328 die("Error: invalid number of workers ($v) reported in $GRID_JOBDIR/$F_WRKCOUNT\n")
1329 }
1330 endXLock($fh);
1331 print STDERR join("\t","D. worker $GRID_WRKDIR", $HOST, $$, 'finished at:',getTime())."\n" if $GRID_DEBUG;
1332
1333 local *WRKGRAB;
1334 sysopen(WRKGRAB, ".on_$HOST", O_RDWR | O_SYNC );
1335 sysseek(WRKGRAB,-1,SEEK_END);
1336 print WRKGRAB "\tend: ".getTime()."\n";
1337 close(WRKGRAB);
1338 if ($GRID_LOCAL_JOBDIR) {
1339 my $r=system("cp -pr $GRID_LOCAL_JOBDIR/$GRID_WRKDIR/* $GRID_JOBDIR/$GRID_WRKDIR/");
1340 if ($r) {
1341 print STDERR "Error at copying $HOST local files back to $GRID_JOBDIR/$GRID_WRKDIR!\n";
1342 }
1343 system("/bin/rm -rf $GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
1344 rmdir($GRID_LOCAL_JOBDIR); #it'll fail if there are other subdirs there, but that's OK
1345 }
1346 undef($GRID_WORKER);
1347 undef($GRID_WRKDIR);
1348 return $v; #returns the number of workers left running
1349 }
1350
1351 sub writeFValue {
1352 my ($fh, $v)=@_;
1353 truncate($fh, 0);
1354 seek($fh,0,SEEK_SET);
1355 $v=int($v);
1356 print $fh $v."\n";
1357 return $v;
1358 }
1359
1360 sub writeFList { #with truncate
1361 my ($fh, $listref, $delim)=@_;
1362 $delim='' unless $delim;
1363 seek($fh,0,SEEK_SET);
1364 truncate($fh,0);
1365 print $fh join($delim,@$listref);
1366 #truncate($fh, tell($fh));
1367 }
1368 #
1369 # incFValue(fhandle) => reads an int value from fhandle
1370 # increments it, writes it back
1371 # and returns it
1372 #
1373 sub incFValue {
1374 my ($fh, $finfo, $incv)=@_;
1375 seek($fh, 0, SEEK_SET);
1376 $incv=1 unless $incv;
1377 my $v=readFile($fh, 0, "incFValue(,,$incv) in $finfo");
1378 chomp($v);
1379 return writeFValue($fh, int($v)+$incv);
1380 }
1381
1382
1383 #******************** worker side:
1384 # getNextTask() -- returns a taskId for the next task to be processed
1385 # or 0 if no more
1386 # -ignore any entries which are already "done"
1387 # -look first into the "retry pool" ($F_RETRYTASKS) to get some tasks from there,
1388 # if any -- and removes that entry
1389 #--------------- resources used:
1390 # update $F_RETRYTASKS
1391 # update $F_LASTTASK
1392 # update ENV{GRID_TASK} and $GRID_TASK
1393 #
1394 sub getNextTask {
1395 #check for any tasks in the "retry" queue
1396 my ($taskID, $retries);
1397 my $sourcemsg;
1398 SKIP_DONE:
1399 $retries=0;
1400 if (-s "$GRID_JOBDIR/$F_RETRYTASKS") {
1401 my $hr=setXLock($F_RETRYTASKS,120,5)
1402 || wrkDie("Error locking $F_RETRYTASKS ($HOST, $GRID_WORKER)");
1403 my @errstack=readFile($hr,0,$F_RETRYTASKS); #format: taskid <space> #retries
1404 ($taskID, $retries)=split(/\s+/, shift(@errstack)); #fetch last
1405 if ($taskID>0) { #valid taskID to retry
1406 chomp($retries);
1407 writeFList($hr, \@errstack); #write the list back
1408 $sourcemsg=' from the retry pool.';
1409 }
1410 endXLock($hr);
1411 }
1412 NEXT_TASK:
1413 unless ($taskID) { #getting the next available task
1414 #no retry tasks, so get the next task not processed yet
1415 my $fh=setXLock($F_LASTTASK, 70, 3) ||
1416 wrkDie("Error locking $F_LASTTASK ($HOST, $GRID_WORKER)"); #30 /retries
1417 my $last=readFile($fh, 0, $F_LASTTASK);chomp($last);
1418 if ($last<$GRID_TASKLAST) { # valid one, take the next
1419 $taskID=writeFValue($fh,int($last)+1);
1420 }
1421 $sourcemsg='';
1422 endXLock($fh);
1423 }
1424 return undef unless $taskID;
1425 $GRID_TASK=$taskID;
1426 # lock this taskID
1427 $TASK_LOCKF="$GRID_JOBDIR/running/task_$taskID";
1428 #$TASK_LOCKF="running/task_$taskID";
1429 catchSigs(1); #install signal handler
1430 $TASK_LOCKH=setXLock($TASK_LOCKF, 70, 3);
1431 unless ($TASK_LOCKH) { #this SHOULD be available immediately..
1432 my $lockedby=`cat $TASK_LOCKF`;chomp($lockedby);
1433 print STDERR "WARNING: lock-fail on task $taskID $sourcemsg (previously locked in $TASK_LOCKF by [$lockedby])\n";
1434 #wrkDie("Error: couldn't get a lock on task $taskID..\n");
1435 undef($taskID);
1436 goto SKIP_DONE; # don't kill the worker, just move on
1437 }
1438 print $TASK_LOCKH "$HOST $$ ".sprintf("wrk_%04d",$GRID_WORKER)."\n";
1439 # check the status of this task:
1440 my ($tstatus, $tuserdata, $tdirno, $thost, $terrcount,
1441 $texcode, $tstartmin)=taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID);
1442 print STDERR ">task-$taskID assigned to worker $GRID_WORKER (on $HOST) $sourcemsg\n";
1443 if ($GRID_RESUME && $tstatus eq '.') {
1444 #skip this one, it's finished (according to taskDb!)
1445 print STDERR ">SKIP-done:$taskID \{$tstatus|$terrcount|$texcode|$tdirno|$tstartmin\}\t\{$thost\}\n";
1446 undef $taskID;
1447 undef $GRID_TASK;
1448 endXLock($TASK_LOCKH);
1449 catchSigs(0);
1450 unlink($TASK_LOCKF);
1451 undef($TASK_LOCKH);undef($TASK_LOCKF);
1452 goto SKIP_DONE;
1453 }
1454 #update status of this task to 'running'
1455 $TASK_ERRCOUNT=$retries;
1456 taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID, 'r', $GRID_WORKER, $HOST, $TASK_ERRCOUNT);
1457 #--
1458 $GRID_TASK=$taskID;
1459 $TASK_DATA=$tuserdata;
1460 $ENV{'GRID_TASK'}=$taskID;
1461 $STARTED_GRID_TASK=$taskID;
1462 return $taskID;
1463 }
1464
1465 ##############################################
1466 # runTask($taskID, @cmd)
1467 #------------------------
1468 # *runs into a ./wrk_NNNN subdirectory of GRID_JOBDIR
1469 # *employs $GRID_TASK, $TASK_DATA, $TASK_ERRCOUNT
1470 # *with GRID_PSXFASTA, it uses TASK_DATA and other GRID_PSX..
1471 # to prepare the fasta slice and pass it to the cmd
1472 # *on exit, it should:
1473 # - set a lock on the ../running/task_$taskID file
1474 # - IF error exit of cmd (non-zero exit status), use $TASK_ERRCOUNT+1 and $MAX_RETRIES
1475 # to determine if the job should be put in the $F_RETRYTASKS file
1476 # or if status should be set to 'E' in taskDb and the entry added
1477 # to $F_ERRTASKS
1478 # - IF successful exit : increment $F_TASKSDONE
1479 # - update taskDb with the current status
1480 # - remove the lock on ../running/task_$taskID and delete this file!
1481 #
1482 #---------------------------------------------
1483 sub runTask {
1484 my $taskID=shift(@_);
1485 my @cmd=@_;
1486 my $exitstatus;
1487 my $runcmd; #the actuall command run using system();
1488
1489 catchSigs(1); #install signal handler (should have been done already in getNextTask())
1490 if ($GRID_LOCAL_JOBDIR) {
1491 wrkDie("Fatal: cannot chdir to local dir $GRID_LOCAL_JOBDIR/$GRID_WRKDIR!")
1492 unless chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
1493 }
1494
1495 #if ($SwitchDir) {
1496 # we are currently in a wrk_* directory
1497 # chdir('../..'); #change to the original directory (where gridx was launched from)
1498 # }
1499 if ($GRID_PSXFASTA) { #psx emulation
1500 #prepare the fasta slice here
1501 my $fslice=sprintf('%s.slice-%08d',getFName($GRID_PSXFASTA), $GRID_TASK);
1502 #-- write the slicefile
1503 local *FDB;
1504 local *FSL;
1505 open(FSL, '>'.$fslice)
1506 || wrkDie("Cannot create fasta slice $fslice");
1507 open(FDB, $GRID_PSXFASTA) || wrkDie("Cannot open fasta db $GRID_PSXFASTA");
1508 seek(FDB, $TASK_DATA, SEEK_SET);
1509 local $/="\n";
1510 my $seqnext=$GRID_PSXSTEP*($GRID_TASK-1); #+GRID_PSXSKIP
1511 my $seqcount=0;
1512 my $seqmax=($GRID_PSXTOTAL<=0) ? 0 : $GRID_PSXTOTAL;
1513 while (<FDB>) {
1514 if (/^>/) { #record start
1515 $seqnext++;
1516 last if ($seqcount>=$GRID_PSXSTEP);
1517 last if $seqmax && ($seqnext>$seqmax);
1518 $seqcount++;
1519 }
1520 print FSL $_;
1521 }#while input line from fastadb
1522 close(FSL);
1523 close(FDB);
1524
1525 $runcmd=shift(@cmd);
1526 $GRID_PSXSKIP=0 unless ($GRID_PSXSKIP);
1527 $GRID_PSXTOTAL=-1 unless ($GRID_PSXTOTAL>0);
1528 my $islast=($GRID_TASK==$GRID_TASKLAST) ? 1 : 0;
1529 # 1 2 3 4
1530 $runcmd.=' '.join(' ',$fslice, $seqcount, $GRID_TASK, $islast,
1531 # 5 6 7
1532 $GRID_PSXSKIP, $GRID_PSXTOTAL, @cmd);
1533
1534 }
1535 elsif ($GRID_USECMDLIST) {
1536 # $TASK_DATA is the actual command to run
1537 $runcmd=$TASK_DATA;
1538 }
1539 else { #normal repeat cmd -- let the cmd use the ENV accordingly
1540 $runcmd=join(" ",@cmd);
1541 }
1542
1543 #... get $exitstatus for the system() call
1544 print STDERR ">starting-task-$GRID_TASK by worker $GRID_WORKER (on $HOST): '$runcmd'\n" if $GRID_DEBUG;
1545 $exitstatus=system($runcmd);
1546 if ($SwitchDir) {
1547 chdir("$GRID_JOBDIR/$GRID_WRKDIR");
1548 }
1549 endTask($exitstatus); #taskID is taken from $GRID_TASK
1550 }
1551
1552
1553 sub catchSigs { # true/false
1554 if ($_[0]) {
1555 $SIG{INT}=\&sigHandler;
1556 $SIG{TERM}=\&sigHandler;
1557 }
1558 else {
1559 $SIG{INT}='DEFAULT';
1560 $SIG{TERM}='DEFAULT';
1561 }
1562 }
1563
1564 sub sigHandler {
1565 my $signame=shift;
1566 wrkDie("Signal $signame caught for worker $$ on $HOST, aborting..\n");
1567 }
1568
1569 sub toRetry {
1570 my ($taskID, $taskErrCount)=@_;
1571 return unless $taskID;
1572 $taskErrCount=1 unless $taskErrCount;
1573 my $hr=setXLock($F_RETRYTASKS, 70, 3);
1574 while (<$hr>) {
1575 chomp;
1576 my ($tid, $tec)=split(/\s+/);
1577 if ($tid==$taskID) { #double retry, don't bother
1578 endXLock($hr);
1579 return;
1580 }
1581 }
1582 fappend($hr, "$taskID\t$taskErrCount\n");
1583 endXLock($hr);
1584 }
1585
1586 sub endTask {
1587 return unless $GRID_TASK;
1588 # we MUST be in GRID_WRKDIR
1589 chdir("$GRID_JOBDIR/$GRID_WRKDIR") || die("Error: failed to chdir to $GRID_JOBDIR/$GRID_WRKDIR!");
1590 my ($exitstatus)=@_;
1591
1592 if (!$exitstatus && !$STARTED_GRID_TASK) { # could be a premature failure, like cdbyank not found, etc.
1593 print STDERR "scheduling $GRID_TASK for retry..\n";
1594 toRetry($GRID_TASK, $TASK_ERRCOUNT);
1595 endXLock($TASK_LOCKH) if $TASK_LOCKH;
1596 unlink($TASK_LOCKF);
1597 undef($TASK_LOCKH);undef($TASK_LOCKF);
1598 unlink($F_WRKRUNNING);
1599 catchSigs(0);
1600 undef $GRID_TASK;
1601 return;
1602 }
1603 my $dbstatus;
1604 if (defined($exitstatus)) {
1605 if ($exitstatus==0) { #success
1606 $dbstatus='.';
1607 # update $F_TASKSDONE
1608 my $fh=setXLock($F_TASKSDONE, 110, 5);
1609 if ($fh) {
1610 incFValue($fh, $F_TASKSDONE);
1611 endXLock($fh);
1612 }
1613 }
1614 else { #error status
1615 $TASK_ERRCOUNT++;
1616 print STDERR "task $GRID_TASK failed on $HOST (PID=$$, status='$exitstatus') (worker $GRID_WORKER); error count=$TASK_ERRCOUNT\n" if $GRID_DEBUG;
1617 if ($TASK_ERRCOUNT>$MAX_RETRIES) { #trash it
1618 my $he=setXLock($F_ERRTASKS, 70, 3);
1619 fappend($he, join("\t",$GRID_TASK,$exitstatus,$HOST,$GRID_WRKDIR)."\n");
1620 endXLock($he);
1621 $dbstatus='E';
1622 }
1623 else {#give it another retry chance
1624 toRetry($GRID_TASK, $TASK_ERRCOUNT);
1625 $dbstatus='-';
1626 }
1627 }
1628 endXLock($TASK_LOCKH) if $TASK_LOCKH;
1629 unlink($TASK_LOCKF) if $TASK_LOCKF;
1630 undef($TASK_LOCKH);undef($TASK_LOCKF);
1631 print STDERR "]task $GRID_TASK ended - updating taskDb with status code '$dbstatus'\n" if $GRID_DEBUG;
1632 taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $GRID_TASK, $dbstatus,
1633 $GRID_WORKER, $HOST, $TASK_ERRCOUNT, $exitstatus);
1634 } # defined $exitstatus
1635 else { #no exitstatus given, it was an interrupt signal or otherwise failed task
1636 print STDERR "task $GRID_TASK ended with undefined exit status\n" if $GRID_DEBUG;
1637 toRetry($GRID_TASK, $TASK_ERRCOUNT);
1638 endXLock($TASK_LOCKH) if $TASK_LOCKH;
1639 unlink($TASK_LOCKF) if $TASK_LOCKF;
1640 undef($TASK_LOCKH);undef($TASK_LOCKF);
1641 unlink($F_WRKRUNNING);
1642 }
1643 catchSigs(0);
1644 undef($STARTED_GRID_TASK);
1645 }
1646
1647 sub wrkDie {
1648 my ($msg)=@_;
1649 endTask();
1650 my $grdwrk=$GRID_WORKER;
1651 endWorker();
1652 #remove any other locks left in this worker..
1653 die("Error at worker $grdwrk on $HOST:\n$msg\n");
1654 }
1655
1656 #--onExit, onEnd trigger
1657 sub END {
1658 unless ($NORMAL_ENDING) {
1659 $NORMAL_ENDING=1; #to avoid recursion?
1660 endTask();
1661 endWorker();
1662 #remove all locks
1663 my @locks=keys(%Locks);
1664 foreach my $lock (@locks) {
1665 my $d=$Locks{$lock};
1666 endXLock($lock);
1667 }
1668 }
1669 }
1670
1671 sub fappend {
1672 my $fh=shift(@_);
1673 seek($fh,0,SEEK_END);
1674 print $fh join("\n",@_);
1675 }
1676
1677 sub getLockDir {
1678 my $fname=shift;
1679 my $basename=basename($fname);
1680 my $dirname=dirname($fname);
1681 my $lockdir="$basename";
1682 $lockdir=$dirname.'/'.$lockdir if $dirname;
1683 return ($lockdir, "by.$HOST.$$");
1684 }
1685
1686 sub makeLockFile {
1687 #this is called by a node just before a file lock is requested
1688 my ($fname)=@_; #MUST be a path relative to GRID_JOBDIR
1689 if (index($fname,$GRID_JOBDIR.'/')==0) {
1690 #remove the full path, if there
1691 $fname=substr($fname, length($GRID_JOBDIR)+1);
1692 }
1693 my $fnlock=$fname;
1694 $fnlock=~tr/\/\\/--/s;
1695 my $nodelockf="$GRID_LOCKDIR/$fnlock-lock.by.$HOST.$$";
1696 #--
1697 $nodelockf.='-'.substr(time(),-4);
1698 #--
1699 my $fh;
1700 open($fh, '>'.$nodelockf) || die("Error creating lockfile $nodelockf!\n");
1701 my $thismin=int(time()/60); # time when the lock was first initiated
1702 print $fh "$HOST $$ $thismin $GRID_WORKER\n";
1703 close($fh);
1704 return ($GRID_LOCKDIR.'/'.$fnlock, $nodelockf);
1705 }
1706
1707 sub getFileLock {
1708 #attempts to make $lockfile a hard link to $nodefile which is host/process dependent
1709 my ($lockfile, $nodefile)=@_;
1710 if (link($nodefile, $lockfile)) { #could create link to node lock file
1711 my @stat=stat($nodefile);
1712 my $linkcount=$stat[3];
1713 if ($linkcount>2) {
1714 print STDERR "WARNING: weird lnkcount=$linkcount ($GRID_WORKER, $HOST, task $GRID_TASK)!\n";
1715 }
1716 return ($linkcount>1); #should never be more than 2..
1717 }
1718 else { #cannot create link
1719 return undef;
1720 }
1721 }
1722
1723 #attempts to aquire an exclusive lock on a specific file
1724 #-returns a file handle ref
1725 sub setXLock {
1726 my ($fname, $maxretries, $stalemin)=@_;
1727 $maxretries=80 unless $maxretries;
1728 #
1729 # -- default: locks older than this are
1730 # considered "stale" and removed!
1731 $stalemin=7200 unless $stalemin;
1732 my ($lockfile, $nodefile)=makeLockFile($fname);
1733 my $retries=0;
1734 my $startmin=int(time()/60); #the minute we started trying
1735 #----------- try mkdir
1736 my ($currentLocker, $lockage);
1737 my $lock_age;
1738 my $prevlock;
1739 my $haveLock;
1740
1741 while (!($haveLock=getFileLock($lockfile, $nodefile))) {
1742 if ($retries>$maxretries) {
1743 #tried too many times?
1744 #check for stale lock forgotten here..
1745 if (-f $lockfile) {
1746 #anyone ELSE holding it?
1747 my $l=readFile($lockfile);
1748 my ($lhost, $lpid, $ltime, $worker)=split(/\s+/,$l);
1749 $prevlock="$lhost (pid $lpid, worker $worker)" if $lpid;
1750 # code to check for a stale lock:
1751 $lock_age=int(time()/60)-$ltime;
1752 if ($lock_age>$stalemin) { #previous lock is older than $stalemin minutes
1753 print STDERR "WARNING: removing stale $fname lock from $lhost PID $lpid (worker $worker), age $lockage minutes.\n";
1754 unlink($lockfile);
1755 next; # hopefully we'll get it next time, unless another node steals it..
1756 }
1757 }
1758 last; #too many retries;
1759 }
1760 $retries++;
1761 sleep(3); #pause 3 seconds between attempts
1762 } #----- while failing at getting a lock
1763 if ($haveLock) {
1764 local *FH;
1765 open(FH, ">$nodefile")
1766 || die "Error re-creating host lock file $nodefile ?! ($!)\n"; #should never happen, really
1767 my $thismin= int(time()/60);
1768 print FH "$HOST $$ $thismin $GRID_WORKER\n";
1769 #print FH "$HOST $$ $thismin $GRID_WORKER ".getTime()."\n";
1770 close(FH);
1771 # -- now it's safe to open the actual file being locked..
1772 # has to be a path relative to $GRID_JOBDIR
1773 $fname="$GRID_JOBDIR/$fname" unless $fname=~m/^[\/\\]/;
1774 # --
1775 # -- add O_DIRECT only if you notice syncing/flushing issues for small files
1776 # --
1777 #my $mode= O_RDWR | O_CREAT | O_SYNC | O_DIRECT;
1778 #my $mode=(-f $fname) ? '+<' : '+>'; #create file if not there..
1779 my $fh;
1780 #open($fh, $mode.$fname) || die "setXLock($fname) failed at open($mode.$fname): $!\n";
1781 sysopen($fh, $fname, $sysopen_mode) || die "setXLock($fname) failed at open ($fname): ($!)\n";
1782 $Locks{$fh}=[$lockfile, $nodefile];
1783 return $fh;
1784 }
1785 else {
1786 print STDERR "ERROR getting a lock on $fname ($lockfile -> $nodefile) after $retries attempts!\n";
1787 print STDERR " (blocked by: $prevlock of $lock_age minutes)\n" if $prevlock;
1788 unlink($nodefile);
1789 return undef;
1790 }
1791 }
1792
1793 sub endXLock {
1794 my $fh=shift(@_);
1795 close($fh);
1796 my $d = delete($Locks{$fh});
1797 unless ($d) {
1798 print STDERR "WARNING at endXLock(): no Locks entry found for file handle $fh!\n";
1799 return;
1800 }
1801 my ($locklink, $nodefile)=@$d;
1802 unlink($locklink);
1803 #-- keep these around for debugging purposes:
1804 unlink($nodefile);
1805 }
1806
1807 # readFile(fname/fglobref) : read first or all lines from a given file
1808 # or filehandle glob reference
1809 sub readFile {
1810 my ($f, $nonfatal, $context)=@_;
1811 $context=" ($context)" if $context;
1812 my ($fh, $open);
1813 if (ref($f) eq 'GLOB') { # file handle or glob reference..
1814 $fh=$f;
1815 $f='[fh]';
1816 }
1817 else { #scalar: string = filename
1818 #create if not there!
1819 my $mode=(-f $f) ? '+<' : '+>'; #create if not exists
1820 my $canopen=open($fh, $mode.$f);
1821 unless ($canopen) {
1822 return undef if $nonfatal;
1823 die "readFile($mode $f)$context task $GRID_TASK on $HOST, open error: $!\n";
1824 }
1825 $open=1;
1826 }
1827 local $/="\n";
1828 if (wantarray()) {
1829 my @r=<$fh>;
1830 close($fh) if $open;
1831 return @r;
1832 }
1833 else { #first line only
1834 my $line=<$fh> || '';
1835 close($fh) if $open;
1836 return $line;
1837 }
1838 }
1839
1840 #===================================================
1841 # Mailer subroutine
1842 #--- only works for some Linux installations.. as it requires sendmail
1843 sub send_mail {
1844 my $hash=shift;
1845 $hash->{'from'}=$USER.'@'.$HOSTNAME
1846 unless defined($hash->{'from'});
1847 my $to=$hash->{'to'};
1848 unless ($to) {
1849 $hash->{'to'}=$USER.'@'.$DOMAIN;
1850 }
1851 else {
1852 $hash->{'to'}=$to.'@'.$DOMAIN unless $to =~ m/@/;
1853 }
1854
1855 my $file;
1856 local *ADDFILE;
1857 if (defined($hash->{file})) {
1858 #warning: assumes it's a decent, short text file!
1859 local $/=undef; #read whole file
1860 open(ADDFILE, '<'.$hash->{file}) || return "Cannot open file ".$hash->{file}."\n";
1861 $file=<ADDFILE>;
1862 close ADDFILE;
1863 }
1864 my $body = $hash->{'body'};
1865 $body.=$file;
1866 #my $fh;
1867 local* FH;
1868 open(FH, '| /usr/lib/sendmail -t -oi') || die "Error: cannot open sendmail pipe!\n";
1869 print(FH "To: $hash->{to}\n");
1870 print(FH "From: $hash->{from}\n");
1871 print(FH "Subject: $hash->{subj}\n\n");
1872 print(FH $body);
1873 close(FH);
1874 }
1875
1876 sub getTime {
1877 my $date=localtime();
1878 #get rid of the day so Sybase will accept it
1879 (my $wday,$date)=split(/\s+/,$date,2);
1880 return $date;
1881 }

Properties

Name Value
svn:executable *