|
ligolw.tcl Version 1.0
Wraps the lwAPI.so and the genericAPI.so and the
genericAPI.tcl for use by the ligolw API.
package provide ligolw 1.0
package require ligolwAPI
namespace eval ligolw {
set errlvl 1
}
proc ligolw::bak { fname { levels 10 } } {
if { [ catch {
if { [ file exists $fname ] } {
set dir [ file dirname $fname ]
set files [ glob -nocomplain $dir ${fname}.ba* ]
set i $levels
while { [ incr i -1 ] } {
if { [ lsearch $files ${fname}.ba$i ] > -1 } {
file rename -force ${fname}.ba$i ${fname}.ba[ incr i ]
incr i -1
}
}
if { [ file exists ${fname}.bak ] } {
file rename -force ${fname}.bak ${fname}.ba2
}
file rename -force $fname ${fname}.bak
} else {
catch { touch $fname }
}
} err ] } {
return -code error "bak($fname $levels): $err"
}
}
proc ${API}::atExit {} {
;## remove the threads
set threads [ getThreadList ]
set text "cleaning up threads $threads"
foreach { tid func status } [ getThreadList ] {
regexp {\(([^\)]+)\)} $func -> threadfunc
append text "thread function ${threadfunc}_r"
${threadfunc}_r $tid
}
eval set objects [ getElementList ]
append text "destroying objects $objects, "
foreach datap $objects {
destructElement $datap
}
append text "channels [ countChannels ]"
addLogEntry $text
}
proc ligolw::init { } {
cppBase64Init
if { ! [ info exist ::LIGOLW_MAX_JOB_TIME ] } {
set ::LIGOLW_MAX_JOB_TIME 100000
}
if { ! [ info exist ::THREAD_TIMEOUT_SECS ] } {
set ::THREAD_TIMEOUT_SECS 20
}
if { ! [ info exist ::DATARECV_BGLOOP_SECS ] } {
set ::DATARECV_BGLOOP_SECS 60
}
;## bgLoop checkTardyThreads checkTardyThreads $::DATARECV_BGLOOP_SECS
}
proc ligolw::killJob { jobid } {
if { [ catch {
regexp {\d+} $jobid job
set jobid $::RUNCODE$job
set now [ clock seconds ]
emptyDataBucket $jobid
catch { ::unset ::$jobid }
set ::jobid {}
lappend ::__killed_jobs $jobid $now
} err ] } {
;## ::debugPuts "$jobid non-existent,$err"
}
}
proc ligolw::pipelineBypass { jobid } {
set rp [ set ::${jobid}(-returnprotocol) ]
if { [ info exists ::${jobid}(-outputformat) ] } {
set of [ set ::${jobid}(-outputformat) ]
} else {
set of [ list ]
}
set bypass 1
set rc 0
set mddtarget [ getMddTarget $jobid ]
if { [ string equal -nocase "ligolw" $mddtarget ] } {
set bypass 0
}
if { [ info exist ::${jobid}(-metadataapi) ] } {
set metadataapi [ set ::${jobid}(-metadataapi) ]
if { [ regexp -nocase {ligolw|tee} $metadataapi ] } {
set bypass 0
}
}
if { ! [ regexp {LIGO_LW} $of ] && $bypass } {
addLogEntry "LIGO_LW not requested or ligolwAPI not a pipeline target, bypassing."
return 1
}
return 0
}
proc ligolw::wrapObject { datap } {
set seqpt {}
if { [ catch {
set seqpt "getElementAttribute $datap name"
set name [ getElementAttribute $datap name ]
if { [ regexp -nocase -- "$::VALID_KEYWORDS" $name ] } {
set seqpt "ilwd::newcontp wrapped"
set ndatap [ ilwd::newcontp wrapped ]
set seqpt "addContainerElement $datap"
addContainerElement $ndatap $datap
destructElement $datap
set datap $ndatap
}
} err ] } {
return -code error "[ myName ]: $seqpt: $err"
}
return $datap
}
proc ligolw::unwrapContainer { contp jobid } {
set seqpt {}
set ptrs [ list ]
if { [ catch {
set seqpt "getElementAttribute $contp name"
set name [ getElementAttribute $contp name ]
if { [ regexp -nocase -- "$::WRAPPED_CONTAINER" $name ] } {
set seqpt "getElementAttribute $contp size"
set size [ getElementAttribute $contp size ]
for { set i 0 } { $i < $size } { incr i 1 } {
set newptr [ copyContainerElement $contp $i ]
lappend ptrs $newptr
}
}
} err ] } {
return -code error "$seqpt: $err"
}
if { $::DEBUG > 1 } {
::debugPuts "unwrapped [ llength $ptrs ] objects for $jobid"
}
if { ! [ llength $ptrs ] } {
set ptrs $contp
}
return $ptrs
}
proc ligolw::Lw2ILwdValidate { jobid } {
set ext ""
if { ! [ info exist ::${jobid}(-ingestdata) ] } {
return -code error "-ingestdata option required"
}
set ext [ file extension [ set ::${jobid}(-ingestdata) ] ]
if { ! [ regexp {ilwd|xml} $ext ] } {
return -code error "$ingestdata extension $ext is not .xml or .ilwd"
}
if { [ info exist ::${jobid}(-returnprotocol) ] } {
set rp [ set ::${jobid}(-returnprotocol) ]
if { [ string length $rp ] } {
if { ! [ regexp -- "$::VALID_URLS" $rp ] } {
return -code error "-returnprotocol must have http://<url>, file://<path> or ftp://<path> format"
}
}
}
if { [ info exist ::${jobid}(-outputformat) ] } {
set ilwdformat [ set ::${jobid}(-outputformat) ]
} else {
set ilwdformat {ilwd ascii}
}
if { ! [ string length $ilwdformat ] || [ string match ilwd $ilwdformat ] } {
set ilwdformat {ilwd ascii}
}
if { ! [ regexp {binary|ascii} $ilwdformat ] } {
return -code error "valid -outputformat options are {ilwd binary} and {ilwd ascii}"
}
set ::${jobid}(ilwdformat) $ilwdformat
}
proc ligolw::ILwd2LwValidate { jobid } {
set out ""
set format ""
if { [ info exist ::${jobid}(-ligolwformat) ] } {
foreach {out format} [ set ::${jobid}(-ligolwformat) ] { break }
} else {
if { [ info exist ::${jobid}(-outputformat) ] } {
foreach {out format} [ set ::${jobid}(-outputformat) ] { break }
}
}
if { ! [ string length $out ] } {
set out LIGO_LW
} elseif { ! [ string match LIGO_LW $out ] } {
return -code error "-outputformat must be LIGO_LW"
}
if { [ string length $format ] } {
if { ! [ regexp {base64|Text} $format ] } {
return -code error "-outputformat must be LIGO_LW or LIGO_LW base64"
}
} else {
set format Text
}
set ::${jobid}(format) $format
}
proc ligolw::thread_ilwd2lw { jobid ilwdp } {
if { [ catch {
set tid [ ilwd2Lw_t $ilwdp [ set ::${jobid}(format) ] ]
# registerThread $tid $jobid ligolw::ilwd2LwReaper $tid $jobid $ilwdp
::setAlert $tid ::$tid
::setTIDCallback $tid "ligolw::ilwd2LwReaper $tid $jobid $ilwdp"
;## ligolw::ilwd2LwReaper $tid $jobid $ilwdp
# ::debugPuts "thread $tid started for $jobid $ilwdp"
} err ] } {
return -code error "[ myName ]: $err"
}
}
proc ligolw::thread_lw2ilwd { jobid lwp } {
set seqpt {}
if { [ catch {
if { ! [ info exist ::${jobid}(-ligolwquery) ] } {
set ::${jobid}(-ligolwquery) full
}
;## hands off to thread here
set query [ set ::${jobid}(-ligolwquery) ]
set tid [ getLwData_t $lwp $query ]
# registerThread $tid $jobid ligolw::Lw2ILwdReaper $tid $jobid $lwp
::setAlert $tid ::$tid
::setTIDCallback $tid "ligolw::Lw2ILwdReaper $tid $jobid $lwp"
;## ligolw::Lw2ILwdReaper $tid $jobid $lwp
if { $::DEBUG > 1 } {
::debugPuts "thread $tid started for $jobid $lwp"
}
} err ] } {
return -code error "[ myName ]: $seqpt $err"
}
}
proc ligolw::fixUrl { jobid url } {
if { [ catch {
foreach { targ fname } [ url2file $jobid $url xml ] { break }
while { [ file exists $fname ] } {
set root [ file rootname $url ]
set ext [ file extension $url ]
if { [ regexp {([^\d]+)(\d+)$} $root -> main int ] } {
incr int
} else {
set main $root
set int 1
}
set url $main$int$ext
foreach { targ fname } \
[ url2file $jobid $url xml ] { break }
unset root
unset ext
unset main
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $url
}
proc ligolw::ptrtest { ptr } {
set seqpt {}
if { ! [ regexp {^_[0-9a-f]+_p_[a-zA-Z]+$} $ptr ] } {
return -code error "[ myName ]: not a swig ptr name: $ptr"
}
set ptype unknown
if { [ catch {
switch -regexp -- $ptr {
{^_[0-9a-f]+_p_Frame$} {
set ptype frame
}
{^_[0-9a-f]+_p_LdasElement$} {
set ptype ilwdp
}
{^_[0-9a-f]+_p_LdasArrayBase$} {
error "Bare arrays are forbidden!"
}
{^_[0-9a-f]+_p_LdasContainer$} {
set ptype ilwdp
}
{^_[0-9a-f]+_p_LWDocument$} {
set ptype lwp
}
{^_[0-9a-f]+_p_ILwdFile$} {
set ptype ILwdFile
}
{^_[0-9a-f]+_p_FrameFile$} {
set ptype FrameFile
}
{^_[0-9a-f]+_p_LdasBinary$} {
error "cannot convert $ptr to lwp"
}
{^_[0-9a-f]+_p_tid$} {
set ptype threadp
}
default {
}
} ;## end of switch
} err ] } {
return -code error "[ myName ]:$err"
}
return $ptype
}
proc ligolw::strtest { jobid str } {
set datap {}
if { ! [ regexp {<([^>]+)>} $str -> tag ] } {
set err "argument string not xml-like (no tags)"
return -code error "[ myName ]: $err"
}
if { [ regexp -nocase {<ilwd>} $str ] } {
if { [ catch {
set seqpt "putElement($str):"
set ilwdp [ putElement $str ]
set seqpt {}
} err ] } {
return -code error "[ myName ]:$seqpt $err"
}
} elseif { [ regexp {<!DOCTYPE LIGO_LW} $str ] } {
set err "Dynamically created LIGO_LW is forbidden!"
return -code error "[ myName ]: $err"
} else {
set err "argument is xml-like, but not ilwd or ligolw"
return -code error "[ myName ]: $err"
}
return $datap
}
proc ligolw::ilwd2timeSeries { jobid elementp } {
if { [ catch {
set seqpt "ilwd2TimeSeries($elementp):"
set tsp [ ilwd2TimeSeries $elementp [ set ::${jobid}(format) ] ]
} err ] } {
return -code error "[ myName ]:$seqpt $err"
}
return $tsp
}
proc ligolw::toILwd { jobid ptr url { format ascii } {comp none} } {
if { [ catch {
set seqpt "url2file $jobid $ptr $url ilwd"
if { [ info exist ::${jobid}(ilwdformat) ] } {
set ilwdformat [ set ::${jobid}(ilwdformat) ]
foreach { ilwd format } $ilwdformat { break }
}
foreach { - ilwdfile } [ url2file $jobid $url ilwd ] { break }
set seqpt "ilwd::writeFile $jobid $ptr $ilwdfile $format $comp"
set target [ ilwd::writeFile $jobid $ptr $ilwdfile $format $comp ]
} err ] } {
return -code error "[ myName ]:$seqpt $err"
}
return $target
}
proc ligolw::isFileXml { filename } {
if { [ catch {
set xml 0
set fid [ open $filename r ]
set data [ read $fid 32 ]
close $fid
if { [ regexp -nocase {\?xml} $data ] } {
set xml 1
}
} err ] } {
catch { close $fid }
return -code error "[ myName ]: $err"
}
return $xml
}
proc ligolw::isDummyObject { datap jobid } {
set rc 0
if { [ catch {
set ignore [ getElementMetadata $datap ignore ]
;## if dummy ilwd from pipeline, just bypass the command
if { [ regexp -nocase {yes} $ignore ] } {
destructElement $datap
if { $::DEBUG > 1 } {
addLogEntry "No mdd data from pipeline for $jobid, bypassing."
}
set rc 1
}
} err ] } {
return 0
}
return $rc
}
proc ligolw::handleUrl { jobid fname url } {
if { [ catch {
set seqpt "fixUrlTarget $url"
foreach { prot targ port } [ fixUrlTarget $url ] { break }
set seqpt "$prot $targ $port"
;## ask managerAPI to handle http and ftp protocols
switch -regexp -- $prot {
http|ftp { outputUrls $jobid [ list $fname $url ] }
default {}
}
} err ] } {
return -code error "[myName]: $seqpt $err"
}
}
proc ligolw::handleILwd { jobid ilwdp } {
set seqpt {}
set done 0
if { [ catch {
if { [ info exist ::${jobid}(-metadataapi) ] } {
set api [ set ::${jobid}(-metadataapi) ]
if { [ string match metadata $api ] } {
foreach { api host port service } [ validService $api data ] { break }
ilwd::setjob $ilwdp $jobid
set tid [ sendDataElement_t $ilwdp $port $host ]
# registerThread $tid $jobid dataSendReaper $tid $jobid $ilwdp
::setAlert $tid ::$tid
::setTIDCallback $tid "dataSendReaper $tid $jobid $ilwdp"
;## ::dataSendReaper $tid $jobid $ilwdp
::debugPuts "jobid '[ ilwd::getjob $ilwdp ]' attached to $ilwdp, thread $tid"
set done 1
}
}
if { ! $done } {
if { [ info exist ::${jobid}(-returnprotocol) ] } {
set url [ ligolw::setreturnprotocol $jobid ilwd ]
set targ [ ligolw::toILwd $jobid $ilwdp $url ]
lappend ::${jobid}(products) $targ
set ::${jobid}(url) $url
destructElement $ilwdp
}
}
} err ] } {
return -code error "[myName] $err"
}
}
proc ligolw::thread_readLwFile { jobid fname } {
if { [ catch {
set seqpt "readLwFile_t($fname):"
set tid [ readLwFile_t $fname ]
# registerThread $tid $jobid readLwFile_t $tid $jobid $fname
set ::$tid ""
::setAlert $tid ::$tid
::setTIDCallback $tid "ligolw::readLwFileReaper $tid $jobid $fname"
;## ligolw::readLwFileReaper $tid $jobid $fname ]
} err ] } {
addLogEntry "err=$err"
return -code error "[ myName ]: $err"
}
return $tid
}
proc ligolw::readLwFileReaper { tid jobid fname args } {
set caller [ myName ]
if { ! [ info exist ::$tid ] } {
addLogEntry "::$tid does not exist" blue
return
}
set state [ set ::$tid ]
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
if { [ info exist ::Threads($tid,state) ] } {
if { [ string equal $jobid [ set ::Threads($tid,jobid) ] ] } {
set state FINISHED
addLogEntry "forced state to finish for $tid '$args'" blue
}
}
}
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
return
}
if { [ catch {
set ballcolor red
set lwp [ readLwFile_r $tid ]
# unregisterThread $tid $jobid
catch { ::unset ::$tid }
::debugPuts "thread $tid completed, $jobid, lwp $lwp"
if { [ array exist ::${jobid} ] } {
after 0 [ list ligolw::thread_lw2ilwd $jobid $lwp ]
} else {
set ballcolor orange
error "No job array when thread $tid is completed: '[dumpData]'"
}
} err ] } {
addLogEntry "[ myName ] $err" $ballcolor
catch { destructLWDocument $lwp }
# catch { unregisterThread $tid $jobid }
if { [ array exist ::${jobid} ] } {
append ::${jobid}(errors) "$err\n"
eval [ set ::${jobid}(callback) ] $jobid
}
}
}
proc ligolw::thread_writeLwFile { jobid lwp fname url } {
if { [ catch {
set seqpt "writeLwFile_t($lwp):"
set tid [ writeLwFile_t $lwp $fname ]
# registerThread $tid $jobid writeLwFile_t $tid $jobid $lwp $fname $url
set ::$tid ""
::setAlert $tid ::$tid
::setTIDCallback $tid "ligolw::writeLwFileReaper $tid $jobid $lwp $fname $url"
;## writeLwFileReaper $tid $jobid $lwp $fname $url
if { $::DEBUG > 1 } {
::debugPuts "started thread $tid for $jobid,$fname,$url"
}
} err ] } {
return -code error "[ myName ]: $err"
}
return $tid
}
proc ligolw::writeLwFileReaper { tid jobid lwp fname url args } {
set caller [ myName ]
set seqpt {}
if { ! [ info exist ::$tid ] } {
addLogEntry "::$tid does not exist" blue
return
}
set state [ set ::$tid ]
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
if { [ info exist ::Threads($tid,state) ] } {
if { [ string equal $jobid [ set ::Threads($tid,jobid) ] ] } {
set state FINISHED
addLogEntry "forced state to finish for $tid '$args'" blue
}
}
}
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
return
}
if { [ catch {
set seqpt "writeLwFile_r $tid"
writeLwFile_r $tid
catch { unset ::$tid }
if { $::DEBUG > 1 } {
::debugPuts "thread $tid completed"
}
# unregisterThread $tid $jobid
destructLWDocument $lwp
if { [ array exist ::${jobid} ] } {
;## return if all objects are processed
;## ftp data if protocol dictates it
;## file->url handling is done by manager
set seqpt "destructLWDocument($lwp):"
file attributes $fname -permission 0644
lappend ::${jobid}(products) $fname
set ::${jobid}(url) $url
}
} err ] } {
catch { unset ::$tid }
addLogEntry "[ myName ]:$seqpt $err" 2
# catch { unregisterThread $tid $jobid }
# catch { destructLWDocument $lwp }
if { [ array exist ::${jobid} ] } {
append ::${jobid}(errors) "$err\n"
eval [ set ::${jobid}(callback) ] $jobid
}
}
if { [ array exist ::${jobid} ] } {
;## return if all objects are processed
incr ::${jobid}(objectCnt) -1
set cnt [ set ::${jobid}(objectCnt) ]
if { $cnt <= 0 } {
set seqpt "ligolw::ILwd2LwCallback $jobid"
ILwd2LwCallback $jobid
}
}
}
proc ligolw::ilwd2LwReaper { tid jobid ilwdp args } {
if { ! [ info exist ::$tid ] } {
addLogEntry "::$tid does not exist" blue
return
}
set caller [ myName ]
set state [ set ::$tid ]
set seqpt {}
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
if { [ info exist ::Threads($tid,state) ] } {
if { [ string equal $jobid [ set ::Threads($tid,jobid) ] ] } {
set state FINISHED
addLogEntry "forced state to finish for $tid '$args'" blue
}
}
}
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
return
}
if { [ catch {
set seqpt "ilwd2Lw_r $tid"
set lwp [ ilwd2Lw_r $tid ]
# unregisterThread $tid $jobid
catch { ::unset ::$tid }
;## destruct ilwd here
set url [ setreturnprotocol $jobid xml ]
if { $::DEBUG > 1 } {
::debugPuts "[myName] thread $tid completed for $jobid $lwp,url $url"
}
destructElement $ilwdp
;## unset this first since tids are reusable
if { [ array exist ::${jobid} ] } {
set seqpt "ligolw::fixUrl $jobid $lwp $url"
;## this threads the writing of Lw document to file
set url [ ligolw::fixUrl $jobid $url ]
foreach { targ fname } [ url2file $jobid $url xml ] { break }
ligolw::bak $fname
set seqpt "writeLwFile($lwp $fname): $url"
addLogEntry $seqpt
after 0 [ list ligolw::thread_writeLwFile $jobid $lwp $fname $url ]
} else {
destructLWDocument $lwp
}
} err ] } {
# catch { unregisterThread $tid $jobid }
catch { unset ::$tid }
addLogEntry "[ myName ]:$seqpt $err"
if { [ array exist ::${jobid} ] } {
append ::${jobid}(errors) "$err\n"
eval [ set ::${jobid}(callback) ] $jobid
}
}
}
proc ligolw::Lw2ILwdReaper { tid jobid lwp args } {
if { ! [ info exist ::$tid ] } {
addLogEntry "::$tid does not exist" blue
return
}
set caller [ myName ]
set state [ set ::$tid ]
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
if { [ info exist ::Threads($tid,state) ] } {
if { [ string equal $jobid [ set ::Threads($tid,jobid) ] ] } {
set state FINISHED
addLogEntry "forced state to finish for $tid '$args'" blue
}
}
}
if { ! [ string equal FINISHED $state ] &&
! [ string equal $state $::TID_FINISHED ] } {
return
}
if { [ catch {
::debugPuts "[myName] thread $tid completed for $jobid"
# unregisterThread $tid $jobid
foreach { ilwdp pos } [ getLwData_r $tid ] { break }
catch { ::unset ::$tid }
destructLWDocument $lwp
if { [ array exist ::${jobid} ] } {
handleILwd $jobid $ilwdp
incr ::${jobid}(objectCnt) -1
set cnt [ set ::${jobid}(objectCnt) ]
if { $cnt <= 0 } {
set seqpt "ligolw::Lw2ILwdCallback $jobid"
Lw2ILwdCallback $jobid
}
} else {
destructElement $ilwdp
}
} err ] } {
addLogEntry "[ myName ]: $err"
# catch { unregisterThread $tid $jobid }
if { [ array exist ::${jobid} ] } {
append ::${jobid}(errors) "$err\n"
eval [ set ::${jobid}(callback) ] $jobid
}
}
}
proc ligolw::setreturnprotocol { jobid outext } {
if { [ info exist ::${jobid}(-returnprotocol) ] } {
set rp [ set ::${jobid}(-returnprotocol) ]
} else {
set rp ""
}
if { [ string length $rp ] } {
foreach { protocol target port targ2 } [ fixUrlTarget $rp ] { break }
} elseif { [ info exist ::${jobid}(-ingestdata) ] } {
set target [ set ::${jobid}(-ingestdata) ]
set target [ file tail $target ]
set protocol "http://"
} else {
return "http://results.$outext"
}
set ext [ file extension $target ]
set fname [ file tail $target ]
if { [ string length $ext ] } {
regsub "$ext$" $fname .$outext outfile
return "$protocol://$outfile"
} else {
return "$protocol://$fname.$outext"
}
}
proc ILwd2LwCallback { jobid } {
if { [ catch {
;## reinstall the trace to connect back
set cid [ set ::${jobid}(cid) ]
#trace variable ::$cid w "reattach $jobid $cid"
if { [ info exist ::${jobid}(errors) ] } {
set err [ set ::${jobid}(errors) ]
if { [ string length $err ] } {
error $err
}
}
set fnames ""
if { [ info exist ::${jobid}(products) ] } {
set fnames [ set ::${jobid}(products) ]
}
if { [ info exist ::${jobid}(-subject) ] } {
set subj [ set ::${jobid}(-subject) ]
} else {
set subj "$jobid completed"
}
set url [ set ::${jobid}(url) ]
regexp {^([^:]+):} $url -> protocol
set msg [ macroReturnMsg $jobid $protocol $fnames ]
;## msg has a 4 for return code
set done 0
set api ""
if { [ info exist ::${jobid}(-metadataapi) ] } {
set api [ set ::${jobid}(-metadataapi) ]
}
set rc [ regexp -nocase {ligolw} $api ]
;## not a pipeline or pipeline ends in ligolw, end here
if { ! [ string length [ getMddTarget $jobid ] ] && $rc } {
lappend msg $subj
set done 1
} elseif { $rc } {
lappend msg $subj
set done 1
} elseif { ! [ string length $api ] } {
lappend msg $subj
set done 1
}
if { ! $done } {
set result [ lindex $msg 1 ]
if { [ string length $result ] } {
set msg [ list 2 $result $subj ]
} else {
set msg [ list 0 0 0 ]
}
}
::ligolw::killJob $jobid
set ::$cid $msg
} err ] } {
::ligolw::killJob $jobid
set msg "ligolwAPI: ILwd2Lw failed: $err"
addLogEntry $msg 2
set ::$cid [ list 3 $msg error! ]
}
reattach $jobid $cid
}
proc Lw2ILwdCallback { jobid } {
if { [ catch {
set cid [ set ::${jobid}(cid) ]
set seqpt ""
if { [ info exist ::${jobid}(errors) ] } {
set msg [ set ::${jobid}(errors) ]
if { [ string length $msg ] } {
error $msg
}
}
set fnames ""
if { [ info exist ::${jobid}(products) ] } {
set fnames [ set ::${jobid}(products) ]
}
if { [ info exist ::${jobid}(-subject) ] } {
set subj [ set ::${jobid}(-subject) ]
} else {
set subj "$jobid completed"
}
set msg [ list 0 0 0 ]
if { [ info exist ::${jobid}(-metadataapi) ] } {
set api [ set ::${jobid}(-metadataapi) ]
if { [ regexp -nocase {ligolw} $api ] } {
set url [ set ::${jobid}(url) ]
regexp {^([^:]+):} $url -> protocol
set fnames ""
if { [ info exist ::${jobid}(products) ] } {
set fnames [ set ::${jobid}(products) ]
}
set msg [ macroReturnMsg $jobid $protocol $fnames ]
set result [ lindex $msg 1 ]
set msg [ list 4 $result $subj ]
}
}
::ligolw::killJob $jobid
set ::$cid $msg
} err ] } {
::ligolw::killJob $jobid
set msg "ligolwAPI: Lw2ILwd failed: $err"
set msg [ list 3 $msg error! ]
::debugPuts "$jobid returns error $msg."
set ::$cid $msg
}
reattach $jobid $cid
}
proc ligolw::processILwdDataFromBucket { jobid } {
if { ! [ array exist ::$jobid ] } {
addLogEntry "$jobid array no longer exist, data not processed" blue
catch { emptyDataBucket $jobid }
return
}
if { [ catch {
set cid [ set ::${jobid}(cid) ]
set dataptrs [ processDataBucket $jobid ]
set newptrs [ list ]
foreach datap $dataptrs {
if { ! [ ligolw::isDummyObject $datap $jobid ] } {
set ptrs [ ligolw::unwrapContainer $datap $jobid ]
;## if it is a wrapped container, free the wrapped container
;## after objects are extracted
if { [ llength $ptrs ] > 1 } {
destructElement $datap
} elseif { [ llength $ptrs ] == 1 } {
if { ! [ string match $ptrs $datap ] } {
destructElement $datap
}
}
set newptrs [ concat $newptrs $ptrs ]
set nondummy 1
} else {
set dummy 1
}
}
set numObjects [ llength $newptrs ]
set ::${jobid}(objectCnt) $numObjects
;## hand off to thread, when done, callback is invoked
foreach datap $newptrs {
after 0 [ list ligolw::thread_ilwd2lw $jobid $datap ]
}
;## pipeline dummy object to prevent timeout, just hop to next step
if { [ llength $dataptrs ] && ! [ info exist nondummy ] } {
catch { emptyDataBucket $jobid }
unset ::${jobid}
set ::jobid {}
set ::$cid [ list 0 0 0 ]
reattach $jobid $cid
}
} err ] } {
addLogEntry $err 2
catch { emptyDataBucket $jobid }
set ::$cid [ list 3 "$jobid: $err" error! ]
unset ::${jobid}
set ::jobid {}
reattach $jobid $cid
}
}
proc ligolw::ILwd2LwMacroProcess { jobid } {
set cid [ set ::${jobid}(cid) ]
set ::${jobid}(callback) ILwd2LwCallback
set ::${jobid}(errors) ""
if { [ ligolw::pipelineBypass $jobid ] } {
unset ::${jobid}
set ::jobid {}
set ::$cid [ list 0 0 0 ]
reattach $jobid $cid
return
}
if { [ catch {
set cid [ set ::${jobid}(cid) ]
set returnprotocol [ set ::${jobid}(-returnprotocol) ]
if { ! [ string length [ join $returnprotocol ] ] } {
set returnprotocol http://${jobid}.xml
set ::${jobid}(-returnprotocol) $returnprotocol
}
if { ! [ regexp -- "$::VALID_URLS" $returnprotocol ] } {
error "returnprotocol must have http://<url>, file://<path> or ftp://<path> formats"
}
ligolw::ILwd2LwValidate $jobid
;## ilwd is in form of files
if { [ info exist ::${jobid}(-ingestdata) ] } {
set ingestdata [ set ::${jobid}(-ingestdata) ]
if { [ string length $ingestdata ] && ! [ regexp {port} $ingestdata ] } {
set ::${jobid}_DATABUCKET [ list ]
lappend ::${jobid}_DATABUCKET $ingestdata
}
}
if { [ info exist ::${jobid}_DATABUCKET ] } {
ligolw::processILwdDataFromBucket $jobid
} else {
addLogEntry "waiting for data object" blue
set ::${jobid}(timeoutId) [ after $::LIGOLW_MAX_JOB_TIME \
[ list ligolw::jobTimedOut $jobid ] ]
if { [ regexp {8.4} $::tcl_version ] } {
::trace add variable ::${jobid}_DATABUCKET { write } \
[ list ligolw::ILwd2LwLateData $jobid ]
} else {
::trace variable ::${jobid}_DATABUCKET w \
[ list ligolw::ILwd2LwLateData $jobid ]
}
}
} err ] } {
addLogEntry $err 2
catch { emptyDataBucket $jobid }
set ::$cid [ list 3 "$jobid: $err" error! ]
unset ::${jobid}
set ::jobid {}
reattach $jobid $cid
}
}
proc ligolw::Lw2ILwdMacroProcess { jobid } {
set ::${jobid}(callback) Lw2ILwdCallback
set ::${jobid}(errors) ""
set cid [ set ::${jobid}(cid) ]
if { [ catch {
set database ""
if { [ info exist ::${jobid}(-database) ] } {
set database [ set ::${jobid}(-database) ]
}
if { ! [ string length $database ] } {
set database default
}
ligolw::Lw2ILwdValidate $jobid
set ingestdata [ set ::${jobid}(-ingestdata) ]
set ::${jobid}(objectCnt) [ llength $ingestdata ]
set ::${jobid}_DATABUCKET [ list ]
lappend ::${jobid}_DATABUCKET $ingestdata
addLogEntry "ingesting $ingestdata for $database database"
;## this threads the readLwFile call
set dataptrs [ processDataBucket $jobid ]
;## if the input file is an ilwd, just pass through to an api or write it to file
;## if -returnprotocol present and -metadataapi set to {}
set nowait 0
foreach datap $dataptrs {
if { [ string match ilwdp [ ligolw::ptrtest $datap ] ] } {
ligolw::handleILwd $jobid $datap
set nowait 1
}
}
if { $nowait } {
Lw2ILwdCallback $jobid
}
} err ] } {
addLogEntry $err 2
catch { emptyDataBucket $jobid }
unset ::${jobid}
set ::jobid {}
set ::$cid [ list 3 "$jobid: $err" error! ]
reattach $jobid $cid
}
}
proc ligolw::ILwd2LwLateData { jobid args } {
addLogEntry "getting data object after tcl macro" blue
after cancel [ set ::${jobid}(timeoutId) ]
# after 10 [ list ligolw::processILwdDataFromBucket $jobid ]
catch { ligolw::processILwdDataFromBucket $jobid }
}
proc ligolw::jobTimedOut { jobid } {
addLogEntry "$jobid timed out, no data arrived"
if { [ info exist ::${jobid}(callback) ] } {
append ::${jobid}(errors) "No data bucket for this job\n"
[ set ::${jobid}(callback) ] $jobid
} else {
ligolw::killJob $jobid
}
}
metadata::outIlwd $stmtp "results.out"Comments:
proc dataSendReaper { tid jobid datap args } {
if { ! [ info exist ::$tid ] } {
addLogEntry "::$tid does not exist" blue
return
}
set myName [ myName ]
if { [ catch {
set state [ set ::$tid ]
if { [ string equal FINISHED $state ] ||
[ string equal $state $::TID_FINISHED ] } {
unregisterThread $tid $jobid
sendDataElement_r $tid
set do_unset 1
unregisterThread $tid $jobid
::unset ::$tid
unset do_unset
::debugPuts "$tid completed for sendDataElement_r $datap"
set do_destruct 1
destructElement $datap
unset do_destruct
}
} err ] } {
addLogEntry "$tid $datap $err" 2
catch { unregisterThread $tid $jobid }
if { [ info exist do_unset ] } {
addLogEntry "unset ::$tid" purple
catch { unset ::$tid }
}
if { [ info exist do_destruct ] } {
addLogEntry "destruct $datap" purple
catch { destructElement $datap }
}
}
}
proc registerThread { tid jobid args } {
set ::Threads($tid,start_time) [ clock seconds ]
set ::Threads($tid,reaper) $args
set ::Threads($tid,jobid) $jobid
catch { unset ::Threads($tid,state) }
}
proc unregisterThread { tid jobid } {
if { [ catch {
if { [ info exist ::Threads($tid,jobid) ] } {
set thread_jobid [ set ::Threads($tid,jobid) ]
;## addLogEntry "thread $tid jobid $thread_jobid, to match jobid=$jobid" purple
if { [ string equal $jobid $thread_jobid ] } {
set vars [ lsort [ array names ::Threads ${tid}* ] ]
foreach entry $vars {
unset ::Threads($entry)
}
}
}
} err ] } {
addLogEntry $err 2
}
}
proc checkTardyThreads {} {
set now [ clock seconds ]
eval set threads [ getThreadList ]
foreach { tid func state } $threads {
set vars [ list ]
if { [ catch {
set vars [ lsort [ array names ::Threads ${tid}* ] ]
foreach entry $vars {
regexp {([^,]+),(\S+)} $entry -> tid attribute
if { [ string equal start_time $attribute ] } {
set start_time [ set ::Threads($entry) ]
regexp {\((\S+)\)} $func -> func
set threadvar ::THREAD_TIMEOUT_SECS_[ string toupper $func ]
if { [ info exist $threadvar ] } {
set timeout [ set $threadvar ]
} else {
set timeout $::THREAD_TIMEOUT_SECS
}
set duration [ expr $now - $start_time ]
if { $duration > $timeout } {
set cmd [ set ::Threads($tid,reaper) ]
set ::Threads($tid,state) FINISHED
set jobid [ set ::Threads($tid,jobid) ]
;## need to set the state to finish first before reaper can take
;## do not use after inside bgLoop
addLogEntry "reaping $jobid $func thread $tid running for $duration secs via '$cmd'" orange
set rc [ catch { eval $cmd } err1 ]
addLogEntry "reap $jobid $func thread $tid running for $duration secs via '$cmd', rc=$rc $err1" purple
}
}
}
} err ] } {
addLogEntry "error processing $func thread $tid: $err" 2
foreach entry $vars {
unset ::Threads($entry)
}
}
}
}
Back to Top