Learning
Documentation
Community
Open Exchange
Global Masters
Home / Class Reference / %SYS namespace / %SYSTEM.WorkMgr
Private  Storage   

%SYSTEM.WorkMgr


class %SYSTEM.WorkMgr extends
%SystemBase

This class provides an interface to the work queue manager code that allows work to be distributed to multiple processes in order to improve performance. In order to use this you need to divide the work up into units that can be processed independently then you initialize the worker jobs, then queue each unit of work and finally you wait for the work to be completed. The units of work can output to the current device which will be buffered up and output to the main job's device when that unit of work is signalled as complete. Also all units of work by default are expected to return a %Status value so it can indicate errors, these are displayed and returned by the WaitForComplete method.

A typical calling sequence is:

	Set queue=$system.WorkMgr.%New("") If queue="" ; Report Error, can check %objlasterror for %Status code
	For i=1:1:100 {
		Set sc=queue.Queue("##class(MyClass).ClassMethod",i) If $$$ISERR(sc) ; Report Error
	}
	Set sc=queue.WaitForComplete() If $$$ISERR(sc) ; Report Error
The call to Initialize will allocate some worker jobs from the pool to the work group you are creating, if there are not enough worker jobs in the pool then a daemon process will start additional worker automatically. The number of worker jobs we start determined by the work queue manager based on current machine load and characteristics of the CPU the machine is running on. If numberjobs=0 is passed in on the %New call we will not use any workers jobs at all and will do all the processing in the current job in the WaitForComplete call.

Then you call Queue to queue a unit of work to be completed, this takes either a class method call, or a '$$func^rtn' reference and then any arguments you need to pass to this function. As soon as the first Queue is called a worker will start processing this item of work. It is important to make sure that all the units of work are totally independent and they do not rely on other work units. You can not rely on the order in which the units of work are processed. If the units may be changing a common global you will need to add locking to ensure one worker can not change a global while another worker is in the middle of reading this global. When a unit of work is queued the current security context is stored so when the work is run it will use the current security context. Note that the worker jobs are started by the super server and so will run as the operating system user that the super server process is setup to use, this may be different to your current logged in operating system user.

Finally you call WaitForComplete to wait for all the units of work to be complete, display any output each unit produced and report any errors reported from the work units. The WaitForComplete will use the qualifiers provided in the Initialize.

Worker jobs are owned by the master process while they are performing work in this group, so when the master exits the worker jobs will be released immediately. When the object returned by Initialize is destroyed this will remove all work associated with this group automatically, and release any workers.

Note that the work queued should not use exclusive news, kills or unlocks as this will interfere with the framework. Also if you use process private globals to store data during the processing note that as multiple jobs will be processing each chunk you can not rely on accessing these from the master process (or even from another chunk). The size of each chunk should be on the order of thousands of lines of COS code to ensure the overhead of the framework is not a significant factor, also rather than a few very large chunks (e.g. 4 big chunks) if possible it is better to have a fairly large number (100 say) of chunks as this allows us to scale with CPU cores. Worker jobs once started will remain until they time out given a long enough period of inactivity.

Inventory


Parameters Properties Methods Queries Indices ForeignKeys Triggers
7 26


Summary


Properties
DeleteTimeout NumActiveWorkers NumWorkers WorkQueueCount

Methods
Attach Clear DefaultNumWorkers Detach
Flush Free Help Initialize
IsWorkerJob NumActiveWorkersGet NumberWorkers Pause
Queue QueueCallback QueueLocalEntry QueueLocalEntryMulti
QueueShardWork Resume Setup SignalAll
StopWorkers TearDown Wait WaitForComplete

Subclasses
%SYSTEM.WorkMgrIPQ

Properties


• property DeleteTimeout as %Integer [ InitialExpression = 5 ];
When the work queue oref is killed or goes out of scope this is the timeout we pass to the call the destructor makes to delete the work queue. The delete queue logic will wait for this timeout period for the worker/s to respond but if the worker/s is/are still busy it will force these processes down.
• property NumActiveWorkers as %Integer [ Calculated ];
Number of active workers attached to this group. If the system is at the limit then we limit the number of worker jobs so you may need to wait for existing worker jobs to become free in order to attach to your work group. If queue is detached will return -1.
• property NumWorkers as %Integer;
After the work group is created the number of workers allocated to this group. Note this is the number of jobs we requested, not the number actively working for this group at this moment. The active number is NumActiveWorkers.
• property WorkQueueCount as %Integer [ MultiDimensional ];
Keep track of how many items we have queued
• property group as %Integer [ Private ];
Group assocaited with this work.
• property location as %RawString [ Private ];
Used by WaitForComplete to store information about the current location
• property suppressoutput as %Boolean [ Private ];
Disable output from worker jobs

