File: StressTest.cpp

package info (click to toggle)
liblsl 1.16.2b1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 1,724 kB
  • sloc: cpp: 12,515; ansic: 666; python: 28; sh: 25; makefile: 18
file content (211 lines) | stat: -rw-r--r-- 9,739 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#include <stdint.h>
#include "../../include/lsl_cpp.h"
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <boost/functional/hash.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/atomic.hpp>
#include <iostream>
#include <time.h>
#include <stdlib.h>
using namespace std;

// key stress constants
// (all of these are plain mutable globals read by the worker and spawner functions below)
int max_outlets = 15; // was 20; intended cap on concurrently running outlet threads
int max_inlets = 20; // was 30; intended cap on concurrently running inlet threads
int min_chunk_len_ms = 1;   // lower bound (ms) for the randomly chosen pause between outlet bursts
int max_chunk_len_ms = 100; // upper bound (ms) for that pause; also used when sizing the inlet's pull buffer
int max_inlet_poll_interval_ms = 100; // upper bound (ms) for the random sleep between two inlet pulls
// bounds (ms) for the random time after which an inlet/outlet object is destroyed and re-created
int inlet_max_failure_interval_ms = 2000;
int outlet_max_failure_interval_ms = 2000;
int inlet_min_failure_interval_ms = 1;
int outlet_min_failure_interval_ms = 1;
// upper bounds for the random lifetime of a worker; the value is added to
// lsl::local_clock(), so the unit is presumably seconds -- TODO confirm
int max_outlet_duration = 10;
int max_inlet_duration = 10;
// default pause, in seconds, between two spawn attempts (multiplied by 1000 for the ms sleep)
double spawn_inlet_interval = 0.5;
double spawn_outlet_interval = 0.5;
int max_srate = 1000;  // bound for the randomly chosen sampling rate of an outlet
int max_channels = 10; // bound for the randomly chosen channel count of an outlet
int max_buffered = 6;  // passed as the max_buffered argument of the stream_outlet / stream_inlet constructors

// misc parameters
int max_chunk_oversize_factor = 5; // multiplier applied when sizing the chunk buffers
int max_samples = 10000000;        // an outlet instance stops sending once it has written this many samples

// Number of run_outlet() / run_inlet() calls currently in progress: each worker
// increments its counter on entry and decrements it on exit, and the spawner
// loops read the counters before starting another worker thread.
lslboost::atomic<int> num_outlets(0);
lslboost::atomic<int> num_inlets(0);

// some random names, types and formats
// (the workers pick an entry at random whenever the caller did not specify one)
lsl::channel_format_t fmts[] = {lsl::cf_int8,lsl::cf_int16,lsl::cf_int32,lsl::cf_float32,lsl::cf_double64,lsl::cf_string};
const char *names[] = {"Test1","Test2","Test3","Test4"};
const char *types[] = {"EEG","Audio","MoCap"};

// set to true by the main program when we're ready to quit
// NOTE(review): in the code visible in this file nothing assigns these flags after
// initialization; only stop_outlet is read (by the send loop in run_outlet()).
// They are plain bools rather than atomics -- revisit if they ever get written
// from another thread.
bool stop_inlet = false, stop_outlet = false, start_outlet = false; 


// Replace the contents of `sample` with `numchan` elements, each holding the
// constant 17.3 converted to the element type T.
template<class T>
void init_sample(int numchan, std::vector<T> &sample) {
	typedef typename std::vector<T>::size_type count_type;
	const T fill_value = static_cast<T>(17.3);
	// the explicit size cast selects the (count, value) overload of assign()
	sample.assign(static_cast<count_type>(numchan), fill_value);
}


