Skip site navigation (1)Skip section navigation (2)

FreeBSD Manual Pages

  
 
  

home | help
MCE::Queue(3)	      User Contributed Perl Documentation	 MCE::Queue(3)

NAME
       MCE::Queue - Hybrid (normal and priority) queues

VERSION
       This document describes MCE::Queue version 1.829

SYNOPSIS
	  use MCE;
	  use MCE::Queue;

	  my $q	= MCE::Queue->new;

	  $q->enqueue( qw/ wherefore art thou romeo / );

	  my $item = $q->dequeue;

	  if ( $q->pending ) {
	     ;
	  }

DESCRIPTION
       This module provides a queue interface supporting normal	and priority
       queues and utilizing the	IPC engine behind MCE. Data resides under the
       manager process.	Three options are available for	overriding the default
       value for new queues. The porder	option applies to priority queues
       only.

	  use MCE::Queue porder	=> $MCE::Queue::HIGHEST,
			 type	=> $MCE::Queue::FIFO,
			 fast	=> 0;

	  use MCE::Queue;		 # Same	as above

	  ## Possible values

	  porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first
		    $MCE::Queue::LOWEST	 # Lowest priority items dequeue first

	  type	 => $MCE::Queue::FIFO	 # First in, first out
		    $MCE::Queue::LIFO	 # Last	in, first out
		    $MCE::Queue::LILO	 # (Synonym for	FIFO)
		    $MCE::Queue::FILO	 # (Synonym for	LIFO)

DEMONSTRATION
       MCE::Queue provides two run modes.

       (A) The "MCE::Queue" object is constructed before running MCE. The data
       resides under the manager process. Workers send and request data	via
       IPC.

       (B) Workers might want to construct a queue for local access. In	this
       mode, the data resides under the	worker process and not available to
       other workers including the manager process.

	  use MCE;
	  use MCE::Queue;

	  my $F	= MCE::Queue->new( fast	=> 1 );
	  my $consumers	= 8;

	  my $mce = MCE->new(

	     task_end => sub {
		my ($mce, $task_id, $task_name)	= @_;
		$F->end() if $task_name	eq 'dir';
	     },

	     user_tasks	=> [{
		max_workers => 1, task_name => 'dir',

		user_func => sub {
		   ## Create a "standalone queue" only accessible to this worker.
		   my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);

		   while (defined (my $dir = $D->dequeue_nb)) {
		      my (@files, @dirs); foreach (glob("$dir/*")) {
			 if (-d	$_) { push @dirs, $_; next; }
			 push @files, $_;
		      }
		      $D->enqueue(@dirs	) if scalar @dirs;
		      $F->enqueue(@files) if scalar @files;
		   }
		}
	     },{
		max_workers => $consumers, task_name =>	'file',

		user_func => sub {
		   while (defined (my $file = $F->dequeue)) {
		      MCE->say($file);
		   }
		}
	     }]

	  )->run({ user_args =>	[ $ARGV[0] || '.' ] });

	  __END__

	  Results taken	from files_mce.pl and files_thr.pl on the web.
	  https://github.com/marioroy/mce-examples/tree/master/other

	  Usage:
	     time ./files_mce.pl /usr 0	| wc -l
	     time ./files_mce.pl /usr 1	| wc -l
	     time ./files_thr.pl /usr	| wc -l

	  Darwin (OS)	 /usr:	  216,271 files
	     MCE::Queue, fast => 0 :	4.17s
	     MCE::Queue, fast => 1 :	2.62s
	     Thread::Queue	   :	4.14s

	  Linux	(VM)	 /usr:	  186,154 files
	     MCE::Queue, fast => 0 :   12.57s
	     MCE::Queue, fast => 1 :	3.36s
	     Thread::Queue	   :	5.91s

	  Solaris (VM)	 /usr:	  603,051 files
	     MCE::Queue, fast => 0 :   39.04s
	     MCE::Queue, fast => 1 :   18.08s
	     Thread::Queue	* Perl not built to support threads

