Skip site navigation (1)Skip section navigation (2)

FreeBSD Manual Pages

  
 
  

home | help
Thread::Queue::Duplex(3) User Contributed Perl Documentation Thread::Queue::Duplex(3)

NAME
       Thread::Queue::Duplex - thread-safe request/response queue with
       identifiable elements

SYNOPSIS
	       use Thread::Queue::Duplex;
	       #
	       #       create new queue, and require that there	be
	       #       registered listeners for	an enqueue operation
	       #       to succeed, and limit the max pending requests
	       #       to 20
	       #
	       my $q = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 20);
	       #
	       #       register	as a listener
	       #
	       $q->listen();
	       #
	       #       unregister as a listener
	       #
	       $q->ignore();
	       #
	       #       wait for	a listener to register
	       #
	       $q->wait_for_listener($timeout);
	       #
	       #       change the max pending limit
	       #
	       $q->set_max_pending($limit);
	       #
	       #       enqueue elements, returning a unique queue ID
	       #       (used in	the client)
	       #
	       my $id =	$q->enqueue("foo", "bar");
	       #
	       #       enqueue elements, and wait for a	response
	       #       (used in	the client)
	       #
	       my $resp	= $q->enqueue_and_wait("foo", "bar");
	       #
	       #       enqueue elements, and wait for a	response
	       #       until $timeout secs (used in the	client)
	       #
	       my $resp	= $q->enqueue_and_wait_until($timeout, "foo", "bar");
	       #
	       #       enqueue elements	at head	of queue, returning a
	       #       unique queue ID (used in	the client)
	       #
	       my $id =	$q->enqueue_urgent("foo", "bar");
	       #
	       #       enqueue elements	at head	of queue and wait for response
	       #
	       my $resp	= $q->enqueue_urgent_and_wait("foo", "bar");
	       #
	       #       enqueue elements	at head	of queue and wait for
	       #       response	until $timeout secs
	       #
	       my $resp	= $q->enqueue_urgent_and_wait_until($timeout, "foo", "bar");
	       #
	       #       enqueue elements	for simplex operation (no response)
	       #       returning the queue object
	       #
	       $q->enqueue_simplex("foo", "bar");

	       $q->enqueue_simplex_urgent("foo", "bar");
	       #
	       #       dequeue next available element (used in the server),
	       #       waiting indefinitely for	an element to be made available
	       #       returns shared arrayref,	first element is unique	ID,
	       #       which may be undef for simplex requests
	       #
	       my $foo = $q->dequeue;
	       #
	       #       dequeue next available element (used in the server),
	       #       returns undef if	no element immediately available
	       #       otherwise, returns shared arrayref, first element is unique ID,
	       #       which may be undef for simplex requests
	       #
	       my $foo = $q->dequeue_nb;
	       #
	       #       dequeue next available element (used in the server),
	       #       returns undef if no element available within $timeout
	       #       seconds;	otherwise, returns shared arrayref, first
	       #       element is unique ID, which may be undef	for simplex requests
	       #
	       my $foo = $q->dequeue_until($timeout);
	       #
	       #       dequeue next available element (used in the server),
	       #       but only	if marked urgent; otherwise, returns undef
	       #
	       my $foo = $q->dequeue_urgent();
	       #
	       #       returns number of items still in	queue
	       #
	       my $left	= $q->pending;
	       #
	       #       maps a response for the
	       #       queued element identified by $id;
	       #
	       $q->respond($id,	@list);
	       #
	       #       tests for a response to the queued
	       #       element identified by $id; returns undef	if
	       #       not yet available, else returns the queue object
	       #
	       my $result = $q->ready($id);
	       #
	       #       returns list of available response IDs;
	       #       if list provided, only returns IDs from the list.
	       #       Returns undef if	none available.
	       #       In scalar context, returns only first available;
	       #       Else a list of available	IDs
	       #
	       my @ids = $q->available();
	       #
	       #       wait for	and return the response	for the
	       #       specified unique	identifier
	       #       (dequeue_response is alias)
	       #
	       my $result = $q->wait($id);
	       my $result = $q->dequeue_response($id);
	       #
	       #       waits up	to $timeout seconds for	a response to
	       #       the queued element identified by	$id; returns undef if
	       #       not available within $timeout, else returns the queue object
	       #
	       my $result = $q->wait_until($id,	$timeout);
	       #
	       #       wait for	a response to the queued
	       #       elements	listed in @ids,	returning a hashref of
	       #       the first available response(s),	keyed by id
	       #
	       my $result = $q->wait_any(@ids);
	       #
	       #       wait up to $timeout seconds for a response to
	       #       the queued elements listed in @ids, returning
	       #       a hashref of the	first available	response(s), keyed by id
	       #       Returns undef if	none available in $timeout seconds
	       #
	       my $result = $q->wait_any_until($timeout, @ids);
	       #
	       #       wait for	responses to all the queued
	       #       elements	listed in @ids,	returning a hashref of
	       #       the response(s),	keyed by id
	       #
	       my $result = $q->wait_all(@ids);
	       #
	       #       wait up to $timeout seconds for responses to
	       #       all the queued elements listed in @ids, returning
	       #       a hashref of the	response(s), keyed by id
	       #       Returns undef if not all responses are received
	       #       in $timeout seconds
	       #
	       my $result = $q->wait_all_until($timeout, @ids);
	       #
	       #       mark an existing	request
	       #
	       $q->mark($id, 'CANCEL');
	       #
	       #       test if a request is marked
	       #
	       print "Marked for cancel!"
		       if $q->marked($id, 'CANCEL');
	       #
	       #       cancel specific operations
	       #
	       my $result = $q->cancel(@ids);
	       #
	       #       cancel all operations
	       #
	       my $result = $q->cancel_all();
	       #
	       #       test if specified request has been cancelled
	       #
	       my $result = $q->cancelled($id);
	       #
	       #       (class-level method) wait for an	event on
	       #       any of the listed queue objects.	Returns	a
	       #       list of queues which have events	pending
	       #
	       my $result = Thread::Queue::Duplex->wait_any(
		       [ $q1 ],	[ $q2, @ids ]);
	       #
	       #       (class-level method) wait up to $timeout seconds
	       #       for an event on any of the listed queue objects.
	       #       Returns undef if	none available in $timeout seconds,
	       #       otherwise, returns a list of queues with	events pending
	       #
	       my $result = Thread::Queue::Duplex->wait_any_until(
		       $timeout, [ $q1 ], [ $q2, @ids ]);
	       #
	       #       (class-level method) wait for events on all the listed
	       #       queue objects. Returns the list of queue	objects.
	       #
	       my $result = Thread::Queue::Duplex->wait_all(
		       [ $q1 ],	[ $q2, @ids ]);
	       #
	       #       (class-level method) wait up to $timeout seconds for
	       #       events on all the listed	queue objects.
	       #       Returns empty list if all listed	queues do not have
	       #       an event	in $timeout seconds, otherwise returns
	       #       the list	of queues
	       #
	       my $result = Thread::Queue::Duplex->wait_all_until(
		       $timeout, [ $q1 ], [ $q2, @ids ]);