// Run an outlet for some time (optionally with sporadic interruptions in between).
//
// Until `duration` has elapsed on lsl::local_clock(), the function repeatedly creates a
// stream_outlet, pushes bursts of constant float data through it, destroys it again once
// the failure interval has passed, and then sleeps for a random downtime.
// num_outlets is incremented on entry and decremented on exit.
//
// Every parameter left at its default is replaced by a random choice:
//   duration_                  lifetime of this worker (0.0 = random, bounded by max_outlet_duration)
//   name_, type_               stream name / content type (empty = random entry of names[] / types[])
//   numchan_                   channel count (0 = random, bounded by max_channels)
//   fmt_                       channel format (cf_undefined = random entry of fmts[])
//   srate_                     nominal sampling rate (0.0 = random, bounded by max_srate)
//   seconds_between_failures_  time after which an outlet instance is torn down (0.0 = random)
//   chunk_len_                 pause between bursts in ms (0 = random)
//
// Exceptions derived from std::exception are reported on std::cerr and end the worker.
void run_outlet(const double duration_=0.0, const string name_=string(), const string type_=string(), const int numchan_=0, const lsl::channel_format_t fmt_=lsl::cf_undefined, const double srate_=0.0, const double seconds_between_failures_=0.0, const int chunk_len_=0) {
	num_outlets.fetch_add(1);
	// re-seed rand() from a hash of this thread's id
	std::ostringstream s; s << lslboost::this_thread::get_id();
	srand((unsigned)lslboost::hash<string>()(s.str()));
	try {
		// choose random parameters if desired
		double duration = (duration_ == 0.0) ? 1.0+rand()%(max_outlet_duration-1) : duration_;
		string name = name_.empty() ? names[rand()%(sizeof(names)/sizeof(names[0]))] : name_;
		string type = type_.empty() ? types[rand()%(sizeof(types)/sizeof(types[0]))] : type_;
		int numchan = (numchan_ == 0) ? 1+(rand()%(max_channels-1)) : numchan_;
		double srate = (srate_ == 0.0) ? 1.0 + (rand()%(max_srate-1)) : srate_;
		// table length is derived from the array instead of being hard-coded
		lsl::channel_format_t fmt = (fmt_ == lsl::cf_undefined) ? fmts[rand()%(sizeof(fmts)/sizeof(fmts[0]))] : fmt_;
		// use the outlet-specific lower bound (the inlet constant was used here before)
		double seconds_between_failures = (seconds_between_failures_ == 0.0) ? (outlet_min_failure_interval_ms+rand()%outlet_max_failure_interval_ms)/1000.0 : seconds_between_failures_;
		int chunk_len = (chunk_len_ == 0) ? std::max(min_chunk_len_ms,(rand()%max_chunk_len_ms)) : chunk_len_;

		// create a new streaminfo
		lsl::stream_info info(name,type,numchan,srate,fmt,lslboost::uuids::to_string(lslboost::uuids::random_generator()()));

		// initialize data to send; the buffer holds at least one sample, because with a
		// zero-sized buffer (low rate combined with a short chunk interval) every burst
		// below would be clamped to zero elements and the outlet would never transmit
		int chunk_samples = std::max(1,(int)floor(chunk_len*srate/1000*max_chunk_oversize_factor));
		vector<float> chunk;
		init_sample(numchan*chunk_samples,chunk);

		// and run...
		for (double endtime = lsl::local_clock()+duration;lsl::local_clock()<endtime;) {
			// run a single execution of the outlet
			{
				cout << "new outlet(" << name << "," << type << "," << numchan << "," << fmt << "," << srate << ")...";
				lsl::stream_outlet outlet(info,0,max_buffered);
				cout << "done." << endl;
				// send in bursts
				const double start_time = lsl::local_clock();
				const double fail_at = start_time+seconds_between_failures;
				int written = 0;
				while (written<max_samples && !stop_outlet) {
					lslboost::this_thread::sleep(lslboost::posix_time::milliseconds(chunk_len));
					const double now = lsl::local_clock();
					if (now>fail_at)
						break;
					// number of samples that should have been sent by now at the nominal rate
					const int target = (int)floor((now-start_time)*srate);
					// never negative, so the conversion to size_t below cannot wrap around
					const int pending = std::max(0,target-written);
					const int num_elements = (int)std::min((std::size_t)(pending*numchan),chunk.size());
					if (num_elements)
						outlet.push_chunk_multiplexed(&chunk[0],num_elements);
					written += num_elements/numchan;
				}
			}
			cout << "del outlet(" << name << "," << type << "," << numchan << "," << fmt << "," << srate << ")" << endl;
			// downtime
			lslboost::this_thread::sleep(lslboost::posix_time::millisec(100*(rand()%50)));
		}
	} catch(const std::exception &e) {
		std::cerr << "ERROR during run_outlet() Stress-test function: " <<	e.what() << std::endl;
	}
	num_outlets.fetch_sub(1);
}

