File: reconserver.ml

package info (click to toggle)
sks 1.1.6-14
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 2,296 kB
  • sloc: ml: 15,228; ansic: 1,069; sh: 358; makefile: 347
file content (405 lines) | stat: -rw-r--r-- 14,299 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
(***********************************************************************)
(* reconserver.ml - Executable: server process that handles            *)
(*                  reconciliation                                     *)
(*                                                                     *)
(* Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, *)
(*               2011, 2012, 2013  Yaron Minsky and Contributors       *)
(*                                                                     *)
(* This file is part of SKS.  SKS is free software; you can            *)
(* redistribute it and/or modify it under the terms of the GNU General *)
(* Public License as published by the Free Software Foundation; either *)
(* version 2 of the License, or (at your option) any later version.    *)
(*                                                                     *)
(* This program is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of          *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU   *)
(* General Public License for more details.                            *)
(*                                                                     *)
(* You should have received a copy of the GNU General Public License   *)
(* along with this program; if not, write to the Free Software         *)
(* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 *)
(* USA or see <http://www.gnu.org/licenses/>.                          *)
(***********************************************************************)

module F(M:sig end) =
struct
  open StdLabels
  open MoreLabels
  open Printf
  open Common
  open Packet
  open DbMessages
  module Unix = UnixLabels
  module PTree = PrefixTree
  module Map = PMap.Map
  module ZSet = ZZp.Set

  open RecoverList
  open PTreeDB
  open Catchup

  let settings = {
    mbar = !Settings.mbar;
    bitquantum = !Settings.bitquantum;
    treetype = (if !Settings.transactions
                then `transactional
                else if !Settings.disk_ptree
                then `ondisk else `inmem);
    max_nodes = !Settings.max_ptree_nodes;
    dbdir = Lazy.force Settings.ptree_dbdir;
    cache_bytes = !Settings.ptree_cache_bytes;
    pagesize = !Settings.ptree_pagesize;
  }

  (******************************************************************)

  let reconsocks =
    List.rev_map ~f:Eventloop.maybe_create_sock (make_addr_list recon_address recon_port)
  let reconsocks =
    List.fold_right ~init:[]
      ~f:(function
	   | Some sock -> fun acc -> sock :: acc
	   | None -> fun acc -> acc)
      reconsocks
  let () =
    if reconsocks = [] then
      failwith "Could not listen on any address."



  let () =
    if Sys.file_exists recon_command_name
    then Unix.unlink recon_command_name
  let comsock = Eventloop.create_sock recon_command_addr

  let filters = ref None
  let get_filters () = match !filters with
      None -> failwith "No filters retrieved"
    | Some filters -> filters


  (***************************************************************)
  (*  Handlers  *************************************************)
  (***************************************************************)

  let eventify_handler handle =
    (fun addr cin cout ->
       let cin = (new Channel.sys_in_channel cin)
       and cout = (new Channel.sys_out_channel cout) in
       handle addr cin cout
    )

  let choose_partner () =
    try
      let addrlist = Membership.choose () in
      (* Only return usable addresses *)
      let is_compatible addr =
        try
          ignore (match_client_recon_addr addr.Unix.ai_addr);
          true
        with Not_found -> false
      in
      let addrlist = List.filter ~f:is_compatible addrlist in
      List.nth addrlist (Random.int (List.length addrlist))
    with
        Not_found | Invalid_argument _ ->
          failwith "No gossip partners available"

  let missing_keys_timeout = !Settings.missing_keys_timeout

  (******************************************************************)

  let rec get_missing_keys () =
    let name = "get missing keys" in
    let timeout = missing_keys_timeout in
    try

      ( try
          let (hashes,httpaddr) = Queue.pop recover_list in
          plerror 3
            "Requesting %d missing keys from %s, starting with %s"
            (List.length hashes) (sockaddr_to_string httpaddr)
            (match hashes with
                 [] -> "<nohash>"
               | hash::tl -> KeyHash.hexify hash
            );

          let keystrings = ReconComm.get_keystrings_via_http httpaddr hashes in
          plerror 3 "%d keys received" (List.length keystrings);
          let ack = ReconComm.send_dbmsg (KeyStrings keystrings) in
          if ack <> Ack 0
          then failwith ("Reconserver.get_missing_keys: " ^
                         "Unexpected reply to KeyStrings message");
          let now = Unix.gettimeofday () in
          [
            Eventloop.Event
             (now,
              Eventloop.make_tc
                ~name:"get_missing_keys.catchup"
                ~timeout:max_int
                ~cb:Catchup.catchup);

            Eventloop.Event
              (Ehandlers.float_incr now,
               Eventloop.make_tc ~name ~timeout
                 ~cb:get_missing_keys; );
          ]
        with
          | Queue.Empty -> enable_gossip (); []
          | Eventloop.SigAlarm as e -> raise e
          | e ->
              Eventloop.reraise e;
              eperror e "Error getting missing keys";
              [Eventloop.Event (Unix.gettimeofday (),
                                Eventloop.make_tc ~cb:get_missing_keys
                                  ~timeout ~name)
              ]

      )
    with
      | Eventloop.SigAlarm ->
          plerror 2 "get_missing_keys terminated by timeout";
          (* If we time out, just schedule the next one *)
          [Eventloop.Event (Unix.gettimeofday (),
                            Eventloop.make_tc ~cb:get_missing_keys ~timeout ~name; ) ]

  (******************************************************************)

  (** convert a sockaddr to a string suitable for including in a file name *)
  let sockaddr_to_name sockaddr = match sockaddr with
      Unix.ADDR_UNIX s -> sprintf "UNIX_%s" s
    | Unix.ADDR_INET (addr,p) -> sprintf "%s_%d" (Unix.string_of_inet_addr addr) p

  (******************************************************************)

  (** Handles incoming reconciliation *)
  let recon_handler addr cin cout =
    if gossip_disabled ()  then
      begin
        plerror 3
          "Reconciliation attempt from %s while gossip disabled. %s"
          (sockaddr_to_string addr) "Ignoring.";
        []
      end
    else if not (Membership.test addr) then
      begin
        plerror 1
          "Reconciliation attempt from unauthorized host %s.  Ignoring"
          (sockaddr_to_string addr) ;
        []
      end
    else
      begin
        plerror 4 "Beginning recon as server, client: %s"
          (sockaddr_to_string addr);
        let cin = (new Channel.sys_in_channel cin)
        and cout = (new Channel.sys_out_channel cout) in
        let filters = get_filters () in
        let (results,http_addr) =
          ReconCS.handle_connection (get_ptree ()) ~filters
            ~partner:addr cin cout
        in
        plerror 4 "Reconciliation complete";
        let elements = ZSet.elements results in
        let hashes = hashconvert elements in
        print_hashes (sockaddr_to_string http_addr) hashes;
        log_diffs (sprintf "/var/spool/sks/diff-%s.txt" (sockaddr_to_name http_addr)) hashes;
        if List.length elements > 0
        then
          begin
            update_recover_list elements http_addr;
            [Eventloop.Event (Unix.gettimeofday () +. 10.0,
                              Eventloop.make_tc ~cb:get_missing_keys
                                ~timeout:missing_keys_timeout
                                ~name:"get missing keys"
                             )]
          end
        else
          []
      end


  (******************************************************************)

  (** Initiates reconciliation as client *)
  let initiate_recon () =
    if gossip_disabled () then
      begin
        plerror 5 "Not gossiping because gossip is disabled";
        []
      end
    else
      begin
        let partner = choose_partner () in
        plerror 4 "Recon partner: %s" (sockaddr_to_string partner.Unix.ai_addr);
        let filters = get_filters () in
        let (results,http_addr) =
          ReconCS.connect (get_ptree ()) ~filters ~partner
        in
        let results = ZSet.elements results in
        plerror 4 "Reconciliation complete";
        let hashes = hashconvert results in
        print_hashes (sockaddr_to_string http_addr) hashes;
        log_diffs (sprintf "/var/spool/sks/diff-%s.txt" (sockaddr_to_name http_addr)) hashes;
        match results with
            [] -> []
          | _ ->
              update_recover_list results http_addr;
              [Eventloop.Event (Unix.gettimeofday (),
                                Eventloop.make_tc ~cb:get_missing_keys
                                  ~timeout:missing_keys_timeout
                                  ~name:"get missing keys"
                               )]
      end


  (******************************************************************)

  let command_handler addr cin cout =
    match (unmarshal cin).msg with

      | Synchronize ->
          marshal cout (Ack 0);
          plerror 2 "Initiating recon due to explicit request";
          initiate_recon ()

      | RandomDrop n ->
          marshal cout (Ack 0);
          for i = 1 to n do
            try
              let hash = PTree.get_random (get_ptree ())
                           (PTree.root (get_ptree ())) in
              let hash = RMisc.truncate hash KeyHash.hash_bytes in
              plerror 3 "Requesting deletion %s" (Utils.hexstring hash);
              ignore (ReconComm.send_dbmsg (DeleteKey hash))
            with
                Not_found ->
                  failwith "Attempted to delete element from empty prefix tree"
              | e ->
                  Eventloop.reraise e;
                  eplerror 3 e "Attempt to delete key failed"
          done;
          []

      | HashRequest hashes ->
          let keyresp = (ReconComm.send_dbmsg (HashRequest hashes)) in
          assert (match keyresp with Keys _ -> true | _ -> false);
          marshal cout keyresp;
          []

      | Config (s,cvar) ->
          plerror 4 "Received config message";
          (match (s,cvar) with
               ("maxnodes",`int x) ->
                 plerror 3 "Setting maxnodes to %d" x;
                 let txn = new_txnopt () in
                 (try
                    PTree.set_maxnodes (get_ptree ()) txn x;
                    PTree.clean txn (get_ptree ());
                    commit_txnopt txn
                  with
                      e ->
                        eplerror 1 e "set_maxnodes Transaction aborting";
                        abort_txnopt txn)
             | _ ->
                 failwith "Unexpected config request"
          );
          []

      | m ->
          marshal cout ProtocolError;
          perror "Unexpected message: %s" (msg_to_string m);
          []

  (***************************************************************)

  let sync_interval = !Settings.recon_sync_interval
  let sync_tree () =
    perror "Syncing prefix tree";
    let txn = new_txnopt () in
    try
      PTree.clean txn (get_ptree ());
      commit_txnopt txn
    with
        e ->
          eplerror 1 e "sync_tree transaction aborting";
          abort_txnopt txn;
          raise e


  let checkpoint_interval = !Settings.recon_checkpoint_interval

  (***************************************************************)

  let () = Sys.set_signal Sys.sigusr1 Sys.Signal_ignore
  let () = Sys.set_signal Sys.sigusr2 Sys.Signal_ignore

  (***********************************************************************)

  let prepare () =
    set_logfile "recon";
    plerror 1 "sks_recon, SKS version %s%s"  version version_suffix;
    plerror 0 "Using BerkelyDB version %s" (Bdb.version(););
    plerror 1 "Copyright Yaron Minsky 2002-2013";
    plerror 1 "Licensed under GPL.  See LICENSE file for details";
    plerror 5 "recon port: %d" recon_port;

    init_db settings;
    init_ptree settings


  let run () =
    prepare ();
    plerror 4 "Initiating catchup";
    uninterruptable_catchup ();
    (* do initial catchup to ensure reconciliation data
       is synchronized with key database *)
    plerror 4 "Fetching filters";
    filters := Some (ReconComm.fetch_filters ());
    plerror 4 "Starting event loop";
    Eventloop.evloop
      ( [ Eventloop.Event (0.0, Eventloop.Callback catchup) ]
        @ (Ehandlers.repeat_forever_simple catchup_interval catchup)
        @ (if !Settings.gossip
           then Ehandlers.repeat_forever
             ~jitter:0.1 (* 10% randomness in delay interval *)
             !Settings.gossip_interval
             (Eventloop.make_tc
                ~cb:initiate_recon
                ~name:"recon as client"
                ~timeout:!Settings.reconciliation_config_timeout
             )
           else [] )
        @ (match settings.treetype with
             | `transactional ->
                 Ehandlers.repeat_forever_simple checkpoint_interval checkpoint
             | `ondisk -> Ehandlers.repeat_forever_simple
                 sync_interval sync_tree
             | `inmem -> []
          )
      )

      ( (comsock, Eventloop.make_th
           ~name:"command handler"
           ~cb:(eventify_handler command_handler)
           ~timeout:!Settings.command_timeout
        )
       ::
        (List.map ~f:(fun sock ->
          (sock, Eventloop.make_th
             ~name:"reconciliation handler"
             ~cb:recon_handler
             ~timeout:!Settings.reconciliation_config_timeout))
           reconsocks))


  (******************************************************************)

  let run () =
    protect ~f:run
      ~finally:(fun () ->
                  closedb ();
                  plerror 2 "DB closed"
               )

end