File: csinksocket.c

package info (click to toggle)
entity 0.7.2-6
  • links: PTS
  • area: main
  • in suites: woody
  • size: 5,352 kB
  • ctags: 5,272
  • sloc: ansic: 61,707; sh: 7,921; makefile: 732; perl: 399
file content (570 lines) | stat: -rw-r--r-- 14,542 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
#include "csink.h"
#include "csinksocket.h"

/* debugging */
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>


/* Forward declarations: csink_socket_init() below installs these functions
 * as callbacks before their definitions appear in this file. */
static void csink_socket_connect_action (CSink *sink);
static void csink_socket_free (CSinkSocket *sink);
gint
csink_socket_write (CSinkSocket * sink, CBuf * data);

/*
 * Set up the socket-specific part of a sink.
 *
 * sink           - the socket sink to initialise.
 * address_len    - size in bytes of the address buffers to allocate.
 * address_family - address family recorded on the sink.
 *
 * Runs the base csink_init(), tags the sink as a socket sink, marks the
 * descriptor as "none" (-1), (re)allocates the remote/local address buffers
 * and installs the default callbacks.
 *
 * NOTE(review): the previous address pointers are freed before being
 * replaced, so the caller presumably hands in a zeroed or previously
 * initialised structure -- confirm.  The malloc() results are not checked.
 */
void
csink_socket_init (CSinkSocket *sink, int address_len, int address_family)
{
  CSink *base;

  csink_init (CSINK (sink));

  base = CSINK (sink);
  base->csink_type = CSINK_SOCKET_TYPE;

  sink->address_len = address_len;
  sink->address_family = address_family;
  sink->fd = -1;

  /* free(NULL) is a no-op, so no guard is needed before replacing. */
  free (sink->remote_address);
  sink->remote_address = malloc (address_len);

  free (sink->local_address);
  sink->local_address = malloc (address_len);

  /* Generic sink operations. */
  base->open = (CSinkOpenFunc) csink_socket_open;
  base->write = (CSinkWriteFunc) csink_socket_write;
  base->close = (CSinkCloseFunc) csink_socket_close;
  base->free = (CSinkFreeFunc) csink_socket_free;

  /* Socket-level hooks. */
  sink->listen = (CSinkSocketListenFunc) csink_socket_default_listen;
  sink->open_action = csink_socket_connect_action;

  sink->can_read_action =
    (CSinkSocketCanReadFunc) csink_socket_can_read_action;
  sink->can_write_action =
    (CSinkSocketCanReadFunc) csink_socket_can_write_action;
}

/*
 * Release everything the socket layer holds for `sink` without freeing the
 * sink structure itself: both address buffers, any registered read/write
 * watches, and finally the base sink resources via csink_release().
 */
void
csink_socket_release (CSinkSocket *sink)
{
  /* free(NULL) is harmless, so the pointers can be released unguarded. */
  free (sink->local_address);
  sink->local_address = NULL;

  free (sink->remote_address);
  sink->remote_address = NULL;

  CDEBUG (("csinksocket", "in csink_socket_release"));

  if (NULL != sink->read_watch_tag) {
      CDEBUG (("csinksocket", "clearing read tag"));
      csink_remove_fd (sink->read_watch_tag, "Socket sink being freed");
      sink->read_watch_tag = NULL;
  }

  if (NULL != sink->write_watch_tag) {
      CDEBUG (("csinksocket", "clearing write tag"));
      csink_remove_fd (sink->write_watch_tag, "Socket Sink being freed");
      sink->write_watch_tag = NULL;
  }

  csink_release (CSINK (sink));
}

/*
 * Destroy a socket sink: release its resources, then free the structure.
 *
 * Installed as the sink's `free` callback by csink_socket_init().  The
 * socket sink is treated as an abstract base, so users are not expected to
 * reach this directly; it exists for completeness.
 */
void
csink_socket_free (CSinkSocket *sink)
{
  csink_socket_release (sink);
  free (sink);
}

