File: WorkflowStateRollbackService.cs

package info (click to toggle)
mono 6.14.1%2Bds2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,282,732 kB
  • sloc: cs: 11,182,461; xml: 2,850,281; ansic: 699,123; cpp: 122,919; perl: 58,604; javascript: 30,841; asm: 21,845; makefile: 19,602; sh: 10,973; python: 4,772; pascal: 925; sql: 859; sed: 16; php: 1
file content (148 lines) | stat: -rw-r--r-- 6,669 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
using System;
using System.Collections.Generic;
using System.Collections;
using System.Diagnostics;
using System.Reflection;
using System.Text;
using System.IO;
using System.IO.Compression;
using System.Transactions;
using System.Workflow.ComponentModel;

namespace System.Workflow.Runtime
{
    internal sealed class WorkflowStateRollbackService
    {
        WorkflowExecutor workflowExecutor;

        // cache the revert back data
        MemoryStream clonedInstanceStateStream;
        Activity workflowDefinition = null;
        bool isInstanceStateRevertRequested = false;

        // revert back notification info
        string activityQualifiedName;
        int activityContextId;
        EventArgs callbackData;
        EventHandler<EventArgs> callbackHandler;
        bool suspendOnRevert;
        string suspendOnRevertInfo;

        Hashtable completedContextActivities = new Hashtable();

        public WorkflowStateRollbackService(WorkflowExecutor workflowExecutor)
        {
            this.workflowExecutor = workflowExecutor;
        }

        internal bool IsInstanceStateRevertRequested
        {
            get { return this.isInstanceStateRevertRequested; }
        }

        internal void CheckpointInstanceState()
        {
            Debug.Assert(this.workflowExecutor.InstanceId != null, "instance id null at checkpoint time");

            // serialize the instance state
            this.clonedInstanceStateStream = new MemoryStream(10240);
            this.workflowExecutor.RootActivity.Save(this.clonedInstanceStateStream);
            this.workflowDefinition = this.workflowExecutor.WorkflowDefinition;
            this.completedContextActivities = (Hashtable)this.workflowExecutor.CompletedContextActivities.Clone();
            this.clonedInstanceStateStream.Position = 0;
        }

        internal void RequestRevertToCheckpointState(Activity currentActivity, EventHandler<EventArgs> callbackHandler, EventArgs callbackData, bool suspendOnRevert, string suspendInfo)
        {
            if (this.clonedInstanceStateStream == null)
                throw new InvalidOperationException(ExecutionStringManager.InvalidRevertRequest);

            // cache the after revert information
            this.activityContextId = ContextActivityUtils.ContextId(ContextActivityUtils.ContextActivity(currentActivity));
            this.activityQualifiedName = currentActivity.QualifiedName;
            this.callbackData = callbackData;
            this.callbackHandler = callbackHandler;
            this.suspendOnRevert = suspendOnRevert;
            this.suspendOnRevertInfo = suspendInfo;

            // ask scheduler to stop
            this.isInstanceStateRevertRequested = true;
            this.workflowExecutor.Scheduler.CanRun = false;
        }

        internal void DisposeCheckpointState()
        {
            this.clonedInstanceStateStream = null;
        }

        internal void RevertToCheckpointState()
        {
            Debug.Assert(this.clonedInstanceStateStream != null, "cloned instance-state stream null at restore time");

            // deserialize only on first access
            Activity clonedRootActivity = null;
            this.clonedInstanceStateStream.Position = 0;
            using (RuntimeEnvironment runtimeEnv = new RuntimeEnvironment(this.workflowExecutor.WorkflowRuntime))
            {
                clonedRootActivity = Activity.Load(this.clonedInstanceStateStream, (Activity)this.workflowDefinition);
            }
            Debug.Assert(clonedRootActivity != null);
            //
            // Set the trackingListenerBroker before initializing the executor so the tracking
            // runtime gets a reference to the correct object
            clonedRootActivity.SetValue(WorkflowExecutor.TrackingListenerBrokerProperty, workflowExecutor.RootActivity.GetValue(WorkflowExecutor.TrackingListenerBrokerProperty));

            // create the new workflowExecutor
            WorkflowExecutor newWorkflowExecutor = new WorkflowExecutor(Guid.Empty);    // use a dummy guid while swapping executors
            newWorkflowExecutor.Initialize(clonedRootActivity, this.workflowExecutor.WorkflowRuntime, this.workflowExecutor);

            // enqueue the activity notifier
            Activity activityContext = newWorkflowExecutor.GetContextActivityForId(this.activityContextId);
            Activity activity = activityContext.GetActivityByName(this.activityQualifiedName);
            using (new ServiceEnvironment(activity))
            {
                using (newWorkflowExecutor.SetCurrentActivity(activity))
                {
                    using (ActivityExecutionContext executionContext = new ActivityExecutionContext(activity))
                        executionContext.Invoke<EventArgs>(this.callbackHandler, this.callbackData);
                }
            }
            //
            // Push the batch item ordering id to the new instance
            newWorkflowExecutor.BatchCollection.WorkItemOrderId = this.workflowExecutor.BatchCollection.WorkItemOrderId;
            // replace pending batch items
            foreach (KeyValuePair<object, WorkBatch> batch in this.workflowExecutor.BatchCollection)
            {
                batch.Value.SetWorkBatchCollection(newWorkflowExecutor.BatchCollection);
                Activity oldActivity = batch.Key as Activity;
                // no need to add the transient state batch
                if (oldActivity != null)
                {
                    Activity newactivity = activityContext.GetActivityByName(oldActivity.QualifiedName);
                    newWorkflowExecutor.BatchCollection.Add(newactivity, batch.Value);
                }
            }
            this.workflowExecutor.BatchCollection.Clear();

            Debug.Assert(this.completedContextActivities != null);
            newWorkflowExecutor.CompletedContextActivities = this.completedContextActivities;

            // replace with the WorkflowRuntime
            Debug.Assert(this.workflowExecutor.IsInstanceValid);
            this.workflowExecutor.WorkflowRuntime.ReplaceWorkflowExecutor(this.workflowExecutor.InstanceId, this.workflowExecutor, newWorkflowExecutor);

            // now resume or suspend the scheduler as needed
            if (!this.suspendOnRevert)
            {
                // get the new one going
                newWorkflowExecutor.Scheduler.Resume();
            }
            else
            {
                // this call will be old scheduler's thread
                newWorkflowExecutor.SuspendOnIdle(this.suspendOnRevertInfo);
            }
            DisposeCheckpointState();
        }
    }
}