Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sentinel role support #114

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Revision history for Redis

{{$NEXT}}
* add supporte for new()'s "role" parameter: connect to slaves using
Sentinel for discovery

1.979 2015-05-14 14:28:35CEST+0200 Europe/Amsterdam

Expand Down
133 changes: 111 additions & 22 deletions lib/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ sub new {
}

defined $args{$_}
and $self->{$_} = $args{$_} for
and $self->{$_} = $args{$_} for
qw(password on_connect name no_auto_connect_on_new cnx_timeout
write_timeout read_timeout sentinels_cnx_timeout sentinels_write_timeout
sentinels_read_timeout no_sentinels_list_update);

$self->{reconnect} = $args{reconnect} || 0;
$self->{conservative_reconnect} = $args{conservative_reconnect} || 0;
$self->{every} = $args{every} || 1000;
$self->{role} = $args{role};

if (exists $args{sock}) {
$self->{server} = $args{sock};
Expand All @@ -83,6 +84,7 @@ sub new {
};
} elsif ($args{sentinels}) {
$self->{sentinels} = $args{sentinels};
$self->{role} ||= 'master';

ref $self->{sentinels} eq 'ARRAY'
or croak("'sentinels' param must be an ArrayRef");
Expand All @@ -106,7 +108,16 @@ sub new {
? $self->{sentinels_write_timeout} : 1 ),
)
} or next;
my $server_address = $sentinel->get_service_address($self->{service});
my $server_address;
if ($self->{role} eq 'slave') {
my $slaves = $sentinel->get_slaves($self->{service});
$status = "no slaves found for service '$self->{service}'", next unless @$slaves;
my $pick = $slaves->[int(rand(scalar(@$slaves)))];
$server_address = "$pick->{ip}:$pick->{port}";
}
else {
$server_address = $sentinel->get_service_address($self->{service});
}
defined $server_address
or $status ||= "Sentinels don't know this service",
next;
Expand All @@ -128,12 +139,12 @@ sub new {
( sort { $h{$a} <=> $h{$b} } keys %h ), # sorted existing sentinels,
grep { ! $h{$_}; } # list of unknown
map { +{ @$_ }->{name}; } # names of
$sentinel->sentinel( # sentinels
$sentinel->sentinel( # sentinels
sentinels => $self->{service} # for this service
)
];
}

