File: DirectoryEntries.swift

package info (click to toggle)
swiftlang 6.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,791,532 kB
  • sloc: cpp: 9,901,743; ansic: 2,201,431; asm: 1,091,827; python: 308,252; objc: 82,166; f90: 80,126; lisp: 38,358; pascal: 25,559; sh: 20,429; ml: 5,058; perl: 4,745; makefile: 4,484; awk: 3,535; javascript: 3,018; xml: 918; fortran: 664; cs: 573; ruby: 396
file content (659 lines) | stat: -rw-r--r-- 22,922 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android)
import CNIODarwin
import CNIOLinux
import NIOConcurrencyHelpers
import NIOPosix
@preconcurrency import SystemPackage

/// An `AsyncSequence` of entries in a directory.
///
/// Entries are read in batches (see ``DirectoryEntries/Batched``) and handed out one at a time.
/// Use ``batched()`` to consume the batches directly.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct DirectoryEntries: AsyncSequence {
    public typealias AsyncIterator = DirectoryIterator
    public typealias Element = DirectoryEntry

    /// The underlying sequence.
    private let batchedSequence: Batched

    /// Creates a new ``DirectoryEntries`` sequence.
    ///
    /// - Parameters:
    ///   - handle: The handle of the directory to enumerate.
    ///   - recursive: Whether entries within subdirectories should be included.
    internal init(handle: SystemFileHandle, recursive: Bool) {
        self.batchedSequence = Batched(handle: handle, recursive: recursive)
    }

    /// Creates a ``DirectoryEntries`` sequence by wrapping an `AsyncSequence` of _batches_ of
    /// directory entries.
    ///
    /// The wrapped sequence may yield empty batches. They are skipped rather than treated as the
    /// end of the sequence.
    public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Batched.Element {
        self.batchedSequence = Batched(wrapping: sequence)
    }

    public func makeAsyncIterator() -> DirectoryIterator {
        return DirectoryIterator(iterator: self.batchedSequence.makeAsyncIterator())
    }

    /// Returns a sequence of directory entry batches.
    ///
    /// The batched sequence has its element type as `Array<DirectoryEntry>` rather
    /// than `DirectoryEntry`. This can enable better performance by reducing the number of
    /// executor hops.
    public func batched() -> Batched {
        return self.batchedSequence
    }

    /// An `AsyncIteratorProtocol` of `DirectoryEntry`.
    ///
    /// Pulls batches from the underlying batched iterator and yields their entries in order.
    public struct DirectoryIterator: AsyncIteratorProtocol {
        /// The batched iterator to consume from.
        private var iterator: Batched.AsyncIterator
        /// A slice of the current batch being iterated.
        private var currentBatch: ArraySlice<DirectoryEntry>

        init(iterator: Batched.AsyncIterator) {
            self.iterator = iterator
            self.currentBatch = []
        }

        /// Returns the next entry, or `nil` once the underlying batched sequence is exhausted.
        ///
        /// - Throws: Any error thrown by the underlying batched sequence.
        public mutating func next() async throws -> DirectoryEntry? {
            // Keep pulling batches until one has an entry. An empty batch is not the end of the
            // sequence (only `nil` is), so returning `nil` for it would stop iteration early.
            while self.currentBatch.isEmpty {
                guard let batch = try await self.iterator.next() else {
                    return nil
                }
                self.currentBatch = batch[...]
            }

            return self.currentBatch.popFirst()
        }
    }
}

// Explicitly opt the per-entry iterator out of `Sendable`.
@available(*, unavailable)
extension DirectoryEntries.DirectoryIterator: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension DirectoryEntries {
    /// An `AsyncSequence` of batches of directory entries.
    ///
    /// Each element of a ``Batched`` sequence is an `Array<DirectoryEntry>` rather than a single
    /// `DirectoryEntry`. Handing out several entries per element means fewer executor hops,
    /// which can improve performance, at the cost of a less convenient element type.
    public struct Batched: AsyncSequence {
        public typealias AsyncIterator = BatchedIterator
        public typealias Element = [DirectoryEntry]

        /// The stream the batches are read from.
        private let stream: BufferedOrAnyStream<[DirectoryEntry]>

        /// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence`
        /// of directory entry batches.
        public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
            self.stream = BufferedOrAnyStream(wrapping: sequence)
        }