// run an inlet for some time (optionally with sporadic interruptions in between)
void run_inlet(const double duration_=0.0, const string name_=string(), const string type_=string(), const int in_chunks_=-1, const int request_info_=-1, const int request_time_=-1, const double seconds_between_failures_=0.0) {
	num_inlets.fetch_add(1);
	std::ostringstream s; s << lslboost::this_thread::get_id();
	srand((unsigned)lslboost::hash<string>()(s.str()));
	try {
		// choose random parameters if desired		
		double duration = (duration_ == 0.0) ? 1.0+rand()%(max_outlet_duration-1) : duration_;
		string name = name_.empty() ? names[rand()%(sizeof(names)/sizeof(names[0]))] : name_;
		string type = type_.empty() ? types[rand()%(sizeof(types)/sizeof(types[0]))] : type_;
		int request_info = (request_info_==-1) ? rand()%3 == 0 : request_info_;
		int request_time = (request_time_==-1) ? rand()%3 == 0 : request_time_;
		double seconds_between_failures = (seconds_between_failures_ == 0.0) ? (inlet_min_failure_interval_ms+rand()%outlet_max_failure_interval_ms)/1000.0 : seconds_between_failures_;

		// resolve by type...
		vector<lsl::stream_info> results = lsl::resolve_stream("type",type,1,5);
		if (results.empty())
			throw lsl::lost_error("No stream found.");
		lsl::stream_info result = results[rand()%results.size()];
		vector<float> chunk;		

		// and run...
		double t=0.0;
		for (double endtime = lsl::local_clock()+duration;lsl::local_clock()<endtime;) {
			// run a single execution of the inlet
			{
				cout << "new inlet(" << name << "," << type << ")...";
				lsl::stream_inlet inlet(result,max_buffered);
				cout << "done." << endl;
				int numchans = inlet.info().channel_count();
				init_sample(numchans*(int)ceil(max_chunk_len_ms*result.nominal_srate()/1000*max_chunk_oversize_factor),chunk);
				if (request_info) {
					cout << "  info = " << inlet.info(1.0).name() << endl;
				}
				for (double fail_at = lsl::local_clock()+seconds_between_failures;lsl::local_clock()<fail_at;) {
					lslboost::this_thread::sleep(lslboost::posix_time::milliseconds(1+rand()%max_inlet_poll_interval_ms));
					inlet.pull_chunk_multiplexed(&chunk[0],NULL,chunk.size(),0);
					if (request_time)
						t = inlet.time_correction(1.0);
				}
			}
			cout << "del inlet(" << name << "," << type << ")" << endl;
			if (request_time)
				cout << "  tcorr = " << t << endl;
			// downtime
			lslboost::this_thread::sleep(lslboost::posix_time::millisec(100*(rand()%50)));
		}
	} 
	catch(lsl::timeout_error &) { std::cerr << "Timeout exceeded; stopping inlet." << std::endl; }
	catch(lsl::lost_error &) { std::cerr << "Found no matching outlet; stopping inlet." << std::endl; }
	catch(std::exception &e) {
		std::cerr << "ERROR during run_inlet() Stress-test function: " <<	e.what() << std::endl;
	}
	num_inlets.fetch_sub(1);
}

void random_inlets(double spawn_every=0.0, double duration=0.0, string name=string(), string type=string(), int in_chunks=-1, int request_info=-1, int request_time=-1, double seconds_between_failures=0.0) {
	if (spawn_every == 0.0)
		spawn_every = spawn_inlet_interval;
	while(true) {
        try {
			if (num_inlets.load() < max_outlets)
				lslboost::thread tmp(&run_inlet,duration,name,type,in_chunks,request_info,request_time,seconds_between_failures);
		} catch(std::exception &e) {
			std::cerr << "Could not spawn a new inlet thread: " << e.what() << std::endl;
        }		
		lslboost::this_thread::sleep(lslboost::posix_time::millisec((lslboost::int64_t)(1000*spawn_every)));
	}
}

void random_outlets(double spawn_every=0.0, double duration=0.0, string name=string(), string type=string(), int numchan=0, lsl::channel_format_t fmt=lsl::cf_undefined, double srate=0.0, double seconds_between_failures=0.0, int chunk_len=0) {
	if (spawn_every == 0.0)
		spawn_every = spawn_outlet_interval;
	while(true) {
        try {
			if (num_outlets.load() < max_inlets)
				lslboost::thread tmp(&run_outlet,duration,name,type,numchan,fmt,srate,seconds_between_failures,chunk_len);
		} catch(std::exception &e) {
			std::cerr << "Could not spawn a new outlet thread: " << e.what() << std::endl;
        }		
		lslboost::this_thread::sleep(lslboost::posix_time::millisec((lslboost::int64_t)(1000*spawn_every)));
	}
}

int main(int argc, char* argv[]) {
	srand((unsigned)time(NULL));
	cout << "This stress test program puts heavy load on network equipment," << endl;
	cout << "particularly when multiple instances run on the same network." << endl;
	cout << "We recommend to not run this software on a corporate or campus" << endl;
	cout << "network since it generates erratic heavy traffic that can " << endl;
	cout << "alert network operators and/or may crash unreliable equipment." << endl << endl; 
	cout << "Are you sure you want to continue? [y/n]" << endl;
	if ((argc>1 && string(argv[1]) == "-f") || tolower(cin.get()) == 'y') {
		lslboost::thread outlets(&random_outlets,0.0,0.0,string(),string(),0,lsl::cf_undefined,0.0,0.0,0);
		lslboost::thread inlets(&random_inlets,0.0,0.0,string(),string(),-1,-1,-1,0.0);
		cout << "Press ENTER to exit. " << endl; cin.get();cin.get();
	}
	return 0;
}