File: AsyncOperations.fs

package info (click to toggle)
fsharp 3.1.1.26%2Bdfsg2-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 59,244 kB
  • ctags: 4,190
  • sloc: cs: 13,398; ml: 1,098; sh: 399; makefile: 293; xml: 82
file content (167 lines) | stat: -rwxr-xr-x 7,161 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// (c) Microsoft Corporation 2005-2009. 
namespace Microsoft.FSharp.Control

    open System
    open System.Threading
    open Microsoft.FSharp.Control

    /// Represents the reified result of an asynchronous computation
    [<NoEquality; NoComparison>]
    type AsyncResult<'T>  =
        |   AsyncOk of 'T
        |   AsyncException of exn
        |   AsyncCanceled of OperationCanceledException

        static member Commit(res:AsyncResult<'T>) = 
            Async.FromContinuations (fun (cont,econt,ccont) -> 
                   match res with 
                   | AsyncOk v -> cont v 
                   | AsyncException exn -> econt exn 
                   | AsyncCanceled exn -> ccont exn)

    /// When using .NET 4.0 you can replace this type by Task<'T>
    [<Sealed>]
    type AsyncResultCell<'T>() =
        let mutable result = None
        // The continuation for the result, if any
        let mutable savedConts = []
        
        let syncRoot = new obj()
                

        // Record the result in the AsyncResultCell.
        // Ignore subsequent sets of the result. This can happen, e.g. for a race between 
        // a cancellation and a success.
        member x.RegisterResult (res:AsyncResult<'T>,?reuseThread) =
            let grabbedConts = 
                lock syncRoot (fun () ->
                    if result.IsSome then  
                        []
                    else
                        result <- Some res;
                        // Invoke continuations in FIFO order 
                        // Continuations that Async.FromContinuations provide do QUWI/SynchContext.Post, 
                        // so the order is not overly relevant but still.                        
                        List.rev savedConts)
            // Run continuations outside the lock
            let reuseThread = defaultArg reuseThread false
            match grabbedConts with
            |   [] -> ()
            |   [cont] when reuseThread -> cont res
            |   otherwise ->
#if FX_NO_SYNC_CONTEXT
                    let postOrQueue cont = ThreadPool.QueueUserWorkItem(fun _ -> cont res) |> ignore
#else
                    let synchContext = System.Threading.SynchronizationContext.Current
                    let postOrQueue =
                        match synchContext with
                        |   null -> fun cont -> ThreadPool.QueueUserWorkItem(fun _ -> cont res) |> ignore
                        |   sc -> fun cont -> sc.Post((fun _ -> cont res), state=null)
#endif                        
                    grabbedConts |> List.iter postOrQueue

        /// Get the reified result 
        member private x.AsyncPrimitiveResult =
            Async.FromContinuations(fun (cont,_,_) -> 
                let grabbedResult = 
                    lock syncRoot (fun () ->
                        match result with
                        | Some res -> 
                            result
                        | None ->
                            // Otherwise save the continuation and call it in RegisterResult
                            savedConts <- cont::savedConts
                            None)
                // Run the action outside the lock
                match grabbedResult with 
                | None -> ()
                | Some res -> cont res) 
                           

        /// Get the result and commit it
        member x.AsyncResult =
            async { let! res = x.AsyncPrimitiveResult
                    return! AsyncResult.Commit(res) }


    [<AutoOpen>]
    module FileExtensions =

        let UnblockViaNewThread f =
            async { do! Async.SwitchToNewThread ()
                    let res = f()
                    do! Async.SwitchToThreadPool ()
                    return res }


        type System.IO.File with
            static member AsyncOpenText(path)   = UnblockViaNewThread (fun () -> System.IO.File.OpenText(path))
            static member AsyncAppendText(path) = UnblockViaNewThread (fun () -> System.IO.File.AppendText(path))
            static member AsyncOpenRead(path)   = UnblockViaNewThread (fun () -> System.IO.File.OpenRead(path))
            static member AsyncOpenWrite(path)  = UnblockViaNewThread (fun () -> System.IO.File.OpenWrite(path))
#if FX_NO_FILE_OPTIONS
            static member AsyncOpen(path,mode,?access,?share,?bufferSize) =
#else
            static member AsyncOpen(path,mode,?access,?share,?bufferSize,?options) =
#endif
                let access = match access with Some v -> v | None -> System.IO.FileAccess.ReadWrite
                let share = match share with Some v -> v | None -> System.IO.FileShare.None
#if FX_NO_FILE_OPTIONS
#else
                let options = match options with Some v -> v | None -> System.IO.FileOptions.None
#endif
                let bufferSize = match bufferSize with Some v -> v | None -> 0x1000
                UnblockViaNewThread (fun () -> 
#if FX_NO_FILE_OPTIONS
                    new System.IO.FileStream(path,mode,access,share,bufferSize))
#else
                    new System.IO.FileStream(path,mode,access,share,bufferSize, options))
#endif

            static member OpenTextAsync(path)   = System.IO.File.AsyncOpenText(path)
            static member AppendTextAsync(path) = System.IO.File.AsyncAppendText(path)
            static member OpenReadAsync(path)   = System.IO.File.AsyncOpenRead(path)
            static member OpenWriteAsync(path)  = System.IO.File.AsyncOpenWrite(path)
#if FX_NO_FILE_OPTIONS
            static member OpenAsync(path,mode,?access,?share,?bufferSize) = 
                System.IO.File.AsyncOpen(path, mode, ?access=access, ?share=share,?bufferSize=bufferSize)
#else
            static member OpenAsync(path,mode,?access,?share,?bufferSize,?options) = 
                System.IO.File.AsyncOpen(path, mode, ?access=access, ?share=share,?bufferSize=bufferSize,?options=options)
#endif

    [<AutoOpen>]
    module StreamReaderExtensions =
        type System.IO.StreamReader with

            member s.AsyncReadToEnd () = FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())
            member s.ReadToEndAsync () = s.AsyncReadToEnd ()

#if FX_NO_WEB_REQUESTS
#else
    [<AutoOpen>]
    module WebRequestExtensions =
        open System
        open System.Net
        open Microsoft.FSharp.Control.WebExtensions

        let callFSharpCoreAsyncGetResponse (req: System.Net.WebRequest) = req.AsyncGetResponse()
        
        type System.Net.WebRequest with
            member req.AsyncGetResponse() = callFSharpCoreAsyncGetResponse req // this calls the FSharp.Core method
            member req.GetResponseAsync() = callFSharpCoreAsyncGetResponse req // this calls the FSharp.Core method
#endif
     
#if FX_NO_WEB_CLIENT
#else
    [<AutoOpen>]
    module WebClientExtensions =
        open System.Net
        open Microsoft.FSharp.Control.WebExtensions
        
        let callFSharpCoreAsyncDownloadString (req: System.Net.WebClient) address = req.AsyncDownloadString address

        type WebClient with
            member this.AsyncDownloadString address = callFSharpCoreAsyncDownloadString this address
#endif