        /// Creates a sequence that reads batches of entries from the directory behind `handle`.
        fileprivate init(handle: SystemFileHandle, recursive: Bool) {
            // The watermarks are counted in batches. With 64 entries per batch, watermarks of 4
            // and 8 batches correspond to 256 and 512 individual directory entries.
            let batchSize = 64
            let batches = BufferedStream.makeBatchedDirectoryEntryStream(
                handle: handle,
                recursive: recursive,
                entriesPerBatch: batchSize,
                lowWatermark: 4,
                highWatermark: 8
            )
            self.stream = BufferedOrAnyStream(wrapping: batches)
        }

        public func makeAsyncIterator() -> BatchedIterator {
            let underlying = self.stream.makeAsyncIterator()
            return BatchedIterator(wrapping: underlying)
        }

        /// An `AsyncIteratorProtocol` of `Array<DirectoryEntry>`.
        public struct BatchedIterator: AsyncIteratorProtocol {
            /// The iterator of the underlying stream; its batches are forwarded unchanged.
            private var iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator

            init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator) {
                self.iterator = iterator
            }

            /// Returns the next batch, or `nil` when the underlying stream has finished.
            public mutating func next() async throws -> [DirectoryEntry]? {
                let batch = try await self.iterator.next()
                return batch
            }
        }
    }
}

// Explicitly opt the batch iterator out of `Sendable`.
@available(*, unavailable)
extension DirectoryEntries.Batched.BatchedIterator: Sendable {}

// MARK: - Internal

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension BufferedStream where Element == [DirectoryEntry] {
    /// Makes a stream of directory entry batches read from `handle`.
    ///
    /// Production starts as soon as the stream is made. When the stream terminates, any
    /// directory source the enumerator has open is closed on the enumerator's thread pool.
    ///
    /// - Parameters:
    ///   - handle: The directory to enumerate. The stream doesn't take ownership of it.
    ///   - recursive: Whether to enumerate subdirectories too.
    ///   - entriesPerBatch: The maximum number of entries in each element of the stream.
    ///   - lowWatermark: The low back-pressure watermark, counted in batches.
    ///   - highWatermark: The high back-pressure watermark, counted in batches.
    /// - Returns: A stream whose elements are batches of directory entries.
    fileprivate static func makeBatchedDirectoryEntryStream(
        handle: SystemFileHandle,
        recursive: Bool,
        entriesPerBatch: Int,
        lowWatermark: Int,
        highWatermark: Int
    ) -> BufferedStream<[DirectoryEntry]> {
        let lockedEnumerator = NIOLockedValueBox(
            DirectoryEnumerator(handle: handle, recursive: recursive)
        )

        var (stream, source) = BufferedStream.makeStream(
            of: [DirectoryEntry].self,
            backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark)
        )

        // Once the stream is terminated, close the enumerator's source (if one is open) on the
        // enumerator's thread pool.
        source.onTermination = {
            let closingPool = lockedEnumerator.withLockedValue { $0.threadPoolForClosing() }
            if let closingPool = closingPool {
                closingPool.submit { _ in  // always run, even if cancelled
                    lockedEnumerator.withLockedValue { enumerator in
                        enumerator.closeIfNecessary()
                    }
                }
            }
        }

        // Kick off production straight away rather than waiting for the first demand.
        DirectoryEntryProducer(
            state: lockedEnumerator,
            source: source,
            entriesPerBatch: entriesPerBatch
        ).produceMore()

        return stream
    }
}

