com.waveset.task
Class Scheduler

java.lang.Object
  extended byjava.lang.Thread
      extended bycom.waveset.task.Scheduler
All Implemented Interfaces:
javax.management.NotificationBroadcaster, javax.management.NotificationEmitter, java.lang.Runnable, SchedulerMBean, com.waveset.server.ServerConfiguration.Listener

public class Scheduler
extends java.lang.Thread
implements com.waveset.server.ServerConfiguration.Listener, SchedulerMBean, javax.management.NotificationEmitter

The background task execution thread.

The scheduler is a singleton thread that periodically checks for task instances in the repository that need attention. This includes ready tasks that can be executed, executing tasks that have run too long, and finished tasks that whose results can be deleted.

Tasks are ordinarily launched in their own thread, but the definition can also request "synchronous" execution, in which case the task will execute in the scheduler's thread. This should be only used for very brief tasks. It is somewhat dangerous and may be removed.


Nested Class Summary
static class Scheduler.TaskInfo
          A helper class we use to maintain information about the tasks currently executing in this JVM.
static interface Scheduler.TaskListener
          Classes that want to be notified when tasks complete can implement TaskListener and register with the listener interface.
 
Field Summary
protected  boolean _trace
          If set, causes debug messages to be displayed on the console.
static java.lang.String CLEANUP_TASK_RESULT_CYCLE
           
static java.lang.String CLEANUP_TASK_RESULT_DISABLE
           
static java.lang.String CLEANUP_TASK_RESULT_TRACE
           
static java.lang.String code_id
           
static int DEFAULT_BASE_CYCLE_TIME
          Default base cycle time.
static int DEFAULT_FINISHED_CYCLE_TIME
          Default finished cycle time.
static int DEFAULT_READY_CYCLE_TIME
          Default ready cycle time.
static int DEFAULT_SCHEDULED_CYCLE_TIME
          Default scheduled cycle time.
static int DEFAULT_TASK_LIMIT
           
static java.lang.String DISABLE_DAEMON
           
static java.lang.String DISABLE_FAILOVER
           
static java.lang.String DISABLE_SCHEDULED
           
static java.lang.String START_DISABLED
           
static java.lang.String START_ENABLED
           
static java.lang.String START_MANUAL
           
static java.lang.String START_SUSPENDED
           
 
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
Fields inherited from interface com.waveset.task.SchedulerMBean
NAME, OBJECT_NAME, RESUMED, STARTED, STATUS, STOPPED, SUSPENDED
 
Method Summary
 void addNotificationListener(javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)
           
 void cleanupOrphanTaskResults()
          Find and delete any offspring of deleted TaskInstances.
static void defibrillate()
          "Shock" the Scheduler into calling ServerRegistry and picking up a new value for heartbeat interval.
 void deleteTaskInstance(java.lang.String taskId, TaskDefinition taskDef, TaskInstance ti, java.lang.String repoUser)
           
 void deleteTaskInstance(TaskInstance ti, java.lang.String repoUser)
          Public so it can be called by DeleteVisitor.
 void deleteTaskInstanceSubObjects(java.lang.String taskId, TaskDefinition taskDef, java.lang.String repoUser)
           
 void deleteTaskInstanceSubObjects(TaskInstance ti, java.lang.String repoUser)
           
static void deRegisterListener(Scheduler.TaskListener tl)
           
 TaskInstance executeTask(java.lang.String id)
          Public method to resume a suspended task and run it now synchronously.
protected  void finishExecution(TaskInstance task, boolean ignoreLockContention)
          Cleanup task state after execution has finished.
 int getCycles()
          The number of second we want to sleep between cycles.
 int getErrorCount()
          The number of exceptions we've caught during task processing.
