IPC::DirQueue(3)      User Contributed Perl Documentation     IPC::DirQueue(3)

NAME
       IPC::DirQueue - disk-based many-to-many task queue

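SYNOPSIS
	   use IPC::DirQueue;

	   # enqueueing process: add a job to the queue
	   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
	   $dq->enqueue_file("filename");

	   # dequeueing process: pick up the next job and process it
	   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
	   my $job = $dq->pickup_queued_job();
	   if (!$job) { print "no jobs left\n"; exit; }
	   # ... do something interesting with $job->get_data_path() ...
	   $job->finish();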

DESCRIPTION
       This module implements a FIFO queueing infrastructure, using a
       directory as the communications and storage medium.  No daemon process
       is required to manage the queue; all communication takes place via the
       filesystem.

       A common	UNIX system design pattern is to use a tool like "lpr" as a
       task queueing system; for example,
       "" describes
       the use of "lpr"	as an MP3 jukebox.

       However,	"lpr" isn't as efficient as it could be.  When used in this
       way, you	have to	restart	each task processor for	every new task.	 If
       you have	a lot of startup overhead, this	can be very inefficient.
       With "IPC::DirQueue", a processing server can run persistently and
       cache data needed across	multiple tasks efficiently; it will not	be
       restarted unless	you restart it.

       Multiple enqueueing and dequeueing processes on multiple hosts
       (NFS-safe locking is used) can run simultaneously, and safely, on the
       same queue.

       Since multiple dequeuers	can run	simultaneously,	this provides a	good
       way to process a	variable level of incoming tasks using a pre-defined
       number of worker	processes.

       If you need more CPU power working on a queue, you can simply start
       another dequeuer to help out.  If you need less, kill off a few
       dequeuers.

       If you need to take down the server to perform some maintenance or
       upgrades, just kill the dequeuer	processes, perform the work, and start
       up new ones. Since there's no 'socket' or similar point of failure
       aside from the directory	itself,	the queue will just quietly fill with
       waiting jobs until the new dequeuer is ready.

       Arbitrary 'name = value'	string-pair metadata can be transferred
       alongside data files.   In fact,	in some	cases, you may find it easier
       to send unused and empty	data files, and	just use the 'metadata'	fields
       to transfer the details of what will be worked on.

METHODS
       $dq = IPC::DirQueue->new ($opts);
	   Create a new	queue object, suitable for either enqueueing jobs or
	   picking up already-queued jobs for processing.

	   $opts is a reference to a hash, which may contain the following
	   options:

	   dir => $path_to_directory (no default)
	       Name the directory where the queue files are stored.  This is
	       required.

	   data_file_mode => $mode (default: 0666)
	       The "chmod"-style file mode for data files.  This should	be
	       specified as a string with a leading 0.	It will	be affected by
	       the current process "umask".

	   queue_file_mode => $mode (default: 0666)
	       The "chmod"-style file mode for queue control files.  This
	       should be specified as a	string with a leading 0.  It will be
	       affected	by the current process "umask".

	   ordered => {	0 | 1 }	(default: 1)
	       Whether the jobs	should be processed in order of	submission, or
	       in no particular	order.

	   queue_fanout	=> { 0 | 1 } (default: 0)
	       Whether the queue directory should be 'fanned out'.  This
	       allows better scalability with NFS-shared queues	with large
	       numbers of pending files, but hurts performance otherwise.   It
	       also implies ordered = 0. (This is strictly experimental, has
	       overall poor performance, and is	not recommended.)

	   indexd_uri => $uri (default:	undef)
	       A URI of	a "dq-indexd" daemon, used to maintain the list	of
	       waiting jobs.  The URI must be of the form
	       "dq://hostname[:port]" .	(This is strictly experimental,	and is
	       not recommended.)

	   buf_size => $number (default: 65536)
	       The buffer size to use when copying files, in bytes.

	   active_file_lifetime	=> $number (default: 600)
	       The lifetime of an untouched active lockfile, in	seconds.  See
	       'STALE LOCKS AND	SIGNAL HANDLING', below, for more details.
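
	   The option defaults listed above can be collected into a plain
	   Perl hash.  The sketch below is illustrative only: %DEFAULTS,
	   merged_options() and effective_mode() are hypothetical names, not
	   part of IPC::DirQueue, and treating a missing "dir" as fatal is an
	   assumption of the sketch.  It also shows how a mode string such as
	   '0666' is read as octal and then reduced by the process umask.

```perl
use strict;
use warnings;

# Documented defaults for the constructor options ("dir" has no default).
# Hypothetical helper data, not the module's own internals.
my %DEFAULTS = (
    data_file_mode       => '0666',
    queue_file_mode      => '0666',
    ordered              => 1,
    queue_fanout         => 0,
    indexd_uri           => undef,
    buf_size             => 65536,
    active_file_lifetime => 600,
);

# Hypothetical: overlay the caller's options on the defaults.
# ASSUMPTION: a missing "dir" is treated as an error.
sub merged_options {
    my ($opts) = @_;
    die "no queue directory given\n" unless defined $opts->{dir};
    return { %DEFAULTS, %$opts };
}

# A mode string with a leading 0 is octal; the umask then clears bits,
# just as it does for files created with open(2).
sub effective_mode {
    my ($mode_string, $umask) = @_;
    return oct($mode_string) & ~$umask;
}

my $opts = merged_options({ dir => '/path/to/queue', ordered => 0 });
printf "%s ordered=%d buf_size=%d mode=%04o\n",
    $opts->{dir}, $opts->{ordered}, $opts->{buf_size},
    effective_mode($opts->{data_file_mode}, 022);
```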

       $dq->enqueue_file ($filename [, $metadata [, $pri] ] );
	   Enqueue a new job for processing. Returns 1 if the job was
	   enqueued, or	"undef"	on failure.

	   $filename is	the path to the	file to	be enqueued.  Its contents
	   will	be read, and will be used as the contents of the data file
	   available to	dequeuers using	"IPC::DirQueue::Job::get_data_path()".

	   $metadata is	an optional hash reference; every item of metadata
	   will	be available to	worker processes on the	"IPC::DirQueue::Job"
	   object, in the "$job->{metadata}" hashref.  Note that using this
	   channel for metadata	brings with it several restrictions:

	   1. it requires that the metadata be stored as 'name'	=> 'value'
	   string pairs
	   2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0)
	   3. 'name' cannot contain colon (:) characters
	   4. 'name' cannot start with a capital letter	'Q' and	be 4
	   characters in length

	   If those restrictions are broken, die() will	be called with the
	   following error:

		 die "IPC::DirQueue: invalid metadatum: '$k'";

	   This is a change added in release 0.06; prior to that release, an
	   invalid metadatum was silently dropped.

	   An optional priority, $pri, can be specified; jobs with lower
	   priority numbers are run first.  Priorities range from 0 to 99, and
	   50 is the default.
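
	   Because an invalid metadatum makes the enqueue call die(), a
	   caller may prefer to check its input first.  The following is a
	   minimal sketch, not part of the module: metadatum_ok() and
	   check_job_args() are hypothetical helpers that restate the four
	   rules above and the 0 to 99 priority range.

```perl
use strict;
use warnings;

# Hypothetical helper: true if one name/value pair obeys the documented rules.
sub metadatum_ok {
    my ($k, $v) = @_;
    return 0 if !defined $v || ref $v;       # rule 1: plain string values only
    return 0 if "$k$v" =~ /[\n\0]/;          # rule 2: no newline or NUL anywhere
    return 0 if $k =~ /:/;                   # rule 3: no colon in the name
    return 0 if $k =~ /\AQ.{3}\z/;           # rule 4: no 4-character names starting with 'Q'
    return 1;
}

# Hypothetical helper: validate metadata and priority, returning the
# priority that would be used (50 when none is given).
sub check_job_args {
    my ($metadata, $pri) = @_;
    $pri = 50 unless defined $pri;
    die "priority out of range: $pri\n"
        unless $pri =~ /\A[0-9]+\z/ && $pri <= 99;
    for my $k (sort keys %{ $metadata || {} }) {
        die "invalid metadatum: '$k'\n" unless metadatum_ok($k, $metadata->{$k});
    }
    return $pri;
}
```

	   For example, check_job_args({ file => 'a.mp3' }) returns 50,
	   while a name such as 'QREF' or 'a:b' is rejected.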

       $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );
	   Enqueue a new job for processing. Returns 1 if the job was
	   enqueued, or	"undef"	on failure. $pri and $metadata are as
	   described in	"$dq->enqueue_file()".

	   $filehandle is a perl file handle that must be open for reading.
	   It will be closed on	completion, regardless of success or failure.
	   Its contents	will be	read, and will be used as the contents of the
	   data file available to dequeuers using
	   "IPC::DirQueue::Job::get_data_path()".
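
	   The filehandle contract above (read to end of file, then close
	   the handle whatever the outcome) can be sketched as a plain copy
	   loop.  copy_and_close() is a hypothetical helper, not the
	   module's code; its chunk size mirrors the documented buf_size
	   default of 65536 bytes.

```perl
use strict;
use warnings;

# Hypothetical helper: copy all of $in to $out in $buf_size chunks and
# close $in afterwards, whether or not the copy succeeded.
# Returns 1 on success, undef on failure.
sub copy_and_close {
    my ($in, $out, $buf_size) = @_;
    $buf_size ||= 65536;                     # the documented buf_size default
    my $ok = eval {
        while (1) {
            my $n = read($in, my $buf, $buf_size);
            die "read failed: $!\n" unless defined $n;
            last if $n == 0;                 # end of file
            print {$out} $buf or die "write failed: $!\n";
        }
        1;
    };
    close $in;                               # always closed, success or not
    return $ok ? 1 : undef;
}
```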

       $dq->enqueue_string ($string [, $metadata [, $pri] ] );
	   Enqueue a new job for processing.  The job data is entirely read
	   from	$string. Returns 1 if the job was enqueued, or "undef" on
	   failure.  $pri and $metadata are as described in
	   "$dq->enqueue_file()".

       $dq->enqueue_sub	($subref [, $metadata [, $pri] ] );
	   Enqueue a new job for processing. Returns 1 if the job was
	   enqueued, or	"undef"	on failure. $pri and $metadata are as
	   described in	"$dq->enqueue_file()".

	   $subref is a	perl subroutine, which is expected to return one of
	   the following each time it is called:

	       - a string of data bytes to be appended to any existing data.
		 (The string may be empty, '', in which case it's a no-op.)

	       - "undef" when the enqueued data has ended, i.e. EOF.

	       - "die()" if an error occurs.  The "die()" message will be
		 converted into a warning, and the "enqueue_sub()" call will
		 return "undef".

	   (Tip: $subref may be a closure, so it can safely access variables
	   from the enclosing scope.)
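
	   A subroutine of the shape enqueue_sub() expects might look like
	   the closure returned by make_chunker() below.  drain_subref() is
	   a hypothetical consumer that only restates the documented rules
	   (append each string, stop at undef, turn die() into a warning and
	   an undef result).  It is not how the module itself is written.

```perl
use strict;
use warnings;

# Returns a closure over @chunks: each call yields the next chunk,
# and undef once the data has ended.
sub make_chunker {
    my @chunks = @_;
    return sub {
        return undef unless @chunks;         # EOF
        return shift @chunks;                # next chunk ('' is a no-op)
    };
}

# Hypothetical consumer restating the documented contract for $subref.
sub drain_subref {
    my ($subref) = @_;
    my $data = '';
    my $ok = eval {
        while (defined(my $chunk = $subref->())) {
            $data .= $chunk;                 # append to the existing data
        }
        1;
    };
    if (!$ok) {
        warn $@;                             # die() becomes a warning ...
        return undef;                        # ... and the call returns undef
    }
    return $data;
}

print drain_subref(make_chunker("first line\n", '', "second line\n"));
```

	   With the real module, the same closure would be passed as
	   $dq->enqueue_sub(make_chunker(@chunks)).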

       $job = $dq->pickup_queued_job( [	path =>	$path ]	);
	   Pick	up the next job	in the queue, so that it can be	processed.

	   If no job is	available for processing, either because the queue is
	   empty or because other worker processes are already working on
	   them, "undef" is returned; otherwise, a new instance	of
	   "IPC::DirQueue::Job"	is returned.

	   Note that the job is marked as active until "$job->finish()" is
	   called.

	   If the (optional) parameter "path" is used, its value indicates the
	   path	of the desired job's data file.	By using this, it is possible
	   to cancel not-yet-active items from anywhere	in the queue, or pick
	   up jobs out of sequence.  The data path must	match the value	of the
	   pathqueue member of the "IPC::DirQueue::Job"	object passed to the
	   "visit_all_jobs()" callback.

       $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);
	   Wait	for a job to be	queued within the next $timeout	seconds.

	   If there is already a job ready for processing, this	will return
	   immediately.	 If one	is not available, it will sleep, wake up
	   periodically, check for job availability, and either carry on
	   sleeping or return the new job if one is now	available.

	   If a	job becomes available, a new instance of "IPC::DirQueue::Job"
	   is returned.	If the timeout is reached, "undef" is returned.

	   If $timeout is not specified, or is less than 1, this function will
	   wait	indefinitely.

	   The optional	parameter $pollinterval	indicates how frequently to
	   wake	up and check for new jobs.  It is specified in seconds,	and
	   floating-point precision is supported.  The default is 1.

	   Note	that if	$timeout is not	a round	multiple of $pollinterval, the
	   nearest round multiple of $pollinterval greater than	$timeout will
	   be used instead.  Also note that $timeout is	used as	an integer.
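
	   The polling behaviour described above can be sketched as a simple
	   loop.  wait_for_job() and its $check callback are hypothetical
	   stand-ins; the callback plays the role of a non-blocking pickup
	   attempt.  The sketch follows the documented rules: the timeout is
	   used as an integer, a value below 1 means waiting indefinitely,
	   and the wait is rounded up to a whole number of poll intervals.

```perl
use strict;
use warnings;
use POSIX qw(ceil);
use Time::HiRes qw(sleep);

# Hypothetical sketch of the documented waiting rules. $check stands in for a
# non-blocking pickup attempt: it returns a job, or undef if none is ready.
sub wait_for_job {
    my ($check, $timeout, $pollinterval) = @_;
    $pollinterval = 1 unless defined $pollinterval && $pollinterval > 0;
    $timeout = defined $timeout ? int($timeout) : 0;    # used as an integer

    # Below 1 means "wait indefinitely"; otherwise wait a whole number of
    # poll intervals, rounding up when $timeout is not an exact multiple.
    my $max_polls = $timeout < 1 ? undef : ceil($timeout / $pollinterval);

    for (my $polls = 0; ; $polls++) {
        my $job = $check->();
        return $job if defined $job;                    # ready: return at once
        return if defined $max_polls && $polls >= $max_polls;   # timed out
        sleep $pollinterval;                            # fractional seconds OK
    }
}
```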

       $dq->visit_all_jobs($visitor, $visitcontext);
	   Visit all the jobs in the queue, in a read-only mode.  Used to list
	   the entire queue.

	   The callback	function $visitor will be called for each job in the
	   queue, like so:

	     &$visitor ($visitcontext, $job);

	   $visitcontext is whatever you pass in that variable above.  $job is
	   a new, read-only instance of "IPC::DirQueue::Job" representing that
	   job.

	   If a	job is active (being processed), the $job object also contains
	   the following additional data:

	     'active_host': the	hostname on which the job is active
	     'active_pid': the process ID of the process which picked up the job
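
	   Because $visitcontext is passed through unchanged, a hash
	   reference makes a convenient accumulator.  In this sketch, plain
	   hashes stand in for IPC::DirQueue::Job objects.  Reading
	   active_host as a hash key on $job is an assumption about how that
	   data is exposed.

```perl
use strict;
use warnings;

# A visitor in the documented calling style: $visitor->($visitcontext, $job).
# ASSUMPTION: active_host is read as a hash key on the job object.
my $visitor = sub {
    my ($ctx, $job) = @_;
    $ctx->{total}++;
    $ctx->{active}{ $job->{active_host} }++ if defined $job->{active_host};
};

# With a real queue this would be: $dq->visit_all_jobs($visitor, \%ctx);
# here plain hashes stand in for IPC::DirQueue::Job objects.
my %ctx;
$visitor->(\%ctx, $_) for (
    { active_host => 'alpha', active_pid => 4242 },
    {},
    { active_host => 'alpha', active_pid => 4243 },
);
printf "%d jobs, %d active on alpha\n", $ctx{total}, $ctx{active}{alpha};
```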

STALE LOCKS AND SIGNAL HANDLING
       If interrupted or terminated, dequeueing processes should be careful to
       either call "$job->finish()" or "$job->return_to_queue()" on any	active
       tasks before exiting -- otherwise those jobs will remain	marked active.

       Dequeueing processes can	also call "$job->touch_active_lock()"
       periodically, while processing large tasks, to ensure that the task is
       still marked as active.
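
       One way to follow this advice is to turn SIGINT and SIGTERM into
       exceptions while a job is being processed.  The worker then chooses
       between finish() and return_to_queue() depending on the outcome.  In
       the sketch below, StubJob is a hypothetical stand-in for
       IPC::DirQueue::Job that reuses only the three documented method names.
       run_job() is likewise illustrative.

```perl
use strict;
use warnings;

# Hypothetical stand-in for IPC::DirQueue::Job that just records which
# documented method was called.
package StubJob;
sub new               { my ($class) = @_; return bless { calls => [] }, $class }
sub finish            { push @{ $_[0]{calls} }, 'finish' }
sub return_to_queue   { push @{ $_[0]{calls} }, 'return_to_queue' }
sub touch_active_lock { push @{ $_[0]{calls} }, 'touch' }

package main;

# Hypothetical worker step: SIGINT/SIGTERM become exceptions while $work
# runs, so the job is never left marked active. $work receives a
# keep-alive callback to call periodically during long tasks.
sub run_job {
    my ($job, $work) = @_;
    local $SIG{INT}  = sub { die "interrupted\n" };
    local $SIG{TERM} = sub { die "terminated\n" };
    my $ok = eval { $work->(sub { $job->touch_active_lock() }); 1 };
    if ($ok) {
        $job->finish();              # processed: remove it from the queue
        return 1;
    }
    $job->return_to_queue();         # failed or signalled: leave it for another worker
    return 0;
}
```

       This sketch treats any failure, not just a signal, as a reason to
       return the job to the queue.  That is a design choice.  A worker that
       must not retry a failing job would call finish() instead and record
       the error elsewhere.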

       Stale locks are normally	dealt with automatically.  If a	lock is	still
       active after about 10 minutes of	inactivity, the	other dequeuers	on
       that machine will probe the process ID listed in	that lock file using
       kill(0).	 If that process ID is no longer running, the lock is presumed
       likely to be stale. If a	given timeout (10 minutes plus a random	value
       between 0 and 256 seconds) has elapsed since the	lock file was last
       modified, the lock file is deleted.

       This 10-minute default can be modified using the	"active_file_lifetime"
       parameter to the	"IPC::DirQueue"	constructor.
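
       The staleness test described above reduces to two checks: the owning
       process is gone, and the lock file is older than the lifetime plus a
       random margin.  The sketch below restates that rule.  lock_is_stale()
       and pid_is_running() are hypothetical names.  The sketch also treats
       EPERM (a process that exists but belongs to another user) as "still
       running"; that is an extra precaution of the sketch, not something
       stated above.  As with the probe described above, kill(0) only tells
       you about processes on the local host.

```perl
use strict;
use warnings;

# Hypothetical: is $pid a live process on this host? kill() with signal 0
# sends nothing and only checks whether the process could be signalled.
sub pid_is_running {
    my ($pid) = @_;
    return 1 if kill 0, $pid;
    return $!{EPERM} ? 1 : 0;      # exists, but owned by another user
}

# Hypothetical restatement of the documented rule: a lock is stale when its
# owner is gone AND it has not been modified for $lifetime seconds plus a
# random 0..256 second margin. $lifetime mirrors active_file_lifetime.
sub lock_is_stale {
    my (%arg) = @_;
    my $lifetime = $arg{lifetime} // 600;
    my $margin   = $arg{margin}   // int(rand(257));
    return 0 if pid_is_running($arg{pid});
    return (time() - $arg{mtime}) > $lifetime + $margin ? 1 : 0;
}
```

       In practice, the mtime argument would come from (stat $lockfile)[9].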

       Note: this means	that if	the dequeueing processes are spread among
       multiple	machines, and there is no longer a dequeuer running on the
       machine that initially 'locked' the task, it will never be unlocked,
       unless you delete the active file for that task.

QUEUE DIRECTORY STRUCTURE
       "IPC::DirQueue" maintains the following structure for a queue
       directory:

       queue directory
	   The queue directory is used to store	the queue control files.
	   Queue control files determine what jobs are in the queue; if	a job
	   has a queue control file in this directory, it is listed in the
	   queue.

	   The filename	format is as follows:


	   The first two digits	(50) are the priority of the job.  Lower
	   priority numbers are	run first.  20040909232529 is the current date
	   and time when the enqueueing	process	was run, in "YYYYMMDDHHMMSS"
	   format.  941258 is the microseconds part of the current time, as
	   returned by "gettimeofday()".  And finally, "HASH" is a
	   variable-length hash of
	   some	semi-random data, used to increase the chance of uniqueness.

	   If there is a collision, the	timestamps are regenerated after a 250
	   msec	sleep, and further randomness will be added at the end of the
	   string (namely, the current process ID and a	random integer value).
	   Up to 10 retries will be attempted.	Once the file is atomically
	   moved into the queue directory without collision, the retries
	   cease.

	   If queue_fanout was used in the "IPC::DirQueue" constructor,	then
	   the queue directory does not	contain	the queue control files
	   directly; instead, there is an interposing set of 16	"fan-out"
	   directories,	named according	to the hex digits from 0 to "f".

       active directory
	   The active directory	is used	to store active	queue control files.

	   When a job becomes 'active' (i.e. is picked up by
	   "pickup_queued_job()"), its control file is moved from the queue
	   directory into the active directory while it is processed.

       data directory
	   The data directory is used to store enqueued	data files.

	   It contains a two-level "fan-out" hashed directory structure; each
	   data	file is	stored under a single-letter directory,	which in turn
	   is under a single-letter directory.	 This increases	the efficiency
	   of directory	lookups	under many filesystems.

	   The format of filenames here	is similar to that used	in the queue
	   directory, except that the last two characters are removed and used
	   instead for the "fan-out" directory names.

       tmp directory
	   The tmp directory contains temporary	work files that	are in the
	   process of enqueueing, and not yet ready for processing.

	   The filename	format here is similar to the above, with suffixes
	   indicating the type of file (".ctrl", ".data").

       Atomic, NFS-safe	renaming is used to avoid collisions, overwriting or
       other unsafe operations.
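
       The collision handling described for queue control files can be
       sketched as a retry loop.  This is an illustration under stated
       assumptions, not the module's code.  place_without_collision() and
       the $make_name callback are hypothetical.  The sketch uses link()
       followed by unlink() in place of a rename: a plain rename() would
       silently replace an existing file of the same name instead of
       reporting a collision.

```perl
use strict;
use warnings;
use File::Spec;
use Time::HiRes qw(sleep);

# Hypothetical: give $tmpfile a new name in $dir without ever overwriting an
# existing file. $make_name is called on every attempt so the timestamp part
# can be regenerated; after a collision, ".PID.RANDOM" is appended.
sub place_without_collision {
    my ($tmpfile, $dir, $make_name) = @_;
    for my $attempt (0 .. 10) {                  # first try plus up to 10 retries
        my $name = $make_name->();
        $name .= ".$$." . int(rand(1_000_000)) if $attempt > 0;
        my $target = File::Spec->catfile($dir, $name);
        if (link $tmpfile, $target) {            # fails with EEXIST if taken
            unlink $tmpfile;                     # the new name now holds the data
            return $target;
        }
        return unless $!{EEXIST};                # a real error, not a collision
        sleep 0.25;                              # wait 250 msec, then try again
    }
    return;                                      # retries exhausted
}
```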


AUTHOR
       Justin Mason <dq /at/>

       The IPC::DirQueue mailing list is at <>.

       "IPC::DirQueue" is distributed under the	same license as	perl itself.

       The latest version of this library is likely to be available from CPAN.

perl v5.32.0			  2008-04-18		      IPC::DirQueue(3)

