File: storage.cc

package info (click to toggle)
trafstats 0.4.20-1
  • links: PTS
  • area: main
  • in suites: woody
  • size: 296 kB
  • ctags: 110
  • sloc: cpp: 1,036; sh: 475; perl: 173; makefile: 98
file content (145 lines) | stat: -rw-r--r-- 4,022 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/* 
 * storage.cc: The storage thread, which periodically writes the contents of
 * the shared statistics list to the database.
 * Spliced out of sniffercore.cc Wed, 10 Oct 2001 11:02:42 +0200
 * Last revision: Wed, 10 Oct 2001 11:12:21 +0200
*/
#include "trafstats.h"
#include "CTrafStatsList.h" 

#include <time.h>
// Held by the storage thread while it copies and clears theList.
extern pthread_mutex_t datalock;
// Mutex paired by name with storage_gonogo; in the code visible here every
// lock/unlock of it is commented out.
extern pthread_mutex_t storagegonogolock;
// Broadcast by the storage thread once its database connection attempt has
// finished, whether it succeeded or failed.
extern pthread_cond_t  storage_gonogo;

// The list whose contents the storage thread writes to the database.
extern CTrafStatsList theList;
// Logging threshold: a message is passed to syslog only when verbosity is
// greater than or equal to that message's syslog priority.
extern int verbosity;
// Storage loop flag: set to true when the storage thread starts, cleared by
// storageSigHandler and when the database connection fails.
extern bool run;
// Database connection used for storage; created in storageThread.
PgDatabase *dbConn=NULL;

/*
 * storageSigHandler: signal handler for the storage thread.
 *
 * sig: number of the signal that was delivered.
 *
 * SIGUSR1, SIGINT and SIGTERM are all treated as a request to stop: the
 * global run flag is cleared so the storage loop can end. Any other signal
 * is only reported. All reporting goes through syslog and is filtered by the
 * global verbosity level.
 *
 * NOTE(review): syslog() is not on the POSIX list of async-signal-safe
 * functions -- confirm that calling it from a handler is acceptable here.
 */
void storageSigHandler(int sig) {
	if(LOG_DEBUG <= verbosity) {
		syslog(LOG_DEBUG,"[storage] Received signal %d",sig);
	}

	// Anything that is not one of the three stop signals is just noted.
	if(sig != SIGUSR1 && sig != SIGINT && sig != SIGTERM) {
		if(LOG_NOTICE <= verbosity) {
			syslog(LOG_NOTICE,"[storage] I don't know what to do with %d.",sig);
		}
		return;
	}

	// Orders from above (SIGUSR1), ^C (SIGINT) or kill (SIGTERM): time to go.
	if(LOG_INFO <= verbosity) {
		syslog(LOG_INFO,"[storage] Received signal %d, exiting.",sig);
	}
	run=false;
}

/*
 * storageThread: thread entry point that periodically writes the contents of
 * theList to the database.
 *
 * d: pointer to a storage_options structure. Its interval member (seconds
 *    between storage runs) and db_connstr member (database connection
 *    string) are read.
 *
 * Sequence:
 *   1. Install storageSigHandler for SIGUSR1, SIGINT and SIGTERM.
 *   2. Open the database connection into the global dbConn. On failure, clear
 *      run, broadcast storage_gonogo and exit the thread.
 *   3. Broadcast storage_gonogo to report that the thread is ready.
 *   4. While run is set: sleep, copy and clear theList under datalock, then
 *      store the copy to the database outside the lock.
 *   5. After the loop, store whatever is left in theList one last time.
 *
 * Does not return to its caller; every path ends in pthread_exit(NULL).
 *
 * NOTE(review): dbConn is never deleted here. It is a global, so other code
 * may still be using it -- confirm ownership before adding a delete.
 */
