File: recv.c

package info (click to toggle)
innduct 2.2
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 664 kB
  • sloc: sh: 4,270; ansic: 3,114; perl: 130; makefile: 33
file content (254 lines) | stat: -rw-r--r-- 7,770 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/*
 *  innduct
 *  tailing reliable realtime streaming feeder for inn
 *  recv.c - receiving peer responses and disposing of articles
 *
 *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
 *  and contributors; see LICENCE.txt.
 *  SPDX-License-Identifier: GPL-3.0-or-later
 */

#include "innduct.h"

/*========== handling responses from peer ==========*/

const oop_rd_style peer_rd_style= {
  OOP_RD_DELIM_STRIP, '\n',
  OOP_RD_NUL_FORBID,
  OOP_RD_SHORTREC_FORBID
};

void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
		  const char *errmsg, int errnoval,
		  const char *data, size_t recsz, void *conn_v) {
  Conn *conn= conn_v;
  connfail(conn, "error receiving from peer: %s", errmsg);
  return OOP_CONTINUE;
}

static Article *article_reply_check(Conn *conn, const char *response,
				    int code_indicates_streaming,
				    int must_have_sent
					/* 1:yes, -1:no, 0:dontcare */,
				    const char *sanitised_response) {
  Article *art= LIST_HEAD(conn->sent);

  if (!art) {
    connfail(conn,
	     "peer gave unexpected response when no commands outstanding: %s",
	     sanitised_response);
    return 0;
  }

  if (code_indicates_streaming) {
    assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
    if (!conn->stream) {
      connfail(conn, "peer gave streaming response code "
	       " to IHAVE or subsequent body: %s", sanitised_response);
      return 0;
    }
    const char *got_mid= response+4;
    int got_midlen= strcspn(got_mid, " \n\r");
    if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
      connfail(conn, "peer gave streaming response with syntactically invalid"
	       " messageid: %s", sanitised_response);
      return 0;
    }
    if (got_midlen != art->midlen ||
	memcmp(got_mid, art->messageid, got_midlen)) {
      connfail(conn, "peer gave streaming response code to wrong article -"
	       " probable synchronisation problem; we offered: %s;"
	       " peer said: %s",
	       art->messageid, sanitised_response);
      return 0;
    }
  } else {
    if (conn->stream) {
      connfail(conn, "peer gave non-streaming response code to"
	       " CHECK/TAKETHIS: %s", sanitised_response);
      return 0;
    }
  }

  if (must_have_sent>0 && art->state < art_Wanted) {
    connfail(conn, "peer says article accepted but"
	     " we had not sent the body: %s", sanitised_response);
    return 0;
  }
  if (must_have_sent<0 && art->state >= art_Wanted) {
    connfail(conn, "peer says please sent the article but we just did: %s",
	     sanitised_response);
    return 0;
  }

  Article *art_again= LIST_REMHEAD(conn->sent);
  assert(art_again == art);
  return art;
}

static void update_nocheck(int accepted) {
  accept_proportion *= nocheck_decay;
  accept_proportion += accepted * (1.0 - nocheck_decay);
  int new_nocheck= accept_proportion >= nocheck_thresh;
  if (new_nocheck && !nocheck_reported) {
    notice("entering nocheck mode for the first time");
    nocheck_reported= 1;
  } else if (new_nocheck != nocheck) {
    dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
  }
  nocheck= new_nocheck;
}

void article_done(Article *art, int whichcount) {
  if (whichcount>=0 && !art->missing)
    art->ipf->counts.results[art->state][whichcount]++;

  if (whichcount == RC_accepted)
    update_nocheck(1);
  else if (whichcount == RC_unwanted ||
	   (whichcount == RC_rejected && art->state == art_Unsolicited))
    update_nocheck(0);

  InputFile *ipf= art->ipf;

  while (art->blanklen) {
    static const char spaces[]=
      "                                                                "
      "                                                                "
      "                                                                "
      "                                                                "
      "                                                                "
      "                                                                "
      "                                                                "
      "                                                                "
      "                                                                ";
    int nspaces= sizeof(spaces)-1;
    int w= art->blanklen;  if (w > nspaces) w= nspaces;
    int r= pwrite(ipf->fd, spaces, w, art->offset);
    if (r==-1) {
      if (errno==EINTR) continue;
      syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
	       art->messageid, art->blanklen,
	       (unsigned long)art->offset, ipf->path);
    }
    assert(r>=0 && r<=w);
    art->blanklen -= w;
    art->offset += w;
  }

  ipf->inprogress--;
  assert(ipf->inprogress >= 0);
  free(art);

  if (!ipf->inprogress && ipf != main_input_file)
    queue_check_input_done();
}

void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
		 const char *errmsg, int errnoval,
		 const char *data, size_t recsz, void *conn_v) {
  Conn *conn= conn_v;

  if (ev == OOP_RD_EOF) {
    connfail(conn, "unexpected EOF from peer");
    return OOP_CONTINUE;
  }
  assert(ev == OOP_RD_OK);

  char *sani= sanitise(data,-1);

  char *ep;
  unsigned long code= strtoul(data, &ep, 10);
  if (ep != data+3 || *ep != ' ' || data[0]=='0') {
    connfail(conn, "badly formatted response from peer: %s", sani);
    return OOP_CONTINUE;
  }

  int busy= conn_busy(conn);

  if (conn->quitting) {
    if (code!=205 && code!=400) {
      connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
	       conn->quitting, sani);
    } else {
      LIST_REMOVE(conns,conn);
      info("C%d (now %d) idle connection closed (%s)",
	     conn->fd, conns.count, conn->quitting);
      notice_conns_fewer();
      assert(!busy);
      conn_dispose(conn);
    }
    return OOP_CONTINUE;
  }

  conn->since_activity= 0;
  Article *art;

#define GET_ARTICLE(musthavesent) do{					      \
    art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
    if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
  }while(0) 

#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{	\
    code_streaming= (streaming);				\
    GET_ARTICLE(musthavesent);					\
    article_done(art, RC_##how);				\
    goto dealtwith;						\
  }while(0)

#define PEERBADMSG(m) do {					\
    connfail(conn, m ": %s", sani);  return OOP_CONTINUE;	\
  }while(0)

  int code_streaming= 0;

  switch (code) {

  default:  PEERBADMSG("peer sent unexpected message");

  case 400:
    if (busy)
      PEERBADMSG("peer timed us out or stopped accepting articles");

    LIST_REMOVE(conns,conn);
    info("C%d (now %d) idle connection closed by peer",
	 conns.count, conn->fd);
    notice_conns_fewer();
    conn_dispose(conn);
    return OOP_CONTINUE;

  case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
  case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */

  case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
  case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */

  case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
  case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */

  case 238: /* CHECK says send it */
    code_streaming= 1;
  case 335: /* IHAVE says send it */
    GET_ARTICLE(-1);
    assert(art->state == art_Unchecked);
    art->ipf->counts.results[art->state][RC_accepted]++;
    art->state= art_Wanted;
    LIST_ADDTAIL(conn->priority, art);
    break;

  case 431: /* CHECK or TAKETHIS says try later */
    code_streaming= 1;
  case 436: /* IHAVE says try later */
    GET_ARTICLE(0);
    article_defer(art, RC_deferred);
    break;

  }
dealtwith:

  conn_maybe_write(conn);
  check_assign_articles();
  return OOP_CONTINUE;
}