File: Memory.pm

package info (click to toggle)
libmojo-ioloop-readwriteprocess-perl 1.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 540 kB
  • sloc: perl: 4,655; sh: 101; makefile: 2
file content (219 lines) | stat: -rw-r--r-- 5,640 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package Mojo::IOLoop::ReadWriteProcess::Shared::Memory;

use Mojo::IOLoop::ReadWriteProcess::Shared::Lock;
use Mojo::Base -base;

use Carp qw(croak confess);
use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
use IPC::SharedMem;
use Config;
use IPC::SysV
  qw(ftok IPC_PRIVATE IPC_NOWAIT IPC_CREAT IPC_EXCL S_IRUSR S_IWUSR S_IRGRP S_IWGRP S_IROTH S_IWOTH SEM_UNDO S_IRWXU S_IRWXG);

our @EXPORT_OK = qw(shared_memory shared_lock semaphore);
use Exporter 'import';

# IPC key of the data segment. The default comes from the Semaphore package's
# _genkey() helper; the size segment and the lock derive their keys from it.
has key => sub { Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore::_genkey() };
# Content written by save() and filled in by load().
has 'buffer';
# When true, DESTROY calls remove().
has destroy => 0;
# Flags handed to IPC::SharedMem->new by _newmem().
has flags   => S_IRWXU() | S_IRWXG() | IPC_CREAT();
# Flags handed to the Shared::Lock constructor used for _lock.
has lock_flags => IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP
  | S_IROTH | S_IWOTH;
# Size requested for the data segment (presumably bytes - confirm against
# IPC::SharedMem); updated by _loadsize() and resize().
has _size          => 10 * 1024;
# Handle of the data segment, built lazily through _newmem().
has _shared_memory => sub { $_[0]->_newmem() };
# Companion segment of $Config{intsize} size, keyed (2 * key) - 1, that
# _writesize()/_readsize() use to store the size of the data segment.
has _shared_size =>
  sub { $_[0]->_newmem((2 * shift->key) - 1, $Config{intsize}) };
# Shared::Lock guarding the segment, keyed (2 * key) + 1.
has _lock => sub {
  Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(
    flags => $_[0]->lock_flags,
    key   => (2 * shift->key) + 1
  );
};

# Switches read by save(): resizing happens only when dynamic_resize is true,
# and then only to shrink (dynamic_decrement) or grow (dynamic_increment) the
# segment to the length of the encoded buffer.
has dynamic_resize    => 1;
has dynamic_decrement => 1;
has dynamic_increment => 1;

# Exportable shortcut: builds a Shared::Lock, forwarding every argument to its
# constructor.
sub shared_lock {
  return Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(@_);
}
# Exportable shortcut: builds a Shared::Semaphore, forwarding every argument
# to its constructor.
sub semaphore {
  return Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore->new(@_);
}
# Exportable shortcut: builds an object of this package, forwarding every
# argument to new().
sub shared_memory {
  return __PACKAGE__->new(@_);
}

# Constructor. Builds the object through the parent constructor, then makes
# sure the size segment exists, loads the stored size and makes sure the data
# segment exists. Dies (with a stack trace) when either segment is missing.
sub new {
  my ($class, @args) = @_;
  my $self = $class->SUPER::new(@args);

  $self->_shared_size
    or confess 'Could not allocate shared size memory ' . $self->key;

  $self->_loadsize;

  $self->_shared_memory
    or confess 'Could not allocate shared memory with key ' . $self->key;

  return $self;
}

# Replaces the buffer with its hexadecimal representation and returns whatever
# the buffer setter returns.
sub _encode_content {
  my ($self) = @_;
  return $self->buffer(unpack 'H*', $self->buffer());
}
# Replaces the hexadecimal buffer with the bytes it encodes and returns
# whatever the buffer setter returns.
sub _decode_content {
  my ($self) = @_;
  return $self->buffer(pack 'H*', $self->buffer());
}

# Stores $size in the size segment as a packed unsigned int ('I') written at
# offset 0. Returns the result of the segment's write().
sub _writesize {
  my ($self, $size) = @_;
  my $packed = pack 'I', $size;
  return $self->_shared_size()->write($packed, 0, $Config{intsize});
}