protected  Executor getExecutor(TaskInstance task)
          Find the executor for a task.
 int getExpiredCount()
          The number of finished tasks we've expired.
 int getFinishedCycleCounter()
          The number of seconds we've been waiting for a finished cycle.
 int getFinishedCycleTime()
          The cycle time in seconds for processing FINISHED tasks.
 long getLastHeartbeat()
          Returns the date of the last heartbeat in ms.
 int getLaunchedCount()
          The number of scheduled tasks we've launched.
 long getMostRecentHeartbeat()
          Gets the date of the most recent heartbeat from the scheduler in ms.
 javax.management.MBeanNotificationInfo[] getNotificationInfo()
           
 int getReadyCount()
          The number of ready tasks we've executed during the ready cycle.
 int getReadyCycleCounter()
          The number of seconds we've been waiting for a ready cycle.
 int getReadyCycleTime()
          The cycle time in seconds for processing READY tasks.
 int getScheduledCycleCounter()
          The number of seconds we've been waiting for a scheduled cycle.
 int getScheduledCycleTime()
          The cycle time in seconds for processing SCHEDULED tasks.
static Scheduler getScheduler(LighthouseContext lh)
          Get the singleton scheduler.
 int getStatus()
          Get state as an integer.
 void getStatus(java.lang.StringBuffer b)
          Return scheduler diagnostic messages.
 java.lang.String getStatusDisplay()
          Get the state in a displayable string.
 TaskInstance launchTask(TaskTemplate tt)
          Create a new TaskInstance object and make it ready for execution; This can be called by the TaskManager in response to a call from the Session API.
protected  void poolExecutor(Executor e)
          Return an executor to the pool.
static void println(java.lang.String msg)
          The ususal convenience "macro".
static void registerListener(Scheduler.TaskListener tl)
           
 void removeNotificationListener(javax.management.NotificationListener listener)
           
 void removeNotificationListener(javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)
           
 void restoreCycleTimes()
          Reset cycle times to their default values.
 void resumeScheduler()
          Resumes the scheduler after it has been suspended.
 void run()
          The scheduler main loop.
 void runNow()
          Can be called by TaskManager in response to a user request to make the scheduler go through a cycle immediately rather than waiting for its next cycle.
 void runNow(long waitMillis)
          Run the scheduler immediately, and wait for it to finish processing tasks.
 void serverConfigurationChanged()
          Function called by ServerConfiguration whenever changes are detected.
 void setCycleTime(TaskState state, int seconds)
          Sets the cycle time for processing a particular kind of task.
 void setTrace(boolean t)
          Turn debug tracing on and off.
static void shutdown(boolean wait)
          Shuts down the scheduler.
 void start()
           
 void suspendScheduler()
          Suspends the scheduler.
 boolean tooBusy()
          Tests to see if the scheduler on this machine considers itself too busy to handle another task.
 
Methods inherited from class java.lang.Thread
activeCount, checkAccess, countStackFrames, currentThread, destroy, dumpStack, enumerate, getContextClassLoader, getName, getPriority, getThreadGroup, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setName, setPriority, sleep, sleep, stop, stop, suspend, toString, yield
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

code_id

public static final java.lang.String code_id
See Also:
Constant Field Values

DEFAULT_BASE_CYCLE_TIME

public static final int DEFAULT_BASE_CYCLE_TIME
Default base cycle time.

See Also:
Constant Field Values

DEFAULT_READY_CYCLE_TIME

public static final int DEFAULT_READY_CYCLE_TIME
Default ready cycle time.

See Also:
Constant Field Values

DEFAULT_SCHEDULED_CYCLE_TIME

public static final int DEFAULT_SCHEDULED_CYCLE_TIME
Default scheduled cycle time. This will be used both to check TaskSchedule objects, as well as to examine suspended tasks that left a wakup call.

See Also:
Constant Field Values

DEFAULT_FINISHED_CYCLE_TIME

public static final int DEFAULT_FINISHED_CYCLE_TIME
Default finished cycle time.

See Also:
Constant Field Values

CLEANUP_TASK_RESULT_CYCLE

public static final java.lang.String CLEANUP_TASK_RESULT_CYCLE
See Also:
Constant Field Values

CLEANUP_TASK_RESULT_DISABLE

public static final java.lang.String CLEANUP_TASK_RESULT_DISABLE
See Also:
Constant Field Values

CLEANUP_TASK_RESULT_TRACE

public static final java.lang.String CLEANUP_TASK_RESULT_TRACE
See Also:
Constant Field Values

DISABLE_FAILOVER

public static final java.lang.String DISABLE_FAILOVER
See Also:
Constant Field Values