Methods


• private method %OnClose() as %Status
• private method %OnNew(ByRef qstruct As %String, numberjobs As %Integer) as %Status
Initialize the worker jobs so they are ready to start accepting work items. In the qspec that is passed you can determine if you want output with the 'd' flag. If you wish to disable multiple jobs and process the work in this master process pass in numberjobs=0. This returns an instance of the worker queue manager which you can then queue work against. If you wish to specify how many workers you require then pass the numberjobs equal to the number you require.

Note that we qstruct is deprecated. We no longer use /multicompile qualifier in the work queue manager at all, so passing this qualifier into this class has no effect.

• classmethod Attach(token, ByRef sc As %Status) as WorkMgr
If you have called Detach on a work queue and have the associated token you can pass this into this class method and assuming the work queue still exists it will create an instance of the work queue manager associated this this queue. If it fails then it will return $$$NULLOREF and set sc with the error %Status value.
• method Clear(timeout As %Integer = 5) as %Status
Clear any existing work from this work queue, it does this by removing the queue and creating a new one. This will wait for up to timeout seconds for the workers to finish their current task before killing the jobs. When we return from this function all work on the group has terminated so you can cleanup any temporary globals etc used by the workers.
• classmethod DefaultNumWorkers() as %Integer
Return the default number of worker jobs we will use if no specific number is specified
• method Detach(ByRef token As %String, timeout As %Integer = 86400) as %Status
Detach this oref from the work queue and set token which you can use in another process (or in this process if wanted) to call Attach. The timeout is how long in seconds we will keep information about this work queue in the system, so if you do not Attach to this within this period of time we will remove all information about this queue and any subsequent call to Attach will fail.
• classmethod Flush() as %Status
Called from a worker job to flush any output produced so far to the master process. Without this all output from a worker job is buffered until this unit of work is complete and only then is it displayed in the master process.
• method Free() as %Status
Free any workers associated with this group to put them back into general pool. You can do this after you have queued all the work, it is useful to avoid holding onto these workers while you WaitForComplete.
• classmethod Help(method As %String = "") as %String
Write out a list of the methods of this object to the console. Pass the method name as an argument to see the full description for this item.
• classmethod Initialize(qspec As %String = "", ByRef sc As %Status, numberjobs As %Integer) as WorkMgr
Initialize the worker jobs so they are ready to start accepting work items. In the qspec that is passed you can determine if you want output with the 'd' flag. If you wish to disable multiple jobs and process the work in this master process pass in numberjobs=0. This returns an instance of the worker queue manager which you can then queue work against. If you wish to specify how many workers you require then pass the numberjobs equal to the number you require.

Note that we no longer use /multicompile qualifier in the work queue manager at all, so passing this qualifier into this class has no effect.