/// Reads batches from a locked `DirectoryEnumerator` on its thread pool and writes them to a
/// `BufferedStream` source, following the source's back-pressure signals.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private struct DirectoryEntryProducer {
    /// The enumerator, guarded by a lock.
    let state: NIOLockedValueBox<DirectoryEnumerator>
    /// Where produced batches are written.
    let source: BufferedStream<[DirectoryEntry]>.Source
    /// The maximum number of entries requested for each batch.
    let entriesPerBatch: Int

    /// Starts, or continues, producing batches of directory entries.
    ///
    /// The enumerator is asked for the thread pool to read on. If it has none, production is
    /// over. Otherwise a work item that reads the next batch is submitted to that pool, and its
    /// result is fed back to the stream source. Depending on what the source says, production
    /// then continues immediately or once the source asks for more. Stopping production is
    /// signalled through the stream's `onTermination` handler.
    func produceMore() {
        guard let pool = self.state.withLockedValue({ $0.produceMore() }) else {
            // The enumerator has nothing left to produce.
            return
        }

        pool.submit { workState in
            let outcome: Result<[DirectoryEntry], Error>
            if case .active = workState {
                outcome = Result { try self.nextBatch() }
            } else {
                outcome = .failure(CancellationError())
            }
            self.onNextBatchResult(outcome)
        }
    }

    /// Reads the next batch from the enumerator while holding its lock.
    private func nextBatch() throws -> [DirectoryEntry] {
        let batchSize = self.entriesPerBatch
        return try self.state.withLockedValue { enumerator in
            try enumerator.next(batchSize)
        }
    }

    /// Forwards a batch that was read successfully. If reading failed, closes the enumerator and
    /// finishes the stream with the error so that consumers see it.
    private func onNextBatchResult(_ result: Result<[DirectoryEntry], Error>) {
        do {
            let entries = try result.get()
            self.onNextBatch(entries)
        } catch {
            self.close()
            self.source.finish(throwing: error)
        }
    }

    /// Writes a batch to the source, then decides whether to keep producing.
    private func onNextBatch(_ entries: [DirectoryEntry]) {
        // Batches are requested with a non-zero size, so an empty batch can only mean the
        // enumerator is exhausted.
        guard !entries.isEmpty else {
            self.source.finish(throwing: nil)
            return
        }

        // A short batch means the end was reached; the enumerator has already closed itself.
        let hitEndOfDirectory = entries.count < self.entriesPerBatch

        do {
            let writeResult = try self.source.write(contentsOf: CollectionOfOne(entries))

            guard !hitEndOfDirectory else {
                // Nothing more to read, so don't schedule more work.
                self.source.finish(throwing: nil)
                return
            }

            if case let .enqueueCallback(token) = writeResult {
                // The source is full: resume once it asks for more, or close if that fails.
                self.source.enqueueCallback(callbackToken: token) { callbackResult in
                    if case .success = callbackResult {
                        self.produceMore()
                    } else {
                        self.close()
                    }
                }
            } else {
                self.produceMore()
            }
        } catch {
            // The write failed because the source is already finished. Stop producing and release
            // the enumerator's resources.
            self.close()
        }
    }

    /// Closes the enumerator's source on its thread pool, if it has one open.
    private func close() {
        let pool = self.state.withLockedValue { $0.threadPoolForClosing() }
        if let pool = pool {
            pool.submit { _ in  // always run, even if cancelled
                self.state.withLockedValue { enumerator in
                    enumerator.closeIfNecessary()
                }
            }
        }
    }
}