DISABLE_DAEMON

public static final java.lang.String DISABLE_DAEMON
See Also:
Constant Field Values

DISABLE_SCHEDULED

public static final java.lang.String DISABLE_SCHEDULED
See Also:
Constant Field Values

START_ENABLED

public static final java.lang.String START_ENABLED
See Also:
Constant Field Values

START_DISABLED

public static final java.lang.String START_DISABLED
See Also:
Constant Field Values

START_SUSPENDED

public static final java.lang.String START_SUSPENDED
See Also:
Constant Field Values

START_MANUAL

public static final java.lang.String START_MANUAL
See Also:
Constant Field Values

DEFAULT_TASK_LIMIT

public static final int DEFAULT_TASK_LIMIT
See Also:
Constant Field Values

_trace

protected boolean _trace
If set, causes debug messages to be displayed on the console.

Method Detail

getScheduler

public static Scheduler getScheduler(LighthouseContext lh)
                              throws WavesetException
Get the singleton scheduler. It will be launched if it isn't yet running.

Throws:
WavesetException

serverConfigurationChanged

public void serverConfigurationChanged()
Function called by ServerConfiguration whenever changes are detected. Cache some options.

Specified by:
serverConfigurationChanged in interface com.waveset.server.ServerConfiguration.Listener

shutdown

public static void shutdown(boolean wait)
Shuts down the scheduler.

The thread shutdown methods from 1.x have been deprecated, so we just suggest stopping by setting a flag. This probably isn't necessary if we're a daemon thread.


suspendScheduler

public void suspendScheduler()
Suspends the scheduler. Not to be confused with the deprecated Thread.suspend method, this one doesn't really suspend the thread, it just sets a flag to keep the thread from doing any task processing for awhile.


resumeScheduler

public void resumeScheduler()
Resumes the scheduler after it has been suspended.


setTrace

public void setTrace(boolean t)
Turn debug tracing on and off.


setCycleTime

public void setCycleTime(TaskState state,
                         int seconds)
Sets the cycle time for processing a particular kind of task. If the state is null, it sets the base cycle time.

State cycle times should always be greater than the base cycle time, but if it isn't it doesn't hurt, it will behave as if the cycle time is the same as the base cycle.


restoreCycleTimes

public void restoreCycleTimes()
Reset cycle times to their default values. Typically used after tests that lower the cycle times.


println

public static void println(java.lang.String msg)
The ususal convenience "macro".


getStatus

public void getStatus(java.lang.StringBuffer b)
Return scheduler diagnostic messages.


run

public void run()
The scheduler main loop.

We wait awhile, and then check for activity.

For test "liveliness", I wanted to use Thread.interrupt here to force the thread out of sleep and make it do a cycle without waiting 5 seconds for the timeout. Unfortunately, I haven't been able to get Thread.interrupt to cause Thread.sleep to throw an InterruptedException. I kludged a workaround that used an primary sleep time of 1 second, and a secondary time that could be greater for processing tasks. This turned out to be useful as we can now have different cycle times for each task type, which will improve performance.

Specified by:
run in interface java.lang.Runnable

defibrillate

public static void defibrillate()
                         throws WavesetException
"Shock" the Scheduler into calling ServerRegistry and picking up a new value for heartbeat interval.

Throws:
WavesetException
See Also:
ServerRegistry.getHeartBeatIntervalSeconds(com.waveset.object.LighthouseContext)

getLastHeartbeat

public long getLastHeartbeat()
Returns the date of the last heartbeat in ms.

Returns:
Date in ms of the last heartbeat.

runNow

public void runNow()
Can be called by TaskManager in response to a user request to make the scheduler go through a cycle immediately rather than waiting for its next cycle.

I wanted to just call Thread.interrupt here, but that doesn't seem to break us out of a sleep, explore someday. Added the staggered cycle times instead.


runNow

public void runNow(long waitMillis)
Run the scheduler immediately, and wait for it to finish processing tasks.

Parameters:
waitMillis - - wait this long. (If -1, wait indefinitely.)

launchTask

public TaskInstance launchTask(TaskTemplate tt)
                        throws TaskExists,
                               PriorTaskResults,
                               WavesetException