/*
 * Close the sink's socket and stop watching it.
 *
 * The read/write watches are removed before the descriptor is closed so
 * that no watch is ever left registered on a closed descriptor.  The
 * descriptor is closed only if it is valid and is then reset to -1 (the
 * same "no descriptor" value csink_socket_init() uses), which makes the
 * function safe to call more than once and prevents a later call from
 * closing an unrelated descriptor that happened to reuse the number.
 */
void
csink_socket_close (CSinkSocket *sink)
{
  if (sink->read_watch_tag) {
      csink_remove_fd (sink->read_watch_tag,
		"Socket sink closing");
      sink->read_watch_tag = NULL;
  }
  if (sink->write_watch_tag) {
      csink_remove_fd (sink->write_watch_tag,
		"Socket sink closing");
      sink->write_watch_tag = NULL;
  }

  if (sink->fd >= 0) {
      close (sink->fd);
      sink->fd = -1;
  }
}

/*
 * Copy `address` into the sink's remote-address buffer.
 *
 * Exactly sink->address_len bytes are copied, so `address` must point to at
 * least that many readable bytes.
 */
void
csink_socket_set_remote_address (CSinkSocket *sink, struct sockaddr *address)
{
  memcpy (sink->remote_address, address, sink->address_len);
}

/*
 * Copy `address` into the sink's local-address buffer.
 *
 * Exactly sink->address_len bytes are copied, so `address` must point to at
 * least that many readable bytes.
 */
void
csink_socket_set_local_address (CSinkSocket *sink, struct sockaddr *address)
{
  memcpy (sink->local_address, address, sink->address_len);
}


/*
 * Register the callback that csink_socket_on_accept() hands new
 * connections to.  Passing NULL clears it.
 */
void
csink_socket_set_accept_func (CSinkSocket *sink, CSinkCallbackFunc func)
{
  sink->on_accept = func;
}

/*
 * Deliver a newly accepted connection.
 *
 * If an accept callback is registered on `sink`, `newconn` is passed to it.
 * Otherwise nobody wants the connection, so it is closed and freed here.
 */
void
csink_socket_on_accept (CSinkSocket *sink, CSink * newconn)
{
  if (!sink->on_accept) {
    /* No handler registered: drop the connection. */
    csink_close (newconn);
    csink_free (newconn);
    return;
  }

  sink->on_accept (newconn);
}


/*
 * Start listening on `sink` through its installed listen hook.
 *
 * Returns -1 if no listen hook is installed or if the sink is already
 * connected (or in the middle of connecting); otherwise returns whatever
 * the listen hook returns.
 */
int
csink_socket_listen (CSinkSocket *sink)
{
  const int busy_mask = SOCKET_CONNECTED | SOCKET_CONNECT_INPROGRESS;

  CDEBUG (("csinksocket", "in csink_socket_listen"));

  if (NULL == sink->listen) {
    CDEBUG(("csinksocket", "Attempt to call null listen function"));
    /* set the error object here */
    return -1;
  }

  /* A sink that is connected or connecting cannot become a listener. */
  if (0 != (sink->status & busy_mask)) {
    CDEBUG (("csinksocket",
	"attempt to listen on an already connected sink."));
    /* set the error object here */
    return -1;
  }

  return sink->listen (CSINK (sink));
}


/*
 * Create the socket for `sink` and start a non-blocking connect to
 * sink->remote_address.
 *
 * Outcomes:
 *  - socket() fails: "SOCKET_CREATE_FAILED" is reported and the function
 *    returns.
 *  - connect() completes immediately: the sink's open_action (if any) is
 *    invoked.
 *  - connect() is still in progress (EINPROGRESS/EALREADY): the sink is
 *    flagged SOCKET_CONNECT_INPROGRESS and open_action is registered as a
 *    watch on the descriptor, to run once the connect resolves.
 *  - any other connect() error: the connection flags are cleared, the sink
 *    is closed and "CONNECT_FAILURE" is reported.
 *
 * The address length passed to connect() is sink->address_len, i.e. the
 * size the address buffer was actually allocated with in
 * csink_socket_init(), so connect() never reads past that buffer.
 */
