1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986
|
(* File is a copy of netplex_types.mli *)
(* $Id$ *)
(** Types for [Netplex] *)
type encap = Netplex_encap.encap
type param_value =
[ `String of string
| `Int of int
| `Float of float
| `Bool of bool
]
type param_value_or_any =
[ param_value
| `Any of exn
| `Encap of encap
]
type level =
[ `Emerg | `Alert | `Crit | `Err | `Warning | `Notice | `Info | `Debug ]
(** Log levels, modeled after syslog *)
(** A logger receives log messages *)
class type logger =
object
(** A logger receives log messages *)
method log_subch : component:string -> subchannel:string ->
level:level -> message:string -> unit
(** Receive a log [message] of [level] from [component]. The component
string is the name of the socket service emitting the message.
Optionally, one can specify a [subchannel] when a single component
needs to write to several log files. For example, a [subchannel]
of ["access"] could select the access log of a webserver.
The main log file is selected by passing the empty string as
[subchannel].
*)
method log : component:string -> level:level -> message:string -> unit
(** Same as [log_subch] when [subchannel] is set to the empty string.
This means that the message is sent to the main log file of the
component.
*)
method reopen : unit -> unit
(** Reopen the log files *)
end
type parallelization_type =
[ `Multi_processing
| `Multi_threading
| `Controller_attached
]
(** Type of parallelization:
* - [`Multi_processing] on a single host
* - [`Multi_threading] on a single host
* - [`Controller_attached] means that the service runs within the
* controller. This is (for now) only allowed for controller-internal
* services.
*)
type thread_sys_id =
[ `Thread of int | `Process of int ]
(** A system-specific identifier of the thread/process *)
type socket_state =
[ `Enabled | `Disabled | `Restarting of bool | `Down ]
(** The state of a socket:
* - [`Enabled]: The controller allows containers to accept connections.
* Note that this does not necessarily means that there such containers.
* - [`Disabled]: It is not allowed to accept new connections. The
* socket is kept open, however.
* - [`Restarting b]: The containers are being restarted. The boolean
* argument says whether the socket will be enabled after that.
* - [`Down]: The socket is down/closed
*)
type container_id = < socket_service_name : string >
(** Such objects identify containers. As additional info, the method
[socket_service_name] returns the name of the socket service the
container implements
*)
type container_state =
[ `Accepting of int * float
| `Busy
| `Starting of float
| `Shutting_down
]
(** The container state for workload management:
* - [`Accepting(n,t)]: The container is accepting further connections.
* It currently processes [n] connections. The last connection was
* accepted at time [t] (seconds since the epoch).
* - [`Busy]: The container does not accept connections
* - [`Starting t]: The container was started at time [t] and is not
* yet ready.
* - [`Shutting_down]: The container is being shutted down.
*)
type capacity =
[ `Normal_quality of int * bool
| `Low_quality of int * bool
| `Unavailable
]
(** How many connections a container can accept in addition to the
* existing connections:
* - [`Normal_quality(n,greedy)]: It can accept n connections with normal
* service quality, [n > 0]
* - [`Low_quality(n,greedy)]: It can accept n connections with low
* service quality (e.g. because it is already quite loaded), [n > 0]
* - [`Unavailable]: No capacity free
*
* The [greedy] flag sets whether greedy accepts are allowed.
*)
type extended_address =
[ `Socket of Unix.sockaddr
| `Socket_file of string
| `W32_pipe of string
| `W32_pipe_file of string
| `Container of string * string * string * [ thread_sys_id | `Any ]
| `Internal of string
]
(** Possible addresses:
- [`Socket s]: The socket at this socket address
- [`Socket_file f]: The file [f] contains the (anonymous) port number
of a socket bound to [127.0.0.1] (This is meant as substitute for
Unix Domain sockets on Win32.)
- [`W32_pipe name]: The Win32 pipe with this [name] which must
be of the form "\\.\pipe\<pname>"
- [`W32_pipe_file f]: The file [f] contains the (random) name of a
Win32 pipe
- [`Container(socket_dir,service_name,proto_name,thread_id)]:
The special endpoint
of the container for [service_name] that is running as [thread_id].
It is system-dependent what this endpoint "is" in reality, usually
a socket or named pipe. If any container of a service is meant,
the value [`Any] is substituted as a placeholder for a not yet
known [thread_id].
- [`Internal name]: an internal service called [name]. Internal sockets
are only available for multithreaded programs, and are registered
at {!Netplex_internal}.
*)
(** Equality witness *)
type (_,_) eq =
| Equal : ('a,'a) eq
| Not_equal
(** List possible argument types for polysockets ({!Netsys_polysocket}),
which are the basis for internal services. Since OCaml-4.02 this
type is extensible by the user.
*)
#ifdef HAVE_EXTENSIBLE_VARIANTS
type _ polysocket_kind = ..
type _ polysocket_kind +=
#else
type _ polysocket_kind =
#endif
| Txdr : Netxdr.xdr_value polysocket_kind
| Tstring : string polysocket_kind
(** Helper type for a polymorphic check whether a kind this the expected
kind *)
type 'a kind_check =
{ kind_check : 'b . 'b polysocket_kind -> ('a,'b) eq
}
(** Boxed version of [polysocket_kind] where the type parameter is hidden *)
type polysocket_kind_box =
| Polysocket_kind_box : _ polysocket_kind -> polysocket_kind_box
(** This type pairs a [polysocket_kind] with a [polyserver] as GADT. The
type parameter is hidden.
*)
type polyserver_box =
| Polyserver_box : 'a polysocket_kind *
'a Netsys_polysocket.polyserver ->
polyserver_box
(** This type pairs a [polysocket_kind] with a [polyclient]
as GADT. The type parameter is hidden.
*)
type polyclient_box =
| Polyclient_box : 'a polysocket_kind *
'a Netsys_polysocket.polyclient ->
polyclient_box
(** Internally used *)
type extfd =
| OS_descr of Unix.file_descr
| Poly_endpoint of Netxdr.xdr_value Netsys_polysocket.polyendpoint
(** The controller is the object in the Netplex master process/thread
that manages the containers, logging, and service definitions
*)
class type controller =
object
method ptype : parallelization_type
(** The actually effective parallelization type *)
method sys_id : thread_sys_id
(** The thread running the controller *)
method controller_config : controller_config
method services : (socket_service * socket_controller * workload_manager) list
(** The list of controlled services *)
method add_service : socket_service -> workload_manager -> unit
(** Adds a new service. Containers for these services will be started
* soon. It is allowed to add several services with the same name
* (but it will be hard to distinguish them later).
*)
method add_message_receiver : ctrl_message_receiver -> unit
(** Adds a message receiver. This receiver runs in the context of the
controller and receives all messages sent to it. The [name] method
must return the name.
*)
method add_plugin : plugin -> unit
(** Adds a plugin. If the plugin object has already been added, this
is a no-op.
Plugins must have been added before the first container is started.
This is not checked, however. You are on the safe side when the
plugin is added in the [create_processor] factory method, or in
the [post_add_hook] of the processor.
*)
method add_admin : (Rpc_server.t -> unit) -> unit
(** [add_admin setup]: Allows to bind another RPC program to the admin
* socket. The function [setup] will be called whenever a connection
* to the admin socket is established, and this function can call
* [Rpc_server.bind] to bind another RPC program. By default, only
* the [Admin] interface is available as described in [netplex_ctrl.x].
*
* Note that this RPC server runs in the scope of the controller! No
* additional process or thread is created.
*)
method logger : logger
(** The logger *)
method event_system : Unixqueue.unix_event_system
(** The event system used by the controller. It {b must not} be used
* from a container.
*)
method restart : unit -> unit
(** Initiates a restart of all containers: All threads/processes are
* terminated and replaced by newly initialized ones.
*)
method shutdown : unit -> unit
(** Initiates a shutdown of all containers. It is no longer possible
* to add new services. When the shutdown has been completed,
* the controller will terminate itself. Note that the shutdown is
* performed asynchronously, i.e. this method returns immediately,
* and the messaging required to do the shutdown is done in the
* background.
*)
method send_message : string -> string -> string array -> unit
(** [send_message destination msgname msgargs]: Sends a message to
[destination]. When this method returns, it is only ensured that
the receivers registered in the controller have been notified about
the message (so it can be made sure that any newly forked containers
know about the message). It is not guaranteed that the existing
containers are notified when this method returns. This can (and
usually will) happen at any time in the future.
*)
method send_admin_message : string -> string -> string array -> unit
(** [send_message destination msgname msgargs]: Sends an admin message to
[destination].
See [send_message] for the notification guarantees.
*)
method register_lever : (controller -> encap -> encap) -> int
(** [let id = register_lever f]: It is possible to register a function [f]
in the controller, and run it over the internal RPC interface from
any container. These functions are called levers. See
[activate_lever] below. See also
{!Netplex_cenv.Make_lever} for a convenient way to create
and use levers.
*)
method activate_lever : int -> encap -> encap
(** Runs the registered lever directly *)
method containers : container_id list
(** Lists the containers *)
method containers_for : string -> container_id list
(** Lists the containers for a certain socket service name *)
method container_count : string -> int
(** The number of containers for a certain socket service name *)
method free_resources : unit -> unit
(** Should be called when the controller is finished, in order to
free resources again. E.g. plugins are unplugged, and the master
sockets are closed.
*)
method startup_directory : string
(** The current directory at startup time *)
end
and controller_config =
object
method socket_directory : string
(** The directory where Unix domain sockets are created. For every
* service a subdirectory is created, and the socket has the name
* of the protocol.
*
* This is always an absolute path, even if it is only given as
* relative path in the config file.
*)
method create_logger : controller -> logger
(** Create a logger to be used for the whole Netplex system. The
* controller is already initialized which makes it possible to
* write the logger as Netplex service. Messages arriving during the
* creation are queued up and sent afterwards to the new logger.
*)
method max_level : level
(** Return the maximum global log level *)
method set_max_level : level -> unit
(** Set the maximum global log level *)
end
and socket_service =
object
method name : string
(** The name of the [socket_service] is used to identify the service
* in the whole netplex process cluster. Names are hierarchical;
* name components are separated by dots (e.g. "company.product.service").
* The prefix "netplex." is reserved for use by Netplex. The name
* "netplex.controller" refers to the service provided by the
* controller.
*)
method sockets : (string * Unix.file_descr array) list
(** A [socket_service] consists of a list of supported protocols
* which are identified by a name. Every protocol is available
* on a list of sockets (which may be bound to different addresses).
* The sockets corresponding to [`Container] addresses are missing
* here.
*)
method internal_sockets : (string * polyserver_box) list
(** The internal sockets for internal services: pairs of
([protocol,server])
*)
method socket_service_config : socket_service_config
(** The configuration *)
method processor : processor
(** A user-supplied object to process incoming connections *)
method shutdown : unit -> unit
(** Shuts down the master sockets *)
method create_container : parallelization_type -> socket_service -> container
(** {b Internal method.} Called by the controller to create a new
* container. The container must match the parallelization type of
* the controller. This call is already done in the process/thread
* provided for the container.
*)
method on_add : controller -> unit
(** Get some runtime configuration aspects from this controller. This
is called when the socket service is added to the controller
*)
method startup_directory : string
(** The current directory at Netplex startup time (same view as controller)
*)
end
and socket_service_config =
object
method name : string
(** The proposed name for the [socket_service] *)
method protocols : protocol list
(** This list describes the sockets to create in detail *)
method change_user_to : (int * int) option
(** Instructs the container to change the user of the process after
* starting the service. This is only possible in multi-processing mode.
* In multi-threading mode, this parameter is ignored.
*)
method startup_timeout : float
(** After this many seconds the container must have finished the
[post_start_hook]. It is usually 60 seconds.
*)
method conn_limit : int option
(** An optional limit of the number of connections this container
can accept. If the limit is reached, the container will not
accept any further connections, and shut down when all connections
are processed.
*)
method gc_when_idle : bool
(** If set, idle containers run a [Gc.full_major] cycle. *)
method controller_config : controller_config
(** Make this config accessible here too, for convenience *)
end
and protocol =
object
method name : string
(** The protocol name is an arbitrary string identifying groups of
* sockets serving the same protocol for a [socket_service].
*)
method addresses : extended_address array
(** The addresses of the master sockets. (The socket type is always
* SOCK_STREAM.) The list must be non-empty.
*)
method lstn_backlog : int
(** The backlog (argument of Unix.listen) *)
method lstn_reuseaddr : bool
(** Whether to reuse ports immediately *)
method so_keepalive : bool
(** Whether to set the keep-alive socket option *)
method tcp_nodelay : bool
(** Whether to set the TCP_NODELAY option *)
method local_chmod : int option
(** Whether to chmod Unix Domain sockets *)
method local_chown : (int * int) option
(** Whether to chown (user,group) Unix Domain sockets *)
method configure_slave_socket : Unix.file_descr -> unit
(** A user-supplied function to configure slave sockets (after [accept]).
* The function is called from the process/thread of the container.
*)
end
and socket_controller =
object
method state : socket_state
(** The current state *)
method enable : unit -> unit
(** Enables a disabled socket service again *)
method disable : unit -> unit
(** Disable a socket service temporarily *)
method restart : unit -> unit
(** Restarts the containers for this socket service only *)
method shutdown : unit -> unit
(** Closes the socket service forever, and initiates a shutdown of all
* containers serving this type of service.
*)
method container_state : (container_id * string * container_state * bool) list
(* (cid, par_info, cstate, selected)
* par_info: the info_string of the par_thread
* selected: says whether the container is selected to accept the
* next connection
*)
method start_containers : int -> int
(* Arg: #containers to start. Return val: actual started #containers *)
method stop_containers : container_id list -> unit
end
and ctrl_message_receiver =
object
method name : string
(** The name of this receiver *)
method receive_message :
controller -> string -> string array -> unit
(** This function is called when a broadcast message is received.
* The first string is the name of the message, and the array are
* the arguments.
*)
method receive_admin_message :
controller -> string -> string array -> unit
(** This function is called when a broadcast admin message is received.
* The first string is the name of the message, and the array are
* the arguments.
*)
end
(** Processor hooks can be used to modify the behavior of a processor.
See {!Netplex_intro.servproc} for some documentation about the hooks.
*)
and processor_hooks =
object
method post_add_hook : socket_service -> controller -> unit
(** A user-supplied function that is called after the service has been
* added to the controller
*)
method post_rm_hook : socket_service -> controller -> unit
(** A user-supplied function that is called after the service has been
* removed from the controller
*)
method pre_start_hook : socket_service -> controller -> container_id -> unit
(** A user-supplied function that is called before the container is
* created and started. It is called from the process/thread of the
* controller.
*)
method post_start_hook : container -> unit
(** A user-supplied function that is called after the container is
* created and started, but before the first service request arrives.
* It is called from the process/thread of the
* container.
*)
method pre_finish_hook : container -> unit
(** A user-supplied function that is called just before the container is
* terminated. It is called from the process/thread of the
* container.
*)
method post_finish_hook : socket_service -> controller -> container_id -> unit
(** A user-supplied function that is called after the container is
* terminated. It is called from the process/thread of the
* controller.
*)
method workload_hook : container -> bool -> int -> unit
(** A user-supplied function that is called when the workload
changes, i.e. a new connection has been accepted, or an
existing connection could be completely processed.
The [bool] argument is [true] if the reason is a new
connection. The [int] argument is the number of connections.
This function is called from the process/thread of the container.
*)
method receive_message :
container -> string -> string array -> unit
(** This function is called when a broadcast message is received.
* The first string is the name of the message, and the array are
* the arguments.
*)
method receive_admin_message :
container -> string -> string array -> unit
(** This function is called when a broadcast admin message is received.
* The first string is the name of the message, and the array are
* the arguments.
*)
method config_internal : (string * polysocket_kind_box) list
(** For internal services, this list configures which message kind
is used for which protocol.
*)
method process_internal :
when_done:(unit -> unit) ->
container -> polyserver_box -> string -> unit
(** [process_internal ~when_done cont client protocol]: This function
is called instead of [process] when a connection to an internal
service is made. This method has to accept or reject the connection
with {!Netsys_polysocket.accept} or {!Netsys_polysocket.refuse},
respectively. The default is to refuse.
Like [process], the function must call [when_done] when the connection
is fully processed.
*)
method system_shutdown : unit -> unit
(** A user-supplied function that is called when a system shutdown
* notification arrives. This notification is just for information
* that every container of the system will soon be shut down. The
* system is still completely up at the time this notification
* arrives, so if the services of other components are required to
* go down this is the right point in time to do that (e.g. send
* important data to a storage component).
*)
method shutdown : unit -> unit
(** A user-supplied function that is called when a shutdown notification
* arrives. That means that the container should terminate ASAP.
* There is, however, no time limitation. The termination is started
* by calling the [when_done] function passed to the [process] method.
*)
method global_exception_handler : exn -> bool
(** This method is called when an uncaught exception would otherwise
* terminate the container. It can return [true] to indicate that
* the container continues running.
*)
method container_event_system : unit -> Unixqueue.event_system
(** This method is called to get the event systems for containers.
This is normally a {!Unixqueue.standard_event_system}, but
users can override it.
*)
method container_run : Unixqueue.event_system -> unit
(** [container_run esys]: By default, it just runs [esys#run()].
This method is called to run the event system of the containers.
Users can override it.
*)
end
(** The processor is the object that is notified when a new TCP connection
is accepted. The processor has to include the protocol interpreter that
reads and write data on this connection. See {!Netplex_intro.defproc}
for an example how to define a processor.
*)
and processor =
object
inherit processor_hooks
method process :
when_done:(unit -> unit) ->
container -> Unix.file_descr -> string -> unit
(** A user-supplied function that is called when a new socket connection
* is established. The function can now process the requests arriving
* over the connection. It is allowed to use the event system of the
* container, and to return immediately (multiplexing processor). It is
* also allowed to process the requests synchronously and to first return
* to the caller when the connection is terminated.
*
* The function {b must} call [when_done] to indicate that it processed
* this connection completely.
*
* The string argument is the protocol name.
*)
method supported_ptypes : parallelization_type list
(** The supported parallelization types *)
end
(** Containers encapsulate the control flow of the service components.
A container is run in a separate thread or process.
{b Thread safety:} All methods except [start] can be called from
any thread, and provide full thread safety.
*)
and container =
object
method socket_service_name : string
method socket_service : socket_service
method container_id : container_id
(** Return the container ID *)
method ptype : parallelization_type
(** The parallelization type actually used for this container *)
method event_system : Unixqueue.unix_event_system
(** The event system the container uses *)
method start : extfd -> extfd -> unit
(** {b Internal Method.} Called by the controller to start the container.
* It is the responsibility of the container to call the
* [post_start_hook] and the [pre_finish_hook].
*
* The file descriptors are endpoints of RPC connections to the
* controller. The first serves calls of the [Control] program,
* and the second serves calls of the [System] program.
*
* When [start] returns the container will be terminated.
*)
method shutdown : unit -> unit
(** Initiates a shutdown of the container. *)
method n_connections : int
(** The current number of connections *)
method n_total : int
(** The sum of all connections so far *)
method system : Rpc_client.t
(** An RPC client that can be used to send messages to the controller.
* Only available while [start] is running. It is bound to
* [System.V1].
*
* In multi-threaded programs access to [system] must be governed
* by [system_monitor]. See {!Uq_mt} for details what this means.
*)
method system_monitor : Uq_mt.monitor
(** The thread monitor protecting the [system] RPC client *)
method lookup : string -> string -> string option
(** [lookup service_name protocol_name] tries to find a Unix domain
* socket for the service and returns it.
*)
method lookup_container_sockets : string -> string -> string array
(** [lookup_container_sockets service_name protocol_name]: returns
the Unix Domain paths of all container sockets for this service and
protocol. These are the sockets declared with address type
"container" in the config file.
*)
method owned_container_sockets : (string * string) list
(** List of pairs [(protocol_name, path)] of all container sockets
of this container
*)
method send_message : string -> string -> string array -> unit
(** [send_message service_pattern msg_name msg_arguments]: Sends
a message to all services and message receivers matching
[service_pattern]. The pattern may include the wildcard [*].
See the {!Netplex_types.controller.send_message} method for
the notification guarantees.
*)
method log : level -> string -> unit
(** Sends a log message to the controller. *)
method log_subch : string -> level -> string -> unit
(** Sends a log message to the controller. The first string is the
subchannel
*)
method update_detail : Unix.file_descr -> string -> unit
(** Update the detail string output for the [netplex.connections]
admin message
*)
method var : string -> param_value_or_any
(** Returns the value of a container variable or [Not_found]. Container
* variables can be used by the user of a container to store additional
* values in the container. These values exist once per thread/process.
*)
method set_var : string -> param_value_or_any -> unit
(** Sets the value of a container variable *)
method call_plugin : plugin -> string -> Netxdr.xdr_value-> Netxdr.xdr_value
(** [call_plugin p procname procarg]: This method can be called
from the container context to invoke the plugin [p] procedure
[procname]. This means that the [ctrl_receive_call] of the
same plugin is invoked in the controller context.
*)
method activate_lever : int -> encap -> encap
(** Runs a lever function registered in the controller. The [int]
argument identifies the lever. The [encap] argument is the parameter,
and the returned exception is the result. See also
{!Netplex_cenv.Make_lever} for a convenient way to create
and use levers.
*)
method startup_directory : string
(** The current directory at Netplex startup time (same view as controller)
*)
end
(** See {!Netplex_workload} for definitions of workload managers *)
and workload_manager =
object
method hello : controller -> unit
(** Called by the controller when the service is added *)
method shutdown : unit -> unit
(** Called by the controller to notify the manager about a shutdown *)
method adjust : socket_service -> socket_controller -> unit
(** This function is called by the controller at certain events to
* adjust the number of available containers. The manager can
* call [start_containers] and [stop_containers] to change the
* system.
*
* The function is called right after the startup to ensure
* that there are containers to serve requests. It is also called:
* - just after a connection has been accepted and before it is
* decided which container will have the chance to accept in the
* round
* - after the shutdown of a container
*
* Of course, the workload manager is free to adjust the load
* at any other time, too, not only when [adjust] is called.
*)
method capacity : container_id -> container_state -> capacity
(** Computes the capacity, i.e. the number of jobs a certain container
* can accept in addition to the existing load.
*)
end
and plugin =
object
method required : plugin list
(** Required plugins *)
method program : Rpc_program.t
(** The RPC program structure on which the messaging bases. The program,
version and procedure numbers are ignored
*)
method ctrl_added : controller -> unit
(** This method is invoked when the plugin has been added to this
controller. Note that plugins can be added to several controllers.
*)
method ctrl_unplugged : controller -> unit
(** The plugin has been unplugged from this controller *)
method ctrl_receive_call :
controller -> container_id -> string -> Netxdr.xdr_value ->
(Netxdr.xdr_value option -> unit) ->
unit
(** [ctrl_receive_call ctrl cid procname procarg emit]:
This method is called in the controller context [ctrl] when a procedure
named [procname] is called. In [procarg] the argument of the
procedure is passed. [cid] is the container ID from where the
call originates. To pass the result [r] of the call back to the caller,
is is required to call [emit (Some r)] (either immediately, or at
some time in the future). By calling [emit None], an error condition
is propagated back to the caller.
*)
method ctrl_container_finished : controller -> container_id -> bool -> unit
(** This method is called when a container finishes
(after [post_finish_hook]).
The boolean is true if the container is the last of the terminated
socket service.
*)
end
(** Plugins are extensions of the Netplex system that run in the controller
and can be invoked from containers
*)
;;
class type par_thread =
object
method ptype : parallelization_type
method sys_id : thread_sys_id
(** Returns a system-dependent identifier for the thread:
* - [`Thread id]: The [id] as returned by [Thread.id]
* - [`Process id]: The [id] is the process ID
*)
method info_string : string
(** Outputs the process or thread ID *)
method watch_shutdown : Unixqueue.unix_event_system -> unit
(** Called by the controller if it thinks the container is down.
* {b This method must not be called outside the internal Netplex
* implementation!}
*)
method parallelizer : parallelizer
(** Returns the parallelizer that created this thread. Can be used
* to start another thread of the same type.
*)
end
and parallelizer =
object
method ptype : parallelization_type
method init : unit -> unit
(** Initializes the main process for usage with this parallelizer.
* {b This method must not be called outside the internal Netplex
* implementation!}
*)
method start_thread :
(par_thread -> unit) ->
Unix.file_descr list ->
Unix.file_descr list ->
string ->
logger ->
par_thread
(** [start_thread f l_close l_share name logger]:
* Starts a new thread or process and calls
* [f thread] in that context. Before this is done, file descriptors
* are closed, controlled by the parameters [l_close] and [l_share].
* The descriptors in [l_close] are always closed. The descriptors
* in [l_share] are not closed. The implementation of the parallelizer
* is free to close a reasonable set of descriptors, and [l_close]
* is the minimum, and [all - l_share] is the maximum.
*
* There is no way to check when the thread terminates.
*
* It is allowed that the [par_thread] object passed to [f] is a different
* object as the returned [par_thread] object.
*)
method create_mem_mutex : unit -> ( (unit -> unit) * (unit -> unit) )
(** [let lock, unlock = par#create_mem_locker()]: Creates a mutex that
* is sufficient to protect process memory from uncoordinated access.
* The function [lock] obtains the lock, and [unlock] releases it.
*)
method current_sys_id : [ `Thread of int | `Process of int ]
(** Returns the system-dependent thread identifier of the caller *)
end
type config_tree =
[ `Section of string * config_tree list
(* (relative_name, contents) *)
| `Parameter of string * param_value
(* (relative_name, contents) *)
]
and address = < >
class type config_file =
object
method filename : string
method tree : config_tree
method root_addr : address
method root_name : string
method resolve_section : address -> string -> address list
(* Fails if the address cannot be found. Returns [] if there is no
* such section at this address
*)
method resolve_parameter : address -> string -> address
(* Fails if the address cannot be found. Raises Not_found if there is no
* such parameter at this address
*)
method print : address -> string
method string_param : address -> string
method int_param : address -> int
method float_param : address -> float
method bool_param : address -> bool
method restrict_subsections : address -> string list -> unit
method restrict_parameters : address -> string list -> unit
end
class type processor_factory =
object
method name : string
method create_processor :
controller_config -> config_file -> address -> processor
end
class type workload_manager_factory =
object
method name : string
method create_workload_manager :
controller_config -> config_file -> address -> workload_manager
end
class type logger_factory =
object
method name : string
method create_logger : config_file -> address -> controller -> logger
end
class type netplex_config =
object
method ptype : parallelization_type
method controller_config : controller_config
method services : (socket_service_config *
(address * processor_factory) *
(address * workload_manager_factory) ) list
end
|