1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
|
# This file is a part of Julia. License is MIT: https://julialang.org/license
using Test
using Base.Threads
# threading constructs

# Parallel loop with parallel atomic addition: for every index `idx` in `r`,
# store into `a[idx]` one more than the counter value that `atomic_add!`
# returned before incrementing `x`.
function threaded_loop(a, r, x)
    @threads for idx in r
        prev = atomic_add!(x, 1)
        a[idx] = prev + 1
    end
end
# Run `threaded_loop` over 10000 indices and check that the shared counter saw
# every iteration and that each iteration produced a distinct slot value.
function test_threaded_loop_and_atomic_add()
    counter = Atomic()
    n = 10000
    a = zeros(Int, n)
    threaded_loop(a, 1:n, counter)
    found = zeros(Bool, n)
    for v in a
        found[v] = true
    end
    was_inorder = a == 1:n
    @test counter[] == n
    # All loop iterations ran and were unique: n values landed in n distinct
    # slots (pigeon-hole principle).
    @test all(found)
    if was_inorder
        println(stderr, "Warning: threaded loop executed in order")
    end
end
test_threaded_loop_and_atomic_add()
# Helper for test_threaded_atomic_minmax that verifies sequential consistency.
# `old[k]` holds the value returned by the atomic op applied with value m+k-1.
# Any recorded value other than `start` must have been installed by the op for
# that same value, and that op must have observed a strictly smaller (under
# ordering `o`) previous value.
function check_minmax_consistency(old::Array{T,1}, m::T, start::T, o::Base.Ordering) where T
    for installed in old
        installed == start && continue
        prev = old[installed - m + 1]
        @test Base.lt(o, prev, installed)
    end
end
# Concurrently apply atomic_min! and atomic_max! with every value in m:n to two
# atomics that both start at the midpoint. Record the previous value each op
# returned (slot = value - m + 1), then check the final extremes and the
# consistency of the recorded history.
function test_threaded_atomic_minmax(m::T, n::T) where T
    mid = m + ((n - m) >> 1)
    lo = Atomic{T}(mid)
    hi = Atomic{T}(mid)
    prev_lo = Vector{T}(undef, n - m + 1)
    prev_hi = Vector{T}(undef, n - m + 1)
    @threads for v = m:n
        slot = v - m + 1
        prev_lo[slot] = atomic_min!(lo, T(v))
        prev_hi[slot] = atomic_max!(hi, T(v))
    end
    @test lo[] == m
    @test hi[] == n
    check_minmax_consistency(prev_hi, m, mid, Base.Forward)
    check_minmax_consistency(prev_lo, m, mid, Base.Reverse)
end
# The ranges below verify that the correct signed/unsigned comparison is used.
test_threaded_atomic_minmax(Int16(-5000), Int16(5000))
test_threaded_atomic_minmax(UInt16(27000), UInt16(37000))
# Increment `x` `n` times from a threaded loop, guarding each increment with a
# single lock of type `LockT`, and return the final value of `x`.
# A second threaded loop exercises `trylock`: each iteration either enters the
# critical section or is counted as a failed attempt.
function threaded_add_locked(::Type{LockT}, x, n) where LockT
    critical = LockT()
    @threads for i = 1:n
        @test lock(critical) === nothing
        @test islocked(critical)
        # `x` is captured by the threaded loop body; the lock serializes updates.
        x = x + 1
        @test unlock(critical) === nothing
    end
    @test !islocked(critical)
    nentered = 0
    nfailed = Atomic()
    @threads for i = 1:n
        if trylock(critical)
            @test islocked(critical)
            # Plain (non-atomic) increment: only the current lock holder runs this.
            nentered += 1
            @test unlock(critical) === nothing
        else
            atomic_add!(nfailed, 1)
        end
    end
    # At least one trylock must have succeeded, and every iteration is
    # accounted for as either entered or failed.
    @test 0 < nentered <= n
    @test nentered + nfailed[] == n
    @test !islocked(critical)
    return x
