MCE::Hobo(3)	      User Contributed Perl Documentation	  MCE::Hobo(3)

NAME
       MCE::Hobo - A threads-like parallelization module

VERSION
       This document describes MCE::Hobo version 1.873

SYNOPSIS
	use MCE::Hobo;

	MCE::Hobo->init(
	    max_workers	=> 'auto',   # default undef, unlimited
	    hobo_timeout => 20,	     # default undef, no timeout
	    posix_exit => 1,	     # default undef, CORE::exit
	    void_context => 1,	     # default undef
	    on_start =>	sub {
		my ( $pid, $ident ) = @_;
		...
	    },
	    on_finish => sub {
		my ( $pid, $exit, $ident, $signal, $error, @ret	) = @_;
		...
	    }
	);

	MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();

	sub parallel {
	    my ($arg1) = @_;
	    print "Hello again,	$arg1\n" if defined($arg1);
	    print "Hello again,	$_\n"; # same thing
	}

	MCE::Hobo->create( \&parallel, $_ ) for	1 .. 3;

	my @hobos    = MCE::Hobo->list();
	my @pids     = MCE::Hobo->list_pids();
	my @running  = MCE::Hobo->list_running();
	my @joinable = MCE::Hobo->list_joinable();
	my $count    = MCE::Hobo->pending();

	# Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
	$_->join() for @hobos;	 # (or)
	$_->join() for @joinable;

	# Joining occurs immediately as	hobo processes complete	execution.
	1 while	MCE::Hobo->wait_one();

	my $hobo = mce_async { foreach (@files)	{ ... }	};

	$hobo->join();

	if ( my	$err = $hobo->error() )	{
	    warn "Hobo error: $err\n";
	}

	# Get a	hobo's object
	$hobo =	MCE::Hobo->self();

	# Get a	hobo's ID
	$pid = MCE::Hobo->pid();  # $$
	$pid = $hobo->pid();
	$pid = MCE::Hobo->tid();  # tid	is an alias for	pid
	$pid = $hobo->tid();

	# Test hobo objects
	if ( $hobo1 == $hobo2 )	{
	    ...
	}

	# Give other workers a chance to run
	MCE::Hobo->yield();
	MCE::Hobo->yield(0.05);

	# Return context, wantarray aware
	my ($value1, $value2) =	$hobo->join();
	my $value = $hobo->join();

	# Check	hobo's state
	if ( $hobo->is_running() ) {
	    sleep 1;
	}
	if ( $hobo->is_joinable() ) {
	    $hobo->join();
	}

	# Send a signal	to a hobo
	$hobo->kill('SIGUSR1');

	# Exit a hobo
	MCE::Hobo->exit(0);
	MCE::Hobo->exit(0, @ret);  # MCE::Hobo 1.827+

DESCRIPTION
       A hobo is a migratory worker inside the machine that carries the
       asynchronous gene. Hobo processes are equipped with "threads"-like
       capability for running code asynchronously. Unlike threads, each	hobo
       is a unique process to the underlying OS. The IPC is managed by
       "MCE::Shared", which runs on all	the major platforms including Cygwin
       and Strawberry Perl.

       An exception was	made on	the Windows platform to	spawn threads versus
       children	in "MCE::Hobo" 1.807 through 1.816. For	consistency, the 1.817
       release reverts back to spawning	children on all	supported platforms.

       "MCE::Hobo" may be used as a standalone or together with	"MCE"
       including running alongside "threads".

	use MCE::Hobo;
	use MCE::Shared;

	# synopsis: head -20 file.txt |	perl script.pl

	my $ifh	= MCE::Shared->handle( "<", \*STDIN  );	 # shared
	my $ofh	= MCE::Shared->handle( ">", \*STDOUT );
	my $ary	= MCE::Shared->array();

	sub parallel_task {
	    my ( $id ) = @_;
	    while ( <$ifh> ) {
		printf {$ofh} "[ %4d ] %s", $.,	$_;
	      # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";  # dereferencing
		$ary->set( $. -	1, "[ ID $id ] read line $.\n" );  # faster via	OO
	    }
	}

	my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
	my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
	my $hobo3 = MCE::Hobo->new( sub	{ parallel_task(3) } );

	$_->join for MCE::Hobo->list();	 # ditto: MCE::Hobo->wait_all();

	# search array (total one round-trip via IPC)
	my @vals = $ary->vals( "val =~ / ID 2 /" );

	print {*STDERR}	join("", @vals);

API DOCUMENTATION
       $hobo = MCE::Hobo->create( FUNCTION, ARGS )
       $hobo = MCE::Hobo->new( FUNCTION, ARGS )
	  This will create a new hobo process that will	begin execution	with
	  function as the entry	point, and optionally ARGS for list of
	  parameters. It will return the corresponding MCE::Hobo object, or
	  undef	if hobo	creation failed.

	  FUNCTION may either be the name of a function, an anonymous
	  subroutine, or a code	ref.

	   my $hobo = MCE::Hobo->create( "func_name", ... );
	       # or
	   my $hobo = MCE::Hobo->create( sub { ... }, ... );
	       # or
	   my $hobo = MCE::Hobo->create( \&func, ... );

       $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
       $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS	)
	  Options, excluding "ident", may be specified globally	via the	"init"
	  function.  Otherwise,	"ident", "hobo_timeout", "posix_exit", and
	  "void_context" may be	set uniquely.

	  The "ident" option, available	since 1.827, is	used by	callback
	  functions "on_start" and "on_finish" for identifying the started and
	  finished hobo	process	respectively.

	   my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
	       ...
	   } );

	   $hobo1->join;

	   my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
	       sleep 1 for ( 1 .. 9 );
	   } );

	   $hobo2->join;

	   if (	$hobo2->error()	eq "Hobo timed out\n" )	{
	       ...
	   }

	  The "new()" method is	an alias for "create()".

       mce_async { BLOCK } ARGS;
       mce_async { BLOCK };
	  "mce_async" runs the block asynchronously similarly to
	  "MCE::Hobo->create()".  It returns the hobo object, or undef if hobo
	  creation failed.

	   my $hobo = mce_async	{ foreach (@files) { ... } };

	   $hobo->join();

	   if (	my $err	= $hobo->error() ) {
	       warn("Hobo error: $err\n");
	   }

       $hobo->join()
	  This will wait for the corresponding hobo process to complete	its
	  execution.  In non-voided context, "join()" will return the value(s)
	  of the entry point function.

	  The context (void, scalar or list) for the return value(s) for
	  "join" is determined at the time of joining and mostly "wantarray"
	  aware.

	   my $hobo1 = MCE::Hobo->create( sub {
	       my @res = qw(foo	bar baz);
	       return (@res);
	   });

	   my @res1 = $hobo1->join();  # ( foo,	bar, baz )
	   my $res1 = $hobo1->join();  #   baz

	   my $hobo2 = MCE::Hobo->create( sub {
	       return 'foo';
	   });

	   my @res2 = $hobo2->join();  # ( foo )
	   my $res2 = $hobo2->join();  #   foo

       $hobo1->equal( $hobo2 )
	  Tests	if two hobo objects are	the same hobo or not. Hobo comparison
	  is based on process IDs. This	is overloaded to the more natural
	  forms.

	   if (	$hobo1 == $hobo2 ) {
	       print("Hobo objects are the same\n");
	   }
	   # or
	   if (	$hobo1 != $hobo2 ) {
	       print("Hobo objects differ\n");
	   }

       $hobo->error()
	  Hobo processes are executed in an "eval" context. This method	will
	  return "undef" if the	hobo terminates	normally. Otherwise, it
	  returns the value of $@ associated with the hobo's execution status
	  in its "eval"	context.
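
	  As a minimal sketch of the behavior described above, the following
	  runs one hobo that returns normally and one that dies. The die
	  message and variable names are illustrative only.

```perl
use strict;
use warnings;
use MCE::Hobo;

# One hobo returns normally, the other dies inside its entry function.
my $ok  = MCE::Hobo->create( sub { return 42 } );
my $bad = MCE::Hobo->create( sub { die "something failed\n" } );

$_->join() for ( $ok, $bad );

for my $hobo ( $ok, $bad ) {
    my $err = $hobo->error();
    print defined $err ? "error: $err" : "no error\n";
}
```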

       $hobo->exit()
	  This sends 'SIGQUIT' to the hobo process, notifying the hobo to
	  exit.	 It returns the	hobo object to allow for method	chaining. It
	  is important to join later if	not immediately	to not leave a zombie
	  or defunct process.

	   $hobo->exit()->join();
	   ...

	   $hobo->join();  # later

       MCE::Hobo->exit(	0 )
       MCE::Hobo->exit(	0, @ret	)
	  A hobo can exit at any time by calling "MCE::Hobo->exit()".
	  Otherwise, the behavior is the same as "exit(status)"	when called
	  from the main	process. Current since 1.827, the hobo process may
	  optionally return data, to be	sent via IPC.
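
	  A short sketch of exiting early while passing data back, assuming
	  the values given to "exit" are what "join" hands to the caller.
	  The values 'early' and 'exit' are placeholders.

```perl
use strict;
use warnings;
use MCE::Hobo 1.827;

my $hobo = MCE::Hobo->create( sub {
    # Leave early, passing two values back to the joiner.
    MCE::Hobo->exit( 0, 'early', 'exit' );
    return 'not reached';
});

my @ret = $hobo->join();
print "returned: @ret\n";
```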

       MCE::Hobo->finish()
	  This class method is called automatically by "END", but may be
	  called explicitly. An	error is emitted via croak if there are	active
	  hobo processes not yet joined.

	   MCE::Hobo->create( 'task1', $_ ) for	1 .. 4;
	   $_->join for	MCE::Hobo->list();

	   MCE::Hobo->create( 'task2', $_ ) for	1 .. 4;
	   $_->join for	MCE::Hobo->list();

	   MCE::Hobo->create( 'task3', $_ ) for	1 .. 4;
	   $_->join for	MCE::Hobo->list();

	   MCE::Hobo->finish();

       MCE::Hobo->init(	options	)
	  The init function accepts a list of MCE::Hobo	options.

	   MCE::Hobo->init(
	       max_workers => 'auto',	# default undef, unlimited
	       hobo_timeout => 20,	# default undef, no timeout
	       posix_exit => 1,		# default undef, CORE::exit
	       void_context => 1,	# default undef
	       on_start	=> sub {
		   my (	$pid, $ident ) = @_;
		   ...
	       },
	       on_finish => sub	{
		   my (	$pid, $exit, $ident, $signal, $error, @ret ) = @_;
		   ...
	       }
	   );

	   # Identification given as an	option or the 1st argument.
	   # Current API available since 1.827.

	   for my $key ( 'aa' .. 'zz' )	{
	       MCE::Hobo->create( { ident => $key }, sub { ... } );
	       MCE::Hobo->create( $key,	sub { ... } );
	   }

	   MCE::Hobo->wait_all;

	  Set "max_workers" if you want	to limit the number of workers by
	  waiting automatically	for an available slot. Specify "auto" to
	  obtain the number of logical cores via "MCE::Util::get_ncpu()".

	  Set "hobo_timeout", in number	of seconds, if you want	the hobo
	  process to terminate after some time.	The default is 0 for no
	  timeout.

	  Set "posix_exit" to avoid all END and destructor processing. The
	  option is implied (set to 1) when constructing MCE::Hobo inside a
	  thread or when any of the following modules is present: CGI, FCGI,
	  Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
	  Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.

	  Set "void_context" to	create the hobo	process	in void	context	for
	  the return value. Otherwise, the return context is wantarray-aware
	  for "join()" and "result()" and determined when retrieving the data.
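
	  For illustration, a sketch that sets "void_context" for a single
	  hobo through the options hash. It assumes that nothing is handed
	  back to "join" in this mode; the returned string is a placeholder.

```perl
use strict;
use warnings;
use MCE::Hobo;

my $hobo = MCE::Hobo->create( { void_context => 1 }, sub {
    # Work done purely for its side effects; the return value is unused.
    return 'ignored';
});

my @ret = $hobo->join();
print "items returned: ", scalar(@ret), "\n";
```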

	  The callback options "on_start" and "on_finish" are called in	the
	  parent process after starting	the worker and later when terminated.
	  The arguments	for the	subroutines were inspired by
	  Parallel::ForkManager.

	  The parameters for "on_start"	are the	following:

	   - pid of the	hobo process
	   - identification (ident option or 1st arg to	create)

	  The parameters for "on_finish" are the following:

	   - pid of the	hobo process
	   - program exit code
	   - identification (ident option or 1st arg to	create)
	   - exit signal id
	   - error message from	eval inside MCE::Hobo
	   - returned data

       $hobo->is_running()
	  Returns true if a hobo is still running.

       $hobo->is_joinable()
	  Returns true if the hobo has finished	running	and not	yet joined.
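
	  The two state checks can be combined into a non-blocking poll. This
	  is only a sketch; the sleep intervals are arbitrary.

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw(sleep);

my $hobo = MCE::Hobo->create( sub { sleep 0.5; return 'done' } );

# Poll without blocking; the manager could do other work here.
sleep 0.1 while $hobo->is_running();

my $res;
$res = $hobo->join() if $hobo->is_joinable();
print "result: $res\n" if defined $res;
```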

       $hobo->kill( 'SIG...' )
	  Sends	the specified signal to	the hobo. Returns the hobo object to
	  allow	for method chaining. As	with "exit", it	is important to	join
	  eventually if	not immediately	to not leave a zombie or defunct
	  process.

	   $hobo->kill('SIG...')->join();

	  The following	is a parallel demonstration comparing "MCE::Shared"
	  against "Redis" and "Redis::Fast" on a Fedora	23 VM. Joining begins
	  after	all workers have been notified to quit.

	   use Time::HiRes qw(time sleep);

	   use Redis;
	   use Redis::Fast;

	   use MCE::Hobo;
	   use MCE::Shared;

	   my $redis = Redis->new();
	   my $rfast = Redis::Fast->new();
	   my $array = MCE::Shared->array();

	   sub parallel_redis {
	       my ($_redis) = @_;
	       my ($count, $quit, $len)	= (0, 0);

	       # instead, use a	flag to	exit loop
	       $SIG{'QUIT'} = sub { $quit = 1 };

	       while ()	{
		   $len	= $_redis->rpush('list', $count++);
		   last	if $quit;
	       }

	       $count;
	   }

	   sub parallel_array {
	       my ($count, $quit, $len)	= (0, 0);

	       # do not	exit from inside handler
	       $SIG{'QUIT'} = sub { $quit = 1 };

	       while ()	{
		   $len	= $array->push($count++);
		   last	if $quit;
	       }

	       $count;
	   }

	   sub benchmark_this {
	       my ($desc, $num_procs, $timeout,	$code, @args) =	@_;
	       my ($start, $total) = (time(), 0);

	       MCE::Hobo->new($code, @args) for	1..$num_procs;
	       sleep $timeout;

	       # joining is not	immediate; ok
	       $_->kill('QUIT')	for MCE::Hobo->list();

	       # joining later;	ok
	       $total += $_->join() for	MCE::Hobo->list();

	       printf "$desc <>	duration: %0.03f secs, count: $total\n",
		   time() - $start;

	       sleep 0.2;
	   }

	   benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
	   benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
	   benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);

       MCE::Hobo->list()
	  Returns a list of all	hobo objects not yet joined.

	   @hobos = MCE::Hobo->list();

       MCE::Hobo->list_pids()
	  Returns a list of all	hobo pids not yet joined (available since
	  1.849).

	   @pids = MCE::Hobo->list_pids();

	   $SIG{INT} = $SIG{HUP} = $SIG{TERM} =	sub {
	       # Signal	workers	and the	shared manager all at once
	       CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
	       exec('reset');
	   };

       MCE::Hobo->list_running()
	  Returns a list of all	hobo objects that are still running.

	   @hobos = MCE::Hobo->list_running();

       MCE::Hobo->list_joinable()
	  Returns a list of all hobo objects that have completed running and
	  are thus ready to be joined without blocking.

	   @hobos = MCE::Hobo->list_joinable();

       MCE::Hobo->max_workers([	N ])
	  Getter and setter for	max_workers. Specify a number or 'auto'	to
	  acquire the total number of cores via	MCE::Util::get_ncpu. Specify a
	  false	value to set back to no	limit.

	  API available	since 1.835.
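
	  A brief sketch of the getter and setter, assuming the getter
	  returns the value just set. The limit of 2 and the task body are
	  illustrative.

```perl
use strict;
use warnings;
use MCE::Hobo 1.835;
use Time::HiRes qw(sleep);

MCE::Hobo->max_workers(2);              # at most two hobos at a time
my $limit = MCE::Hobo->max_workers();   # read the limit back

# With the limit set, create() waits for an available slot.
MCE::Hobo->create( sub { sleep 0.2; return $_[0] }, $_ ) for 1 .. 4;
MCE::Hobo->wait_all();

MCE::Hobo->max_workers(0);              # false value: back to no limit
```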

       MCE::Hobo->pending()
	  Returns a count of all hobo objects not yet joined.

	   $count = MCE::Hobo->pending();

       $hobo->result()
	  Returns the result obtained by "join", "wait_one", or	"wait_all". If
	  the process has not yet exited, waits	for the	corresponding hobo to
	  complete its execution.

	   use MCE::Hobo;
	   use Time::HiRes qw(sleep);

	   sub task {
	       my ($id)	= @_;
	       sleep $id * 0.333;
	       return $id;
	   }

	   MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );

	   # 1 while MCE::Hobo->wait_one();

	   while ( my $hobo = MCE::Hobo->wait_one() ) {
	       my $err = $hobo->error()	|| 'no error';
	       my $res = $hobo->result();
	       my $pid = $hobo->pid();

	       print "[$pid] $err : $res\n";
	   }

	  Like "join" described	above, the context (void, scalar or list) for
	  the return value(s) is determined at the time	"result" is called and
	  mostly "wantarray" aware.

	   my $hobo1 = MCE::Hobo->create( sub {
	       my @res = qw(foo	bar baz);
	       return (@res);
	   });

	   my @res1 = $hobo1->result();	 # ( foo, bar, baz )
	   my $res1 = $hobo1->result();	 #   baz

	   my $hobo2 = MCE::Hobo->create( sub {
	       return 'foo';
	   });

	   my @res2 = $hobo2->result();	 # ( foo )
	   my $res2 = $hobo2->result();	 #   foo

       MCE::Hobo->self()
	  Class method that allows a hobo to obtain its own MCE::Hobo object.

       $hobo->pid()
       $hobo->tid()
	  Returns the ID of the	hobo.

	   pid:	$$  process id
	   tid:	$$  alias for pid

       MCE::Hobo->pid()
       MCE::Hobo->tid()
	  Class methods that allow a hobo to obtain its own ID.

	   pid:	$$  process id
	   tid:	$$  alias for pid

       MCE::Hobo->wait_one()
       MCE::Hobo->waitone()
       MCE::Hobo->wait_all()
       MCE::Hobo->waitall()
	  Meaningful for the manager process only, waits for one or all	hobo
	  processes to complete	execution. Afterwards, returns the
	  corresponding	hobo objects.  If a hobo doesn't exist,	returns	the
	  "undef" value	or an empty list for "wait_one"	and "wait_all"
	  respectively.

	  The "waitone"	and "waitall" methods are aliases since	1.827 for
	  backwards compatibility.

	   use MCE::Hobo;
	   use Time::HiRes qw(sleep);

	   sub task {
	       my $id =	shift;
	       sleep $id * 0.333;
	       return $id;
	   }

	   MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );

	   # join, traditional use case
	   $_->join() for MCE::Hobo->list();

	   # wait_one, simplistic use case
	   1 while MCE::Hobo->wait_one();

	   # wait_one
	   while ( my $hobo = MCE::Hobo->wait_one() ) {
	       my $err = $hobo->error()	|| 'no error';
	       my $res = $hobo->result();
	       my $pid = $hobo->pid();

	       print "[$pid] $err : $res\n";
	   }

	   # wait_all
	   my @hobos = MCE::Hobo->wait_all();

	   for ( @hobos	) {
	       my $err = $_->error() ||	'no error';
	       my $res = $_->result();
	       my $pid = $_->pid();

	       print "[$pid] $err : $res\n";
	   }

       MCE::Hobo->yield( [ floating_seconds ] )
	  Prior	API till 1.826.

	  Let this hobo	yield CPU time to other	workers. By default, the class
	  method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
	  including Cygwin.

	   MCE::Hobo->yield();
	   MCE::Hobo->yield(0.05);

	   # total run time: 0.25 seconds, sleep occurring in parallel

	   MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 ..	4;
	   MCE::Hobo->wait_all();

	  Current API available	since 1.827.

	  Give other workers a chance to run, optionally for a given time.
	  Yield behaves similarly to MCE's interval option. It throttles
	  workers from running too fast. A demonstration is provided in the
	  PARALLEL HTTP GET DEMONSTRATION section for fetching URLs in
	  parallel.

	  The default "floating_seconds" is 0.008 and 0.015 on UNIX and
	  Windows, respectively. Pass 0	if simply wanting to give other
	  workers a chance to run.

	   # total run time: 1.00 second

	   MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 ..	4;
	   MCE::Hobo->wait_all();

THREADS-like DETACH CAPABILITY
       Threads-like detach capability was added	starting with the 1.867
       release.

       A threads example is shown first	followed by the	MCE::Hobo example. All
       one needs to do is set the CHLD signal handler to IGNORE.
       Unfortunately, this works on UNIX platforms only. The hobo process
       restores	the CHLD handler to default, so	is able	to deeply spin workers
       and reap	if desired.

	use threads;

	for ( 1	.. 8 ) {
	    async {
		# do something
	    }->detach;
	}

	use MCE::Hobo;

	# Have the OS reap workers automatically when exiting.
	# The on_finish	option is ignored if specified (no-op).
	# Ensure not inside a thread on	UNIX platforms.

	$SIG{CHLD} = 'IGNORE';

	for ( 1	.. 8 ) {
	    mce_async {
		# do something
	    };
	}

	# Optionally, wait for any remaining workers before leaving.
	# This is necessary if workers are consuming shared objects,
	# constructed via MCE::Shared.

	MCE::Hobo->wait_all;

       The following is	another	way and	works on Windows.  Here, the on_finish
       handler works as	usual.

	use MCE::Hobo;

	MCE::Hobo->init(
	    on_finish => sub {
		...
	    },
	);

	for ( 1	.. 8 ) {
	    $_->join for MCE::Hobo->list_joinable;
	    mce_async {
		# do something
	    };
	}

	MCE::Hobo->wait_all;

PARALLEL::FORKMANAGER-like DEMONSTRATION
       MCE::Hobo behaves similarly to threads for the most part. It also
       provides	Parallel::ForkManager-like capabilities. The
       "Parallel::ForkManager" example is shown	first followed by a version
       using "MCE::Hobo".

       Parallel::ForkManager
	   use strict;
	   use warnings;

	   use Parallel::ForkManager;
	   use Time::HiRes 'time';

	   my $start = time;

	   my $pm = Parallel::ForkManager->new(10);
	   $pm->set_waitpid_blocking_sleep(0);

	   $pm->run_on_finish( sub {
	       my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp)	= @_;
	       print "child $pid completed: $ident => ", $resp->[0], "\n";
	   });

	   DATA_LOOP:
	   foreach my $data ( 1..2000 )	{
	       # forks and returns the pid for the child
	       my $pid = $pm->start($data) and next DATA_LOOP;
	       my $ret = [ $data * 2 ];

	       $pm->finish(0, $ret);
	   }

	   $pm->wait_all_children;

	   printf STDERR "duration: %0.03f seconds\n", time - $start;

       MCE::Hobo
	   use strict;
	   use warnings;

	   use MCE::Hobo 1.843;
	   use Time::HiRes 'time';

	   my $start = time;

	   MCE::Hobo->init(
	       max_workers => 10,
	       on_finish   => sub {
		   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) =	@_;
		   print "child	$pid completed:	$ident => ", $resp->[0], "\n";
	       }
	   );

	   foreach my $data ( 1..2000 )	{
	       MCE::Hobo->create( $data, sub {
		   [ $data * 2 ];
	       });
	   }

	   MCE::Hobo->wait_all;

	   printf STDERR "duration: %0.03f seconds\n", time - $start;

       Time to spin 2,000 workers and obtain results (in seconds).
	  Results were obtained	on a Macbook Pro (2.6 GHz ~ 3.6	GHz with Turbo
	  Boost).  Parallel::ForkManager 2.02 uses Moo.	Therefore, I ran again
	  with Moo loaded at the top of	the script.

	   MCE::Hobo uses MCE::Shared to retrieve data during reaping.
	   MCE::Child uses MCE::Channel, no shared-manager.

		    Version  Cygwin   Windows  Linux   macOS  FreeBSD

	   MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
	    MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
		P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s

	   MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
	    MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
		P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used

       Set posix_exit to avoid all END and destructor processing.
	  This is helpful for reducing overhead	when workers exit. Ditto if
	  using	a Perl module not parallel safe. The option is ignored on
	  Windows "$^O eq 'MSWin32'".

	   MCE::Child->init( posix_exit	=> 1, ... );
	    MCE::Hobo->init( posix_exit	=> 1, ... );

		    Version  Cygwin   Windows  Linux   macOS  FreeBSD

	   MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
	    MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded

PARALLEL HTTP GET DEMONSTRATION	USING ANYEVENT
       This demonstration constructs two queues, two handles, starts the
       shared-manager process if needed, and spawns four workers. The script
       chunks 64 URLs per job. In practice, one might run with 200 workers
       and chunk 300 URLs on a 24-way box.

	# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	# perl demo.pl		    -- all output
	# perl demo.pl	>/dev/null  -- mngr/hobo output
	# perl demo.pl 2>/dev/null  -- show results only
	#
	# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

	use strict;
	use warnings;

	use AnyEvent;
	use AnyEvent::HTTP;
	use Time::HiRes	qw( time );

	use MCE::Hobo;
	use MCE::Shared;

	# Construct two	queues,	input and return.

	my $que	= MCE::Shared->queue();
	my $ret	= MCE::Shared->queue();

	# Construct shared handles for serializing output from many workers
	# writing simultaneously. This prevents	garbled	output.

	mce_open my $OUT, ">>",	\*STDOUT or die	"open error: $!";
	mce_open my $ERR, ">>",	\*STDERR or die	"open error: $!";

	# Spawn	workers	early for minimum memory consumption.

	MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;

	# Obtain or generate input data	for workers to process.

	my ( $count, @urls ) = ( 0 );

	push @urls, map	{ "http://127.0.0.$_/"	 } 1..254;
	push @urls, map	{ "http://192.168.0.$_/" } 1..254; # 508 URLs total

	while (	@urls )	{
	    my @chunk =	splice(@urls, 0, 64);
	    $que->enqueue( { ID	=> ++$count, INPUT => \@chunk }	);
	}

	# So that workers leave	the loop after consuming the queue.

	$que->end();

	# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	# Loop for the manager process.	The manager may	do other work if
	# need be and periodically check $ret->pending() not shown here.
	#
	# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

	my $start = time;

	printf {$ERR} "Mngr - entering loop\n";

	while (	$count ) {
	    my ( $result, $failed ) = $ret->dequeue( 2 );

	    # Remove ID	from result, so	not treated as a URL item.

	    printf {$ERR} "Mngr	- received job %s\n", delete $result->{ID};

	    # Display the URL and the size captured.

	    foreach my $url ( keys %{ $result }	) {
		printf {$OUT} "%s: %d\n", $url,	length($result->{$url})
		    if $result->{$url};	 # url has content
	    }

	    # Display URLs that could not be reached.

	    if ( @{ $failed } )	{
		foreach	my $url	( @{ $failed } ) {
		    print {$OUT} "Failed: $url\n";
		}
	    }

	    # Decrement	the count.

	    $count--;
	}

	MCE::Hobo->wait_all();

	printf {$ERR} "Mngr - exiting loop\n\n";
	printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;

	exit;

	# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	# Hobo processes enqueue two items ( $result and $failed ) per each
	# job for the manager process. Likewise, the manager process dequeues
	# two items above. Optionally, hobo processes may include the ID in
	# the result.
	#
	# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

	sub task {
	    my ( $id ) = @_;
	    printf {$ERR} "Hobo	$id entering loop\n";

	    while ( my $job = $que->dequeue() )	{
		my ( $result, $failed )	= ( { ID => $job->{ID} }, [ ] );

		# Walk URLs, provide a hash and	array refs for data.

		printf {$ERR} "Hobo $id	running	 job $job->{ID}\n";
		walk( $job, $result, $failed );

		# Send results to the manager process.

		$ret->enqueue( $result,	$failed	);
	    }

	    printf {$ERR} "Hobo	$id exiting loop\n";
	}

	sub walk {
	    my ( $job, $result,	$failed	) = @_;

	    # Yielding is critical when	running	an event loop in parallel.
	    # Not doing	so means that the app may reach	contention points
	    # with the firewall	and likely impose unnecessary hardship at
	    # the OS level. The	idea here is not to have multiple workers
	    # initiate HTTP requests to	a batch	of URLs	at the same time.
	    # Yielding in 1.827+ behaves similarly to scatter, letting
	    # the hobo process run solo for a fraction of time.

	    MCE::Hobo->yield( 0.03 );	# MCE::Hobo 1.827+

	    my $cv = AnyEvent->condvar();

	    # Populate the hash	ref for	the URLs it could reach.
	    # Do not mix AnyEvent timeout with hobo timeout.
	    # Therefore, choose	event timeout when available.

	    foreach my $url ( @{ $job->{INPUT} } ) {
		$cv->begin();
		http_get $url, timeout => 2, sub {
		    my ( $data,	$headers ) = @_;
		    $result->{$url} = $data;
		    $cv->end();
		};
	    }

	    $cv->recv();

	    # Populate the array ref for URLs it could not reach.

	    foreach my $url ( @{ $job->{INPUT} } ) {
		push @{	$failed	}, $url	unless (exists $result->{ $url });
	    }

	    return;
	}

	__END__

	$ perl demo.pl

	Hobo 1 entering	loop
	Hobo 2 entering	loop
	Hobo 3 entering	loop
	Mngr - entering	loop
	Hobo 2 running	job 2
	Hobo 3 running	job 3
	Hobo 1 running	job 1
	Hobo 4 entering	loop
	Hobo 4 running	job 4
	Hobo 2 running	job 5
	Mngr - received	job 2
	Hobo 3 running	job 6
	Mngr - received	job 3
	Hobo 1 running	job 7
	Mngr - received	job 1
	Hobo 4 running	job 8
	Mngr - received	job 4
	http://192.168.0.1/: 3729
	Hobo 2 exiting loop
	Mngr - received	job 5
	Hobo 3 exiting loop
	Mngr - received	job 6
	Hobo 1 exiting loop
	Mngr - received	job 7
	Hobo 4 exiting loop
	Mngr - received	job 8
	Mngr - exiting loop

	Duration: 4.131	seconds

CROSS-PLATFORM TEMPLATE	FOR BINARY EXECUTABLE
       Making an executable is possible	with the PAR::Packer module.  On the
       Windows platform, threads, threads::shared, and exiting via threads are
       necessary for the binary	to exit	successfully.

	# https://metacpan.org/pod/PAR::Packer
	# https://metacpan.org/pod/pp
	#
	#   pp -o demo.exe demo.pl
	#   ./demo.exe

	use strict;
	use warnings;

	use if $^O eq "MSWin32", "threads";
	use if $^O eq "MSWin32", "threads::shared";

	# Include minimum dependencies for MCE::Hobo.
	# Add other modules required by	your application here.

	use Storable ();
	use Time::HiRes	();

	# use IO::FDPass ();  #	optional: for condvar, handle, queue
	# use Sereal ();      #	optional: for faster serialization

	use MCE::Hobo;
	use MCE::Shared;

	# For PAR to work on the Windows platform, one must include manually
	# any shared modules used by the application.

	# use MCE::Shared::Array;    # if using	MCE::Shared->array
	# use MCE::Shared::Cache;    # if using	MCE::Shared->cache
	# use MCE::Shared::Condvar;  # if using	MCE::Shared->condvar
	# use MCE::Shared::Handle;   # if using	MCE::Shared->handle, mce_open
	# use MCE::Shared::Hash;     # if using	MCE::Shared->hash
	# use MCE::Shared::Minidb;   # if using	MCE::Shared->minidb
	# use MCE::Shared::Ordhash;  # if using	MCE::Shared->ordhash
	# use MCE::Shared::Queue;    # if using	MCE::Shared->queue
	# use MCE::Shared::Scalar;   # if using	MCE::Shared->scalar

	# Et cetera. Only load modules needed for your application.

	use MCE::Shared::Sequence;   # if using	MCE::Shared->sequence

	my $seq	= MCE::Shared->sequence( 1, 9 );

	sub task {
	    my ( $id ) = @_;
	    while ( defined ( my $num =	$seq->next() ) ) {
		print "$id: $num\n";
		sleep 1;
	    }
	}

	sub main {
	    MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
	    MCE::Hobo->wait_all();
	}

	# Main must run	inside a thread	on the Windows platform	or workers
	# will fail during exiting, causing the exe to crash. The reason is
	# that PAR or a	dependency isn't multi-process safe.

	( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();

	threads->exit(0) if $INC{"threads.pm"};

CREDITS
       The inspiration for "MCE::Hobo" comes from wanting "threads"-like
       behavior	for processes. Both can	run side-by-side including safe-use by
       MCE workers.  Likewise, the documentation resembles "threads".

       The inspiration for "wait_all" and "wait_one" comes from	the
       "Parallel::WorkUnit" module.

SEE ALSO
       o  forks

       o  forks::BerkeleyDB

       o  MCE::Child

       o  Parallel::ForkManager

       o  Parallel::Loops

       o  Parallel::Prefork

       o  Parallel::WorkUnit

       o  Proc::Fork

       o  Thread::Tie

       o  threads

INDEX
       MCE, MCE::Channel, MCE::Shared

AUTHOR
       Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.32.0			  2020-08-01			  MCE::Hobo(3)
