File: test_subprocess.f90

package info (click to toggle)
fortran-stdlib 0.8.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 34,008 kB
  • sloc: f90: 24,178; ansic: 1,244; cpp: 623; python: 119; makefile: 13
file content (176 lines) | stat: -rw-r--r-- 6,813 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
module test_subprocess
    use testdrive, only : new_unittest, unittest_type, error_type, check, skip_test
    use stdlib_system, only: process_type, run, runasync, is_running, wait, update, elapsed, is_windows, kill

    implicit none

contains

    !> Collect all exported unit tests
    subroutine collect_suite(testsuite)
        !> Collection of tests
        type(unittest_type), allocatable, intent(out) :: testsuite(:)

        testsuite = [ &
            new_unittest('test_run_synchronous', test_run_synchronous), &
            new_unittest('test_run_asynchronous', test_run_asynchronous), &
            new_unittest('test_process_kill', test_process_kill), &
            new_unittest('test_process_state', test_process_state), &
            new_unittest('test_input_redirection', test_input_redirection) &
        ]
    end subroutine collect_suite

    !> Test running a synchronous process
    subroutine test_run_synchronous(error)
        type(error_type), allocatable, intent(out) :: error
        type(process_type) :: process
        character(len=*), parameter :: command = "echo Hello"

        process = run(command, want_stdout=.true.)
        call check(error, process%completed)
        if (allocated(error)) return
        
        call check(error, trim(process%stdout) == "Hello", "stdout=<"//trim(process%stdout)//">, expected <Hello>")
    end subroutine test_run_synchronous

    !> Test running an asynchronous process
    subroutine test_run_asynchronous(error)
        type(error_type), allocatable, intent(out) :: error
        type(process_type) :: process
        logical :: running

        ! The closest possible to a cross-platform command that waits
        if (is_windows()) then 
            process = runasync("ping -n 2 127.0.0.1")
        else
            process = runasync("ping -c 2 127.0.0.1")
        endif
        ! Should not be immediately completed
        call check(error, .not. process%completed, "ping process should not complete immediately")
        if (allocated(error)) return

        running = is_running(process)
        call check(error, running, "ping process should still be running immediately after started")
        if (allocated(error)) return

        call wait(process)
        call check(error, process%completed, "process should be complete after `call wait`")
        if (allocated(error)) return

        call check(error, elapsed(process)>1.0e-4, "There should be a non-zero elapsed time")
        
    end subroutine test_run_asynchronous

    !> Test killing an asynchronous process
    subroutine test_process_kill(error)
        type(error_type), allocatable, intent(out) :: error
        type(process_type) :: process
        logical :: running, success

        ! Start a long-running process asynchronously
        if (is_windows()) then
            process = runasync("ping -n 10 127.0.0.1")
        else
            process = runasync("ping -c 10 127.0.0.1")
        endif

        ! Ensure the process starts running
        call check(error, .not. process%completed, "Process should not be completed immediately after starting")
        if (allocated(error)) return

        running = is_running(process)
        call check(error, running, "Process should be running immediately after starting")
        if (allocated(error)) return

        ! Kill the process
        call kill(process, success)
        call check(error, success, "Failed to kill the process")
        if (allocated(error)) return

        ! Verify the process is no longer running
        call check(error, .not. is_running(process), "Process should not be running after being killed")
        if (allocated(error)) return

        ! Ensure process state updates correctly after killing
        call check(error, process%completed, "Process should be marked as completed after being killed")
    end subroutine test_process_kill

    !> Test updating and checking process state
    subroutine test_process_state(error)
        type(error_type), allocatable, intent(out) :: error
        type(process_type) :: process
        character(len=*), parameter :: command = "echo Testing"

        process = run(command, want_stdout=.true., want_stderr=.true.)

        call update(process)
        call check(error, process%completed)
        if (allocated(error)) return

        call check(error, process%exit_code == 0, "Check zero exit code")
        if (allocated(error)) return
        
        call check(error, len_trim(process%stderr) == 0, "Check no stderr output")
        if (allocated(error)) return

        call check(error, trim(process%stdout) == "Testing", "stdout=<"//trim(process%stdout)//">, expected <Testing>")
        if (allocated(error)) return
    end subroutine test_process_state

    !> Test input redirection
    subroutine test_input_redirection(error)
        type(error_type), allocatable, intent(out) :: error
        type(process_type) :: process
        character(len=*), parameter :: input_string = "Hello Stdin"
        
        if (is_windows()) then 
             ! findstr "^" echoes input lines. 
             ! Note: We need complex quoting because of how arguments are parsed.
             ! Actually, sticking to something simpler if possible. 
             ! "more" implies paging which might hang. "sort" is usually safe.
             process = run("sort", stdin=input_string, want_stdout=.true.)
        else
             process = run("cat", stdin=input_string, want_stdout=.true.)
        endif

        call check(error, process%completed, "Process did not complete")
        if (allocated(error)) return

        call check(error, process%exit_code == 0, "Process failed with non-zero exit code")
        if (allocated(error)) return

        ! Check if output matches input (sort of "Hello Stdin" is "Hello Stdin")
        call check(error, index(process%stdout, input_string) > 0, &
                   "Output <"//trim(process%stdout)//"> should contain <"//input_string//">")
        
    end subroutine test_input_redirection

end module test_subprocess

program tester
    use, intrinsic :: iso_fortran_env, only : error_unit
    use testdrive, only : run_testsuite, new_testsuite, testsuite_type
    use test_subprocess, only : collect_suite

    implicit none

    integer :: stat, is
    type(testsuite_type), allocatable :: testsuites(:)
    character(len=*), parameter :: fmt = '("#", *(1x, a))'

    stat = 0

    testsuites = [ &
        new_testsuite("subprocess", collect_suite) &
    ]

    do is = 1, size(testsuites)
        write(error_unit, fmt) "Testing:", testsuites(is)%name
        call run_testsuite(testsuites(is)%collect, error_unit, stat)
    end do

    if (stat > 0) then
        write(error_unit, '(i0, 1x, a)') stat, "test(s) failed!"
        error stop
    end if
end program