File: test_multipart.pl

package info (click to toggle)
swi-prolog 9.0.4%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 82,408 kB
  • sloc: ansic: 387,503; perl: 359,326; cpp: 6,613; lisp: 6,247; java: 5,540; sh: 3,147; javascript: 2,668; python: 1,900; ruby: 1,594; yacc: 845; makefile: 428; xml: 317; sed: 12; sql: 6
file content (123 lines) | stat: -rw-r--r-- 3,794 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
% Module test_multipart: exports the test entry point test_multipart/0
% together with the memory-file helpers data_to_mf/3, dump_mf/1 and
% mf_to_data/3 that are defined further down in this file.
:- module(test_multipart,
          [ test_multipart/0,
            data_to_mf/3,
            dump_mf/1,
            mf_to_data/3
          ]).
% Prepend relative directories to the `foreign` and `library` search
% paths before any library is loaded.
% NOTE(review): presumably this lets the test run from the package
% directory against the sibling clib, sgml and plunit packages -- confirm.
:- asserta(user:file_search_path(foreign, '.')).
:- asserta(user:file_search_path(foreign, '../clib')).
:- asserta(user:file_search_path(foreign, '../sgml')).
:- asserta(user:file_search_path(library, '..')).
:- asserta(user:file_search_path(library, '../sgml')).
:- asserta(user:file_search_path(library, '../plunit')).
:- asserta(user:file_search_path(library, '../clib')).

:- use_module(library(plunit)).
:- use_module(library(memfile)).
:- use_module(library(http/http_stream)).
:- use_module(library(http/mimepack)).
:- use_module(library(http/http_header)).
:- use_module(library(apply)).
:- use_module(library(option)).
:- use_module(library(debug)).
:- use_module(library(lists)).

%!  test_multipart
%
%   Entry point of this file: hands the `multipart` test unit to
%   run_tests/1.

test_multipart :-
    Units = [multipart],
    run_tests(Units).

%!  data_to_mf(+Data, -MF, ?Boundary)
%
%   Create a new memory file MF and fill it with the output of
%   mime_pack/3 applied to Data. The memory file is written through a
%   stream opened with encoding `octet`, which setup_call_cleanup/3
%   closes again once packing is done. Boundary is handed to
%   mime_pack/3 as is. MF is not freed here; it is returned to the
%   caller.

data_to_mf(Data, MF, Boundary) :-
    new_memory_file(MF),
    setup_call_cleanup(open_memory_file(MF, write, PackStream,
                                        [encoding(octet)]),
                       mime_pack(Data, PackStream, Boundary),
                       close(PackStream)).

%!  dump_mf(+MF)
%
%   Copy the content of memory file MF to `current_output`. MF is read
%   through a stream opened with encoding `octet`; the stream is closed
%   afterwards by setup_call_cleanup/3.

dump_mf(MF) :-
    setup_call_cleanup(open_memory_file(MF, read, Source,
                                        [encoding(octet)]),
                       copy_stream_data(Source, current_output),
                       close(Source)).

%!  mf_to_data(+MF, +Boundary, -Data)
%
%   Parse the multipart content of memory file MF into Data. Same as
%   mf_to_data/4 called with an empty option list.

mf_to_data(MF, Boundary, Data) :-
    NoOptions = [],
    mf_to_data(MF, Boundary, Data, NoOptions).

%!  mf_to_data(+MF, +Boundary, -Data, +Options)
%
%   Open memory file MF for reading with encoding `octet` and let
%   process_multifile/4 turn its content into Data. Boundary and
%   Options are passed on to process_multifile/4 unchanged. The read
%   stream is closed afterwards by setup_call_cleanup/3.

mf_to_data(MF, Boundary, Data, Options) :-
    setup_call_cleanup(open_memory_file(MF, read, Source,
                                        [encoding(octet)]),
                       process_multifile(Source, Boundary, Data, Options),
                       close(Source)).

%!  process_multifile(+In, +Boundary, -Data, +Options)
%
%   Read the multipart document from stream In. If Options contains
%   in_buffer_size(Size), the buffer size of In is set to Size first.
%   A multipart stream is then opened on In with multipart_open/3
%   using boundary(Boundary), process_parts/3 collects the parts into
%   Data, and the multipart stream is closed afterwards.

process_multifile(In, Boundary, Data, Options) :-
    apply_in_buffer_size(In, Options),
    setup_call_cleanup(multipart_open(In, PartStream,
                                      [boundary(Boundary)]),
                       process_parts(PartStream, Data, Options),
                       close(PartStream)).