# Reads $Config{intsize} bytes from offset 0 of the size segment and returns
# them unpacked as an unsigned int ('I').
sub _readsize {
  my ($self) = @_;
  my $raw = $self->_shared_size()->read(0, $Config{intsize});
  return unpack 'I', $raw;
}

# Synchronises _size with the value stored in the size segment.
#
# A stored value of 0 is treated as "nothing stored" and the current _size is
# kept. When the stored value differs from the current one, _size is updated,
# written back to the size segment and, if that write succeeds, the data
# segment handle is re-created.
sub _loadsize {
  my ($self) = @_;

  my $stored   = $self->_readsize;
  my $previous = $self->_size;
  $stored = $previous if $stored == 0;

  my $wanted = $stored =~ /\d/ ? $stored : $previous;
  $self->_size($wanted);

  if ($stored != $previous) {
    $self->_shared_memory($self->_newmem) if $self->_writesize($self->_size);
  }

  warn "[debug:$$] Mem size: " . $self->_size if DEBUG;
}

# Re-creates the data segment handle through _newmem(), retrying until
# _shared_memory holds a defined value.
sub _reload {
  my ($self) = @_;
  do { $self->_shared_memory($self->_newmem) }
    until defined $self->_shared_memory;
}

# Re-creates the data segment with a new size.
# Must be run in a locked section.
#
# Arguments: the new size (optional); when omitted, the length of the current
# buffer is used. A size of 0 or undef is raised to 1.
sub resize {
  my $self = shift;
  $self->_shared_memory->detach();

  # Wait until the old segment has no attachments left and could be removed.
  # When stat() itself fails, _safe_remove can never succeed, so stop waiting
  # instead of spinning forever.
  until ($self->_safe_remove) {
    last unless defined $self->_shared_memory->stat;
  }

  # A zero-sized segment cannot be created (shmget rejects it), which would
  # make _reload retry forever; fall back to the smallest usable size.
  my $size = $_[0] // length $self->buffer;
  $size = 1 unless $size;
  $self->_size($size);
  $self->_reload;

  # XXX: is faster to re-allocate the shared memory with shmctl, but SHM_SIZE
  # seems to not be really portable:
  # shmctl $_[0]->_shared_memory->id, SHM_SIZE, struct
#  $_[0]->_writesize($_[1] // length $_[0]->buffer ) if $_[0]->_shared_memory;
}

# Resizes the segment to the length of the current buffer by calling resize()
# without arguments; logs both sizes first when debugging is enabled.
# Must be run in a locked section
sub _sync_size {
  my ($self) = @_;

  if (DEBUG) {
    warn "[debug:$$] Sync size for content ("
      . length($self->buffer)
      . ") vs currently allocated ("
      . $self->_size . ")";
  }

  $self->resize;
}

# Hex-encodes the buffer and writes it to the data segment.
#
# Before writing, the segment is resized to the encoded length when
# dynamic_resize is on and the matching direction switch (dynamic_increment /
# dynamic_decrement) allows it. The size segment is updated as well. The
# segment is detached afterwards in every case.
#
# Returns 1 on success, nothing when the resize/write step died.
# Note: the buffer stays hex-encoded after this call.
sub save {
  my ($self) = @_;
  warn "[debug:$$] Writing data : " . $self->buffer if DEBUG;

  $self->_encode_content;

  eval {
    # Resize
    if ($self->dynamic_resize && defined $self->buffer) {
      my $used      = length $self->buffer;
      my $allocated = $self->_size;
      my $grow      = $self->dynamic_increment && $used > $allocated;
      my $shrink    = $self->dynamic_decrement && $allocated > $used;
      $self->_sync_size if $grow || $shrink;
    }

    if (my $segment = $self->_shared_memory()) {
      $self->_writesize($self->_size);

#    $_[0]->_reload;

      $segment->write($self->buffer, 0, $self->_size);
    }
  };

  warn "[debug:$$] Error Saving data : $@" if $@ && DEBUG;

  if (my $segment = $self->_shared_memory) {
    $segment->detach();
  }
  return if $@;
  return 1;
}

