File: xmit.c

package info (click to toggle)
innduct 2.2
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 664 kB
  • sloc: sh: 4,270; ansic: 3,114; perl: 130; makefile: 33
file content (356 lines) | stat: -rw-r--r-- 9,755 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
/*
 *  innduct
 *  tailing reliable realtime streaming feeder for inn
 *  xmit.c - transmitting checks and articles, flow control, expiry
 *
 *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
 *  and contributors; see LICENCE.txt.
 *  SPDX-License-Identifier: GPL-3.0-or-later
 */

#include "innduct.h"

const char *const artstate_names[]=
  { "Unchecked", "Wanted", "Unsolicited", 0 };

/*---------- assigning articles to conns, and transmitting ----------*/

static Article *dequeue_from(int peek, InputFile *ipf) {
  if (!ipf) return 0;
  if (peek) return LIST_HEAD(ipf->queue);

  Article *art= LIST_REMHEAD(ipf->queue);
  if (!art) return 0;
  check_reading_pause_resume(ipf);
  return art;
}

static Article *dequeue(int peek) {
  Article *art;
  art= dequeue_from(peek, flushing_input_file);  if (art) return art;
  art= dequeue_from(peek, backlog_input_file);   if (art) return art;
  art= dequeue_from(peek, main_input_file);      if (art) return art;
  return 0;
}

static void conn_inqueue_spare(const Conn *conn,
			       int *inqueue_r, int *spare_r) {
  int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
  int spare= conn->max_queue - inqueue;
  if (inqueue_r) *inqueue_r= inqueue;
  if (spare_r) *spare_r= spare;
}

void check_assign_articles(void) {
  for (;;) {
    if (!dequeue(1))
      break;

    Conn *walk, *use=0;

    /* Find a connection to offer this article.  We prefer a busy
     * connection to an idle one, provided it's not full.  We take the
     * first (oldest) and since that's stable, it will mean we fill up
     * connections in order.  That way if we have too many
     * connections, the spare ones will go away eventually.
     */
    FOR_CONN(walk) {
      if (walk->quitting) continue;
      int inqueue, spare;
      conn_inqueue_spare(walk, &inqueue, &spare);
      assert(inqueue <= max_queue_per_conn);
      assert(spare >= 0);
      if (inqueue==0) /*idle*/ { if (!use) use= walk; }
      else if (spare>0) /*working*/ { use= walk; break; }
    }
    if (use) {
      int inqueue, spare;
      conn_inqueue_spare(use, &inqueue, &spare);
      if (!inqueue) use->since_activity= 0; /* reset idle counter */
      while (spare>0) {
	Article *art= dequeue(0);
	if (!art) break;
	LIST_ADDTAIL(use->waiting, art);
	lowvol_perperiod[lowvol_circptr]++;
	spare--;
      }
      conn_maybe_write(use);
    } else if (allow_connect_start()) {
      connect_start();
      break;
    } else {
      break;
    }
  }
}

static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
  conn_maybe_write(u);
  return OOP_CONTINUE;
}

void conn_maybe_write(Conn *conn) {
  for (;;) {
    conn_make_some_xmits(conn);
    if (!conn->xmitu) {
      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
      conn->oopwriting= 0;
      return;
    }

    void *rp= conn_write_some_xmits(conn);
    if (rp==OOP_CONTINUE) {
      if (!conn->oopwriting) {
	loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
	conn->oopwriting= 1;
      }
      return;
    } else if (rp==OOP_HALT) {
      return;
    } else if (!rp) {
      /* transmitted everything */
    } else {
      abort();
    }
  }
}

/*---------- expiry, flow control and deferral ----------*/

/*
 * flow control notes
 * to ensure articles go away eventually
 * separate queue for each input file
 *   queue expiry
 *     every period, check head of backlog queue for expiry with SMretrieve
 *       if too old: discard, and check next article
 *     also check every backlog article as we read it
 *   flush expiry
 *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
 *     one-off: eat queued articles from flushing and write them to defer
 *     one-off: connfail all connections which have any articles from flushing
 *     newly read articles from flushing go straight to defer
 *     this should take care of it and get us out of this state
 * to avoid filling up ram needlessly
 *   input control
 *     limit number of queued articles for each ipf
 *     pause/resume inputfile tailing
 */

void check_reading_pause_resume(InputFile *ipf) {
  if (ipf->queue.count >= max_queue_per_ipf)
    inputfile_reading_pause(ipf);
  else
    inputfile_reading_resume(ipf);
}

void article_defer(Article *art /* not on a queue */, int whichcount) {
  open_defer();
  if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
      || fflush(defer))
    sysdie("write to defer file %s",path_defer);
  article_done(art, whichcount);
}

int article_check_expired(Article *art /* must be queued, not conn */) {
  ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
  if (artdata) { SMfreearticle(artdata); return 0; }

  LIST_REMOVE(art->ipf->queue, art);
  art->missing= 1;
  art->ipf->counts.events[nooffer_missing]++;
  article_done(art,-1);
  return 1;
}