end
@test threaded_add_locked(SpinLock, 0, 10000) == 10000
@test threaded_add_locked(RecursiveSpinLock, 0, 10000) == 10000
@test threaded_add_locked(Mutex, 0, 10000) == 10000
# Check if the recursive lock can be locked and unlocked correctly.
# The same task acquires the lock up to three times (lock, lock, trylock). It
# must stay locked until the matching number of unlocks, and unlocking an
# unheld lock must raise an AssertionError.
let critical = RecursiveSpinLock()
    @test !islocked(critical)
    @test_throws AssertionError unlock(critical)
    # Acquire three levels deep: lock, lock, trylock.
    @test lock(critical) === nothing
    @test islocked(critical)
    @test lock(critical) === nothing
    @test trylock(critical) == true
    @test islocked(critical)
    # Release one level at a time; still held until the last unlock.
    @test unlock(critical) === nothing
    @test islocked(critical)
    @test unlock(critical) === nothing
    @test islocked(critical)
    @test unlock(critical) === nothing
    @test !islocked(critical)
    @test_throws AssertionError unlock(critical)
    # A single trylock/unlock round trip on the now-free lock.
    @test trylock(critical) == true
    @test islocked(critical)
    @test unlock(critical) === nothing
    @test !islocked(critical)
    @test_throws AssertionError unlock(critical)
    @test !islocked(critical)
end
# Make sure doing a GC while holding a lock doesn't cause dead lock
# PR 14190. (This is only meaningful for threading)
# Each of 20 threaded iterations takes the shared lock, triggers a GC while
# holding it, and releases it; afterwards the lock must be free.
function threaded_gc_locked(::Type{LockT}) where LockT
    lk = LockT()
    @threads for iter in 1:20
        @test lock(lk) === nothing
        @test islocked(lk)
        GC.gc(false)
        @test unlock(lk) === nothing
    end
    @test !islocked(lk)
end
threaded_gc_locked(SpinLock)
threaded_gc_locked(Threads.RecursiveSpinLock)
threaded_gc_locked(Mutex)
# Issue 14726
# Make sure that eval'ing in a different module doesn't mess up other threads
# Threaded iterations repeatedly `@eval` assignments into module M14726. After
# that, the enclosing module and its globals must still be intact, and
# `@__MODULE__` must report the right module inside threaded loops.
orig_curmodule14726 = @__MODULE__
main_var14726 = 1
module M14726
module_var14726 = 1
end
@threads for i in 1:100
    for j in 1:100
        @eval M14726 module_var14726 = $j
    end
end
@test @isdefined(orig_curmodule14726)
@test @isdefined(main_var14726)
@test @__MODULE__() == orig_curmodule14726
@threads for i in 1:100
    # Make sure current module is not null.
    # The @test might not be particularly meaningful currently since the
    # thread infrastructure swallows the error. (Same below)
    @test @__MODULE__() == orig_curmodule14726
end
module M14726_2
using Test
using Base.Threads
@threads for i in 1:100
    # Make sure current module is the same as the one on the thread that
    # pushes the work onto the threads.
    # The @test might not be particularly meaningful currently since the
    # thread infrastructure swallows the error. (See also above)
    @test @__MODULE__() == M14726_2
end
end
# Ensure only LLVM-supported types can be atomic
@test_throws TypeError Atomic{BigInt}
@test_throws TypeError Atomic{ComplexF64}

# Exercise Atomic{Bool}: the arithmetic operations are rejected, while
# exchange, compare-and-swap, max and bitwise-and are available.
function test_atomic_bools()
    flag = Atomic{Bool}(false)
    # Arithmetic functions are not defined.
    @test_throws MethodError atomic_add!(flag, true)
    @test_throws MethodError atomic_sub!(flag, true)
    # All the rest are; each returns the value held before the operation.
    for newval in (true, false)
        @test flag[] == atomic_xchg!(flag, newval)
        @test newval == atomic_cas!(flag, newval, !newval)
    end
    flag = Atomic{Bool}(false)
    @test false == atomic_max!(flag, true)
    @test flag[] == true
    flag = Atomic{Bool}(true)
    @test true == atomic_and!(flag, false)
    @test flag[] == false
end
test_atomic_bools()
# Test atomic memory ordering with load/store
# Shared state for one writer and one reader running concurrently.
mutable struct CommBuf
    # Counters stored by the writer; var1 is always stored before var2.
    var1::Atomic{Int}
    var2::Atomic{Int}
    # Result flags filled in by the writer and the reader respectively.
    correct_write::Bool
    correct_read::Bool
    # Both counters start at 0 and both flags start false.
    CommBuf() = new(Atomic{Int}(0), Atomic{Int}(0), false, false)