return $self->_maybe_enable_timeouts(
IO::Socket::INET->new(
PeerAddr => $server_address,
Expand Down Expand Up @@ -431,7 +442,7 @@ sub wait_for_messages {
$s->remove($s->handles);
$s->add($sock);

while ($s->can_read($timeout)) {
while ($s->can_read($timeout)) {
my $has_stuff = $self->__try_read_sock($sock);
# If the socket is ready to read but there is nothing to read, ( so
# it's an EOF ), try to reconnect.
Expand All @@ -448,7 +459,7 @@ sub wait_for_messages {
# or undef ( socket became EOF), back to select until timeout
} while ($self->{__buf} || $self->__try_read_sock($sock));
}

});

} catch {
Expand Down Expand Up @@ -623,16 +634,26 @@ sub __build_sock {

sub __close_sock {
my ($self) = @_;

$self->{__buf} = '';
delete $self->{__inside_watch};
delete $self->{__inside_transaction};

return close(delete $self->{sock});
}

sub __on_connection {

my ($self) = @_;

if ($self->{role}) {
my $role = $self->__get_server_role();
if ($role ne $self->{role}) {
## FIXME: how to force the process to retry? If we are in
## reconnect mode, it's easy, just abuse it... if not, then
## maybe we should just reuse it?
}
}

# If we are in PubSub mode we shouldn't perform any command besides
# (p)(un)subscribe
if (! $self->{is_subscriber}) {
Expand All @@ -642,7 +663,7 @@ sub __on_connection {
$n = $n->($self) if ref($n) eq 'CODE';
$self->client_setname($n) if defined $n;
};

defined $self->{current_database}
and $self->select($self->{current_database});
}
Expand All @@ -666,7 +687,20 @@ sub __on_connection {

defined $self->{on_connect}
and $self->{on_connect}->($self);
}


sub __get_server_role {
my ($self) = @_;

my $role;
eval { ($role) = $self->role(); 1 } or do {
my $info = $self->info('replication');
$role = $info->{role};
};
die "Could not determine role" unless $role;

return $role;
}


Expand Down Expand Up @@ -924,10 +958,14 @@ __END__
my $redis = Redis->new(write_timeout => 1.2);

## Connect via a list of Sentinels to a given service
my $redis = Redis->new(sentinels => [ '127.0.0.1:12345' ], service => 'mymaster');
my $redis = Redis->new(sentinels => [ '127.0.0.1:12345' ], service => 'my_cluster');

## Connect to a random slave of a Sentinel monitored service
## it will reconnect to a random different slave on disconnect
my $redis = Redis->new(sentinels => [ '127.0.0.1:12345' ], service => 'my_cluster', role => 'slave');

## Same, but with connection, read and write timeout on the sentinel hosts
my $redis = Redis->new( sentinels => [ '127.0.0.1:12345' ], service => 'mymaster',
my $redis = Redis->new( sentinels => [ '127.0.0.1:12345' ], service => 'my_cluster',
sentinels_cnx_timeout => 0.1,
sentinels_read_timeout => 1,
sentinels_write_timeout => 1,
Expand Down Expand Up @@ -1079,33 +1117,76 @@ So, if you are working with character strings, you should pre-encode or post-dec
sentinels_write_timeout => 1,
);

Creates a L<< Redis >> instance and connects to a Redis server.

The constructor will try to find the server to connect to using multiple methods, in the sequence below. The first found is used.

=over

=item *

the C<< sock >> parameter;

=item *

the C<< sentinels >> parameter;

=item *

the C<< server >> parameter;

=item *

the C<< REDIS_SERVER >> environment variable.

=back

A detailed explanation of each of these parameters and environment
variable is found below.

=head3 C<< server >>

The C<< server >> parameter specifies the Redis server we should connect to,
via TCP. Use the 'IP:PORT' format. If no C<< server >> option is present, we
will attempt to use the C<< REDIS_SERVER >> environment variable. If neither of
those options are present, it defaults to '127.0.0.1:6379'.

Alternatively you can use the C<< sock >> parameter to specify the path of the
UNIX domain socket where the Redis server is listening.
=head3 C<< sock >>

The C<< sock >> parameter specifies the path of the UNIX domain socket
where the Redis server is listening.

=head3 C<< sentinels >> and C<< service >>

The C<< sentinels >> and the C<< service >> parameters specify a list of
sentinels to contact and try to get the address of the servers
supporting the given service name.

Alternatively you can use the C<< sentinels >> parameter and the C<< service >>
parameter to specify a list of sentinels to contact and try to get the address
of the given service name. C<< sentinels >> must be an ArrayRef and C<< service
>> an Str.
The C<< sentinels >> parameter must be an ArrayRef
and C<< service >> an Str.

The C<< REDIS_SERVER >> can be used for UNIX domain sockets too. The following
formats are supported:
By default this will connect you to the master instance of the service,
but you can use the C<< role >> set as "slave" to randomly connect to
one of the slaves. If no slaves are found, the connect call will die.

Please note that this means that you can also die on reconnects.

=over

=item *
=item

/path/to/sock
Tip: you can actually use C<< role >> to make sure you are connected to
the correct type of server, even if you don't use Sentinel.

=item *
=back

unix:/path/to/sock
=head3 C<< REDIS_SERVER ENV >>

The C<< REDIS_SERVER >> environment variable can be used to specify the
address or UNIX domain socket to use. The following formats are
supported:

=over

=item *

Expand All @@ -1115,6 +1196,14 @@ unix:/path/to/sock

tcp:127.0.0.1:11011

=item *

/path/to/sock

=item *

unix:/path/to/sock

=back

=head3 C<< reconnect >>, C<< every >>
Expand Down
19 changes: 19 additions & 0 deletions lib/Redis/Sentinel.pm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ sub get_masters {
map { +{ @$_ }; } @{ shift->sentinel('masters') || [] };
}

sub get_slaves {
my @slaves;

eval {@slaves = map { +{@$_}; } @{ shift->sentinel('slaves', shift) || [] }; 1 } or do {
die $@ unless $@ =~ m/ERR No such master with that name/;
return;
};

return \@slaves;
}

1;

__END__
Expand Down Expand Up @@ -69,4 +80,12 @@ service were found.
Returns a list of HashRefs representing all the master redis instances that
this sentinel monitors.

=head2 get_slaves

Takes the name of a service as parameter.

If the service is not known to the sentinels server, returns undef. If
the service is known, retuns an arrayRef of hashRef's, one for each
slave available on the service.

=cut
57 changes: 46 additions & 11 deletions t/60-sentinel.t
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,41 @@ use Redis::Sentinel;
use lib 't/tlib';
use Test::SpawnRedisServer;

my @ret = redis();
my $redis_port = pop @ret;
my ($c, $redis_addr) = @ret;
END { diag 'shutting down redis'; $c->() if $c }
my @ret_m = redis();
my $redis_m_port = pop @ret_m;
my ($c_m, $redis_m_addr) = @ret_m;
END { diag 'shutting down redis'; $c_m->() if $c_m }

diag "redis address : $redis_addr\n";
diag "redis master address : $redis_m_addr\n";

my @ret2 = sentinel( redis_port => $redis_port );
my @ret_s = redis();
my $redis_s_port = pop @ret_s;
my ($c_s, $redis_s_addr) = @ret_s;
END { diag 'shutting down redis'; $c_s->() if $c_s }

eval { Redis->new(server => $redis_s_addr)->slaveof('127.0.0.1', $redis_m_port); 1 } or do {
plan skip_all => '** FAILED to set slave server as a SLAVEOF master, aborting tests **';
};

diag "redis slave address : $redis_s_addr\n";

diag('Waiting 1 second to make sure the master/slave setup is in place before starting Sentinels');
sleep 1;

my @ret2 = sentinel( redis_port => $redis_m_port );
my $sentinel_port = pop @ret2;
my ($c2, $sentinel_addr) = @ret2;
END { diag 'shutting down sentinel'; $c2->() if $c2 }

my @ret3 = sentinel( redis_port => $redis_port );
my @ret3 = sentinel( redis_port => $redis_m_port );
my $sentinel2_port = pop @ret3;
my ($c3, $sentinel2_addr) = @ret3;
END { diag 'shutting down sentinel2'; $c3->() if $c3 }

diag "sentinel address: $sentinel_addr\n";
diag "sentinel2 address: $sentinel2_addr\n";

diag("wait 3 sec for the sentinels and the master to gossip");
diag("wait 3 secs for the sentinels and the master to gossip");
sleep 3;

{
Expand All @@ -40,22 +54,37 @@ sleep 3;

cmp_deeply($got, superhashof({ name => 'mymaster',
ip => '127.0.0.1',
port => $redis_port,
port => $redis_m_port,
flags => 'master',
'role-reported' => 'master',
'config-epoch' => 0,
'num-slaves' => 0,
'num-slaves' => 1,
'num-other-sentinels' => 1,
quorum => 2,
}),
"sentinel has proper config of its master"
);

$got = $sentinel->get_slaves('mymaster');
cmp_deeply(
$got,
[ superhashof(
{ 'port' => $redis_s_port,
'flags' => "slave",
'master-port' => $redis_m_port,
'role-reported' => "slave",
'name' => "127.0.0.1:$redis_s_port",
}
)
],
"sentinel has proper config of its slaves"
);
}

{
my $sentinel = Redis::Sentinel->new(server => $sentinel_addr);
my $address = $sentinel->get_service_address('mymaster');
is $address, "127.0.0.1:$redis_port", "found service mymaster";
is $address, "127.0.0.1:$redis_m_port", "found service mymaster";
}

{
Expand All @@ -73,4 +102,10 @@ sleep 3;

}

{
# connect to the slave via the sentinel
my $redis = Redis->new(sentinels => [ $sentinel_addr ], service => 'mymaster', role => 'slave');
is($redis->__get_server_role(), 'slave', 'Redis client connect to slave server via Sentinel');
}

done_testing();
Loading