void inputfile_queue_check_expired(InputFile *ipf) {
  if (!ipf) return;

  for (;;) {
    Article *art= LIST_HEAD(ipf->queue);
    if (!art) break;
    int expd= article_check_expired(art);
    if (!expd) break;
  }
  check_reading_pause_resume(ipf);
}

void article_autodefer(InputFile *ipf, Article *art) {
  ipf->autodefer++;
  article_defer(art,-1);
}

static int has_article_in(const ArticleList *al, InputFile *ipf) {
  Article *art;
  for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
    if (art->ipf == ipf) return 1;
  return 0;
}

static void autodefer_input_file_articles(InputFile *ipf) {
  Article *art;
  while ((art= LIST_REMHEAD(ipf->queue)))
    article_autodefer(ipf, art);
}

void autodefer_input_file(InputFile *ipf) {
  static const char *const abandon= "stuck";
  ipf->autodefer= 0;

  autodefer_input_file_articles(ipf);

  if (ipf->inprogress) {
    Conn *walk;
    FOR_CONN(walk) {
      if (has_article_in(&walk->waiting,  ipf) ||
	  has_article_in(&walk->priority, ipf) ||
	  has_article_in(&walk->sent,     ipf))
	walk->quitting= abandon;
    }
    while (ipf->inprogress) {
      FOR_CONN(walk)
	if (walk->quitting == abandon) goto found;
      abort(); /* where are they ?? */

    found:
      connfail(walk, "connection is stuck or crawling,"
	       " and we need to finish flush");
      autodefer_input_file_articles(ipf);
    }
  }

  check_reading_pause_resume(ipf);
}

/*========== article transmission ==========*/

static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
		  XmitKind kind) { /* caller must then fill in details */
  struct iovec *v= &conn->xmit[conn->xmitu];
  XmitDetails *d= &conn->xmitd[conn->xmitu++];
  v->iov_base= (char*)data;
  v->iov_len= len;
  d->kind= kind;
  return d;
}

static void xmit_noalloc(Conn *conn, const char *data, int len) {
  xmit_core(conn,data,len, xk_Const);
}
#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))

static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
  XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
  d->info.sm_art= ah;
}

void xmit_free(XmitDetails *d) {
  switch (d->kind) {
  case xk_Artdata: SMfreearticle(d->info.sm_art); break;
  case xk_Const:                                  break;
  default: abort();
  }
}

void *conn_write_some_xmits(Conn *conn) {
  /* return values:
   *      0:            nothing more to write, no need to call us again
   *      OOP_CONTINUE: more to write but fd not writeable
   *      OOP_HALT:     disaster, have destroyed conn
   */
  for (;;) {
    int count= conn->xmitu;
    if (!count) return 0;

    if (count > IOV_MAX) count= IOV_MAX;
    ssize_t rs= writev(conn->fd, conn->xmit, count);
    if (rs < 0) {
      if (isewouldblock(errno)) return OOP_CONTINUE;
      connfail(conn, "write failed: %s", strerror(errno));
      return OOP_HALT;
    }
    assert(rs > 0);

    int done;
    for (done=0; rs; ) {
      assert(done<conn->xmitu);
      struct iovec *vp= &conn->xmit[done];
      XmitDetails *dp= &conn->xmitd[done];
      assert(vp->iov_len <= SSIZE_MAX);
      if ((size_t)rs >= vp->iov_len) {
	rs -= vp->iov_len;
	xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
	done++;
      } else {
	vp->iov_base= (char*)vp->iov_base + rs;
	vp->iov_len -= rs;
	break; /* rs -= rs */
      }
    }
    int newu= conn->xmitu - done;
    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
    conn->xmitu= newu;
  }
}

void conn_make_some_xmits(Conn *conn) {
  for (;;) {
    if (conn->xmitu+5 > CONNIOVS)
      break;

    Article *art= LIST_REMHEAD(conn->priority);
    if (!art) art= LIST_REMHEAD(conn->waiting);
    if (!art) break;

    if (art->state >= art_Wanted || (conn->stream && nocheck)) {
      /* actually send it */

      ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);

      art->state=
	art->state == art_Unchecked ? art_Unsolicited :
	art->state == art_Wanted    ? art_Wanted      :
	(abort(),-1);

      if (!artdata) art->missing= 1;
      art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;

      if (conn->stream) {
	if (artdata) {
	  XMIT_LITERAL("TAKETHIS ");
	  xmit_noalloc(conn, art->messageid, art->midlen);
	  XMIT_LITERAL("\r\n");
	  xmit_artbody(conn, artdata);
	} else {
	  article_done(art, -1);
	  continue;
	}
      } else {
	/* we got 235 from IHAVE */
	if (artdata) {
	  xmit_artbody(conn, artdata);
	} else {
	  XMIT_LITERAL(".\r\n");
	}
      }

      LIST_ADDTAIL(conn->sent, art);

    } else {
      /* check it */

      if (conn->stream)
	XMIT_LITERAL("CHECK ");
      else
	XMIT_LITERAL("IHAVE ");
      xmit_noalloc(conn, art->messageid, art->midlen);
      XMIT_LITERAL("\r\n");

      assert(art->state == art_Unchecked);
      art->ipf->counts.results[art->state][RC_sent]++;
      LIST_ADDTAIL(conn->sent, art);
    }
  }
}