1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
module test_subprocess
use testdrive, only : new_unittest, unittest_type, error_type, check, skip_test
use stdlib_system, only: process_type, run, runasync, is_running, wait, update, elapsed, is_windows, kill
implicit none
contains
!> Collect all exported unit tests
subroutine collect_suite(testsuite)
!> Collection of tests
type(unittest_type), allocatable, intent(out) :: testsuite(:)
testsuite = [ &
new_unittest('test_run_synchronous', test_run_synchronous), &
new_unittest('test_run_asynchronous', test_run_asynchronous), &
new_unittest('test_process_kill', test_process_kill), &
new_unittest('test_process_state', test_process_state), &
new_unittest('test_input_redirection', test_input_redirection) &
]
end subroutine collect_suite
!> Test running a synchronous process
subroutine test_run_synchronous(error)
type(error_type), allocatable, intent(out) :: error
type(process_type) :: process
character(len=*), parameter :: command = "echo Hello"
process = run(command, want_stdout=.true.)
call check(error, process%completed)
if (allocated(error)) return
call check(error, trim(process%stdout) == "Hello", "stdout=<"//trim(process%stdout)//">, expected <Hello>")
end subroutine test_run_synchronous
!> Test running an asynchronous process
subroutine test_run_asynchronous(error)
type(error_type), allocatable, intent(out) :: error
type(process_type) :: process
logical :: running
! The closest possible to a cross-platform command that waits
if (is_windows()) then
process = runasync("ping -n 2 127.0.0.1")
else
process = runasync("ping -c 2 127.0.0.1")
endif
! Should not be immediately completed
call check(error, .not. process%completed, "ping process should not complete immediately")
if (allocated(error)) return
running = is_running(process)
call check(error, running, "ping process should still be running immediately after started")
if (allocated(error)) return
call wait(process)
call check(error, process%completed, "process should be complete after `call wait`")
if (allocated(error)) return
call check(error, elapsed(process)>1.0e-4, "There should be a non-zero elapsed time")
end subroutine test_run_asynchronous
!> Test killing an asynchronous process
subroutine test_process_kill(error)
type(error_type), allocatable, intent(out) :: error
type(process_type) :: process
logical :: running, success
! Start a long-running process asynchronously
if (is_windows()) then
process = runasync("ping -n 10 127.0.0.1")
else
process = runasync("ping -c 10 127.0.0.1")
endif
! Ensure the process starts running
call check(error, .not. process%completed, "Process should not be completed immediately after starting")
if (allocated(error)) return
running = is_running(process)
call check(error, running, "Process should be running immediately after starting")
if (allocated(error)) return
! Kill the process
call kill(process, success)
call check(error, success, "Failed to kill the process")
if (allocated(error)) return
! Verify the process is no longer running
call check(error, .not. is_running(process), "Process should not be running after being killed")
if (allocated(error)) return
! Ensure process state updates correctly after killing
call check(error, process%completed, "Process should be marked as completed after being killed")
end subroutine test_process_kill
!> Test updating and checking process state
subroutine test_process_state(error)
type(error_type), allocatable, intent(out) :: error
type(process_type) :: process
character(len=*), parameter :: command = "echo Testing"
process = run(command, want_stdout=.true., want_stderr=.true.)
call update(process)
call check(error, process%completed)
if (allocated(error)) return
call check(error, process%exit_code == 0, "Check zero exit code")
if (allocated(error)) return
call check(error, len_trim(process%stderr) == 0, "Check no stderr output")
if (allocated(error)) return
call check(error, trim(process%stdout) == "Testing", "stdout=<"//trim(process%stdout)//">, expected <Testing>")
if (allocated(error)) return
end subroutine test_process_state
!> Test input redirection
subroutine test_input_redirection(error)
type(error_type), allocatable, intent(out) :: error
type(process_type) :: process
character(len=*), parameter :: input_string = "Hello Stdin"
if (is_windows()) then
! findstr "^" echoes input lines.
! Note: We need complex quoting because of how arguments are parsed.
! Actually, sticking to something simpler if possible.
! "more" implies paging which might hang. "sort" is usually safe.
process = run("sort", stdin=input_string, want_stdout=.true.)
else
process = run("cat", stdin=input_string, want_stdout=.true.)
endif
call check(error, process%completed, "Process did not complete")
if (allocated(error)) return
call check(error, process%exit_code == 0, "Process failed with non-zero exit code")
if (allocated(error)) return
! Check if output matches input (sort of "Hello Stdin" is "Hello Stdin")
call check(error, index(process%stdout, input_string) > 0, &
"Output <"//trim(process%stdout)//"> should contain <"//input_string//">")
end subroutine test_input_redirection
end module test_subprocess
program tester
use, intrinsic :: iso_fortran_env, only : error_unit
use testdrive, only : run_testsuite, new_testsuite, testsuite_type
use test_subprocess, only : collect_suite
implicit none
integer :: stat, is
type(testsuite_type), allocatable :: testsuites(:)
character(len=*), parameter :: fmt = '("#", *(1x, a))'
stat = 0
testsuites = [ &
new_testsuite("subprocess", collect_suite) &
]
do is = 1, size(testsuites)
write(error_unit, fmt) "Testing:", testsuites(is)%name
call run_testsuite(testsuites(is)%collect, error_unit, stat)
end do
if (stat > 0) then
write(error_unit, '(i0, 1x, a)') stat, "test(s) failed!"
error stop
end if
end program
|