end
# Writer side: store 1, 2, ..., n into var1 and then var2 on each step, so a
# reader must never observe var2 ahead of var1. Sets `correct_write` when done.
function test_atomic_write(commbuf::CommBuf, n::Int)
    for i in 1:n
        # The atomic stores guarantee that var1 >= var2
        commbuf.var1[] = i
        commbuf.var2[] = i
    end
    commbuf.correct_write = true
end
# Reader side: spin until var1 reaches n, checking on every pass that
# var1 >= var2. Stores the overall result in `correct_read`.
function test_atomic_read(commbuf::CommBuf, n::Int)
    correct = true
    while true
        # load var2 before var1
        # (reversed relative to the writer's store order, so a stale var1 paired
        # with a newer var2 would be detected as var1 < var2)
        var2 = commbuf.var2[]
        var1 = commbuf.var1[]
        correct &= var1 >= var2
        var1 == n && break
        # Temporary solution before we have gc transition support in codegen.
        # (explicit GC safepoint inside the spin loop)
        ccall(:jl_gc_safepoint, Cvoid, ())
    end
    commbuf.correct_read = correct
end
# Run the writer (iteration 1) and the reader (iteration 2) of a shared
# CommBuf in a threaded loop, then check that both sides reported success.
function test_atomic()
    buf = CommBuf()
    iterations = 1_000_000
    @threads for role in 1:2
        role == 1 ? test_atomic_write(buf, iterations) :
                    test_atomic_read(buf, iterations)
    end
    @test buf.correct_write == true
    @test buf.correct_read == true
end
test_atomic()
# Test ordering with fences using Peterson's algorithm
# Example adapted from <https://en.wikipedia.org/wiki/Peterson%27s_algorithm>
# Shared state for two participants with ids 1 and 2.
mutable struct Peterson
    # State for Peterson's algorithm
    # flag[id] is 1 while participant `id` wants to (or does) hold the section.
    flag::Vector{Atomic{Int}}
    # Id of the participant that yields when both want in.
    turn::Atomic{Int}
    # Collision detection
    # critical[id] is 1 while participant `id` is inside the critical section.
    critical::Vector{Atomic{Int}}
    # Per-participant result: true if no collision was ever observed.
    correct::Vector{Bool}
    # Everything starts at zero / false.
    Peterson() =
        new([Atomic{Int}(0), Atomic{Int}(0)],
            Atomic{Int}(0),
            [Atomic{Int}(0), Atomic{Int}(0)],
            [false, false])
end
# Participant `id` (1 or 2) enters the Peterson critical section `n` times.
# Inside the section it checks that the other participant is not also inside.
# The result is stored in `p.correct[id]`. The `atomic_fence()` orders the
# flag/turn stores before the loads in the wait loop.
function test_fence(p::Peterson, id::Int, n::Int)
    # mod1(id, 2) is 1 or 2, so this asserts id is 1 or 2.
    @assert id == mod1(id,2)
    correct = true
    otherid = mod1(id+1,2)
    for i in 1:n
        # Announce intent and give priority to the other participant.
        p.flag[id][] = 1
        p.turn[] = otherid
        atomic_fence()
        while p.flag[otherid][] != 0 && p.turn[] == otherid
            # busy wait
            # Temporary solution before we have gc transition support in codegen.
            ccall(:jl_gc_safepoint, Cvoid, ())
        end
        # critical section
        p.critical[id][] = 1
        # A collision is recorded if the other participant is also inside.
        correct &= p.critical[otherid][] == 0
        p.critical[id][] = 0
        # end of critical section
        p.flag[id][] = 0
    end
    p.correct[id] = correct
end
# Run both Peterson participants (ids 1 and 2) in a threaded loop and check
# that neither ever observed the other inside the critical section.
function test_fence()
    state = Peterson()
    rounds = 1_000_000
    @threads for participant in 1:2
        test_fence(state, participant, rounds)
    end
    @test state.correct[1] == true
    @test state.correct[2] == true
