File: WQBlocked.pm

package info (click to toggle)
public-inbox 1.9.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 4,152 kB
  • sloc: perl: 52,771; sh: 302; ansic: 106; makefile: 37
file content (49 lines) | stat: -rw-r--r-- 1,305 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>

# non-blocking workqueues, currently used by LeiNoteEvent to track renames
package PublicInbox::WQBlocked;
use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
use PublicInbox::IPC;
use Carp ();
use Socket qw(MSG_EOR);

sub new {
	my ($cls, $wq, $buf) = @_;
	my $self = bless { msgq => [$buf], }, $cls;
	$wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT);
}

sub flush_send {
	my ($self) = @_;
	push(@{$self->{msgq}}, $_[1]) if defined($_[1]);
	while (defined(my $buf = shift @{$self->{msgq}})) {
		if (ref($buf) eq 'CODE') {
			$buf->($self); # could be \&PublicInbox::DS::close
		} else {
			my $wq_s1 = $self->{sock};
			my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf,
								MSG_EOR);
			next if defined($n);
			Carp::croak("sendmsg: $!") unless $!{EAGAIN};
			PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT);
			unshift @{$self->{msgq}}, $buf;
			last; # wait for ->event_step
		}
	}
}

sub enq_close { flush_send($_[0], $_[0]->can('close')) }

sub event_step { # called on EPOLLOUT wakeup
	my ($self) = @_;
	eval { flush_send($self) } if $self->{sock};
	if ($@) {
		warn $@;
		$self->close;
	}
}

1;