% Set the buffer size of In when the in_buffer_size(Size) option is
% given; succeed without doing anything when it is absent.
apply_in_buffer_size(In, Options) :-
    option(in_buffer_size(Size), Options),
    !,
    set_stream(In, buffer_size(Size)).
apply_in_buffer_size(_, _).

%!  process_parts(+Stream, -Parts, +Options)
%
%   Collect all parts readable from the multipart stream Stream into
%   Parts using process_parts/2. If Options contains
%   data_buffer_size(Size), the buffer size of Stream is set to Size
%   before reading starts.

process_parts(Stream, Parts, Options) :-
    apply_data_buffer_size(Stream, Options),
    process_parts(Stream, Parts).

% Set the buffer size of Stream when the data_buffer_size(Size) option
% is given; succeed without doing anything when it is absent.
apply_data_buffer_size(Stream, Options) :-
    option(data_buffer_size(Size), Options),
    !,
    set_stream(Stream, buffer_size(Size)).
apply_data_buffer_size(_, _).

%!  process_parts(+Stream, -Parts)
%
%   Parts is a list of part(Header, Content) terms, one for each part
%   read from Stream. For every part the header is read with
%   http_read_header/2 and reduced by part_header/2, after which
%   read_string/3 reads the rest of the part into Content.
%   multipart_open_next/1 decides whether reading continues with a
%   following part or the list ends here.

process_parts(Stream, Parts) :-
    Parts = [part(Header, Content)|Rest],
    http_read_header(Stream, Fields),
    part_header(Fields, Header),
    read_string(Stream, _Length, Content),
    debug(multipart(content), 'Got ~q~n', [Content]),
    (   multipart_open_next(Stream)
    ->  process_parts(Stream, Rest)
    ;   Rest = []
    ).

%!  part_header(+Fields, -Args)
%
%   Fields must be a list holding exactly one
%   content_disposition(disposition(Type, Args)) term; Args is
%   extracted from it. The disposition Type is checked to be
%   'form-data' using assertion/1.

part_header(Fields, Args) :-
    Fields = [content_disposition(Disposition)],
    Disposition = disposition(Type, Args),
    assertion(Type == 'form-data').

%!  long_string(+N, -String)
%
%   String is the text produced by term_string/2 for the list of
%   integers 1..N, as built by numlist/3.

long_string(N, String) :-
    numlist(1, N, Numbers),
    term_string(Numbers, String).

:- begin_tests(multipart).

% Every test packs form fields into a memory file with data_to_mf/3
% and parses that memory file back. The first three compare the parsed
% parts against a literal expected list; the buffer-size tests repeat
% the round trip for each buffer size from 1 to 200 via loop/2.

test(one, Parts == [part([name=n], "v")]) :-
    data_to_mf([n=v], MemFile, Boundary),
    mf_to_data(MemFile, Boundary, Parts).
test(two, Parts == [part([name=n], "v"), part([name=x], "y")]) :-
    data_to_mf([n=v,x=y], MemFile, Boundary),
    mf_to_data(MemFile, Boundary, Parts).
test(long, Parts == [part([name=n], Long), part([name=x], "y")]) :-
    long_string(10000, Long),
    data_to_mf([n=Long,x=y], MemFile, Boundary),
    mf_to_data(MemFile, Boundary, Parts).
test(multi_part_buffer_size) :-
    Fields = [n="v",x="y"],
    forall(between(1, 200, Size),
           loop(Fields, [in_buffer_size(Size)])).
test(multi_part_buffer_size_long) :-
    long_string(100, Long),
    Fields = [n=Long,x="y"],
    forall(between(1, 200, Size),
           loop(Fields, [in_buffer_size(Size)])).
test(data_buffer_size) :-
    Fields = [n="v",x="y"],
    forall(between(1, 200, Size),
           loop(Fields, [data_buffer_size(Size)])).
test(data_buffer_size_long) :-
    long_string(100, Long),
    Fields = [n=Long,x="y"],
    forall(between(1, 200, Size),
           loop(Fields, [data_buffer_size(Size)])).

% loop(+Data, +Options): pack Data, parse it back with Options and
% check each Name=Value pair against the corresponding parsed part.
loop(Data, Options) :-
    data_to_mf(Data, MemFile, Boundary),
    mf_to_data(MemFile, Boundary, Parsed, Options),
    maplist(ok, Data, Parsed).

% ok(?Field, ?Part): relate a Name=Value field to the part term that
% carries the same name and value.
ok(Name=Value, part([name=Name], Value)).

:- end_tests(multipart).