1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
package IPC::PubSub::Cache::Memcached;
use strict;
use warnings;
use base 'IPC::PubSub::Cache';
use Cache::Memcached;
use Time::HiRes ();
sub new {
my $class = shift;
my $namespace = shift || $class;
my $config = shift || $class->default_config($namespace);
my $mem = Cache::Memcached->new($config);
# Force our connection to never timeout on selects
$mem->{select_timeout} = undef;
bless(\$mem, $class);
}
sub disconnect {
my $self = shift;
$$self->disconnect_all;
}
sub default_config {
my ($class, $namespace) = @_;
return {
servers => ['127.0.0.1:11211'],
debug => 0,
namespace => $namespace,
};
}
sub fetch_data {
my $self = shift;
my $key = shift;
return $$self->get("data:$key");
}
sub store_data {
my $self = shift;
my $key = shift;
my $val = shift;
if (defined $val) {
$$self->set("data:$key" => $val);
}
else {
$$self->delete("data:$key");
}
}
sub fetch {
my $self = shift;
values(%{$$self->get_multi(@_)});
}
sub store {
my ($self, $key, $val, $time, $expiry) = @_;
$$self->set($key, [$time, $val], $expiry);
}
sub publisher_indices {
my ($self, $chan) = @_;
$$self->get("pubs:$chan") || {};
}
sub lock {
my ($self, $key) = @_;
for my $i (1..100) {
return 1 if $$self->add("lock:$key" => $$);
Time::HiRes::usleep(rand(250000)+250000);
}
return 0;
}
sub unlock {
my ($self, $chan) = @_;
return 1 if $$self->delete("lock:$chan");
return 0;
}
sub add_publisher {
my ($self, $chan, $pub) = @_;
my $key = "pubs:$chan";
$self->lock($key);
my $pubs = $$self->get($key) || {};
$pubs->{$pub} = 0;
$$self->set($key => $pubs);
$self->unlock($key);
}
sub remove_publisher {
my ($self, $chan, $pub) = @_;
my $key = "pubs:$chan";
$self->lock($key);
my $pubs = $$self->get($key) || {};
delete $pubs->{$pub};
$$self->set($key => $pubs);
$self->unlock($key);
}
sub get_index {
my ($self, $chan, $pub) = @_;
($$self->get("pubs:$chan") || {})->{$pub};
}
sub set_index {
my ($self, $chan, $pub, $idx) = @_;
my $pubs = $$self->get("pubs:$chan") || {};
$pubs->{$pub} = $idx;
$$self->set("pubs:$chan", $pubs);
}
1;
|