1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
|
%%% TODO: need to figure out this 'inet6' setting in the tcp options:
%%% It (and *not* 'inet') seems to be necessary when I run on Mac.
%%% Things work with neither setting on Linux and Windows -- not sure
%%% if things work there with the setting though.
%%% When the manager receives a command to install a munger, it can
%%% only affect an existing connection, because the way it works is to
%%% send the info to the registered path_mgr processes. It doesn't
%%% save up a record of active mungers to bequeath to any connections
%%% that become established after that.
%%%
%%% I think that's a shortcoming that should be fixed.
%%%
%%% We might also need a way to remove a munger, although not
%%% currently.
%%% Hmm, is there any reason why the various path pairs need be
%%% related to each other? Would it make sense to start [{7001,6001},
%%% {7000,6000}] first, and then later add {7002,6002} to the chaos?
%%% I think that might be fine, but only if the site at 6002 truly
%%% isn't running yet, because if it is, it could send a message that
%%% would need munging. Similarly, it's important that no running
%%% site have that in its configuration yet, even if the new site
%%% isn't running yet.
%%%
%%% It's not needed yet, but I could imagine a need to target a
%%% command to an as yet unestablished connection/path. Perhaps it
%%% could be directed to 'new'(?), or {6001,6000,new} (?). Somewhere
%%% we'd need to hold onto that list, and give it (or some part of it)
%%% to new connections as they're getting established. The point is,
%%% if you wait until after the connection is established and the
%%% sites have talked to each other, it might be too late to do what
%%% you need to do.
%%% Need to be able to:
%%% 2. remove previously installed munges (shall we give each one a
%%% serial number or something?)
%%% 3. sometimes it's a little more complicated, and we need a way
%%% to send a message or signal to an existing munge and/or even
%%% the recv/send-er (in the case of socket congestion blockage,
%%% when we want to free it up again).
%%% An interesting feature, that could be useful, maybe even
%%% essential: we could keep a record of all connections, even after
%%% they've been closed, for the purpose of analysis by the test.
%%% Obviously they would be keyed by site-pair (as expressed by port
%%% numbers, similarly to how we addressing path-specific munger
%%% commands). But they could also include establishment timestamp,
%%% so that a test could distinguish between possibly multiple paths
%%% between the same endpoints.
%%%
%%% With that, a command could request that a path count various
%%% statistics about the messages that pass through (e.g., did we see
%%% any heartbeats on this connection? We shouldn't, if we're in
%%% mixed-version mode. Although I suppose an old site would simply
%%% crash if we made that mistake.) A later command could then come
%%% back and request these statistics.
%% Also, it may be that the forward port isn't listening yet, in which
%% case we could get econnrefused. Again, it's not terribly
%% disastrous to just let the error report close the connection. But
%% we should handle it. Besides, better than closing the initiating
%% connection might be to not even start listening until we see that
%% our forward port is listening. That's more difficult, and could
%% conceivably bother the target site, but it's more realistic in
%% terms of what the connecting site should see. Hmm, hard to know
%% which way is better, and whether it's even worth worrying about.
%%
%% Well, here's another thing to think about: it probably doesn't work
%% merely to avoid listening in the beginning, because the site could
%% die in the middle. We certainly want ultimately to be able to test
%% those kinds of situations. Hmm, at least for now, perhaps it's all
%% right just to close the incoming connection when we can't make the
%% outgoing connection, because repmgr doesn't really make much
%% difference between an EOF and an error. But someday it might make
%% a difference.
%%
%% Test functions:
%% - stop reading (to block progress)
%% - discard everything (heartbeats)
%% - discard acks
%% - delay acks
%%
%% Each pair of sites (each link we could care about controlling) has
%% two possible ways it might get set up (in the most general case,
%% though often it's easy enough to control it more strictly).
%%
%% A munge function can be specified as applying only to a specified
%% path, or to all paths. For example,
%%
%% {{6000,6001},page_clog}
%%
%% says to apply the page_clog function to the path going from the
%% site listening on 6000 to the site listening on 6001 (both expressed
%% as real port numbers, not the spoofed port numbers).
%%
%%
%% There's another, rather different way of looking at how this gets
%% configured: instead of each site having one fiddler "sheilding" its
%% incoming connections, you could have fiddlers at just one site,
%% completely "wrapping" it, so that it takes not only all incoming
%% connections, but outgoing connections as well. Consequences: (1)
%% it really focuses on that one site, making it the site under test
%% -- all the others are just going along for the ride. You could
%% even imagine making them fake. (2) Other sites talk directly to
%% each other, so we have no control over traffic between them.
%%
%% Site A:
%% local: 6000
%% Site B:
%% local: 6001
%% remote: 7000
%% Site C:
%% local: 6002
%% remote: 6000
%% [{7001,6001},{7000,6000},{7002,6002}
%%
%% However, this might be confusing, and is probably counter to some
%% of the higher-level assumptions I've made in setting up
%% transformations.
-module(fiddler).
-export([start/1, start/2, main/1, do_accept/2, slave/2]).
-import(lists,[member/2]).
-import(gen_tcp,[listen/2,accept/1,connect/3,send/2]).
-define(MANAGER_PORT, 8000).
-include("rep_literals.hrl").
%% Config is a list of {spoofed,real} port numbers. (For now
%% everything's on localhost.)
%%
%% TODO: shouldn't we use records for those tuples?
start(Config) ->
start(?MANAGER_PORT, Config).
%% For each pair specified in the config, spawn off a listener to
%% spoof the given pair.
%%
start(MgrPort, Pairs) ->
optstore:start_link(),
registry:start(),
manager:start(MgrPort, Pairs),
lists:foreach(fun (Pair) -> spawn(fiddler, main, [Pair]) end, Pairs).
main(Ports) ->
{Spoofed, Real} = Ports,
{ok, LSock} = listen(Spoofed,
[binary, inet, inet6, {packet,raw},
{active, false}, {reuseaddr, true}]),
do_accept(LSock, Real).
%% It may seem more straightforward conceptually to spawn a new
%% process to take care of each incoming connection, handing off the
%% socket to the new process, and just have the initial listen process
%% loop back onto the next accept() call. But instead we "chain" to a
%% new process to continue listening for new connections, and allow
%% this process to proceed to handling socket we've just gotten. The
%% reason this is convenient is that this process is the "controlling
%% process" of the socket, which means that the socket will
%% automatically get closed when the process terminates.
%%
do_accept(LSock, RealPort) ->
{ok, Sock} = accept(LSock),
Pid = spawn(fiddler, do_accept, [LSock, RealPort]),
gen_tcp:controlling_process(LSock, Pid),
slave(Sock, RealPort).
slave(Sock, RealPort) ->
{ok, TargetSock} = connect("localhost", RealPort,
[binary, inet, inet6, {packet, raw}, {active, false}]),
%% Conduct the initial handshake exchange synchronously, so that
%% we can easily get the originator's (peer's) port without
%% programming a FSM:
%%
%% (1) pass through the originator's version proposal
send_msg(TargetSock, reader:read(Sock)),
%% (2) pass through our local site's version confirmation
send_msg(Sock, reader:read(TargetSock)),
%% (3) get the final Parameters handshake, and look there for the
%% peer port before forwarding it
ThirdMsg = reader:read(Sock),
Opts = case ThirdMsg of
{?HANDSHAKE,_,_,Control,_} ->
<<PeerPort:16, _/binary>> = Control,
Path = {RealPort, PeerPort},
registry:register(Path),
initial_opts(Path);
_ ->
[] % could be JOIN_REQUEST
end,
R1 = reader:start(Sock),
R2 = reader:start(TargetSock),
send_msg(TargetSock, ThirdMsg),
reader:prime(R1),
reader:prime(R2),
loop(Opts, [{R1,Sock},{R2,TargetSock}]).
%% This is a bit confusing, because there are really 3 port numbers
%% we're interested in. There's (1) the real port that our local site
%% (the one we're fronting) is actually listening on; (2) the
%% fake/spoof port that *we're* listening on, on our local site's
%% behalf, to which any remote sites are going to be redirected to
%% connect to; and (3) the (nominal/real/configured) port of any
%% remote site that actually does connect to us, which we discover by
%% spying on the handshake message. The only reason we even care
%% about (3) at all is because it's convenient if we make ourselves
%% known by a "path" 2-tuple that includes it -- i.e., convenient for
%% tests that are using this thing, and want to refer to paths using
%% each site's "real" port numbers.
%%
%% (However, once we get down this far, having already (1) checked for
%% "initial options" and (2) registered ourselves for
%% later/dynamically added options, I suspect we don't even care about
%% *any* of these port values. All we care about is readers and
%% sockets.
loop(Opts0, Readers) ->
receive
{msg,Rdr,Msg} ->
Opts = check_toss(check_wedge(Opts0, Msg)),
Tossing = member(toss_all, Opts),
Wedged = member(wedged, Opts),
if
Wedged ->
ok;
Tossing ->
reader:prime(Rdr);
true ->
send_msg(other_socket(Readers,Rdr), Msg),
reader:prime(Rdr)
end,
loop(Opts, Readers);
{closed,Rdr} ->
Opts = check_toss(Opts0),
Tossing = member(toss_all, Opts),
if Tossing ->
{value, {_,S}, Readers2} = lists:keytake(Rdr,1,Readers),
gen_tcp:close(S),
loop(Opts, Readers2);
true ->
shutdown(Readers)
end;
shutdown ->
shutdown(Readers)
end.
shutdown(Readers) ->
lists:foreach(fun ({_R,S}) ->
gen_tcp:close(S)
end, Readers).
%% Hmm, maybe what we should really do is filter out the matching list
%% element, and then take from what's left. Might be more code, but
%% more elegant?
%%
other_socket([{Rdr,_},{_,S}], Rdr) ->
S;
other_socket([{_,S},{Rdr,_}], Rdr) ->
S;
other_socket(_,_) ->
nil.
initial_opts(Path) ->
optstore:lookup(Path).
%% If we don't already have 'toss' on our list of processing options,
%% then check to see if we're now being instructed to add it, if so
%% then do so.
%%
check_toss(Opts) ->
Tossing = member(toss, Opts),
if
Tossing ->
Opts;
true ->
receive
toss_all ->
[ toss_all | Opts ]
after 0 ->
Opts
end
end.
check_wedge(Opts, Msg) ->
Wedgeable = member(page_clog, Opts),
Wedging = member(wedged, Opts),
if
Wedgeable andalso not Wedging ->
case Msg of
{?REP_MESSAGE,_,_,Control,_Rec} ->
<<_:4/unit:32, Type:32/big, _/binary>> = Control,
if
Type == ?PAGE ->
[ wedged | Opts ];
true ->
Opts
end;
_ ->
Opts
end;
true ->
Opts
end.
send_msg(Sock, {MsgType, ControlLength, RecLength, Control, Rec}) ->
Header = <<MsgType:8, ControlLength:32/big, RecLength:32/big>>,
send(Sock, Header),
send_piece(Sock, Control),
send_piece(Sock, Rec);
send_msg(Sock, {MsgType, Length, Other, Data}) ->
Header = <<MsgType:8, Length:32/big, Other:32/big>>,
send(Sock, Header),
send_piece(Sock, Data);
send_msg(Sock, {MsgType, Length, Other}) ->
Header = <<MsgType:8, Length:32/big, Other:32/big>>,
send(Sock, Header);
send_msg(_Sock, nil) ->
ok.
send_piece(_Sock, nil) ->
ok;
send_piece(Sock, Piece) ->
send(Sock, Piece).
|