void
csink_socket_open (CSinkSocket *sink)
{
  int res;
  int connect_errno;

  CDEBUG (("csinksocket", "open, called."));

  /* NOTE(review): the family is hard-coded to AF_INET here rather than
   * taken from sink->address_family -- confirm this is intentional. */
  sink->fd = socket (AF_INET, SOCK_STREAM, 0);

  if (sink->fd < 0) {		/* error */
    CDEBUG (("csinksocket", "open, couldn't get an fd. returning."));
    csink_on_error (CSINK (sink), "SOCKET_CREATE_FAILED");
    return;
  }

  /* IO in csink is ALWAYS nonblocking. */
  fcntl (sink->fd, F_SETFL, O_NONBLOCK);

  res = connect (sink->fd, (struct sockaddr *) sink->remote_address,
		 sink->address_len);
  /* Capture errno immediately: the debug output below may overwrite it. */
  connect_errno = errno;

  CDEBUG (("csinksocket", "open, result of connect() call was %d.", res));
  CDEBUG (("csinksocket", "open, fd is %d.", sink->fd));

  if (res < 0) {		/* Error? */
    switch (connect_errno) {
    case EINPROGRESS:
    case EALREADY:
      CDEBUG (("csinksocket", "open, connection is happening nonblockingly"));
      sink->status |= SOCKET_CONNECT_INPROGRESS;

      /* Use the read watch because we aren't into a user defined state. */
      sink->read_watch_tag = csink_add_fd (sink->fd,
			EIO_READ | EIO_WRITE | EIO_ERROR,
                        sink->open_action,
			CSINK (sink),
			"Nonblocking socket connect wait tag");
      break;
    default:
      CDEBUG (("csinksocket", "open, error in connect, aborting."));
      sink->status &= ~(SOCKET_CONNECT_INPROGRESS | SOCKET_CONNECTED);
      csink_close (CSINK (sink));
      csink_on_error (CSINK (sink), "CONNECT_FAILURE");
      break;
    }
  } else {
      CDEBUG (("csinksocket", "open, connection established."));
      if (sink->open_action)
	sink->open_action  (CSINK(sink));
  }

  CDEBUG (("csinksocket", "open, returning."));
}


/*
 * Queue `message` for sending on a connected socket sink.
 *
 * Returns FALSE if the generic write sanity check rejects the request, or
 * if the sink is not connected (in which case "DISCONN_WRITE" is also
 * reported).  Otherwise the message is queued via csink_default_write(),
 * a write watch is registered if none is active, and TRUE is returned.
 * The actual send happens later from the sink's can_write_action.
 */
gint
csink_socket_write (CSinkSocket * sink, CBuf * message)
{
    CSink *base = CSINK (sink);

    if (!csink_write_sanity (base, message))
        return FALSE;

    /* Writing only makes sense on an established connection. */
    if (0 == (sink->status & SOCKET_CONNECTED)) {
        csink_on_error (CSINK (sink), "DISCONN_WRITE");
        return FALSE;
    }

    /* Put the data on the outgoing queue. */
    csink_default_write (CSINK (sink), message);

    /* Make sure something is waiting for the socket to become writable. */
    if (!sink->write_watch_tag) {
        sink->write_watch_tag = csink_add_fd (sink->fd,
                                              EIO_WRITE,
                                              sink->can_write_action,
                                              CSINK (sink),
                                              "Socket write tag");
    }

    return TRUE;
}


/*
 * Finish a non-blocking connect on `sink`.
 *
 * Removes the watch that was waiting for the connect to resolve, then asks
 * the socket for its pending error (SO_ERROR).  On failure the connection
 * flags are cleared, "CONNECT_FAILURE" is reported, the sink is closed and
 * FALSE is returned.  On success the sink is flagged SOCKET_CONNECTED, a
 * read watch is registered with the sink's can_read_action, and TRUE is
 * returned.
 */