• classmethod IsWorkerJob() as %Boolean
Returns true/false based on if the current process is a worker job or not.
• method NumActiveWorkersGet() as %Integer
This is a Get accessor method for the NumActiveWorkers property.
• method NumberWorkers() as %Integer
After the Initialize is called this will return the current number of worker jobs that are active for this group. If the group is detached it will return -1.
• method Pause(timeout As %Integer, ByRef completed As %Boolean = 0) as %Status
Pause any work in this work queue, this stops any workers from picking up additional items from this specific queue, but leaves the work itself so you can call Resume. When no timeout is passed this will return immediately so there could still be work in progress from one of the work units that was being process at the time this function was called. If you pass in a none null timeout after removing existing work from the queue it will wait for up to this timeout value in seconds for work in progress to finish. If after the timeout the work in progress has exited it will set completed=1 else this will be 0. You can call this method passing in 0 for the timeout if you want to ask if work in progress has finished without waiting.
• method Queue(work As %String, args... As %String) as %Status
Queues a specific unit of work, you pass the entry point to call in 'work' argument. This can be either '##class(Classname).ClassMethod' or '$$entry^rtn' and it is expected to return a %Status code on completion. If you want to call a function that does not return any value on completion then prepend the class syntax with '=' such as '=##class(Classname).ClassMethod' or for calling a function do not include the '$$' for example 'entry^rtn'. The item being called may also throw exceptions in order to indicate an error happened which is trapped and converted to a %Status value to be returned in the master process. You can also pass additional arguments including arrays by reference. Note that the size of the data passed in these arguments should be kept relatively small, if there is a large amount of information that needs to be passed then put this in a global. The security context of the caller is also recorded when this function is called so it can be used when the work is executed.
• method QueueCallback(work As %String, callback As %String, args... As %String) as %Status
Similar to Queue except you can also pass in a 'callback' which is a function or class method that is called in the master process when this unit of work is complete. This function is called with the same arguments the original 'work' is called with so it knows which unit of work is complete. Also the callback function can access the '%job' public variable which is the $job of the process which really did the work, the '%status' public variable which is the %Status return code from the work unit this is the callback for and '%workqueue' public variable which is the oref of the work queue instance. If using the Wait to wait for the work to be completed the callback can signal that it should return to the caller rather than waiting for more events by setting the public variable '%exit' to 1.
• method QueueLocalEntry(rtnidx As %Binary, func As %String, args... As %String) as %Status
Internal method, not for customer use.
• method QueueLocalEntryMulti(rtnidx As %Binary, func As %String, total As %String) as %Status
Internal method, not for customer use.
• classmethod QueueShardWork(remoteSystem As %String, remoteNamespace As %String = $namespace, work As %String, args... As %String) as %Status
Internal method for use by Sharding only.

Add a task to the Work Queue Manager runqueue on a remote machine. You get a %Status if the work was queued correctly. There is no feedback on when the work is done or if it had an error when it was run, this will be added by future enhancements. Arguments are remoteSystem the namespace or implied reference that maps to the remote systems IRISLOCALDATA database so you can form a valid global reference with '^|remoteSystem|IRIS.WorkQueue' that will write data to the target system. remoteNamespace the namespace on the remote system to run the work it in, if not specified this defaults to the same namespace name as the current namespace (so assumes the namespace naming between systems is identical). work the entry point to call in the same format as Queue, and args which are the optional argument (including pass by reference arrays) to pass to this 'work' entry point also identical to Queue.

• method Resume() as %Status
Resume any work in this work queue which was paused with a call to Pause.
• method Setup(work As %String, args... As %String) as %Status
If this queue needs some setup work before we process the first item in this queue or if switching from processing items in another queue to one in this queue then specify the function to call here. The arguments are the same as for Queue. This must be called before you queue work.
• classmethod SignalAll(work As %String) as %Status
Deprecated method
• classmethod StopWorkers(timeout As %Integer = 5) as %Status
Deprecated method
• method TearDown(work As %String, args... As %String) as %Status
If this queue needs some work done to restore a process to the previous state before we process the first item in another queue when switching from processing items in this queue then specify the function to call here. The arguments are the same as for Queue. This must be called before you queue work.
• method Wait(qspec As %String, ByRef atend As %Boolean, timeout As %Integer = -1) as %Status
After work has been queued you can call this to process some work and then return to the caller where the caller will check the atend to see if all the work was processed and if not then you can call this method again to process additional items until atend is true.
The conditions which control when it will return is controlled by timeout argument. If this is -1 (the default) then after each unit of work is signalled as complete and you have a callback queued in QueueCallback which the public variable '%exit' to 1 then this will return to the caller. If timeout is zero we will count completion events until no completions are waiting to be counted and will return to the caller. If timeout is a positive value then this will return to the caller after one work complete event or after the specified number of seconds has elapsed. This method returns atend to show if all the work is complete or if there are still items outstanding. Note that in the function/method called in the QueueCallback callback you can reference the public variable '%workqueue' which is the oref of the instance of the work queue class in order to queue additional work.
• method WaitForComplete(qspec As %String, errorlog As %String) as %Status
After work has been queued this will wait for all the workers to complete. It will display any output the work writes to the current device and it will also combine all the %Status codes that the units of work report and return this combined %Status. If there are no workers then this will execute all the work in this main job during this phase. When this returns all the work queued up to this point has been completed. Also this is the phase that will run the QueueCallback callbacks as the workers jobs report that various units are complete. Note that in the function/method called in the QueueCallback callback you can reference the public variable '%workqueue' which is the oref of the instance of the work queue class in order to queue additional work.