File: client_data.c

package info (click to toggle)
poolcounter 1.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 244 kB
  • sloc: ansic: 1,065; python: 458; makefile: 33
file content (163 lines) | stat: -rw-r--r-- 4,579 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#include <stddef.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <assert.h>
#include "client_data.h"
#include "locks.h"
#include "stats.h"

struct client_data* new_client_data(int fd) {
	struct client_data* cli_data;
	cli_data = malloc( sizeof( *cli_data ) );
	cli_data->used_buffer = 0;
	cli_data->next_lock = 0;
	cli_data->fd = fd;
	int i;
	for ( i = 0; i < MAX_LOCKS_PER_CLIENT; i++ ) {
		cli_data->client_locks[i].state = LS_UNLOCKED;
	}
	return cli_data;
}

int all_unlocked(struct client_data* cli_data) {
	int i;
	for ( i = 0; i < MAX_LOCKS_PER_CLIENT; i++ ) {
		if ( cli_data->client_locks[i].state != LS_UNLOCKED ) {
			fprintf( stderr, "%d lock still locked after free_client_data.  next_lock is %d.  State is %d.\n",
				i, cli_data->next_lock, cli_data->client_locks[i].state );
			return 0;
		}
	}
	return 1;
}

void free_client_data(struct client_data* cli_data) {
	int i;
	// Release locks backwards because that feels better.
	for ( i = cli_data->next_lock - 1; i >= 0 ; i-- ) {
		finish_lock( cli_data->client_locks + i );
	}

	assert( all_unlocked( cli_data ) );
	free( cli_data );
}

/**
 * Initialize and return a pointer to the next available lock in the client or
 * NULL if there aren't any more available locks.
 */
struct locks* init_next_lock(struct client_data* cli_data, struct PoolCounter* parent, enum lock_state state) {
	if ( cli_data->next_lock >= MAX_LOCKS_PER_CLIENT ) {
		return NULL;
	}
	struct locks* l = cli_data->client_locks + cli_data->next_lock;
	l->state = state;
	l->parent = parent;
	l->client_data = cli_data;
	cli_data->next_lock++;
	return l;
}

/**
 * Read data from the client
 * If we filled a line, return the line length, and point to it in *line.
 * If a line is not available, *line will point to NULL.
 * Return -1 or -2 if the socket was closed (gracefully / erroneusly)
 * Line separator is \n.
 * Returned lines end in \0 with \n stripped.
 * Incomplete lines are not returned on close.
 */
int read_client_line(int fd, struct client_data* cli_data, char** line) {
	int n, i;
	*line = NULL;
	n = recv( fd, cli_data->buffer + cli_data->used_buffer, sizeof( cli_data->buffer ) - cli_data->used_buffer, 0 );
	if ( n == 0 ) {
		return -1;
	}
	if ( n == -1 ) {
		if (errno == EAGAIN) {
			/* This shouldn't happen... */
			return 0;
		} else {
			return -2;
		}
	}
	
	for ( i=cli_data->used_buffer; i < cli_data->used_buffer+n; i++ ) {
		if ( cli_data->buffer[i] == '\n' ) {
			cli_data->buffer[i] = '\0';
			*line = cli_data->buffer;
			return i;
		}
	}

	/* Wait for the rest of the line */
	event_add( &cli_data->ev, NULL );
	return 0;
}

/* Recover the space from the buffer which has been read, return another line if available */
int recover_client_buffer(struct client_data* cli_data, int len, char** line) {
	int i;
	*line = 0;
	if ( len >= cli_data->used_buffer ) {
		/* This is a query-response protocol. This should be *always* the case */
		cli_data->used_buffer = 0;
		return 0;
	}

	/* Nonetheless handle the other case */
	memmove(cli_data->buffer, cli_data->buffer + len, cli_data->used_buffer - len);
	cli_data->used_buffer -= len;
	
	for ( i=0; i < cli_data->used_buffer; i++ ) {
		if ( cli_data->buffer[i] == '\n' ) {
			cli_data->buffer[i] = '\0';
			*line = cli_data->buffer;
			return i;
		}
	}

	return 0;
}

/* Sends the message msg to the other side, or nothing if msg is NULL
 * Since the message are short, we optimistically consider that they
 * will always fit and never block (note O_NONBLOCK is set).
 */
void send_client(struct client_data* cli_data, const char* msg) {
	if ( !msg ) return;
	
	size_t len = strlen(msg);

	if ( send( cli_data->fd, msg, len, 0) != len ) {
		perror( "Something failed sending message" );
		incr_stats( failed_sends );
	}
	/* Wait for answer */
	event_add( &cli_data->ev, NULL );
}

void process_timeout(struct client_data* cli_data) {
	/*
	 * Note that you can't cancel a timeout so we just have to be careful and
	 * only do timeout things when the lock looks like its timed out.
	 */
	if ( cli_data->next_lock <= 0 ) {
		return;
	}
	struct locks* l = cli_data->client_locks + cli_data->next_lock - 1;
	if ( ( l->state == LS_WAIT_ANY ) || ( l->state == LS_WAITING ) ) {
		// Ignore any timeouts for locks not waiting - those are just left over
		// because its expensive to cancel them.
		cli_data->next_lock--;
		struct timeval now = { 0 };
		time_stats( l, wasted_timeout_time );
		send_client( cli_data, "TIMEOUT\n" );
		decr_stats( waiting_workers );
		remove_client_lock( l, 0 );
	}
}