/// Enumerates a directory in batches.
///
/// Note that this is not a `Sequence` because we allow for errors to be thrown on `next()`.
///
/// Non-recursive enumeration reads entries with `readdir` from a duplicate of the handle's
/// descriptor. Recursive enumeration walks the tree with `fts`, starting from the handle's path.
/// Nothing is opened until the first call to `next(_:)`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private struct DirectoryEnumerator: Sendable {
    /// The enumeration state machine: `idle` -> `open` -> `done`.
    ///
    /// NOTE(review): `@unchecked Sendable` because the `open` payload holds raw C pointers. In this
    /// file every enumerator lives inside a `NIOLockedValueBox`, which is presumably what makes
    /// this sound. Confirm before sharing an enumerator any other way.
    private enum State: @unchecked Sendable {
        /// Transient placeholder used while the `open` payload is processed in `process(_:)`.
        /// Every other method traps if it observes this state.
        case modifying
        /// No source has been opened yet. The handle is not owned by the enumerator.
        case idle(SystemFileHandle.SendableView, recursive: Bool)
        /// A source is open and owned by the enumerator. Also holds the thread pool used for
        /// reading and closing, and a buffer that is cleared (keeping capacity) before each batch.
        case open(NIOThreadPool, Source, [DirectoryEntry])
        /// Enumeration has finished, failed, or been closed; any source it opened is closed.
        case done
    }

    /// The source of directory entries.
    private enum Source {
        /// A directory stream, used for non-recursive enumeration.
        case readdir(CInterop.DirPointer)
        /// A file hierarchy traversal, used for recursive enumeration.
        case fts(CInterop.FTSPointer)
    }

    /// The current state of enumeration.
    private var state: State

    /// The path to the directory being enumerated.
    ///
    /// Used to build the paths of `readdir` entries and in error messages.
    private let path: FilePath

    /// Information about an entry returned by FTS. See 'fts(3)'.
    ///
    /// Each case corresponds to one of the `FTS_*` values that can appear in `fts_info`.
    private enum FTSInfo: Hashable, Sendable {
        case directoryPreOrder
        case directoryCausingCycle
        case ftsDefault
        case directoryUnreadable
        case dotFile
        case directoryPostOrder
        case error
        case regularFile
        case noStatInfoAvailable
        case noStatInfoRequested
        case symbolicLink
        case symbolicLinkToNonExistentTarget

        /// Maps a raw `fts_info` value to a case. Returns `nil` for values not listed here.
        init?(rawValue: UInt16) {
            switch Int32(rawValue) {
            case FTS_D:
                self = .directoryPreOrder
            case FTS_DC:
                self = .directoryCausingCycle
            case FTS_DEFAULT:
                self = .ftsDefault
            case FTS_DNR:
                self = .directoryUnreadable
            case FTS_DOT:
                self = .dotFile
            case FTS_DP:
                self = .directoryPostOrder
            case FTS_ERR:
                self = .error
            case FTS_F:
                self = .regularFile
            case FTS_NS:
                self = .noStatInfoAvailable
            case FTS_NSOK:
                self = .noStatInfoRequested
            case FTS_SL:
                self = .symbolicLink
            case FTS_SLNONE:
                self = .symbolicLinkToNonExistentTarget
            default:
                return nil
            }
        }
    }

    /// Creates an enumerator for `handle` in the idle state.
    ///
    /// - Parameters:
    ///   - handle: The directory to enumerate. The enumerator doesn't take ownership of it.
    ///   - recursive: Whether to walk subdirectories (using `fts`) or only list the directory's
    ///     immediate entries (using `readdir`).
    internal init(handle: SystemFileHandle, recursive: Bool) {
        self.state = .idle(handle.sendableView, recursive: recursive)
        self.path = handle.path
    }

    /// Returns the thread pool on which the next batch should be read, or `nil` once the
    /// enumerator is done and there is nothing left to produce.
    internal func produceMore() -> NIOThreadPool? {
        switch self.state {
        case let .idle(handle, _):
            return handle.threadPool
        case let .open(threadPool, _, _):
            return threadPool
        case .done:
            return nil
        case .modifying:
            fatalError()
        }
    }

    /// Returns the thread pool on which `closeIfNecessary()` should be called. Returns `nil`
    /// when no source is open, because then there is nothing to close.
    internal func threadPoolForClosing() -> NIOThreadPool? {
        switch self.state {
        case let .open(threadPool, _, _):
            return threadPool
        case .idle, .done:
            // Don't need to close in the idle state: we don't own the handle.
            return nil
        case .modifying:
            fatalError()
        }
    }

    /// Returns the next batch of directory entries.
    ///
    /// The batch holds at most `count` entries. Fewer than `count` entries means the end was
    /// reached and the source has been closed. If the enumerator is already done, the batch
    /// is empty.
    ///
    /// - Throws: A `FileSystemError` if the source can't be opened or read. The enumerator is
    ///   `done` afterwards.
    internal mutating func next(_ count: Int) throws -> [DirectoryEntry] {
        while true {
            switch self.process(count) {
            case let .yield(result):
                return try result.get()
            case .continue:
                // A source was just opened; process again to read from it.
                ()
            }
        }
    }

    /// Closes the descriptor, if necessary.
    ///
    /// Safe to call repeatedly: once the state is `done` this does nothing.
    internal mutating func closeIfNecessary() {
        switch self.state {
        case .idle:
            // We don't own the handle so don't close it.
            self.state = .done

        case let .open(_, mode, _):
            self.state = .done
            // Nothing useful can be done if closing fails, so the result is ignored.
            switch mode {
            case .readdir(let dir):
                _ = Libc.closedir(dir)
            case .fts(let fts):
                _ = Libc.ftsClose(fts)
            }

        case .done:
            ()

        case .modifying:
            fatalError()
        }
    }

    /// The outcome of a single step of `process(_:)`.
    private enum ProcessResult {
        /// Return this result from `next(_:)`.
        case yield(Result<[DirectoryEntry], FileSystemError>)
        /// Call `process(_:)` again.
        case `continue`
    }

    /// Opens a `readdir` source on a duplicate of the handle's descriptor, leaving the handle's
    /// own descriptor open.
    private mutating func makeReaddirSource(
        _ handle: SystemFileHandle.SendableView
    ) -> Result<Source, FileSystemError> {
        return handle._duplicate().mapError { dupError in
            FileSystemError(
                message: "Unable to open directory stream for '\(handle.path)'.",
                wrapping: dupError
            )
        }.flatMap { descriptor in
            // We own the descriptor and cede ownership if 'opendir' succeeds; if it doesn't we need
            // to close it.
            descriptor.opendir().mapError { errno in
                // Close the descriptor on error.
                try? descriptor.close()
                return FileSystemError.fdopendir(errno: errno, path: handle.path, location: .here())
            }
        }.map {
            .readdir($0)
        }
    }

    /// Opens an `fts` traversal rooted at the handle's path with the `.noChangeDir` and
    /// `.physical` options. These presumably correspond to `FTS_NOCHDIR` and `FTS_PHYSICAL`;
    /// confirm in `Libc.ftsOpen`.
    private mutating func makeFTSSource(
        _ handle: SystemFileHandle.SendableView
    ) -> Result<Source, FileSystemError> {
        return Libc.ftsOpen(handle.path, options: [.noChangeDir, .physical]).mapError { errno in
            FileSystemError.open("fts_open", error: errno, path: handle.path, location: .here())
        }.map {
            .fts($0)
        }
    }

    /// Reads up to `count` entries from `dir` into `entries`, skipping "." and "..".
    ///
    /// - Returns: The next state and the step result. The state stays `open` if `count` entries
    ///   were read. On end of stream or on error, `dir` is closed and the state becomes `done`.
    private mutating func processOpenState(
        threadPool: NIOThreadPool,
        dir: CInterop.DirPointer,
        entries: inout [DirectoryEntry],
        count: Int
    ) -> (State, ProcessResult) {
        entries.removeAll(keepingCapacity: true)
        entries.reserveCapacity(count)

        while entries.count < count {
            switch Libc.readdir(dir) {
            case let .success(.some(entry)):
                // Skip "." and ".." (and empty paths)
                if self.isThisOrParentDirectory(entry.pointee) {
                    continue
                }

                let fileType = FileType(direntType: entry.pointee.d_type)
                let name: FilePath.Component
                #if canImport(Darwin)
                // Safe to force unwrap: may be nil if empty, a root, or more than one component.
                // Empty is checked for above, root can't exist within a directory, and directory
                // items must be a single path component.
                name = FilePath.Component(platformString: CNIODarwin_dirent_dname(entry))!
                #else
                name = FilePath.Component(platformString: CNIOLinux_dirent_dname(entry))!
                #endif

                let fullPath = self.path.appending(name)
                // '!' is okay here: the init returns nil if there is an empty path which we know
                // isn't the case as 'self.path' is non-empty.
                entries.append(DirectoryEntry(path: fullPath, type: fileType)!)

            case .success(.none):
                // End of the directory stream.
                // Nothing we can do on failure so ignore the result.
                _ = Libc.closedir(dir)
                return (.done, .yield(.success(entries)))

            case let .failure(errno):
                // Nothing we can do on failure so ignore the result.
                _ = Libc.closedir(dir)
                let error = FileSystemError.readdir(
                    errno: errno,
                    path: self.path,
                    location: .here()
                )
                return (.done, .yield(.failure(error)))
            }
        }

        // We must have hit our 'count' limit.
        return (.open(threadPool, .readdir(dir), entries), .yield(.success(entries)))
    }

    /// Reads up to `count` entries from the `fts` traversal into `entries`.
    ///
    /// Directories are reported once, in pre-order. Cycles, unreadable directories, dot files,
    /// entries without stat information, and unrecognised `fts_info` values are skipped.
    ///
    /// - Returns: The next state and the step result. The state stays `open` if `count` entries
    ///   were read. On end of traversal or on error, `fts` is closed and the state becomes `done`.
    private mutating func processOpenState(
        threadPool: NIOThreadPool,
        fts: CInterop.FTSPointer,
        entries: inout [DirectoryEntry],
        count: Int
    ) -> (State, ProcessResult) {
        entries.removeAll(keepingCapacity: true)
        entries.reserveCapacity(count)

        while entries.count < count {
            switch Libc.ftsRead(fts) {
            case .success(.some(let entry)):
                let info = FTSInfo(rawValue: entry.pointee.fts_info)
                switch info {
                case .directoryPreOrder:
                    let entry = DirectoryEntry(path: entry.path, type: .directory)!
                    entries.append(entry)

                case .directoryPostOrder:
                    ()  // Don't visit directories twice.

                case .regularFile:
                    let entry = DirectoryEntry(path: entry.path, type: .regular)!
                    entries.append(entry)

                case .symbolicLink, .symbolicLinkToNonExistentTarget:
                    let entry = DirectoryEntry(path: entry.path, type: .symlink)!
                    entries.append(entry)

                case .ftsDefault:
                    // File type is unknown.
                    let entry = DirectoryEntry(path: entry.path, type: .unknown)!
                    entries.append(entry)

                case .error:
                    // The error for this entry is reported in its 'fts_errno'; stop the traversal.
                    let errno = Errno(rawValue: entry.pointee.fts_errno)
                    let error = FileSystemError(
                        code: .unknown,
                        message: "Can't read file system tree.",
                        cause: FileSystemError.SystemCallError(systemCall: "fts_read", errno: errno),
                        location: .here()
                    )
                    _ = Libc.ftsClose(fts)
                    return (.done, .yield(.failure(error)))

                case .directoryCausingCycle:
                    ()  // Cycle found, ignore it and continue.
                case .directoryUnreadable:
                    ()  // Can't read directory, ignore it and continue iterating.
                case .dotFile:
                    ()  // Ignore "." and ".."
                case .noStatInfoAvailable:
                    ()  // No stat info available so we can't list the entry, ignore it.
                case .noStatInfoRequested:
                    ()  // Shouldn't happen.

                case nil:
                    ()  // Unknown, ignore.
                }

            case .success(.none):
                // No entries left to iterate.
                _ = Libc.ftsClose(fts)
                return (.done, .yield(.success(entries)))

            case .failure(let errno):
                // Nothing we can do on failure so ignore the result.
                _ = Libc.ftsClose(fts)
                let error = FileSystemError.ftsRead(
                    errno: errno,
                    path: self.path,
                    location: .here()
                )
                return (.done, .yield(.failure(error)))
            }
        }

        // We must have hit our 'count' limit.
        return (.open(threadPool, .fts(fts), entries), .yield(.success(entries)))
    }

    /// Performs one step of the state machine for a request of `count` entries.
    ///
    /// The step depends on the current state:
    /// - `idle`: opens a source and returns `.continue`, or yields the error if opening fails.
    /// - `open`: reads a batch.
    /// - `done`: yields an empty batch.
    private mutating func process(_ count: Int) -> ProcessResult {
        switch self.state {
        case let .idle(handle, recursive):
            let result: Result<Source, FileSystemError>

            if recursive {
                result = self.makeFTSSource(handle)
            } else {
                result = self.makeReaddirSource(handle)
            }

            switch result {
            case let .success(source):
                self.state = .open(handle.threadPool, source, [])
                return .continue

            case let .failure(error):
                self.state = .done
                return .yield(.failure(error))
            }

        case .open(let threadPool, let mode, var entries):
            // Move out of '.open' while working so that the stored state no longer shares the
            // 'entries' buffer. This is presumably to avoid a copy-on-write when it is mutated.
            // The resulting state is stored again below.
            self.state = .modifying

            switch mode {
            case .readdir(let dir):
                let (state, result) = self.processOpenState(
                    threadPool: threadPool,
                    dir: dir,
                    entries: &entries,
                    count: count
                )
                self.state = state
                return result

            case .fts(let fts):
                let (state, result) = self.processOpenState(
                    threadPool: threadPool,
                    fts: fts,
                    entries: &entries,
                    count: count
                )
                self.state = state
                return result
            }

        case .done:
            return .yield(.success([]))

        case .modifying:
            fatalError()
        }
    }

    /// Returns whether the entry's name is empty, "." or "..". Only the first three bytes of
    /// `d_name` are inspected.
    private func isThisOrParentDirectory(_ entry: CInterop.DirEnt) -> Bool {
        let dot = CChar(bitPattern: UInt8(ascii: "."))
        switch (entry.d_name.0, entry.d_name.1, entry.d_name.2) {
        case (0, _, _), (dot, 0, _), (dot, dot, 0):
            return true
        default:
            return false
        }
    }
}

extension UnsafeMutablePointer<CInterop.FTSEnt> {
    /// The path of the FTS entry this pointer refers to, built from its `fts_path` C string.
    ///
    /// Force-unwraps `fts_path`. Entries handed out by `fts_read` are assumed to always carry a
    /// path.
    fileprivate var path: FilePath {
        let cString = self.pointee.fts_path!
        return FilePath(platformString: cString)
    }
}

#endif