end
test_fence()
# Test load / store with various types
let atomic_types = [Int8, Int16, Int32, Int64, Int128,
                    UInt8, UInt16, UInt32, UInt64, UInt128,
                    Float16, Float32, Float64]
    # 128-bit types are skipped on:
    # - 32-bit x86 (temporarily omitted),
    # - AArch32, where 128-bit atomics do not exist,
    # - power, not supported yet because they are lowered to
    #   `__sync_lock_test_and_set_16`.
    arch = Sys.ARCH
    skip_128bit = arch === :i686 || startswith(string(arch), "arm") ||
                  arch in (:powerpc64le, :ppc64le)
    skip_128bit && filter!(T -> sizeof(T) <= 8, atomic_types)
    for T in atomic_types
        cell = Atomic{T}()
        cell[] = 42
        @test cell[] === T(42)
        prev = atomic_xchg!(cell, T(13))
        @test prev === T(42)
        @test cell[] === T(13)
        # Expected value matches the current one: the swap succeeds.
        prev = atomic_cas!(cell, T(13), T(14))
        @test prev === T(13)
        @test cell[] === T(14)
        # Expected value is stale: the swap fails and the value is unchanged.
        prev = atomic_cas!(cell, T(13), T(15))
        @test prev === T(14)
        @test cell[] === T(14)
    end
end
# Test atomic_cas! and atomic_xchg!
# For each target value in `range`, retry a compare-and-swap from target-1 to
# target until it succeeds, i.e. until `var` holds exactly target-1.
function test_atomic_cas!(var::Atomic{T}, range::StepRange{Int,Int}) where T
    for target in range
        expected = T(target - 1)
        while atomic_cas!(var, expected, T(target)) != expected
            # Temporary solution before we have gc transition support in codegen.
            ccall(:jl_gc_safepoint, Cvoid, ())
        end
    end
end
for T in (Int32, Int64, Float32, Float64)
    counter = Atomic{T}()
    nloops = 1000
    stride = nthreads()
    # Iteration k advances the counter through k, k+stride, k+2*stride, ...
    @threads for k in 1:stride
        test_atomic_cas!(counter, k:stride:nloops)
    end
    @test counter[] === T(nloops)
end
# Swap `T(i)` into `var` and add the value it replaced to `accum`. Returns
# whatever `atomic_add!` returns for `accum`.
function test_atomic_xchg!(var::Atomic{T}, i::Int, accum::Atomic{Int}) where T
    replaced = atomic_xchg!(var, T(i))
    return atomic_add!(accum, Int(replaced))
end
for T in (Int32, Int64, Float32, Float64)
    total = Atomic{Int}()
    slot = Atomic{T}()
    nloops = 1000
    @threads for k in 1:nloops
        test_atomic_xchg!(slot, k, total)
    end
    # Each of 0:nloops is either accumulated after being replaced or is the
    # value left in `slot` at the end.
    @test total[] + Int(slot[]) === sum(0:nloops)
end
# Apply atomic add, max and min with `T(i)` to the three accumulators. Returns
# whatever the final `atomic_min!` call returns.
function test_atomic_float(varadd::Atomic{T}, varmax::Atomic{T}, varmin::Atomic{T}, i::Int) where T
    val = T(i)
    atomic_add!(varadd, val)
    atomic_max!(varmax, val)
    return atomic_min!(varmin, val)
end
for T in (Int32, Int64, Float32, Float64)
    acc_sum, acc_max, acc_min = Atomic{T}(), Atomic{T}(), Atomic{T}()
    nloops = 1000
    @threads for k in 1:nloops
        test_atomic_float(acc_sum, acc_max, acc_min, k)
    end
    @test acc_sum[] === T(sum(1:nloops))
    @test acc_max[] === T(maximum(1:nloops))
    # The accumulators start at zero and all inputs are positive, so the
    # minimum stays at zero.
    @test acc_min[] === T(0)
end
using Dates
# Exercise `Base.AsyncCondition` wakeups sent via `uv_async_send` from a helper
# task. The delay is given both as seconds and as a `Dates.Millisecond`. Each
# wait on `async` must last longer than 0.05 s. After closing, waiting on
# either the condition or the timer must throw EOFError.
for period in (0.06, Dates.Millisecond(60))
    # `t` is declared here so the timer created inside the task is visible below.
    let async = Base.AsyncCondition(), t
        c = Condition()
        task = schedule(Task(function()
            # Handshake: wake the main task, then wait until it notifies back.
            notify(c)
            wait(c)
            # First wakeup: after a Timer of `period` fires.
            t = Timer(period)
            wait(t)
            ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
            ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
            wait(c)
            # Second wakeup: after sleeping for `period`.
            sleep(period)
            ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
            ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
        end))
        wait(c)
        notify(c)
        delay1 = @elapsed wait(async)
        notify(c)
        delay2 = @elapsed wait(async)
        @test istaskdone(task)
        @test delay1 > 0.05
        @test delay2 > 0.05
        @test isopen(async)
        @test !isopen(t)
        close(t)
        close(async)
        @test_throws EOFError wait(async)
        @test !isopen(async)
        @test_throws EOFError wait(t)
        # Waiting on the closed AsyncCondition again must still throw.
        @test_throws EOFError wait(async)
    end