Create a new TaskInstance object and make it ready for execution; This can be called by the TaskManager in response to a call from the Session API.

Here we just assemble a TaskInstance object, and pass it down to the next launchTask method.

The execution mode may be specified in the template. It is normally ASYNC, but if we're being called directly from a GUI request thread, it may be SYNC or ASYNC_IMMEDIATE.

Throws:
TaskExists
PriorTaskResults
WavesetException

tooBusy

public boolean tooBusy()
Tests to see if the scheduler on this machine considers itself too busy to handle another task. This will be called during new task scheduling to see if we can bypass storing the task in the repository, and just start running it here. If we return true, we'll store the task in the the repository and let the other machines handle it.


deleteTaskInstance

public void deleteTaskInstance(TaskInstance ti,
                               java.lang.String repoUser)
                        throws WavesetException
Public so it can be called by DeleteVisitor. Supplied repoUser name may be different than ours.

Throws:
WavesetException

deleteTaskInstance

public void deleteTaskInstance(java.lang.String taskId,
                               TaskDefinition taskDef,
                               TaskInstance ti,
                               java.lang.String repoUser)
                        throws WavesetException
Throws:
WavesetException

deleteTaskInstanceSubObjects

public void deleteTaskInstanceSubObjects(TaskInstance ti,
                                         java.lang.String repoUser)
                                  throws WavesetException
Throws:
WavesetException

deleteTaskInstanceSubObjects

public void deleteTaskInstanceSubObjects(java.lang.String taskId,
                                         TaskDefinition taskDef,
                                         java.lang.String repoUser)
                                  throws WavesetException
Throws:
WavesetException

executeTask

public TaskInstance executeTask(java.lang.String id)
                         throws WavesetException
Public method to resume a suspended task and run it now synchronously. Normally tasks are executed by the Scheduler thread in response to task events, but in some cases it is necessary to cause a task to execute immediately within the requesting thread.

One common example is when advancing the state of a workflow task immediately after a work item has been saved, which allows us to report the new workflow state immediately, rather than waiting for the scheduler to advance workflow in the background.

We have to obtain locks on the TaskInstance to prevent the scheduler thread from trying to process this object. Note that we have to use a different lock user than the Scheduler thread.

Throws:
WavesetException

getExecutor

protected Executor getExecutor(TaskInstance task)
                        throws WavesetException
Find the executor for a task.

This can throw a host of internal exceptions if we have trouble finding the class. Callers are advised to catch the exceptions, terminate the task, and mark the task definition so that we don't keep trying this over and over again.

Made this public so the TaskManager can obtain one in order to call the new getExtendedResults and getTaskWorkFiles methods.

Throws:
WavesetException

poolExecutor

protected void poolExecutor(Executor e)
Return an executor to the pool. This will be called by the execute() method above for synchronous execution, and also by TaskThread when it finishes.


finishExecution

protected void finishExecution(TaskInstance task,
                               boolean ignoreLockContention)
                        throws WavesetException
Cleanup task state after execution has finished.

We can get here through various paths. The task will not be in the repository if we performed the execution immediately on the machine that scheduled the task. It will be in the repository if we read it from processTasks, or it was suspended at one time.

In either case, the task will not be locked, so we can lock it now to store results if necessary. If the task is in the repository, and there are no results to retain, we can delete it.

Throws:
WavesetException

cleanupOrphanTaskResults

public void cleanupOrphanTaskResults()
Find and delete any offspring of deleted TaskInstances.

It would be better never to produce any orphaned offspring in the first place (they should always be deleted along with the TaskInstance), but this is a reasonable fallback.


registerListener

public static void registerListener(Scheduler.TaskListener tl)

deRegisterListener

public static void deRegisterListener(Scheduler.TaskListener tl)

start

public void start()

getMostRecentHeartbeat

public long getMostRecentHeartbeat()
Description copied from interface: SchedulerMBean
Gets the date of the most recent heartbeat from the scheduler in ms.

Specified by:
getMostRecentHeartbeat in interface SchedulerMBean
Returns:
Returns in ms the most recent heartbeat.

getStatus