API DOCUMENTATION
   MCE::Queue->new ( [ queue =>	\@array, await => 1, fast => 1 ] )
       This creates a new queue. Available options are queue, porder, type,
       await, fast, and	gather.

	  use MCE;
	  use MCE::Queue;

	  my $q1 = MCE::Queue->new();
	  my $q2 = MCE::Queue->new( queue  => [	0, 1, 2	] );

	  my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
	  my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );

	  my $q5 = MCE::Queue->new( type   => $MCE::Queue::FIFO	);
	  my $q6 = MCE::Queue->new( type   => $MCE::Queue::LIFO	);

	  my $q7 = MCE::Queue->new( await  => 1	);
	  my $q8 = MCE::Queue->new( fast   => 1	);

       The 'await' option, when	enabled, allows	workers	to block (semaphore-
       like) until the number of items pending is equal	or less	than a
       threshold value.	 The $q->await method is described below.

       The 'fast' option speeds	up dequeues and	is not enabled by default. It
       is beneficial for queues	not calling (->clear or	->dequeue_nb) and not
       altering	the optional count value while running;	e.g.
       ->dequeue($count). Basically, do	not enable 'fast' if varying the count
       dynamically.

       The 'gather' option is mainly for running with MCE and wanting to pass
       item(s) to a callback function for appending to the queue. Multiple
       queues may point	to the same callback function. The callback receives
       the queue object	as the first argument and items	after it.

	  sub _append {
	     my	($q, @items) = @_;
	     $q->enqueue(@items);
	  }

	  my $q7 = MCE::Queue->new( gather => \&_append	);
	  my $q8 = MCE::Queue->new( gather => \&_append	);

	  ## Items are diverted	to the callback	function, not the queue.
	  $q7->enqueue(	'apple', 'orange' );

       Specifying the 'gather' option allows one to store items	temporarily
       while ensuring output order. Although a queue object is not required,
       this is simply a	demonstration of the gather option in the context of a
       queue.

	  use MCE;
	  use MCE::Queue;

	  sub preserve_order {
	     my	%tmp; my $order_id = 1;

	     return sub	{
		my ($q,	$chunk_id, $data) = @_;
		$tmp{$chunk_id}	= $data;

		while (1) {
		   last	unless exists $tmp{$order_id};
		   $q->enqueue(	delete $tmp{$order_id++} );
		}

		return;
	     };
	  }

	  my @squares; my $q = MCE::Queue->new(
	     queue => \@squares, gather	=> preserve_order
	  );

	  my $mce = MCE->new(
	     chunk_size	=> 1, input_data => [ 1	.. 100 ],
	     user_func => sub {
		$q->enqueue( MCE->chunk_id, $_ * $_ );
	     }
	  );

	  $mce->run;

	  print	"@squares\n";

   $q->await ( $pending_threshold )
       The await method	is beneficial when wanting to throttle worker(s)
       appending to the	queue. Perhaps,	consumers are running a	bit behind and
       wanting to keep tabs on memory consumption. Below, the number of	items
       pending will never go above 20.

	  use Time::HiRes qw( sleep );

	  use MCE::Flow;
	  use MCE::Queue;

	  my $q	= MCE::Queue->new( await => 1, fast => 1 );
	  my ( $producers, $consumers )	= ( 1, 8 );

	  mce_flow {
	     task_name	 => [ 'producer', 'consumer' ],
	     max_workers => [ $producers, $consumers ],
	  },
	  sub {
	     ##	producer
	     for my $item ( 1 .. 100 ) {
		$q->enqueue($item);

		## blocks until	the # of items pending reaches <= 10
		if ($item % 10 == 0) {
		   MCE->say( 'pending: '.$q->pending() );
		   $q->await(10);
		}
	     }

	     ##	notify consumers no more work
	     $q->end();

	  },
	  sub {
	     ##	consumers
	     while (defined (my	$next =	$q->dequeue()))	{
		MCE->say( MCE->task_wid().': '.$next );
		sleep 0.100;
	     }
	  };

   $q->clear ( void )
       Clears the queue	of any items. This has the effect of nulling the queue
       and the socket used for blocking.

	  my @a; my $q = MCE::Queue->new( queue	=> \@a );

	  @a = ();     ## bad, the blocking socket may become out of sync
	  $q->clear;   ## ok

   $q->end ( void )
       Stops the queue from receiving more items. Any worker blocking on
       "dequeue" will be unblocked automatically. Subsequent calls to
       "dequeue" will behave like "dequeue_nb".	Current	API available since
       MCE 1.818.

	  $q->end();

       MCE Models (e.g.	MCE::Flow) may persist between runs. In	that case, one
       might want to enqueue "undef"'s versus calling "end". The number	of
       "undef"'s depends on how	many items workers dequeue at a	time.

	  $q->enqueue((undef) x	($N_workers * 1));  # $q->dequeue()   1	item
	  $q->enqueue((undef) x	($N_workers * 2));  # $q->dequeue(2)  2	items
	  $q->enqueue((undef) x	($N_workers * N));  # $q->dequeue(N)  N	items

   $q->enqueue ( $item [, $item, ... ] )
       Appends a list of items onto the	end of the normal queue.

	  $q->enqueue( 'foo' );
	  $q->enqueue( 'bar', 'baz' );

   $q->enqueuep	( $p, $item [, $item, ... ] )
       Appends a list of items onto the	end of the priority queue with
       priority.

	  $q->enqueue( $priority, 'foo'	);
	  $q->enqueue( $priority, 'bar', 'baz' );

   $q->dequeue ( [ $count ] )
       Returns the requested number of items (default 1) from the queue.
       Priority	data will always dequeue first before any data from the	normal
       queue.

	  $q->dequeue( 2 );
	  $q->dequeue; # default 1

       The method will block if	the queue contains zero	items. If the queue
       contains	fewer than the requested number	of items, the method will not
       block, but return whatever items	there are on the queue.

       The $count, used	for requesting the number of items, is beneficial when
       workers are passing parameters through the queue. For this reason,
       always remember to dequeue using	the same multiple for the count. This
       is unlike Thread::Queue which will block	until the requested number of
       items are available.

	  # MCE::Queue 1.820 and prior releases
	  while	( my @items = $q->dequeue(2) ) {
	     last unless ( defined $items[0] );
	     ...
	  }

	  # MCE::Queue 1.821 and later
	  while	( my @items = $q->dequeue(2) ) {
	     ...
	  }

   $q->dequeue_nb ( [ $count ] )
       Returns the requested number of items (default 1) from the queue. Like
       with dequeue, priority data will	always dequeue first. This method is
       non-blocking and	returns	"undef"	in the absence of data.

	  $q->dequeue_nb( 2 );
	  $q->dequeue_nb; # default 1

   $q->insert (	$index,	$item [, $item,	... ] )
       Adds the	list of	items to the queue at the specified index position (0
       is the head of the list). The head of the queue is that item which
       would be	removed	by a call to dequeue.

	  $q = MCE::Queue->new(	type =>	$MCE::Queue::FIFO );
	  $q->enqueue(1, 2, 3, 4);
	  $q->insert(1,	'foo', 'bar');
	  # Queue now contains:	1, foo,	bar, 2,	3, 4

	  $q = MCE::Queue->new(	type =>	$MCE::Queue::LIFO );
	  $q->enqueue(1, 2, 3, 4);
	  $q->insert(1,	'foo', 'bar');
	  # Queue now contains:	1, 2, 3, 'foo',	'bar', 4

   $q->insertp ( $p, $index, $item [, $item, ... ] )
       Adds the	list of	items to the queue at the specified index position
       with priority. The behavior is similarly	to "$q-"insert>	otherwise.

   $q->pending ( void )
       Returns the number of items in the queue. The count includes both
       normal and priority data. Returns "undef" if the	queue has been ended,
       and there are no	more items in the queue.

	  $q = MCE::Queue->new();
	  $q->enqueuep(5, 'foo', 'bar');
	  $q->enqueue('sunny', 'day');

	  print	$q->pending(), "\n";
	  # Output: 4

   $q->peek ( [	$index ] )
       Returns an item from the	normal queue, at the specified index, without
       dequeuing anything. It defaults to the head of the queue	if index is
       not specified. The head of the queue is that item which would be
       removed by a call to dequeue. Negative index values are supported,
       similarly to arrays.

	  $q = MCE::Queue->new(	type =>	$MCE::Queue::FIFO );
	  $q->enqueue(1, 2, 3, 4, 5);

	  print	$q->peek(1), ' ', $q->peek(-2),	"\n";
	  # Output: 2 4

	  $q = MCE::Queue->new(	type =>	$MCE::Queue::LIFO );
	  $q->enqueue(1, 2, 3, 4, 5);

	  print	$q->peek(1), ' ', $q->peek(-2),	"\n";
	  # Output: 4 2

   $q->peekp ( $p [, $index ] )
       Returns an item from the	queue with priority, at	the specified index,
       without dequeuing anything. It defaults to the head of the queue	if
       index is	not specified. The behavior is similarly to "$q-"peek>
       otherwise.

   $q->peekh ( [ $index	] )
       Returns an item from the	head of	the heap or at the specified index.

	  $q = MCE::Queue->new(	porder => $MCE::Queue::HIGHEST );
	  $q->enqueuep(5, 'foo');
	  $q->enqueuep(6, 'bar');
	  $q->enqueuep(4, 'sun');

	  print	$q->peekh(0), "\n";
	  # Output: 6

	  $q = MCE::Queue->new(	porder => $MCE::Queue::LOWEST );
	  $q->enqueuep(5, 'foo');
	  $q->enqueuep(6, 'bar');
	  $q->enqueuep(4, 'sun');

	  print	$q->peekh(0), "\n";
	  # Output: 4

   $q->heap ( void )
       Returns an array	containing the heap data. Heap data consists of
       priority	numbers, not the data.

	  @h = $q->heap;   # $MCE::Queue::HIGHEST
	  # Heap contains: 6, 5, 4

	  @h = $q->heap;   # $MCE::Queue::LOWEST
	  # Heap contains: 4, 5, 6

