## Description: datacond.tcl Version 1.0
## Provides Tcl functions to access the data conditioning
## API.
##
## Comments:
## When a user command is received by the data conditioning
## API, the following events occur:
;## RCS identification string for this file; the leading and
;## trailing dollar signs are trimmed off after assignment.
set ::RCS_ID_datacondtcl {$Id: datacond.tcl,v 1.331 2006/07/19 17:47:40 mlei Exp $}
set ::RCS_ID_datacondtcl [ string trim $::RCS_ID_datacondtcl "\$" ]

package provide datacond 1.0
package require datacondAPI

;## per-job state lives in the ::dc::state array.
;## 'variable' pins the name to this namespace; without it a
;## pre-existing global array called 'state' would be written
;## instead of ::dc::state.
namespace eval dc {
    variable state
    set state(NULL) [ list ]
}
;## datacond::init
;## Give every debugging/tuning symbol a default value unless it
;## is already defined, open the wrapper communication listening
;## socket and start the two background loops.
;##
;## Raises an error when ::DC_WRAPPER_COMM_PORT_OFFSET is not
;## defined.  Takes no arguments and returns nothing useful.
proc datacond::init { } {
    ;## symbols and their defaults:
    ;##  ::DEBUG_DUMP_OBJECTS           1 gives full ilwd object dumping
    ;##  ::DEBUG_TRACE_CPP              1 gives diagnostic messages about
    ;##                                 all CallChain actions
    ;##  ::DEBUG_NO_THREADS             1 reduces the number of threaded
    ;##                                 calls by about 80%
    ;##  ::DEBUG_SHOW_WRAPPER_DATA_TOC  1 dumps the table of contents of
    ;##                                 the object sent to the wrapper
    ;##  ::DEBUG_TRACE_DESTRUCT         1 traces the destruction of all
    ;##                                 ilwd objects in the datacond API
    ;##  ::FREEMEM_LOOP_DELAY_S         delay handed to the free memory
    ;##                                 background loop
    foreach { symbol default } {
        ::DEBUG_DUMP_OBJECTS          0
        ::DEBUG_TRACE_CPP             0
        ::DEBUG_NO_THREADS            0
        ::DEBUG_SHOW_WRAPPER_DATA_TOC 0
        ::DEBUG_TRACE_DESTRUCT        0
        ::FREEMEM_LOOP_DELAY_S        2
    } {
        if { ! [ info exists $symbol ] } {
            set $symbol $default
        }
    }

    ;## the port offset has no sensible default
    if { ! [ info exists ::DC_WRAPPER_COMM_PORT_OFFSET ] } {
        set msg "::DC_WRAPPER_COMM_PORT_OFFSET not defined\n"
        append msg "please set ::DC_WRAPPER_COMM_PORT_OFFSET\n"
        append msg "in the LDASapi.rsc file"
        return -code error $msg
    }

    set port \
        [ expr { $::BASEPORT + $::DC_WRAPPER_COMM_PORT_OFFSET } ]
    set ::datacond(wrappercomm) $port
    openListenSock wrappercomm $port

    bgLoop waitforfreemem dc::waitForFreeMem $::FREEMEM_LOOP_DELAY_S
    bgLoop defunctjobs datacond::defunctJobs 300
}
;## dc::parseOpts
;## Flatten the option array of a job into an "option value ..."
;## list.
;##
;## jobid - job identifier; only its first run of digits is used,
;##         prefixed with ::RUNCODE.
;##
;## Side effects: a non-empty -inputprotocol option is moved into
;## the ::<jobid>_DATABUCKET list and the option itself is reset
;## to an empty list.
;##
;## Returns the flattened option list.  On failure schedules
;## emptyDataBucket for the job and raises an error.
proc dc::parseOpts { jobid } {
    regexp {\d+} $jobid job
    set jobid $::RUNCODE$job
    set options [ list ]
    if { [ catch {
        set opts [ array names ::$jobid * ]
        ;## load the databucket from the inputprotocol if
        ;## there is anything there!
        set inputprotocol [ set ::${jobid}(-inputprotocol) ]
        if { [ string length $inputprotocol ] } {
            if { [ llength $inputprotocol ] == 1 } {
                set inputprotocol [ lindex $inputprotocol 0 ]
            }
            ;## cannot put this in after as err will empty data bucket.
            ;## the items are appended one by one as plain list
            ;## elements: passing the user supplied text through
            ;## eval would re-parse it as script and execute any
            ;## bracketed command or variable reference it contains.
            if { ! [ info exists ::${jobid}_DATABUCKET ] } {
                set ::${jobid}_DATABUCKET [ list ]
            }
            foreach item $inputprotocol {
                lappend ::${jobid}_DATABUCKET $item
            }
            addLogEntry "lappend inputprotocol '$inputprotocol' bucket '::${jobid}_DATABUCKET' has [ set ::${jobid}_DATABUCKET ]" purple
            set ::${jobid}(-inputprotocol) [ list ]
        }
        foreach opt $opts {
            set arg [ set ::${jobid}($opt) ]
            ;## sanitizer for badly formed args: put a space between
            ;## a closing quote or brace and a following ) , or ;
            regsub -all -- {([\"\}])([\)\,\;])} $arg {\1 \2} arg
            lappend options $opt $arg
        }
    } err ] } {
        after 5000 emptyDataBucket $jobid
        return -code error "[ myName ]:${jobid}: $err"
    }
    return $options
}
## dc::run
## Entry point for one data conditioning job.
##
## jobid - job identifier; only its first run of digits is used,
##         prefixed with ::RUNCODE.
##
## Parses the job options, initialises the ::dc::state entries
## for the job, asks dc::input for the input objects and then
## either calls dc::startCallback / dc::endCallback directly or
## installs write traces that call them later.
##
## Raises an error for unparsable options or when no algorithm
## or db* option is given.  Errors raised after that point are
## logged and reported through the variable named by cid instead
## of being raised.
proc dc::run { jobid } {
;## uplevel #1 is the operator socket handler
set cid [ uplevel #1 set cid ]
## normalise the job id
regexp {\d+} $jobid job
set jobid $::RUNCODE$job
if { [ catch {
set args [ dc::parseOpts $jobid ]
} err ] } {
return -code error "error parsing argument list: '$err'"
}
;## initialise the state array
set ::dc::state($job,cid) $cid
set ::dc::state($job,end) 0
set ::dc::state($job,abort) 0
set ::dc::state($job,dropdata) 0
set ::dc::state($job,cc) [ list ]
set ::dc::state($job,inputptrs) [ list ]
set ::dc::state($job,resultptr) [ list ]
set fname {}
set seqpt {}
set outputflag 0
## option names with their default values.
## NOTE(review): expandOpts is called below without arguments, so
## it presumably reads the locals opts and args from this frame --
## confirm before renaming either of them.
set opts {
-inputprotocol {}
-times {}
-interferometers {}
-dataquery {}
-aliases {}
-algorithms {}
-dbquery {}
-dbspectrum {}
-dbntuple {}
-dbqualitychannel {}
-setsingledc {}
-returnprotocol {}
-outputformat ilwd
-jobid {}
-state {}
-responsefunction {}
-responsefiles {}
-dropdata 0
-datacondtarget {}
}
## every backslash in the option list is replaced by the
## placeholder __bS before the options are expanded
regsub -all {\\} $args {__bS} args
if { [ catch {
;## expandOpts is our getopts-like function
foreach { opt val } [ expandOpts ] {
## each option becomes a local variable named after the option
## without its dashes, and a state entry of the same name
set opt [ string trim $opt - ]
set $opt $val
set ::dc::state($job,$opt) $val
}
} err ] } {
return -code error "[ myName ]: $err"
}
;## missing arguments must be handled explicitly
if { ! [ string length $algorithms ] && \
! [ string length $dbquery ] && \
! [ string length $dbspectrum ] && \
! [ string length $dbntuple ] && \
! [ string length $dbqualitychannel ] } {
return -code error "[ myName ]: no algorithms specified"
}
;## handle the substitution the manager did on the
;## input semicolon delimiters and update the state
;## array
regsub -all {\|} $algorithms {;} algorithms
regsub -all {\|} $aliases {;} aliases
regsub -all {\,} $aliases {__cOmMa} aliases
set ::dc::state($job,algorithms) $algorithms
set ::dc::state($job,aliases) $aliases
if { [ catch {
;## handle datacondtarget of mpi, which is illegal
set target $::dc::state($job,datacondtarget)
if { [ regexp -nocase {mpi} $target ] } {
set msg "\n***'mpi' is no longer an accepted value for the\n"
append msg "-datacondtarget option. please rewrite your\n"
append msg "command scripts to use 'wrapper' instead.***\n"
## this return is intercepted by the enclosing catch, so the
## message is reported through cid by the handler below
return -code error $msg
}
;## get the objects to be ingested out of the data bucket
dc::input $jobid
## start right away when object pointers are already present,
## otherwise start on the first write to the objptrs entry
if { [ llength $::dc::state($job,objptrs) ] } {
dc::startCallback $jobid
} else {
## the trace syntax is chosen by matching 8.4 in tcl_version
if { [ regexp {8.4} $::tcl_version ] } {
trace add variable ::dc::state($job,objptrs) { write } \
[ list dc::startCallback $jobid ]
} else {
trace variable ::dc::state($job,objptrs) w \
[ list dc::startCallback $jobid ]
}
}
## same scheme for completion: finish now when the end flag is
## already 1, otherwise finish on the next write to it
if { $::dc::state($job,end) == 1 } {
dc::endCallback $jobid
} else {
if { [ regexp {8.4} $::tcl_version ] } {
trace add variable ::dc::state($job,end) { write } \
[ list dc::endCallback $jobid ]
} else {
trace variable ::dc::state($job,end) w \
[ list dc::endCallback $jobid ]
}
}
;## error, possibly while executing call chain!
} err ] } {
## log the failure, store a level 3 reply in the variable named
## by cid, reattach the job and release its resources
addLogEntry $err 2
set ::$cid [ list 3 $err error! ]
reattach $jobid $cid
;## there could be all sorts of garbage even
;## at a very early stage, since we can get
;## data asynchronously.
dc::cleanup $jobid
}
}
## dc::input
## Poll for the input data of a job without blocking.
##
## jobid - job identifier; only its first run of digits is used,
##         prefixed with ::RUNCODE.
## i     - seconds waited so far, default 0; the proc re-arms
##         itself every 3 seconds with i increased by 3.
##
## When the data bucket of the job holds something it is
## processed and handed to dc::dataFromFile.  When i reaches
## ::DATABUCKET_TIMEOUT a last attempt is made and, if that
## yields nothing, the err and end state entries are set.
##
## Control flow note: the return commands inside the catch body
## are intercepted by catch with an empty message; the handler
## then returns an empty string, which stops the polling.  Only
## a non-empty message is re-raised as an error.
proc dc::input { jobid { i 0 } } {
if { [ catch {
regexp {\d+} $jobid job
set jobid $::RUNCODE$job
if { ! [ info exists ::dc::state($job,objptrs) ] } {
set ::dc::state($job,objptrs) [ list ]
}
;## memory based throttling
if { [ info exists ::DATACOND_MEMORY_LOCK ] } {
set wait $::DATACOND_MEMORY_LOCK
} else {
set wait 0
}
;## Stuart Anderson asked that we NOT do local
;## memory throttling :TODO:
## the value computed above is therefore discarded here
set wait 0
;## we are going to do non-busy waiting until data
;## comes in. special user command macros can
;## populate the bucket for immediate processing.
if { $wait == 0 && \
[ info exists ::${jobid}_DATABUCKET ] && \
[ llength [ set ::${jobid}_DATABUCKET ] ] } {
set ptrs [ processDataBucket $jobid ]
dc::dataFromFile $jobid $ptrs
return {}
}
;## nothing showed up yet; once the accumulated wait reaches
;## ::DATABUCKET_TIMEOUT make one last attempt and give up.
if { $i >= $::DATABUCKET_TIMEOUT } {
set ptrs [ dc::dataFromFile $jobid {} ]
if { [ llength $ptrs ] == 0 } {
set msg "timed out reading input data after $i seconds"
## wait was forced to 0 above, so this branch is not taken
if { $wait == 1 } {
set msg "timed out waiting for sufficient free"
append msg " memory after $i seconds"
}
## writing the end entry is what fires the trace set by dc::run
set ::dc::state($job,err) $msg
set ::dc::state($job,end) 1
}
return {}
}
} err ] } {
;## we can throw an error on our first pass, but
;## after that we bgerror (not too likely!)
if { [ string length $err ] } {
return -code error "[ myName ]: $err"
} else {
;## short circuit and avoid another turn through the
;## event loop.
return {}
}
}
;## and off we go into the event loop
## try again in 3 seconds
incr i 3
after 3000 [ list dc::input $jobid $i ]
}
;## dc::waitForFreeMem
;## Compare the free memory reported by freeMemOnBox with the
;## minimum (::MINIMUM_FREE_MEM_K, or 256000 when that is not
;## defined), maintain ::WAITING_FOR_FREE_MEM and
;## ::DATACOND_MEMORY_LOCK, tell the manager whenever the lock
;## value changes and log an e-mail alert once the shortage has
;## lasted more than 900 seconds.
;##
;## args - ignored.
;##
;## Returns 1 while memory is short, 0 otherwise.  Any internal
;## error is logged, the waiting state is reset and 0 is returned.
proc dc::waitForFreeMem { args } {
    if { [ catch {
        if { [ info exists ::MINIMUM_FREE_MEM_K ] } {
            set minfree $::MINIMUM_FREE_MEM_K
        } else {
            set minfree 256000
        }
        set free [ freeMemOnBox ]

        if { $free <= $minfree } {
            set wait 1
            ;## remember when the shortage started
            if { ! [ info exists ::WAITING_FOR_FREE_MEM ] || \
                 $::WAITING_FOR_FREE_MEM == 0 } {
                set ::WAITING_FOR_FREE_MEM [ clock seconds ]
            }
        } else {
            set wait 0
            set ::WAITING_FOR_FREE_MEM 0
        }

        ;## orange while waiting, blue only on request otherwise
        set status "flag: $wait free: $free k minfree: $minfree k"
        if { $::WAITING_FOR_FREE_MEM != 0 } {
            addLogEntry $status orange
        } elseif { [ info exists ::DEBUG_MINIMUM_FREE_MEM_K ] && \
                   [ string equal 1 $::DEBUG_MINIMUM_FREE_MEM_K ] } {
            addLogEntry $status blue
        }

        ;## notify the manager only when the lock value changes
        if { ! [ info exists ::DATACOND_MEMORY_LOCK ] || \
             ! [ string equal $wait $::DATACOND_MEMORY_LOCK ] } {
            set ::DATACOND_MEMORY_LOCK $wait
            set sid [ sock::open manager emergency ]
            puts $sid "orange NULL NULL set ::DATACOND_MEMORY_LOCK $wait"
            ::close $sid
        }

        if { $::WAITING_FOR_FREE_MEM != 0 } {
            set now [ clock seconds ]
            if { ($now - $::WAITING_FOR_FREE_MEM) > 900 } {
                set subject "LDAS $::LDAS_SYSTEM datacondAPI "
                append subject "INSUFFICIENT FREE MEMORY!"
                set msg "${subject}\n\n"
                append msg "Available memory on the datacond API host\n"
                append msg "machine $::DATACOND_API_HOST has fallen to\n"
                append msg "$free kilobytes.\n\n"
                append msg "The variable ::MINIMUM_FREE_MEM_K in the\n"
                append msg "LDASdatacond.rsc file is currently set to\n"
                ;## report the threshold actually in force: the global
                ;## may be undefined when the default is being used,
                ;## and reading it here would abort the alert.
                append msg "${minfree}.\n\n"
                append msg "The LDAS manager API will suspend all user\n"
                append msg "commands which depend upon the datacond API\n"
                append msg "until sufficient free memory is available!"
                addLogEntry "Subject: ${subject}; Body: $msg " email
                ;## restart the clock so the alert is not repeated
                ;## on every pass
                set ::WAITING_FOR_FREE_MEM $now
            }
        }
    } err ] } {
        ;## the manager socket may still be open if the failure
        ;## happened while talking to it
        catch { ::close $sid }
        set wait 0
        set ::WAITING_FOR_FREE_MEM 0
        addLogEntry $err red
    }
    return $wait
}
;## dc::dataFromFile
;## Read every file named in the inputprotocol state entry of the
;## job with ilwd::readFile and add the results to the object
;## pointers already collected.
;##
;## jobid - job identifier; only its first run of digits is used.
;## ptrs  - list of object pointers gathered so far, may be empty.
;##
;## Returns the combined pointer list, or an empty list when its
;## first element is empty.  The objptrs state entry is written
;## only in the non-empty case, because writing it is what
;## triggers dc::startCallback.
proc dc::dataFromFile { jobid ptrs } {
    if { [ catch {
        regexp {\d+} $jobid job
        ;## get specified files assuming ilwd
        set ip [ set ::dc::state($job,inputprotocol) ]
        foreach filename $ip {
            ;## append to ptrs itself, not to a variable named
            ;## by the contents of ptrs
            lappend ptrs [ ilwd::readFile $filename $jobid ]
        }
        ;## we trigger dc::startCallback by populating
        ;## ::dc::state($job,objptrs) ONLY if there are
        ;## any!!
        if { [ string length [ lindex $ptrs 0 ] ] } {
            set ::dc::state($job,objptrs) $ptrs
        } else {
            set ptrs [ list ]
        }
    } err ] } {
        return -code error "[ myName ]: $err"
    }
    return $ptrs
}
;## dc::startCallback
;## Run the processing stages of a job once its input object
;## pointers are available.  Called either directly or from a
;## write trace on the objptrs state entry; the trace is removed
;## first so that the callback fires only once.
;##
;## jobid - job identifier; only its first run of digits is used.
;## args  - the extra arguments supplied by the trace mechanism,
;##         ignored.
;##
;## When called from a trace an error is recorded in the err
;## state entry and the end entry is set to 1; when called
;## directly the error is raised to the caller.
proc dc::startCallback { jobid args } {
    ;## must stay a clean boolean: it is tested in the handler
    set traced 0
    if { [ catch {
        regexp {\d+} $jobid job
        ;## were we called from a trace, or directly?
        if { [ regexp {8.4} $::tcl_version ] } {
            foreach entry [ trace info variable ::dc::state($job,objptrs) ] {
                foreach { oplist cmd } $entry { break }
                trace remove variable ::dc::state($job,objptrs) $oplist $cmd
                set traced 1
            }
        } else {
            ;## keep the trace listing in its own variable: when it
            ;## is empty, storing it in traced would make the
            ;## boolean test in the handler fail and hide the
            ;## real error.
            set legacy [ trace vinfo ::dc::state($job,objptrs) ]
            if { [ string length $legacy ] } {
                trace vdelete \
                    ::dc::state($job,objptrs) w "dc::startCallback $jobid"
                set traced 1
            }
        }
        ;## handle -responsfiles option
        dc::ingestResponseFiles $jobid
        ;## and ingest other (frame, metadata) input data
        dc::ingestBucketData $jobid
        ;## do alias substitution, defaulting to ch[N] mapping
        dc::aliasSubst $jobid
        ;## parse the algorithms option
        dc::parseAlgorithm $jobid
        ;## and create a series of call chain links
        dc::forgeChain $jobid
        ;## and load the call chain
        dc::loadCallChain $jobid
        if { [ info exists ::DEBUG_SYMBOL_TABLE ] && \
             ! [ string equal 0 $::DEBUG_SYMBOL_TABLE ] } {
            set msg [ CallChain_DumpSymbolTable $::dc::state($job,cc) ]
            addLogEntry "Symbol Table: ($msg)" purple
        }
        if { $::DEBUG_NO_THREADS } {
            dc::execute_nothreads $jobid
        } else {
            dc::execute $jobid
        }
    } err ] } {
        if { $traced } {
            ;## nobody to raise to: hand the error to dc::endCallback
            set ::dc::state($job,err) $err
            set ::dc::state($job,end) 1
        } else {
            return -code error "[ myName ]: $err"
        }
    }
}
;## dc::expandAlgorithms
;## When the -autoexpand option of the job is set and non-zero,
;## replicate every word of the algorithms state entry once per
;## ingested element, replacing the prototype alias (or its
;## channel placeholder) by the element name, and create a
;## numbered alias for each element.
;##
;## jobid    - name of the global option array of the job; its
;##            first run of digits indexes ::dc::state.
;## elements - names of the ingested elements.
;## alii     - the aliases returned by dc::ingestBucketData.
;##
;## Returns alii untouched when auto expansion is off, otherwise
;## the list of newly created aliases.  The aliases and
;## algorithms state entries are rewritten in the latter case.
proc dc::expandAlgorithms { jobid elements alii } {
    if { ! [ info exists ::${jobid}(-autoexpand) ] || \
         ! [ string length [ set ::${jobid}(-autoexpand) ] ] || \
         [ set ::${jobid}(-autoexpand) ] == 0 } {
        return $alii
    }
    regexp {\d+} $jobid job
    set algorithms [ list ]
    set aliases [ list ]
    if { [ catch {
        set protoalias $::dc::state($job,aliases)
        set protoalgos $::dc::state($job,algorithms)
        set chN _chN
        regsub -all {\s+} $protoalias {} protoalias
        set protoalias [ string trim $protoalias " ;" ]
        ;## split "alias=placeholder"; both stay as they are
        ;## when the text has no such form
        regexp {([^=]+)=([^\;]+)} $protoalias -> protoalias chN
        set chN "($protoalias|$chN)"
        set elements [ lsort -dictionary $elements ]
        set N 0
        foreach channel $elements {
            ;## the element name is inserted literally: protect the
            ;## two characters regsub treats specially in a
            ;## replacement.  the prototype is deliberately NOT run
            ;## through subst, which would execute any bracketed
            ;## command in the user supplied text.
            regsub -all {[\\&]} $channel {\\&} replacement
            foreach protoalgo $protoalgos {
                regsub -all -- $chN $protoalgo $replacement expanded
                lappend algorithms $expanded
            }
            lappend aliases $protoalias$N=$channel
            incr N
        }
        set alii [ concat $alii $aliases ]
        set ::dc::state($job,aliases) $alii
        set ::dc::state($job,algorithms) $algorithms
    } err ] } {
        return -code error "[ myName ]: $err"
    }
    return $aliases
}
## dc::endCallback
## Finish a job: write its output, build the reply for the
## operator and store it in the variable named by the cid state
## entry, reattach the job and, unless the target mentions the
## wrapper, clean up.
##
## jobid - job identifier; its first run of digits indexes
##         ::dc::state, the full value names the option array.
## args  - the extra arguments supplied by the trace mechanism,
##         ignored.
##
## Any failure, including an error recorded earlier in the err
## state entry, is delegated to dc::endCallbackErr.
proc dc::endCallback { jobid args } {
if { [ catch {
set files {}
set tails {}
set outputflag 0
regexp {\d+} $jobid job
;## make sure we don't trigger again.
;## this cancels all traces.
unset ::dc::state($job,end)
set ::dc::state($job,end) 1
## snapshot of the state entries used below
set rp $::dc::state($job,returnprotocol)
set cid $::dc::state($job,cid)
set of $::dc::state($job,outputformat)
set targ $::dc::state($job,datacondtarget)
set result $::dc::state($job,resultptr)
set drop $::dc::state($job,dropdata)
;## default subject for gridftp interaction needs
;## to be grid_output per PR #2148
set subj [ set ::${jobid}(-subject) ]
if { ! [ string length $subj ] } {
if { [ regexp -nocase {gridftp} $rp ] } {
set subj grid_output
} else {
set subj results
}
}
if { [ info exists ::dc::state($job,err) ] } {
;## prevent bad list formation from erroring
regsub -all -- {([\"\}])(\S)} \
$::dc::state($job,err) {\1 \2} ::dc::state($job,err)
## a recorded error longer than three characters aborts the
## normal path; the enclosing catch passes it to the handler
if { [ string length $::dc::state($job,err) ] > 3 } {
return -code error $::dc::state($job,err)
}
}
## from here on the handler knows that output was attempted
set outputflag 1
set files [ dc::output $jobid ]
foreach file $files { lappend tails [ file tail $file ] }
if { $drop } { set targ datacond }
;## if -datacondtarget is 'datacond' and NOTHING is
;## intended to go to the mpi API, we can short circuit
;## by sending a return code of 4 to the manager.
set returncode [ dc::endCallbackDataApi $jobid ]
if { $returncode == 4 } {
set ::dc::state($job,dropdata) 1
set drop 1
}
## build the reply; which form is used depends on the target,
## the output format and the drop flag
if { [ string equal frame $targ ] } {
;## I *think* this is the place where dropped files
;## were getting lost when frames were also produced
if { [ string length [ lindex $tails 0 ] ] } {
set jobdir [ jobDirectory ]
## the job directory is reported relative to ::HTTPDIR
regsub $::HTTPDIR $jobdir {} jobdir
set msg "Your results:\n${tails}\ncan be found at:\n"
append msg "$::HTTPURL$jobdir"
set ::$cid [ list 0 $msg 0 ]
} else {
set ::$cid [ list 0 0 0 ]
}
} elseif { [ regexp -nocase -- {LIGO_LW} $of ] || \
! [ string equal datacond $targ ] } {
## same reply as the frame case
if { [ string length [ lindex $tails 0 ] ] } {
set jobdir [ jobDirectory ]
regsub $::HTTPDIR $jobdir {} jobdir
set msg "Your results:\n${tails}\ncan be found at:\n"
append msg "$::HTTPURL$jobdir"
set ::$cid [ list 0 $msg 0 ]
} else {
set ::$cid [ list 0 0 0 ]
}
} elseif { $drop } {
## the location is the first output file name without its
## last path component
set url [ lindex $files 0 ]
set msg "Your results:\n${tails}\ncan be found at:\n"
append msg "[ join [ lrange [ split $url / ] 0 end-1 ] / ]"
;## for gridftp result reporting
set url2 [ lindex $files 1 ]
if { [ string length $url2 ] && \
[ regexp {gridftp} $url2 ] } {
set msg "Your results:\n${url2}\n\n$msg"
}
set ::$cid [ list $returncode $msg $subj ]
} else {
set jobdir [ jobDirectory ]
regsub $::HTTPDIR $jobdir {} jobdir
set msg "Your results:\n${tails}\ncan be found at:\n"
append msg "$::HTTPURL$jobdir"
set lvl $returncode
if { ! [ regexp -nocase {datacond} $targ ] } {
set lvl 2
set subj 0
}
set ::$cid [ list $lvl $msg $subj ]
}
reattach $jobid $cid
;## if the -datacondtarget was datacond or frame
;## then we do not need to wait around for the
;## wrapper to pickup, so just clean up.
if { ! [ regexp -nocase {wrapper} $targ ] } {
dc::cleanup $jobid
}
;## error, possibly while executing call chain!
} err ] } {
## NOTE(review): rp and cid are assigned inside the catch body;
## a failure before those assignments would leave them unset
## and make this call fail as well -- confirm whether that can
## happen in practice.
dc::endCallbackErr $jobid $rp $files $cid $outputflag $err
}
}
;## dc::endCallbackDataApi
;## Work out the return code reported for a finished job.
;##
;## jobid - job identifier; only its first run of digits is used.
;##
;## Returns 4 when both the dataapi and the datacondtarget state
;## entries equal "datacond" and no dataready entry exists for
;## the job, otherwise 0.  Missing entries count as UNKNOWN.
;## Raises an error prefixed with the proc name on failure.
proc dc::endCallbackDataApi { jobid } {
    set code 0
    if { [ catch {
        regexp {\d+} $jobid job
        set jobid $::RUNCODE$job

        ;## fetch both settings the same way, defaulting to UNKNOWN
        foreach { local key } { source dataapi target datacondtarget } {
            if { [ info exists ::dc::state($job,$key) ] } {
                set $local $::dc::state($job,$key)
            } else {
                set $local UNKNOWN
            }
        }

        ;## the dataready entry is the marker for wrapper output
        set wrapperoutput [ info exists ::dc::state($job,dataready) ]

        if { ! $wrapperoutput && \
             [ string equal datacond $source ] && \
             [ string equal datacond $target ] } {
            set code 4
        }
    } failure ] } {
        return -code error "[ myName ]: $failure"
    }
    return $code
}
## dc::endCallbackErr
## Error path of dc::endCallback: report the failure to the
## operator, mentioning a partial result when one could be
## written, then schedule datacond::killJob for the job.
##
## jobid      - job identifier; its first run of digits indexes
##              ::dc::state.
## rp         - return protocol of the job.
## files      - output files written so far, may be empty.
## cid        - name of the global variable receiving the reply.
## outputflag - true when dc::output had already been attempted.
## err        - the error message to report.
##
## Never raises: anything going wrong in here is logged instead.
proc dc::endCallbackErr { jobid rp files cid outputflag err } {
if { [ catch {
regexp {\d+} $jobid job
## expand the terse "no primary specified" message
set primary_msg "(no primary specified, "
append primary_msg "or specified primary was not "
append primary_msg "of time, frequency, or time-frequency "
append primary_msg "series type)"
regsub -nocase {no primary specified} $err $primary_msg err
set err2 {}
;## check for a partial result
set tmp $::dc::state($job,resultptr)
if { [ regexp {_p_LdasContainer} $tmp ] && ! $outputflag } {
## NOTE(review): of and rp are assigned here but not read again
## in this proc; presumably dc::output obtains them some other
## way -- confirm before touching these two lines.
set of [ list ilwd ascii ]
;## generate a sensible file name for the partial
;## result.
set rproot [ file rootname [ file tail $rp ] ]
if { [ string length $rproot ] } {
set rproot ${rproot}_partial_result.ilwd
} else {
set rproot partial_result.ilwd
}
set rp http://$rproot
## a failure of dc::output is kept in err2 and reported below
catch { set files [ dc::output $jobid ] } err2
}
;## but if we succeeded in creating a partial
;## output file, let the user know.
if { [ string length $files ] } {
set msg "[ myName ]: $err"
append msg " (partial output returned as: '$files')"
addLogEntry $msg red
set ::$cid [ list 3 $msg error! ]
} else {
;## unless dc::output failed.
addLogEntry "$err $err2" red
set ::$cid [ list 3 "[ myName ]: $err $err2" error! ]
}
reattach $jobid $cid
## the job is killed from the event loop, not from this call
after 0 datacond::killJob $jobid
} err ] } {
addLogEntry "OOPS! Can't get here!! : $err" email
}
}
## dc::ingestData
## Feed a list of ilwd objects into the call chain of a job,
## creating the call chain first when the job has none yet.
##
## jobid - job identifier; its first run of digits indexes
##         ::dc::state.
## data  - list of ilwd object pointers.
##
## Each object is handled as one of: a quality channel, which is
## deferred until all other data is ingested; database derived
## data, passed to dc::ingestDBData; or an ordinary object,
## passed to CallChain_IngestILwd.
##
## Returns a two element list: the names of the ingested
## elements and the alias=element pairs obtained from database
## derived data.  Raises an error prefixed with the proc name
## and the step that was in progress.
proc dc::ingestData { jobid data } {
set elements [ list ]
set alii [ list ]
set qchans [ list ]
## seqpt names the external call in progress for error messages
set seqpt {}
if { [ catch {
regexp {\d+} $jobid job
set cc $::dc::state($job,cc)
if { $::DEBUG_DUMP_OBJECTS } {
foreach datap $data {
dc::0xdb $jobid $datap http://${datap}_input_dump.ilwd
}
}
;## instantiate new Call Chain
if { ! [ string length $cc ] } {
set seqpt "new_CallChain():"
if { $::DEBUG_TRACE_CPP } {
debugPuts "--> start $seqpt"
}
set cc [ new_CallChain ]
set ::dc::state($job,cc) $cc
if { $::DEBUG_TRACE_CPP } {
debugPuts "<-- end $seqpt"
}
set setsingle [ set ::dc::state($job,setsingledc) ]
set seqpt "CallChain_SetSingleFrameMode($cc $setsingle):"
CallChain_SetSingleFrameMode $cc $setsingle
;## and if we have a "target", set it so that output
;## is formatted properly.
set targ [ set ::dc::state($job,datacondtarget) ]
if { [ string length $targ ] } {
set seqpt "CallChain_SetReturnTarget($cc $targ):"
if { $::DEBUG_TRACE_CPP } {
debugPuts "--> start $seqpt"
}
;## and set Return target as early as possible!
CallChain_SetReturnTarget $cc $targ
if { $::DEBUG_TRACE_CPP } {
debugPuts "<-- end $seqpt"
}
}
}
set seqpt {}
## optional timing of the ingestion
if { [ info exists ::DEBUG_INGESTION_RATE ] && \
[ string equal 1 $::DEBUG_INGESTION_RATE ] } {
__t::start ingest$jobid
}
foreach datap $data {
set alias [ list ]
set elems [ list ]
set pass_thrus 0
if { [ catch {
;## if we catch here the object is already destructed
;## so just get along...
set seqpt "getContainerSize($datap):"
set size [ getContainerSize $datap ]
set seqpt {}
} err ] } {
addLogEntry "already destructed object: '$datap'!" red
continue
}
;## defer quality channel creation until all other data
;## is ingested.
set flag 0
set qcs [ dc::testForQualityChannel $jobid $datap ]
foreach { name pushpass ilwdp } $qcs {
if { ! [ string length $ilwdp ] } { continue }
if { [ string length $name ] } {
## a quality channel name may be queued only once
if { [ lsearch -regexp $qchans ^\\s*$name ] > -1 } {
set msg "quality channel name: '$name' duplicated!"
return -code error $msg
}
lappend qchans [ list $name $ilwdp ]
set flag 1
continue
}
}
## quality channels are not ingested in this pass
if { $flag } { continue }
foreach { type pushpass alias } \
[ dc::testForDBQuery $jobid $datap ] { break }
;## if the user got the alias and pushpass options
;## in the wrong order, the metadata api will pass
;## them in the ilwd comment in the wrong order!
if { [ regexp {^(push|pass)$} $alias ] } {
set tmp $alias
set alias $pushpass
set pushpass $tmp
}
## a non-empty type marks database derived data
if { [ string length $type ] } {
foreach { alias elems } [ dc::ingestDBData \
$jobid $type $pushpass $alias $cc $datap ] { break }
;## associate the db query derived alias with the name
if { [ string length $alias ] \
&& [ string length $elems ] } {
lappend alii ${alias}=$elems
}
continue
}
;## this condition is unreachable!! :TODO:
;## what was it for??
if { [ string equal _pass $alias ] } {
set seqpt "CallChain_PassThroughILwd($cc $datap):"
if { $::DEBUG_TRACE_CPP } {
debugPuts "--> start $seqpt"
}
CallChain_PassThroughILwd $cc $datap
incr pass_thrus
if { $::DEBUG_TRACE_CPP } {
debugPuts "<-- end $seqpt"
}
set alias [ list ]
} else {
set seqpt "CallChain_IngestILwd($cc $datap):"
if { $::DEBUG_TRACE_CPP } {
debugPuts "--> start $seqpt"
}
set elems [ CallChain_IngestILwd $cc $datap ]
if { $::DEBUG_TRACE_CPP } {
debugPuts "<-- end $seqpt"
}
}
## a result of three characters or fewer counts as no elements
if { [ string length $elems ] > 3 } {
debugPuts \
"[ llength $elems ] elements ingested from $datap: $elems"
} elseif { $pass_thrus } {
debugPuts \
"$pass_thrus elements passing through from $datap"
} else {
set elems [ list ]
addLogEntry "$cc: no data elements found in '$datap'" 2
}
## backslashes and commas in element names are replaced by
## placeholders before the names are collected
regsub -all {\\} $elems {__bS} elems
regsub -all {\,} $elems {__cOmMa} elems
## NOTE(review): eval re-parses the element names as script;
## names containing brackets or dollar signs would be
## substituted here -- confirm that such names cannot occur.
eval lappend elements $elems
::dc::destructElement $jobid $datap
}
## now create the quality channels deferred above
::dc::qualityChannel $jobid $cc $qchans
} err ] } {
return -code error "[ myName ]:$seqpt $err"
}
if { [ info exists ::DEBUG_INGESTION_RATE ] && \
[ string equal 1 $::DEBUG_INGESTION_RATE ] } {
__t::end "call chain ingested data in" ingest$jobid
catch { __t::cancel ingest$jobid }
}
return [ list $elements $alii ]
}
;## dc::testForDBQuery
;## Read the dboperation, pushpass and alias metadata of an ilwd
;## object.
;##
;## jobid - job identifier; only its first run of digits is used,
;##         prefixed with ::RUNCODE.
;## ilwdp - pointer to the ilwd object.
;##
;## Returns a three element list: dboperation, pushpass, alias.
;## A metadata item that cannot be read is returned empty,
;## except pushpass which then defaults to "pass"; a dboperation
;## of "qualitychannel" is also returned empty.
;## Raises an error prefixed with the proc name and the step
;## that was in progress.
proc dc::testForDBQuery { jobid ilwdp } {
    ;## set before anything can fail: the handler reads it
    set seqpt {}
    if { [ catch {
        ;## the alias will be used when pushpass is "push"
        set dbop [ list ]
        set pushpass [ list ]
        set alias [ list ]
        regexp {\d+} $jobid job
        set jobid $::RUNCODE$job
        if { [ catch {
            set seqpt "getElementMetadata($ilwdp dboperation):"
            set dbop [ getElementMetadata $ilwdp dboperation ]
            if { [ string equal qualitychannel $dbop ] } {
                set dbop [ list ]
            }
        } err ] } {
            ;## oops, it's not a dbquery
        }
        if { [ catch {
            set seqpt "getElementMetadata($ilwdp pushpass):"
            set pushpass [ getElementMetadata $ilwdp pushpass ]
        } err ] } {
            ;## oops, it must be a pass
            set pushpass pass
        }
        if { [ catch {
            set seqpt "getElementMetadata($ilwdp alias):"
            set alias [ getElementMetadata $ilwdp alias ]
        } err ] } {
            ;## oops, it must be a pass
        }
        ;## if the user got the alias and pushpass options
        ;## in the wrong order, the metadata api will pass
        ;## them in the ilwd comment in the wrong order!
        if { [ regexp {^(push|pass)$} $alias ] } {
            set tmp $alias
            set alias $pushpass
            set pushpass $tmp
        }
    } err ] } {
        return -code error "[ myName ]:$seqpt $err"
    }
    return [ list $dbop $pushpass $alias ]
}
;## dc::ingestDBData
;## Ingest one database derived ilwd object into a call chain.
;##
;## jobid    - job identifier.
;## type     - database operation; the text following "db" picks
;##            the ingestion method.
;## pushpass - "push" or "pass"; empty means "pass".
;## alias    - alias for the ingested data, required for "push".
;## cc       - call chain handle.
;## ilwdp    - pointer to the ilwd object.
;##
;## Returns a two element list: the alias and whatever the
;## ingestion method returned.  Raises an error prefixed with the
;## proc name and the step that was in progress.
proc dc::ingestDBData { jobid type pushpass alias cc ilwdp } {
    if { [ catch {
        regexp {\d+} $jobid job
        set seqpt {}

        ;## pushpass defaults to pass
        if { [ string equal {} $pushpass ] } {
            set pushpass pass
        }
        ;## pushing without an alias is nonsense.  passing with
        ;## an alias is allowed (Ed says, "not at all" nonsense),
        ;## so the alias is kept in that case.
        if { [ string equal push $pushpass ] && \
             [ string equal {} $alias ] } {
            return -code error "'push' specified, but no alias given!"
        }

        ;## pick the first method whose name contains the type
        regexp -nocase -- {db(\S+)} $type -> type
        set method none
        foreach candidate \
            [ list IngestDBSpectrum IngestDBQuery IngestNTuples ] {
            if { [ regexp -nocase -- $type $candidate ] } {
                set method $candidate
                break
            }
        }
        if { [ string equal none $method ] } {
            return -code error "Cannot ingest database derived '$type'"
        }

        set datap [ dc::registerObject $jobid $ilwdp ]
        set seqpt "CallChain_${method}($cc $datap $alias $pushpass):"
        if { $::DEBUG_TRACE_CPP } {
            debugPuts "--> start $seqpt"
        }
        set elems [ CallChain_$method $cc $datap $alias $pushpass ]
        if { $::DEBUG_TRACE_CPP } {
            debugPuts "<-- end $seqpt"
        }
        ::dc::destructElement $jobid $datap
        addLogEntry "${pushpass}ed objects ingested from $datap"
    } err ] } {
        return -code error "[ myName ]:$seqpt $err"
    }
    return [ list $alias $elems ]
}
## dc::testForQualityChannel
## Decide whether an ilwd object is a quality channel and, if
## so, register it.
##
## jobid - job identifier; only its first run of digits is used,
##         prefixed with ::RUNCODE.
## ilwdp - pointer to the ilwd object.
##
## Returns a three element list: alias, pushpass and the value
## returned by dc::registerObject.  For an object that is not a
## quality channel all three elements are empty.
##
## Control flow note: the bare return commands in the first
## inner catch are intercepted by the enclosing catch with an
## empty message.  The handler ignores an empty message, so
## execution falls through to the final return with the three
## values still empty.  Only a non-empty message is re-raised.
proc dc::testForQualityChannel { jobid ilwdp } {
if { [ catch {
set seqpt {}
set dbop [ list ]
set pushpass [ list ]
set alias [ list ]
set datap [ list ]
regexp {\d+} $jobid job
set jobid $::RUNCODE$job
## a failure here is a real error and is raised
set seqpt "getContainerSize($ilwdp):"
set size [ getContainerSize $ilwdp ]
if { [ catch {
set seqpt "getElementMetadata($ilwdp dboperation):"
set dbop [ getElementMetadata $ilwdp dboperation ]
if { ! [ string equal qualitychannel $dbop ] } {
## not a quality channel: leave through the empty-message path
return {}
}
} err ] } {
;## oops, it's not a dbquery
return {}
}
if { [ catch {
set seqpt "getElementMetadata($ilwdp pushpass):"
set pushpass [ getElementMetadata $ilwdp pushpass ]
} err ] } {
;## oops, it must be a pass
set pushpass pass
}
if { [ catch {
set seqpt "getElementMetadata($ilwdp alias):"
set alias [ getElementMetadata $ilwdp alias ]
} err ] } {
;## oops, it must be a pass
}
set seqpt {}
;## if the user got the alias and pushpass options
;## in the wrong order, the metadata api will pass
;## them in the ilwd comment in the wrong order!
if { [ regexp {^(push|pass)$} $alias ] } {
set tmp $alias
set alias $pushpass
set pushpass $tmp
}
;## pushing without an alias is nonsense
if { ! [ string length $alias ] } {
if { [ string equal push $pushpass ] } {
return -code error "'push' specified, but no alias given!"
} elseif { [ string equal pass $pushpass ] } {
## a passed quality channel without alias gets a default name
set alias quality_channel
} else {
return -code error "invalid push/pass: '$pushpass'"
}
}
set datap [ dc::registerObject $jobid $ilwdp ]
} err ] } {
## an empty message means one of the bare returns above
if { [ string length $err ] } {
return -code error "[ myName ]:$seqpt $err"
}
}
return [ list $alias $pushpass $datap ]
}
## Comments:
## given:
##   set algo {x=foo(1,2); y=foo.apply(out, in); z=x+y; x=y \
##             output(z,1,2,3,4)}
## running:
##   ::dc::parseAlgorithm $algo
## returns:
##   {foo (1,2)x} {foo.apply (out,in)y} {add (x,y)z} {y x} \
##   {output (z,1,2,3,4)}
;## dc::parseAlgorithm
;## Break the algorithms state entry of a job into a list of
;## "method arguments" pairs and store it in the atoms state
;## entry.
;##
;## jobid - job identifier; its first run of digits indexes
;##         ::dc::state.
;##
;## Statements are separated by semicolons.  Every statement
;## containing an equals sign, except output(...) statements, is
;## first rewritten by dc::mathops.
;##
;## Returns nothing useful.  Raises an error naming the statement
;## being handled on failure.
proc dc::parseAlgorithm { jobid } {
    set atoms [ list ]
    set atom [ list ]
    set retval [ list ]
    set tmp [ list ]
    regexp {\d+} $jobid job
    ;## sanitize input
    if { [ catch {
        set algorithms [ set ::dc::state($job,algorithms) ]
        set atoms [ dc::optPreProcess $jobid $algorithms ]
        set atoms [ split $atoms ";" ]
        ;## get methods and arguments
        foreach atom $atoms {
            if { ! [ string length $atom ] } { continue }
            if { ! [ regexp {^\s*output\(} $atom ] && \
                 [ regexp {[\=]+} $atom ] } {
                set atom [ split [ dc::mathops $atom ] ";" ]
            }
            ;## collect the words of the statement as plain list
            ;## elements.  passing the user supplied text through
            ;## eval would re-parse it as script and execute any
            ;## bracketed command it contains.
            foreach piece $atom {
                lappend tmp $piece
            }
        }
        set atoms $tmp
        set tmp [ list ]
        foreach atom $atoms {
            if { ! [ string length $atom ] } { continue }
            ;## leading and trailing whitespace should go
            set atom [ string trim $atom ]
            ;## break methods and arguments with a space
            regsub -all {(\S)(\(\S+)} $atom {\1 \2} atom
            set method [ string trim [ lindex $atom 0 ] ]
            set arguments [ lrange $atom 1 end ]
            ;## remove spaces from parenthesized arguments
            regsub -all {\s+} $arguments {} arguments
            ;## the return value will be a list of lists
            lappend retval [ list $method $arguments ]
        }
        set ::dc::state($job,atoms) $retval
    } err ] } {
        set err "[ myName ]: $err while handling atom: '$atom'"
        return -code error $err
    }
}
;## dc::mathops
;## Rewrite a simple assignment "lhs=rhs" into the two element
;## list "rhs lhs" used by dc::parseAlgorithm.  All whitespace is
;## removed from the statement first.
;##
;## atom - one statement of the -algorithms option.
;##
;## Returns the rewritten statement.  Raises an error prefixed
;## with the proc name when the statement is not a single
;## assignment.
;##
;## The earlier version carried a switch on the operator with
;## arms for infix math operators.  Its pattern could only ever
;## capture an equals sign, so those arms were unreachable, and
;## a statement with more than one equals sign ended in an
;## unset-variable error.  Both cases now yield an explicit
;## message.
proc dc::mathops { atom } {
    if { [ catch {
        regsub -all {\s+} $atom {} atom
        ;## greedy match: lhs extends to the last equals sign
        ;## that still has text after it
        if { ! [ regexp {([^\s]+)=([^\s]+)} $atom -> lhs rhs ] } {
            set msg "malformed assignment '$atom':"
            append msg " expected the form lhs=rhs"
            return -code error $msg
        }
        if { [ string first = $lhs ] != -1 } {
            set msg "unsupported expression '$atom':"
            append msg " only a single assignment of the form"
            append msg " lhs=rhs is handled"
            return -code error $msg
        }
        set atom [ list $rhs $lhs ]
    } err ] } {
        return -code error "[ myName ]:$err"
    }
    return $atom
}
;## dc::aliasSubst
;## Substitute aliases in the algorithms state entry of a job.
;##
;## jobid - job identifier; its first run of digits indexes
;##         ::dc::state.
;##
;## The user aliases are taken from the aliases state entry; an
;## implicit alias _ch<N> is added for the N-th entry of the
;## elements state entry.  User aliases may not begin with an
;## underscore and may not match the left hand side of an
;## assignment.  The substituted statements are written back to
;## the algorithms state entry, joined with semicolons.
;##
;## Raises an error prefixed with the proc name and a dump of the
;## local state.
proc dc::aliasSubst { jobid } {
    set parms [ list ]
    set aliases [ list ]
    set algorithms [ list ]
    set symb [ list ]
    set subst [ list ]
    set act [ list ]
    ;## template of the state dump; the variables are expanded
    ;## by the error handler, whitespace runs are collapsed here
    set seqpt "-->(state_dump: aliases: '\$aliases'
                   algorithms: '\$algorithms'
                   symb: '\$symb'
                   subst: '\$subst'
                   action: '\$act'
                   params: '\$parms')"
    regsub -all {[\n\t\s]+} $seqpt { } seqpt
    if { [ catch {
        regexp {\d+} $jobid job
        set aliases [ set ::dc::state($job,aliases) ]
        set elements [ set ::dc::state($job,elements) ]
        set algorithms [ set ::dc::state($job,algorithms) ]
        ;## attempt to split the user supplied aliases
        set aliases [ split [ dc::optPreProcess $jobid $aliases ] ";" ]
        ;## these lines are solely for debugging: put the original
        ;## characters back in place of the placeholders
        regsub -all {__bS} $aliases {\\} dbg
        regsub -all {__cOmMa} $dbg {,} dbg
        regsub -all {__sPaCe} $dbg { } dbg
        regsub -all {__sC} $dbg {;} dbg
        debugPuts $dbg
        foreach alias $aliases {
            ;## make sure user is not creating forbidden alias names
            ;## with a leading underscore.
            if { ! [ string length $alias ] } { continue }
            regexp {(.+)=(.+)} $alias -> symb subst
            if { [ regexp {^_} $symb ] } {
                return -code error "user supplied aliases may not begin with '_'"
            }
        }
        ;## implicit aliases for the ingested elements
        set i -1
        while { [ incr i ] < [ llength $elements ] } {
            lappend aliases _ch$i=[ lindex $elements $i ]
        }
        ;## preprocess the algorithms list
        set algorithms \
            [ split [ dc::optPreProcess $jobid $algorithms ] ";" ]
        ;## and perform substitutions
        foreach alias $aliases {
            if { ! [ string length $alias ] } { continue }
            set symb [ list ]
            set subst [ list ]
            set tmp [ list ]
            regexp {(.+)=(.+)} $alias -> symb subst
            set subst [ dc::aliasMatchPattern $jobid $subst $elements ]
            ;## make certain that aliases do not map to the lhs
            ;## of an assignment.
            foreach algorithm $algorithms {
                if { ! [ string length $algorithm ] } { continue }
                set lhs [ list ]
                set rhs $algorithm
                if { ! [ regexp output $algorithm ] } {
                    regexp {(.+)=(.+)} $algorithm -> lhs rhs
                }
                ;## test for matches on lhs. we wouldn't substitute
                ;## them anyway, but we are being s-t-r-i-c-t
                if { [ regexp ^${symb}\$ $lhs ] } {
                    return -code error \
                        "[ myName ]: lhs aliases forbidden('$symb' matched '$lhs')"
                }
                ;## parse the right hand side into action and params
                set act {}
                set parms [ list ]
                regexp {([^\(]+)\((.+)\)$} $rhs -> act parms
                ;## SUBSTITUTE
                if { [ regexp output $act ] } {
                    ;## output actions do not get all their args
                    ;## substituted
                    lappend tmp \
                        [ dc::outputParmSub $jobid $subst $symb $rhs ]
                } else {
                    ;## all other actions get their parms fully
                    ;## substituted
                    lappend tmp \
                        [ dc::actionParmSub $jobid $parms $act $subst $symb $lhs $rhs ]
                }
            } ;## end of foreach on algorithms
            ;## set algorithms to substituted list
            set algorithms $tmp
        } ;## end of foreach on aliases/chnames
        set ::dc::state($job,algorithms) [ join $algorithms ";" ]
    } err ] } {
        return -code error "[ myName ]:[ subst -nocommands $seqpt ] $err"
    }
}
;## dc::aliasMatchPattern
;## Resolve an alias target against the names of the ingested
;## elements.  The target is used as a case insensitive regular
;## expression in which parentheses are taken literally.
;##
;## jobid    - job identifier, not used here.
;## alias    - the pattern to resolve.
;## elements - names of the ingested elements.
;##
;## Returns the single matching element, or the pattern itself
;## when nothing matches.  A pattern of the form _ch<N> is never
;## matched and is returned as is.  Raises an error when more
;## than one element matches.
proc dc::aliasMatchPattern { jobid alias elements } {
    set hits [ list ]
    if { [ catch {
        ;## implicit channel aliases are not patterns
        set internal [ regexp {^_ch\d+$} $alias ]
        if { $internal } {
            set elements [ list ]
        }

        ;## parentheses in the pattern are meant literally
        regsub -all {[\)\(]} $alias {\\&} pattern
        foreach candidate $elements {
            if { [ regexp -nocase -- $pattern $candidate ] } {
                lappend hits $candidate
            }
        }

        set count [ llength $hits ]
        set report "alias: '$alias' matched $count "
        append report "ingested elements: ($hits)"
        set verbose [ expr { [ info exists ::DEBUG_ALIAS_PATTERN_MATCH ] && \
                             [ string equal 1 $::DEBUG_ALIAS_PATTERN_MATCH ] } ]

        if { $count > 1 } {
            addLogEntry $report red
            append report ". alias must match uniquely!"
            return -code error $report
        } elseif { $count == 1 } {
            if { $verbose } {
                addLogEntry $report green
            }
            set retval [ lindex $hits 0 ]
        } else {
            if { ! $internal && $verbose } {
                addLogEntry $report blue
            }
            set retval $alias
        }
    } err ] } {
        return -code error "[ myName ]: $err alias: ($alias)"
    }
    return $retval
}
;## dc::actionParmSub
;## Rebuild one action call with an alias substituted in its
;## parameter list.
;##
;## jobid  - job identifier, not used here.
;## parms  - comma separated parameter text of the action.
;## action - name of the action.
;## subst  - replacement for a parameter equal to symbol.
;## symbol - the alias name, used as an anchored pattern.
;## lhs    - left hand side of the assignment, may be empty.
;## rhs    - original right hand side, not used here.
;##
;## An empty parameter or a lone underscore becomes _null.
;## Returns "lhs=action(parms)", or "action(parms)" when lhs is
;## empty.  Raises an error prefixed with the proc name.
proc dc::actionParmSub { jobid parms action subst symbol lhs rhs } {
    if { [ catch {
        set cooked [ list ]
        foreach raw [ split $parms "," ] {
            set item [ string trim $raw ]
            ;## placeholders for omitted parameters
            if { [ string equal {} $item ] } {
                set item _null
            }
            regsub {^\s*_\s*$} $item _null item
            ;## only a parameter that is exactly the alias is replaced
            regsub ^${symbol}\$ $item $subst item
            lappend cooked $item
        }
        set joined [ join $cooked "," ]
        set call ${action}($joined)
        if { [ string length $lhs ] } {
            set retval $lhs=$call
        } else {
            set retval $call
        }
    } err ] } {
        return -code error "[ myName ]: $err"
    }
    return $retval
}
proc dc::outputParmSub { jobid subst symb rhs } {
if { [