#!/usr/bin/perl
require 5.8.0;
use strict;
use Getopt::Std;
use File::Basename; #for basename, dirname
use POSIX "sys_wait_h"; # for WNOHANG (used by the SMP child reaper); uname() also comes from POSIX
use Cwd qw(abs_path cwd);
use Fcntl qw(:DEFAULT :seek); #mostly for the SEEK constants
use FindBin; use lib "$FindBin::Bin";

my $NORMAL_ENDING=1; #set to 0 while working, to 1 upon normal ending of script (i.e. no die())
my $USER=$ENV{USER} || POSIX::cuserid(); #it may not be there for condor workers
my $PWD=cwd(); #from Cwd module
my $PERL_BIN='/usr/bin/perl';
my $MAX_RETRIES=3; #-- how many times to try a failed task
my $F_WRKCOUNT='.wrkCount'; # count of currently running workers
my $F_ALLDONE='.all.Done.';
my $F_WRKSTART='.wrkStarted'; #count of (ever) started workers
my $F_LASTTASK='.lastTask'; # maximum task# ever started
my $F_TASKSDONE='tasksDone'; # number of tasks successfully finished
my $F_ERRTASKS='err.tasks'; #LIST of task#s which returned non-zero status
                            # even after MAX_RETRIES
my $F_RETRYTASKS='retry.tasks'; #stack of task#s which returned non-zero status
                                #less than MAX_RETRIES times
my $F_WRKRUNNING='.running-worker'; #semaphore file in each worker directory
my $F_ENDCMD='epilogue';
my $F_WRKDIR='workdir';
my $F_NOTIFY='notify';
my $F_TASKDB='taskDb';
my $GRID_DEBUG;
my $starting_dir;
my $STARTED_GRID_TASK;
my $SMPChildren=0; #SMP case: number of children running
my %SMPChildren=(); #set of child PIDs
my %Locks = (); # filehandle -> [lockfilelink, hostlockfile]
my $HOSTNAME = (&POSIX::uname)[1]; # can't trust HOST envvar under condor, because
    # it will either inherit HOST and HOSTNAME from submitter env for getenv=TRUE,
    # OR it will not set anything for getenv=FALSE
chomp($HOSTNAME);
$HOSTNAME=lc($HOSTNAME);
my $HOST=$HOSTNAME;
my ($DOMAIN)=($HOST=~m/^[\w\-]+\.(.+)/);
unless ($DOMAIN) {
  if ($HOST=~m/^flicker|^wren|^ibis/) {
    $DOMAIN='umiacs.umd.edu';
  }
  unless ($DOMAIN) {
    $DOMAIN=($PWD=~/^\/export|^\/home/) ? 'dfci.harvard.edu' : 'umiacs.umd.edu';
  }
}
($HOST)=($HOST=~m/^([\w\-]+)/);
$ENV{HOST}=$HOST;
$ENV{MACHINE}=$HOST;
$ENV{HOSTNAME}=$HOSTNAME;
$ENV{USER}=$USER;
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! - site specific -
#---------- modify the SITE-SPECIFIC paths here if needed:
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&!
# ===== PATH management ====
# default: assume architecture/PATH/LIB uniformity across nodes
# you can change this by adding your own worker/server paths here
my $binpath=$ENV{PATH};
my $libpath=$ENV{LD_LIBRARY_PATH};
my $perllib=$ENV{PERLLIB};
my $perl5lib=$ENV{PERL5LIB};
my $pythonpath=$ENV{PYTHONPATH};

# use home directory for symbolic links to working directories
my $homebase=$ENV{HOME};
$homebase=~s/(\/[^\/]+$)//;

$homebase='/fs/wrenhomes' unless $homebase; #this is just valid for UMIACS here
# sometimes the HOME path is not defined within the condor job,
# please use your own globally mounted path instead

# default grid engine used:
#my $GRID_ENGINE='sge'; # can also be: 'condor' and 'smp'
my ($GRID_MONHOME, $GRID_ENGINE) = ($DOMAIN=~/dfci/) ? ('/home', 'sge')
                                                     : ($homebase, 'condor');
my $sysopen_mode = O_RDWR | O_CREAT;
$sysopen_mode |= O_DIRECT if $DOMAIN!~/umiacs/;

#print STDERR "|$GRID_MONHOME|\n";
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! -
#
my $morehints=qq{
 The following entries will be created by gridx:
 <GRID_JOBDIR>/epilogue    - only if -e option was given, a file containing the
                             <end_cmd> parameter
 <GRID_JOBDIR>/notify      - a file containing the e-mail address to notify
                             if -m was given
 <GRID_JOBDIR>/.lastTask   - a file keeping track of the last task# scheduled
 <GRID_JOBDIR>/.wrkStarted - a file keeping track of the number of
                             workers started so far
 <GRID_JOBDIR>/.wrkCount   - a file keeping track of the number of
                             currently running workers
 <GRID_JOBDIR>/err.tasks   - list of all error-terminated tasks, after retry
                             (should be zero or empty if all tasks
                             finished successfully)
 <GRID_JOBDIR>/retry.tasks - list of all error-terminated tasks that will
                             still be retried
 <GRID_JOBDIR>/taskDb      - pseudo-fasta db with the info & status
                             of each task
 <GRID_JOBDIR>/taskDb.cidx - the cdbfasta index of the above db file
 <GRID_JOBDIR>/locks/      - lockfiles/links are placed here
 <GRID_JOBDIR>/running/    - while a task is processed, a file entry
                             called task_<GRID_TASK> will be created in
                             here for locking purposes; such a file will have
                             a line with processing info for the current
                             task: <hostname> <pid> <CPU#>
 <GRID_JOBDIR>/wrk_<CPU#>/ - working directory exclusive to each worker
                             process, one per CPU (1 <= CPU <= maxCPUs);
                             also contains stderr and stdout log files
                             for each worker process (wrk_err.log and
                             wrk_log.log)
 * if -i option is used, a slice db file is created for <fastadb> with the
   positions of all slices of <numseqs> fasta records in the <fastadb> file.
   This file is called <GRID_JOBDIR>/taskDb
 * if -b option was given: executes the <begin_script> in the current submit
   directory (the one above <GRID_JOBDIR>)
 * if all tasks completed successfully and if -e option was given,
   the <end_script> is executed in the <GRID_JOBDIR> subdirectory
};

my $usage=qq{
 Submit a list of commands to the grid. Can also automatically
 slice a multi-fasta file into chunks and pass the slice file names as
 parameters to a <cmd>:

 Usage:

 gridx \{-f <cmds_file> | -i <fastadb> | -r <numtasks> \} -p <numprocs>
   \{-U|-u <slot#>\} [-n <numseqs>] [-s <skipseqs>] [-t <totalseqs>]
   [-g <engine>] [-d <dir_prefix>] [-a] [-M <monitoring_basedir>]
   [-L <local_wrk_dir>] [-O <log_files_dir>] [-S] [-T] [-v]
   [-x <node_exclude_list>] [-y <node_targets_list>]
   [-b <begin_script>] [-e <end_script>] [-q] [-m <e-mail>]
   [-c] <cmd> [-C] [<cmd_args>..]

 Unless -J option is given, gridx will submit the job to the grid,
 creating a <GRID_JOBDIR> subdirectory (which on Condor has the format:
 gridx-<hostname>_<job_id>). A file called 'cmdline-<numtasks>.cmd'
 will also be created there containing the current gridx command line.

 Options:
 -g  grid engine to use, can be 'smp', 'condor' or 'sge' (default: $GRID_ENGINE)
 -p  maximum number of grid CPUs to allocate for this job
 -f  provide a file with a list of commands to be run on the grid (one
     command per line per CPU)
 -i  grid processing of slices of the given multi-fasta file <fastadb>;
     the total number of tasks/iterations is given by the number of slices
     in <fastadb> (and the -n <numseqs> option); the script will
     automatically create the slice file in the worker directory before
     <cmd> is run with these parameters:
      <fastaslice> <numseqs> <slice#> <is_last> <skipped> <total> <cmd_args>
 -n  slice size: for -i option, how many fasta records from <fastadb>
     should be placed into a slice file for each iteration (default: 1)
 -C  legacy option for psx-like usage (to pass <cmd_args>)
 -r  submit an array job with <numtasks> iterations/tasks (default: 1)

 -S  switch to (stay in) the current directory (where gridx was launched from)
     before each task execution (especially useful for -f option)
 -a  (psx legacy): inherit the submitter environment (default)
 -b  prologue script to be executed BEFORE the grid job is submitted
 -e  epilogue script to be executed AFTER all the grid tasks were completed,
     and only if ALL the tasks terminated successfully.
 -m  send an e-mail to the given address when all tasks are finished
 -v  do not try to validate <cmd> (assume it is going to be valid on the grid
     nodes)
 -x  provide a list of machines (host names) that should be excluded
     from the condor pool for this submitted job ('condor' engine only)
 -y  the submitted job is only allowed to run on the machines on this list
     ('condor' engine only)
 -d  create <dir_prefix> prefixed symbolic links to worker directories
     in the current directory
 -O  place all Condor log files (condor stderr/stdout redirects)
     into <log_files_dir>
 -M  parent directory for creating a "monitoring" symlink; a symbolic link to
     the current <GRID_JOBDIR> will be created there, under
     <monitoring_basedir>/$USER/ (default: home directory)
 -T  do not exit after the job is submitted; wait around instead
     until the last worker terminates, then print some job completion stats
 -U  force one worker per machine (only for 'condor' engine)
 -u  force each worker to run only on CPU <slot#> (between 1 and 12) of each
     machine (only for 'condor' engine)

 For -r option, the provided <cmd> could make use of the environment variables
 GRID_TASK, GRID_TASKLAST to determine the current iteration being executed.

 Unless -S option was used, each <cmd> iteration will be executed in its own
 worker subdirectory <GRID_JOBDIR>/wrk_<CPU#>

 Job monitoring/resuming/stopping (-J mode):

   gridx [-m e-mail] [-e <end_script>] [-K|-R] -J <jobID>

 If -J option is provided, gridx will report the status of the job <jobID>
 which must have been submitted by a previous, "regular" use of gridx; it relies
 on the symbolic link <monitor_basedir>/$USER/gridx-<jobID> which must
 be valid (<monitor_basedir> can be set by -M).

 Additional/alternate actions for -J mode:
 -e  update the <end_script> for the *running* <jobID> or for the <jobID>
     rerun (if -R option is also given); does not work with -K option
 -m  update the e-mail notification option for job <jobID>
 -K  kill/abandon/terminate ALL pending tasks for grid job <jobID>
     (trying to remove all running/pending tasks on the grid)
 -R  rerun/resubmit all the unfinished/unprocessed or unsuccessful tasks
     for <GRID_JOB>; this assumes -K -J <jobID> was given first (so there
     are no pending tasks) and then will submit a new job in the same working
     directory, renaming the GRID_JOBDIR accordingly, while workers
     will now *skip* all the tasks found with a "Done" status ('.') in the
     <GRID_JOBDIR>/taskDb file
}; #'
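
# Quick-start sketch based on the usage text above (the file name and
# command here are hypothetical, for illustration only): slice 'est.fa'
# into chunks of 500 records and run 'mycmd' on 10 CPUs, e-mailing
# user@site when done; each worker invocation then receives:
#   <fastaslice> <numseqs> <slice#> <is_last> <skipped> <total>
#
#   gridx -g condor -p 10 -i est.fa -n 500 -m user@site mycmd
#
# ..and later, to monitor/stop/resume that job (jobID as reported at submit):
#
#   gridx -J <jobID>        # status report
#   gridx -K -J <jobID>     # kill all pending tasks
#   gridx -R -J <jobID>     # resubmit the unfinished tasks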

RESUME_JOBID:
my @ar=@ARGV;
while ($ar[0] eq '-Z') { shift(@ar); shift(@ar); }
my $CMDLINE="$FindBin::Script\t".join("\t",@ar);
# parse script options
print STDERR "Running on $HOST (domain: $DOMAIN): $0 ".join(' ',@ARGV)."\n";
getopts('USWHM:O:NKRDTqvaJZ:L:u:r:x:y:i:Ff:n:t:d:s:p:m:g:b:e:c:C:') || die($usage."\n");
umask 0002;
if ($Getopt::Std::opt_H) {
  print STDERR $usage;
  die($morehints."\n");
}
my $SwitchDir=$Getopt::Std::opt_S;
$NORMAL_ENDING=0;
$GRID_DEBUG=$Getopt::Std::opt_D;
#print STDERR "Host=$HOST, Domain=$DOMAIN\n" if $GRID_DEBUG;

my $GRID_DIRPREFIX=$Getopt::Std::opt_d;
my ($submitJob, $removeJob);
$GRID_ENGINE=lc($Getopt::Std::opt_g) if $Getopt::Std::opt_g;
if ($GRID_ENGINE eq 'sge') {
  ($submitJob, $removeJob)=(\&submitJob_sge, \&removeJob_sge);
}
elsif ($GRID_ENGINE eq 'condor') {
  ($submitJob, $removeJob)=(\&submitJob_condor, \&removeJob_condor);
}
elsif ($GRID_ENGINE eq 'smp') {
  ($submitJob, $removeJob)=(\&submitJob_smp, \&removeJob_smp);
}
else {
  die("Error: invalid grid engine given (only 'sge', 'condor' or 'smp' are accepted)!\n");
}
my $UniqueVM=$Getopt::Std::opt_U;

my $UniqVMreq=$Getopt::Std::opt_u;
$UniqVMreq=int($UniqVMreq) if $UniqVMreq;
die("Error: use either -U or -u option, not both!\n") if $UniqueVM && $UniqVMreq>0;
# - exclude the following machines
my @xmachinelist=split(/\,/,$Getopt::Std::opt_x); #do NOT allow jobs to run on these machines
my @ymachinelist=split(/\,/,$Getopt::Std::opt_y); #only allow jobs to run on these machines, not others
#submitJob MUST use the globals:
# GRID_CMD, GRID_TASKLAST, GRID_MONHOME and update GRID_JOB

$GRID_MONHOME=$Getopt::Std::opt_M if $Getopt::Std::opt_M;
#$GRID_MONHOME=$ENV{HOME} unless $GRID_MONHOME;
$GRID_MONHOME.='/'.$USER unless $GRID_MONHOME=~m/$USER$/;
#

die("Error: directory $GRID_MONHOME should already be created! Aborting..\n")
  unless (-d $GRID_MONHOME);

my $mailnotify=$Getopt::Std::opt_m;
#-------- GLOBALs ---------------------------
my $GRID_JOBDIR;
my $GRID_JOB;
my $GRID_LOCKDIR;
my $GRID_TASKLAST;
my $GRID_CMD; # user's command and arguments
my $GRID_CMDFILE=$Getopt::Std::opt_f; # one line per run.. with arguments for command <cmd>
my $GRID_USECMDLIST = ($GRID_CMDFILE || $Getopt::Std::opt_F) ? 1 : 0;

my $GRID_PSXFASTA;   #if -i was used
my $GRID_PSXSTEP;    #if -i was used (-n value)
my $GRID_PSXSKIP=0;  # -s option
my $GRID_PSXTOTAL=0; # -t option

my $GRID_NUMPROCS=0; # -p option
my $GRID_RESUME=$Getopt::Std::opt_R || $Getopt::Std::opt_Z;
my $GRID_LOCAL_JOBDIR=$Getopt::Std::opt_L;
#---------- worker side global vars:
my $GRID_ENVSET=0; #was the environment set?
my $GRID_WORKER=0; # worker#
#my $GRID_NOWRKLINKS=1;
my $GRID_LOGDIR=$Getopt::Std::opt_O;
my $GRID_TASK; # dynamic -- task iteration#
my $TASK_ERRCOUNT; # dynamic -- current task error (retry) counter
my $TASK_DATA; # current task's user data as stored in taskDb
my $TASK_LOCKH; #file handle for the current task lock file
my $TASK_LOCKF; #file name for the current task lock file
my $GRID_WRKDIR; #only for the worker case, it's the current worker's subdirectory

if ($Getopt::Std::opt_J) {
  #################################################################
  # gridx job monitoring use:
  #---------------------------------------------------
  # gridx -J [-m <e-mail>] [-M <mondir>] [-R | -K] <jobid>
  #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
  my ($jobid)=shift(@ARGV);
  $jobid=~tr/ //d;
  unless ($jobid) { # list mode
    my $fmask="$GRID_MONHOME/gridx-*";
    my @jdirs=<${fmask}>;
    if (@jdirs>0) {
      print STDOUT "The following jobs were found (in $GRID_MONHOME):\n";
      foreach (@jdirs) {
        my ($jobid)=(m/gridx\-(\w[\w\-]+)$/);
        print " $jobid\n";
      }
    }
    else {
      print STDOUT "No jobs were found (in $GRID_MONHOME).\n";
    }
    $NORMAL_ENDING=1;
    exit(0);
  }
  #a valid $jobid was given, hopefully
  $jobid=lc($jobid);
  my $subdir='gridx-'.$jobid;
  my $jobdir="$GRID_MONHOME/gridx-$jobid";
  unless (-d $jobdir) { #try current directory..
    if (-d $subdir) {
      $jobdir=$subdir;
    }
    else {
      die "No such job found ($jobid) - neither $jobdir nor $subdir exist!\n";
    }
  }
  #print STDERR "..found jobdir='$jobdir'\n";
  chdir($jobdir) || die("Error at chdir($jobdir)!\n");
  #my $msg=jobSummary($mailnotify);
  my $msg=jobSummary();
  print STDOUT $msg."\n";
  if ($Getopt::Std::opt_K) {
    &$removeJob($jobid);
  }
  elsif ($Getopt::Std::opt_R) { #resume/rerun!
    #chdir($jobdir) || die("Error at chdir($jobdir)!\n");
    my @cmdfile=<cmdline-*.cmd>;
    die "Error getting the cmdline-*.cmd from current directory!\n"
      unless @cmdfile;
    my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
    die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
    my $cmdline=readFile($cmdfile[0]);
    chomp($cmdline);
    my @args=split(/\t/,$cmdline);
    die("$jobdir/$F_TASKDB and/or index not valid - cannot resume!\n")
      unless -s $F_TASKDB && -s "$F_TASKDB.cidx";
    shift(@args); #discard the gridx command itself
    chdir('..'); #go back to the original working dir
    $PWD=cwd(); #from Cwd module
    $CMDLINE="$FindBin::Script\t".join("\t",@args);
    @ARGV=('-Z',$jobid, @args);
    undef($Getopt::Std::opt_J);
    undef($Getopt::Std::opt_R);
    goto RESUME_JOBID;
  }
  $NORMAL_ENDING=1;
  exit(0);
}
elsif ($Getopt::Std::opt_W) {
  ##########################################################
  # gridx Worker use:
  #---------------------------------------------------------
  # This is the actual job that gets submitted:
  #     gridx -W <cmd> <cmd_args>
  # At runtime the environment should be set accordingly
  #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
  beginWorker(); #set up environment and worker directory
                 #(wrk_NNNNN) and chdir() to it
  $GRID_USECMDLIST=$Getopt::Std::opt_F;
  my @cmd=@ARGV; # cmd and cmdargs
  while (my $taskId = getNextTask()) {
    runTask($taskId, @cmd);
  }
  my $runningworkers=endWorker();
  my $exitcode=0;
  if ($runningworkers==0) { #this was the last worker!
    #run any JOB finishing/clean up code
    chdir($GRID_JOBDIR);
    my $epilogue = (-s $F_ENDCMD) ? readFile($F_ENDCMD) : '';
    if ($epilogue) {
      chomp($epilogue);
      #only run it if ALL tasks succeeded
      my $tasksdone=readFile($F_TASKSDONE);chomp($tasksdone);
      if ($tasksdone==$GRID_TASKLAST && !(-s $F_ERRTASKS)) {
        runCmd($epilogue);
      }
    }
    my $notify = (-s $F_NOTIFY) ? readFile($F_NOTIFY) : '';
    chomp($notify);
    jobSummary($notify);
    local *FDONE;
    open(FDONE, '>'.$GRID_JOBDIR.'/'.$F_ALLDONE) || die("Error creating $GRID_JOBDIR/$F_ALLDONE\n");
    print FDONE "done.\n";
    close(FDONE);
  } #last worker ending
  $NORMAL_ENDING=1;
  exit(0);
}# -- worker use case


##########################################################
#
#   gridx Submit use:
#
##########################################################

my $gridBeginCmd=$Getopt::Std::opt_b;
my $gridEndCmd=$Getopt::Std::opt_e;
$GRID_RESUME=$Getopt::Std::opt_Z; #caused by an initial -J -R request
$GRID_TASKLAST=$Getopt::Std::opt_r;
#--submit specific globals -- temporary
my $TASKDB;
my $JOBID;
#--
unless ($GRID_CMDFILE) {
  $GRID_CMD=$Getopt::Std::opt_c || shift(@ARGV);
  die "$usage\nError: no command given!\n" unless $GRID_CMD;
  unless ($Getopt::Std::opt_v) {
    $GRID_CMD = getCmdPath($GRID_CMD) ||
      die "Error: command $GRID_CMD not found in PATH!\n";
  }
  $GRID_CMD.=' '.$Getopt::Std::opt_C if $Getopt::Std::opt_C;
  $GRID_CMD.=' '.join(' ',@ARGV) if @ARGV;
}
else {
  $GRID_CMD='.';
}
$GRID_NUMPROCS=$Getopt::Std::opt_p || 1; #number of workers submitted

if ($GRID_PSXFASTA=$Getopt::Std::opt_i) { #psx emulation case
  die "Error: -r and -i are mutually exclusive options!\n" if $GRID_TASKLAST;
  $GRID_PSXSTEP=$Getopt::Std::opt_n || 1;
  #it will be moved into GRID_JOBDIR right after submit
  $GRID_PSXSKIP=$Getopt::Std::opt_s || 0;
  $GRID_PSXTOTAL=$Getopt::Std::opt_t || 0;
  #$GRID_NOWRKLINKS=$Getopt::Std::opt_N;
}
else { $GRID_TASKLAST=1 unless $GRID_TASKLAST; } #one shot run
prepareTaskDb(); #builds a taskDb in the current directory for now;
                 #checks globals $GRID_PSXFASTA and $GRID_PSXSTEP,
                 #sets TASKDB to the file name, and GRID_TASKLAST is updated as needed

if ($gridBeginCmd) {
  system($gridBeginCmd) && die("Error: exit status $? returned by prologue command: '$gridBeginCmd'\n");
}
$JOBID = &$submitJob({PATH=> $binpath, PYTHONPATH=> $pythonpath,
    LD_LIBRARY_PATH=>$libpath, PERLLIB=>$perllib, PERL5LIB=>$perl5lib} );
#-- submitJob should also call setupJobDir() to create the gridx-<JOBID> subdirectory
#-- and move/rename the taskDb in there.
die("Error: no job ID has been obtained!\n") unless $JOBID;
if ($Getopt::Std::opt_T) { #wait for all children to finish
  my $wstat;
  chdir($GRID_JOBDIR);
  do {
    sleep(5);
  } until (-e $F_ALLDONE);
  my $msg=jobSummary($mailnotify);
  my $exitcode = (-s $F_ERRTASKS) ? `wc -l < $F_ERRTASKS` : 0;
  chomp($exitcode);
  print STDOUT $msg."\n";
  chdir($PWD); #this should be the original working directory
  $NORMAL_ENDING=1;
  exit($exitcode);
}
$NORMAL_ENDING=1;
exit(0);
# if ($notify) {
#   #qsub -o /dev/null -N "$cmd-$jobid-($numtasks)-watcher" -hold_jid $jobid -b y -j y $notify sleep 0
#   my $sgecmd=`which sgepl`;
#   qsub -o $GRID_MONHOME/.gridjob_$jobid/watcher.log -N "$cmdname-$jobid-($numtasks)-watcher" \
#     -hold_jid $jobid -b y -j y -V /bin/tcsh -f $sgecmd -Done $cmdname $jobid $numtasks
#   echo "Submitted watchdog job $cmdname-$jobid-($numtasks)-watcher:"
#   echo "  $sgecmd -Done $cmdname $jobid $numtasks"
# }

#**********************************************************
#******************* SUBROUTINES **************************


#******************** SUBMIT SIDE *************************

=head2 ----------- taskDbRec -----------

 taskDbRec($fhdb, $jobId, $command_line..)

Creates (writes) a record into the taskDb file open for writing
with the file handle $fhdb. This subroutine takes care of
formatting the fixed length defline (header) of the job record.

This subroutine should only be used by the program which
creates the taskDb. After all the records are added,
the taskDb should eventually be indexed with cdbfasta.

=cut

sub taskDbRec {
  my ($fh, $taskId, @userdata)=@_;
  my $rec='>'.$taskId."\t{-|-|----|----|--------}\t{".('-' x 25).'}';
  $rec.="\t".join(' ',@userdata) if @userdata;
  print($fh $rec."\n");
}
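
# A minimal usage sketch for taskDbRec() (the file name here is
# hypothetical, for illustration only). Each call appends one fixed-width
# defline record; the file is then indexed with cdbfasta, as noted above:
#
#   local *TDB;
#   open(TDB, '>myJob.taskDb') || die "cannot create taskDb\n";
#   taskDbRec(\*TDB, 1, 'payload for task 1');
#   taskDbRec(\*TDB, 2, 'payload for task 2');
#   close(TDB);
#   system('cdbfasta myJob.taskDb'); #creates myJob.taskDb.cidx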


sub prepareTaskDb {
  if ($GRID_PSXFASTA) { #-i given, psx mode
    $GRID_PSXFASTA=getFullPath($GRID_PSXFASTA, 1);
    die ("Error: prepareTaskDb() called with PSXFASTA but no PSXSTEP!\n")
      unless $GRID_PSXSTEP;
    my $basename=getFName($GRID_PSXFASTA);
    $TASKDB='gridx-psx-'.$basename.
      ".n$GRID_PSXSTEP.s$GRID_PSXSKIP.t$GRID_PSXTOTAL".'.taskDb';
    if ($GRID_RESUME) {
      #taskDb must already be there!
      my $r=`cdbyank -s gridx-$GRID_RESUME/taskDb.cidx`;
      ($GRID_TASKLAST)=($r=~m/\nNumber of records:\s+(\d+)\n/s);
      die "Error: couldn't determine number of records in gridx-$GRID_RESUME/taskDb\n"
        unless $GRID_TASKLAST>0;
      return;
    }

    #-- iterate through the fasta file, create the slice index
    #   and the index for the slice index..
    local *TASKDB;
    local *PSXFASTA;
    open(PSXFASTA, $GRID_PSXFASTA) || die "Error opening file $GRID_PSXFASTA\n";
    binmode(PSXFASTA);
    open(TASKDB, '>'.$TASKDB) || die "Error creating file $TASKDB ($!)\n";
    my $foffset=0;
    my $rcount=0;
    my $numrecs=0;
    $GRID_TASKLAST=0;
    while (<PSXFASTA>) {
      my $linelen=length($_);
      if (/^>/) {
        $rcount++;
        if ($rcount>$GRID_PSXSKIP) {
          $numrecs++;
          if (($numrecs-1) % $GRID_PSXSTEP == 0) {
            $GRID_TASKLAST++;
            taskDbRec(\*TASKDB, $GRID_TASKLAST, $foffset);
          }
        }
        last if ($GRID_PSXTOTAL>0 && $numrecs>$GRID_PSXTOTAL);
      }
      $foffset+=$linelen;
    }
    close(PSXFASTA);
    #&taskDbRec(\*TASKDB, $GRID_TASKLAST, $foffset)
    #  unless ($numrecs-1) % $GRID_PSXSTEP == 0;
    close(TASKDB);
  }
  elsif ($GRID_CMDFILE) { # -f option given
    return if ($GRID_RESUME); #taskDb must already be there!
    $TASKDB="gridx.$GRID_CMDFILE.taskDb";
    local *TASKDB;
    local *CMDFILE;
    open(CMDFILE, $GRID_CMDFILE) || die "Error opening file $GRID_CMDFILE\n";
    open(TASKDB, '>'.$TASKDB) || die "Error creating file $TASKDB ($!)\n";
    my $i=1;
    while(<CMDFILE>) {
      next if m/^\s*#/;
      chomp;
      taskDbRec(\*TASKDB, $i, $_);
      $i++;
    }
    close(TASKDB);
    close(CMDFILE);
    $GRID_TASKLAST=$i-1;
  }
  elsif ($GRID_TASKLAST) { # -r option given
    return if ($GRID_RESUME); #taskDb must already be there!
    $TASKDB="gridx.r$GRID_TASKLAST.taskDb";

    local *TASKDB;
    open(TASKDB, '>'.$TASKDB) || die "Error creating file $TASKDB ($!)\n";
    for (my $i=1;$i<=$GRID_TASKLAST;$i++) {
      taskDbRec(\*TASKDB, $i, $GRID_CMD);
    }
    close(TASKDB);
  }
  else { return; }
  runCmd("cdbfasta $TASKDB");
}
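
# For reference, a sketch of what a freshly built taskDb looks like
# (pseudo-fasta, one fixed-width defline per task; the fields are
# tab-separated and the payload values below are made up):
#
#   >1	{-|-|----|----|--------}	{-------------------------}	payload for task 1
#   >2	{-|-|----|----|--------}	{-------------------------}	payload for task 2
#
# Workers later rewrite the two {...} blocks in place (status, retry count,
# exit code, CPU# and start minute, plus the blank-padded hostname) through
# taskDbStat(), documented further below.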

sub getFName {
  return basename($_[0]);
}

sub getFDir {
  return dirname($_[0]);
}

sub getFullPath {
  my ($fname, $check)=@_;
  die("Error: file $fname does not exist!\n") if $check && !-r $fname;
  return abs_path($fname); #Cwd module
}

#== getCmdPath -- checks for the executable in the PATH if no full path was given;
#   in list context it also checks whether the executable is a text (script) file
#   and returns the interpreter requested on its #! line
sub getCmdPath {
  my $cmd=$_[0];
  my $fullpath;
  my $checkBinary=wantarray();
  if ($cmd =~ m/^\//) { #absolute path given
    $fullpath = (-x $cmd) ? $cmd : '';
  }
  elsif ($cmd =~ m/^\.+\//) { #relative path given
    $fullpath= (-x $cmd) ? abs_path($cmd) : '';
  }
  else { #we search in the path..
    my @paths=split(/:/, $ENV{'PATH'});
    foreach my $p (@paths) {
      if (-x $p.'/'.$cmd) {
        $fullpath=$p.'/'.$cmd;
        last;
      }
    }
  }#path searching
  if ($checkBinary) { #asked for the interpreter, if any
    if ($fullpath) {
      my $interpreter='';
      if (-r $fullpath && -T $fullpath) {#readable text file, look for the bang line
        local *TFILE;
        open(TFILE, $fullpath);
        my $linesread=1;
        while ($linesread<10) {#scan only the first few lines..
          $_=<TFILE>;
          chomp;
          if (m/^\s*#\!\s*(\S.+)/) {
            $interpreter=$1;
            last;
          }
          $linesread++;
        }
        close(TFILE);
        $interpreter=~s/\s+$//;
      }
      return ($fullpath, $interpreter);
    }
    else { return (); } #cmd not found;
  }
  else { return $fullpath; }
}
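
# Call-context sketch for getCmdPath() (the command names are hypothetical):
#
#   my $binpath = getCmdPath('mytool');           # scalar context: path only
#   my ($scrpath, $interp) = getCmdPath('run.pl');# list context: also returns
#                                                 # the '#!' interpreter, e.g.
#                                                 # '/usr/bin/perl', or '' for
#                                                 # a binary executable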

sub runCmd {
  my ($cmd, $jobid)=@_;
  my $exitstatus=system($cmd);
  if ($exitstatus != 0) {
    if ($jobid) {
      &$removeJob($jobid);
    }
    die("Error at running system command: $cmd\n");
  }
}


sub removeJob_sge {
  my $jobid=shift(@_);
  runCmd("qdel -f $jobid");
}

sub removeJob_condor {
  my $jobid=shift(@_); #machine+'_'+job#
  #must be run on the same machine from which the job was submitted
  my ($hostname, $job)=($jobid=~m/([\w\-]+)_(\d+)$/);
  die("Error parsing hostname, job# from $jobid!\n")
    unless $hostname && ($job>0);
  #print STDERR "$hostname, $HOST, $jobid, $job\n";
  if (lc($hostname) eq lc($HOST)) { #local host
    runCmd("condor_rm $job");
  }
  else {
    runCmd("condor_rm -name $hostname $job");
  }
}


sub smpTaskReaper { # takes care of dead children; installed as $SIG{CHLD}=\&smpTaskReaper
  my $childpid;
  while (($childpid = waitpid(-1, WNOHANG)) > 0) {
    $SMPChildren--;
    delete $SMPChildren{$childpid};
  }
  $SIG{CHLD}=\&smpTaskReaper;
}

sub smpTaskKiller { # signal handler for SIGINT
  local($SIG{CHLD}) = 'IGNORE'; # we're going to kill our children
  kill 'INT' => keys %SMPChildren;
}

sub removeJob_smp {
  smpTaskKiller(); #the SMP "job" is just our set of forked children
}

sub submitJob_sge {
  my ($envhash)=@_;
  # submit an array job, SGE style
  my $array='';
  if ($GRID_NUMPROCS > 1) {
    $array="-t 1-$GRID_NUMPROCS";
  }
  #append our GRID_ environment
  my $envparam="-v 'GRID_ENGINE=sge,".
      "GRID_JOBDIR=$PWD,".
      "GRID_TASKLAST=$GRID_TASKLAST,".
      "GRID_MONHOME=$GRID_MONHOME,".
      "GRID_CMD=$GRID_CMD,".
      "GRID_DIRPREFIX=$GRID_DIRPREFIX";
  $envparam.=",GRID_RESUME=$GRID_RESUME" if $GRID_RESUME;
  $envparam.=",GRID_PSXFASTA=$GRID_PSXFASTA".
             ",GRID_PSXSTEP=$GRID_PSXSTEP" if $GRID_PSXFASTA;
  $envparam.= ",GRID_PSXSKIP=$GRID_PSXSKIP" if $GRID_PSXSKIP;
  $envparam.= ",GRID_PSXTOTAL=$GRID_PSXTOTAL" if $GRID_PSXTOTAL;
  $envparam.= ",GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR" if $GRID_LOCAL_JOBDIR;
  if (keys(%$envhash)>0) {
    $envparam.=',';
    my @envars;
    while (my ($env, $val)= each(%$envhash)) {
      push(@envars, $env.'='.$val);
    }
    #$envparam.=" '".join(',',@envars)."'";
    $envparam.=join(',',@envars);
  }
  $envparam.="'";
  #--

  my $sub="qsub -cwd -b y $envparam";
  #$sub.="-e $errout" if $errout;
  #$sub.="-o $stdout" if $stdout;
  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;
  my $subcmd= "$sub $array $PERL_BIN $0 $otherflags -W $GRID_CMD";
  print STDERR "$subcmd\n" if $GRID_DEBUG;
  my $subout = `$subcmd`;
  #print STDOUT $subout;
  my ($jobid)=($subout=~/Your\s+job\S*\s+(\d+)/s);
  die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);

  setupJobDir($jobid);

  return $jobid;
}


sub submitJob_smp {
  my ($envhash)=@_;
  my $jobid='smp_'.$$;
  $ENV{GRID_ENGINE}='smp';
  @ENV{'GRID_JOBDIR', 'GRID_TASKLAST', 'GRID_MONHOME', 'GRID_CMD','GRID_DIRPREFIX'}=
     ($PWD."/gridx-$jobid", $GRID_TASKLAST, $GRID_MONHOME, $GRID_CMD,$GRID_DIRPREFIX);
  $ENV{GRID_RESUME}=$GRID_RESUME;
  $ENV{GRID_JOB}=$jobid;
  @ENV{'GRID_PSXFASTA','GRID_PSXSTEP'}=($GRID_PSXFASTA, $GRID_PSXSTEP) if $GRID_PSXFASTA;
  $ENV{GRID_PSXSKIP}=$GRID_PSXSKIP if $GRID_PSXSKIP;
  $ENV{GRID_PSXTOTAL}=$GRID_PSXTOTAL if $GRID_PSXTOTAL;

  $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR."/gridx-$jobid" if $GRID_LOCAL_JOBDIR;
  if (keys(%$envhash)>0) {
    while (my ($env, $val)= each(%$envhash)) {
      $ENV{$env}=$val;
    }
  }
  # Fork off the children
  my $pid;
  setupJobDir($jobid); #we do this one in advance..
  $SIG{CHLD}=\&smpTaskReaper;
  print STDERR "..forking $GRID_NUMPROCS workers..\n" if $GRID_DEBUG;
  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;

  for (1 .. $GRID_NUMPROCS) {
    die "Error at fork: $!" unless defined ($pid = fork);
    if ($pid) { # Parent here
      $SMPChildren{$pid} = 1;
      $SMPChildren++;
      next;
    } else { #Child here
      # Child can *not* return from this subroutine.
      #$SIG{INT} = 'DEFAULT'; # make SIGINT kill us as it did before
      exec("$PERL_BIN $0 $otherflags -W $GRID_CMD"); #never returns
    }
  }
  $SIG{INT}=\&smpTaskKiller;
  $SIG{TERM}=\&smpTaskKiller;
  return $jobid;
}

sub submitJob_condor {
  my ($envhash)=@_;
  my $jobid;
  my $queue='queue';
  $queue.=" $GRID_NUMPROCS" if ($GRID_NUMPROCS > 1);
  #append our GRID_ environment
  my $dprefix="gridx-$HOST".'_';
  my $envparam="GRID_ENGINE=condor;".
      "GRID_JOBDIR=$PWD/$dprefix\$(Cluster);".
      "GRID_JOB=$HOST\_\$(Cluster);".
      "GRID_TASKLAST=$GRID_TASKLAST;".
      "GRID_MONHOME=$GRID_MONHOME;".
      "GRID_CMD=$GRID_CMD;".
      "GRID_DIRPREFIX=$GRID_DIRPREFIX";
  $envparam.=";GRID_RESUME=$GRID_RESUME" if $GRID_RESUME;
  $envparam.=";GRID_PSXFASTA=$GRID_PSXFASTA".
             ";GRID_PSXSTEP=$GRID_PSXSTEP" if $GRID_PSXFASTA;
  $envparam.= ";GRID_PSXSKIP=$GRID_PSXSKIP" if $GRID_PSXSKIP;
  $envparam.= ";GRID_PSXTOTAL=$GRID_PSXTOTAL" if $GRID_PSXTOTAL;
  $envparam.=';BLASTMAT='.$ENV{BLASTMAT} if $ENV{BLASTMAT};
  $envparam.= ";GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR/$dprefix\$(Cluster);" if $GRID_LOCAL_JOBDIR;
  if (keys(%$envhash)>0) {
    $envparam.=';';
    my @envars;
    while (my ($env, $val)= each(%$envhash)) {
      push(@envars, $env.'='.$val);
    }
    $envparam.=join(';',@envars);
  }
  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;

  my $mtime=time();
  my $cmdfile="condor.$$.t$mtime.$USER.$HOST.cmd";
  local *CMDFILE;
  open(CMDFILE, '>'.$cmdfile) || die "Cannot create $cmdfile!\n";
  my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL" || Arch == "x86_64")';
  #my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL")';
  my $force_slot;
  if ($UniqVMreq>0) {
    $force_slot=$UniqVMreq;
  }
  elsif ($UniqueVM) {
    $force_slot=3+int(rand(9)); #this is messed up, only works on >11 core machines
  }
  $requirements .= ' && (VirtualMachineId == '.$force_slot.')' if $force_slot;
  if (@xmachinelist>0) {
    #map { $_='Machine != "'.$_.'.'.$DOMAIN.'"' } @xmachinelist;
    #$requirements.= ' && ('.join(' && ',@xmachinelist).')';
    if (@xmachinelist==1 && ($xmachinelist[0]=~tr/|^?*\\//)) {
      $requirements.=' && (TRUE != regexp("'.$xmachinelist[0].'", Machine, "i"))';
    }
    else {
      $requirements.=' && (TRUE != regexp("^('.join('|',@xmachinelist).
                     ')\..*", Machine, "i"))';
    }
  }
  elsif (@ymachinelist>0) {
    if (@ymachinelist==1 && ($ymachinelist[0]=~tr/|^?*\\//)>0) {
      $requirements.=' && regexp("'.$ymachinelist[0].'", Machine, "i")';
    }
    else {
      #map { $_='Machine == "'.$_.'.'.$DOMAIN.'"' } @ymachinelist;
      #$requirements.= ' && ('.join(' || ',@ymachinelist).')';
      $requirements.=' && regexp("^('.join('|',@ymachinelist).
                     ')\..*", Machine, "i")';
    }
  }
  print CMDFILE qq{universe = vanilla
requirements = $requirements
notification = Never
executable = $0
initialdir = $PWD
};
  if ($logdir) {
    print CMDFILE "error = ${logdir}log_$dprefix\$(Cluster).\$(Process).stderr\n".
                  "output = ${logdir}log_$dprefix\$(Cluster).\$(Process).stdout\n";
  }
  print CMDFILE "arguments = $otherflags -W $GRID_CMD\n".
                "environment = $envparam;\n".
                "$queue\n";
  close(CMDFILE);
  my $subcmd="condor_submit $cmdfile";
  my $subout = `$subcmd`;
  ($jobid)=($subout=~/submitted\s+to\s+cluster\s+(\d+)\./s);
  die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);
  $jobid=$HOST.'_'.$jobid;
  setupJobDir($jobid);
  #setupJobDir also chdir()s into the $GRID_JOBDIR
  system("mv ../$cmdfile condor_submit.cmd");
  return $jobid;
}

sub jobDie {
  my $jobid=shift @_;
  print STDERR "Error: ".join("\n",@_)."\n";
  &$removeJob($jobid);
  die();
}

sub setupJobDir {
  my ($jobid)=@_;
  my $jobdir = "gridx-$jobid";
  jobDie($jobid, "job directory $jobdir already exists!")
    if (-d $jobdir);
  if ($GRID_RESUME) {
    my $prevjobdir='gridx-'.$GRID_RESUME;
    print STDERR " ..taking over jobdir: $prevjobdir\n";
    unlink("$GRID_MONHOME/$prevjobdir") || warn(" couldn't unlink $GRID_MONHOME/$prevjobdir");
    unlink("$prevjobdir/$F_LASTTASK");
    unlink("$prevjobdir/$F_ALLDONE");
    rename("$prevjobdir/$F_ERRTASKS", "$prevjobdir/prev_$F_ERRTASKS");
    unlink("$prevjobdir/$F_RETRYTASKS");
    system("/bin/rm -rf $prevjobdir/wrk_*/$F_WRKRUNNING");
    system("/bin/rm -rf $prevjobdir/.wrk*");
    system("/bin/rm -rf $prevjobdir/locks");
    system("/bin/rm -rf $prevjobdir/running");
    system("mv $prevjobdir $jobdir") &&
      jobDie($jobid, "cannot 'mv $prevjobdir $jobdir' - $! - Resuming failed!");
  }
  else {
    mkdir($jobdir) || jobDie($jobid, "cannot create subdirectory $jobdir");
    runCmd("mv $TASKDB $jobdir/taskDb", $jobid);
    runCmd("mv $TASKDB.cidx $jobdir/taskDb.cidx", $jobid);
  }

  mkdir("$jobdir/locks") || jobDie($jobid,
      "cannot create subdirectory $jobdir/locks");
  mkdir("$jobdir/running") || jobDie($jobid,
      "cannot create subdirectory $jobdir/running");
  if ($GRID_MONHOME ne $PWD) {
    symlink("$PWD/$jobdir", "$GRID_MONHOME/$jobdir")
      || jobDie($jobid, "cannot symlink $GRID_MONHOME/$jobdir");
  }
  #- CHDIR to the new GRID_JOBDIR
  chdir($jobdir) || jobDie($jobid, "cannot chdir($jobdir) (from $PWD)!");
  readFile($F_TASKSDONE);
  my $cmdfile="cmdline-$GRID_TASKLAST.cmd";
  local *FHND;
  open(FHND, ">$cmdfile") || jobDie($jobid, "cannot create file $cmdfile\n");
  print FHND $CMDLINE."\n";
  close(FHND);
  open(FHND, ">$F_WRKDIR");
  print FHND "$PWD/$jobdir\n";
  close(FHND);

  if ($mailnotify) {
    open(FHND, ">$F_NOTIFY");
    print FHND "$mailnotify\n";
    close(FHND);
  }

  if ($gridEndCmd) {
    open(FHND, ">$F_ENDCMD") || die("Error creating file $jobdir/$F_ENDCMD ($!)\n");
    print FHND $gridEndCmd."\n";
    close(FHND);
  }
  $GRID_JOBDIR = "$PWD/$jobdir";
  print STDOUT "Job $jobid scheduled to run with GRID_JOBDIR = $PWD/$jobdir\n";
} #setupJobDir


sub jobSummary {
  my ($mail) = @_;
  #assuming we're in the GRID_JOBDIR directory
  #(either directly or through $GRID_MONHOME)!
  die "Error: cannot locate $F_TASKSDONE and $F_WRKDIR in current directory ($ENV{PWD})!\n"
    unless -f $F_TASKSDONE && -f $F_WRKDIR;
  my $tasksdone=readFile($F_TASKSDONE);chomp($tasksdone);
  $tasksdone=0 unless $tasksdone;
  my $wrkdir=readFile($F_WRKDIR);chomp($wrkdir);
  my ($jobid)=($wrkdir=~m/\/gridx\-(\w[\w\-]+)$/);
  die("Error: cannot parse jobid from $F_WRKDIR content ($wrkdir)\n") unless $jobid;
  my @cmdfile=<cmdline-*.cmd>;
  die "Error getting the cmdline-*.cmd from current directory!\n"
    unless @cmdfile;
  my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
  die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
  local *CMDFILE;
  open(CMDFILE, $cmdfile[0]);
  my $cmdline=<CMDFILE>;
  chomp($cmdline);
  $cmdline=~s/\t/ /g;
  close(CMDFILE);
  unlink($F_NOTIFY) if -s $F_NOTIFY;
  my ($msg, $subj);
  my $sig='';
  if ($mail) {
    $sig = "\n\n-------------------------\n -= mail sent from $HOST";
    $sig.=" (worker $GRID_WORKER)" if $GRID_WORKER;
    $sig.=" \nWorking directory: $wrkdir\n" if $wrkdir;
    $sig.=" \nCommand line: \n $cmdline\n" if $cmdline;
    $sig.=" =-\n";
  }
  if ($tasksdone!=$numtasks) {
    $msg="Summary of gridx job $jobid: $tasksdone tasks done out of $numtasks\n".
         "Check $wrkdir for more details.\n";
    $subj="gridx job $jobid (done $tasksdone out of $numtasks)";
  }
  else {
    $msg="gridx job $jobid - done all $numtasks tasks\n";
    $subj="gridx job $jobid Done (all $numtasks tasks)";
  }
  $msg.=$sig;
  send_mail( { to=>$mail, subj=>$subj, body=>$msg }) if $mail;
  return $msg;
}


############## WORKER SIDE subroutines #####################

#=================== taskDB handling =================

=head2 taskDbStat (taskdb, taskId [,tstatus, cpu#, host, retries, exitcode])

 taskDbStat($taskdb, $taskId [, $taskstatus, $CPUno,
            $host, $retrycount, $exitcode])

gets/sets the status of a task in an existing taskdb file

If only the $taskdb and $taskId parameters are given it works
as a getter and returns the whole $taskdb entry, either as a raw
string or, if wantarray(), as:

 ($taskstatus, $userdata, $CPUno, $host, $retrycount, $lastexitcode, $startTime)

..where $startTime is in minutes since the epoch (time/60)

If more than 2 parameters are given, it is a setter for the task status
and the other task related data.

Valid status values:
  '-' = queued/idle/unprocessed
  'r' = running (could be a retry)
  '.' = finished successfully
  'E' = finished with error ending (after max retries)

Internal details:
 -Each record is assumed locked at the time of writing
  (no other processes are trying to update the same task record).
 -A taskdb record format is:

  >taskId\t{S|R|xxxx|dddd|mmmmmmmm}\t{hostname}[\t<additional data..>]

 where:
  S        = running status ('-','r','.' or 'E')
  R        = retry counter (0-9)
  xxxx     = last exit code, in hex
  dddd     = last CPU number, in hex (the wrk_<CPU> subdirectory)
  mmmmmmmm = start minute since the epoch (time/60) in hexadecimal
  hostname = fixed 25 char length machine name
             (blank padded as needed)

Error protocol for the setter: when the given $status is 'E'
the retry counter is incremented and the actual status
will be updated to 'E' if > $MAX_RETRIES, or back to '-' otherwise.

=cut

sub taskDbStat {
  my ($taskdb, $entry, $status, $dirno, $machine, $errcount, $exitcode)=@_;
  my $taskdbidx=$taskdb;
  $taskdbidx.='.cidx' unless ($taskdb=~s/\.cidx$//);
  my $tdberr="taskDbStat(".join(',',@_).") Error:";
  wrkDie("$tdberr db $taskdb (and/or index) not found!".
         "(pwd = $ENV{PWD})") unless (-e $taskdb && -e $taskdbidx);
  wrkDie("$tdberr no entry given!") unless $entry;
  local *TASKDB;
  my $dbpos= `cdbyank -a '$entry' -P $taskdbidx`;
  chomp($dbpos);
  wrkDie("$tdberr at retrieving pos of entry $entry\n")
    if ($? || $dbpos!~/^\d+$/);

  my $openmode = O_RDONLY | O_SYNC;
  if ($status) { # setter code:
    wrkDie("$tdberr invalid update parameters ($status)")
      if (length($status)>1 || ($dirno && ($dirno>65535 || $dirno<1)));
    $openmode= O_RDWR | O_SYNC; # | O_DIRECT might be needed ?
  }
  sysopen(TASKDB, $taskdb, $openmode) ||
    wrkDie("$tdberr sysopening $taskdb failed!");
  binmode(TASKDB);
  unless (sysseek(TASKDB, $dbpos, SEEK_SET)) {
    close(TASKDB);
    wrkDie("$tdberr at sysseek() to $dbpos for $entry");
  }
  #----- read the next line and check the format
  my $targetstr='>'.$entry."\t{-|-|----|----|--------}\t{".('-' x 25)."}";
  local $/="\n";
  my $dbline=readline(*TASKDB);
  binmode(TASKDB);
  my ($pentry, $pstats, $pmachine, $userdata)=
     split(/\t/,$dbline,4);
  chomp($pmachine);chomp($userdata);
  my $tcheck=join("\t",$pentry,$pstats,$pmachine);
  if ($pentry ne '>'.$entry ||
      length($tcheck)!=length($targetstr)) {
    close(TASKDB);
    wrkDie("$tdberr invalid record format for '$entry' ".
           "pos $dbpos, Found:\n'$tcheck'\n ..instead of:\n'$targetstr'\n");
  }

  $pmachine=~tr/{} //d;
  $pstats=~tr/{}//d;
  my ($pstatus, $pfcount, $pxcode, $pdirno, $ptime)=split(/\|/,$pstats);
  $ptime=hex($ptime) || $ptime;
  $pfcount=0 unless $pfcount>0;
  #----
  if ($status) { #---- setter code:
    $status=lc($status);
    if ($status!~/^[\-r\.e]$/) {
      close(TASKDB);
      wrkDie("$tdberr invalid status provided ($status)!");
    }
    $dbpos+=length(">$entry\t{");
    sysseek(TASKDB, $dbpos, SEEK_SET);
    if (defined($exitcode)) {
      $pxcode = $exitcode>0 ? sprintf('%04x', $exitcode) : $pxcode;
    }

    $pfcount=$errcount if defined($errcount);
    if ($status eq 'r') { #mark this entry as "running"
      $ptime=sprintf('%08x',int(time()/60));
      $pxcode='----';
    }
    elsif ($status eq '-') { # mark this entry as "available" (idle)
      $ptime='--------';
    }
    $pmachine=$machine if $machine;
    $pmachine=sprintf('%25s',$pmachine);
    $dirno=$pdirno unless $dirno>=1;
    my $wstr=$status.'|'.int($pfcount).
             '|'.$pxcode.
             '|'.sprintf('%04x', $dirno).
             '|'.$ptime."}\t{".$pmachine.'}';
    if (length($ptime)>8) {
      die("Error writing into taskDb record, ptime='$ptime' is too long when writing:\n".
          "$wstr\n");
    }
    my $wlen=length($wstr);
    my $w=syswrite(TASKDB, $wstr, $wlen);
    binmode(TASKDB); # 'cause this flushes any pending I/O buffers
    if ($w!=$wlen) {
      close(TASKDB);
      wrkDie("$tdberr failed writing '$wstr' for $entry!");
    }
    close(TASKDB);
    return 1;
  } # setter
  else { # getter code
    close(TASKDB);
    if (wantarray()) { #retrieve the parsed list of values
      return ($pstatus, $userdata, hex($pdirno), $pmachine, int($pfcount),
              hex($pxcode), $ptime);
    }
    else { #return the raw taskDb line
      return($dbline);
    }
  } # getter
}
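
# Getter/setter usage sketch for taskDbStat() (task# 42 is hypothetical):
#
#   # getter, list context -- parsed fields, in the order returned above:
#   my ($st, $udata, $cpu, $host, $retries, $xcode, $tmin) =
#       taskDbStat("$GRID_JOBDIR/taskDb.cidx", 42);
#
#   # setter -- mark task 42 as running on this worker:
#   taskDbStat("$GRID_JOBDIR/taskDb.cidx", 42, 'r', $GRID_WORKER, $HOST, 0);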


sub gridWorkerEnv {
  if ($_[0]) {
    $GRID_TASK=$_[0];
    $ENV{GRID_TASK}=$_[0];
  }
  return if $GRID_ENVSET;
  my $gridengine=lc($ENV{GRID_ENGINE});
  ( $GRID_JOBDIR, $GRID_TASKLAST, $GRID_RESUME, $GRID_MONHOME, $GRID_LOCAL_JOBDIR,
    $GRID_PSXFASTA, $GRID_PSXSTEP, $GRID_PSXSKIP, $GRID_PSXTOTAL, $GRID_DIRPREFIX )=
   @ENV{'GRID_JOBDIR','GRID_TASKLAST','GRID_RESUME','GRID_MONHOME', 'GRID_LOCAL_JOBDIR',
        'GRID_PSXFASTA','GRID_PSXSTEP','GRID_PSXSKIP','GRID_PSXTOTAL', 'GRID_DIRPREFIX'};
  if ($gridengine eq 'sge') {
    #can't have dynamic environment variables in SGE
    $GRID_JOB=$ENV{JOB_ID};
    #only static ones have been prepared (GRID_ENGINE, GRID_TASKLAST)
    # with GRID_JOBDIR initially set to the submit working directory
    $GRID_JOBDIR.='/gridx-'.$GRID_JOB;

    $GRID_LOCAL_JOBDIR.='/gridx-'.$GRID_JOB if $GRID_LOCAL_JOBDIR;
    $GRID_PSXFASTA=$ENV{GRID_PSXFASTA};
    $GRID_PSXSTEP=$ENV{GRID_PSXSTEP};
    #dynamic ones are built now from the SGE ones
    $ENV{GRID_JOBDIR}=$GRID_JOBDIR;
    $ENV{GRID_JOB}=$GRID_JOB;
    $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR;
  }
  elsif ($gridengine eq 'condor' || $gridengine eq 'smp') {
    #condor should have all the environment in order
    $GRID_JOB=$ENV{GRID_JOB};
  }
  else {
    die("Error: Invalid GRID_ENGINE (Is this a valid worker run?)\n");
  }
  $GRID_LOCKDIR=$GRID_JOBDIR.'/locks';
  $GRID_ENVSET=1;
}

sub beginWorker {
  gridWorkerEnv(); #setup the worker environment
  my $maxretries=5; #just give some time for the submit script to catch up
  my $chdir=0;      # (only if execution of a submitted task is extraordinarily fast)
  my $retries=0;
  while (!($chdir=chdir($GRID_JOBDIR))) {
    sleep(1);
    $retries++;
    last if $retries==$maxretries;
  }
  die("Worker error: cannot chdir to $GRID_JOBDIR") unless $chdir;
  # -- we are in GRID_JOBDIR now!
  my $errmsg="Worker PID $$ ($GRID_WORKER) aborted on $HOST..\n";
  my $fh=setXLock($F_WRKSTART,120) || die $errmsg; #update the count of started workers
  $GRID_WORKER=incFValue($fh, $F_WRKSTART);
  endXLock($fh);
  print STDERR "D: worker $GRID_WORKER assigned to host $HOST pid $$ at ".getTime()."\n" if $GRID_DEBUG;
  $GRID_WRKDIR=sprintf("wrk_%04d",$GRID_WORKER);
  $starting_dir=$GRID_JOBDIR;
  $starting_dir=~s/[^\/]+\/?$//;
  if ($GRID_LOCAL_JOBDIR) {
    print STDERR "D: creating local dir on $HOST: $GRID_LOCAL_JOBDIR/$GRID_WRKDIR\n" if $GRID_DEBUG;
    mkdir("$GRID_LOCAL_JOBDIR") unless -d $GRID_LOCAL_JOBDIR;
    system("/bin/rm -f $GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
    mkdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR") unless -d "$GRID_LOCAL_JOBDIR/$GRID_WRKDIR";
    wrkDie("Error: couldn't create local worker directory $GRID_LOCAL_JOBDIR/$GRID_WRKDIR on $HOST!\n")
      unless (-d "$GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
  }
  #-- also updates the "currently running" counter
  unless (-d "$GRID_JOBDIR/$GRID_WRKDIR") { #worker directory doesn't exist
    unless (mkdir("$GRID_JOBDIR/$GRID_WRKDIR")) {
      die "Error at mkdir $GRID_JOBDIR/$GRID_WRKDIR ($!)!\n";
    }
  }
  else { #existing working directory
    my $frunning= "$GRID_WRKDIR/$F_WRKRUNNING";
    if (-f $frunning) { #weird -- should never happen..
      die "Error: another process running in $GRID_WRKDIR?!\n".`cat $frunning`."$errmsg\n";
    }
    else { #normal case: no worker-running semaphore there already
      local *RSEM;
      open(RSEM, '>'.$frunning) || die "Error creating $frunning file! ($!)\n$errmsg";
      print RSEM join(" ",int(time()/60), $HOST, $$)."\n";
      close(RSEM);
    }
  }
  if ($GRID_DIRPREFIX) {
    my $gwrkdir="../$GRID_DIRPREFIX".'_'.$GRID_WORKER;
    unlink($gwrkdir);
    symlink("gridx-$GRID_JOB/$GRID_WRKDIR", $gwrkdir) ||
      print STDERR "Warning: cannot symlink gridx-$GRID_JOB/$GRID_WRKDIR to $gwrkdir ($!)\n";
    #needed by the PSX emulation
  }
  # we are in GRID_JOBDIR now - descend into wrk_<GRID_WORKER>
  $fh=setXLock($F_WRKCOUNT,70,3) || die $errmsg; # update the count of running workers
  incFValue($fh, $F_WRKCOUNT);
  endXLock($fh);
  chdir("$GRID_JOBDIR/$GRID_WRKDIR") ||
    die "Error at chdir($GRID_JOBDIR/$GRID_WRKDIR) ($!)\n";

  open(STDERR, ">wrk_err.log");
  print STDERR "worker $GRID_WORKER fully assigned to host $HOST PID $$\n" if $GRID_DEBUG;
  open(STDOUT, ">wrk_log.log");
  local *WRKGRAB;
  open(WRKGRAB, ">.on_$HOST");
  print WRKGRAB join("\t",$GRID_WRKDIR, $HOST, $$, 'start: '.getTime())."\n";
  close(WRKGRAB);
  $ENV{GRID_WORKER}=$GRID_WORKER;
  if ($SwitchDir) {
    chdir($starting_dir) || die "Error at chdir($starting_dir)!\n";
  }
  elsif ($GRID_LOCAL_JOBDIR) {
    chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR") || die "Error at chdir($GRID_LOCAL_JOBDIR/$GRID_WRKDIR)!\n";
  }
}

sub endWorker {
  return -1 unless $GRID_WORKER && $GRID_WRKDIR;
  #make sure we are back in the wrk_<GRID_WORKER> directory
  unless (chdir("$GRID_JOBDIR/$GRID_WRKDIR")) {
    die("ERROR: endWorker() could not change to $GRID_JOBDIR/$GRID_WRKDIR\n");
  }

  my $fh=setXLock($F_WRKCOUNT,70, 3) || die "Error updating the number of running workers!\n";
  # my $v=readFile($fh,0,$F_WRKCOUNT);chomp($v);
  my $v=incFValue($fh, $F_WRKCOUNT, -1);
  unlink($F_WRKRUNNING); #remove the "worker here" semaphore..
  if ($v<0) {
    endXLock($fh);
    die("Error: invalid number of workers ($v) reported in $GRID_JOBDIR/$F_WRKCOUNT\n");
  }
  endXLock($fh);
  print STDERR join("\t","D. worker $GRID_WRKDIR", $HOST, $$, 'finished at:',getTime())."\n" if $GRID_DEBUG;

  local *WRKGRAB;
  sysopen(WRKGRAB, ".on_$HOST", O_RDWR | O_SYNC );
  sysseek(WRKGRAB,-1,SEEK_END);
  print WRKGRAB "\tend: ".getTime()."\n";
  close(WRKGRAB);
  if ($GRID_LOCAL_JOBDIR) {
    my $r=system("cp -pr $GRID_LOCAL_JOBDIR/$GRID_WRKDIR/* $GRID_JOBDIR/$GRID_WRKDIR/");
    if ($r) {
      print STDERR "Error at copying $HOST local files back to $GRID_JOBDIR/$GRID_WRKDIR!\n";
    }
    system("/bin/rm -rf $GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
    rmdir($GRID_LOCAL_JOBDIR); #it'll fail if there are other subdirs there, but that's OK
  }
  undef($GRID_WORKER);
  undef($GRID_WRKDIR);
  return $v; #returns the number of workers left running
}

sub writeFValue {
 my ($fh, $v)=@_;
 truncate($fh, 0);
 seek($fh,0,SEEK_SET);
 $v=int($v);
 print $fh $v."\n";
 return $v;
}

sub writeFList { #rewrite the whole file (with truncate)
 my ($fh, $listref, $delim)=@_;
 $delim='' unless $delim;
 seek($fh,0,SEEK_SET);
 truncate($fh,0);
 print $fh join($delim,@$listref);
}
#
# incFValue($fh, $finfo, $incv) => reads an int value from $fh,
#  adds $incv (default: 1), writes it back and returns it
#  ($finfo is only used to label error messages)
#
sub incFValue {
 my ($fh, $finfo, $incv)=@_;
 seek($fh, 0, SEEK_SET);
 $incv=1 unless $incv;
 my $v=readFile($fh, 0, "incFValue(,,$incv) in $finfo");
 chomp($v);
 return writeFValue($fh, int($v)+$incv);
}
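#
# Typical use of the locked-counter helpers above -- a sketch only,
# using the $F_TASKSDONE counter as the example file:
#   my $fh = setXLock($F_TASKSDONE, 70, 3) || die "cannot lock!\n";
#   my $n = incFValue($fh, $F_TASKSDONE); # read int, add 1, write back
#   endXLock($fh); # close the file and remove both lock files
#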

#******************** worker side:
# getNextTask() -- returns the taskId of the next task to be processed,
#                  or undef if there are no more tasks
#  -ignores any entries which are already "done"
#  -looks first into the "retry pool" ($F_RETRYTASKS) and, if a task
#   is found there, removes that entry
#--------------- resources used:
# update $F_RETRYTASKS
# update $F_LASTTASK
# update ENV{GRID_TASK} and $GRID_TASK
#
sub getNextTask {
 #check for any tasks in the "retry" queue
 my ($taskID, $retries);
 my $sourcemsg;
SKIP_DONE:
 $retries=0;
 if (-s "$GRID_JOBDIR/$F_RETRYTASKS") {
   my $hr=setXLock($F_RETRYTASKS,120,5)
      || wrkDie("Error locking $F_RETRYTASKS ($HOST, $GRID_WORKER)");
   my @errstack=readFile($hr,0,$F_RETRYTASKS); #format: taskid <space> #retries
   ($taskID, $retries)=split(/\s+/, shift(@errstack)); #fetch the first entry
   if ($taskID>0) { #valid taskID to retry
     chomp($retries);
     writeFList($hr, \@errstack); #write the list back
     $sourcemsg=' from the retry pool.';
     }
   endXLock($hr);
   }
NEXT_TASK:
 unless ($taskID) { #getting the next available task
   #no retry tasks, so get the next task not processed yet
   my $fh=setXLock($F_LASTTASK, 70, 3) ||
      wrkDie("Error locking $F_LASTTASK ($HOST, $GRID_WORKER)"); #70 retries, 3 min stale limit
   my $last=readFile($fh, 0, $F_LASTTASK);chomp($last);
   if ($last<$GRID_TASKLAST) { # valid one, take the next
     $taskID=writeFValue($fh,int($last)+1);
     }
   $sourcemsg='';
   endXLock($fh);
   }
 return undef unless $taskID;
 $GRID_TASK=$taskID;
 # lock this taskID
 $TASK_LOCKF="$GRID_JOBDIR/running/task_$taskID";
 #$TASK_LOCKF="running/task_$taskID";
 catchSigs(1); #install signal handler
 $TASK_LOCKH=setXLock($TASK_LOCKF, 70, 3);
 unless ($TASK_LOCKH) { #this SHOULD be available immediately..
   my $lockedby=`cat $TASK_LOCKF`;chomp($lockedby);
   print STDERR "WARNING: lock-fail on task $taskID $sourcemsg (previously locked in $TASK_LOCKF by [$lockedby])\n";
   #wrkDie("Error: couldn't get a lock on task $taskID..\n");
   undef($taskID);
   goto SKIP_DONE; # don't kill the worker, just move on
   }
 print $TASK_LOCKH "$HOST $$ ".sprintf("wrk_%04d",$GRID_WORKER)."\n";
 # check the status of this task:
 my ($tstatus, $tuserdata, $tdirno, $thost, $terrcount,
     $texcode, $tstartmin)=taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID);
 print STDERR ">task-$taskID assigned to worker $GRID_WORKER (on $HOST) $sourcemsg\n";
 if ($GRID_RESUME && $tstatus eq '.') {
   #skip this one, it's finished (according to taskDb!)
   print STDERR ">SKIP-done:$taskID \{$tstatus|$terrcount|$texcode|$tdirno|$tstartmin\}\t\{$thost\}\n";
   undef $taskID;
   undef $GRID_TASK;
   endXLock($TASK_LOCKH);
   catchSigs(0);
   unlink($TASK_LOCKF);
   undef($TASK_LOCKH);undef($TASK_LOCKF);
   goto SKIP_DONE;
   }
 #update status of this task to 'running'
 $TASK_ERRCOUNT=$retries;
 taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID, 'r', $GRID_WORKER, $HOST, $TASK_ERRCOUNT);
 #--
 $GRID_TASK=$taskID;
 $TASK_DATA=$tuserdata;
 $ENV{'GRID_TASK'}=$taskID;
 $STARTED_GRID_TASK=$taskID;
 return $taskID;
}
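#
# Taken together, the worker side boils down to this loop (a sketch;
# @CMD stands for the user command assembled elsewhere in this script):
#   while (my $task = getNextTask()) {
#     runTask($task, @CMD); # runTask() calls endTask() when done
#   }
#   endWorker();
#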

##############################################
# runTask($taskID, @cmd)
#------------------------
# *runs in a ./wrk_NNNN subdirectory of GRID_JOBDIR
# *employs $GRID_TASK, $TASK_DATA, $TASK_ERRCOUNT
# *with GRID_PSXFASTA, it uses TASK_DATA and the other GRID_PSX..
#  settings to prepare the fasta slice and pass it to the cmd
# *on exit, it should:
#   - set a lock on the ../running/task_$taskID file
#   - IF error exit of cmd (non-zero exit status), use $TASK_ERRCOUNT+1 and $MAX_RETRIES
#     to determine if the job should be put in the $F_RETRYTASKS file
#     or if status should be set to 'E' in taskDb and the entry added
#     to $F_ERRTASKS
#   - IF successful exit: increment $F_TASKSDONE
#   - update taskDb with the current status
#   - remove the lock on ../running/task_$taskID and delete this file!
#
#---------------------------------------------
sub runTask {
 my $taskID=shift(@_);
 my @cmd=@_;
 my $exitstatus;
 my $runcmd; #the actual command run using system();

 catchSigs(1); #install signal handler (should have been done already in getNextTask())
 if ($GRID_LOCAL_JOBDIR) {
   wrkDie("Fatal: cannot chdir to local dir $GRID_LOCAL_JOBDIR/$GRID_WRKDIR!")
      unless chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
   }

 #if ($SwitchDir) {
 #  we are currently in a wrk_* directory
 #  chdir('../..'); #change to the original directory (where gridx was launched from)
 #  }
 if ($GRID_PSXFASTA) { #psx emulation
   #prepare the fasta slice here
   my $fslice=sprintf('%s.slice-%08d',getFName($GRID_PSXFASTA), $GRID_TASK);
   #-- write the slicefile
   local *FDB;
   local *FSL;
   open(FSL, '>'.$fslice)
      || wrkDie("Cannot create fasta slice $fslice");
   open(FDB, $GRID_PSXFASTA) || wrkDie("Cannot open fasta db $GRID_PSXFASTA");
   seek(FDB, $TASK_DATA, SEEK_SET); #$TASK_DATA holds the slice's byte offset here
   local $/="\n";
   my $seqnext=$GRID_PSXSTEP*($GRID_TASK-1); #+GRID_PSXSKIP
   my $seqcount=0;
   my $seqmax=($GRID_PSXTOTAL<=0) ? 0 : $GRID_PSXTOTAL;
   while (<FDB>) {
     if (/^>/) { #record start
       $seqnext++;
       last if ($seqcount>=$GRID_PSXSTEP);
       last if $seqmax && ($seqnext>$seqmax);
       $seqcount++;
       }
     print FSL $_;
     } #while input line from fastadb
   close(FSL);
   close(FDB);

   $runcmd=shift(@cmd);
   $GRID_PSXSKIP=0 unless ($GRID_PSXSKIP);
   $GRID_PSXTOTAL=-1 unless ($GRID_PSXTOTAL>0);
   my $islast=($GRID_TASK==$GRID_TASKLAST) ? 1 : 0;
   #                       1         2          3           4
   $runcmd.=' '.join(' ',$fslice, $seqcount, $GRID_TASK, $islast,
   #                       5              6             7
                    $GRID_PSXSKIP, $GRID_PSXTOTAL, @cmd);

   }
 elsif ($GRID_USECMDLIST) {
   # $TASK_DATA is the actual command to run
   $runcmd=$TASK_DATA;
   }
 else { #normal repeat cmd -- let the cmd use the ENV accordingly
   $runcmd=join(" ",@cmd);
   }

 #... get $exitstatus from the system() call
 print STDERR ">starting-task-$GRID_TASK by worker $GRID_WORKER (on $HOST): '$runcmd'\n" if $GRID_DEBUG;
 $exitstatus=system($runcmd);
 if ($SwitchDir) {
   chdir("$GRID_JOBDIR/$GRID_WRKDIR");
   }
 endTask($exitstatus); #taskID is taken from $GRID_TASK
}
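#
# Under PSX emulation, the numbered columns above mean the user command
# is invoked with these positional arguments prepended to its own:
#   cmd <fasta_slice> <seq_count> <task#> <is_last 0|1> <psx_skip> <psx_total> [user args...]
#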
1551 |
|
1552 |
|
1553 |
sub catchSigs { # true/false |
1554 |
if ($_[0]) { |
1555 |
$SIG{INT}=\&sigHandler; |
1556 |
$SIG{TERM}=\&sigHandler; |
1557 |
} |
1558 |
else { |
1559 |
$SIG{INT}='DEFAULT'; |
1560 |
$SIG{TERM}='DEFAULT'; |
1561 |
} |
1562 |
} |
1563 |
|
1564 |
sub sigHandler { |
1565 |
my $signame=shift; |
1566 |
wrkDie("Signal $signame caught for worker $$ on $HOST, aborting..\n"); |
1567 |
} |
1568 |
|
1569 |
sub toRetry { |
1570 |
my ($taskID, $taskErrCount)=@_; |
1571 |
return unless $taskID; |
1572 |
$taskErrCount=1 unless $taskErrCount; |
1573 |
my $hr=setXLock($F_RETRYTASKS, 70, 3); |
1574 |
while (<$hr>) { |
1575 |
chomp; |
1576 |
my ($tid, $tec)=split(/\s+/); |
1577 |
if ($tid==$taskID) { #double retry, don't bother |
1578 |
endXLock($hr); |
1579 |
return; |
1580 |
} |
1581 |
} |
1582 |
fappend($hr, "$taskID\t$taskErrCount\n"); |
1583 |
endXLock($hr); |
1584 |
} |

sub endTask {
 return unless $GRID_TASK;
 # we MUST be in GRID_WRKDIR
 chdir("$GRID_JOBDIR/$GRID_WRKDIR") || die("Error: failed to chdir to $GRID_JOBDIR/$GRID_WRKDIR!");
 my ($exitstatus)=@_;

 if (!$exitstatus && !$STARTED_GRID_TASK) { # could be a premature failure, like cdbyank not found, etc.
   print STDERR "scheduling $GRID_TASK for retry..\n";
   toRetry($GRID_TASK, $TASK_ERRCOUNT);
   endXLock($TASK_LOCKH) if $TASK_LOCKH;
   unlink($TASK_LOCKF);
   undef($TASK_LOCKH);undef($TASK_LOCKF);
   unlink($F_WRKRUNNING);
   catchSigs(0);
   undef $GRID_TASK;
   return;
   }
 my $dbstatus;
 if (defined($exitstatus)) {
   if ($exitstatus==0) { #success
     $dbstatus='.';
     # update $F_TASKSDONE
     my $fh=setXLock($F_TASKSDONE, 110, 5);
     if ($fh) {
       incFValue($fh, $F_TASKSDONE);
       endXLock($fh);
       }
     }
   else { #error status
     $TASK_ERRCOUNT++;
     print STDERR "task $GRID_TASK failed on $HOST (PID=$$, status='$exitstatus') (worker $GRID_WORKER); error count=$TASK_ERRCOUNT\n" if $GRID_DEBUG;
     if ($TASK_ERRCOUNT>$MAX_RETRIES) { #trash it
       my $he=setXLock($F_ERRTASKS, 70, 3);
       fappend($he, join("\t",$GRID_TASK,$exitstatus,$HOST,$GRID_WRKDIR)."\n");
       endXLock($he);
       $dbstatus='E';
       }
     else { #give it another retry chance
       toRetry($GRID_TASK, $TASK_ERRCOUNT);
       $dbstatus='-';
       }
     }
   endXLock($TASK_LOCKH) if $TASK_LOCKH;
   unlink($TASK_LOCKF) if $TASK_LOCKF;
   undef($TASK_LOCKH);undef($TASK_LOCKF);
   print STDERR "]task $GRID_TASK ended - updating taskDb with status code '$dbstatus'\n" if $GRID_DEBUG;
   taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $GRID_TASK, $dbstatus,
              $GRID_WORKER, $HOST, $TASK_ERRCOUNT, $exitstatus);
   } # defined $exitstatus
 else { #no exitstatus given: an interrupt signal or some other failure
   print STDERR "task $GRID_TASK ended with undefined exit status\n" if $GRID_DEBUG;
   toRetry($GRID_TASK, $TASK_ERRCOUNT);
   endXLock($TASK_LOCKH) if $TASK_LOCKH;
   unlink($TASK_LOCKF) if $TASK_LOCKF;
   undef($TASK_LOCKH);undef($TASK_LOCKF);
   unlink($F_WRKRUNNING);
   }
 catchSigs(0);
 undef($STARTED_GRID_TASK);
}
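#
# taskDb status codes written by getNextTask()/endTask():
#   'r' = running    '.' = finished successfully
#   '-' = failed, queued for another retry
#   'E' = failed more than $MAX_RETRIES times (final error)
#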

sub wrkDie {
 my ($msg)=@_;
 endTask();
 my $grdwrk=$GRID_WORKER;
 endWorker();
 #any other locks left by this worker are removed by the END block below
 die("Error at worker $grdwrk on $HOST:\n$msg\n");
}

#--onExit, onEnd trigger
sub END {
 unless ($NORMAL_ENDING) {
   $NORMAL_ENDING=1; #prevent re-entering this cleanup
   endTask();
   endWorker();
   #remove all leftover lock files; %Locks keys are stringified handles,
   # so only the lock file paths stored in the values are usable here
   foreach my $d (values(%Locks)) {
     my ($locklink, $nodefile)=@$d;
     unlink($locklink);
     unlink($nodefile);
     }
   %Locks=();
   }
}

sub fappend { #append the given lines at the end of an (already locked) file
 my $fh=shift(@_);
 seek($fh,0,SEEK_END);
 print $fh join("\n",@_);
}

sub getLockDir {
 my $fname=shift;
 my $basename=basename($fname);
 my $dirname=dirname($fname);
 my $lockdir=$basename;
 $lockdir=$dirname.'/'.$lockdir if $dirname;
 return ($lockdir, "by.$HOST.$$");
}

sub makeLockFile {
 #this is called by a node just before a file lock is requested
 my ($fname)=@_; #MUST be a path relative to GRID_JOBDIR
 if (index($fname,$GRID_JOBDIR.'/')==0) {
   #remove the full path, if there
   $fname=substr($fname, length($GRID_JOBDIR)+1);
   }
 my $fnlock=$fname;
 $fnlock=~tr/\/\\/--/s; #flatten path separators into dashes
 my $nodelockf="$GRID_LOCKDIR/$fnlock-lock.by.$HOST.$$";
 #--
 $nodelockf.='-'.substr(time(),-4);
 #--
 my $fh;
 open($fh, '>'.$nodelockf) || die("Error creating lockfile $nodelockf!\n");
 my $thismin=int(time()/60); # time when the lock was first initiated
 print $fh "$HOST $$ $thismin $GRID_WORKER\n";
 close($fh);
 return ($GRID_LOCKDIR.'/'.$fnlock, $nodelockf);
}

sub getFileLock {
 #attempts to make $lockfile a hard link to $nodefile, which is host/process specific
 my ($lockfile, $nodefile)=@_;
 if (link($nodefile, $lockfile)) { #could create link to node lock file
   my @stat=stat($nodefile);
   my $linkcount=$stat[3];
   if ($linkcount>2) {
     print STDERR "WARNING: weird link count=$linkcount ($GRID_WORKER, $HOST, task $GRID_TASK)!\n";
     }
   return ($linkcount>1); #should never be more than 2..
   }
 else { #cannot create link
   return undef;
   }
}
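#
# Note: success above is confirmed by the hard-link count on the node
# file rather than by link()'s return value alone -- a common safeguard
# on NFS mounts, where the link() reply itself can be unreliable:
# st_nlink==2 (node file + shared lock name) proves the link was made.
#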

#attempts to acquire an exclusive lock on a specific file
#-returns a file handle ref, or undef on failure
sub setXLock {
 my ($fname, $maxretries, $stalemin)=@_;
 $maxretries=80 unless $maxretries;
 # -- default: locks older than $stalemin minutes are
 #    considered "stale" and removed!
 $stalemin=7200 unless $stalemin;
 my ($lockfile, $nodefile)=makeLockFile($fname);
 my $retries=0;
 my $startmin=int(time()/60); #the minute we started trying
 #----------- try to grab the lock by hard-linking to the node file
 my $lock_age;
 my $prevlock;
 my $haveLock;

 while (!($haveLock=getFileLock($lockfile, $nodefile))) {
   if ($retries>$maxretries) {
     #tried too many times?
     #check for a stale lock forgotten here..
     if (-f $lockfile) {
       #anyone ELSE holding it?
       my $l=readFile($lockfile);
       my ($lhost, $lpid, $ltime, $worker)=split(/\s+/,$l);
       $prevlock="$lhost (pid $lpid, worker $worker)" if $lpid;
       # check for a stale lock:
       $lock_age=int(time()/60)-$ltime;
       if ($lock_age>$stalemin) { #previous lock is older than $stalemin minutes
         print STDERR "WARNING: removing stale $fname lock from $lhost PID $lpid (worker $worker), age $lock_age minutes.\n";
         unlink($lockfile);
         next; # hopefully we'll get it next time, unless another node steals it..
         }
       }
     last; #too many retries;
     }
   $retries++;
   sleep(3); #pause 3 seconds between attempts
   } #----- while failing at getting a lock
 if ($haveLock) {
   local *FH;
   open(FH, ">$nodefile")
     || die "Error re-creating host lock file $nodefile ?! ($!)\n"; #should never happen, really
   my $thismin= int(time()/60);
   print FH "$HOST $$ $thismin $GRID_WORKER\n";
   close(FH);
   # -- now it's safe to open the actual file being locked..
   #    has to be a path relative to $GRID_JOBDIR
   $fname="$GRID_JOBDIR/$fname" unless $fname=~m/^[\/\\]/;
   # --
   # -- add O_DIRECT only if you notice syncing/flushing issues for small files
   # --
   #my $mode= O_RDWR | O_CREAT | O_SYNC | O_DIRECT;
   #my $mode=(-f $fname) ? '+<' : '+>'; #create file if not there..
   my $fh;
   #open($fh, $mode.$fname) || die "setXLock($fname) failed at open($mode.$fname): $!\n";
   sysopen($fh, $fname, $sysopen_mode) || die "setXLock($fname) failed at open ($fname): ($!)\n";
   $Locks{$fh}=[$lockfile, $nodefile];
   return $fh;
   }
 else {
   print STDERR "ERROR getting a lock on $fname ($lockfile -> $nodefile) after $retries attempts!\n";
   print STDERR " (blocked by: $prevlock of $lock_age minutes)\n" if $prevlock;
   unlink($nodefile);
   return undef;
   }
}

sub endXLock {
 my $fh=shift(@_);
 close($fh);
 my $d = delete($Locks{$fh});
 unless ($d) {
   print STDERR "WARNING at endXLock(): no Locks entry found for file handle $fh!\n";
   return;
   }
 my ($locklink, $nodefile)=@$d;
 unlink($locklink);
 #-- comment out the next unlink to keep the node lock files around for debugging:
 unlink($nodefile);
}
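#
# Lock protocol round trip (summary sketch):
#   makeLockFile() writes a per-host/PID file under $GRID_LOCKDIR;
#   getFileLock() tries to hard-link it to the shared lock name;
#   setXLock() retries that, clearing stale locks, then opens the file;
#   endXLock() closes the file and unlinks both lock files.
#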

# readFile($fname_or_globref [, $nonfatal, $context]):
#  returns all lines in list context, or just the first line in
#  scalar context; accepts a file name or a filehandle/glob reference
sub readFile {
 my ($f, $nonfatal, $context)=@_;
 $context=" ($context)" if $context;
 my ($fh, $open);
 if (ref($f) eq 'GLOB') { # file handle or glob reference..
   $fh=$f;
   $f='[fh]';
   }
 else { #scalar: string = filename
   #create if not there!
   my $mode=(-f $f) ? '+<' : '+>'; #create if it doesn't exist
   my $canopen=open($fh, $mode.$f);
   unless ($canopen) {
     return undef if $nonfatal;
     die "readFile($mode $f)$context task $GRID_TASK on $HOST, open error: $!\n";
     }
   $open=1;
   }
 local $/="\n";
 if (wantarray()) {
   my @r=<$fh>;
   close($fh) if $open;
   return @r;
   }
 else { #first line only
   my $line=<$fh> || '';
   close($fh) if $open;
   return $line;
   }
}
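#
# readFile() is context sensitive, e.g.:
#   my @lines = readFile($fname);     # list context: all lines
#   my $first = readFile($fname);     # scalar context: first line only
#   my $line  = readFile($fname, 1);  # non-fatal: undef if open fails
#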
1839 |
|
1840 |
#=================================================== |
1841 |
# Mailer subroutine |
1842 |
#--- only works for some Linux installations.. as it requires sendmail |
1843 |
sub send_mail { |
1844 |
my $hash=shift; |
1845 |
$hash->{'from'}=$USER.'@'.$HOSTNAME |
1846 |
unless defined($hash->{'from'}); |
1847 |
my $to=$hash->{'to'}; |
1848 |
unless ($to) { |
1849 |
$hash->{'to'}=$USER.'@'.$DOMAIN; |
1850 |
} |
1851 |
else { |
1852 |
$hash->{'to'}=$to.'@'.$DOMAIN unless $to =~ m/@/; |
1853 |
} |
1854 |
|
1855 |
my $file; |
1856 |
local *ADDFILE; |
1857 |
if (defined($hash->{file})) { |
1858 |
#warning: assumes it's a decent, short text file! |
1859 |
local $/=undef; #read whole file |
1860 |
open(ADDFILE, '<'.$hash->{file}) || return "Cannot open file ".$hash->{file}."\n"; |
1861 |
$file=<ADDFILE>; |
1862 |
close ADDFILE; |
1863 |
} |
1864 |
my $body = $hash->{'body'}; |
1865 |
$body.=$file; |
1866 |
#my $fh; |
1867 |
local* FH; |
1868 |
open(FH, '| /usr/lib/sendmail -t -oi') || die "Error: cannot open sendmail pipe!\n"; |
1869 |
print(FH "To: $hash->{to}\n"); |
1870 |
print(FH "From: $hash->{from}\n"); |
1871 |
print(FH "Subject: $hash->{subj}\n\n"); |
1872 |
print(FH $body); |
1873 |
close(FH); |
1874 |
} |
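#
# Example call (a sketch; 'to' defaults to $USER@$DOMAIN when omitted):
#   send_mail({ subj => "gridx job finished",
#               body => "all tasks done",
#               file => $F_ERRTASKS }); # file content is appended to body
#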

sub getTime {
 my $date=localtime();
 #get rid of the day so Sybase will accept it
 (my $wday,$date)=split(/\s+/,$date,2);
 return $date;
}