int
csink_socket_do_connect (CSinkSocket *sink)
{
  int rc;
  int so_error = 0;
  socklen_t so_error_len = sizeof (so_error);

  /* The connect-wait watch has done its job. */
  if (NULL != sink->read_watch_tag) {
      csink_remove_fd (sink->read_watch_tag,
		"About to try connect, clearing read tag.");
      sink->read_watch_tag = NULL;
  }

  /* Find out how the connect attempt ended. */
  errno = 0;
  rc = getsockopt (sink->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len);

  /* Solaris needs this; see stevens, Unix Network Pgm'ng, 15.4 */
  if (rc < 0)
    so_error = errno;

  CDEBUG (("csinksocket", "Getsockopt: ret = %i, optval = %i", rc, so_error));

  if (so_error != 0) {
      /* The connect failed: drop both connection flags and bail out. */
      CSINK_SOCKET (sink)->status &=
			~(SOCKET_CONNECT_INPROGRESS | SOCKET_CONNECTED);

      CDEBUG (("csinksocket", "failure for unknown reason: %s(%d)",
	       strerror (so_error),
	       so_error));
      csink_on_error (CSINK (sink), "CONNECT_FAILURE");
      csink_close (CSINK (sink));
      return FALSE;
  }

  /* Connected: switch the state flags over. */
  sink->status |= SOCKET_CONNECTED;
  sink->status &= ~SOCKET_CONNECT_INPROGRESS;

  CDEBUG (("csinksocket", "Connect callback...\n"));

  /* Always need to read from the socket. */
  sink->read_watch_tag = csink_add_fd (sink->fd,
				       EIO_READ | EIO_ERROR,
				       sink->can_read_action,
				       CSINK (sink),
				       "Read tag for socket sink");

  return TRUE;
}


/*
 * Default open_action for socket sinks.
 *
 * Completes the connect via csink_socket_do_connect(); if that succeeds,
 * fires the sink's connect notification through csink_on_connect().
 * Nothing further happens when the connect failed.
 */
static void
csink_socket_connect_action (CSink *sink_)
{
  CSinkSocket *sock = CSINK_SOCKET (sink_);

  CDEBUG (("csinksocket", "connect_action, called"));

  if (csink_socket_do_connect (sock)) {
    CDEBUG (("csinksocket", "connect_action, calling connect callback"));  
    csink_on_connect (CSINK (sock));
    CDEBUG (("csinksocket", "connect_action, back from callback"));  

    CDEBUG (("csinksocket", "connect_action, returning"));
  }
}


/*
 * Read one small chunk from the socket and hand it to the sink.
 *
 * Up to 255 bytes are received into a zero-filled 256-byte buffer.
 *  - recv() error: EINTR/EAGAIN are ignored (try again on the next call);
 *    any other error closes the sink and reports an error code.
 *  - recv() returns 0: the peer closed the connection, so the sink is
 *    closed.
 *  - otherwise the data is wrapped in a new CBuf (one byte longer than
 *    what was read, so that a trailing NUL from the zero-filled buffer is
 *    included), appended to the sink's inQueue, and csink_on_new_data()
 *    is called.
 */
void
csink_socket_can_read_action (CSinkSocket *sink)
{
  char chunk[256] = "";
  CBuf *incoming;
  int nread;

  /* Read a small block, leaving room for the terminating NUL. */
  nread = recv (sink->fd, chunk, sizeof (chunk) - 1, 0);

  if (nread < 0) {
    /* error */
    switch (errno) {
    case ENOTSOCK:
    case EBADF:			/* Not a socket. */
      csink_close (CSINK(sink));
      csink_on_error (CSINK(sink), "READ_EBADF");
      break;
    case ENOTCONN:		/* Not connected. */
      csink_close (CSINK(sink));
      csink_on_error (CSINK(sink), "IMPLEMENTATION_BUG");
      break;
    case EINTR:			/* Just try again later. */
    case EAGAIN:
      break;
    default:			/* Anything else is unexpected. */
      csink_close (CSINK(sink));
      csink_on_error (CSINK(sink), "UNKNOWN");
    }
    return;
  }

  if (nread == 0) {
    /* end-of-file */
    CDEBUG (("csinksocket",
	     "Connection closed on other end. closing link.\n"));
    csink_close (CSINK (sink));
    return;
  }

  CDEBUG (("csinksocket", "read off %i bytes.\n", nread));

  /* Wrap the data in a CBuf; +1 keeps the trailing \0 with it. */
  incoming = cbuf_new_with_data (chunk, nread + 1);

  g_ptr_array_add (CSINK (sink)->inQueue, (gpointer) incoming);

  csink_on_new_data (CSINK (sink));
}


