|
This is the Laser Interferometer Gravitational-Wave
Observatory (LIGO) Event Monitor server Tcl script.
This is the main script called by eventmonAPI
to bring in the eventmon packages, which all
use the eventmon namespace.
This module sources the following sub-modules:
package provide eventmon 1.0
namespace eval eventmon {}
package require eventmonAPI
;## Debug helper: write the ILWD object behind ptr to the file
;## <jobid><type>.ilwd using ilwd::writeFile.
;## jobid  - job identifier, used as the file name prefix
;## ptr    - object pointer; ignored unless it contains _p_Ldas
;## type   - label appended to the job id in the file name
;## format - passed through to ilwd::writeFile (default ascii)
;## comp   - passed through to ilwd::writeFile (default none)
;## A write failure is logged, never raised.
proc dumpILwd { jobid ptr type { format ascii } { comp none } } {
    ;## nothing to do for anything that is not an Ldas pointer
    if { ! [ regexp {_p_Ldas} $ptr ] } {
        return
    }
    set outfile ${jobid}${type}.ilwd
    if { [ catch { ilwd::writeFile $jobid $ptr $outfile $format $comp } problem ] } {
        addLogEntry "failed to write $outfile: $problem"
    }
}
;## Apply ::MAX_DB_INSERTIONS_PER_SEC through setMaxDatabaseRows and
;## remember the accepted value in ::eventmon::max_db_Insertions_per_sec.
;## args - ignored; accepted so the proc can serve as a variable trace
;##        callback.
;## On failure an email-level log entry is written, the global is put
;## back to the last accepted value (when there is one) and an error
;## is raised.
proc eventmon::setDatabaseRate { args } {
    if { ! [ catch {
        setMaxDatabaseRows $::MAX_DB_INSERTIONS_PER_SEC
        addLogEntry "Max number of insertions/secs set to $::MAX_DB_INSERTIONS_PER_SEC (-1 = no limit)" blue
        set ::eventmon::max_db_Insertions_per_sec $::MAX_DB_INSERTIONS_PER_SEC
    } failure ] } {
        return
    }

    ;## the new rate was rejected: report it and roll back
    set report "$failure, please reset to valid numeric value if not set"
    set heading "$::LDAS_SYSTEM failed to adjust database insertion rate"
    addLogEntry "Subject: ${heading}; Body: $report" email
    if { [ info exists ::eventmon::max_db_Insertions_per_sec ] } {
        set ::MAX_DB_INSERTIONS_PER_SEC $::eventmon::max_db_Insertions_per_sec
    }
    return -code error $report
}
;## API initialisation: supply defaults for the tunable globals, apply
;## the database insertion rate, keep it in sync through a write trace
;## and start the background loops.
;## Globals defaulted when unset:
;##   ::MAX_DB_INSERTIONS_PER_SEC                -1
;##   ::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API 1000
;##   ::EVENTMON_THREAD_TIMEOUT_SECS             50
;##   ::DEBUG_THREADS                            0
proc ${API}::init {} {
    if { ! [ info exists ::MAX_DB_INSERTIONS_PER_SEC ] } {
        set ::MAX_DB_INSERTIONS_PER_SEC -1
    }
    eventmon::setDatabaseRate

    ;## The new-style trace command exists from Tcl 8.4 onwards.  Compare
    ;## versions numerically instead of pattern matching the string 8.4,
    ;## so that later interpreters do not fall back to the deprecated
    ;## "trace variable" form.
    if { [ package vcompare $::tcl_version 8.4 ] >= 0 } {
        set ::eventmon::tcl8.4 1
        trace add variable ::MAX_DB_INSERTIONS_PER_SEC { write } eventmon::setDatabaseRate
    } else {
        trace variable ::MAX_DB_INSERTIONS_PER_SEC w "eventmon::setDatabaseRate"
    }

    if { ! [ info exists ::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API ] } {
        set ::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API 1000
    }
    if { ! [ info exists ::EVENTMON_THREAD_TIMEOUT_SECS ] } {
        set ::EVENTMON_THREAD_TIMEOUT_SECS 50
    }
    if { ! [ info exists ::DEBUG_THREADS ] } {
        set ::DEBUG_THREADS 0
    }

    ;## checkBuckets is automatically invoked by data socket handler
    ;## Thread Handlers
    bgLoop eventmon_expiredJobKiller "eventmon::expiredJobKiller" 10
    bgLoop eventmon_defunctjobs eventmon::defunctJobs 300
    ;## bgLoop eventmon_wakeupDataRecvThreads eventmon::wakeupDataRecvThreads 60
    ;## bgLoop eventmon_wakeupaddDataThreads eventmon::wakeupaddDataThreads 120
}
;## Exit hook: for every thread reported by getThreadList call the
;## matching <function>_r command so the thread result is collected.
;## The thread list is read as flat triples: tid, function, status.
;## The function name is the text inside the first pair of parentheses
;## of the function field.
proc ${API}::atExit {} {
    set threads [ getThreadList ]
    addLogEntry "cleaning up threads $threads" blue
    set notes [ list ]
    foreach { tid func status } $threads {
        ;## Skip entries without a parenthesised function name instead of
        ;## reusing the name from a previous iteration or failing on an
        ;## unset variable.
        if { ! [ regexp {\(([^\)]+)\)} $func -> threadfunc ] } {
            lappend notes "no thread function found in '$func' for $tid, skipped"
            continue
        }
        lappend notes "calling thread function ${threadfunc}_r for $tid"
        catch { ${threadfunc}_r $tid }
    }
    if { [ llength $notes ] } {
        addLogEntry [ join $notes "; " ] blue
    }
}
;## Build a placeholder ILWD container named dummy that holds a single
;## lstring element and carries the metadata ignore=yes.
;## Returns the container pointer.
;## Raises an error (after releasing whatever was created) when any
;## step fails.
proc ${API}::dummyObj {} {
    if { [ catch {
        set contp [ ilwd::newcontp dummy ]
        setElementMetadata $contp ignore yes
        set dummyline "<lstring name='dummy' size='5'>dummy</lstring>"
        set elemp [ putElement $dummyline ]
        addContainerElement $contp $elemp
        destructElement $elemp
    } err ] } {
        ;## best-effort cleanup; either variable may not exist yet
        catch { destructElement $elemp }
        catch { destructElement $contp }
        ;## -code takes the completion code; the message is the result
        return -code error "error creating dummy: $err"
    }
    return $contp
}
;## Validate the multi-dimensional data target, the wrapper data and
;## the metadata api options of a job.
;## The whole body runs through uplevel, i.e. in the caller's scope:
;## it reads the caller's variable jobid (the jobid parameter declared
;## here is not referenced by the body) and leaves mddtarget and
;## metadataapi, and possibly rp, wrapperdata and msg, set in the
;## caller.
;## Raises an error describing the first invalid option found.
proc ${API}::validateOpts { jobid } {
uplevel {
set mddtarget [ getMddTarget $jobid ]
if { [ string length $mddtarget ] } {
;## ::MDD_TARGETS is used as the regular expression of valid targets
if { ! [ regexp $::MDD_TARGETS $mddtarget ] } {
set msg "Invalid -multiDimDataTarget or "
append msg "-mddapi option '$mddtarget', "
append msg "must be frame or ligolw."
error $msg
}
;## the frame target needs a non-empty -returnprotocol
if { [ string match {frame} $mddtarget ] } {
set rp [ set ::${jobid}(-returnprotocol) ]
if { ! [ string length $rp ] } {
set msg "No returnprotocol for "
append msg "-multiDimDataTarget frame option."
error $msg
}
}
} else {
;## no mdd, check for wrapper data from files
if { [ info exist ::${jobid}(-wrapperdata) ] } {
set wrapperdata [ set ::${jobid}(-wrapperdata) ]
if { ! [ string length $wrapperdata ] } {
error "No files specified for -wrapperdata option."
}
}
}
;## record the (possibly empty) target in the job array
set ::${jobid}(mddtarget) $mddtarget
;## either metadata or ligolw for metadata target
if { [ info exist ::${jobid}(-metadataapi) ] } {
set metadataapi [ set ::${jobid}(-metadataapi) ]
;## NOTE(review): the pattern is not anchored, so any value that
;## merely contains one of the three words is accepted
if { ! [ regexp {(ligolw|metadata|tee)} $metadataapi ] } {
set msg "Invalid -metadataapi option: "
append msg "must be ligolw, metadata or "
append msg "tee for both."
error $msg
}
} else {
;## default when the option is absent
set metadataapi metadata
}
}
}
;## Expand the -wrapperdata option of a job into a list of .ilwd files.
;## jobid - name of the global job array; ::<jobid>(-wrapperdata) holds
;##         a list of files and/or directories
;## Directories contribute every *.ilwd file directly inside them,
;## existing files must carry the .ilwd extension, and entries that do
;## not exist are skipped.
;## Returns the list of files.
;## Raises an error when an existing file is not an .ilwd file or when
;## the final list is empty.
proc ${API}::expandWrapperData { jobid } {
    set wrapperdata [ set ::${jobid}(-wrapperdata) ]
    set files [ list ]
    foreach entry $wrapperdata {
        if { [ file isdirectory $entry ] } {
            ;## -nocomplain: a directory without .ilwd files contributes
            ;## nothing instead of aborting the whole expansion with a raw
            ;## glob error; -- protects entries that start with a dash
            foreach file [ glob -nocomplain -- $entry/*.ilwd ] {
                lappend files $file
            }
        } elseif { [ file exists $entry ] } {
            set ext [ file extension $entry ]
            if { [ string match .ilwd $ext ] } {
                lappend files $entry
            } else {
                return -code error "Some non ilwd is specified by -wrapperdata option: $entry"
            }
        }
    }
    if { [ llength $files ] } {
        return $files
    } else {
        return -code error "No files specified for -wrapperdata option."
    }
}
Comments:
This proc can be called when:
- the job terminates normally with macro and objects
- there is an error processing the job object
- the job objects have timed out and there is no tcl command
- the macro has timed out and there are no objects.
The return is "$rc $msg" to the user command:
0 - continue with command, no msg
1 - continue with command, msg
3 - exception, abort
4 - no products, end of chain
;## Tear down a job: collect its addData threads, empty its data
;## bucket, destroy its product objects, cancel its pending after
;## event, remove the traces on its product element and schedule the
;## job array for removal.
;## jobid - name of the global job array (::<jobid>)
;## rc    - requested return code, default 3
;## msg   - text returned with the code; for rc 3 it is also recorded
;##         in the exception element of the job array
;## Returns [ list $rc $msg ].  The code is 4 when the job array does
;## not exist (with its own message) or when productType is 0 or 4.
proc ${API}::killJob { jobid { rc 3 } { msg "" } } {
;## remember the code the caller asked for; rc may be changed below
set origrc $rc
if { ! [ string length [ array names ::${jobid} ] ] } {
return [ list 4 "[myName] $jobid failed due to no array ::$jobid" ]
}
;## ::addData is keyed tid,jobid; collect every thread of this job
foreach entry [ array names ::addData *${jobid} ] {
;## NOTE(review): this reassigns jobid from the array key
foreach { tid jobid } [ split $entry , ] { break }
addLogEntry "waiting for addData thread $tid to finish" blue
catch { addData_r $tid } result
unset ::$tid
addLogEntry "$tid addData_r result: '$result'" blue
;## destroy any object pointers found in the thread result
;## (the loop variable entry of the outer loop is reused here)
foreach entry $result {
if { [ regexp {_p_Ldas} $entry ] } {
catch { destructElement $entry }
}
}
unset ::addData($tid,$jobid)
}
;## remove the container objects
catch { emptyDataBucket $jobid }
;## remove the result objects if any for exception completion
;## if there is product, shared object has removed the wrapper objects.
if { [ info exist ::${jobid}(productType) ] } {
set bits [ set ::${jobid}(productType) ]
;## no mdd or metadata, end the pipeline
if { $bits == 0 || $bits == 4 } {
set rc 4
}
} else {
;## issue removeJob only if there is an exception
if { $rc == 3 } {
;## the global listJob and killJob commands take the numeric part
;## of the job id
regexp {(\d+)} $jobid -> id
catch { ::listJob $id } msg1
catch { ::killJob $id } err1
addLogEntry "internal data from listJob: $msg1; killJob $err1" purple
;## remaining threads could be from dataRecv
if { [ array exist ::dataRecvStates ] } {
addLogEntry "current dataRecv thread states [ array get ::dataRecvStates ] " purple
}
}
}
;## always remove the end products created by shared object
if { [ info exist ::${jobid}(product) ] } {
addLogEntry "products to remove [ set ::${jobid}(product) ]"
;## product is walked as a list of lists of pointers
foreach ptrs [ set ::${jobid}(product) ] {
foreach ptr $ptrs {
if { [ regexp {_p_Ldas} $ptr ] } {
destructElementWrap $ptr endproducts 1
}
}
}
}
;## remove after process if job terminates before it
catch { after cancel [ set ::${jobid}(afterId) ] } err
;## wake up any tcl command waiting on it here to return to manager
;## do not do earlier than this
if { $origrc == 3 } {
if { [ info exist ::${jobid}(exception) ] } {
lappend ::${jobid}(exception) "Job aborted or timed out $msg"
} else {
set ::${jobid}(exception) "Job aborted or timed out $msg"
}
}
addLogEntry "$jobid processed return code=$rc, called with rc=$origrc $msg" blue
# array set ::$jobid [ list product [ list ] ]
;## drop the traces on the product element, using the trace syntax
;## that matches the interpreter version
if { [ regexp {8.4} $::tcl_version ] } {
foreach entry [ trace info variable ::${jobid}(product) ] {
foreach { oplist cmd } $entry { break }
trace remove variable ::${jobid}(product) $oplist $cmd
}
} else {
catch { set cmd [ eval lindex [ trace vinfo ::${jobid}(product) ] 1 ] } err
catch { trace vdelete ::${jobid}(product) w $cmd } err
}
;## unset here just in case there are no macro commands
after 100 "catch { unset ::$jobid }"
catch { set ::jobid {} }
;## return here may be ignored if not returning to manager
return [ list $rc $msg ]
}
;## Compute the product-type bit contributed by one object.
;## ptr   - object pointer; counts only when it contains _p_Ldas
;## msg   - message that came with the object; any non-empty message
;##         disqualifies the object
;## order - bit position of this object in the product-type mask
;## Returns 1 shifted left by order when the object is usable, else 0.
proc ${API}::setProductType { ptr msg order } {
    ;## braced expressions: no second round of substitution on the
    ;## operands and the expression can be byte-compiled
    set usable [ expr { [ regexp {_p_Ldas} $ptr ] && ! [ string length $msg ] } ]
    return [ expr { $usable << $order } ]
}
;## Write the state object of a job to <jobid>State.ilwd.
;## jobid  - job identifier
;## ptr    - state object pointer; only written when it contains
;##          _p_Ldas
;## format - accepted for compatibility; the value actually used is
;##          ::STATEILWDFORMAT
;## comp   - compression passed to ilwd::writeFile (default none)
;## Returns the tail of the written file name, or an empty string when
;## nothing is written (jobs with an objectCnt element, or a ptr that
;## is not an Ldas pointer).
;## Raises an error prefixed with the step that failed.
proc ${API}::writeState { jobid ptr { format binary } {comp none} } {
    ;## from wrapper files, do not process state data,
    ;## state ptr will be destroyed by killJob
    if { [ info exists ::${jobid}(objectCnt) ] } {
        return {}
    }
    if { ! [ regexp {_p_Ldas} $ptr ] } {
        return ""
    }

    set seqpt {}
    if { [ catch {
        set url "file:${jobid}State.ilwd"
        set seqpt "url2file $jobid $ptr $url ilwd"
        ;## the file name is the second item returned by url2file
        foreach { unused ilwdfile } [ url2file $jobid $url ilwd ] { break }
        set format $::STATEILWDFORMAT
        set seqpt "ilwd::writeFile $jobid $ptr $ilwdfile $format $comp"
        set written [ ilwd::writeFile $jobid $ptr $ilwdfile $format $comp ]
        addLogEntry "state object written as $written"
    } problem ] } {
        return -code error "$seqpt $ptr $problem"
    }
    return [ file tail $written ]
}
;## Send an object to another API in a sendDataElement_t thread.
;## jobid - job identifier, attached to the object before sending
;## ptr   - object pointer to send
;## api   - target API name; nothing is done when it is empty
;## The thread is registered with ::dataSendReaper as its callback.
;## Raises an error when the service lookup fails or when the last of
;## maxtimes attempts fails.
proc ${API}::sendData { jobid ptr api } {
    set maxtimes 1
    if { ! [ string length $api ] } {
        return
    }
    for { set i 0 } { $i < $maxtimes } { incr i 1 } {
        if { ! [ catch {
            foreach { api host port service } [ validService $api data ] { break }
            ilwd::setjob $ptr $jobid
            set tid [ sendDataElement_t $ptr $port $host ]
            ::setAlert $tid ::$tid
            ::setTIDCallback $tid "::dataSendReaper $tid $jobid $ptr"
            debugPuts "jobid '[ ilwd::getjob $ptr ]' attached to $ptr, to $api, thread $tid"
        } err ] } {
            ;## sent: never start a second thread for the same object
            return
        }
        if { [ regexp {validService} $err ] } {
            return -code error "[ myName ]: $err"
        }
        debugPuts "error: '$err' after $i retries"
        ;## The loop index never reaches maxtimes inside the loop, so the
        ;## give-up test has to look at the number of attempts made so
        ;## far; otherwise the failure is silently dropped.
        if { $i + 1 >= $maxtimes } {
            set msg "could not send data to $api API after "
            append msg "$i retries, quitting!"
            return -code error $msg
        }
        ;## pause before the next attempt
        after 1000
    }
}
;## Collect the state, mdd and metadata messages of the caller into the
;## exception element of the job array and raise an error when that
;## list holds anything other than white space.
;## The body runs through uplevel, i.e. in the caller's scope: it reads
;## the caller's jobid, stateMsg, mddMsg and metadataMsg and sets
;## seqpt, type, errmsg and, on failure, rc and msg there.
proc ${API}::ckaddDataErrs {} {
    uplevel {
        ;## throw an exception if there are some errors in state, metadata
        set seqpt "check for errors"
        foreach { type errmsg } [ list state_p stateMsg mdd_p mddMsg metadata_p metadataMsg ] {
            if { [ string length [ set $errmsg ] ] } {
                lappend ::${jobid}(exception) [ set $errmsg ]
            }
        }
        if { [ info exists ::${jobid}(exception) ] && [ llength [ set ::${jobid}(exception) ] ] } {
            ;## look at the recorded text itself (not at the variable name)
            ;## and ignore exceptions made of white space only
            if { [ string length [ string trim [ join [ set ::${jobid}(exception) ] ] ] ] } {
                set rc 1
                set msg [ join [ set ::${jobid}(exception) ] \n ]
                set msg "invalid objects: $msg"
                error $msg
            }
        }
    }
}
;## Dispatch the multi-dimensional data object of a job to its target.
;## jobid - job identifier
;## mdd_p - pointer of the multi-dimensional data object
;## bits  - product-type mask; bit value 2 means an mdd object exists
;## With an mdd object: ligolw and frame targets get it through a
;## queued eventmon::sendData call, any other target destroys it.
;## Without one, a dummy object is created and queued for sending.
;## Raises the caught error again after destroying the dummy object,
;## if one was created.
proc ${API}::handleMdd { jobid mdd_p bits } {
set seqpt {}
if { [ catch {
set mdderrs [ list ]
set mddtarget [ getMddTarget $jobid ]
;## if send to frame, set ilwd comment to returnprotocol
;## do not send dummy ilwd to frame
;## frame automatically process ilwd without macro
set i 1
if { $bits & 2 } {
if { $::DEBUG >= 3 } {
dumpILwd $jobid $mdd_p wrapped
}
if { [ string match ligolw $mddtarget ] } {
after 0 [ list eventmon::sendData $jobid $mdd_p $mddtarget ]
;## NOTE(review): at this debug level the object was already
;## dumped above, so it is written a second time here
if { $::DEBUG >= 3 } {
dumpILwd $jobid $mdd_p wrapped
}
} elseif { [ string match {frame} $mddtarget ] } {
set rp [ set ::${jobid}(-returnprotocol) ]
if { [ string match "*Ldas*" $mdd_p ] } {
;## prepend the return protocol to the comment attribute; a
;## failure is recorded as a job exception, not raised
if { [ catch {
set comment [ getElementAttribute $mdd_p comment ]
set comment [ list $rp $comment ]
setElementAttribute $mdd_p comment $comment
} err ] } {
lappend ::${jobid}(exception) "error adding url to comment: $err"
addLogEntry "error adding url to comment: $err"
}
}
;## destroy mdd_p after dataSend
set seqpt "sendData $jobid $mdd_p $mddtarget"
after 0 [ list eventmon::sendData $jobid $mdd_p $mddtarget ]
} else {
addLogEntry "Mdd target $mddtarget (not frame or ligolw), destroying $mdd_p" blue
destructElement $mdd_p
}
} else {
;## no mdd object: a dummy object is sent in its place
set dummy_mdd_p [ dummyObj ]
set seqpt "after 0 list sendData $jobid $dummy_mdd_p $mddtarget, dummy object"
after 0 [ list eventmon::sendData $jobid $dummy_mdd_p $mddtarget ]
}
;## i is never changed after being set to 1 above
addLogEntry "$seqpt, processed $i multi-dim objects"
} err ] } {
if { [ info exist dummy_mdd_p ] } {
destructElementWrap $dummy_mdd_p handleMdd-err
}
return -code error $err
}
}
;## Process the product list of a job: validate the job options, write
;## the state object, send the mdd and metadata objects to their
;## targets and finish the job through killJob.
;## jobid - name of the global job array
;## Returns whatever killJob returns.
;## validateOpts and ckaddDataErrs run their bodies through uplevel, so
;## they read and set local variables of this proc (mddtarget,
;## metadataapi, rc, msg, seqpt and others).
proc ${API}::processProduct { jobid } {
set rc 0
set msg [ list ]
set seqpt {}
;## validate job options here
if { [ catch {
validateOpts $jobid
} err ] } {
addLogEntry $err red
set rc 3
set msg $err
set rc [ killJob $jobid $rc $msg ]
return $rc
}
if { [ catch {
if { [ info exist ::${jobid}(product) ] && \
[ llength [ set ::${jobid}(product) ] ] } {
if { [ catch {
set seqpt "set result ( set ::${jobid}(product) )"
set result [ set ::${jobid}(product) ]
;## the product list is read as: id, state pointer and message,
;## mdd pointer and message, metadata pointer and message
foreach [ list id state_p stateMsg mdd_p mddMsg \
metadata_p metadataMsg ] $result { break }
set msg "Got job $id objects: state $state_p, "
append msg "mdd $mdd_p, metadata $metadata_p, "
append msg "messages: state $stateMsg, mdd $mddMsg metadata $metadataMsg."
debugPuts $msg
set msg [ list ]
set seqpt "writeState $jobid $state_p"
set statefile [ writeState $jobid $state_p ]
;## raises an error when any of the three messages reports a problem
ckaddDataErrs
set bits 0
# bit order from left state, mdd, metadata
set seqpt "setProductType"
set bits [ expr $bits | [ setProductType $state_p $stateMsg 2 ] ]
set bits [ expr $bits | [ setProductType $mdd_p $mddMsg 1 ] ]
set bits [ expr $bits | [ setProductType $metadata_p $metadataMsg 0 ] ]
array set ::${jobid} [ list productType $bits ]
;## no mdd or metadata products
if { $bits > 0 && $bits != 4 } {
set seqpt "eventmon::handleMdd $jobid $mdd_p $bits"
eventmon::handleMdd $jobid $mdd_p $bits
;## no metadata object: send a dummy one when there is an mdd
;## target, otherwise the job has no products
if { ! [ string match "*Ldas*" $metadata_p ] } {
if { [ string length $mddtarget ] } {
set metadata_p [ dummyObj ]
set dummy_metadata 1
} else {
set rc 4
set msg "$jobid produced no products"
}
}
if { $::DEBUG >= 2 } {
set seqpt "dumpILwd $jobid $metadata_p metadata"
dumpILwd $jobid $metadata_p metadata
}
if { $rc != 4 } {
set seqpt "after 0 list eventmon::sendData $jobid $metadata_p $metadataapi"
;## Duncans Patch per PR# 1859
;## tee sends the same metadata object to ligolw and to metadata
if { [ regexp -nocase {tee} $metadataapi ] } {
set seqpt "sendData $jobid $metadata_p ligolw"
after 0 [ list eventmon::sendData $jobid $metadata_p ligolw ]
set seqpt "sendData $jobid $metadata_p metadata"
after 0 [ list eventmon::sendData $jobid $metadata_p metadata ]
} else {
after 0 [ list eventmon::sendData $jobid $metadata_p $metadataapi ]
}
;## destroy actual object in killJob
;## and the created one in dataSendReaper
;## but destroy the tcl created one here
;## destructElementWrap $metadata_p metadata
;## remove non-dummy metadata object from list since
;## it is destroyed after threaded send
}
;## there are mdd and metadata so let dataSend remove them
;## only items 1 and 2 of the product list (state pointer and
;## message) are kept in the job array
set ::${jobid}(product) [ lrange $result 1 2 ]
} elseif { $bits == 4 && [ string length $statefile ] } {
;## state file only
set rc 4
set result [ macroReturnMsg $jobid file $statefile ]
foreach { rc msg } $result { break }
} else {
set rc 4
set msg "$jobid produced no products"
}
} err ] } {
addLogEntry "$seqpt $err" red
set rc 3
set msg $err
}
} else {
;## no product list at all: report the recorded exceptions, if any
set rc 4
set msg [ list ]
;## Mary said that the text may have spaces and
;## newlines sprinkled willy-nilly or contain only
;## spaces and newlines!!
if { [ info exist ::${jobid}(exception) ] } {
set exceptions [ set ::${jobid}(exception) ]
set exceptions [ split $exceptions "\n" ]
foreach exception $exceptions {
set exception [ string trim $exception ]
regsub -all -- {\s+} $exception { } exception
if { [ regexp {\S+} $exception ] } {
set rc 1
lappend msg $exception
}
}
}
if { $rc == 1 } { set msg [ join $msg "\n" ] }
if { $rc == 4 } { set msg "$jobid produced no products" }
}
} err ] } {
addLogEntry $err 2
set rc 3
}
set rc [ killJob $jobid $rc $msg ]
return $rc
}
Comments:
This proc should not be called externally.
;## Walk every global variable named *_DATABUCKET and start one
;## addData_t thread per object found in a non-empty bucket.
;## args - ignored
;## Each thread is recorded in ::addData(tid,jobid) together with the
;## object pointer and the start time, and gets
;## eventmon::addDataThreadReaper as its callback.  A write trace on
;## the threadsdone element of the job array calls
;## eventmon::standAloneCkDone.
;## Errors are logged and recorded in the exception element of the job
;## array; nothing is raised to the caller.
proc eventmon::checkBuckets { args } {
set seqpt {}
if { [ catch {
set this_time [ clock seconds ]
;## go through each data bucket and thread the shared
;## object calls
;## buckets maybe destroyed so just record a note for now
foreach bucket [ info vars ::*_DATABUCKET ] {
;## if bucket has something, process it
if { ! [ info exist $bucket ] } {
continue
}
if { [ llength [ set $bucket ] ] } {
;## the job id is the run code followed by the first run of
;## digits in the bucket name
regexp {\d+} $bucket id
set jobid ${::RUNCODE}$id
if { [ catch {
set dataptrs [ processDataBucket $jobid ]
} err ] } {
set msg "process data bucket error: $err, contents=[ set $bucket]"
addLogEntry $msg red
lappend ::${jobid}(exception) $msg
;## terminate standalone wrapper jobs
if { [ info exist ::${jobid}(-wrapperdata) ] } {
array set ::${jobid} [ list product {} ]
}
} else {
;## we read the data bucket fine, so let's go!
foreach datap $dataptrs {
set selfDestruct 1
if { [ catch {
;## if standalone wrapper, set jobid in attribute to
;## avoid shared object complain
;## process table jobid has original jobId
if { [ info exist ::${jobid}(-wrapperdata) ] } {
ilwd::setjob $datap $jobid
}
if { ! [ info exist ::${jobid}(exception) ] } {
set ::${jobid}(exception) [ list ]
}
set seqpt "addData_t($id $datap):"
;## jobs with an objectCnt element pass selfDestruct 0
if { [ info exist ::${jobid}(objectCnt) ] } {
set selfDestruct 0
}
set tid [ addData_t $id $datap $selfDestruct ]
if { $::DEBUG_THREADS } {
addLogEntry "addData thread $tid created for job $jobid" purple
}
set ::addData($tid,$jobid) [ list $datap $this_time ]
::setAlert $tid ::$tid
::setTIDCallback $tid "eventmon::addDataThreadReaper $tid $jobid $datap $this_time"
;## check once in case thread finishes before trace can be set
;## eventmon::addDataThreadReaper $tid $jobid $datap $this_time
;## add the threadsdone trace only once, using the trace syntax
;## selected by ::eventmon::tcl8.4
if { [ info exist ::eventmon::tcl8.4 ] } {
set tracedata [ trace info variable ::${jobid}(threadsdone) ]
if { ![ string length $tracedata ] } {
trace add variable ::${jobid}(threadsdone) { write } \
[ list eventmon::standAloneCkDone $jobid ]
}
} else {
set tracedata [ trace vinfo ::${jobid}(threadsdone) ]
if { ![ string length $tracedata ] } {
trace variable ::${jobid}(threadsdone) w \
[ list eventmon::standAloneCkDone $jobid ]
}
}
} err ] } {
;## the thread could not be set up: destroy the object and
;## record the failure against the job
set msg "$seqpt: $err (destroying $datap now)"
addLogEntry $msg red
destructElementWrap $datap "$id,$datap,bucket-error"
lappend ::${jobid}(exception) $msg
;## terminate for wrapper jobs
if { [ info exist ::${jobid}(-wrapperdata) ] } {
array set ::${jobid} [ list product {} ]
}
}
}
}
}
}
} err ] } {
addLogEntry "$seqpt $err" red
}
}
;## Callback for an addData thread: once the thread is finished (or has
;## run longer than ::EVENTMON_THREAD_TIMEOUT_SECS) collect its result
;## with addData_r and hand it to the job.
;## tid        - thread id; ::<tid> holds the thread state
;## jobid      - name of the global job array
;## datap      - object pointer given to the thread
;## start_time - time the thread was started, in clock seconds
;## args       - ignored (extra arguments of the callback)
;## For jobs without an objectCnt element a non-empty result is stored
;## in the product element of the job array; for the others the tid is
;## appended to the threadsdone element from an after 0 event.
;## Errors are logged, the object is destroyed and the job is completed
;## with an empty product; nothing is raised.
proc eventmon::addDataThreadReaper { tid jobid datap start_time args } {
    if { ! [ info exist ::$tid ] } {
        ;## addLogEntry "cannot locate ::$tid" purple
        return
    }
    if { [ catch {
        set color red
        set state [ set ::$tid ]
        lappend ::addDataStates($tid,$jobid) $state
        set currtime [ gpsTime now ]
        if { [ info exist ::addData($tid,$jobid) ] } {
            ;## Restore the pointer and start time recorded when the thread
            ;## was created.  The variable list must hold the names datap
            ;## and start_time, not their current values.
            foreach { datap start_time } [ set ::addData($tid,$jobid) ] { break }
            ;## force thread to finish before job abort time
            if { ! [ string equal FINISHED $state ] &&
                 ! [ string equal $state $::TID_FINISHED ] } {
                set runtime [ expr [ clock seconds ] - $start_time ]
                if { $runtime > $::EVENTMON_THREAD_TIMEOUT_SECS } {
                    addLogEntry "Forcing addData thread $tid for job $jobid to finish \
after running for $runtime seconds since $start_time ! \
API could block if thread does not finish.: states [ set ::addDataStates($tid,$jobid) ]" orange
                    set state FINISHED
                }
            }
        }
        if { [ string equal FINISHED $state ] ||
             [ string equal $state $::TID_FINISHED ] } {
            set result [ addData_r $tid ]
            ::unset ::$tid
            ::unset ::addData($tid,$jobid)
            ::unset ::addDataStates($tid,$jobid)
            if { $::DEBUG_THREADS } {
                addLogEntry "addData $tid finished,result='$result'" purple
            }
            if { [ info exist ::${jobid} ] } {
                ;## standAloneCount
                ;## note here that setting a common variable in a trace callback is
                ;## visible to other threads issuing this callback as
                ;## the setting of the common var is also queued.
                ;## all pipelines except putStandAlone
                if { ! [ info exist ::${jobid}(objectCnt) ] } {
                    if { [ string length $result ] } {
                        addLogEntry "result: '$result'" purple
                        ;## this trigger trace to process job
                        set ::${jobid}(product) $result
                    }
                } else {
                    ;## so thread events can happen in order
                    after 0 [ list eval lappend ::${jobid}(threadsdone) $tid ]
                    ;## after 0 [ list eventmon::standAloneCkDone $jobid ]
                }
            }
        }
    } err ] } {
        addLogEntry "$tid $err" $color
        catch { ::unset ::$tid }
        catch { ::unset ::addDataStates($tid,$jobid) }
        catch { ::unset ::addData($tid,$jobid) }
        destructElementWrap $datap addDataErr
        if { [ array exist ::${jobid} ] } {
            lappend ::${jobid}(exception) $err
            ;## trigger job completion
            set ::${jobid}(product) {}
        }
    }
}
Name: eventmon::expiredJobKiller
Description:
Parameters:
Usage:
Comments:
if job does not have updatedTime, then set it so it can be expired.
;## Background task: kill every job whose updatedTime element is older
;## than ::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API seconds.
;## Jobs are the global variables whose name ends in the run code
;## followed by digits; jobs without an updatedTime element are left
;## alone.  Errors are logged, not raised.
proc eventmon::expiredJobKiller {} {
    set seqpt {}
    if { [ catch {
        set now [ clock seconds ]
        foreach candidate [ info vars ::${::RUNCODE}* ] {
            if { ! [ regexp "(${::RUNCODE}\\d+)$" $candidate -> jobid ] } {
                continue
            }
            if { ! [ info exists ::${jobid}(updatedTime) ] } {
                continue
            }
            set idle [ expr $now - [ set ::${jobid}(updatedTime) ] ]
            if { $idle > $::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API } {
                ;## snapshot the job array for the debug output before
                ;## killJob takes it apart
                set snapshot [ array get ::$jobid ]
                set verdict "$jobid aborted after $idle seconds"
                debugPuts "$candidate $verdict $snapshot"
                addLogEntry [ eventmon::killJob $jobid 3 $verdict ] red
            }
        }
    } problem ] } {
        addLogEntry "[ myName ]: $problem" red
    }
}
§ § §
Name: eventmon::standAloneCkDone
Description:
Parameters:
Usage:
Comments:
Does not unset the ::addData thread entries, since there may be an
exception; the threads are waited for in killJob instead.
;## Trace callback on the threadsdone element of a job array: once as
;## many threads have reported as objectCnt expects, fetch the job
;## result with removeJob and store it in the product element.
;## jobid - name of the global job array
;## args  - ignored (arguments appended by the trace)
;## Errors are logged, not raised.
proc eventmon::standAloneCkDone { jobid args } {
    if { [ catch {
        set finished [ llength [ set ::${jobid}(threadsdone) ] ]
        set wanted [ set ::${jobid}(objectCnt) ]
        if { $::DEBUG > 1 } {
            addLogEntry "threads done $finished, expected $wanted" purple
        }
        if { $finished >= $wanted } {
            ;## removeJob takes the numeric part of the job id
            regexp {(\d+)} $jobid -> number
            set outcome [ removeJob $number ]
            addLogEntry "set ::${jobid}(product) with $outcome" purple
            unset ::${jobid}(threadsdone)
            set ::${jobid}(product) $outcome
        }
    } problem ] } {
        addLogEntry $problem 2
    }
}
§ § §
;## Variant of the stand-alone completion check that asks ::listJob for
;## the received and expected object counts of a job.
;## jobid - name of the global job array
;## When both counts are equal the addData element is reset to an empty
;## list and the removeJob result is stored in the product element.
;## Errors are logged and raised again.
proc eventmon::standAloneCkDone2 { jobid } {
    if { [ catch {
        regexp {(\d+)} $jobid -> id
        set report [ ::listJob $id ]
        addLogEntry "listJob: '$report'" purple
        if { ! [ regexp {receivedObjectCount=(\d+) expectedObjectCount=(\d+)} $report -> got wanted ] } {
            error "Unable to locate receivedObjectCount expectedObjectCount from shared object."
        }
        if { [ string equal $got $wanted ] } {
            set ::${jobid}(addData) [ list ]
            set outcome [ removeJob $id ]
            addLogEntry "set ::${jobid}(product) with $outcome" purple
            set ::${jobid}(product) $outcome
        }
    } problem ] } {
        addLogEntry $problem 2
        return -code error $problem
    }
}
§ § §
macro callbacks
Name: handleReturnCode
Description:
Handles the return code from processProduct.
Formulates the result to return to the manager.
Comments:
;## Run eventmon::processProduct for a job and turn its code and
;## message into the three item result: code, message, subject.
;## jobid - name of the global job array
;## The subject is the -subject element of the job array, or the word
;## completed when that is missing or empty.
;##   code 0 - 1 msg subject when msg is longer than one character,
;##            else 0 0 0
;##   code 4 - 4 msg subject
;##   other  - 3 with an error text and the subject error!
proc handleReturnCode { jobid } {
    foreach { rc msg } [ eventmon::processProduct $jobid ] { break }

    set subj completed
    if { [ info exists ::${jobid}(-subject) ] } {
        set wanted [ set ::${jobid}(-subject) ]
        if { [ string length $wanted ] } {
            set subj $wanted
        }
    }

    if { [ string equal $rc 0 ] } {
        if { [ string length $msg ] > 1 } {
            return [ list 1 $msg $subj ]
        }
        return [ list 0 0 0 ]
    }
    if { [ string equal $rc 4 ] } {
        return [ list 4 $msg $subj ]
    }
    return [ list 3 "${jobid}: eventmon [ myName ]: $msg" error! ]
}
;## Macro callback: store the result of handleReturnCode in the global
;## variable named by cid, drop the job array and reattach the job.
;## jobid - name of the global job array
;## cid   - name of the global variable that receives the result
;## args  - ignored
;## A failure of handleReturnCode is stored as a code 3 result.
proc processJobProducts { jobid cid args } {
    if { [ catch { handleReturnCode $jobid } outcome ] } {
        set outcome [ list 3 "${jobid}: [ myName ]:$outcome" error! ]
    }
    set ::$cid $outcome

    ;## unset here after waiting for job to finished
    catch { unset ::$jobid }
    catch { set ::jobid {} }
    reattach $jobid $cid
}
;## Log one line with the number of jobs (global variables named run
;## code plus digits), of objects (getElementList) and of threads
;## (getThreadList).  Errors are logged, not raised.
proc eventmon::memSnapShot {} {
    if { [ catch {
        ;## jobs: globals whose name ends in the run code plus digits
        set jobs [ list ]
        set jobid_rx (${::RUNCODE}\\d+)
        foreach candidate [ info vars ::${::RUNCODE}* ] {
            if { [ regexp $jobid_rx\$ $candidate -> jobid ] } {
                lappend jobs $jobid
            }
        }
        set numjobs [ llength $jobs ]

        ;## objects: the first opening and the first closing brace are
        ;## removed before counting
        set objects [ getElementList ]
        regsub {\{} $objects {} objects
        regsub {\}} $objects {} objects
        set numobjects [ llength $objects ]

        ;## threads: counted only when the list mentions tid
        set numthreads 0
        set threads [ getThreadList ]
        if { [ regexp tid $threads ] } {
            set numthreads [ llength [ split $threads ] ]
        }

        addLogEntry "$numjobs jobs \{$jobs\}, $numobjects objects, $numthreads threads" blue
    } problem ] } {
        addLogEntry $problem 2
    }
}
metadata::outIlwd $stmtp "results.out"
Comments:
;## Lock-counting variant of the send-thread callback.
;## tid   - thread id; ::<tid> holds the thread state
;## jobid - job identifier, used in debug output only
;## datap - object pointer that was sent
;## args  - ignored
;## When the thread is finished its result is collected, the counter
;## ::<datap>_lock is decremented (or created as 0) and the object is
;## destroyed once the counter is 0.  On error the counter is
;## decremented as well and the object is destroyed when the counter
;## reaches 0 or does not exist.
proc dataSendReaperX { tid jobid datap args } {
    set myName [ myName ]
    set lock ::${datap}_lock
    if { [ catch {
        set state [ set ::$tid ]
        if { [ string equal FINISHED $state ] ||
             [ string equal $state $::TID_FINISHED ] } {
            sendDataElement_r $tid
            ::unset ::$tid
            if { [ info exists $lock ] } {
                incr $lock -1
            } else {
                set $lock 0
            }
            if { [ set $lock ] } {
                debugPuts "$tid completed for sendDataElement_r $datap, not destructed yet."
            } else {
                ;## last user of the object
                unset $lock
                destructElement $datap
                debugPuts "$jobid $tid completed for sendDataElement_r $datap, destructed."
                unset ::eventmon::sendElement_$datap
                addLogEntry "unset ::eventmon::sendElement_$datap" purple
            }
        }
    } problem ] } {
        addLogEntry "$tid $problem" 2
        if { [ info exists $lock ] } {
            incr $lock -1
            if { ! [ set $lock ] } {
                unset ::eventmon::sendElement_$datap
                debugPuts "$jobid $tid error, $datap removed, unset ::eventmon::sendElement_$datap."
                unset $lock
                catch { destructElement $datap }
            }
        } else {
            debugPuts "$tid error, $datap removed but there is no lock on it."
            catch { destructElement $datap }
        }
        catch { ::unset ::$tid }
    }
}
;## Callback for a sendDataElement_t thread: when the thread is
;## finished, collect its result, drop the state variable ::<tid> and
;## destroy the object that was sent.
;## tid   - thread id; ::<tid> holds the thread state
;## jobid - job identifier (not used by the body)
;## datap - object pointer that was sent
;## args  - ignored
;## Does nothing when ::<tid> no longer exists.  Errors are logged; the
;## step that was in progress when the error happened is tried once
;## more under catch.
proc dataSendReaper { tid jobid datap args } {
    set myName [ myName ]
    if { ! [ info exists ::$tid ] } {
        ;## addLogEntry "cannot locate ::$tid" purple
        return
    }
    if { [ catch {
        set state [ set ::$tid ]
        set finished [ expr { [ string equal FINISHED $state ] ||
                              [ string equal $state $::TID_FINISHED ] } ]
        if { $finished } {
            sendDataElement_r $tid
            ;## each marker exists only while its step is in progress
            set unset_tid 1
            ::unset ::$tid
            unset unset_tid
            set unset_element 1
            destructElement $datap
            unset unset_element
            debugPuts "$tid completed for sendDataElement_r $datap."
        }
    } problem ] } {
        addLogEntry "$tid $problem" 2
        if { [ info exists unset_tid ] } {
            catch { ::unset ::$tid }
        }
        if { [ info exists unset_element ] } {
            catch { destructElement $datap }
        }
    }
}
;## Background task: remove jobs and sent objects that belong to job
;## numbers more than 10 below the oldest live job.
;## args - ignored
;## The oldest live job number is the first run of digits in the first
;## item of ::SORTED_LIVE_JOB_LIST_REPORT; nothing is done when that
;## variable is missing or holds no number.  Candidates are the global
;## variables starting with the run code and the objects registered as
;## ::eventmon::sendElement_<pointer> (these variables are removed while
;## scanning).  A defunct job with a job array goes through
;## eventmon::killJob; otherwise its registered object is destroyed.
;## Errors are reported through an email-level log entry, not raised.
proc eventmon::defunctJobs { args } {
    if { [ catch {
        if { ! [ info exists ::SORTED_LIVE_JOB_LIST_REPORT ] } {
            return
        }
        set oldest [ lindex $::SORTED_LIVE_JOB_LIST_REPORT 0 ]
        ;## without a number the comparison below would be a string
        ;## comparison, so stop here
        if { ! [ regexp {\d+} $oldest oldest ] } {
            return
        }

        ;## numeric job ids to examine, each one only once
        foreach var [ info vars ::${::RUNCODE}* ] {
            if { [ regexp {\d+} $var num ] } {
                set seen($num) 1
            }
        }
        # eval set elements [ getElementList ]
        foreach var [ info vars ::eventmon::sendElement_* ] {
            regexp {::eventmon::sendElement_(\S+)} $var -> element
            if { [ catch {
                set owner [ getElementAttribute $element jobid ]
                ;## index the object by the numeric job id, which is the
                ;## key used for the lookup below
                if { [ regexp {\d+} $owner num ] } {
                    set seen($num) 1
                    set datap($num) $element
                }
            } err ] } {
                addLogEntry "$element getElementAttribute jobid error: $err" red
            }
            unset $var
        }

        foreach job [ lsort -dictionary [ array names seen ] ] {
            set jobid $::RUNCODE$job
            if { $job + 10 < $oldest } {
                if { [ info exists ::DEBUG_DEFUNCT_JOB_REAPER ] } {
                    addLogEntry "removing defunct jobid: $jobid" purple
                }
                if { [ info exists ::${jobid} ] } {
                    eventmon::killJob $jobid 3 "defunct jobid $jobid"
                } elseif { [ info exists datap($job) ] } {
                    addLogEntry "destroy job $job defunct object $datap($job)"
                    set ptr [ set datap($job) ]
                    if { [ info exists ::${ptr}_lock ] } {
                        unset ::${ptr}_lock
                    } else {
                        addLogEntry "no lock on $job datap $ptr" blue
                    }
                    destructElement $datap($job)
                }
            }
        }
    } err ] == 1 } {
        ;## only a real error (code 1) is reported; the early returns
        ;## above leave the catch with code 2
        set subject "$::LDAS_SYSTEM $::API API error!"
        set report "[ myName ]: $err"
        addLogEntry "Subject: ${subject}; Body: $report" email
    }
}
;## Call dataRecvReaper for every thread listed in ::dataRecvStates.
;## The first item of each array value is passed on as the second
;## argument of dataRecvReaper.
proc eventmon::wakeupDataRecvThreads {} {
    if { ! [ array exists ::dataRecvStates ] } {
        return
    }
    foreach tid [ array names ::dataRecvStates ] {
        foreach { sid stamp } [ set ::dataRecvStates($tid) ] { break }
        dataRecvReaper $tid $sid
    }
}
;## Call eventmon::addDataThreadReaper for every addData thread that
;## has an entry in ::addDataStates.
;## The keys of ::addDataStates have the form tid,jobid.  The object
;## pointer and start time handed to the reaper come from
;## ::addData(tid,jobid), where they are recorded as a two item list;
;## ::addDataStates itself only holds the thread states seen so far.
;## Entries without a matching ::addData record are skipped.
proc eventmon::wakeupaddDataThreads {} {
    if { ! [ array exists ::addDataStates ] } {
        return
    }
    foreach entry [ array names ::addDataStates ] {
        if { ! [ info exists ::addData($entry) ] } {
            continue
        }
        foreach { tid jobid } [ split $entry , ] { break }
        foreach { datap start_time } [ set ::addData($entry) ] { break }
        addLogEntry "waking up jobid $jobid, thread $tid, '[ array get ::${jobid} ]'" purple
        eventmon::addDataThreadReaper $tid $jobid $datap $start_time
    }
}
Back to Top