|
-*- mode: tcl; tcl-indent-level: 3; indent-tabs-mode: nil; -*-
diskcache.tcl Version 1.0
Maintain the array ::disk::cache.
disk::cache(dirs) is a list of directories and the gps
timestamps of the frames in those directories with hash
information for rapid location of input frame data and
identification of dropouts.
the disk cache API provides the frame API with filenames
and metadata which are used to extract necessary data
products for use by other API's.
package provide diskcache 1.0
load /ldas/lib/diskcacheAPI/libdiskcacheAPI.so
namespace eval disk {}
namespace eval diskcache {
namespace eval local {}
}
proc diskcache::init { args } {
if { [ catch {
set ::number_of_running_threads 0
set ::last_reorder_dir [ list ]
set ::cache::nextupdateindex 0
set ::get_dir_cache_last_called 0
set ::cache::queuestart [ clock clicks -milliseconds ]
set ::cache::writeDirCacheLock 0
;## the diskcacheAPI does not use a data socket.
catch { closeDataSock }
;## this regexp will return 6 elements when it
;## matches:
;## given:
;## G-R-6666666666-1.gwf
;## returns:
;## G-R-6666666666-1.gwf G R 6666666666 1 gwf
;##
set ::official_fn_rx {^([^-]+)-([^-]+)-(\d{9,10})-(\d+)\.(gwf)$}
;## resource variable which causes the complex filename
;## list for gappy concatenation requests to be dumped
;## in the log with a purple ball.
if { ! [ info exists ::DEBUG_CONCAT_RETURN_LIST ] } {
set ::DEBUG_CONCAT_RETURN_LIST 0
}
if { ! [ info exists ::DEBUG_SCAN_RATE ] } {
set ::DEBUG_SCAN_RATE 0
}
;## Mount point entries in this list will not generate email
;## as their subdirectories update. Purple log entries will
;## be made instead.
if { ! [ info exists ::IGNORE_REMOVED_DIRS_UNDER_MTPT ] } {
set ::IGNORE_REMOVED_DIRS_UNDER_MTPT [ list ]
}
;## how long to wait, in seconds, between repeated
;## error reports on a given directory
if { ! [ info exists ::ERROR_REPORT_INTERVAL ] } {
set ::ERROR_REPORT_INTERVAL 3600
}
if { ! [ info exists ::DEBUG_CACHE_SYNCHRONIZE ] } {
set ::DEBUG_CACHE_SYNCHRONIZE 0
}
if { ! [ info exists ::SCANNED_FILENAME_EXTENSIONS ] } {
set ::SCANNED_FILENAME_EXTENSIONS .gwf
}
if { ! [ info exists ::EXCLUDE_THESE_DIRS_FROM_UPDATES ] } {
set ::EXCLUDE_THESE_DIRS_FROM_UPDATES [ list ]
}
if { ! [ info exists ::DISKCACHE_HASHFILE_NAME_BINARY ] || \
! [ string length [ string trim $::DISKCACHE_HASHFILE_NAME_BINARY ] ] } {
set ::DISKCACHE_HASHFILE_NAME_BINARY .frame.cache
}
if { ! [ info exists ::DISKCACHE_HASHFILE_NAME_ASCII ] || \
! [ string length [ string trim $::DISKCACHE_HASHFILE_NAME_ASCII ] ] } {
set ::DISKCACHE_HASHFILE_NAME_ASCII frame_cache_dump
}
if { ! [ info exist ::MOUNT_PT_LOOP_INTERVAL_MS ] || \
$::MOUNT_PT_LOOP_INTERVAL_MS < 500 } {
set ::MOUNT_PT_LOOP_INTERVAL_MS 500
}
if { ! [ info exist ::CACHE_WRITE_DELAY_SECS ] } {
set ::CACHE_WRITE_DELAY_SECS 60
}
trace add variable ::CACHE_WRITE_DELAY_SECS { write } "cache::updateWriteDelayLoop"
;## enable bgLoop again after 1 hour if nothing happens
if { ! [ info exist ::CACHE_WRITE_RESUME_SECS ] } {
set ::CACHE_WRITE_RESUME_SECS 3600
}
;## warn about directories that take a really long time
;## to update.
if { ! [ info exists ::DIR_SLOW_UPDATE_WARNING_THRESHHOLD ] } {
set ::DIR_SLOW_UPDATE_WARNING_THRESHHOLD 60
}
;## how long to wait for running threads to complete before
;## giving up on an attempt to rebuild the cache from scratch.
if { ! [ info exists ::WAIT_N_SECONDS_FOR_THREADS_TO_COMPLETE ] } {
set ::WAIT_N_SECONDS_FOR_THREADS_TO_COMPLETE 10
}
;## how many directories should be updated simultaneously?
;## there may be a practical limit of about one thread per
;## available cpu when directories are actually changing.
if { ! [ info exists ::NUMBER_OF_RUNNING_THREADS_PERMITTED ] } {
set ::NUMBER_OF_RUNNING_THREADS_PERMITTED 4
}
;## Should the mount point conflicts be checked for?
if { ! [ info exists ::CHECK_FOR_MOUNT_PT_CONFLICT ] } {
set ::CHECK_FOR_MOUNT_PT_CONFLICT 1
}
;## enable/disable thread diagnostic info
if { [ regexp {8.4} $::tcl_version ] } {
trace add variable ::DEBUG_THREADS { write } "cache::debugThreads"
} else {
trace variable ::DEBUG_THREADS w "cache::debugThreads"
}
cache::debugThreads
;## Make sure the TCL variable and the C++ variable stay synced
bindToCheckForMountPTConflictVariable ::CHECK_FOR_MOUNT_PT_CONFLICT
updateFileExtList \
[ cache::extensionListMangler $::SCANNED_FILENAME_EXTENSIONS ]
cache::hashFile readInitial
cache::afterBootlock
## -------------------------------------------------------------
## Getting TCL values to the C++ layer
## -------------------------------------------------------------
if { ! [ info exists ::STAT_TIMEOUT ] } {
set ::STAT_TIMEOUT 5
}
if { [ regexp {8.4} $::tcl_version ] } {
trace add variable ::STAT_TIMEOUT { write } stat::resetStatTimeout
trace add variable ::STAT_TIMEOUT { unset } stat::resetStatTimeout
} else {
trace variable ::STAT_TIMEOUT wu stat::resetStatTimeout
}
stat::resetStatTimeout
local::updateRWLockInterval
local::updateRWLockTimeout
## -------------------------------------------------------------
## Starting of background loops
## -------------------------------------------------------------
bgLoop slowupdatealarm cache::noUpdateAlarm \
$::DIR_SLOW_UPDATE_WARNING_THRESHHOLD
bgLoop stattimeout ::stat::updateStatTimeout 300
bgLoop rwlockinterval ::diskcache::local::updateRWLockInterval 300
bgLoop rwlocktimeout ::diskcache::local::updateRWLockTimeout 300
} err ] } {
return -code error "[ myName ]: $err"
}
}
proc ${API}::atExit {} {
if { [ catch {
writeDirCacheFiles \
$::DISKCACHE_HASHFILE_NAME_BINARY \
$::DISKCACHE_HASHFILE_NAME_ASCII
} err ] } {
addLogEntry $err 2
}
}
proc cache::pushHashfileName { args } {
if { [ catch {
set fname $::DISKCACHE_HASHFILE_NAME_BINARY
set sid [ sock::open manager emergency ]
fconfigure $sid -blocking off
puts $sid "$::MGRKEY NULL NULL set ::DISKCACHE_HASHFILE_NAME_BINARY $fname"
::close $sid
set sid [ sock::open manager emergency ]
fconfigure $sid -blocking off
puts $sid "$::MGRKEY NULL NULL set ::DISKCACHE_HASHFILE_NAME_ASCII $::DISKCACHE_HASHFILE_NAME_ASCII"
::close $sid
} err ] } {
catch { ::close $sid }
addLogEntry "[ myName ]: $err" email
}
}
proc cache::getOptArray { jobid } {
set args [ list ]
set errs [ list ]
set opts [ list ]
if { [ catch {
;## automagically get all arguments which are
;## relevant to this API
foreach { name rx default } $::cache::options {
set name [ string trim $name "=" ]
set name [ string tolower $name ]
if { [ info exists ::${jobid}($name) ] } {
set arg [ set ::${jobid}($name) ]
if { [ regexp -- $rx $arg ] } {
lappend args $name $arg
} else {
lappend errs "$name '$arg'"
}
}
lappend opts $name $default
}
if { [ llength $errs ] } {
return -code error "malformed arguments: $errs"
}
foreach { opt val } [ expandOpts ] {
set opt [ string trim $opt - ]
lappend arglist $opt $val
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $arglist
}
proc cache::runJob { jobid } {
if { [ catch {
regexp {\d+} $jobid job
set jobid ${::RUNCODE}$job
array set opt [ cache::getOptArray $jobid ]
set concat $opt(concatenate)
set ag $opt(allowgaps)
set framequery $opt(framequery)
if { [ info exists opt(exception) ] } {
set exception $opt(exception)
}
if { $concat != -1 } {
cache::concatGetFilenames $jobid $framequery $ag
} elseif { [ info exists exception ] } {
cache::getFilenames $jobid $framequery $exception
} else {
cache::getFilenames $jobid $framequery
}
cache::pushResults $jobid
cache::cleanup $jobid
} err ] } {
cache::cleanup $jobid
return -code error "[ myName ]: $err"
}
}
proc cache::getFilenames { jobid framequery { exception 0 } } {
if { [ catch {
regexp {\d+} $jobid job
set jobid ${::RUNCODE}$job
set ::cache::state($job,framequery) $framequery
set ::cache::state($job,gapflag) 0
set ::cache::state($job,concat) 0
if { ! [ info exists ::cache::state($job,atoms) ] } {
set atoms [ cache::parseQuery $jobid $framequery ]
foreach [ list ifos types frames times channels ] [ join $atoms ] {
set ::${jobid}(-channels) $channels
if { [ llength $frames ] && \
[ string length [ lindex $frames 0 ] ] } {
lappend files \
[ cache::find $jobid {} {} $frames $times ]
continue
}
foreach type $types {
foreach ifo $ifos {
lappend files \
[ cache::find $jobid $ifo $type {} $times $exception ]
set files \
[ cache::collapseRetval $jobid $ifos $files ]
}
}
}
set ::cache::state($job,filenames) [ list $files ]
set ::cache::state($job,atoms) $atoms
} else {
set atoms [ set ::cache::state($job,atoms) ]
set files [ set ::cache::state($job,filenames) ]
}
if { $::DEBUG_CONCAT_RETURN_LIST } {
addLogEntry $files purple
}
} err ] } {
return -code error "[ myName ]: $err"
}
return [ list $atoms $files ]
}
proc cache::collapseRetval { jobid ifos retval } {
if { [ catch {
set collapsed 0
;## if more than one ifo is being iterated over,
;## then some rearrangement of the frame file sets
;## may be required.
if { [ llength $ifos ] > 1 } {
;## if more than one set of frames was actually
;## retrieved, collect the frames into a single
;## group, since the time range will be identical,
;## and the frame API now handles mixed groups
;## of frames properly.
if { [ llength $retval ] > 1 } {
set last [ lindex $retval end ]
set nexttolast [ lindex $retval end-1 ]
if { [ string equal $last $nexttolast ] } {
set retval [ lrange $retval 0 end-1 ]
set collapsed 1
}
;## if no collapsing was done above, there still
;## may be some repackaging of the frame filename
;## list required.
if { $collapsed == 0 } {
set time1 [ lindex $last 0 ]
set time2 [ lindex $nexttolast 0 ]
if { [ string equal $time1 $time2 ] } {
set files1 [ lindex $last 1 ]
set files2 [ lindex $nexttolast 1 ]
set flag [ lindex $last 2 ]
set retval \
[ list $time1 [ concat $files1 $files2 ] $flag ]
}
}
}
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $retval
}
proc cache::concatGetFilenames { jobid framequery global_gaps } {
if { [ catch {
regexp {\d+} $jobid job
set jobid ${::RUNCODE}$job
set ::cache::state($job,framequery) $framequery
set ::cache::state($job,gapflag) $global_gaps
set ::cache::state($job,concat) 1
if { ! [ info exists ::cache::state($job,atoms) ] } {
set atoms [ cache::parseQuery $jobid $framequery ]
set ::cache::state($job,atoms) $atoms
set retval [ list ]
foreach { ifos types frames times channels } [ join $atoms ] {
if { [ regsub :allow_gaps $times {} _times ] || \
$global_gaps } {
set allow_gaps 1
} else {
set allow_gaps 0
}
if { [ llength $frames ] && \
[ string length [ lindex $frames 0 ] ] } {
set files \
[ cache::find $jobid {} {} $frames $_times ]
if { ! [ regexp {\d{9,10}} $_times ] } {
set times {}
}
lappend retval [ list $_times $files $allow_gaps ]
continue
}
foreach type $types {
foreach ifo $ifos {
foreach timerange $times {
lappend retval [ cache::concatCollect \
$jobid $timerange $global_gaps $ifo $type ]
set retval \
[ cache::collapseRetval $jobid $ifos $retval ]
}
}
}
}
set retval [ list $retval ]
set ::cache::state($job,filenames) $retval
} else {
set retval $::cache::state($job,filenames)
}
if { $::DEBUG_CONCAT_RETURN_LIST } {
addLogEntry $retval purple
}
if { ! [ string length $retval ] } {
return -code error "No files matched framequery: '$framequery'"
}
} err ] } {
return -code error "[ myName ]: $err"
}
if { ! [ string length $retval ] } {
set retval [ list {} {} {} ]
}
return $retval
}
proc cache::concatCollect { jobid times global_gaps ifo type } {
if { [ catch {
set retval [ list ]
foreach trange $times {
if { $global_gaps || \
[ regsub :allow_gaps $trange {} trange ] } {
set allow_gaps 1
set exception 0
} else {
set allow_gaps 0
set exception 1
}
lappend filenames \
[ cache::findByTime $jobid $trange $ifo $type $exception ]
}
;## determine the total time range among all specified
;## ranges and or times
set ttrange [ split $trange {- } ]
set ttrange [ lindex $ttrange 0 ]-[ lindex $ttrange end ]
set filenames [ join $filenames ]
;## contruct a list of three elements for the
;## current time range
if { [ string length $filenames ] } {
lappend retval $ttrange $filenames $allow_gaps
}
if { ! [ info exists allow_gaps ] } {
set msg "unknown error while parsing -framequery option"
return -code error $msg
}
if { ! [ string length $retval ] && ! $allow_gaps } {
set msg "no files matched, and allow_gaps is not set. args: "
append msg "(times: '$times' ifo: '$ifo' type: '$type')"
return -code error $msg
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $retval
}
supports -framequery options in several formats:Comments:
-framequery { F {H L} {} 600000000 Adc(0) } -framequery { { F H {} 600000000 Adc(0) } { R L {} 600000000 Adc(100) } }
proc cache::parseQuery { jobid args } {
if { [ llength $args ] == 1 } {
;## Determine whether this is a simple query or a complex query
set qlist [ lindex $args 0 ]
if { [ llength $qlist ] == 5 && \
[ llength [ lindex $qlist 0 ] ] != 5 } {
;## This is a simple query
} else {
;## This is a complex query, so modify args to be a list of
;## the component simple-queries
set args $qlist
}
}
set retval [ list ]
if { [ catch {
foreach arg $args {
if { ! [ string length $arg ] } { continue }
foreach [ list ifos types frames times channels ] \
[ cache::analyzeQueryElement $jobid $arg ] { break }
cache::validateFrameQuery $frames $times $channels
lappend retval [ list $ifos $types $frames $times $channels ]
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $retval
}
proc cache::validateFrameQuery { frames times channels } {
if { [ catch {
set time_rx {^(\d{9,10})([\,\-])?(\d+)?}
set frame_rx {\d{9,10}\-\d+\.gwf$}
set chan_rx {^[a-zA-Z]+\([^\)]*\)$}
set frame [ string trim [ lindex $frames 0 ] ]
set time [ string trim [ lindex $times 0 ] ]
set channel [ string trim [ lindex $channels 0 ] ]
set errors [ list ]
if { [ info exists ::DEBUG_VALIDATE_QUERY ] && \
[ string equal 1 $::DEBUG_VALIDATE_QUERY ] } {
addLogEntry "frame: '$frame' time: '$time' channel: '$channel'" purple
}
;## timerange tests
if { [ regexp $time_rx $time -> begin token next ] } {
if { [ string length $token ] } {
if { ! [ string length $next ] } {
set errmsg "timerange spec in -framequery option "
append errmsg "seems to be invalid: '$times'"
} elseif { [ string length $next ] < 9 } {
set errmsg "timerange spec in -framequery option "
append errmsg "includes impossibly small value: '$next'"
} elseif { [ string length $next ] > 10 } {
set errmsg "timerange spec in -framequery option "
append errmsg "includes impossibly large value: '$next'"
}
if { [ info exists errmsg ] } {
lappend errors $errmsg
}
}
} elseif { [ string length $time ] } {
set errmsg "timerange spec in -framequery option "
append errmsg "appears to be invalid: '$times'"
lappend errors $errmsg
}
;## frame name test
if { [ string length $frame ] } {
if { ! [ regexp $frame_rx $frame ] } {
set errmsg "frames spec in -framequery option "
append errmsg "appears to be invalid: '$frames'."
lappend errors $errmsg
}
}
;## channel spec test
if { ! [ regexp $chan_rx $channel ] } {
set errmsg "channel spec in -framequery option "
append errmsg "appears to be invalid: '$channels'."
if { [ regexp -nocase {^[a-z]+\{} $channel ] } {
append errmsg " it looks like you are using braces "
append errmsg "where parentheses are required."
}
lappend errors $errmsg
}
if { [ llength $errors ] } {
return -code error $errors
}
} err ] } {
return -code error "[ myName ]: $err"
}
}
proc cache::analyzeQueryElement { jobid args } {
if { [ llength $args ] == 1 } {
set args [ lindex $args 0 ]
}
set frames_tmp [ list ]
if { [ catch {
set types [ list ]
set frames [ list ]
set channels [ list ]
set interferometers [ list ]
set times [ list ]
foreach { typs ifos frames tims ques } $args { break }
if { ! [ string length $typs ] } { set typs R }
;## ifo spec overrides channel names now...
if { ! [ string length [ lindex $ifos 0 ] ] } {
set temp [ cache::channelsToIfos $ques ]
if { [ llength $temp ] > 0 } {
set ifos $temp
}
::unset temp
}
if { [ llength $ifos ] == 0 } { set ifos 0 }
set typ_length [ llength $typs ]
set ifo_length [ llength $ifos ]
set frm_length [ llength $frames ]
set tim_length [ llength $tims ]
set que_length [ llength $ques ]
;## any given framequery item can only refer to a single
;## frame family. so one framequery is required for each
;## frame family accessed by a single job.
set name short
foreach frame $frames {
set file_name_atoms [ cache::analyzeFilename $frame ]
if { [ string length $file_name_atoms ] } {
array set temp $file_name_atoms
if { ! [ string equal official $temp(type) ] } {
set err "Files with names that do not adhere to the "
append err "official frame naming convention are no "
append err "longer supported by LDAS. The filename "
append err "you specified: '$frame' seems to be of "
append err "type '$temp(type)', which is no longer "
append err "supported. Please rename your frames to "
append err "follow the official frame naming "
append err "convention. The official convention "
append err "frame names of the form: "
append err "IFO-TYPE-GPSTIME-DURATION.gwf"
return -code error $err
}
set name $temp(type)
set dt $temp(tdt)
set t0 $temp(gps)
set frame_tmp $frames
break
}
}
if { [ info exists frames_tmp ] && \
[ string length $frames_tmp ] } {
set frames $frames_tmp
}
;## specific channels from explicitly named
;## frames.
if { $frm_length && $que_length } {
set interferometers {}
set ifo_length 0
set types {}
set typ_length 0
;## if user specified a time range
if { [ string length [ lindex $tims 0 ] ] } {
set times $tims
;## otherwise the time range should be the duration
;## of the named frame as determined from the frame name
;## when a single frame name is specified.
} elseif { [ info exists t0 ] && [ llength $frames ] == 1 } {
set times $t0-[ expr {$t0 + $dt - 1} ]
;## otherwise, punt!
} else {
set times [ list ]
}
set channels $ques
}
;## specific channels over some time range with
;## ifo(s) specified. this is slightly complicated
;## by the fact that frames will have channels for
;## more than one ifo, as in H0, H1 and H2 channels in
;## a Hanford frame...
if { $typ_length && $ifo_length && $tim_length && $que_length } {
foreach ifo $ifos {
set types $typs
set interferometers $ifos
set times $tims
set frames {}
set channels $ques
}
}
} err ] } {
return -code error "[ myName ]: $err"
}
return [ list $interferometers $types $frames $times $channels ]
}
get abspathlist [ cache::find $framelist ]Comments:
proc cache::find { jobid { ifo "" } { type R } { frames "" } { times "" } { exception 0 } } {
if { [ catch {
set done 0
set globbed 0
if { ! [ info exists ::disk::cache(dirs) ] } {
set err "recently restarted ${::API}API not yet initialized. "
append err "please retry job in a few seconds."
return -code error $err
}
if { [ llength $::disk::cache(dirs) ] < 1 } {
addLogEntry "No frames found under $::MOUNT_PT" red
}
set tmp [ list ]
if { ! [ string length $frames ] } {
set tmp \
[ cache::findByTime $jobid $times $ifo $type $exception ]
if { [ llength $tmp ] && [ string length $tmp ] } {
set done 1
set globbed 1
} else {
set tmp [ list ]
}
}
;## maybe we have filenames in $frames
if { ! $done } {
foreach frame $frames {
set frame [ cache::findFileOnDisk $jobid $frame ]
if { [ string length $frame ] } {
lappend tmp $frame
}
}
}
if { [ llength $tmp ] && [ string length $tmp ] } {
set done 1
} else {
set tmp [ list ]
}
set tmp [ cache::sortFrames $tmp 1 ]
set n [ llength $tmp ]
if { $n } {
debugPuts "$n frame(s) match query"
} else {
set msg "no frames match query: (frames: '$frames' "
append msg "times: '$times' ifo: '$ifo' type: '$type')"
return -code error $msg
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $tmp
}
proc cache::findFileOnDisk { jobid filename } {
if { [ catch {
;## filename is relatively or absolutely visible
;## from working directory
if { [ file exists $filename ] } {
if { [ file isdirectory $filename ] } {
set err "requested file: $filename is a directory!"
return -code error $err
}
;## we only test for filetype on explicitly named
;## files.
set type [ fileType $filename ]
if { [ lsearch $type frame ] < 0 } {
set err "specified file: $filename is not a frame file!"
append err "file type seems to be: $type "
return -code error $err
}
}
} err ] } {
if { [ string length $err ] } {
return -code error "[ myName ]: $err"
}
}
return $filename
}
proc cache::analyzeFilename { filename } {
if { [ catch {
set atoms [ list ]
if { [ string equal .gwf [ file extension $filename ] ] } {
set atoms [ cache::analyzeOfficialFilename $filename ]
} else {
return {}
}
if { [ llength $atoms ] == 6 } {
lappend atoms type official
lappend atoms ifo [ lindex $atoms 1 ]
lappend atoms typ [ lindex $atoms 2 ]
lappend atoms gps [ lindex $atoms 3 ]
;## we will have to use the tdt and
;## getFrameNumber to set the "N" and "dt" later
lappend atoms frn 1
lappend atoms tdt [ lindex $atoms 4 ]
lappend atoms ext gwf
} else {
set atoms [ list ]
}
} err ] } {
if { [ string length $err ] } {
return -code error "[ myName ]: $err"
}
}
return $atoms
}
proc cache::analyzeOfficialFilename { filename } {
if { [ catch {
set atoms [ list ]
set tail [ file tail [ string trim $filename ] ]
set tail [ string trim $tail ]
set atoms [ regexp -inline -- $::official_fn_rx $tail ]
;## if the pattern matched, but the filename was
;## somehow malformed... should not be possible
if { [ llength $atoms ] } {
if { [ regsub -all -- {-} $tail {} foo ] != 3 } {
set msg "malformed 'official' frame filename: '$filename'"
return -code error $msg
}
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $atoms
}
proc cache::findByTimeUnThreaded { jobid times ifo type { exception 0 } } {
if { [ catch {
set fnames [ list ]
set seqpt [ list ]
if { $exception == 0 } {
set gaps 1
} else {
set gaps 0
}
if { [ info exists ::${jobid}(-rds) ] } {
set rdsflag [ set ::${jobid}(-rds) ]
set channels [ set ::${jobid}(-channels) ]
set resample [ resamplingRequested $jobid $channels ]
} else {
set rdsflag 0
set channels [ list ]
set resample 0
}
if { [ info exists ::${jobid}(-allowgaps) ] } {
set gaps [ set ::${jobid}(-allowgaps) ]
}
foreach timerange [ cache::parseTimeSpec $times ] {
foreach [ list start end ] $timerange {
if { $rdsflag } {
set seqpt "getRDSFrameFiles($ifo $type $start $end $resample):"
foreach [ list files errors ] \
[ getRDSFrameFiles $ifo $type $start $end $resample ] \
{ break }
} else {
set seqpt "getFrameFiles($ifo $type $start $end $gaps):"
foreach [ list files errors ] \
[ getFrameFiles $ifo $type $start $end $gaps ] \
{ break }
}
if { [ string length [ lindex $files 0 ] ] } {
lappend fnames $files
} else {
set files "(no files found)"
}
if { [ string length [ lindex $errors 0 ] ] } {
return -code error [ list $files $errors ]
}
}
set seqpt [ list ]
}
set fnames [ join $fnames ]
set caller [ uplevel myName ]
if { ! [ string equal cache::find $caller ] } {
set n [ llength $fnames ]
if { $n } {
debugPuts "$n frame names calculated which match query"
} else {
set msg "no frames match query: "
append msg "(times: '$times' ifo: '$ifo' type: '$type')"
return -code error $msg
}
}
} err ] } {
return -code error "[ myName ]:$seqpt $err"
}
return $fnames
}
proc cache::findByTime { jobid times ifo type { exception 0 } } {
if { [ catch {
set seqpt [ list ]
if { $exception == 0 } {
set gaps 1
} else {
set gaps 0
}
if { [ info exists ::${jobid}(-rds) ] } {
set rdsflag [ set ::${jobid}(-rds) ]
set channels [ set ::${jobid}(-channels) ]
set resample [ resamplingRequested $jobid $channels ]
} else {
set rdsflag 0
set channels [ list ]
set resample 0
}
catch { unset ::${jobid}(threads_done) }
set ::${jobid}(fnames) [ list ]
if { [ info exists ::${jobid}(-allowgaps) ] } {
set gaps [ set ::${jobid}(-allowgaps) ]
}
set ::${jobid}(caller) [ uplevel myName ]
set ::${jobid}(times) $times
set ::${jobid}(threads_created) 0
set ::threadcount($jobid) [ list ]
foreach timerange [ cache::parseTimeSpec $times ] {
foreach [ list start end ] $timerange {
if { $rdsflag } {
set seqpt "getRDSFrameFiles_t($ifo $type $start $end $resample):"
set tid [ getRDSFrameFiles_t $ifo $type $start $end $resample ]
addLogEntry "$seqpt $tid created" purple
lappend ::threadcount($jobid) $tid
incr ::${jobid}(threads_created)
catch { ::unset ::$tid }
# after 500 [ list cache::findByTimeCallback $jobid $tid $ifo $type ]
after 20
::setAlert $tid ::$tid
::setTIDCallback $tid "cache::findByTimeCallback $jobid $tid $ifo $type"
} else {
set seqpt "getFrameFiles_t($ifo $type $start $end $gaps):"
set tid [ getFrameFiles_t $ifo $type $start $end $gaps ]
incr ::${jobid}(threads_created)
lappend ::threadcount($jobid) $tid
addLogEntry "$seqpt $tid created" purple
catch { ::unset ::$tid }
after 20
::setAlert $tid ::$tid
::setTIDCallback $tid "cache::findByTimeCallback $jobid $tid $ifo $type"
}
}
}
set ::${jobid}(threads_done) 0
trace add variable ::${jobid}(threads_done) { write } [ list cache::findByTimeDone $jobid $tid $ifo $type ]
vwait ::${jobid}(result)
if { [ info exist ::${jobid}(errors) ] } {
error [ set ::${jobid}(errors) ]
}
} err ] } {
return -code error "[ myName ]:$seqpt $err"
}
return [ set ::${jobid}(result) ]
}
proc cache::findByTimeCallback { jobid tid ifo type args } {
set seqpt {}
if { ! [ info exist ::$tid ] } {
;## addLogEntry "::$tid does not exist" purple
return
}
if { [ catch {
set safe 0
set thread_state [ getThreadStatus $tid ]
if { [ string equal FINISHED $thread_state ] || \
[ string equal $thread_state $::TID_FINISHED ] } {
set now [ clock seconds ]
catch { set reaper [ getThreadFunction $tid ] }
set seqpt "$reaper ($tid):"
catch { ${reaper}_r $tid } result
::unset ::$tid
foreach [ list files errors ] $result { break }
addLogEntry "$tid reaped, $jobid '$files' '$errors'" purple
if { [ string length [ lindex $files 0 ] ] } {
lappend ::${jobid}(fnames) $files
} else {
lappend ::${jobid}(fnames) "(No frame files found)"
set errors [ list "No frame files found" ]
}
if { [ string length [ lindex $errors 0 ] ] } {
lappend ::${jobid}(errors) [ list $files $errors ]
}
set seqpt [ list ]
;## this enables the trace
incr ::${jobid}(threads_done) 1
}
} err ] } {
addLogEntry "[ myName ]:$seqpt $err" red
return -code error "[ myName ]:$seqpt $err"
}
}
proc cache::findByTimeDone { jobid ifo type args } {
if { [ catch {
set threads_done [ set ::${jobid}(threads_done) ]
set threads_created [ set ::${jobid}(threads_created) ]
if { $threads_done >= $threads_created } {
set fnames [ join [ set ::${jobid}(fnames) ] ]
set caller [ set ::${jobid}(caller) ]
if { ! [ string equal cache::find $caller ] } {
set n [ llength $fnames ]
if { $n } {
debugPuts "$n frame names calculated which match query"
} else {
set msg "no frames match query: "
set times [ set ::${jobid}(times) ]
append msg "(times: '$times' ifo: '$ifo' type: '$type')"
addLogEntry "no match $msg" purple
set ::${jobid}(result) $msg
}
}
unset ::${jobid}(threads_done)
unset ::threadcount($jobid)
;## make this one last to enable wakeup from vwait
set ::${jobid}(result) $fnames
}
} err ] } {
return -code error "[ myName ]:$err"
}
}
proc cache::parseTimeSpec { times } {
if { [ catch {
set timeranges [ list ]
set times [ split $times " ," ]
foreach time $times {
if { [ regexp {(\d{9,10})-(\d{9,10})} $time -> start end ] } {
;## do nothing
} elseif { [ regexp {^\d{9,10}$} $time start ] } {
set end [ expr { $start + 1 } ]
} else {
return -code error "invalid time spec: '$time'"
}
lappend timeranges [ list $start $end ]
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $timeranges
}
proc cache::updateDirs { { jobid { } } { flag1 { } } { flag2 { } } } {
if { [ catch {
set seqpt {}
set msg {}
if { [ string equal PURGE $jobid ] && \
[ string equal 0 $flag1 ] && \
[ string equal 1 $flag2 ] } {
::bootLock ON
if { $::number_of_running_threads == 0 } {
set seqpt "deleteDirCache(): "
deleteDirCache
set ::scan_state_vector [ list ]
set seqpt {}
cache::updateExcludedDirs
set subject "$::LDAS_SYSTEM diskcache is being rebuilt!"
set body "$subject\n\n All existing data has been purged!"
set msg "Subject: ${subject}; Body: $body"
;## addLogEntry $msg email
logMailEntry $subject $msg
unset ::disk::cache
cache::updateMountPoint
if { [ info exists ::WAITING_FOR_ALL_THREADS ] } {
cache::waitOnAllThreads
}
::bootLock OFF
} elseif { ! [ info exists ::WAITING_FOR_ALL_THREADS ] } {
cache::waitOnAllThreads
set sec $::WAIT_N_SECONDS_FOR_THREADS_TO_COMPLETE
set delay [ expr { $sec * 1000 } ]
after $delay cache::updateDirs PURGE 0 1
set msg "waiting $sec seconds for all currently running"
append msg "threads to complete. see diskcache API logs"
append msg "for results."
} else {
cache::waitOnAllThreads
::bootLock OFF
set subject "$::LDAS_SYSTEM diskcache is NOT being rebuilt!"
set body "$subject\n\n Some threads are still running,\n"
append body "rebuilding the cache may corrupt memory.\n"
append body "If you really want to rebuild the cache, you\n"
append body "will have to restart the diskcache API."
set msg "Subject: ${subject}; Body: $body"
;## addLogEntry $msg email
logMailEntry $subject $msg
}
}
} err ] } {
catch { ::bootLock OFF }
return -code error "[ myName ]:$seqpt $err"
}
return $msg
}
proc cache::updateDirsAsync { jobid cid } {
if { [ catch {
::bootLock ON
set ::BLOCK_REBUILD_REQUESTS 1
if { $::number_of_running_threads == 0 } {
set seqpt "deleteDirCache(): "
deleteDirCache
set ::scan_state_vector [ list ]
set seqpt {}
cache::updateExcludedDirs
set subject "$::LDAS_SYSTEM diskcache is being rebuilt!"
set body "$subject\n\n All existing data has been purged!"
set msg "Subject: ${subject}; Body: $body"
;## addLogEntry $msg email
logMailEntry $subject $msg
unset ::disk::cache
cache::updateMountPoint
if { [ info exists ::WAITING_FOR_ALL_THREADS ] } {
cache::waitOnAllThreads
}
::bootLock OFF
cache::updateDirsCallback $jobid $cid $msg
} elseif { ! [ info exists ::WAITING_FOR_ALL_THREADS ] } {
cache::waitOnAllThreads
set sec $::WAIT_N_SECONDS_FOR_THREADS_TO_COMPLETE
set delay [ expr { $sec * 1000 } ]
after $delay cache::updateDirsAsync $jobid $cid
set msg "waiting $sec seconds for all currently running "
append msg "threads to complete. see diskcache API logs"
append msg "for results."
;## addLogEntry $msg email
logMailEntry $subject $msg
} else {
cache::waitOnAllThreads
::bootLock OFF
set subject "$::LDAS_SYSTEM diskcache is NOT being rebuilt!"
set body "$subject\n\n Some threads are still running,\n"
append body "rebuilding the cache may corrupt memory.\n"
append body "If you really want to rebuild the cache, you\n"
append body "will have to restart the diskcache API."
set msg "Subject: ${subject}; Body: $body"
;## addLogEntry $msg email
logMailEntry $subject $msg
cache::updateDirsCallback $jobid $cid $msg
}
} err ] } {
catch { ::bootLock OFF }
set msg "[ myName ]: $err"
cache::updateDirsCallback $jobid $cid $msg
}
}
proc cache::updateDirsCallback { jobid cid msg } {
if { [ catch {
regsub {Subject:.+Body:\s+} $msg {} msg
catch { ::unset ::BLOCK_REBUILD_REQUESTS }
set ::$cid [ list 0 $msg results ]
cache::cleanup $jobid
reattach $jobid $cid
} err ] } {
return -code error "[ myName ]: $err"
}
}
proc cache::updateMountPoint { args } {
if { [ catch {
set seqpt [ list ]
set rscfile LDASdiskcache.rsc
;## for a brand new system
if { ! [ file exists $rscfile ] } {
set sysrscfile ${::LDASLIB}/${::API}API/$rscfile
file copy -force $sysrscfile $rscfile
set subject "$::LDAS_SYSTEM ${::API}API warning!"
set body "$::LDAS_SYSTEM ${::API}API has created the\n"
append body "required system file [pwd]/${rscfile}.\n"
append body "The value of the ::MOUNT_PT variable in\n"
append body "this file must be set to the correct list\n"
append body "of directories for this system or no frame\n"
append body "files will be known to the system!"
set msg "Subject: ${subject}; Body: $body"
;## addLogEntry $msg email
logMailEntry $subject $msg
}
set mtpt $::MOUNT_PT
if { [ file exists .mount.point ] } {
set mtpt [ dumpFile .mount.point ]
}
set data [ dumpFile $rscfile ]
set oldext [ string trim [ getFileExtList ] ]
set target [ cache::multiLineMtPt $data ]
set test [ cache::setExtListFromRsc $data ]
set extlist [ cache::extensionListMangler $test ]
if { [ string length $target ] } {