/*
 * Flush the sink's outgoing queue to the socket.
 *
 * Buffers are taken from the front of outQueue one at a time and sent
 * until fully written.
 *  - If the sink is not connected, the write watch (if any) is removed so
 *    this handler is not invoked again for a socket it cannot write to,
 *    and the function returns.
 *  - If the socket cannot take more data right now (EWOULDBLOCK/ENOBUFS),
 *    the unsent remainder of the current buffer is pushed back to the
 *    front of the queue and the function returns, leaving the write watch
 *    in place so sending resumes later.
 *  - EINTR simply retries the send.
 *  - Any other error closes the sink, reports an error code and frees the
 *    current buffer.
 *  - When the queue is empty, the write watch is removed.
 *
 * Where the platform provides MSG_NOSIGNAL it is passed to send(), so a
 * broken connection is reported as EPIPE instead of raising SIGPIPE.
 */
void
csink_socket_can_write_action (CSinkSocket * sink)
{
    CBuf *partial;
    CBuf *cur;
    gint res;
    int send_flags = 0;

#ifdef MSG_NOSIGNAL
    send_flags = MSG_NOSIGNAL;		/* No SIGPIPE. */
#endif

    CDEBUG (("csinksocket", "in csink_socket_can_write_action"));

    if (! (sink->status & SOCKET_CONNECTED) ) {
        /* Being asked to write on an unconnected sink is a usage error.
         * Stop watching the descriptor for writability so that this
         * handler is not called over and over. */
        if (sink->write_watch_tag) {
            csink_remove_fd (sink->write_watch_tag,
                             "Socket write attempted while not connected");
            sink->write_watch_tag = NULL;
        }
        return;
    }

    /* Send loop for sending the queue. */
    while (0 != CSINK (sink)->outQueue->len) {
        int written = 0;
        CDEBUG (("csinksocket", "SEND LOOP--CONNECTED (1)\n"));

        /* Take ownership of the buffer at the head of the queue. */
        cur = (CBuf *) g_ptr_array_index (CSINK (sink)->outQueue, 0);
        g_ptr_array_remove_index (CSINK (sink)->outQueue, 0);

        while (written < cur->len) {
            res = send (sink->fd,
                        cur->str + written, cur->len - written,
                        send_flags);

            if (res < 0) {	/* Error. */
                switch (errno) {
                case EPIPE:	/* Connection broken. */
                    csink_on_error (CSINK (sink), "SEND_PIPE");
                    csink_close (CSINK (sink));
                    goto fail;
                case ENOTSOCK:
                case EBADF:	/* Not a valid socket file descriptor. */
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "SEND_ENOTSOCK");
                    goto fail;
                case EMSGSIZE:
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "SEND_EMSGSIZE");
                    goto fail;
                case ENOTCONN:
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "IMPLEMENTATION_BUG");
                    goto fail;

                case ENOBUFS:
                case EWOULDBLOCK: /* Wait for a while and try again. */
                    /* Push what's left of the message back into the queue. */
                    partial = cbuf_new_with_data (cur->str + written,
                                                  cur->len - written);
                    g_ptr_array_add_at_start (CSINK (sink)->outQueue,
                                              (gpointer) partial);
                    cbuf_free (cur);
                    return;

                case EINTR:	/* interrupted system call; just try again */
                    break;

                default:
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "UNKNOWN");
                    goto fail;
                }
            } else {	/* Success. */
                written += res;
            }
        } /* END current chunk. */

        cbuf_free (cur);
    }

    /* Everything has been sent: stop watching for writability.  The tag is
     * checked first in case this handler was invoked without one. */
    if (sink->write_watch_tag) {
        csink_remove_fd (sink->write_watch_tag,
                         "Socket Write Queue is empty");
        sink->write_watch_tag = NULL;
    }

    return;

fail:
    cbuf_free (cur);
}


/*
 * Default listen hook: turn `sink` into a non-blocking listening socket.
 *
 * Creates a stream socket of sink->address_family, enables SO_REUSEADDR,
 * makes it non-blocking, binds it to sink->local_address and starts
 * listening with a backlog of 2.  On success the sink is flagged
 * SOCKET_CONNECTED, a watch calling sink->accept_action is registered on
 * the descriptor, and 0 is returned.
 *
 * On any failure an error code is reported through csink_on_error() and -1
 * is returned.  If the socket had already been created, it is closed and
 * sink->fd is reset to -1 *before* the error is reported, so a failed
 * listen never leaks a descriptor and the error handler never sees a
 * half-configured socket.
 */