public int getStatus()
Description copied from interface: SchedulerMBean
Get state as an integer.

Specified by:
getStatus in interface SchedulerMBean

getStatusDisplay

public java.lang.String getStatusDisplay()
Description copied from interface: SchedulerMBean
Get the state in a displayable string.

Specified by:
getStatusDisplay in interface SchedulerMBean

getCycles

public int getCycles()
Description copied from interface: SchedulerMBean
The number of second we want to sleep between cycles. We may not always do something each cycle, other cycle times affect what we will do. This must be the smallest of all cycle times.

It should be relatively small so we can respond to scheduler control events like changing cycle times, requesting an immediate cycle, shutting down the thread, etc.

Specified by:
getCycles in interface SchedulerMBean
Returns:
Returns the current cycle time in seconds.

getReadyCount

public int getReadyCount()
Description copied from interface: SchedulerMBean
The number of ready tasks we've executed during the ready cycle. This won't include tasks executed synchronously through the API, only those we discovered in the READY state in the repository.

Specified by:
getReadyCount in interface SchedulerMBean

getLaunchedCount

public int getLaunchedCount()
Description copied from interface: SchedulerMBean
The number of scheduled tasks we've launched.

Specified by:
getLaunchedCount in interface SchedulerMBean

getExpiredCount

public int getExpiredCount()
Description copied from interface: SchedulerMBean
The number of finished tasks we've expired.

Specified by:
getExpiredCount in interface SchedulerMBean

getErrorCount

public int getErrorCount()
Description copied from interface: SchedulerMBean
The number of exceptions we've caught during task processing.

Specified by:
getErrorCount in interface SchedulerMBean

getFinishedCycleCounter

public int getFinishedCycleCounter()
Description copied from interface: SchedulerMBean
The number of seconds we've been waiting for a finished cycle.

Specified by:
getFinishedCycleCounter in interface SchedulerMBean

getFinishedCycleTime

public int getFinishedCycleTime()
Description copied from interface: SchedulerMBean
The cycle time in seconds for processing FINISHED tasks.

Specified by:
getFinishedCycleTime in interface SchedulerMBean

getReadyCycleCounter

public int getReadyCycleCounter()
Description copied from interface: SchedulerMBean
The number of seconds we've been waiting for a ready cycle.

Specified by:
getReadyCycleCounter in interface SchedulerMBean

getReadyCycleTime

public int getReadyCycleTime()
Description copied from interface: SchedulerMBean
The cycle time in seconds for processing READY tasks.

Specified by:
getReadyCycleTime in interface SchedulerMBean

getScheduledCycleCounter

public int getScheduledCycleCounter()
Description copied from interface: SchedulerMBean
The number of seconds we've been waiting for a scheduled cycle.

Specified by:
getScheduledCycleCounter in interface SchedulerMBean

getScheduledCycleTime

public int getScheduledCycleTime()
Description copied from interface: SchedulerMBean
The cycle time in seconds for processing SCHEDULED tasks.

Specified by:
getScheduledCycleTime in interface SchedulerMBean

removeNotificationListener

public void removeNotificationListener(javax.management.NotificationListener listener,
                                       javax.management.NotificationFilter filter,
                                       java.lang.Object handback)
                                throws javax.management.ListenerNotFoundException
Specified by:
removeNotificationListener in interface javax.management.NotificationEmitter
Throws:
javax.management.ListenerNotFoundException

addNotificationListener

public void addNotificationListener(javax.management.NotificationListener listener,
                                    javax.management.NotificationFilter filter,
                                    java.lang.Object handback)
                             throws java.lang.IllegalArgumentException
Specified by:
addNotificationListener in interface javax.management.NotificationBroadcaster
Throws:
java.lang.IllegalArgumentException

getNotificationInfo

public javax.management.MBeanNotificationInfo[] getNotificationInfo()
Specified by:
getNotificationInfo in interface javax.management.NotificationBroadcaster

removeNotificationListener

public void removeNotificationListener(javax.management.NotificationListener listener)
                                throws javax.management.ListenerNotFoundException
Specified by:
removeNotificationListener in interface javax.management.NotificationBroadcaster
Throws:
javax.management.ListenerNotFoundException