ACKNOWLEDGMENTS
       List::BinarySearch
	  The bsearch_num_pos method was helpful for accommodating the highest
	  and lowest order in MCE::Queue.

       POE::Queue::Array
	  For extra optimization, two if statements were adopted for checking
	  if the item belongs at the end or head of the	queue.

       List::Priority
	  MCE::Queue supports both normal and priority queues.

       Thread::Queue
	  Thread::Queue	is used	as a template for identifying and documenting
	  the methods.

	  MCE::Queue is	not fully compatible due to supporting normal and
	  priority queues simultaneously; e.g.

	     $q->enqueue( $item	[, $item, ... ]	);	   # normal queue
	     $q->enqueuep( $p, $item [,	$item, ... ] );	   # priority queue

	     $q->dequeue( [ $count ] );	     # priority	data dequeues first
	     $q->dequeue_nb( [ $count ]	);

	     $q->pending();		     # counts both normal/priority queues

       Parallel::DataPipe
	  The recursion	example, in the	synopsis above,	was largely adopted
	  from this module.

INDEX
       MCE, MCE::Core

AUTHOR
       Mario E.	Roy, <marioeroyA ATA gmailA DOTA com>

perl v5.24.1			  2017-05-03			 MCE::Queue(3)

NAME | VERSION | SYNOPSIS | DESCRIPTION | DEMONSTRATION | API DOCUMENTATION | ACKNOWLEDGMENTS | INDEX | AUTHOR

Want to link to this manual page? Use this URL:
<https://www.freebsd.org/cgi/man.cgi?query=MCE::Queue&sektion=3&manpath=FreeBSD+12.1-RELEASE+and+Ports>

home | help