int
csink_socket_default_listen (CSinkSocket * sink)
{
  int res;
  int saved_errno;
  int reuse = 1;

  CDEBUG (("csinksocket", "in csink_socket_default_listen"));

  sink->status &= ~(SOCKET_CONNECT_INPROGRESS);

  /* Do all of the fun stuff necessary to initialize a listening socket. */
  sink->fd = socket (sink->address_family, SOCK_STREAM, 0);

  if (sink->fd < 0) {
    csink_on_error (CSINK (sink), "SOCKET_CREATE_FAILED");
    /* set error object */
    return -1;
  }

  /* Set the SO_REUSEADDR option on the socket so the address can be bound
   * again right after a previous process using it has exited. */
  res = setsockopt (sink->fd, SOL_SOCKET, SO_REUSEADDR,
		    &reuse, sizeof (reuse));
  if (res < 0) {
    /* Save errno first: close() may overwrite it. */
    saved_errno = errno;
    close (sink->fd);
    sink->fd = -1;

    switch (saved_errno) {
    case EBADF:		/* bad socket descriptor */
      csink_on_error (CSINK (sink), "EBADF");
      break;
    case ENOTSOCK:		/* fd was not a socket */
      csink_on_error (CSINK (sink), "ENOTSOCK");
      break;
    case ENOPROTOOPT:	/* that option doesn't make any sense at that
			 * level */
      csink_on_error (CSINK (sink), "ENOPROTOOPT");
      break;
    default:
      csink_on_error (CSINK (sink), "UNKNOWN");
      break;
    }
    return -1;
  }

  /* IO in csink is always nonblocking. */
  fcntl (sink->fd, F_SETFL, O_NONBLOCK);

  res = bind (sink->fd, sink->local_address, sink->address_len);
  if (res < 0) {
    saved_errno = errno;
    close (sink->fd);
    sink->fd = -1;

    switch (saved_errno) {
    case EBADF:		/* bad socket descriptor */
      csink_on_error (CSINK (sink), "EBADF");
      break;
    case ENOTSOCK:		/* fd was not a socket */
      csink_on_error (CSINK (sink), "ENOTSOCK");
      break;
    case EADDRNOTAVAIL:	/* local address/port  not available on this
			 * machine */
      csink_on_error (CSINK (sink), "EADDRNOTAVAIL");
      break;
    case EADDRINUSE:	/* that address/port is in use. */
      csink_on_error (CSINK (sink), "EADDRINUSE");
      break;
    case EINVAL:		/* socket already has an address */
      csink_on_error (CSINK (sink), "EINVAL");
      break;
    case EACCES:
      csink_on_error (CSINK (sink), "EACCES");
      break;
    default:
      csink_on_error (CSINK (sink), "UNKNOWN");
      break;
    }
    return -1;
  }

  res = listen (sink->fd, 2);
  if (res < 0) {
    saved_errno = errno;
    close (sink->fd);
    sink->fd = -1;

    switch (saved_errno) {
    case EBADF:		/* bad socket descriptor */
      csink_on_error (CSINK (sink), "EBADF");
      break;
    case ENOTSOCK:		/* fd was not a socket */
      csink_on_error (CSINK (sink), "ENOTSOCK");
      break;
    case EOPNOTSUPP:	/* this is not a listenable socket */
      csink_on_error (CSINK (sink), "EOPNOTSUPP");
      break;
    default:		/* e.g. EADDRINUSE: still tell the caller why */
      csink_on_error (CSINK (sink), "UNKNOWN");
      break;
    }
    return -1;
  }

  sink->status |= SOCKET_CONNECTED;

  sink->read_watch_tag = csink_add_fd (sink->fd,
		EIO_READ | EIO_WRITE | EIO_ERROR,
		sink->accept_action, CSINK (sink),
		"Socket listen tag");
  return 0;
}