%SYSTEM.WorkMgr
class %SYSTEM.WorkMgr extends %SYSTEM.AbstractWorkMgr
This class provides an interface to the work queue manager, which distributes work to multiple processes in order to improve performance. To use it, construct an instance of the work queue manager, divide the work into units that can be processed independently, queue each unit of work, and finally wait for the work to be completed. A unit of work can write to the current device; that output is buffered and written to the main job's device when the unit is signalled as complete. By default each unit of work is expected to return a %Status value so it can indicate errors; these are returned by the Sync() method (formerly WaitForComplete()), by WaitOne(), or by Wait(). A typical calling sequence is:
Set queue=$system.WorkMgr.%New() If queue="" ; Report Error, can check %objlasterror for %Status code
For i=1:1:100 {
Set sc=queue.Queue("##class(MyClass).ClassMethod",i) If $$$ISERR(sc) ; Report Error
}
Set sc=queue.Sync() If $$$ISERR(sc) ; Report Error
Call Queue() to queue a unit of work to be run; this takes either a class method reference or a '$$func^rtn' routine reference, followed by any arguments you need to pass to that function. Arrays can be passed using the standard '.array' by-reference syntax, although changes the work unit makes to the array are not returned to the caller. As soon as the first Queue() call is made, a worker starts processing that item of work. It is important that all units of work are totally independent and do not rely on other work units; you must not rely on the order in which the units of work are processed. If the units may change a common global, you will need to add locking to ensure one worker cannot change the global while another worker is in the middle of reading it. When a unit of work is queued, the current security context is stored so the work unit runs inside that security context. Note that the worker jobs are started by the Super Server and so run as the operating system user the Super Server process is set up to use, which may differ from your currently logged-in operating system user.
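As a sketch of the two call forms described above (MyClass, ClassMethod, Func, and ^MyRtn are placeholder names assumed to exist):

```objectscript
 ; A minimal sketch of the Queue() call forms; error handling is elided
 Set queue=$system.WorkMgr.%New()
 If queue="" Quit  ; report error, can check %objlasterror
 ; Class method form: arguments follow the method reference
 Set sc=queue.Queue("##class(MyClass).ClassMethod",42)
 ; '$$func^rtn' routine form, passing an array by reference with '.array' syntax
 Set arr(1)="a",arr(2)="b"
 Set sc=queue.Queue("$$Func^MyRtn",.arr)
```

Note that although the array is passed with '.arr', any changes the work unit makes to it are not reflected back in the caller.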
Finally, call Sync() to wait for all units of work to complete; it displays any output each unit produced and reports any errors returned by the work units. Instead of waiting for all work units to complete with Sync(), you can be notified of each completion through the WaitOne() interface, which iterates over the work completion events. Another way to run code as each work unit completes is the QueueCallback() method.
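A sketch of the WaitOne() iteration pattern, continuing the earlier example where queue is a work queue instance (the WaitOne(timeout,.AtEnd) shape shown here is an assumption based on the description above):

```objectscript
 ; Process completion events one at a time instead of blocking in Sync()
 For {
     Set sc=queue.WaitOne(10,.AtEnd)  ; wait up to 10 seconds for the next completion event
     If $$$ISERR(sc) ; report this work unit's error
     Quit:AtEnd  ; no more outstanding work units in this group
 }
```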
Work units may write to the public variable %result, which is relayed to the parent process either in the WaitOne() call or in the callback function passed to QueueCallback(). Worker jobs are owned by the parent process while they are performing work in this group, so when the parent exits the worker jobs are released immediately. When the object returned by %New() is destroyed, all work associated with this group is removed automatically and any workers are released.
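A sketch of relaying a per-unit result through the public variable %result (WorkerMethod, Collect, and ^MyResults are hypothetical names used only for illustration):

```objectscript
/// Work unit, run in a worker job; queued with
/// queue.QueueCallback("##class(MyClass).WorkerMethod","##class(MyClass).Collect",i)
ClassMethod WorkerMethod(i As %Integer) As %Status
{
    Set %result("value")=i*i  ; public %result array is relayed back to the parent
    Quit $$$OK
}

/// Callback, run in the parent with the same argument as the work unit
ClassMethod Collect(i As %Integer) As %Status
{
    ; %result, %status, %job and %workqueue are public variables available here
    Set ^MyResults(i)=$get(%result("value"))
    Quit $$$OK
}
```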
The queued work should not perform exclusive kills or argumentless unlocks, as these interfere with the framework. Use of process-private globals is only helpful within a single chunk of work, not across chunks, because different chunks are processed by different workers in different jobs. Each chunk should be on the order of thousands of lines of ObjectScript code so that the overhead of the framework is not a significant factor; also, rather than a few very large chunks (e.g. 4 big chunks), it is better where possible to have a fairly large number of chunks (say 100), as this allows the work to scale with CPU cores. Once started, worker jobs remain until they time out after a long enough period of inactivity, as these jobs are shared among all work entered into the work queue manager in the same 'category'.
The third argument to %New() is the category, which specifies the work queue pool from which worker jobs are allocated. Categories are independent of each other, so for example 'SQL' work is allocated from a different pool than 'Default' work, and the load each category places on the system can be tuned separately.
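Based on the description above, where the category is the third argument to %New(), selecting a pool might look like this (a sketch only; the first two arguments are left defaulted):

```objectscript
 ; Allocate workers from the 'SQL' category pool rather than 'Default'
 Set queue=$system.WorkMgr.%New(,,"SQL")
 If queue="" ; Report Error, can check %objlasterror for %Status code
```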
Method Inventory
- ActivateDeferred()
- Attach()
- DefaultNumWorkers()
- Detach()
- Flush()
- IsWorkerJob()
- NumberWorkers()
- Pause()
- Queue()
- QueueCallback()
- QueueDeferred()
- Resume()
Methods
ActivateDeferred()
This is similar to Queue() in that it makes the deferred work runnable, but it does not guarantee the work's completion time or priority. ActivateDeferred() can be called with a valid dtoken from the process that created the dtoken, or from a different process that holds a valid dtoken.
QueueCallback()
The callback function is called with the same arguments as the original work unit, so it can tell which unit of work is complete.
The callback function can also access several public variables: '%job', the $job of the process that actually did the work; '%status', the %Status return code from the work unit this callback is for; '%workqueue', the oref of the work queue instance; and the %result array, if the work unit set it.
Any error returned by the work unit is by default added to the Sync() return %Status, but the callback may alter the work unit's %Status by modifying the public variable '%status'. For example, if the callback detects a specific error %Status from the work unit and does 'Set %status=$$$OK', that error is marked as handled and no error %Status is added to the return from Sync().
If you are using Wait() to wait for the work to complete, the callback can signal that Wait() should return to the caller rather than waiting for more events by setting the public variable '%exit' to 1.
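A sketch of a callback that suppresses a specific handled error and stops a Wait() loop early (ErrorCallback is a hypothetical name, and the particular error check is illustrative only):

```objectscript
/// Callback queued via QueueCallback(); receives the same argument as the work unit
ClassMethod ErrorCallback(i As %Integer) As %Status
{
    ; %status holds this work unit's returned %Status
    If $$$ISERR(%status),$system.Status.GetErrorCodes(%status)=$$$ERRORCODE($$$GeneralError) {
        Set %status=$$$OK  ; mark the error as handled; it is not added to the Sync() return
    }
    If i=100 Set %exit=1  ; tell Wait() to return to the caller rather than wait for more events
    Quit $$$OK
}
```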
QueueDeferred()
Unlike work requested by Queue(), work requested by QueueDeferred() is not made available for a worker job to process; the unit remains in a 'deferred' state until explicitly activated. A work queue can simultaneously contain both normal work units (queued by Queue()) and deferred work units (queued by QueueDeferred()).
EXAMPLE USAGE:
Set queue=$system.WorkMgr.%New() If queue="" ; Report Error, can check %objlasterror for %Status code
For i=1:1:10 { ; queue some normal work units 1-10.
Set sc=queue.Queue("##class(MyClass).ClassMethod",i) If $$$ISERR(sc) ; Report Error
}
For i=11:1:20 { ; queue some deferred work units 11-20.
Set sc=queue.QueueDeferred(.dtoken,"##class(MyClass).ClassMethod",i) If $$$ISERR(sc) ; Report Error
Set dtokens($i(dtokens))=dtoken ; store the 'dtoken's for these deferred work units
}
For i=1:1:dtokens { ; activate deferred work units 11-20. This can also be done from a different process with the correct 'dtoken's information.
Set sc = ##class(%SYSTEM.WorkMgr).ActivateDeferred(dtokens(i))
; Or: "Set sc = queue.ActivateDeferred(dtokens(i))"
If $$$ISERR(sc) ; Report Error
}
; Note that here the Sync() will wait for both the 1-10 normal work units and the
; 11-20 deferred work units to finish. Without the final loop above to call
; ActivateDeferred() for each 'dtoken', this Sync() will finish the 1-10 normal
; work units and then hang, waiting for all of the deferred work units to be activated!
Set sc=queue.Sync() If $$$ISERR(sc) ; Report Error
Inherited Members
Inherited Properties
Inherited Methods
- %ClassIsLatestVersion()
- %ClassName()
- %DispatchClassMethod()
- %DispatchGetModified()
- %DispatchGetProperty()
- %DispatchMethod()
- %DispatchSetModified()
- %DispatchSetMultidimProperty()
- %DispatchSetProperty()
- %Extends()
- %GetParameter()
- %IsA()
- %New()
- %OriginalNamespace()
- %PackageName()
- %SetModified()
- Cleanup()
- Clear()
- Help()
- Initialize()
- NumActiveWorkersGet()
- Setup()
- Sync()
- TearDown()
- Wait()
- WaitForComplete()
- WaitOne()