File: Scheduler.cs

package info (click to toggle)
mono 6.14.1%2Bds2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,282,732 kB
  • sloc: cs: 11,182,461; xml: 2,850,281; ansic: 699,123; cpp: 122,919; perl: 58,604; javascript: 30,841; asm: 21,845; makefile: 19,602; sh: 10,973; python: 4,772; pascal: 925; sql: 859; sed: 16; php: 1
file content (378 lines) | stat: -rw-r--r-- 14,894 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
using System;
using System.Globalization;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using System.Workflow.ComponentModel;

namespace System.Workflow.Runtime
{
    #region Scheduler

    // Only one instance of this type is used for a workflow instance.
    //
    class Scheduler
    {
        #region data

        // state to be persisted for the scheduler
        internal static DependencyProperty HighPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("HighPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        internal static DependencyProperty NormalPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("NormalPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        Queue<SchedulableItem> highPriorityEntriesQueue;
        Queue<SchedulableItem> normalPriorityEntriesQueue;

        // non-persisted state for the scheduler
        WorkflowExecutor rootWorkflowExecutor;
        bool empty;
        bool canRun;
        bool threadRequested;
        bool abortOrTerminateRequested;
        Queue<SchedulableItem> transactedEntries;
        object syncObject = new object();

        #endregion data

        #region ctors

        // loading with some state
        public Scheduler(WorkflowExecutor rootExec, bool canRun)
        {
            this.rootWorkflowExecutor = rootExec;
            this.threadRequested = false;

            // canRun is true if normal creation
            // false if loading from a persisted state. Will be set to true later at ResumeOnIdle
            this.canRun = canRun;

            this.highPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.HighPriorityEntriesQueueProperty);
            this.normalPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.NormalPriorityEntriesQueueProperty);
            if (this.highPriorityEntriesQueue == null)
            {
                this.highPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.HighPriorityEntriesQueueProperty, this.highPriorityEntriesQueue);
            }
            if (this.normalPriorityEntriesQueue == null)
            {
                this.normalPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.NormalPriorityEntriesQueueProperty, this.normalPriorityEntriesQueue);
            }

            this.empty = ((this.normalPriorityEntriesQueue.Count == 0) && (this.highPriorityEntriesQueue.Count == 0));
        }

        #endregion ctors

        #region Misc properties

        public override string ToString()
        {
            return "Scheduler('" + ((Activity)this.RootWorkflowExecutor.WorkflowDefinition).QualifiedName + "')";
        }

        protected WorkflowExecutor RootWorkflowExecutor
        {
            get { return this.rootWorkflowExecutor; }
        }

        public bool IsStalledNow
        {
            get
            {
                return empty;
            }
        }

        public bool CanRun
        {
            get
            {
                return canRun;
            }

            set
            {
                canRun = value;
            }
        }

        internal bool AbortOrTerminateRequested
        {
            get
            {
                return abortOrTerminateRequested;
            }
            set
            {
                abortOrTerminateRequested = value;
            }
        }

        #endregion Misc properties

        #region Run work

        public void Run()
        {
            do
            {
                this.RootWorkflowExecutor.ProcessQueuedEvents();
                // Get item to run
                SchedulableItem item = GetItemToRun();
                bool runningItem = false;

                // no ready work to run... go away
                if (item == null)
                    break;

                Activity itemActivity = null;
                Exception exp = null;

                TransactionalProperties transactionalProperties = null;
                int contextId = item.ContextId;

                // This function gets the root or enclosing while-loop activity
                Activity contextActivity = this.RootWorkflowExecutor.GetContextActivityForId(contextId);
                if (contextActivity == null)
                    throw new InvalidOperationException(ExecutionStringManager.InvalidExecutionContext);

                // This is the activity corresponding to the item's ActivityId
                itemActivity = contextActivity.GetActivityByName(item.ActivityId);
                using (new ServiceEnvironment(itemActivity))
                {
                    exp = null;
                    bool ignoreFinallyBlock = false;

                    try
                    {
                        // item preamble 
                        // set up the item transactional context if necessary
                        //
                        Debug.Assert(itemActivity != null, "null itemActivity");
                        if (itemActivity == null)
                            throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.InvalidActivityName, item.ActivityId));

                        Activity atomicActivity = null;
                        if (this.RootWorkflowExecutor.IsActivityInAtomicContext(itemActivity, out atomicActivity))
                        {
                            transactionalProperties = (TransactionalProperties)atomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty);
                            // If we've aborted for any reason stop now!
                            // If we attempt to enter a new TransactionScope the com+ context will get corrupted
                            // See windows se bug 137267
                            if (!WorkflowExecutor.CheckAndProcessTransactionAborted(transactionalProperties))
                            {
                                if (transactionalProperties.TransactionScope == null)
                                {
                                    // Use TimeSpan.Zero so scope will not create timeout independent of the transaction
                                    // Use EnterpriseServicesInteropOption.Full to flow transaction to COM+
                                    transactionalProperties.TransactionScope =
                                        new TransactionScope(transactionalProperties.Transaction, TimeSpan.Zero, EnterpriseServicesInteropOption.Full);

                                    WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0,
                                        "Workflow Runtime: Scheduler: instanceId: " + this.RootWorkflowExecutor.InstanceIdString +
                                        "Entered into TransactionScope, Current atomic acitivity " + atomicActivity.Name);
                                }
                            }
                        }

                        // Run the item
                        //
                        runningItem = true;
                        WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                        // running any entry implicitly changes some state of the workflow instance                    
                        this.RootWorkflowExecutor.stateChangedSincePersistence = true;

                        item.Run(this.RootWorkflowExecutor);
                    }
                    catch (Exception e)
                    {
                        if (WorkflowExecutor.IsIrrecoverableException(e))
                        {
                            ignoreFinallyBlock = true;
                            throw;
                        }
                        else
                        {
                            if (transactionalProperties != null)
                                transactionalProperties.TransactionState = TransactionProcessState.AbortProcessed;
                            exp = e;
                        }
                    }
                    finally
                    {
                        if (!ignoreFinallyBlock)
                        {
                            if (runningItem)
                                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Done with running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                            // Process exception
                            //
                            if (exp != null)
                            {
                                // 
                                this.RootWorkflowExecutor.ExceptionOccured(exp, itemActivity == null ? contextActivity : itemActivity, null);
                                exp = null;
                            }
                        }
                    }
                }
            } while (true);
        }

        private SchedulableItem GetItemToRun()
        {
            SchedulableItem ret = null;

            lock (this.syncObject)
            {
                bool workToDo = false;
                if ((this.highPriorityEntriesQueue.Count > 0) || (this.normalPriorityEntriesQueue.Count > 0))
                {
                    workToDo = true;

                    // If an abort or termination of the workflow has been requested,
                    // then the workflow should try to terminate ASAP. Even transaction scopes
                    // in progress shouldn't be executed to completion. (Ref: 16534)
                    if (this.AbortOrTerminateRequested)
                    {
                        ret = null;
                    }
                    // got work to do in the scheduler
                    else if ((this.highPriorityEntriesQueue.Count > 0))
                    {
                        ret = this.highPriorityEntriesQueue.Dequeue();
                    }
                    else if (this.CanRun)
                    {
                        // the scheduler can run right now
                        //

                        // pick an entry to run
                        //
                        if (((IWorkflowCoreRuntime)this.RootWorkflowExecutor).CurrentAtomicActivity == null &&
                            (this.normalPriorityEntriesQueue.Count > 0))
                            ret = this.normalPriorityEntriesQueue.Dequeue();
                    }
                    else
                    {
                        // scheduler can't run right now.. even though there is ready work
                        // do nothing in the scheduler
                        ret = null;
                    }
                }

                if (!workToDo)
                {
                    // no ready work to do in the scheduler...
                    // we are gonna return the thread back
                    this.empty = true;
                }

                // set it to true only iff there is something to run
                this.threadRequested = (ret != null);
            }
            return ret;
        }

        // This method should be called only after we have determined that
        // this instance can start running now
        public void Resume()
        {
            canRun = true;

            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }

        // This method should be called only after we have determined that
        // this instance can start running now
        public void ResumeIfRunnable()
        {
            if (!canRun)
                return;

            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }
        #endregion Run work

        #region Schedule work

        public void ScheduleItem(SchedulableItem s, bool isInAtomicTransaction, bool transacted)
        {
            lock (this.syncObject)
            {
                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Scheduling entry: {1}", this.RootWorkflowExecutor.InstanceIdString, s.ToString());
                // SchedulableItems in AtomicTransaction has higher priority
                Queue<SchedulableItem> q = isInAtomicTransaction ? this.highPriorityEntriesQueue : this.normalPriorityEntriesQueue;
                q.Enqueue(s);

                if (transacted)
                {
                    if (transactedEntries == null)
                        transactedEntries = new Queue<SchedulableItem>();
                    transactedEntries.Enqueue(s);
                }

                if (!this.threadRequested)
                {
                    if (this.CanRun)
                    {
                        this.RootWorkflowExecutor.ScheduleForWork();
                        this.threadRequested = true;
                    }
                }
                this.empty = false;
            }
        }

        #endregion Schedule work

        #region psuedo-transacted support

        public void PostPersist()
        {
            transactedEntries = null;
        }

        public void Rollback()
        {
            if (transactedEntries != null && transactedEntries.Count > 0)
            {
                // make a list of non-transacted entries
                // @undone: bmalhi: transacted entries only on priority-0

                IEnumerator<SchedulableItem> e = this.normalPriorityEntriesQueue.GetEnumerator();
                Queue<SchedulableItem> newScheduled = new Queue<SchedulableItem>();
                while (e.MoveNext())
                {
                    if (!transactedEntries.Contains(e.Current))
                        newScheduled.Enqueue(e.Current);
                }

                // clear the scheduled items
                this.normalPriorityEntriesQueue.Clear();

                // schedule the non-transacted items back
                e = newScheduled.GetEnumerator();
                while (e.MoveNext())
                    this.normalPriorityEntriesQueue.Enqueue(e.Current);

                transactedEntries = null;
            }
        }

        #endregion psuedo-transacted support
    }

    #endregion Scheduler
}