end
# Build 1000 closures, each over its own zero vector. Wrap them in runtime
# `@cfunction` objects, first serially and then from threaded loops. Check that
# every closure maps to a distinct object, and that wrapping the same closure
# again yields the identical (`===`) object.
function test_thread_cfunction()
    # ensure a runtime call to `get_trampoline` will be created
    # TODO: get_trampoline is not thread-safe (as this test shows)
    function complex_cfunction(a)
        s = zero(eltype(a))
        # Iterate over indices, not elements: using a Float64 element value as
        # the index in `a[i]` is not a valid array index.
        @inbounds @simd for i in eachindex(a)
            s += muladd(a[i], a[i], -2)
        end
        return s
    end
    fs = [let a = zeros(10)
              () -> complex_cfunction(a)
          end for i in 1:1000]
    @noinline cf(f) = @cfunction $f Float64 ()
    cfs = Vector{Base.CFunction}(undef, length(fs))
    cf1 = cf(fs[1])
    @threads for i in 1:1000
        cfs[i] = cf(fs[i])
    end
    @test cfs[1] == cf1
    @test cfs[2] == cf(fs[2])
    @test length(unique(cfs)) == 1000
    # One tally slot per thread id; summed once the loop has finished.
    ok = zeros(Int, nthreads())
    @threads for i in 1:10000
        i = mod1(i, 1000)
        fi = fs[i]
        cfi = cf(fi)
        GC.@preserve cfi begin
            ok[threadid()] += (cfi === cfs[i])
        end
    end
    @test sum(ok) == 10000
end
test_thread_cfunction()
# Compare the two ways of checking if threading is enabled.
# `jl_tls_states` should only be defined on non-threading build.
if ccall(:jl_threading_enabled, Cint, ()) == 0
    @test nthreads() == 1
    # On a non-threading build the symbol must resolve to a non-null address.
    @test cglobal(:jl_tls_states) != C_NULL
else
    @test_throws ErrorException cglobal(:jl_tls_states)
end
# Run a threaded loop over 1:threadid() on an array of length nthreads(), then
# check that exactly the slots in that range were written.
function test_thread_range()
    hits = zeros(Int, nthreads())
    @threads for k in 1:threadid()
        hits[k] = 1
    end
    # Slots inside the threaded range were set...
    for k in 1:threadid()
        @test hits[k] == 1
    end
    # ...and the remaining slots were left untouched.
    for k in threadid()+1:nthreads()
        @test hits[k] == 0
    end
end
test_thread_range()
# Thread safety of `jl_load_and_lookup`.
# Call it concurrently `n` times, with `string(k)` as the library name and `f`
# as the symbol. Errors from the call are ignored on purpose; the test only
# requires that the concurrent calls complete.
function test_load_and_lookup_18020(n)
    @threads for k in 1:n
        try
            ccall(:jl_load_and_lookup, Ptr{Cvoid},
                  (Cstring, Cstring, Ref{Ptr{Cvoid}}),
                  string(k), :f, C_NULL)
        catch
            # intentionally ignored
        end
    end
end
test_load_and_lookup_18020(10000)
# Nested threaded loops
# This may not be efficient/fully supported but should work without crashing.
# Fill a 100x100 grid from nested `@threads` loops and verify every cell.
function test_nested_loops()
    grid = zeros(Int, 100, 100)
    @threads for col in 1:100
        @threads for row in 1:100
            grid[row, col] = col + row
        end
    end
    for col in 1:100, row in 1:100
        @test grid[row, col] == col + row
    end
end
test_nested_loops()
# Run `threaded_loop` with fewer iterations (nthreads()-1) than threads, and
# check that exactly those iterations ran, each producing a distinct value.
function test_thread_too_few_iters()
    nt = nthreads()
    counter = Atomic()
    a = zeros(Int, nt + 2)
    threaded_loop(a, 1:nt-1, counter)
    found = zeros(Bool, nt + 2)
    for k in 1:nt-1
        found[a[k]] = true
    end
    @test counter[] == nt - 1
    # All loop iterations ran and were unique (pigeon-hole principle), and no
    # value beyond the iteration count was produced.
    @test all(found[1:nt-1])
    @test !any(found[nt:end])
end
test_thread_too_few_iters()
|