
The eventmon.tcl Script

Modification Date: 11/26/2008

Table of Procedures

${API}::atExit
${API}::checkBuckets
${API}::ckaddDataErrs
${API}::dummyObj
${API}::expandWrapperData
${API}::handleMdd
${API}::init
${API}::killJob
${API}::processProduct
${API}::sendData
${API}::setProductType
${API}::validateOpts
${API}::writeState
dataSendReaper
dumpILwd
eventmon::addDataThreadReaper
eventmon::defunctJobs
eventmon::expiredJobKiller
eventmon::setDatabaseRate
eventmon::standAloneCkDone
eventmon::wakeupDataRecvThreads
eventmon::wakeupaddDataThreads
handleReturnCode
processJobProducts

This is the Laser Interferometer Gravitational-Wave Observatory (LIGO) Event Monitor server Tcl script.
It is the main script called by eventmonAPI to bring in the eventmon packages, all of which use the eventmon namespace.
This module sources the following sub-modules:

  1. eventmon.tcl
API Interactions:
The eventmonAPI server interacts with the following APIs:
  1. wrapperAPI
  2. ligolwAPI or frameAPI
  3. metadataAPI

Event scenarios for a job:
  Case 1: got macro cmd, got objects. Response: processed objects, macro returned.
  Case 2: no macro cmd, got objects. Response: job timed out and job removed.
  Case 3: got macro cmd, but no objects. Response: macro timed out and job removed.
  Case 4: error. Response: error returned, job removed.

Data structures used in the eventmon Tcl layer:
  1. Job-related data are kept in the array ::${jobid}.
  2. Thread-related data are kept in the arrays ::threadremoveJob and ::threadJob, which correlate a thread to a jobid.

Every second, coinciding with handleDataSock, processJobs is called to handle objects, job data and threads.
Product processing is directed by the Tcl command macro getJobOpts, but the command may or may not come in (for example, after an error somewhere).
NOTE: A lot of synchronization is done via trace, so the order of Tcl code is critical in some procs.
      package provide eventmon 1.0
      namespace eval eventmon {}
      package require eventmonAPI
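
The note on trace-based synchronization can be illustrated with a minimal sketch. It uses only core Tcl (8.4 or later) and nothing from LDAS: the namespace `demo`, the handler `demo::onProduct` and the job id `DEMO1` are hypothetical names chosen for the example. The point is the ordering: a write trace only sees writes made after it is registered.

```tcl
# Hypothetical stand-in for the per-job array ::${jobid}; not LDAS code.
namespace eval demo {
    variable log [ list ]
}

# Write-trace handler. Tcl appends three arguments to the registered
# command: the array name, the element name, and the operation.
proc demo::onProduct { jobid name1 name2 op } {
    variable log
    # Read the element through the known global name of the job array.
    lappend log [ list $jobid [ set ::${jobid}(product) ] ]
}

set jobid DEMO1

# A write made before the trace exists goes unnoticed.
set ::${jobid}(product) early

# Register the trace, then write again: only this write fires the handler.
trace add variable ::${jobid}(product) write [ list demo::onProduct $jobid ]
set ::${jobid}(product) [ list state_p mdd_p metadata_p ]

puts $demo::log
```

Only the second write is recorded, which is why a proc that sets a trace and then stores a product cannot have those two statements swapped.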
      
      
      §   §   §

      Name: dumpILwd

      Description:
      Output an ilwd file for debugging.
      Usage:
      
      Comments:
      
      proc dumpILwd { jobid ptr type { format ascii } { comp none } } {
          if  { [ regexp {_p_Ldas} $ptr ] } {
              set ilwdfile ${jobid}${type}.ilwd
              if  { [ catch {
                  set target [ ilwd::writeFile $jobid $ptr $ilwdfile $format $comp ]
              } err ] } {
                  addLogEntry "failed to write $ilwdfile: $err"
              }
          }
      }
      
      
      §   §   §

      Name: eventmon::setDatabaseRate

      Description:
      Set the maximum rate of database insertions per second, so that high-volume insertions are rejected.
      Usage:
      
      Comments:
      
      proc eventmon::setDatabaseRate { args } {
          if  { [ catch {
              setMaxDatabaseRows $::MAX_DB_INSERTIONS_PER_SEC
              addLogEntry "Max number of insertions/secs set to $::MAX_DB_INSERTIONS_PER_SEC (-1 = no limit)" blue
              set ::eventmon::max_db_Insertions_per_sec $::MAX_DB_INSERTIONS_PER_SEC
          } err ] } {
              set body "$err, please reset to valid numeric value if not set"
              set subject "$::LDAS_SYSTEM failed to adjust database insertion rate"
              addLogEntry "Subject: ${subject}; Body: $body" email
              if  { [ info exist ::eventmon::max_db_Insertions_per_sec ] } {
                  set ::MAX_DB_INSERTIONS_PER_SEC $::eventmon::max_db_Insertions_per_sec
              }
              return -code error $body
          }
      }
      
      
      §   §   §

      Name: ${API}::init

      Description:
      Initialization. Start up standalone utilities to:
      monitor API logs for email;
      monitor database disk space, database manager, insertion rates.
      Usage:
      
      Comments:
      
      must be short because manager needs to communicate right away
      proc ${API}::init {} {
          if  { ! [ info exist ::MAX_DB_INSERTIONS_PER_SEC ] } {
              set ::MAX_DB_INSERTIONS_PER_SEC -1
          }
          eventmon::setDatabaseRate
          if	{ [ regexp {8.4} $::tcl_version ] } {
          	set ::eventmon::tcl8.4 1
          	trace add variable ::MAX_DB_INSERTIONS_PER_SEC { write } eventmon::setDatabaseRate
          } else {
          	trace variable ::MAX_DB_INSERTIONS_PER_SEC w "eventmon::setDatabaseRate"
          }
          if  { ! [ info exists ::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API ] } {
              set ::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API 1000
          }
          if  { ! [ info exist ::EVENTMON_THREAD_TIMEOUT_SECS ] } {
              set  ::EVENTMON_THREAD_TIMEOUT_SECS  50
          }
          if  { ! [ info exist ::DEBUG_THREADS ] } {
              set  ::DEBUG_THREADS 0
          }
      	;## checkBuckets is automatically invoked by data socket handler
      	;## Thread Handlers
      	bgLoop eventmon_expiredJobKiller "eventmon::expiredJobKiller" 10
          bgLoop eventmon_defunctjobs eventmon::defunctJobs 300
          ;## bgLoop eventmon_wakeupDataRecvThreads eventmon::wakeupDataRecvThreads 60
          ;## bgLoop eventmon_wakeupaddDataThreads eventmon::wakeupaddDataThreads 120
      }
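
The way init pairs a global setting with a write trace, and setDatabaseRate falls back to the last good value on failure, can be sketched in isolation. This is a minimal sketch under stated assumptions: `demo2::checkRate`, `::DEMO_RATE` and the validation rule (an integer of -1 or more) are hypothetical and stand in for the real setMaxDatabaseRows call.

```tcl
# Hypothetical names throughout; this only mirrors the shape of
# eventmon::setDatabaseRate, it is not the LDAS implementation.
namespace eval demo2 {
    variable lastGood -1
}

# Trace handler: accept a valid value, otherwise roll back and raise.
proc demo2::checkRate { args } {
    variable lastGood
    if { [ string is integer -strict $::DEMO_RATE ] && $::DEMO_RATE >= -1 } {
        set lastGood $::DEMO_RATE
        return
    }
    # Reject the write: restore the last accepted value, then report.
    set bad $::DEMO_RATE
    set ::DEMO_RATE $lastGood
    return -code error "invalid rate '$bad', restored $lastGood"
}

set ::DEMO_RATE -1
trace add variable ::DEMO_RATE write demo2::checkRate

# An accepted write, followed by a rejected one that is rolled back.
set ::DEMO_RATE 500
catch { set ::DEMO_RATE fast } err
puts "$::DEMO_RATE / $err"
```

Tcl suspends a variable's traces while its own handler runs, so restoring the value inside the handler does not recurse.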
      
      
      §   §   §

      Name: ${API}::atExit

      Description:
      Dump more thread info via ssh.
      Usage:
      
      Comments:
      
      proc ${API}::atExit {} {
          set threads [ getThreadList ]
          addLogEntry "cleaning up threads $threads" blue
          set text ""
          foreach { tid func status } $threads {
              regexp {\(([^\)]+)\)} $func -> threadfunc
              append text "calling thread function ${threadfunc}_r for $tid"
              catch { ${threadfunc}_r $tid }
          }
          if  { [ string length $text ] } {
              addLogEntry $text blue
          }
      }
      
      
      §   §   §

      Name: ${API}::dummyObj

      Description:
      Create a dummy ilwd container (metadata "ignore" set to "yes") holding a single lstring element, for use when a job has no real object to send.
      Usage:
      
      Comments:
      
      proc ${API}::dummyObj  {} {
      	if	{ [ catch {
      		set contp [ ilwd::newcontp dummy ]
      		setElementMetadata $contp ignore yes
      		set dummyline "<lstring name='dummy' size='5'>dummy</lstring>"
      		set elemp [ putElement $dummyline ]
      		addContainerElement $contp $elemp
      		destructElement $elemp
      	} err ] } {
      		catch { destructElement $elemp }
      		return -code error "error creating dummy: $err"
      	}
      	return $contp 		
      }
      
      
      §   §   §

      Name: ${API}::validateOpts

      Description:
      Validate pipeline or cmd options and set defaults: -metadataapi, -mddapi or -multidimdatatarget, -returnprotocol.
      Usage:
      
      Comments:
      
      proc ${API}::validateOpts { jobid } {
           uplevel {
              set mddtarget  [ getMddTarget $jobid ]
              if { [ string length $mddtarget ] } {
                 if { ! [ regexp $::MDD_TARGETS $mddtarget ] } {
                    set msg "Invalid -multiDimDataTarget or "
                    append msg "-mddapi option '$mddtarget', "
                    append msg "must be frame or ligolw."
                    error $msg
                 }
                 if { [ string match {frame} $mddtarget ] } {
                    set rp [ set ::${jobid}(-returnprotocol) ]
                    if { ! [ string length $rp ] } {
                       set msg "No returnprotocol for "
                       append msg "-multiDimDataTarget frame option."
                       error $msg
                    }
                 }
              } else {
                 ;## no mdd, check for wrapper data from files
                 if { [ info exist ::${jobid}(-wrapperdata) ] } {
                    set wrapperdata [ set ::${jobid}(-wrapperdata) ]
                    if { ! [ string length $wrapperdata ] } {
                       error "No files specified for -wrapperdata option."
                    }
                 }
              }
              set ::${jobid}(mddtarget) $mddtarget
              ;## either metadata or ligolw for metadata target
              if { [ info exist ::${jobid}(-metadataapi) ] } {
                 set metadataapi [ set ::${jobid}(-metadataapi) ]
                 if { ! [ regexp {(ligolw|metadata|tee)} $metadataapi ] } {
                    set msg "Invalid -metadataapi option: "
                    append msg "must be ligolw, metadata or "
                    append msg "tee for both."
                    error $msg
                  }
              } else {
                 set metadataapi metadata
              }
           }
      }
      
      
      §   §   §

      Name: ${API}::expandWrapperData

      Description:
      Expand the -wrapperdata option by listing all filenames.
      Usage:
      
      Comments:
      
      proc ${API}::expandWrapperData { jobid } {
      	set wrapperdata [ set ::${jobid}(-wrapperdata) ]
      	set files [ list ]
      	foreach entry $wrapperdata {
      		if	{ [ file isdirectory $entry ] } {
      			foreach file [ glob $entry/*.ilwd ]  {
      				lappend files $file
      			}
      		} elseif { [ file exists $entry ] } {
                  set ext [ file extension $entry ]
                  if  { [ string match .ilwd $ext ] } {
      			    lappend files $entry
                  } else {
                      return -code error "Some non ilwd is specified by -wrapperdata option: $entry"
                  }
      		}
      	}
      	if	{ [ llength $files ] } {
      		return $files
      	} else {
      		return -code error "No files specified for -wrapperdata option."
      	}	
      }
      
      
      §   §   §

      Name: ${API}::killJob

      Description:
      Remove job data structures when the job is no longer being processed, either due to completion or abort.
      Usage:
      This proc can be called when:

      1. the job terminates normally with macro and objects
      2. there is an error processing the job object
      3. the job objects are timed out, no tcl command
      4. the macro timed out, no objects.

      The return is "$rc $msg" to the user cmd:
      0 - continue with command, no msg
      1 - continue with command, msg
      3 - exception, abort
      4 - no products, end of chain
      Comments:
      Original products from the shared object are destroyed here; other objects created by tcl are cleaned when done.
      Returns the actual return code and msg.
      proc ${API}::killJob { jobid { rc 3 } { msg "" } } {
          set origrc $rc
          if  { ! [ string length [ array names ::${jobid} ] ] } {
              return [ list 4 "[myName] $jobid failed due to no array ::$jobid" ]
          }
          foreach entry [ array names ::addData *${jobid} ] {
      			foreach { tid jobid } [ split $entry , ] { break }
                  addLogEntry "waiting for addData thread $tid to finish" blue
                  catch { addData_r $tid } result
                  unset ::$tid
                  addLogEntry "$tid addData_r result: '$result'" blue
                  foreach entry $result {
                  	if  { [ regexp {_p_Ldas} $entry ] } {
                          catch { destructElement $entry }
                      }
                  }
                  unset ::addData($tid,$jobid)
          }
          ;## remove the container objects
          catch { emptyDataBucket $jobid }
      	
          ;## remove the result objects if any for exception completion
          ;## if there is product, shared object has removed the wrapper objects.
      	
          if  { [ info exist ::${jobid}(productType) ] } {
              set bits [ set ::${jobid}(productType) ]
              ;## no mdd or metadata, end the pipeline
              if  { $bits == 0 || $bits == 4 } {
                  set rc 4
              }
          } else {
      		;## issue removeJob only if there is an exception
      		if	{ $rc == 3 } {
      			regexp {(\d+)} $jobid -> id
      			catch { ::listJob $id } msg1
      			catch { ::killJob $id } err1
                  addLogEntry "internal data from listJob: $msg1; killJob $err1" purple
                  ;## remaining threads could be from dataRecv
                  if  { [ array exist ::dataRecvStates ] } {
                      addLogEntry "current dataRecv thread states [ array get ::dataRecvStates ] "  purple
                  }
      		}
          }
          ;## always remove the end products created by shared object
          if  { [ info exist ::${jobid}(product) ] } {
              addLogEntry "products to remove [ set ::${jobid}(product) ]"
              foreach ptrs [ set ::${jobid}(product) ] {
      	        foreach ptr $ptrs {
      		        if  { [ regexp {_p_Ldas} $ptr ] } {
              	        destructElementWrap $ptr endproducts 1
          	        }
      	        }
              }
          }
          ;## remove after process if job terminates before it
          catch { after cancel [ set ::${jobid}(afterId) ] } err
          ;## wake up any tcl command waiting on it here to return to manager
          ;## do not do earlier than this
          if  { $origrc == 3 } {
              if  { [ info exist ::${jobid}(exception) ] } {
                  lappend ::${jobid}(exception) "Job aborted or timed out $msg"
              } else {
                  set ::${jobid}(exception) "Job aborted or timed out $msg"
              }
          }
          addLogEntry "$jobid processed return code=$rc, called with rc=$origrc $msg" blue
          # array set ::$jobid [ list product [ list ] ]
          if  { [ regexp {8.4} $::tcl_version ] } {
          	foreach entry [ trace info variable ::${jobid}(product) ] {
              	foreach { oplist cmd } $entry { break }
                  trace remove variable ::${jobid}(product) $oplist $cmd
              }
      	} else {
          	catch { set cmd [ eval lindex [ trace vinfo ::${jobid}(product) ] 1 ] } err
              catch { trace vdelete ::${jobid}(product) w $cmd } err
          }
          ;## unset here just in case there are no macro commands
          after 100 "catch { unset ::$jobid }"
          catch { set ::jobid {} }	
          ;## return here may be ignored if not returning to manager
          return [ list $rc $msg ]
      }
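
killJob returns a two-element list of return code and message. A caller can unpack it in the same style the script uses elsewhere. The sketch below is illustrative only: `describeResult` and the `rcMeaning` array are hypothetical, and the code meanings are copied from the comments above.

```tcl
# Return codes as listed in the killJob comments.
array set rcMeaning {
    0 "continue with command, no msg"
    1 "continue with command, msg"
    3 "exception, abort"
    4 "no products, end of chain"
}

# Hypothetical helper: turn a {rc msg} pair into a log line.
proc describeResult { result } {
    global rcMeaning
    # Unpack the pair the same way the script does (works in Tcl 8.4).
    foreach { rc msg } $result { break }
    if { ! [ info exists rcMeaning($rc) ] } {
        return "rc=$rc (unknown): $msg"
    }
    return "rc=$rc ($rcMeaning($rc)): $msg"
}

puts [ describeResult [ list 4 "DEMO1 produced no products" ] ]
```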
      
      
      §   §   §

      Name: ${API}::setProductType

      Description:
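      Return the product-type bit for one object: 1 shifted left by order if ptr matches _p_Ldas and msg is empty, otherwise 0.
      Usage:
      
      Comments:
      
      The bit is set only for a non-null object with no error message.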
      proc ${API}::setProductType { ptr msg order } {
      	
          set rc 0
          if  { [ regexp {_p_Ldas} $ptr ] &&
      		  ! [ string length $msg ] } {
      		set rc 1
      	}
      	return [ expr { $rc << $order } ]
      }
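
processProduct calls setProductType with orders 2, 1 and 0 for state, mdd and metadata, so the combined productType is a 3-bit mask (state = 4, mdd = 2, metadata = 1). A value of 0 or 4 therefore means there is no mdd or metadata product. The sketch below restates that layout; `productBits` and `productNames` are hypothetical helpers, not part of eventmon.

```tcl
# Hypothetical re-statement of the bit layout: state=4, mdd=2, metadata=1.
# Each argument is 1 if that product is present and error-free, else 0.
proc productBits { hasState hasMdd hasMetadata } {
    return [ expr { ($hasState << 2) | ($hasMdd << 1) | $hasMetadata } ]
}

# Decode a mask back into the product names it contains.
proc productNames { bits } {
    set names [ list ]
    foreach { name mask } { state 4 mdd 2 metadata 1 } {
        if { $bits & $mask } { lappend names $name }
    }
    return $names
}

puts [ productBits 1 0 0 ]
puts [ productNames 3 ]
```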
      
      
      §   §   §

      Name: ${API}::writeState

      Description:
      Writes the state object to the state repository.
      Usage:
      
      Comments:
      
      ptr is destroyed in killJob, do not destroy it here.
      proc ${API}::writeState { jobid ptr { format binary } {comp none} } {
           set seqpt {}
      	;## from wrapper files, do not process state data,
           ;## state ptr will be destroyed by killJob
      	if	{ [ info exist ::${jobid}(objectCnt) ] } {
      		return {}
      	}
      	set target ""
          if  { [ catch {
              if  { [ regexp {_p_Ldas} $ptr ] } {
                  set url "file:${jobid}State.ilwd"
                  set seqpt "url2file $jobid $ptr $url ilwd"
                  foreach { - ilwdfile } [ url2file $jobid $url ilwd ] { break }
                  set format $::STATEILWDFORMAT
                  set seqpt "ilwd::writeFile $jobid $ptr $ilwdfile $format $comp"
                  set target [ ilwd::writeFile $jobid $ptr $ilwdfile $format $comp ]
                  addLogEntry "state object written as $target"
              }
          } err ] } {
              return -code error "$seqpt $ptr $err"
          }
      	return [ file tail $target ]
      }
      
      
      §   §   §

      Name: ${API}::sendData

      Description:
      Send data to the next api, e.g. mdd data to frame or ligolw, metadata to metadataAPI.
      Usage:
      
      Comments:
      
      Only writes if non-null object.
      Object is cleaned up in killJob.
      Don't retry on validService error.
      proc ${API}::sendData { jobid ptr api } {
           set maxtimes 1
           set seqpt {}
           if { [ string length $api ] } {
              for { set i 0 } { $i < $maxtimes } { incr i 1 } {
                 if { [ catch {
                    # dataSend $jobid $api $ptr
                    foreach { api host port service } [ validService $api data ] { break }
                    ilwd::setjob $ptr $jobid
                    ;## if    { ! [ info exist ::${ptr}_lock ] } {
                    ;##      set ::${ptr}_lock 1
                    ;##      set ::eventmon::sendElement_$ptr $jobid
                    ;##      addLogEntry "registered $ptr as ::eventmon::sendElement_$ptr" purple
                    ;## } else {
                    ;##      incr ::${ptr}_lock 1
                    ;## }
                    set tid [ sendDataElement_t $ptr $port $host ]
                    ::setAlert $tid ::$tid
                    ::setTIDCallback $tid "::dataSendReaper $tid $jobid $ptr"
      	          debugPuts "jobid '[ ilwd::getjob $ptr ]' attached to $ptr, to $api, thread $tid"
                    ;## force one call in case thread finishes before trace is set
                    ;## ::dataSendReaper $tid $jobid $ptr
                 } err ] } {
                    if { [ regexp {validService} $err ] } {
                       return -code error "[ myName ]: $err"
                    } else {
                       debugPuts "error: '$err' after $i retries"
                       ;## last attempt: $i never reaches $maxtimes inside the loop
                       if { $i + 1 >= $maxtimes } {
                          set msg "could not send data to $api API after "
                          append msg "$i retries, quitting!"
                          return -code error $msg
                       }
                       after 1000
                    }
                 }
              }
           }
      }
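
The bounded retry in sendData can be factored into a small generic helper. The following is a minimal sketch, not the eventmon implementation: `retry`, `flaky` and the zero delay are hypothetical and only show a loop that always reports the final failure.

```tcl
# Hypothetical helper, not part of eventmon: run $script up to
# $maxtimes times, sleeping $delayMs between failed attempts.
proc retry { maxtimes delayMs script } {
    set err "no attempts made"
    for { set i 1 } { $i <= $maxtimes } { incr i } {
        if { ! [ catch { uplevel 1 $script } err ] } {
            # Success: $err holds the script result.
            return $err
        }
        if { $i < $maxtimes } { after $delayMs }
    }
    return -code error "giving up after $maxtimes attempts: $err"
}

# Usage: a command that fails twice and then succeeds.
set calls 0
proc flaky {} {
    global calls
    if { [ incr calls ] < 3 } { error "attempt $calls failed" }
    return ok
}
puts [ retry 5 0 flaky ]
```

Counting attempts from 1 and testing `$i < $maxtimes` before sleeping keeps the last failure from being swallowed when the loop ends.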
      
      
      §   §   §

      Name: ${API}::ckaddDataErrs

      Description:
      Check for errors from addData and save them.
      Usage:
      
      Comments:
      
      proc ${API}::ckaddDataErrs {} {
      	set seqpt {}
      	uplevel {
             	;## throw an exception if there are some errors in state, metadata
              set seqpt "check for errors"
              foreach { type errmsg } [ list state_p stateMsg mdd_p mddMsg metadata_p metadataMsg ] {
              	if  { [ string length [ set $errmsg ] ] } {
                      lappend ::${jobid}(exception) [ set $errmsg ]
                 	}
      		}
      		if	{ [ info exist ::${jobid}(exception) ] && [ llength [ set ::${jobid}(exception) ]] } {
      			if	{ ! [ regexp {(?n)^\s*$} [ join [ set ::${jobid}(exception) ] ] ] } {
      				set rc 1
      				set msg [ join [ set ::${jobid}(exception) ] \n ]
                      set msg "invalid objects: $msg"
                      error $msg
      			}
      		}
      	}
      }
      
      
      §   §   §

      Name: ${API}::handleMdd

      Description:
      Handle multi-dim data.
      Usage:
      
      Comments:
      
      wrapped mdd data is destroyed after thread send is completed
      proc ${API}::handleMdd { jobid mdd_p bits } {
           set seqpt {}
           if { [ catch {
              set mdderrs [ list ]
              set mddtarget  [ getMddTarget $jobid ]
              ;## if send to frame, set ilwd comment to returnprotocol
              ;## do not send dummy ilwd to frame
              ;## frame automatically process ilwd without macro
              set i 1
              if { $bits & 2 } {
                 if { $::DEBUG >= 3 } {
                     dumpILwd $jobid $mdd_p wrapped
                 }
                 if { [ string match ligolw $mddtarget ] } {
                    after 0 [ list eventmon::sendData $jobid $mdd_p $mddtarget ]
                    if { $::DEBUG >= 3 } {
                       dumpILwd $jobid $mdd_p wrapped
                    }
                 } elseif  { [ string match {frame} $mddtarget ] } {
                    set rp [ set ::${jobid}(-returnprotocol) ]
                    if { [ string match "*Ldas*" $mdd_p ] } {
                       if { [ catch {
                          set comment [ getElementAttribute $mdd_p comment ]
                          set comment [ list $rp $comment ]
                          setElementAttribute $mdd_p comment $comment
                       } err ] } {
                          lappend ::${jobid}(exception) "error adding url to comment: $err"
                          addLogEntry "error adding url to comment: $err"
                       }
                    }
                    ;## destroy mdd_p after dataSend
                    set seqpt "sendData $jobid $mdd_p $mddtarget"
                    after  0 [ list eventmon::sendData $jobid $mdd_p $mddtarget ]
                 } else {
                     addLogEntry "Mdd target $mddtarget (not frame or ligolw), destroying $mdd_p" blue
                     destructElement $mdd_p
                 }
              } else {
                 set dummy_mdd_p [ dummyObj ]
                 set seqpt "after 0 list sendData $jobid $dummy_mdd_p $mddtarget, dummy object"
                 after 0 [ list eventmon::sendData $jobid $dummy_mdd_p $mddtarget ]
              }
              addLogEntry "$seqpt, processed $i multi-dim objects"
           } err ] } {
              if { [ info exist dummy_mdd_p ] } {
                 destructElementWrap $dummy_mdd_p handleMdd-err
              }
              return -code error $err
           }
      }
      
      
      §   §   §

      Name: ${API}::processProduct

      Description:
      Process the end product by sending data to the next api, e.g. mdd data to frame or ligolw, metadata to metadataAPI.
      Usage:
      
      Comments:
      
      Only process if there are end products.
      Job-related objects and data are cleaned up in killJob.
      Exception handling: rc are the return codes to usercmd:
      0 - completed with products sent
      1 - completed with products sent, some errors
      3 - exception, early termination
      4 - no products, terminate pipeline
      proc ${API}::processProduct { jobid } {
           set rc 0
           set msg [ list ]
           set seqpt {}
           ;## validate job options here
           if { [ catch {
              validateOpts $jobid
            } err ] } {
               addLogEntry $err red
               set rc 3
               set msg $err
               set rc [ killJob $jobid $rc $msg ]
               return $rc
            }
            if { [ catch {
            if { [ info exist ::${jobid}(product) ] && \
               [ llength  [ set ::${jobid}(product) ] ] } {
               if { [ catch {
                  set seqpt "set result ( set ::${jobid}(product) )"
                  set result [ set ::${jobid}(product) ]
                  foreach [ list id state_p stateMsg mdd_p mddMsg \
                                 metadata_p metadataMsg ] $result { break }
                  set msg "Got job $id objects: state $state_p, "
                  append msg "mdd $mdd_p, metadata $metadata_p, "
                  append msg "messages: state $stateMsg, mdd $mddMsg metadata $metadataMsg."
                  debugPuts $msg
                  set msg [ list ]
                  set seqpt "writeState $jobid $state_p"
                  set statefile [ writeState $jobid $state_p ]
                  ckaddDataErrs
                  set bits 0
                  # bit order from left state, mdd, metadata
                  set seqpt "setProductType"
                  set bits [ expr $bits | [ setProductType $state_p $stateMsg 2 ] ]
                  set bits [ expr $bits | [ setProductType $mdd_p $mddMsg 1 ] ]
                  set bits [ expr $bits | [ setProductType $metadata_p $metadataMsg 0 ] ]
                  array set ::${jobid} [ list productType $bits ]
                  ;## no mdd or metadata products
                  if { $bits > 0 && $bits != 4 } {
                     set seqpt "eventmon::handleMdd $jobid $mdd_p $bits"
                     eventmon::handleMdd $jobid $mdd_p $bits
                     if { ! [ string match "*Ldas*" $metadata_p ] } {
                        if { [ string length $mddtarget ] } {
                           set metadata_p [ dummyObj ]
                           set dummy_metadata 1
                        } else {
                           set rc 4
                           set msg "$jobid produced no products"
                        }
                     }
                     if { $::DEBUG >= 2 } {
                        set seqpt "dumpILwd $jobid $metadata_p metadata"
                        dumpILwd $jobid $metadata_p metadata
                     }
                     if { $rc != 4 } {
                        set seqpt "after 0 list eventmon::sendData $jobid $metadata_p $metadataapi"
                        ;## Duncans Patch per PR# 1859
                        if { [ regexp -nocase {tee} $metadataapi ] } {
                           set seqpt "sendData $jobid $metadata_p ligolw"
                           after 0 [ list eventmon::sendData $jobid $metadata_p ligolw ]
                           set seqpt "sendData $jobid $metadata_p metadata"
                           after 0 [ list eventmon::sendData $jobid $metadata_p metadata ]
                         } else {
                           after 0 [ list eventmon::sendData $jobid $metadata_p $metadataapi ]
                         }
                        ;## destroy actual object in killJob
                        ;## and the created one in dataSendReaper
                        ;## but destroy the tcl created one here
                        ;## destructElementWrap $metadata_p metadata
                        ;## remove non-dummy metadata object from list since
                        ;## it is destroyed after threaded send
                     }
                     ;## there are mdd and metadata so let dataSend remove them
                     set ::${jobid}(product) [ lrange $result 1 2 ]
                  } elseif { $bits == 4 && [ string length $statefile ] } {
                     ;## state file only
                     set rc 4
                     set result [ macroReturnMsg $jobid file $statefile ]
                     foreach { rc msg } $result { break }
                  } else {
                     set rc 4
                     set msg "$jobid produced no products"
                  }
               } err ] } {
                  addLogEntry "$seqpt $err" red
                  set rc 3
                  set msg $err
               }
            } else {
               set rc 4
               set msg [ list ]
               ;## Mary said that the text may have spaces and
               ;## newlines sprinkled willy-nilly or contain only
               ;## spaces and newlines!!
               if { [ info exist ::${jobid}(exception) ] } {
                  set exceptions [ set ::${jobid}(exception) ]
                  set exceptions [ split $exceptions "\n" ]
                  foreach exception $exceptions {
                     set exception [ string trim $exception ]
                     regsub -all -- {\s+} $exception { } exception
                     if { [ regexp {\S+} $exception ] } {
                        set rc 1
                        lappend msg $exception
                     }
                  }
               }
               if { $rc == 1 } { set msg [ join $msg "\n" ] }
               if { $rc == 4 } { set msg "$jobid produced no products" }
            }
            } err ] } {
              addLogEntry $err 2
              set rc 3
            }
            set rc [ killJob $jobid $rc $msg ]
            return $rc
      }
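
The no-product branch of processProduct cleans up exception text that may contain stray spaces and newlines, and only reports rc 1 when something printable is left. That clean-up can be sketched as a standalone proc. `cleanExceptions` is a hypothetical name and the sample text is invented.

```tcl
# Hypothetical helper mirroring the clean-up loop in processProduct:
# split on newlines, trim, collapse runs of whitespace, drop blank lines.
proc cleanExceptions { text } {
    set kept [ list ]
    foreach line [ split $text "\n" ] {
        set line [ string trim $line ]
        regsub -all -- {\s+} $line { } line
        if { [ string length $line ] } { lappend kept $line }
    }
    return $kept
}

set raw "  \n bad   frame \n\n\tmissing   channel\n   "
puts [ join [ cleanExceptions $raw ] "\n" ]
```

An empty result corresponds to the rc 4 case ("produced no products"), and a non-empty one to rc 1 with the joined messages.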
      
      
      §   §   §

      Name: ${API}::checkBuckets

      Description:
      This proc is invoked every second (1000 ms) in sync with handleDataSock.
      It works on the following data:
      1. Job data buckets: for each object in each bucket, spawn an addData thread to process the objects from wrapperAPI.
      2. addData threads: if a thread is finished, record products in variable ::${jobid}(product),
        which triggers products to be processed if there is a tcl command for it (processJobProducts -> processProduct -> killJob).
      3. Expired jobs: if these jobs are not being processed due to no tcl command and have reached the expiration limit, they are removed via killJob.
      4. removeJob threads: if these threads are finished, any product results from them are destroyed and the threads are removed by calling the _r function.
        Usage:
        This proc should not be called externally.
        
        Comments:
        NOTE: called by a trace each time handleDataSock is called.
        proc eventmon::checkBuckets { args } {
        	 set seqpt {}
             if { [ catch {
                set this_time [ clock seconds ]
                ;## go through each data bucket and thread the shared
                ;## object calls
        		;## buckets maybe destroyed so just record a note for now
        		
               	foreach bucket [ info vars ::*_DATABUCKET ] {
                   ;## if bucket has something, process it
        		   	if	{ ! [ info exist $bucket ] } {
        		   		continue
        			}
                   	if { [ llength [ set $bucket ] ] } {
                      regexp {\d+} $bucket id
                      set jobid ${::RUNCODE}$id
                      if { [ catch {
                         set dataptrs [ processDataBucket $jobid ]
                      } err ] } {
                         set msg "process data bucket error: $err, contents=[ set $bucket]"
        				 addLogEntry $msg red
        				 lappend ::${jobid}(exception) $msg
        				 ;## terminate standalone wrapper jobs
        				 if	{ [ info exist ::${jobid}(-wrapperdata) ] } {
        					array set ::${jobid} [ list product {} ]				
        				 }
                      } else {
                         ;## we read the data bucket fine, so let's go!
                         foreach datap $dataptrs {
                            set selfDestruct 1
                            if { [ catch {
        					   ;## if standalone wrapper, set jobid in attribute to
        					   ;## avoid shared object complain
        					   ;## process table jobid has original jobId
        					
        					   if	{ [ info exist ::${jobid}(-wrapperdata) ] } {
        					   		ilwd::setjob $datap $jobid
        					   }
        					   if	{ ! [ info exist ::${jobid}(exception) ] } {
        					   		set ::${jobid}(exception) [ list ]
        					   }
                               set seqpt "addData_t($id $datap):"
                               if   { [ info exist ::${jobid}(objectCnt) ] } {
                                    set selfDestruct 0
                               }
                               set tid [ addData_t $id $datap $selfDestruct ]
                               if   { $::DEBUG_THREADS } {
                                    addLogEntry "addData thread $tid created for job $jobid" purple
                               }
                               set ::addData($tid,$jobid) [ list $datap $this_time ]
                               ::setAlert $tid ::$tid
                               ::setTIDCallback $tid "eventmon::addDataThreadReaper $tid $jobid $datap $this_time"
                               ;## check once in case thread finishes before trace can be set
                               ;## eventmon::addDataThreadReaper $tid $jobid $datap $this_time
                               if	{ [ info exist ::eventmon::tcl8.4 ] } {
                               		set tracedata [ trace info variable ::${jobid}(threadsdone) ]
                               		if	{ ![ string length $tracedata ] } {
                               			trace add variable ::${jobid}(threadsdone) { write } \
                                        [ list eventmon::standAloneCkDone $jobid ]
                               		}
                               } else {
                               		set tracedata [ trace vinfo ::${jobid}(threadsdone) ]
                               		if	{ ![ string length $tracedata ] } {
                               			trace variable ::${jobid}(threadsdone) w \
                                        [ list eventmon::standAloneCkDone $jobid ]
                               		}
                               }
                            } err ] } {
                               set msg "$seqpt: $err (destroying $datap now)"
                               addLogEntry $msg red
                               destructElementWrap $datap "$id,$datap,bucket-error"
        					   lappend ::${jobid}(exception) $msg
        					   ;## terminate for wrapper jobs
        					   if	{ [ info exist ::${jobid}(-wrapperdata) ] } {
        							array set ::${jobid} [ list product {} ]				
        					   }
                            }
                         }
                      }
                   }
                }
             } err ] } {
                addLogEntry "$seqpt $err" red
             }
        }
        
        
        §   §   §

        Name: eventmon::addDataThreadReaper

        Description:

        Parameters:
        Usage:
        
        Comments:
        
        Threads are not read in the order in which they are submitted. info vars returns a mix of job and data bucket variables, for example ::LDAS-DEV12761082_DATABUCKET ::LDAS-DEV12761080_DATABUCKET ::LDAS-DEV12761080 ::LDAS-DEV12761088_DATABUCKET.
        proc eventmon::addDataThreadReaper { tid jobid datap start_time args } {
        	if	{ ! [ info exist ::$tid ] } {
            	;## addLogEntry "cannot locate ::$tid" purple
                return
            }
            if	{ [ catch {
                set color red
                set state [ set ::$tid ]
                lappend ::addDataStates($tid,$jobid) $state
                set currtime [ gpsTime now ]
                if  { [ info exist ::addData($tid,$jobid) ] } {
                    ;## refresh datap and start_time from the values recorded at thread creation
                    foreach { datap start_time } [ set ::addData($tid,$jobid) ] { break }
                	;## force thread to finish before job abort time
                	if 	{ ! [ string equal FINISHED $state ] &&
                      ! [ string equal $state $::TID_FINISHED ]  } {
                    	set runtime [ expr [ clock seconds ] - $start_time ]
                    	if  { $runtime > $::EVENTMON_THREAD_TIMEOUT_SECS } {
                        	addLogEntry "Forcing addData thread $tid for job $jobid to finish \
                            after running for $runtime seconds since $start_time ! \
                            API could block if thread does not finish.: states [ set ::addDataStates($tid,$jobid) ]" orange
                        	set state FINISHED
                    	}
                   	}
                }
                if 	{ [ string equal FINISHED $state ] ||
                      [ string equal $state $::TID_FINISHED ]  } {
                    set result [ addData_r $tid ]
                    ::unset ::$tid
                    ::unset ::addData($tid,$jobid)
                    ::unset ::addDataStates($tid,$jobid)
                    if  { $::DEBUG_THREADS } {
                        addLogEntry "addData $tid finished,result='$result'" purple
                    }
                    if  { [ info exist ::${jobid} ] } {
                        ;## standAloneCount
                        ;## note here that setting a common variable in a trace callback is
                        ;## visible to other threads issuing this callback as
                        ;## the setting of the common var is also queued.
                        ;## all pipelines except putStandAlone
                        if  { ! [ info exist ::${jobid}(objectCnt) ] } {
                            if  { [ string length $result ] } {
                                addLogEntry "result: '$result'"  purple
                                ;## this triggers the trace that processes the job
                                set ::${jobid}(product) $result
                            }
                        } else {
                        	;## so thread events can happen in order
                            after 0 [ list eval lappend ::${jobid}(threadsdone) $tid ]
                        	;## after 0 [ list eventmon::standAloneCkDone $jobid ]
                        }
                    }
                }
        	} err ] } {
        		addLogEntry "$tid $err" $color
                catch { ::unset ::$tid }
                catch { ::unset ::addDataStates($tid,$jobid) }
                catch { ::unset ::addData($tid,$jobid) }
                destructElementWrap $datap addDataErr
                if  { [ array exist ::${jobid} ] } {
                    lappend ::${jobid}(exception) $err
                    ;## trigger job completion
                    set ::${jobid}(product) {}
                }
        	}
        }	
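
The timeout rule used by the reaper (treat a thread as finished once it has run longer than the configured limit) can be looked at in isolation. The following is a minimal sketch in plain Tcl, not the eventmon implementation: `effectiveState` and its arguments are hypothetical names introduced for illustration, and none of the LDAS thread commands (`addData_t`, `addData_r`) are called.

```tcl
# Hypothetical helper, not part of eventmon.tcl: decide which state the
# reaper should act on for a thread that was started at start_time.
proc effectiveState { state start_time now timeout_secs } {
    # A thread that already reports FINISHED is passed through unchanged.
    if { [ string equal FINISHED $state ] } {
        return FINISHED
    }
    # Otherwise force FINISHED once the run time exceeds the timeout.
    set runtime [ expr { $now - $start_time } ]
    if { $runtime > $timeout_secs } {
        return FINISHED
    }
    return $state
}

set now [ clock seconds ]
puts [ effectiveState RUNNING [ expr { $now - 5 } ] $now 60 ]
puts [ effectiveState RUNNING [ expr { $now - 120 } ] $now 60 ]
```

Passing the current time in as an argument, rather than calling `clock seconds` inside the helper, keeps the rule easy to exercise with fixed values.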
        
        
        Name: eventmon::expiredJobKiller

        Description:

        Parameters:
        Usage:
        
        Comments:
        
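        Jobs whose updatedTime is more than MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API seconds old are killed. If a job does not have updatedTime, then set it so that it can be expired.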
        proc eventmon::expiredJobKiller {} {
             set seqpt {}
             if { [ catch {
        	 	set this_time [ clock seconds ]
                foreach var [ info vars ::${::RUNCODE}* ] {
                   if { [ regexp "(${::RUNCODE}\\d+)$" $var -> jobid ] } {
                      if { [ info exist ::${jobid}(updatedTime) ] } {
                         set last_time [ set ::${jobid}(updatedTime) ]
                         set dt [ expr $this_time - $last_time ]
                         if { $dt > $::MANAGER_ABORT_AFTER_N_SECONDS_IN_ONE_API } {
                            set jobdata [ array get ::$jobid ]
                            set msg "$jobid aborted after $dt seconds"
                            debugPuts "$var $msg $jobdata"
                            set msg [ eventmon::killJob $jobid 3 $msg ]
                            addLogEntry $msg red
                        }
                      }
                   }
                }
             } err ] } {
                addLogEntry "[ myName ]: $err" red
             }
        }
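
The scan over job arrays can be tried outside LDAS. This is a minimal sketch under stated assumptions: the run code `DEMO`, the limit `::ABORT_AFTER_SECS` and the proc `expiredJobs` are hypothetical stand-ins, and the sketch only collects the expired job ids instead of calling `eventmon::killJob`.

```tcl
# Assumed values for illustration only; the real ones come from LDAS resources.
set ::RUNCODE DEMO
set ::ABORT_AFTER_SECS 30

# Hypothetical helper: return the ids of jobs whose updatedTime is too old.
proc expiredJobs { now } {
    set expired [ list ]
    foreach var [ info vars ::${::RUNCODE}* ] {
        # Only names that end in the run code plus digits are job arrays;
        # names such as ::DEMO1_DATABUCKET are skipped.
        if { [ regexp "(${::RUNCODE}\\d+)$" $var -> jobid ] } {
            if { [ info exists ::${jobid}(updatedTime) ] } {
                set dt [ expr { $now - [ set ::${jobid}(updatedTime) ] } ]
                if { $dt > $::ABORT_AFTER_SECS } {
                    lappend expired $jobid
                }
            }
        }
    }
    return [ lsort $expired ]
}

array set ::DEMO1 [ list updatedTime 900 ]
array set ::DEMO2 [ list updatedTime 990 ]
array set ::DEMO3 [ list other 1 ]
set ::DEMO1_DATABUCKET [ list ]
puts [ expiredJobs 1000 ]
```

In this sketch a job array without an updatedTime element (here `::DEMO3`) is never reported as expired.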
        
        
        §   §   §

        Name: eventmon::standAloneCkDone

        Description:

        Parameters:
        Usage:
        
        Comments:
        
        Does not unset the ::addData thread entries, since there may be an exception; the threads are waited for in killJob.
        proc eventmon::standAloneCkDone { jobid args } {
        	if	{ [ catch {
                set numThreads [ llength [ set ::${jobid}(threadsdone) ] ]
                set expected [ set ::${jobid}(objectCnt) ]
                if	{ $::DEBUG > 1 } {
                	addLogEntry "threads done $numThreads, expected $expected" purple
                }
               	if  { $numThreads >= $expected } {
                    	regexp {(\d+)} $jobid -> id
                    	set result [ removeJob $id ]
                        addLogEntry "set ::${jobid}(product)  with $result" purple
                        unset ::${jobid}(threadsdone)
                    	set ::${jobid}(product) $result
                }
                	
        	} err ] } {
            	addLogEntry $err 2
            }
        }
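
The completion check relies on a write trace on the `threadsdone` array element. The following is a minimal sketch of that pattern, assuming Tcl 8.4 or later for `trace add variable`; `ckDone` and the job id `DEMO7` are hypothetical stand-ins, and the call to `removeJob` is replaced by a fixed string.

```tcl
# Hypothetical stand-in for eventmon::standAloneCkDone: runs on every write
# to ::${jobid}(threadsdone) and sets the product once enough thread ids
# have been collected. The trace appends name1 name2 op, absorbed by args.
proc ckDone { jobid args } {
    set done [ llength [ set ::${jobid}(threadsdone) ] ]
    set expected [ set ::${jobid}(objectCnt) ]
    if { $done >= $expected } {
        set ::${jobid}(product) "all $expected objects processed"
    }
}

set jobid DEMO7
array set ::$jobid [ list objectCnt 3 ]
trace add variable ::${jobid}(threadsdone) write [ list ckDone $jobid ]

# Each finished thread appends its id; the third append completes the job.
foreach tid { tid1 tid2 tid3 } {
    lappend ::${jobid}(threadsdone) $tid
}
puts [ set ::${jobid}(product) ]
```

Because the trace fires synchronously on each write, no polling loop is needed to notice that the last thread has reported in.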
        
        
        §   §   §
        proc eventmon::standAloneCkDone2 { jobid } {
        	
            if	{ [ catch {
            	regexp {(\d+)} $jobid -> id
            	set  msg [ ::listJob $id ]
                addLogEntry "listJob:  '$msg'" purple
                if	{ [ regexp {receivedObjectCount=(\d+) expectedObjectCount=(\d+)} $msg -> objRecv objExpected ] } {
                	if	{ [ string equal $objRecv $objExpected ] } {
                    	set ::${jobid}(addData) [ list ]
                    	regexp {(\d+)} $jobid -> id
                    	set result [ removeJob $id ]
                        addLogEntry "set ::${jobid}(product)  with $result" purple
                    	set ::${jobid}(product) $result
                    }
                } else {
                	error "Unable to locate receivedObjectCount expectedObjectCount from shared object."
                }
           } err ] } {
           		addLogEntry $err 2
                return -code error $err
           }
        }
        
        
        §   §   §
        macro callbacks
        Name: handleReturnCode

        Description:
        Handles the return code from processProduct. Formulates the result to return to the manager.
        Comments:
        proc handleReturnCode { jobid } {
             set seqpt {}
        	foreach { rc msg } [ eventmon::processProduct $jobid ] { break }
        	set subj ""
        	if	{ [ info exist ::${jobid}(-subject) ] } {
        		set subj [ set ::${jobid}(-subject) ]
        	}
        	if	{ ! [ string length $subj ] } {
        		set subj completed
        	}
        	switch $rc {
            	0   { 	if	{ [ string length $msg ] > 1 } {
                            set result [ list 1 $msg $subj ]
                   		} else {
                            set result [ list 0 0 0 ]
                       	}
                   	}
               	4   {
        			set result [ list 4 $msg $subj ] }
             	default { set result [ list 3 "${jobid}: eventmon [ myName ]: $msg" error! ] }
        	}
        	return $result
        }
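
The mapping from return code to result list can be exercised without a running job. This is a minimal sketch that mirrors the switch above; `formatResult` is a hypothetical name, the job id and `myName` prefix are left out of the error text, and the values are passed in directly instead of being read from `eventmon::processProduct` and the job array.

```tcl
# Hypothetical helper, not part of eventmon.tcl: build the three-element
# result list from a return code, a message and an optional subject.
proc formatResult { rc msg subj } {
    # An empty subject falls back to "completed", as in handleReturnCode.
    if { ! [ string length $subj ] } {
        set subj completed
    }
    switch -- $rc {
        0 {
            if { [ string length $msg ] > 1 } {
                return [ list 1 $msg $subj ]
            }
            return [ list 0 0 0 ]
        }
        4 {
            return [ list 4 $msg $subj ]
        }
        default {
            return [ list 3 "error: $msg" error! ]
        }
    }
}

puts [ formatResult 0 "2 rows inserted" "" ]
puts [ formatResult 0 "" "" ]
puts [ formatResult 4 "partial" done ]
puts [ formatResult 9 "boom" "" ]
```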
        
        
        §   §   §

        Name: processJobProducts

        Description:
        A trace function that continues the getJobOpts macro when products are available or when the job has timed out.
        Formulates the result to return to the manager via reattach to the operator socket.
        Comments:
        proc processJobProducts { jobid cid args } {
            set seqpt {}
            if  { [ catch {
        		set ::$cid [ handleReturnCode $jobid ]
            } err ] } {
                set ::$cid [ list 3 "${jobid}: [ myName ]:$err" error! ]
            }
            ;## unset here after waiting for job to finish
            catch { unset ::$jobid }
            catch { set ::jobid {} }
            reattach $jobid $cid
        }
        
        
        §   §   §
        proc eventmon::memSnapShot {} {
        	if	{ [ catch {
        	 	set numjobs 0
        		set numobjects 0
        		set numthreads 0
        		set jobs [ list ]
             	set jobid_rx (${::RUNCODE}\\d+)
             	foreach var [ info vars ::${::RUNCODE}* ] {
                	if	{ [ regexp $jobid_rx\$ $var -> jobid ] } {
        				lappend jobs $jobid
        				incr numjobs 1
        			}
        		}
        		set objects [ getElementList ]
        		regsub {\{} $objects {} objects
        		regsub {\}} $objects {} objects
        		set numobjects [ llength $objects ]
        		set threads [ getThreadList ]
        		if	{ [ regexp tid $threads ] } {
        			set threads  [ split $threads ]
        			set numthreads [ llength $threads ]
        		}
        		addLogEntry "$numjobs jobs \{$jobs\}, $numobjects objects, $numthreads threads" blue
        	} err ] } {
        		addLogEntry $err 2
        	}
        }
        
        
        §   §   §

        Name: dataSendReaper

        Description:
        Waits for a send data thread to finish.
        Parameters:
        • tid thread id; the thread state is held in ::$tid
        • jobid job id the data element belongs to
        • datap pointer to the data element being sent
        Usage:
         dataSendReaper $tid $jobid $datap
        
        Comments:
        Once the thread reports FINISHED, its result is collected with sendDataElement_r, ::$tid is unset and the data element is destroyed with destructElement.
        proc dataSendReaperX { tid jobid datap args } {
            set myName [ myName ]
        	if	{ [ catch {
                set state [ set ::$tid ]
                if 	{ [ string equal FINISHED $state ] ||
                      [ string equal $state $::TID_FINISHED ]  } {
        		    sendDataElement_r $tid
                    ::unset ::$tid
                    if  { [ info exist ::${datap}_lock ] } {
                        incr ::${datap}_lock -1
                    } else {
                        set ::${datap}_lock 0
                    }
                    if  { ! [ set ::${datap}_lock ] } {
                        unset ::${datap}_lock
                        destructElement $datap
                        debugPuts "$jobid $tid completed for sendDataElement_r $datap, destructed."
                        unset ::eventmon::sendElement_$datap
                        addLogEntry "unset ::eventmon::sendElement_$datap" purple
                    } else {
                        debugPuts "$tid completed for sendDataElement_r $datap, not destructed yet."
                    }
                }
        	} err ] } {
        		addLogEntry "$tid $err" 2
                if  { [ info exist ::${datap}_lock ] } {
                    incr ::${datap}_lock -1
                    if  { ! [ set ::${datap}_lock ] } {
                    	unset ::eventmon::sendElement_$datap
                        debugPuts "$jobid $tid error, $datap removed, unset ::eventmon::sendElement_$datap."
                        unset ::${datap}_lock
                        catch { destructElement $datap }
                    }
                } else {
                    debugPuts "$tid error, $datap removed but there is no lock on it."
                    catch { destructElement $datap }
                }
                catch { ::unset ::$tid }
        	}	
        }
        
        
        §   §   §
        proc dataSendReaper { tid jobid datap args } {
            set myName [ myName ]
            if	{ ! [ info exist ::$tid ] } {
            	;## addLogEntry "cannot locate ::$tid" purple
                return
            }
        	if	{ [ catch {
                set state [ set ::$tid ]
                if 	{ [ string equal FINISHED $state ] ||
                      [ string equal $state $::TID_FINISHED ]  } {
        		    sendDataElement_r $tid
                    set unset_tid 1
                    ::unset ::$tid
                    unset unset_tid
                    set unset_element 1
                    destructElement $datap
                    unset unset_element
                    debugPuts "$tid completed for sendDataElement_r $datap."
                }
        	} err ] } {
        		addLogEntry "$tid $err" 2
                if	{ [ info exist unset_tid ] } {
                	catch { ::unset ::$tid }
                }
                if	{ [ info exist unset_element ] } {
                	catch { destructElement $datap }
                }
        	}	
        }
        
        
        §   §   §

        Name: eventmon::defunctJobs

        Description:

        Parameters:
        Usage:
        
        Comments:
        
        proc eventmon::defunctJobs { args } {
            if { [ catch {
                if { [ info exists ::SORTED_LIVE_JOB_LIST_REPORT ] } {
                   set oldest [ lindex $::SORTED_LIVE_JOB_LIST_REPORT 0 ]
                   regexp {\d+} $oldest oldest
                   if { [ string length $oldest ] } {
                      set jobs [ info vars ::${::RUNCODE}* ]
        			  # eval set elements [ getElementList ]
                      foreach var [ info vars ::eventmon::sendElement_* ] {
                            regexp {::eventmon::sendElement_(\S+)} $var -> element
                            if  { [ catch {
        					    set jobid [ getElementAttribute $element jobid ]
                        	    lappend jobs $jobid
                        	    set datap($jobid) $element
                            } err ] } {
                                addLogEntry "$element getElementAttribute jobid error: $err" red
                            }
                            unset $var
        			   }
                      	set jobs [ lsort -unique $jobs ]
                      	foreach jobid $jobs {
                         regexp {\d+} $jobid job
                         set jobid $::RUNCODE$job
                         if { $job + 10 < $oldest  } {
                            if { [ info exists ::DEBUG_DEFUNCT_JOB_REAPER ] } {
                               addLogEntry "removing defunct jobid: $jobid" purple
                            }
                            if  { [ info exist ::${jobid} ] } {
                                eventmon::killJob $jobid 3 "defunct jobid $jobid"
                            } elseif { [ info exist datap($job) ] } {
        						addLogEntry "destroy job $job defunct object $datap($job)"
                                set ptr [ set datap($job) ]
                                if  { [ info exist ::${ptr}_lock ] } {
                                    unset ::${ptr}_lock
                                } else {
                                    addLogEntry "no lock on $job datap $ptr" blue
                                }
                                destructElement $datap($job)
                            }
                         }
                      }
                   }
                }
             } err ] } {
                set subject "$::LDAS_SYSTEM $::API API error!"
                set report "[ myName ]: $err"
                addLogEntry "Subject: ${subject}; Body: $report" email
             }
        }
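
The rule that marks a job as defunct (its number plus 10 is still below the number of the oldest live job) can be sketched on its own. `defunctIds` is a hypothetical helper introduced for illustration; it only returns the matching ids and does not call `eventmon::killJob` or `destructElement`.

```tcl
# Hypothetical helper: return the job ids that the defunct rule would select.
# oldest is the numeric part of the oldest live job id.
proc defunctIds { jobids oldest } {
    set out [ list ]
    foreach jobid [ lsort -unique $jobids ] {
        # Extract the numeric part of the id; skip ids without digits.
        if { ! [ regexp {\d+} $jobid job ] } {
            continue
        }
        if { $job + 10 < $oldest } {
            lappend out $jobid
        }
    }
    return $out
}

puts [ defunctIds { DEMO100 DEMO125 DEMO89 DEMO100 } 120 ]
```

The margin of 10 keeps jobs that are only slightly older than the oldest live job from being removed.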
        
        
        §   §   §

        Name: eventmon::wakeupDataRecvThreads

        Description:

        Parameters:
        Usage:
        
        Comments:
        
        wake up dataRecv threads for a job that did not reach finish state
        proc eventmon::wakeupDataRecvThreads  {} {
            if  { [ array exist ::dataRecvStates ] } {
                foreach tid [ array names ::dataRecvStates ] {
                    foreach { sid currtime } [ set ::dataRecvStates($tid) ] { break }
                    dataRecvReaper $tid $sid
                }
            }
        }
        
        
        §   §   §

        Name: eventmon::wakeupaddDataThreads

        Description:

        Parameters:
        Usage:
        
        Comments:
        
        wake up addData threads for a job that did not reach finish state
        proc eventmon::wakeupaddDataThreads  {} {
            if  { [ array exist ::addDataStates ] } {
                foreach entry [ array names ::addDataStates ]  {
                	foreach { tid jobid } [ split $entry , ] { break }
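                    ;## datap and start time are recorded in ::addData; ::addDataStates holds only thread states
                    if  { ! [ info exist ::addData($entry) ] } { continue }
                    foreach { datap currtime } [ set ::addData($entry) ] { break }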
                    addLogEntry "waking up jobid $jobid, thread $tid, '[ array get ::${jobid} ]'" purple
                    eventmon::addDataThreadReaper $tid $jobid $datap $currtime
                }
            }
        }
        
        
        §   §   §
