AnyEvent::Task(3)     User Contributed Perl Documentation    AnyEvent::Task(3)

NAME
       AnyEvent::Task -	Client/server-based asynchronous worker	pool

SYNOPSIS 1: PASSWORD HASHING
   Server
           use AnyEvent::Task::Server;
           use Authen::Passphrase::BlowfishCrypt;

           my $dev_urandom;
           my $server = AnyEvent::Task::Server->new(
                          listen => ['unix/', '/tmp/anyevent-task.socket'],
                          setup => sub {
                            open($dev_urandom, '<', '/dev/urandom') || die "open urandom: $!";
                          },
                          interface => {
                            hash => sub {
                              my ($plaintext) = @_;
                              read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
                              return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
                                                                            salt => $salt,
                                                                            passphrase => $plaintext)
                                                                      ->as_crypt;
                            },
                            verify => sub {
                              my ($crypted, $plaintext) = @_;
                              return Authen::Passphrase::BlowfishCrypt->from_crypt($crypted)
                                                                      ->match($plaintext);
                            },
                          },
                        );

           $server->run; # or AE::cv->recv

   Client
           use AnyEvent::Task::Client;

           my $client = AnyEvent::Task::Client->new(
                          connect => ['unix/', '/tmp/anyevent-task.socket'],
                        );

           my $checkout = $client->checkout( timeout => 5, );

           my $cv = AE::cv;

           $checkout->hash('secret',
             sub {
               my ($checkout, $crypted) = @_;

               print "Hashed password is $crypted\n";

               $checkout->verify($crypted, 'secret',
                 sub {
                   my ($checkout, $result) = @_;
                   print "Verify result is $result\n";
                   $cv->send;
                 });
             });

           $cv->recv;

   Output
	   Hashed password is $2a$10$NwTOwxmTlG0Lk8YZMT29/uysC9RiZX4jtWCx.deBbb2evRjCq6ovi
	   Verify result is 1

SYNOPSIS 2: DBI
   Server
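           use AnyEvent::Task::Server;
           use DBI;

           my $dbh;

           AnyEvent::Task::Server->new(
             listen => ['unix/', '/tmp/anyevent-task.socket'],
             setup => sub {
               $dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
             },
             interface => sub {
               my ($method, @args) = @_;
               ## Forward the method call to the real DBI handle in the worker
               $dbh->$method(@args);
             },
           )->run;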

   Client
           use AnyEvent::Task::Client;

           my $client = AnyEvent::Task::Client->new(
                          connect => ['unix/', '/tmp/anyevent-task.socket'],
                        );

           my $dbh = $client->checkout;

           my $cv = AE::cv;

           $dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
             sub { });

           ## Requests will queue up on the checkout and execute in order:

           $dbh->do(q{ INSERT INTO user (username, email) VALUES (?, ?) },
                    undef, 'jimmy',
                           'jimmy@example.com',
             sub { });

           $dbh->selectrow_hashref(q{ SELECT * FROM user }, sub {
             my ($dbh, $user) = @_;
             print "username: $user->{username}, email: $user->{email}\n";
             $cv->send;
           });

           $cv->recv;

   Output
           username: jimmy, email: jimmy@example.com

DESCRIPTION
       The synopses make this module look much more complicated	than it
       actually	is. In a nutshell, a synchronous worker	process	is forked off
       by a server whenever a client asks for one. The client keeps as many of
       these workers around as it wants and delegates tasks to them
       asynchronously.

       Another way of saying that is that AnyEvent::Task is a pre-fork-on-
       demand server (AnyEvent::Task::Server) combined with a persistent
       worker-pooled client (AnyEvent::Task::Client).

       The examples in the synopses are	complete stand-alone programs. Run the
       server in one window and	the client in another. The server will remain
       running but the client will exit after printing its output. Typically
       the "client" programs would be embedded in a larger server program.

       Note that the client examples don't implement error checking (see the
       "ERROR HANDLING"	section).

       A server	is started with	"AnyEvent::Task::Server->new". This
       constructor should be passed in at least	the "listen" and "interface"
       arguments. Keep the returned server object around for as	long as	you
       want the	server to be running. "listen" is an array ref containing the
       host and	service	options	to be passed to	AnyEvent::Socket's
       "tcp_server" function. "interface" is the code that should handle each
       request.	See the	INTERFACE section below	for its	specification. A
       "setup" coderef can be passed in	to run some code after a new worker is
       forked. A "checkout_done" coderef can be	passed in to run some code
       whenever a checkout is released in order to perform any required
       clean-up.

       A client	is started with	"AnyEvent::Task::Client->new". You only	need
       to pass "connect" to this constructor which is an array ref containing
       the host	and service options to be passed to AnyEvent::Socket's
       "tcp_connect". Keep the returned	client object around as	long as	you
       wish the	client to be connected.

       After the server	and client are initialised, each process must enter
       AnyEvent's "main	loop" in some way, possibly just "AE::cv->recv". The
       "run" method on the server object is a convenient short-cut for this.

       To acquire a worker process you call the	"checkout" method on the
       client object. The "checkout" method doesn't need any arguments,	but
       several optional	ones such as "timeout" are described below. As long as
       the checkout object is around, this checkout has	exclusive access to
       the worker.

       The checkout object is an object	that proxies its method	calls to a
       worker process or a function that does the same.	The arguments to this
       method/function are the arguments you wish to send to the worker
       process followed	by a callback to run when the operation	completes. The
       callback	will be	passed two arguments: the original checkout object and
       the value returned by the worker	process. The checkout object is	passed
       into the	callback as a convenience just in case you no longer have the
       original	checkout available lexically.

       In the event of an exception thrown by the worker process, a timeout,
       or some other unexpected	condition, an error is raised in the dynamic
       context of the callback (see the	"ERROR HANDLING" section).

DESIGN
       Both client and server are of course built with AnyEvent. However,
       workers can't use AnyEvent (yet). I've never found a need to do event
       processing in the worker	since if the library you wish to use is
       already AnyEvent-compatible you can simply use the library in the
       client process. If the client process is	too over-loaded, it may	make
       sense to	run multiple client processes.

       Each client maintains a "pool" of connections to worker processes.
       Every time a checkout is requested, the request is placed into a
       first-come, first-served queue. Once a worker process becomes
       available, it is associated with that checkout until that checkout is
       garbage collected, which in perl means as soon as it is no longer
       referenced. Each checkout also maintains a queue of requested method
       calls so that as soon as a worker process is allocated to a checkout,
       any queued method calls are filled in order.

       "timeout" can be	passed as a keyword argument to	"checkout". Once a
       request is queued up on that checkout, a timer of "timeout" seconds
       (default is 30, undef means infinity) is started. If the request
       completes during this timeframe, the timer is cancelled. If the timer
       expires, the worker connection is terminated and an exception is thrown
       in the dynamic context of the callback (see the "ERROR HANDLING"
       section).

       Note that since timeouts	are associated with a checkout,	checkouts can
       be created before the server is started.	As long	as the server is
       running within "timeout"	seconds, no error will be thrown and no
       requests	will be	lost. The client will continually try to acquire
       worker processes	until a	server is available, and once one is available
       it will attempt to allocate all queued checkouts.

       Because of checkout queuing, the	maximum	number of worker processes a
       client will attempt to obtain can be limited with the "max_workers"
       argument	when creating a	client object. If there	are more live
       checkouts than "max_workers", the remaining checkouts will have to wait
       until one of the	other workers becomes available. Because of timeouts,
       some checkouts may never	be serviced if the system can't	handle the
       load (the timeout error should be handled to indicate the service is
       temporarily unavailable).

       The "min_workers" argument determines how many "hot-standby" workers
       should be pre-forked when creating the client. The default is 2 though
       note that this may change to 0 in the future.
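
       As a rough illustration of the pool options described above, the
       sketch below builds a client that requests no hot-standby workers and
       caps the pool at four. The option names come from the text; the values
       and the variable name %pool_options are illustrative assumptions, and
       the code assumes AnyEvent::Task is installed.

```perl
use strict;
use warnings;
use AnyEvent::Task::Client;

# Illustrative values only; tune them for your own workload.
my %pool_options = (
  connect     => ['unix/', '/tmp/anyevent-task.socket'],
  min_workers => 0,   # do not request hot-standby workers at start-up
  max_workers => 4,   # further checkouts wait until a worker frees up
);

my $client = AnyEvent::Task::Client->new(%pool_options);

# The timeout belongs to the checkout, not to the pool.
# Passing undef instead of 10 would mean "no time limit".
my $checkout = $client->checkout(timeout => 10);
```

       Nothing is sent to a worker until a method is invoked on $checkout and
       the process enters the AnyEvent main loop.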

STARTING THE SERVER
       Typically you will want to start	the client and server as completely
       separate	processes as shown in the synopses.

       Running the server and the client in the	same process is	technically
       possible	but is highly discouraged since	the server will	"fork()" when
       the client demands a new	worker process.	In this	case, all descriptors
       in use by the client are	duped into the worker process and the worker
       ought to	close these extra descriptors. Also, forking a busy client may
       be memory-inefficient (and dangerous if it uses threads).

       Since it's more of a bother than	it's worth to run the server and the
       client in the same process, there is an alternate server	constructor,
       "AnyEvent::Task::Server::fork_task_server" for when you'd like to fork
       a dedicated server process. It can be passed the	same arguments as the
       regular "new" constructor:

           ## my ($keepalive_pipe, $server_pid) =
           AnyEvent::Task::Server::fork_task_server(
             listen => ['unix/', '/tmp/anyevent-task.socket'],
             interface => sub {
                                return "Hello from PID $$";
                              },
           );

       The only difference between this and the regular constructor is that
       "fork_task_server" will fork a process which becomes the	server and
       will also install a "keep-alive"	pipe between the server	and the
       client. This keep-alive pipe will be used by the	server to detect when
       its parent (the client process) exits.

       If "AnyEvent::Task::Server::fork_task_server" is	called in a void
       context then the	reference to this keep-alive pipe is pushed onto
       @AnyEvent::Task::Server::children_sockets. Otherwise, the keep-alive
       pipe and the server's PID are returned. Closing the pipe will terminate
       the server gracefully. "kill" the PID to terminate it immediately. Note
       that even when the server is shut down, existing worker processes and
       checkouts may still be active in the client. The client object and all
       checkout objects should be destroyed if you wish to ensure all workers
       are shut down.

       Since the "fork_task_server" constructor	calls fork and requires	using
       AnyEvent	in both	the parent and child processes,	it is important	that
       you not install any AnyEvent watchers before calling it.	The usual
       caveats about forking AnyEvent processes	apply (see the AnyEvent	docs).

       You should also not call	"fork_task_server" after having	started
       threads since, again, this function calls fork. Forking a threaded
       process is dangerous because the	threads	might have userspace data-
       structures in inconsistent states at the	time of	the fork.

INTERFACE
       When creating a server, there are two possible formats for the
       "interface" option. The first and most general is a coderef. This
       coderef will be passed the list of arguments that were sent when	the
       checkout	was called in the client process (without the trailing
       callback	of course).

       As described above, you can use a checkout object as a coderef or as an
       object with methods. If the checkout is invoked as an object, the
       method name is prepended	to the arguments passed	to "interface":

           interface => sub {
             my ($method, @args) = @_;
           },

       If the checkout is invoked as a coderef,	method is omitted:

           interface => sub {
             my (@args) = @_;
           },

       The second format possible for "interface" is a hash ref. This is a
       simple method dispatch feature where the	method invoked on the checkout
       object is the key used to lookup	which coderef to run in	the worker:

           interface => {
             method1 => sub {
               my (@args) = @_;
             },
             method2 => sub {
               my (@args) = @_;
             },
           },
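
       The hash-ref form can be pictured as a plain lookup table. The sketch
       below is a conceptual stand-in, not the module's own code: the
       "dispatch" helper, the method names and the behaviour on an unknown
       method are all assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the worker-side lookup; not the module's code.
sub dispatch {
  my ($interface, $method, @args) = @_;
  my $handler = $interface->{$method}
    or die "unknown method: $method\n";
  return scalar $handler->(@args);   # handlers are run in scalar context
}

my $interface = {
  add    => sub { my ($x, $y) = @_; return $x + $y },
  concat => sub { my (@parts) = @_; return join '', @parts },
};

print dispatch($interface, 'add', 2, 3), "\n";          # prints 5
print dispatch($interface, 'concat', 'a', 'b'), "\n";   # prints ab
```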

       Note that since the protocol between the	client and the worker process
       is currently JSON-based,	all arguments and return values	must be
       serializable to JSON. This includes most	perl scalars like strings, a
       limited range of	numerical types, and hash/list constructs with no
       cyclical	references.
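
       One way to check in advance whether an argument list can cross the
       wire is to try encoding it yourself. This is a minimal sketch that
       assumes the core JSON::PP module as a stand-in encoder (the module may
       use a different JSON implementation); "json_safe" is a hypothetical
       helper name.

```perl
use strict;
use warnings;
use JSON::PP ();

# Returns 1 when the argument list survives JSON encoding, 0 otherwise.
# The arguments are wrapped in an array ref, much like a request would be.
sub json_safe {
  my (@args) = @_;
  return eval { JSON::PP->new->encode([@args]); 1 } ? 1 : 0;
}

print json_safe('secret', { cost => 10 }, [1, 2, 3]), "\n";  # prints 1
print json_safe(sub { 'a coderef' }), "\n";                   # prints 0
```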

       Because there isn't any way for the callback to indicate	the context it
       desires,	interface subs are always called in scalar context.
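
       The practical consequence is that an interface sub which returns a
       list will not deliver that list to the client. The sketch below uses
       plain perl (no worker involved) to show what scalar context does to
       three return styles; returning a reference is the form that arrives
       intact. The variable names are illustrative.

```perl
use strict;
use warnings;

# Three ways an interface sub might try to return several values.
my $returns_list  = sub { return ('alice', 'bob', 'carol') };
my $returns_array = sub { my @names = ('alice', 'bob', 'carol'); return @names };
my $returns_ref   = sub { return ['alice', 'bob', 'carol'] };

# scalar() mimics the context in which interface subs are called.
my $last  = scalar $returns_list->();    # comma operator: the last element
my $count = scalar $returns_array->();   # array in scalar context: its length
my $names = scalar $returns_ref->();     # array ref: all three names survive

print "$last $count @$names\n";          # prints "carol 3 alice bob carol"
```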

       A future	backwards compatible RPC protocol may use Sereal. Although
       it's inefficient	you can	already	serialise an object with Sereal
       manually, send the resulting string over	the existing protocol, and
       then deserialise	it in the worker.

LOGGING
       Because workers run in a	separate process, they can't directly use
       logging contexts	in the client process. That is why this	module is
       integrated with Log::Defer.

       A Log::Defer object is created on demand in the worker process. Once
       the worker has finished an operation, any messages in the object will
       be extracted and sent back to the client. The client then merges this
       into its main Log::Defer object that was passed in when creating the
       checkout.

       In your server code, use	AnyEvent::Task::Logger.	It exports the
       function	"logger" which returns a Log::Defer object:

           use AnyEvent::Task::Server;
           use AnyEvent::Task::Logger;

           AnyEvent::Task::Server->new(
             listen => ['unix/', '/tmp/anyevent-task.socket'],
             interface => sub {
               logger->info('about to compute some operation');
               {
                 my $timer = logger->timer('computing some operation');
                 select undef,undef,undef, 1; ## sleep for 1 second
               }
             },
           )->run;

       Note: Portable server code should never call "sleep" because on some
       systems it will interfere with the recoverable worker timeout feature
       implemented with	"SIGALRM".
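
       A minimal sketch of a "sleep" replacement for server code, built on
       the same four-argument "select" idiom used in the example above. The
       helper name "nap" is hypothetical, and the call may return early if a
       signal arrives.

```perl
use strict;
use warnings;
use Time::HiRes qw(time);

# Pause for a (possibly fractional) number of seconds without calling sleep().
sub nap {
  my ($seconds) = @_;
  select undef, undef, undef, $seconds;
  return;
}

my $started = time;
nap(0.25);
printf "napped for about %.2f seconds\n", time - $started;
```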

       In your client code, pass a Log::Defer object in when you create a
       checkout:

           use AnyEvent::Task::Client;
           use Log::Defer;

           my $client = AnyEvent::Task::Client->new(
                          connect => ['unix/', '/tmp/anyevent-task.socket'],
                        );

           my $log_defer_object = Log::Defer->new(sub {
                                                    my $msg = shift;
                                                    use Data::Dumper; ## or whatever
                                                    print Dumper($msg);
                                                  });

           $log_defer_object->info('going to compute some operation in a worker');

           my $checkout = $client->checkout(log_defer_object => $log_defer_object);

           my $cv = AE::cv;

           $checkout->(sub {
             $log_defer_object->info('finished some operation');
             $cv->send;
           });

           $cv->recv;


       When run, the above client will print something like this:

           $VAR1 = {
                 'start' => '1363232705.96839',
                 'end' => '1.027309',
                 'logs' => [
                             [ ..., 'going to compute some operation in a worker' ],
                             [ ..., 'about to compute some operation' ],
                             [ ..., 'finished some operation' ]
                           ],
                 'timers' => {
                               'computing some operation' => [ ... ]
                             }
               };

ERROR HANDLING
       In a synchronous	program, if you	expected some operation	to throw an
       exception you might wrap	it in "eval" like this:

           my $crypted;

           eval {
             $crypted = hash('secret');
           };

           if ($@) {
             say "hash failed: $@";
           } else {
             say "hashed password is $crypted";
           }

       But in an asynchronous program, typically "hash"	would initiate some
       kind of asynchronous operation and then return immediately, allowing
       the program to go about other tasks while waiting for the result. Since
       the error might come back at any	time in	the future, the	program	needs
       a way to	map the	exception that is thrown back to the original context.

       AnyEvent::Task accomplishes this	mapping	with Callback::Frame.

       Callback::Frame lets you	preserve error handlers	(and "local"
       variables) across asynchronous callbacks. Callback::Frame is not	tied
       to AnyEvent::Task, AnyEvent or any other	async framework	and can	be
       used with almost	all callback-based libraries.

       However,	when using AnyEvent::Task, libraries that you use in the
       client must be AnyEvent compatible. This	restriction obviously does not
       apply to	your server code, that being the main purpose of this module:
       accessing blocking resources from an asynchronous program. In your
       server code, when there is an error condition you should	simply "die"
       or "croak" as in	a synchronous program.

       As an example usage of Callback::Frame, here is how we would handle
       errors thrown from a worker process running the "hash" method in	an
       asynchronous client program:

	   use Callback::Frame;

	   frame(code => sub {

             $client->checkout->hash('secret', sub {
               my ($checkout, $crypted) = @_;
               say "Hashed password is $crypted";
             });

	   }, catch => sub {

	     my	$back_trace = shift;
	     say "Error	is: $@";
	     say "Full back-trace: $back_trace";

	   })->(); ## <-- frame	is created and then immediately	executed

       Of course if "hash" is something	like a bcrypt hash function it is
       unlikely	to raise an exception so maybe that's a	bad example. On	the
       other hand, maybe it's a	really good example: In	addition to errors
       that occur while	running	your callbacks,	AnyEvent::Task uses
       Callback::Frame to throw	errors if the worker process times out,	so if
       the bcrypt "cost" is really cranked up it might hit the default 30
       second time limit.

   Rationale for Callback::Frame
       Why not just call the callback but set $@ and indicate an error has
       occurred? This is the approach taken with AnyEvent::DBI for example. I
       believe the Callback::Frame interface is	superior to this method. In a
       synchronous program, exceptions are out-of-band messages	and code
       doesn't need to locally handle them. It can let them "bubble up"	the
       stack, perhaps to a top-level error handler. Invoking the callback when
       an error	occurs forces exceptions to be handled in-band.

       How about having	AnyEvent::Task expose an error callback? This is the
       approach	taken by AnyEvent::Handle for example. I believe
       Callback::Frame is superior to this method also.	Although separate
       callbacks are (sort of) out-of-band, you	still have to write error
       handler callbacks and do	something relevant locally instead of allowing
       the exception to	bubble up to an	error handler.

       In servers, Callback::Frame helps you maintain the "dynamic state"
       (error handlers and dynamic variables) installed	for a single
       connection. In other words, any errors that occur while servicing that
       connection will be able to be caught by an error	handler	specific to
       that connection.	This lets you send an error response to	the client and
       collect associated log messages in a Log::Defer object specific to that
       connection.

       Callback::Frame provides	an error handler stack so you can have a top-
       level handler as	well as	nested handlers	(similar to nested "eval"s).
       This is useful when you wish to have a top-level	"bail-out" error
       handler and also	nested error handlers that know	how to retry or
       recover from an error in	an async sub-operation.

       Callback::Frame is designed to be easily	used with callback-based
       libraries that don't know about Callback::Frame.	"fub" is a shortcut
       for "frame" with	just the "code"	argument. Instead of passing "sub {
       ... }" into libraries you can pass in "fub { ...	}". When invoked, this
       wrapped callback	will first re-establish	any error handlers that	you
       installed with "frame" and then run your	provided code. Libraries that
       force in-band error signalling can be handled with callbacks such as
       "fub { die $@ if $@; ... }". Separate error callbacks should simply be
       "fub { die "failed because ..." }".

       It's important that all callbacks be created with "fub" (or "frame")
       even if you don't expect	them to	fail so	that the dynamic context is
       preserved for nested callbacks that may.	An exception is	the callbacks
       provided	to AnyEvent::Task checkouts: These are automatically wrapped
       in frames for you (although explicitly passing in fubs is fine too).

       The Callback::Frame documentation explains how this works in much more
       detail.

   Reforking of	workers	after errors
       If a worker throws an error, the	client receives	the error but the
       worker process stays running. As	long as	the client has a reference to
       the checkout (and as long as the	exception wasn't "fatal" -- see
       below), it can still be used to communicate with	that worker so you can
       access error states, roll back transactions, or do any sort of required
       clean-up.

       However,	once the checkout object is destroyed, by default the worker
       will be shut down instead of returning to the client's worker pool as in
       the normal case where no	errors were thrown. This is a "safe-by-
       default"	behaviour that may help	in the event that an exception thrown
       by a worker leaves the worker process in	a broken/inconsistent state
       for some	reason (for example a DBI connection died). This can be
       overridden by setting the "dont_refork_after_error" option to 1 in the
       client constructor. This	will only matter if errors are being thrown
       frequently and your "setup" routines take a long	time (aside from the
       setup routine, creating new workers is quite fast since the server has
       already compiled	all the	application code and just has to fork).

       There are cases where workers will never	be returned to the worker
       pool: workers that have thrown fatal errors such	as loss	of worker
       connection or hung worker timeout errors. These errors are stored in
       the checkout and	for as long as the checkout exists any methods on the
       checkout	will immediately return	the stored fatal error.	Your client
       process can invoke this behaviour manually by calling the
       "throw_fatal_error" method on a checkout	object to cancel an operation
       and force-terminate a worker.

       Another reason that a worker might not be returned to the worker	pool
       is if it	has been checked out "max_checkouts" times. If "max_checkouts"
       is specified as an argument to the Client constructor, then workers
       will be destroyed and reforked after being checked out this number of
       times. When not specified, workers are never re-forked for this reason.
       This parameter is useful	for coping with	libraries that leak memory or
       otherwise become	slower/more resource-hungry over time.
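
       The recycling options from this section could be combined roughly as
       follows. The option names are taken from the text above; the values
       and the variable name %recycling_options are illustrative assumptions,
       and the code assumes AnyEvent::Task is installed.

```perl
use strict;
use warnings;
use AnyEvent::Task::Client;

# Illustrative values only.
my %recycling_options = (
  connect                 => ['unix/', '/tmp/anyevent-task.socket'],
  min_workers             => 0,    # do not request workers at start-up
  max_checkouts           => 100,  # retire each worker after 100 checkouts
  dont_refork_after_error => 1,    # keep workers after non-fatal errors
);

my $client = AnyEvent::Task::Client->new(%recycling_options);
```

       Setting "dont_refork_after_error" trades the safe-by-default behaviour
       for fewer "setup" runs, so it is probably only worth enabling when
       "setup" is slow and worker errors are known to leave no broken state.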

COMPARISON WITH HTTP
       Why a custom protocol, client, and server? Can't	we just	use something
       like HTTP?

       It depends.

       AnyEvent::Task clients send discrete messages and receive ordered
       replies from workers, much like HTTP. The AnyEvent::Task	protocol can
       be extended in a	backwards-compatible manner like HTTP. AnyEvent::Task
       communication can be pipelined and possibly in the future even
       compressed like HTTP.

       The current AnyEvent::Task server obeys a very specific implementation
       policy: It is like a CGI	server in that each process it forks is
       guaranteed to be	handling only one connection at	once so	it can perform
       blocking operations without worrying about holding up other
       connections.

       But since a single process can handle many requests in a	row without
       exiting,	they are more like persistent FastCGI processes. The
       difference however is that while	a client holds a checkout it is
       guaranteed an exclusive lock on that process (useful for	supporting DB
       transactions for	example). With a FastCGI server	it is assumed that
       requests	are stateless so you can't necessarily be sure you'll get the
       same process for	two consecutive	requests. In fact, if an error is
       thrown in the FastCGI handler you may never get the same	process	back
       again, preventing you from being	able to	recover	from the error,	retry,
       or at least collect process state for logging reasons.

       The fundamental difference between the AnyEvent::Task protocol and HTTP
       is that in AnyEvent::Task the client is the dominant protocol
       orchestrator whereas in HTTP it is the server.

       In AnyEvent::Task, the client manages the worker	pool and the client
       decides if/when worker processes	should terminate. In the normal	case,
       a client	will just return the worker to its worker pool.	A worker is
       supposed	to accept commands for as long as possible until the client
       dismisses it.

       The client decides the timeout for each checkout	and different clients
       can have	different timeouts while connecting to the same	server.

       Client processes	can be started and checkouts can be obtained before
       the server is even started. The client will continue trying to connect
       to the server to	obtain worker processes	until either the server	starts
       or the checkout's timeout period	lapses.	As well	as freeing you from
       having to start your services in	the "right" order, this	also means
       servers can be restarted	without	throwing any errors (aka "zero-
       downtime	restarts").

       The client even decides how many	minimum	workers	should be in the pool
       upon start-up and how many maximum workers to acquire before checkout
       creation	requests are queued. The server	is really just a dumb fork-on-
       demand server and most of the sophistication is in the asynchronous
       client.

SEE ALSO
       The AnyEvent::Task github repo

       In order	to handle exceptions in	a meaningful way with this module, you
       must use	Callback::Frame. In order to maintain seamless request logging
       across clients and workers, you should use Log::Defer.

       There are many modules on CPAN similar to AnyEvent::Task.

       This module is designed to be used in a non-blocking, process-based
       unix program. Depending on your exact requirements you might find
       something else useful: Parallel::ForkManager, Thread::Pool, or an HTTP
       server of some kind.

       If you're into AnyEvent,	AnyEvent::DBI and AnyEvent::Worker (based on
       AnyEvent::DBI), AnyEvent::ForkObject, and AnyEvent::Fork::RPC send and
       receive commands	from worker processes similar to this module.
       AnyEvent::Worker::Pool also has an implementation of a worker pool.
       AnyEvent::Gearman can interface with Gearman services.

       If you're into POE there	is POE::Component::Pool::DBI,
       POEx::WorkerPool, POE::Component::ResourcePool,
       POE::Component::PreforkDispatch,	Cantella::Worker.

BUGS
       Although this module's interface is now stable and has been in
       production use for some time, there are a few remaining TODO items (see
       the bottom of

AUTHOR
       Doug Hoyte

COPYRIGHT & LICENSE
       Copyright 2012-2014 Doug	Hoyte.

       This module is licensed under the same terms as perl itself.

perl v5.32.0			  2014-04-15		     AnyEvent::Task(3)

