Kafka::Connection(3)  User Contributed Perl Documentation Kafka::Connection(3)

       Kafka::Connection - Object interface to connect to a Kafka cluster.

       This documentation refers to "Kafka::Connection" version 0.8010.

	   use 5.010;
	   use strict;
	   use warnings;

           use Scalar::Util qw( blessed );
           use Try::Tiny;

	   # A simple example of Kafka::Connection usage:
	   use Kafka::Connection;

	   # connect to	local cluster with the defaults
	   my $connection;
           try {
               $connection = Kafka::Connection->new( host => 'localhost' );
           } catch {
               if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
                   warn $_->message, "\n", $_->trace->as_string, "\n";
               } else {
                   die $_;
               }
           };

	   # Closes the	connection and cleans up
	   undef $connection;

       The main	features of the	"Kafka::Connection" class are:

       o  Provides API for communication with Kafka 0.8	cluster.

       o  Performs request encoding and response decoding, provides
          automatic selection or promotion of a leader server from the
          Kafka cluster.

       o  Provides information about Kafka cluster.

       The following constants are available for export


       These are non-fatal errors. When one of them occurs, the metadata is
       refreshed from Kafka and another attempt to fetch the data is made.
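
       The refresh-and-retry behaviour described above can be pictured with a
       small, self-contained sketch. It is not the module's internal code:
       "with_retries" is a hypothetical helper, the "refresh" callback merely
       stands in for a metadata refresh, and the attempt limit and delay are
       made-up values.

```perl
use strict;
use warnings;
use Time::HiRes qw( sleep );

# Hypothetical helper, not part of Kafka::Connection: run the action and,
# on failure, call the refresh callback, wait, and try again.
sub with_retries {
    my (%opt) = @_;
    my $last_error;
    for my $attempt ( 1 .. $opt{max_attempts} ) {
        my $result;
        return $result if eval { $result = $opt{action}->(); 1 };
        $last_error = $@;
        last if $attempt == $opt{max_attempts};
        $opt{refresh}->();                    # stands in for a metadata refresh
        sleep( $opt{backoff_ms} / 1000 );     # back-off given in milliseconds
    }
    die "giving up after $opt{max_attempts} attempts: $last_error";
}

# Demo: the action fails twice before succeeding.
my ( $calls, $refreshes ) = ( 0, 0 );
my $value = with_retries(
    action       => sub { die "leader moved\n" if ++$calls < 3; 'fetched' },
    refresh      => sub { $refreshes++ },
    max_attempts => 4,
    backoff_ms   => 10,
);
print "$value after $calls calls and $refreshes refreshes\n";
```

       The last failed attempt does not trigger another refresh; the helper
       dies with the most recent error instead.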


       Creates a "Kafka::Connection" object for interaction with a Kafka
       cluster and returns it.

       "new()" takes arguments in key-value pairs. The following arguments are
       currently recognized:

       "host =>	$host"
          $host is any Apache Kafka cluster host to connect to. It can be a
          hostname or an IP address in the "xx.xx.xx.xx" form.

	  Optional. Either "host" or "broker_list" must	be supplied.

       "port =>	$port"
	  Optional, default = $KAFKA_SERVER_PORT.

          $port is the port number of the Apache Kafka service to access.
          $port should be an integer.

	  $KAFKA_SERVER_PORT is	the default Apache Kafka server	port constant
	  (9092) that can be imported from the Kafka module.

       "broker_list => $broker_list"
          Optional. $broker_list is a reference to an array of host:port
          strings defining the list of Kafka servers. This list is used to
          locate the new leader if the server specified via the
          "host => $host" and "port => $port" arguments becomes unavailable.
          Either "host" or "broker_list" must be supplied.

       "timeout	=> $timeout"
	  Optional, default = $Kafka::REQUEST_TIMEOUT.

          $timeout specifies how long to wait for the remote server to
          respond. $timeout is in seconds and may be a positive integer or a
          floating-point number no greater than the maximum positive int32
          value.

	  Special behavior when	"timeout" is set to "undef":

       o  Alarms are not used internally (namely when performing

       o  The default $REQUEST_TIMEOUT is used for the remaining I/O
          operations.

       "CorrelationId => $correlation_id"
          Optional, default = "undef".

          $correlation_id is a user-supplied integer. The server passes it
          back with the response, unmodified.

	  An exception is thrown if "CorrelationId" in response	does not match
	  the one supplied in request.

          If "CorrelationId" is not provided, it is set to a random negative
          integer.

       "SEND_MAX_ATTEMPTS => $attempts"
          Optional, int32 signed integer, default = $Kafka::SEND_MAX_ATTEMPTS.

          In some circumstances (the leader is temporarily unavailable, the
          metadata is outdated, etc.) sending a message may fail. This
          property specifies the maximum number of attempts to send a
          message. $attempts should be an integer.

       "RECEIVE_MAX_ATTEMPTS =>	$attempts"
	  Optional, int32 signed integer, default =

          In some circumstances (temporary network issues, high server load,
          socket errors, etc.) receiving a response may fail. This property
          specifies the maximum number of attempts to receive a message.
          $attempts should be an integer.

       "RETRY_BACKOFF => $backoff"
	  Optional, default = $Kafka::RETRY_BACKOFF .

	  Since	leader election	takes a	bit of time, this property specifies
	  the amount of	time, in milliseconds, that the	producer waits before
	  refreshing the metadata.  The	$backoff should	be an integer number.

       "AutoCreateTopicsEnable => $mode"
	  Optional, default value is 0 (false).

	  Kafka	BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2):
          AutoCreateTopicsEnable controls how this module handles the first
          access to a non-existent topic when "auto.create.topics.enable" in
          the server configuration is "true". If AutoCreateTopicsEnable is
          false (default), the first access to a non-existent topic raises an
          exception; however, the topic is created, and subsequent attempts
          to access it will succeed.

          If AutoCreateTopicsEnable is true, this module waits (according to
          the "SEND_MAX_ATTEMPTS" and "RETRY_BACKOFF" properties) until the
          topic is created, to avoid errors on the first access to a
          non-existent topic.

	  If "auto.create.topics.enable" in server configuration is "false",
	  this setting has no effect.

       "MaxLoggedErrors	=> $number"
	  Optional, default value is 100.

          Defines the maximum number of most recent non-fatal errors kept in
          the log. Use the "nonfatal_errors" method to access them.
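
       The arguments above can be combined in a single call to "new". The
       following is a minimal sketch rather than a recommended configuration:
       the host names and numeric values are placeholders, and
       "connect_to_cluster" is a hypothetical helper that loads the module
       only when it is actually called.

```perl
use strict;
use warnings;

# Constructor arguments named in the list above; the host names and the
# values are placeholders chosen for illustration only.
my %connection_args = (
    broker_list            => [ 'kafka1.example.com:9092', 'kafka2.example.com:9092' ],
    timeout                => 1.5,    # seconds
    SEND_MAX_ATTEMPTS      => 4,
    RECEIVE_MAX_ATTEMPTS   => 4,
    RETRY_BACKOFF          => 200,    # milliseconds
    AutoCreateTopicsEnable => 0,
    MaxLoggedErrors        => 100,
);

# Hypothetical helper: loads Kafka::Connection on demand and builds the object.
sub connect_to_cluster {
    my (%args) = @_;
    require Kafka::Connection;
    return Kafka::Connection->new(%args);
}

# Usage, assuming the brokers listed above are reachable:
# my $connection = connect_to_cluster(%connection_args);
```

       Because "broker_list" is supplied, "host" is omitted here; the
       documentation above requires only one of the two.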

       The following methods are defined for the "Kafka::Connection" class:


       Returns the list	of known Kafka servers (in host:port format).

       "get_metadata( $topic )"

       If $topic is present, it must be a non-false string of non-zero length.

       If $topic is absent, this method returns metadata for all topics.

       Updates the Kafka cluster's metadata description and returns a hash
       reference to the metadata, which can be schematically described as:

               TopicName => {
                   Partition   => {
                       'Leader'    => ...,
                       'Replicas'  => [ ... ],
                       'Isr'       => [ ... ],
                   },
               },

       Consult Kafka "Wire protocol" documentation for more details about
       metadata	structure.
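
       A hash of this shape can be walked with ordinary Perl. The sketch
       below is illustrative only: "partition_leaders" is a hypothetical
       helper, and the sample data is made up to stand in for the value
       returned by "get_metadata" (the numeric broker identifiers are an
       assumption).

```perl
use strict;
use warnings;

# Hypothetical helper: maps "topic:partition" to the 'Leader' value found
# in a metadata hash shaped like the schematic above.
sub partition_leaders {
    my ($metadata) = @_;
    my %leaders;
    for my $topic ( sort keys %$metadata ) {
        for my $partition ( sort { $a <=> $b } keys %{ $metadata->{$topic} } ) {
            $leaders{"$topic:$partition"} = $metadata->{$topic}{$partition}{Leader};
        }
    }
    return \%leaders;
}

# Made-up sample standing in for the result of $connection->get_metadata.
my $sample = {
    mytopic => {
        0 => { Leader => 1, Replicas => [ 1, 2 ], Isr => [ 1, 2 ] },
        1 => { Leader => 2, Replicas => [ 2, 1 ], Isr => [2] },
    },
};

my $leaders = partition_leaders($sample);
print "$_ => $leaders->{$_}\n" for sort keys %$leaders;
```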

       "is_server_known( $server )"

       Returns true if $server (host:port) is known in the cluster.

       "is_server_alive( $server )"

       Returns true if the known $server (host:port) is accessible. Checks
       the accessibility of the server.

       "is_server_connected( $server )"

       Returns true if a successful connection is established with $server.
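
       The three checks above can be combined into a single status report.
       This is a sketch under assumptions: "server_status" is a hypothetical
       helper, and the order of the checks (existing connection first,
       accessibility probe last) is a choice made for this example, not
       something the module prescribes.

```perl
use strict;
use warnings;

# Hypothetical helper: summarises what the three checks report for a server
# given as a host:port string.
sub server_status {
    my ( $connection, $server ) = @_;
    return 'unknown'   unless $connection->is_server_known($server);
    return 'connected' if $connection->is_server_connected($server);
    return 'reachable' if $connection->is_server_alive($server);
    return 'unreachable';
}

# Usage, assuming $connection is a Kafka::Connection object:
# print server_status( $connection, 'localhost:9092' ), "\n";
```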

       "receive_response_to_request( $request, $compression_codec )"

          $request is a reference to a hash representing the structure of
          the request.

          This method encodes $request, passes it to the leader of the
          cluster, receives the reply, decodes it, and returns it as a hash
          reference.


       o  This method should be	considered private and should not be called by
	  an end user.

       o  To achieve better performance, this method does not validate its
          arguments.


          $compression_codec sets the required type of message compression,
          if compression is desired.


       "exists_topic_partition(	$topic,	$partition )"

       Returns true if the metadata contains information about specified
       combination of topic and	partition.  Otherwise returns false.

       "exists_topic_partition()" takes	the following arguments:

	  The $topic must be a normal non-false	string of non-zero length.


       "close_connection( $server )"

       Closes connection with $server (defined as host:port).


       Closes connection with all known	Kafka servers.


       Returns a reference to a	hash.

       Each hash key is	the identifier of the server (host:port), and the
       value is	the last communication error with that server.

       An empty	hash is	returned if there were no communication	errors.


       Returns a reference to an array of the last non-fatal errors.

       The maximum number of entries is set using the "MaxLoggedErrors"
       parameter of the constructor.

       A reference to an empty array is returned if there were no non-fatal
       errors or if the "MaxLoggedErrors" parameter is set to 0.
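
       The returned array reference can be inspected directly. In the sketch
       below, "format_nonfatal_errors" is a hypothetical helper, and it
       assumes that each entry can be printed as a string, which the
       documentation above does not state.

```perl
use strict;
use warnings;

# Hypothetical helper: formats whatever nonfatal_errors() currently holds.
# Assumes each entry stringifies to something readable.
sub format_nonfatal_errors {
    my ($connection) = @_;
    my $errors = $connection->nonfatal_errors;    # array reference
    return 'no non-fatal errors recorded' unless @$errors;
    my $n = 0;
    return join "\n", map { sprintf '%d: %s', ++$n, $_ } @$errors;
}

# Usage, assuming $connection is a Kafka::Connection object:
# print format_nonfatal_errors($connection), "\n";
```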


       Clears an array of the last non-fatal errors.

       A reference to an empty array is returned, because no non-fatal errors
       remain.

       When an error is detected, an exception represented by an object of
       the Kafka::Exception::Connection class is thrown (see
       Kafka::Exceptions).

       The "code" and a more descriptive "message" provide information about
       the exception. Consult the documentation of Kafka::Exceptions for the
       list of all available methods.
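
       One way to tell connection exceptions apart from other failures is
       sketched below. "describe_error" is a hypothetical helper; it relies
       only on the "code" and "message" methods mentioned above and uses a
       plain "eval" in the usage comment instead of Try::Tiny to stay
       dependency-free.

```perl
use strict;
use warnings;
use Scalar::Util qw( blessed );

# Hypothetical helper: turns a caught error into a one-line description.
sub describe_error {
    my ($error) = @_;
    if ( blessed($error) && $error->isa('Kafka::Exception::Connection') ) {
        return sprintf 'connection error (code %s): %s',
            $error->code, $error->message;
    }
    return "unexpected error: $error";
}

# Usage around a call that may throw, assuming $connection exists:
# my $ok = eval { $connection->get_metadata('mytopic'); 1 };
# warn describe_error($@), "\n" unless $ok;
```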

       Here is the list	of possible error messages that	"Kafka::Connection"
       may produce:

       "Invalid	argument"
          An invalid argument was provided to the "new" constructor or to
          another method.

       "Can't send"
	  Request cannot be sent to Kafka.

       "Can't recv"
	  Response cannot be received from Kafka.

       "Can't bind"
          A successful TCP connection can't be established on the given host
          and port.

       "Can't get metadata"
	  Error	detected during	parsing	of response from Kafka.

       "Leader not found"
	  Failed to locate leader of Kafka cluster.

       "Mismatch CorrelationId"
	  Mismatch of "CorrelationId" of request and response.

       "There are no known brokers"
	  Failed to locate cluster broker.

       "Can't get metadata"
          Received metadata is incorrect or missing.

   Debug mode
       Debug output can be enabled by passing the desired level via an
       environment variable in one of the following ways:

       "PERL_KAFKA_DEBUG=1"             - debug is enabled for the whole Kafka
       package.

       "PERL_KAFKA_DEBUG=Connection:1"  - enable debug for "Kafka::Connection"
       only.

       "Kafka::Connection" prints to "STDERR" information about non-fatal
       errors, re-connection attempts and such when the debug level is set to
       1 or higher.

       The basic operation of the Kafka	package	modules:

       Kafka - constants and messages used by the Kafka	package	modules.

       Kafka::Connection - interface to	connect	to a Kafka cluster.

       Kafka::Producer - interface for producing client.

       Kafka::Consumer - interface for consuming client.

       Kafka::Message -	interface to access Kafka message properties.

       Kafka::Int64 - functions	to work	with 64	bit elements of	the protocol
       on 32 bit systems.

       Kafka::Protocol - functions to process messages in the Apache Kafka
       protocol.

       Kafka::IO - low-level interface for communication with Kafka server.

       Kafka::Exceptions - module designated to	handle Kafka exceptions.

       Kafka::Internals	- internal constants and functions used	by several
       package modules.

       A wealth	of detail about	the Apache Kafka and the Kafka Protocol:

       Main page at <>

       Kafka Protocol at

       Kafka package is	hosted on GitHub:

       Sergey Gladkov

       Alexander Solovey

       Jeremy Jordan

       Sergiy Zuban

       Vlad Marchenko

       Copyright (C) 2012-2013 by TrackingSoft LLC.

       This package is free software; you can redistribute it and/or modify it
       under the same terms as Perl itself. See	perlartistic at

       This program is distributed in the hope that it will be useful, but
       WITHOUT ANY WARRANTY; without even the implied warranty of
       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

perl v5.32.0			  2015-02-06		  Kafka::Connection(3)