DESCRIPTION
       A mapped	queue, similar to Thread::Queue, except	that as	elements are
       queued, they are	assigned unique	identifiers, which are used to
       identify	responses returned from	the dequeueing thread. This class
       provides	a simple RPC-like mechanism between multiple client and	server
       threads,	so that	a single server	thread can safely multiplex requests
       from multiple client threads. Note that simplex versions	of the enqueue
       methods are provided which do not assign	unique identifiers, and	are
       used for	requests to which no response is required/expected.

       In addition, elements are inspected as they are enqueued/dequeued to
       determine if they are Thread::Queue::Queueable (aka TQQ)	objects, and,
       if so, the onEnqueue() or onDequeue() methods are called	to permit any
       additional class-specific marshalling/unmarshalling to be performed.
       Thread::Queue::Duplex (aka TQD) is itself a Thread::Queue::Queueable
       object, thus permitting TQD objects to be passed	between	threads.

       NOTE: Thread::Queue::Duplex does	not perform any	default	marshalling of
       complex structures; it is the responsibility of the application to
       either

       o   use threads::shared objects for all queued structures

       o   implement its own application specific marshalling via, e.g.,
	   Storable

       o   implement a Thread::Queue::Queueable	wrapper	class for the
	   structure

       Various "wait()"	methods	are provided to	permit waiting on individual
       responses, any or all of	a list of responses, and time-limited waits
       for each. Additionally, class-level versions of the "wait()" methods
       are provided to permit a	thread to simultaneously wait for either
       enqueue or response events on any of a number of	queues,	or on objects
       implementing Thread::Queue::TQDContainer.

       A "mark()" method is provided to	permit out-of-band information to be
       applied to pending requests. A responder	may test for marks via the
       "marked()" method prior to "respond()"ing to a request.	An application
       may specify a mark value, which the responder can test for; if no
       explicit	mark value is given, the value 1 is used.

       "cancel()" and "cancel_all()" methods are provided to explicitly	cancel
       one or more requests, and invoke	the "onCancel()" method	of any
       Thread::Queue::Queueable	objects	in the request.	Cancelling will	result
       in one of

       o   marking the request as cancelled if it has not yet been dequeued
	   (note that it cannot be spliced from the queue due to
	   "threads::shared"'s lack of support for array splicing)

       o   removal and discarding of the response from the response map	if the
	   request has already been processed

       o   if the request is in	progress, the responder	will detect the
	   cancellation	when it	attempts to "respond()", and the response will
	   be discarded

       "listen()" and "ignore()" methods are provided so that server-side
       threads can register/unregister as listeners on the queue; the
       constructor accepts a "ListenerRequired"	attribute argument. If set,
       then any	"enqueue()" operation will fail	and return undef if there are
       no registered listeners.	This feature provides some safeguard against
       "stuck" requestor threads when the responder(s) have shut down for some
       reason. In addition, a "wait_for_listener()" method is provided to
       permit an initiating thread to wait until another thread	registers as a
       listener.

       The constructor also accepts a "MaxPending" attribute that specifies
       the maximum number of requests that may be pending in the queue before
       the operation will block.  Note that responses are not counted in this
       limit.

       "Thread::Queue::Duplex" objects encapsulate

       o   a shared array, used	as the queue (same as Thread::Queue)

       o   a shared scalar, used to provide unique identifier sequence numbers

       o   a shared hash, aka the mapping hash,	used to	return responses to
	   enqueued elements, using the generated unique identifier as the
	   hash	key

       o   a listener count, incremented each time "listen()" is called,
	   decremented each time "ignore()" is called, and, if the "listener
	   required" flag has been set on construction,	tested for each
	   "enqueue()" call.

       A normal	processing sequence for	Thread::Queue::Duplex might be:

	       #
	       #       Thread A	(the client):
	       #
		       ...marshal parameters for a coroutine...
		       my $id =	$q->enqueue('function_name', \@paramlist);
		       my $results = $q->dequeue_response($id);
		       ...process $results...
	       #
	       #       Thread B	(the server):
	       #
		       while (1) {
			       my $call	= $q->dequeue;
			       my ($id,	$func, @params)	= @$call;
			       $q->respond($id,	$self->$func(@params));
		       }

METHODS
       Refer to	the classdocs for summary and detailed method descriptions.

SEE ALSO
       Thread::Queue::Queueable, Thread::Queue::TQDContainer, threads,
       threads::shared,	Thread::Queue, Thread::Queue::Multiplex,
       Thread::Apartment

AUTHOR,	COPYRIGHT, & LICENSE
       Dean Arnold, Presicient Corp. darnold@presicient.com

       Copyright(C) 2005,2006, Presicient Corp., USA

       Licensed	under the Academic Free	License	version	2.1, as	specified in
       the License.txt file included in	this software package, or at
       OpenSource.org <http://www.opensource.org/licenses/afl-2.1.php>.

perl v5.32.0			  2006-03-18	      Thread::Queue::Duplex(3)

NAME | SYNOPSIS | DESCRIPTION | METHODS | SEE ALSO | AUTHOR, COPYRIGHT, & LICENSE

Want to link to this manual page? Use this URL:
<https://www.freebsd.org/cgi/man.cgi?query=Thread::Queue::Duplex&sektion=3&manpath=FreeBSD+12.2-RELEASE+and+Ports>

home | help