1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
using System;
using System.Collections.Generic;
using System.Collections;
using System.Diagnostics;
using System.Reflection;
using System.Text;
using System.IO;
using System.IO.Compression;
using System.Transactions;
using System.Workflow.ComponentModel;
namespace System.Workflow.Runtime
{
internal sealed class WorkflowStateRollbackService
{
WorkflowExecutor workflowExecutor;
// cache the revert back data
MemoryStream clonedInstanceStateStream;
Activity workflowDefinition = null;
bool isInstanceStateRevertRequested = false;
// revert back notification info
string activityQualifiedName;
int activityContextId;
EventArgs callbackData;
EventHandler<EventArgs> callbackHandler;
bool suspendOnRevert;
string suspendOnRevertInfo;
Hashtable completedContextActivities = new Hashtable();
public WorkflowStateRollbackService(WorkflowExecutor workflowExecutor)
{
this.workflowExecutor = workflowExecutor;
}
internal bool IsInstanceStateRevertRequested
{
get { return this.isInstanceStateRevertRequested; }
}
internal void CheckpointInstanceState()
{
Debug.Assert(this.workflowExecutor.InstanceId != null, "instance id null at checkpoint time");
// serialize the instance state
this.clonedInstanceStateStream = new MemoryStream(10240);
this.workflowExecutor.RootActivity.Save(this.clonedInstanceStateStream);
this.workflowDefinition = this.workflowExecutor.WorkflowDefinition;
this.completedContextActivities = (Hashtable)this.workflowExecutor.CompletedContextActivities.Clone();
this.clonedInstanceStateStream.Position = 0;
}
internal void RequestRevertToCheckpointState(Activity currentActivity, EventHandler<EventArgs> callbackHandler, EventArgs callbackData, bool suspendOnRevert, string suspendInfo)
{
if (this.clonedInstanceStateStream == null)
throw new InvalidOperationException(ExecutionStringManager.InvalidRevertRequest);
// cache the after revert information
this.activityContextId = ContextActivityUtils.ContextId(ContextActivityUtils.ContextActivity(currentActivity));
this.activityQualifiedName = currentActivity.QualifiedName;
this.callbackData = callbackData;
this.callbackHandler = callbackHandler;
this.suspendOnRevert = suspendOnRevert;
this.suspendOnRevertInfo = suspendInfo;
// ask scheduler to stop
this.isInstanceStateRevertRequested = true;
this.workflowExecutor.Scheduler.CanRun = false;
}
internal void DisposeCheckpointState()
{
this.clonedInstanceStateStream = null;
}
internal void RevertToCheckpointState()
{
Debug.Assert(this.clonedInstanceStateStream != null, "cloned instance-state stream null at restore time");
// deserialize only on first access
Activity clonedRootActivity = null;
this.clonedInstanceStateStream.Position = 0;
using (RuntimeEnvironment runtimeEnv = new RuntimeEnvironment(this.workflowExecutor.WorkflowRuntime))
{
clonedRootActivity = Activity.Load(this.clonedInstanceStateStream, (Activity)this.workflowDefinition);
}
Debug.Assert(clonedRootActivity != null);
//
// Set the trackingListenerBroker before initializing the executor so the tracking
// runtime gets a reference to the correct object
clonedRootActivity.SetValue(WorkflowExecutor.TrackingListenerBrokerProperty, workflowExecutor.RootActivity.GetValue(WorkflowExecutor.TrackingListenerBrokerProperty));
// create the new workflowExecutor
WorkflowExecutor newWorkflowExecutor = new WorkflowExecutor(Guid.Empty); // use a dummy guid while swapping executors
newWorkflowExecutor.Initialize(clonedRootActivity, this.workflowExecutor.WorkflowRuntime, this.workflowExecutor);
// enqueue the activity notifier
Activity activityContext = newWorkflowExecutor.GetContextActivityForId(this.activityContextId);
Activity activity = activityContext.GetActivityByName(this.activityQualifiedName);
using (new ServiceEnvironment(activity))
{
using (newWorkflowExecutor.SetCurrentActivity(activity))
{
using (ActivityExecutionContext executionContext = new ActivityExecutionContext(activity))
executionContext.Invoke<EventArgs>(this.callbackHandler, this.callbackData);
}
}
//
// Push the batch item ordering id to the new instance
newWorkflowExecutor.BatchCollection.WorkItemOrderId = this.workflowExecutor.BatchCollection.WorkItemOrderId;
// replace pending batch items
foreach (KeyValuePair<object, WorkBatch> batch in this.workflowExecutor.BatchCollection)
{
batch.Value.SetWorkBatchCollection(newWorkflowExecutor.BatchCollection);
Activity oldActivity = batch.Key as Activity;
// no need to add the transient state batch
if (oldActivity != null)
{
Activity newactivity = activityContext.GetActivityByName(oldActivity.QualifiedName);
newWorkflowExecutor.BatchCollection.Add(newactivity, batch.Value);
}
}
this.workflowExecutor.BatchCollection.Clear();
Debug.Assert(this.completedContextActivities != null);
newWorkflowExecutor.CompletedContextActivities = this.completedContextActivities;
// replace with the WorkflowRuntime
Debug.Assert(this.workflowExecutor.IsInstanceValid);
this.workflowExecutor.WorkflowRuntime.ReplaceWorkflowExecutor(this.workflowExecutor.InstanceId, this.workflowExecutor, newWorkflowExecutor);
// now resume or suspend the scheduler as needed
if (!this.suspendOnRevert)
{
// get the new one going
newWorkflowExecutor.Scheduler.Resume();
}
else
{
// this call will be old scheduler's thread
newWorkflowExecutor.SuspendOnIdle(this.suspendOnRevertInfo);
}
DisposeCheckpointState();
}
}
}
|