void *storageThread(void *d) {
	storage_options* opts=(struct storage_options*)d;
	int interval=opts->interval;
	int delay=interval;
	run=true;
	unsigned long thetime=0;
	
	if(verbosity >= LOG_DEBUG) {
		syslog(LOG_DEBUG,"[storage] Storage thread launched.");
		syslog(LOG_DEBUG,"[storage] Setting signal handlers.");
	}

	signal(SIGUSR1,storageSigHandler);
	signal(SIGINT,storageSigHandler);
	signal(SIGTERM,storageSigHandler);
	char *connstr=opts->db_connstr;
	if(verbosity >= LOG_DEBUG) {
		// NOTE(review): the connection string may contain credentials;
		// confirm that writing it to the debug log is intended.
		syslog(LOG_DEBUG,"[storage] Contacting database using connection string <%s>.",connstr);
	}

	dbConn = new PgDatabase(connstr);

	if(dbConn->ConnectionBad()) { // Can't connect to the database, 
					// so aborting.
		if(verbosity >= LOG_CRIT) {
			syslog(LOG_CRIT,"[storage] Failed to connect to database: %s",
				dbConn->ErrorMessage());
		}
		run=false;
		// The waiter is woken on failure too; run==false tells it the
		// outcome.
		// pthread_mutex_lock(&storagegonogolock);
		pthread_cond_broadcast(&storage_gonogo);
		// pthread_mutex_unlock(&storagegonogolock);
		pthread_exit(NULL);
	}
	// Report to the main thread that we're good to go.
	// 

	// pthread_mutex_lock(&storagegonogolock);
	pthread_cond_broadcast(&storage_gonogo);
	// pthread_mutex_unlock(&storagegonogolock);


	// Go.
	while(run) {
		if(verbosity >= LOG_INFO) {
			syslog(LOG_INFO,"[storage] Starting sleep: %d seconds.",delay);
		}
		// sleep() returns the seconds left unslept when a signal
		// interrupts it, and 0 when the full delay has passed.
		int slept=sleep(delay); 
		// Waking up
		// 
		if(slept > 0) {
			if(verbosity >= LOG_INFO) {
				syslog(LOG_INFO,"[storage] Woken up prematurely after %d seconds",delay-slept);
			}
			if(!run) {
				if(verbosity >= LOG_INFO) {
					syslog(LOG_INFO,"[storage] Time to leave.");
				}
				break;
			}
		}
		thetime=(unsigned long)time(NULL);

		// Lock the list only for the copy, then flush the original and
		// release the lock so the slow database work runs unlocked.
		pthread_mutex_lock(&datalock);
		CTrafStatsList tempList=theList;
		theList.clear();
		pthread_mutex_unlock(&datalock);

		// The count is taken from the private copy, so the shared list is
		// never read without datalock held.
		if(verbosity >= LOG_INFO) {
			syslog(LOG_INFO,"[storage] Delay passed. Storing %lu entries.",
					(unsigned long)tempList.size());
		}
		
		// Store the copy to database.
		tempList.storeToDatabase(dbConn);
		thetime=((unsigned long)time(NULL)) - thetime;
		if(verbosity >= LOG_INFO) {
			unsigned long stored=(unsigned long)tempList.size();
			// time() has one-second resolution, so a fast run gives an
			// elapsed time of 0; report the raw count instead of
			// dividing by zero.
			unsigned long rate=(thetime > 0) ? (stored/thetime) : stored;
			syslog(LOG_INFO,"[storage] Finished storage of %lu entries in %lu seconds, %lu entries/second", 
					stored,
					thetime,
					rate);
		}
		// Shorten the next sleep by the time spent storing, so runs stay
		// aligned to the interval. Skipped for a non-positive interval,
		// where the modulo would be a division by zero.
		if(interval > 0) {
			delay=interval - (int)(thetime % (unsigned long)interval);
		}
		if(verbosity >= LOG_DEBUG) {
			syslog(LOG_DEBUG,"[storage] Reducing delay to %d seconds in compensation",delay);
		}
	}

	// Final storage: write whatever accumulated since the last run.
	thetime=(unsigned long)time(NULL);
	pthread_mutex_lock(&datalock);
	if(verbosity>=LOG_INFO) {
		syslog(LOG_INFO,"[storage] Ending threads; Performing final storage of %lu entries.",
				(unsigned long)theList.size());
	}
	theList.storeToDatabase(dbConn);
	// Release the lock before exiting; a mutex left locked by a finished
	// thread would block every other thread that tries to take it.
	pthread_mutex_unlock(&datalock);
	if(verbosity >= LOG_INFO) {
		thetime=((unsigned long)time(NULL)) - thetime;
		syslog(LOG_INFO,"[storage] Finished final storage in %lu seconds.",thetime);
	}
	pthread_exit(NULL);
}