# Creates an IPC::SharedMem handle.
#
# Arguments: key (defaults to the key attribute) and size (defaults to _size).
# The flags attribute is always used. Returns whatever IPC::SharedMem->new
# returns.
sub _newmem {
  my ($self, $key, $size) = @_;
  $key  //= $self->key();
  $size //= $self->_size;
  return IPC::SharedMem->new($key, $size, $self->flags);
}

# Reads the data segment into the buffer.
#
# Refreshes the stored size, re-creates the segment handle, attaches to it,
# reads _size bytes from offset 0 and hex-decodes them into the buffer.
# Returns 1 on success, nothing when any of those steps died.
sub load {
  my ($self) = @_;

  eval {
    $self->_loadsize;
    warn "[debug:$$] Reading " . $self->_size if DEBUG;
    $self->_reload;

    my $segment = $self->_shared_memory;
    $segment->attach();
    $self->buffer($segment->read(0, $self->_size));

# XXX: Remove the 0 padding?
# substr($_[0]->{buffer}, index($_[0]->{buffer}, "\0")) = "";
    $self->_decode_content;
  };

  warn "[debug:$$] Error Loading data : $@" if $@ && DEBUG;
  return if $@;
  return 1;
}

# Removes the data segment, but only when its stat() succeeds and reports no
# attachments. Returns 1 when the segment was removed, 0 otherwise.
sub _safe_remove {
  my ($self) = @_;

  my $info = $self->_shared_memory()->stat();
  return 0 unless defined $info;
  return 0 unless $info->nattch() == 0;

  $self->_shared_memory()->remove();
  return 1;
}

# Tears down every IPC resource of this object: detaches from the data
# segment, removes the lock and the size segment, then tries to remove the
# data segment itself. Returns the result of _safe_remove().
sub remove {
  my ($self) = @_;

  $self->_shared_memory->detach();

  my $lock = $self->_lock;
  $lock->remove;

  my $size_segment = $self->_shared_size();
  $size_segment->remove();

  return $self->_safe_remove;
}

# Overwrites the stored content with a single space inside a locked section.
# Returns the result of lock_section().
sub clean {
  my ($self) = @_;
  my $wipe = sub { $self->buffer(' ')->save };
  return $self->lock_section($wipe);
}

# Saves the buffer (best effort) and releases the lock.
#
# Arguments after the invocant are forwarded to the lock's unlock().
# Returns the result of that unlock() call. A failing save() is ignored.
sub unlock {
  my $self = shift;

  {
    # Keep the caller's $@ intact, as lock_section does around its own save;
    # the localisation is confined to this block so that an error raised by
    # the lock's unlock() below still propagates normally.
    local $@;
    eval { $self->save };
  }

  return $self->_lock->unlock(@_);
}

# Acquires the lock (forwarding any arguments), then loads the shared content
# into the buffer. Returns the result of the lock call.
sub lock {
  my ($self, @args) = @_;
  my $result = $self->_lock->lock(@args);
  $self->load;
  return $result;
}
# Delegates to the lock's try_lock() without arguments and returns its result.
sub try_lock {
  my ($self) = @_;
  return $self->_lock->try_lock();
}

# Runs $fn inside the lock's lock_section(): the shared content is loaded
# before $fn is called and saved afterwards, even when $fn dies.
#
# Returns what the lock's lock_section() returns for a callback that yields
# the (scalar) result of $fn, or undef when $fn died. Errors raised by $fn or
# by save() are swallowed; the former is logged when debugging is enabled.
sub lock_section {
  my ($self, $fn) = @_;

  my $guarded = sub {
    my $result;
    {
      $self->load;
      local $@;
      $result = eval { $fn->() };
      if ($@ && DEBUG) {
        warn "[debug:$$] Error inside locked memory section : $@";
      }
      eval { $self->save };
    }
    return $result;
  };

  return $self->_lock->lock_section($guarded);
}

# Returns the result of stat() on the data segment.
sub stat {
  my ($self) = @_;
  return $self->_shared_memory->stat;
}

# Destructor: removes the IPC resources only when the destroy attribute is
# true.
sub DESTROY {
  my ($self) = @_;
  $self->remove if $self->destroy();
}

!!42;