1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224
|
(* $Id$
* ----------------------------------------------------------------------
*
*)
(* TODO:
* - Improve log messages
* - Keep qlist backup
* - Authentication
* Rpc_server: limit message size. Find out peer IP address, port number.
* - Auth protocol: Password authentication
* - Put all config options into config section.
*)
let spooldir = "/tmp/qserver";;
let max_chunksize = Int32.of_string "1048576" ;; (* 1 MB *)
open Netnumber
open Netxdr
open Rpc
open Rpc_server
module A = Queues_aux ;;
module S1 = Queues_srv.QUEUESPROG.QUEUESVERS1 ;;
type error =
[ `not_found
| `picked
| `exists
| `not_picked
| `chunks_too_large
| `timeout
| `empty
| `queue_deleted
| `full
| `bad_name
| `bad_value
| `inactive
| `permission_denied
| `sys_error
]
exception Error of error
let log message =
(* Output [message] to the log medium. This is currently stderr. *)
prerr_endline message
;;
let check_name name =
(* Checks that [name] is a proper name. Raises Error if not. *)
for i = 0 to String.length name - 1 do
match name.[i] with
'A'..'Z' -> ()
| 'a'..'z' -> ()
| '0'..'9' -> ()
| '_' -> ()
| _ -> raise(Error `bad_name)
done
;;
let catch_error f reply =
try (f reply : unit)
with
Error code ->
(* This match...with is only to help the typechecker inferring the
* right type
*)
(match code with
#error as c -> reply c
)
| Sys_error e ->
log ("Sys_error: " ^ e);
reply `sys_error
| Unix.Unix_error(code,msg,fn) ->
log ("Unix_error: " ^ Unix.error_message code ^
(if msg <> "" then ": " ^ msg else "") ^
(if fn <> "" then " [" ^ fn ^ "]" else ""));
reply `sys_error
;;
let protected_call f what =
try
f what
with
err ->
log("Exception in deferred function: " ^
Printexc.to_string err);
false (* remove from [ongrow], [onshrink] *)
;;
let with_out_file file fn =
try
let r = fn file in
close_out file;
r
with
err -> close_out file; raise err
;;
let with_in_file file fn =
try
let r = fn file in
close_in file;
r
with
err -> close_in file; raise err
;;
let directory_list name =
let l = ref [] in
let dir = Unix.opendir name in
try
while true do
match Unix.readdir dir with
"." | ".." -> ()
| name ->
l := name :: !l
done; assert false
with
End_of_file ->
Unix.closedir dir;
List.rev !l
| err ->
Unix.closedir dir;
raise err
;;
let t_queue = Netxdr.validate_xdr_type A.xdrt_queue;;
let t_entry = Netxdr.validate_xdr_type A.xdrt_entry;;
type managed_entry =
{ qentry : A.entry;
mutable is_picked : bool;
mutable pick_connection : Rpc_server.connection_id option;
mutable is_uploading : bool;
mutable upload_connection : Rpc_server.connection_id option;
mutable current_downloads : string list; (* dhandles *)
mutable current_upload : string option; (* uhandle *)
}
type managed_queue =
{ mutable qentries : managed_entry list;
mutable next_eid : int;
mutable qinfo : A.queue;
mutable deleted : bool;
mutable onshrink : (float * (bool -> bool)) list;
(* These functions are called in order when the queue shrinks.
* If the functions return 'false' they are removed
* from the [ongrow] list.
* The float is the absolute time when the function should be
* called anyway (timeout). The argument passed to the called
* function indicates whether the reason of the call is a timeout (true)
* or a shrinking queue (false).
*)
mutable ongrow : (float * (bool -> bool)) list;
(* These functions are called in order when an entry is added to
* a queue. If the functions return 'false' they are removed
* from the [ongrow] list.
* The float is the absolute time when the function should be
* called anyway (timeout). The argument passed to the called
* function indicates whether the reason of the call is a timeout (true)
* or a growing queue (false).
*)
}
type managed_download =
{ mutable downfile : in_channel;
mutable downsize : int;
mutable downchunksize : int;
mutable downserial : int64;
downentry : managed_entry;
downconnection : connection_id;
}
type managed_upload =
{ mutable upfile : out_channel;
mutable upserial : int64;
upentry : managed_entry;
upconnection : connection_id;
}
let queues = (Hashtbl.create 100 : (string, managed_queue) Hashtbl.t) ;;
(* Cached queues *)
let next_dhandle = ref 0;;
let next_uhandle = ref 0;;
let dhandles = (Hashtbl.create 100 : (string, managed_download) Hashtbl.t) ;;
let uhandles = (Hashtbl.create 100 : (string, managed_upload) Hashtbl.t) ;;
let instance_prefix
= Digest.string (string_of_float (Unix.gettimeofday())) ;;
(* A prefix that is prepended before the dhandle and uhandle numbers in
* the protocol. The instance_prefix distinguishes between instances of
* qserver.
*)
let queue_accepts_entries mq =
let maxl = mq.qinfo.A.qparams.A.qmaxlen in
mq.qinfo.A.qparams.A.qaccepting &&
( maxl < Int32.zero ||
let l = List.length mq.qentries in
(Int32.of_int l) < maxl
)
;;
let queue_delivers_entries mq =
mq.qinfo.A.qparams.A.qdelivering &&
( (* At least one unpicked entry needed: *)
List.exists
(fun e -> not e.is_picked && not e.is_uploading)
mq.qentries
)
;;
let load_managed_queue qid =
(* Reads the "qlist" file and the "e*" files containing the meta data
* of the entries
*)
let queuedir = Filename.concat spooldir qid in
let infofile = Filename.concat queuedir "qinfo" in
let listfile = Filename.concat queuedir "qlist" in
if not (Sys.file_exists infofile) then raise (Error `not_found);
let qinfo =
with_in_file (open_in_bin infofile)
(fun f ->
let l = in_channel_length f in
let buf = String.create l in
really_input f buf 0 l;
A._to_queue (Netxdr.unpack_xdr_value ~fast:true buf t_queue [])
)
in
let eid_list =
with_in_file (open_in listfile)
(fun f ->
let l = ref [] in
try
while true do
let line = input_line f in
l := (int_of_string line) :: !l;
done;
assert false
with
End_of_file -> List.rev !l
)
in
let next_eid =
(List.fold_left max 0 eid_list) + 1 in
let mentry_list =
List.map
(fun eid ->
let qentry_file =
Filename.concat queuedir ("e" ^ string_of_int eid) in
let v =
with_in_file (open_in_bin qentry_file)
(fun f ->
let l = in_channel_length f in
let buf = String.create l in
really_input f buf 0 l;
buf
)
in
let e =
A._to_entry (Netxdr.unpack_xdr_value ~fast:true v t_entry []) in
{ qentry = e;
is_picked = false;
pick_connection = None;
is_uploading = false;
upload_connection = None;
current_downloads = [];
current_upload = None
}
)
eid_list
in
let mqueue =
{ qentries = mentry_list;
next_eid = next_eid;
deleted = false;
qinfo = qinfo;
onshrink = [];
ongrow = [];
}
in
mqueue
;;
let store_managed_queue qid mq =
(* Writes the "qlist" file. Does not write anything else. *)
let queuedir = Filename.concat spooldir qid in
let infofile = Filename.concat queuedir "qinfo" in
let listfile = Filename.concat queuedir "qlist" in
if not (Sys.file_exists infofile) then raise (Error `not_found);
(* Update qmodified: *)
mq.qinfo.A.qmodified <- Int64.of_float (Unix.time());
with_out_file (open_out_bin infofile)
(fun f ->
let v = A._of_queue mq.qinfo in
Netxdr.pack_xdr_value v t_queue [] (output_string f);
);
with_out_file (open_out listfile)
(fun f ->
List.iter
(fun me ->
(* Write now the "qlist" file. Don't include entries currently
* being uploaded because these are considered as non-persistent
* until the upload finishes.
* Picked entries are considered as still persistent, so include
* them.
*)
if not me.is_uploading then begin
output_string f me.qentry.A.eid;
output_string f "\n";
end
)
mq.qentries;
)
;;
let lookup_managed_queue qid =
(* Checks whether the queue is cached. If yes, the cached queue record
* is returned. Otherwise, the queue is loaded and added to the cache.
*)
try
Hashtbl.find queues qid
with
Not_found ->
let mq = load_managed_queue qid in
Hashtbl.add queues qid mq;
(* Note: There are not any picked and uploaded entries, so reset: *)
mq.qinfo.A.qpicked <- Int32.zero;
mq.qinfo.A.quploads <- Int32.zero;
(* Check the length: *)
let l = Int32.of_int (List.length mq.qentries) in
if l <> mq.qinfo.A.qlength then begin
log "Load error: The qlength field is not correct. (fixed)";
mq.qinfo.A.qlength <- l;
end;
mq
;;
let proc_ping session () reply =
reply()
;;
let proc_create_queue session queuename =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: queuename must only consist of alphanumeric characters,
* including underscore
*)
check_name queuename;
let queuedir = Filename.concat spooldir queuename in
let infofile = Filename.concat queuedir "qinfo" in
let listfile = Filename.concat queuedir "qlist" in
(* Check if the queue already exists: *)
if Sys.file_exists infofile then raise(Error `exists);
(* Create the directory [queuedir] if necessary: *)
if not (Sys.file_exists queuedir) then
Unix.mkdir queuedir 0o777;
(* Create qlist: *)
with_out_file (open_out listfile)
(fun f -> () );
(* Create qinfo: *)
let now = Int64.of_float (Unix.time()) in
let qinfo = { A.qid = queuename;
A.qname = queuename;
A.qowner = user;
A.qcreation = now;
A.qmodified = now;
A.qlength = Int32.zero;
A.qpicked = Int32.zero;
A.quploads = Int32.zero;
A.qparams = { A.qmaxlen = Int32.minus_one;
A.qactive = false;
A.qaccepting = true;
A.qdelivering = true;
}
} in
with_out_file (open_out_bin infofile)
(fun f ->
let v = A._of_queue qinfo in
Netxdr.pack_xdr_value v t_queue [] (output_string f);
);
reply `successful
)
;;
let proc_delete_queue session queuename =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: queuename must only consist of alphanumeric characters,
* including underscore
*)
check_name queuename;
let queuedir = Filename.concat spooldir queuename in
(* Remove the files: *)
let files = directory_list queuedir in
List.iter
(fun f -> let path = Filename.concat queuedir f in Sys.remove path)
files;
(* Remove the directory. It is ok if this fails (no permission) *)
( try Unix.rmdir queuedir with _ -> ());
(* Is there a managed_queue? *)
begin try
let mq = Hashtbl.find queues queuename in (* or Not_found *)
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
Hashtbl.remove queues queuename;
mq.deleted <- true;
(* Remove all upload/download handles: *)
List.iter
(fun me ->
List.iter
(Hashtbl.remove dhandles)
me.current_downloads;
( match me.current_upload with
None -> ()
| Some uh -> Hashtbl.remove uhandles uh
);
)
mq.qentries;
(* Tell the [ongrow] and [onshrink] handlers that something
* errorneous happened
*)
List.iter
(fun (_,f) -> ignore(protected_call f true)) mq.ongrow;
List.iter
(fun (_,f) -> ignore(protected_call f true)) mq.onshrink;
mq.ongrow <- [];
mq.onshrink <- [];
with
Not_found -> () (* no managed_queue *)
end;
reply `successful
)
;;
let proc_list_queues session () =
Printf.eprintf "LIST_QUEUES\n%!";
catch_error
(fun reply ->
(* Get all files in the spooldir: *)
let l = directory_list spooldir in
(* Keep only files that are actually directories containing "qinfo": *)
let l' =
List.filter
(fun name ->
Sys.file_exists
(Filename.concat
(Filename.concat spooldir name)
"qinfo")
)
l in
(* Read in the "qinfo" files, and decode them: *)
let qlist =
List.map
(fun name ->
let mq = lookup_managed_queue name in
mq.qinfo
)
l'
in
(* Pass the result back: *)
reply (`successful (Array.of_list qlist))
)
;;
let proc_get_queue session queuename =
catch_error
(fun reply ->
(* Sanity check: queuename must only consist of alphanumeric characters,
* including underscore
*)
check_name queuename;
let mq = lookup_managed_queue queuename in
reply (`successful mq.qinfo);
)
;;
let proc_set_queue session (queuename,params) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: queuename must only consist of alphanumeric characters,
* including underscore
*)
check_name queuename;
let mq = lookup_managed_queue queuename in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
if params.A.qactive then begin
(* Activate the queue/change limits: *)
let old_accepting = queue_accepts_entries mq in
let old_delivering = queue_delivers_entries mq in
mq.qinfo.A.qparams <- params;
let new_accepting = queue_accepts_entries mq in
let new_delivering = queue_delivers_entries mq in
store_managed_queue queuename mq;
(* NOTE: If the queue has previously been inactive, the [ongrow] and
* [onshrink] lists are empty. So the following statements do
* nothing.
*)
(* Maybe there is now space for growth: *)
if not old_delivering && new_delivering then
mq.ongrow <-
List.filter (fun (_,f) -> protected_call f false) mq.ongrow;
(* Maybe there is now an entry to pick: *)
if not old_accepting && new_accepting then
mq.onshrink <-
List.filter (fun (_,f) -> protected_call f false) mq.onshrink;
end
else begin
(* Deactivate the queue: *)
mq.qinfo.A.qparams <- params;
store_managed_queue queuename mq;
(* Make that everybody notices the inactive queue: *)
mq.ongrow <-
List.filter (fun (_,f) -> protected_call f true) mq.ongrow;
mq.onshrink <-
List.filter (fun (_,f) -> protected_call f true) mq.onshrink;
(* Make these lists empty: *)
if mq.ongrow <> [] then
log "Warning: After deactivation the ongrow list is not empty";
if mq.onshrink <> [] then
log "Warning: After deactivation the onshrink list is not empty";
mq.ongrow <- [];
mq.onshrink <- [];
end;
reply `successful
)
;;
let proc_list_queue_entries session qid =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
let visible =
List.filter
(fun qe ->
not qe.is_picked && not qe.is_uploading
)
mq.qentries
in
let visible' = List.map (fun qe -> qe.qentry) visible in
reply (`successful (Array.of_list visible'))
)
;;
let timeout_pick mq = (* raise the right exception *)
if mq.deleted then
raise (Error `queue_deleted)
else
if not mq.qinfo.A.qparams.A.qactive then
raise (Error `inactive)
else
if mq.qinfo.A.qparams.A.qdelivering then
raise (Error `empty)
else
raise (Error `timeout)
;;
let try_pick session mq reply0 timed_out =
let keep = ref true in
let reply r =
(* If [reply] is called, the function [try_pick] must return [false]
* to indicate the it is to be removed from the [ongrow] list
*)
keep := false; reply0 r
in
catch_error
(fun reply ->
if timed_out then timeout_pick mq;
if not mq.qinfo.A.qparams.A.qactive then raise(Error `inactive);
try
if not mq.qinfo.A.qparams.A.qdelivering then raise Not_found;
let first =
List.find
(fun qe -> not qe.is_picked && not qe.is_uploading)
mq.qentries
in (* or Not_found *)
reply (`successful first.qentry); (* may raise Connection_lost *)
first.is_picked <- true;
first.pick_connection <- Some(Rpc_server.get_connection_id session);
mq.qinfo.A.qpicked <- Int32.succ mq.qinfo.A.qpicked;
mq.qinfo.A.qlength <- Int32.pred mq.qinfo.A.qlength;
with
Not_found -> ()
(* Try again *)
)
reply;
!keep
let proc_pick_queue_entry session (qid,timeout) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
if try_pick session mq reply false then begin
(* [try_pick] was not successful. Add [try_pick] to the [ongrow]
* list
*)
if timeout = Int32.zero then timeout_pick mq;
let max_time =
if timeout >= Int32.zero then
Unix.time() +. Int32.to_float timeout
else
(-1.0)
in
mq.ongrow <- mq.ongrow @ [ max_time, try_pick session mq reply ];
end
)
;;
let stop_downloads mq entry =
List.iter
(fun dhandle ->
let dl =
try Hashtbl.find dhandles dhandle
with Not_found -> assert false
in
close_in dl.downfile;
Hashtbl.remove dhandles dhandle
)
entry.current_downloads;
mq.qinfo.A.qpicked <- Int32.pred mq.qinfo.A.qpicked;
mq.qinfo.A.qlength <- Int32.succ mq.qinfo.A.qlength;
entry.current_downloads <- []
;;
let proc_return_picked_queue_entry session (qid,eid) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
let entry =
try
List.find
(fun qe -> qe.qentry.A.eid = eid)
mq.qentries
with Not_found -> raise (Error `not_found)
in
if not entry.is_picked then raise (Error `not_picked );
if entry.pick_connection <> Some(Rpc_server.get_connection_id session)
then raise(Error `not_found);
entry.is_picked <- false;
(* Stop all current downloads: *)
stop_downloads mq entry;
reply `successful
)
;;
let proc_remove_picked_queue_entry session (qid,eid) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
let entry =
try
List.find
(fun qe -> qe.qentry.A.eid = eid)
mq.qentries
with Not_found -> raise (Error `not_found)
in
if not entry.is_picked then raise (Error `not_picked );
if entry.pick_connection <> Some(Rpc_server.get_connection_id session)
then raise(Error `not_found);
(* Stop all current downloads: *)
stop_downloads mq entry;
(* Remove this entry from the list, and store the list: *)
mq.qentries <- List.filter
(fun qe -> qe.qentry.A.eid <> eid)
mq.qentries;
mq.qinfo.A.qlength <- Int32.pred mq.qinfo.A.qlength;
store_managed_queue qid mq;
(* Remove the "e*" and "d*" files: *)
let queuedir = Filename.concat spooldir qid in
let qentry_file = Filename.concat queuedir ("e" ^ eid) in
let qdata_file = Filename.concat queuedir ("d" ^ eid) in
Sys.remove qentry_file;
Sys.remove qdata_file;
reply `successful;
(* Finally, consider the [onshrink] activities: *)
mq.onshrink <- List.filter
(fun (_,f) -> protected_call f false) mq.onshrink
)
;;
let proc_remove_queue_entry session (qid,eid) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
let entry =
try
List.find
(fun qe -> qe.qentry.A.eid = eid)
mq.qentries
with Not_found -> raise (Error `not_found)
in
if entry.is_picked &&
entry.pick_connection <> Some(Rpc_server.get_connection_id session)
then raise(Error `not_found);
if entry.is_picked then raise (Error `picked );
if entry.is_uploading then raise (Error `sys_error); (* strange *)
(* Remove this entry from the list, and store the list: *)
mq.qentries <- List.filter
(fun qe -> qe.qentry.A.eid <> eid)
mq.qentries;
mq.qinfo.A.qlength <- Int32.pred mq.qinfo.A.qlength;
store_managed_queue qid mq;
(* Remove the "e*" and "d*" files: *)
let queuedir = Filename.concat spooldir qid in
let qentry_file = Filename.concat queuedir ("e" ^ eid) in
let qdata_file = Filename.concat queuedir ("d" ^ eid) in
Sys.remove qentry_file;
Sys.remove qdata_file;
reply `successful;
(* Finally, consider the [onshrink] activities: *)
mq.onshrink <- List.filter
(fun (_,f) -> protected_call f false) mq.onshrink
)
;;
let proc_download_entry session (qid,eid,chunksize) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
if chunksize > max_chunksize then raise (Error `chunks_too_large);
if chunksize <= Int32.zero then raise (Error `bad_value);
let ichunksize = Int32.to_int chunksize in
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
let entry =
try
List.find
(fun qe -> qe.qentry.A.eid = eid)
mq.qentries
with Not_found -> raise (Error `not_found)
in
if not entry.is_picked then raise (Error `not_picked );
if entry.pick_connection <> Some(Rpc_server.get_connection_id session)
then raise(Error `not_found);
(* Create new dhandle: *)
let n = !next_dhandle in
incr next_dhandle;
let dhandle =
instance_prefix ^ Netnumber.BE.int4_as_string (int4_of_int n) in
(* Create new download record *)
let queuedir = Filename.concat spooldir qid in
let qdata_file = Filename.concat queuedir ("d" ^ eid) in
let file = open_in_bin qdata_file in
begin try
let dl =
{ downfile = file;
downsize = in_channel_length file;
downchunksize = ichunksize;
downserial = Int64.zero;
downentry = entry;
downconnection = Rpc_server.get_connection_id session;
}
in
Hashtbl.add dhandles dhandle dl;
entry.current_downloads <- dhandle :: entry.current_downloads;
with
err -> close_in file; raise err
end;
(* Note: [file] remains open. It will be closed once the download
* is finished, or the download stops for some reason.
*)
(* Return the dhandle: *)
reply (`successful dhandle)
)
;;
let proc_download_chunk session dhandle =
catch_error
(fun reply ->
let dl =
try Hashtbl.find dhandles dhandle
with Not_found -> raise (Error `not_found)
in
if dl.downconnection <> Rpc_server.get_connection_id session
then raise(Error `not_found);
let buf = String.create (dl.downchunksize) in
let n = input dl.downfile buf 0 dl.downchunksize in
let chunk =
{ A.serial = dl.downserial;
A.last = pos_in dl.downfile >= dl.downsize;
A.data =
if n < dl.downchunksize then
String.sub buf 0 n
else
buf;
}
in
dl.downserial <- Int64.succ dl.downserial;
reply (`successful chunk)
)
;;
let timeout_upload mq = (* raise the right exception *)
if mq.deleted then
raise (Error `queue_deleted)
else
if not mq.qinfo.A.qparams.A.qactive then
raise (Error `inactive)
else
if mq.qinfo.A.qparams.A.qaccepting then
raise (Error `full)
else
raise (Error `timeout)
;;
let stop_upload mq entry =
match entry.current_upload with
None -> ()
| Some uhandle ->
let ul =
try Hashtbl.find uhandles uhandle
with Not_found -> assert false
in
close_out ul.upfile;
let queuedir = Filename.concat spooldir entry.qentry.A.eqid in
let qdata_file = Filename.concat queuedir ("d" ^ entry.qentry.A.eid) in
Sys.remove qdata_file;
Hashtbl.remove uhandles uhandle;
mq.qinfo.A.quploads <- Int32.pred mq.qinfo.A.quploads;
entry.current_upload <- None
;;
let start_upload session mq props reply0 timed_out =
let keep = ref true in
let reply r =
(* If [reply] is called, the function [start_upload] must return [false]
* to indicate the it is to be removed from the [onshrink] list
*)
keep := false; reply0 r
in
catch_error
(fun reply ->
if timed_out then timeout_upload mq;
if not mq.qinfo.A.qparams.A.qactive then raise (Error `inactive);
(* Is there space in the queue? *)
let qid = mq.qinfo.A.qid in
if queue_accepts_entries mq then begin
(* Create new uhandle: *)
let n = !next_uhandle in
incr next_uhandle;
let uhandle =
instance_prefix ^ Netnumber.BE.int4_as_string (int4_of_int n) in
(* Create new entry record in memory: *)
let eid = string_of_int (mq.next_eid) in
mq.next_eid <- mq.next_eid + 1;
let qe =
{ A.eid = eid;
A.eqid = qid;
A.ecreation = Int64.of_float (Unix.time());
A.esize = Int64.zero;
A.eprops = props;
}
in
let entry =
{ qentry = qe;
is_picked = false;
pick_connection = None;
is_uploading = true;
upload_connection = Some (Rpc_server.get_connection_id session);
current_downloads = [];
current_upload = Some uhandle;
}
in
(* Create new data file: *)
let queuedir = Filename.concat spooldir qid in
let qdata_file = Filename.concat queuedir ("d" ^ eid) in
let file = open_out_bin qdata_file in
(* Note: the entry file will be created when the upload is finished *)
begin try
(* Create new upload record: *)
let ul =
{ upfile = file;
upserial = Int64.zero;
upentry = entry;
upconnection = Rpc_server.get_connection_id session;
}
in
(* Register [entry]: *)
mq.qentries <- mq.qentries @ [ entry ];
mq.qinfo.A.quploads <- Int32.succ mq.qinfo.A.quploads;
(* Register [ul]: *)
Hashtbl.add uhandles uhandle ul;
with
err -> close_out file; raise err
end;
(* The upload is accepted: *)
try
reply (`successful uhandle);
with
Rpc_server.Connection_lost ->
stop_upload mq entry
end
)
reply;
!keep
;;
let proc_upload_entry session (qid,props,timeout) =
catch_error
(fun reply ->
let user = Rpc_server.get_user session in
(* Sanity check: qid must only consist of alphanumeric characters,
* including underscore
*)
check_name qid;
(* If the queue is short enough, begin the upload immediately;
* otherwise wait
*)
let mq = lookup_managed_queue qid in
if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
if start_upload session mq props reply false then begin
(* Upload is not possible now. Check if should do it later. *)
if timeout = Int32.zero then raise (Error `timeout);
let max_time = if timeout >= Int32.zero then
Unix.time() +. Int32.to_float timeout
else
-1.0
in
mq.onshrink <- mq.onshrink @
[ max_time, start_upload session mq props reply ];
end
)
;;
let proc_upload_chunk session (uhandle,chunk) =
catch_error
(fun reply ->
let ul =
try Hashtbl.find uhandles uhandle
with Not_found -> raise (Error `not_found)
in
if ul.upconnection <> Rpc_server.get_connection_id session
then raise(Error `not_found);
if chunk.A.serial <> ul.upserial then raise (Error `bad_value);
output_string ul.upfile chunk.A.data;
let l = String.length chunk.A.data in
ul.upentry.qentry.A.esize <-
Int64.add ul.upentry.qentry.A.esize (Int64.of_int l);
if chunk.A.last then begin
let entry = ul.upentry in
(* Close the data file *)
close_out ul.upfile;
(* Write the entry file *)
let qid = entry.qentry.A.eqid in
let eid = entry.qentry.A.eid in
let queuedir = Filename.concat spooldir qid in
let qentry_file = Filename.concat queuedir ("e" ^ eid) in
with_out_file (open_out_bin qentry_file)
(fun file ->
let v = A._of_entry entry.qentry in
Netxdr.pack_xdr_value v t_entry [] (output_string file);
);
(* Deallocate uhandle: *)
Hashtbl.remove uhandles uhandle;
entry.current_upload <- None;
(* Make the new entry visible: *)
let mq = lookup_managed_queue qid in
entry.is_uploading <- false;
mq.qinfo.A.qlength <- Int32.succ mq.qinfo.A.qlength;
mq.qinfo.A.quploads <- Int32.pred mq.qinfo.A.quploads;
store_managed_queue qid mq;
(* Tell waiters that there is a new entry: *)
mq.ongrow <- List.filter
(fun (_,f) -> protected_call f false) mq.ongrow;
reply `successful
end
else begin
ul.upserial <- Int64.succ ul.upserial;
reply `successful
end
)
;;
let onclose conn_id =
(* This function is called when a TCP connection is closed. We have to
* iterate over the whole cache of queues and to:
* - return all picked entries (by this connection)
* - stop all downloads (by this connection)
* - stop and remove all uploads (by this connection)
*)
Hashtbl.iter
(fun qid mq ->
let canceled_uploads = ref false in
List.iter
(fun entry ->
if entry.is_picked && entry.pick_connection = Some conn_id then
begin
entry.is_picked <- false;
stop_downloads mq entry;
end;
if entry.is_uploading && entry.upload_connection = Some conn_id then
begin
entry.is_uploading <- false;
stop_upload mq entry;
canceled_uploads := true;
end;
)
mq.qentries;
(* If there are cancaled uploads, execute [onshrink]: *)
if !canceled_uploads then
mq.onshrink <- List.filter
(fun (_,f) -> protected_call f false) mq.onshrink;
)
queues
;;
let check_timeouts ev =
(* This function is called every minute. We have to
* iterate over the whole cache of queues and to:
* - return all timed out picked entries and stop all downloads
* - stop and remove all timed out uploads
*)
( match ev with
Unixqueue.Timeout(_,_) -> ()
| _ -> raise Equeue.Reject
);
let now = Unix.time() in
Hashtbl.iter
(fun qid mq ->
(* Are there timed out events in [onshrink]? *)
mq.onshrink <-
List.filter
(fun (t,f) ->
if t >= 0.0 && t <= now then
protected_call f true
else
true
)
mq.onshrink;
(* Are there timed out events in [ongrow]? *)
mq.ongrow <-
List.filter
(fun (t,f) ->
if t >= 0.0 && t <= now then
protected_call f true
else
true
)
mq.ongrow;
)
queues
;;
let pluggable_auth_module =
ref ("<None>",
(`Socket(Tcp, Rpc_server.Portmapped, Rpc_server.default_socket_config)),
(fun _ ->
failwith "No auth module linked, startup not possible"; () )) ;;
let main() =
Printexc.record_backtrace true;
let esys = Unixqueue.create_unix_event_system() in
let (auth_name, srv_mode, f_srv_config) = !pluggable_auth_module in
prerr_endline ("Starting queues server with auth module: " ^ auth_name);
(*
Rpc_client.verbose true;
Rpc_server.verbose true;
Rpc_client.Debug.enable_ptrace := true;
*)
let server = Rpc_server.create2 srv_mode esys in
S1.bind_async
~proc_ping
~proc_create_queue
~proc_delete_queue
~proc_list_queues
~proc_get_queue
~proc_set_queue
~proc_list_queue_entries
~proc_pick_queue_entry
~proc_return_picked_queue_entry
~proc_remove_picked_queue_entry
~proc_remove_queue_entry
~proc_download_entry
~proc_download_chunk
~proc_upload_entry
~proc_upload_chunk
server;
Rpc_server.set_onclose_action server onclose;
f_srv_config server;
(* Arrange that [check_timeouts] is called every second: *)
let w = Unixqueue.new_wait_id esys in
let g = Unixqueue.new_group esys in
Unixqueue.add_handler esys g (fun _ _ -> check_timeouts);
Unixqueue.add_resource esys g (Unixqueue.Wait w, 1.0);
List.iter
(fun signal ->
Netsys_signal.register_handler
~signal
~name:"Qserver"
~callback:(fun _ ->
Unixqueue.remove_resource esys g (Unixqueue.Wait w);
Rpc_server.stop_server server;
)
()
)
[ Sys.sighup; Sys.sigint; Sys.sigquit; Sys.sigterm ];
Sys.set_signal
Sys.sigpipe
Sys.Signal_ignore;
let rec auto_restart f arg =
try f arg
with err ->
prerr_endline ("Server: Uncaught exception: " ^ Printexc.to_string err);
auto_restart f arg
in
(* Fork *)
match Unix.fork() with
0 ->
(* Child *)
Sys.chdir "/";
ignore(Unix.setsid());
auto_restart Unixqueue.run esys;
exit 99
| n when n > 0 ->
(* Parent *)
()
| _ ->
assert false
;;
|