File: CorrelationTokenInvalidatedHandler.cs

package info (click to toggle)
mono 6.12.0.199%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,296,836 kB
  • sloc: cs: 11,181,803; xml: 2,850,076; ansic: 699,709; cpp: 123,344; perl: 59,361; javascript: 30,841; asm: 21,853; makefile: 20,405; sh: 15,009; python: 4,839; pascal: 925; sql: 859; sed: 16; php: 1
file content (133 lines) | stat: -rw-r--r-- 5,326 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Collections;
using System.Reflection;
using System.Runtime.Serialization;
using System.Workflow.ComponentModel;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;

namespace System.Workflow.Activities
{
    /// <summary>
    /// Listens for correlation token events. When the token is initializing it
    /// creates (or reuses) the workflow queue identified by the interface type,
    /// operation and correlation values; on any other event it tears that state
    /// down again and unsubscribes itself from the token.
    /// </summary>
    /// <remarks>
    /// The type is marked <c>[Serializable]</c>, so the private field names are
    /// deliberately left unchanged (presumably they are part of persisted state).
    /// </remarks>
    [Serializable]
    internal sealed class CorrelationTokenInvalidatedHandler : IActivityEventListener<CorrelationTokenEventArgs>
    {
        // Optional queue listener supplied by the owner. When present, it is registered
        // on the queue and no subscription is created or deleted through the
        // WorkflowSubscriptionService.
        IActivityEventListener<QueueEventArgs> eventHandler;
        // Name of the queue resolved in CreateSubscription; null until then.
        EventQueueName queueName;
        // Id of the MessageEventSubscription built in CreateSubscription.
        Guid subscriptionId;
        // Workflow instance id handed to the MessageEventSubscription.
        Guid instanceId;

        // True only when this handler created the queue itself (and therefore deletes it).
        bool queueCreator;
        // Interface/operation used to build the queue name; both are cleared once
        // CreateSubscription has consumed them.
        Type interfaceType;
        string followerOperation;

        /// <summary>
        /// Captures the data needed to build the queue name and subscription later,
        /// when the correlation token reports that it is initializing.
        /// </summary>
        /// <param name="interfaceType">Interface type used for the queue name and subscription.</param>
        /// <param name="operation">Operation (method) name used for the queue name and subscription.</param>
        /// <param name="eventHandler">Optional listener to register on the queue; may be null.</param>
        /// <param name="instanceId">Workflow instance id passed to the subscription.</param>
        internal CorrelationTokenInvalidatedHandler(Type interfaceType, string operation, IActivityEventListener<QueueEventArgs> eventHandler, Guid instanceId)
        {
            this.eventHandler = eventHandler;
            this.interfaceType = interfaceType;
            this.followerOperation = operation;
            this.instanceId = instanceId;
        }

        #region IActivityEventListener<CorrelationTokenEventArgs> Members
        /// <summary>
        /// Handles a correlation token event raised through an <see cref="ActivityExecutionContext"/>.
        /// </summary>
        /// <param name="sender">The raising context; must be an <see cref="ActivityExecutionContext"/>.</param>
        /// <param name="dataChangeEventArgs">Event data carrying the correlation token.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="sender"/> or <paramref name="dataChangeEventArgs"/> is null.
        /// </exception>
        /// <exception cref="ArgumentException">
        /// <paramref name="sender"/> is not an <see cref="ActivityExecutionContext"/>.
        /// </exception>
        void IActivityEventListener<CorrelationTokenEventArgs>.OnEvent(object sender, CorrelationTokenEventArgs dataChangeEventArgs)
        {
            // ArgumentNullException derives from ArgumentException, so handlers that
            // catch ArgumentException keep working.
            if (sender == null)
                throw new ArgumentNullException("sender");
            if (dataChangeEventArgs == null)
                throw new ArgumentNullException("dataChangeEventArgs");

            // Fail with a descriptive argument error rather than a NullReferenceException
            // when the sender has an unexpected type.
            ActivityExecutionContext context = sender as ActivityExecutionContext;
            if (context == null)
                throw new ArgumentException("The sender must be an ActivityExecutionContext.", "sender");

            Activity activity = context.Activity;

            ICollection<CorrelationProperty> correlationValues = dataChangeEventArgs.CorrelationToken.Properties;
            if (dataChangeEventArgs.IsInitializing)
            {
                // Stay subscribed to the token so a later non-initializing event can clean up.
                CreateSubscription(this.instanceId, context, correlationValues);
                return;
            }

            if (queueName != null)
            {
                object[] resolverArgs = correlationValues == null ? null : new object[] { correlationValues };

                // Only non-initializing members have their subscription removed here.
                if (!CorrelationResolver.IsInitializingMember(queueName.InterfaceType, queueName.MethodName, resolverArgs))
                {
                    DeleteSubscription(context);
                }
            }

            dataChangeEventArgs.CorrelationToken.UnsubscribeFromCorrelationTokenInitializedEvent(activity, this);
        }
        #endregion

        /// <summary>
        /// Resolves the workflow queue for the current correlation values (creating it
        /// if it does not exist), registers the optional queue listener, and, when no
        /// listener was supplied, creates a subscription through the
        /// <see cref="WorkflowSubscriptionService"/> if that service is available.
        /// </summary>
        /// <param name="instanceId">Workflow instance id for the subscription.</param>
        /// <param name="context">Execution context used to look up services.</param>
        /// <param name="correlationValues">Correlation properties of the token; may be null.</param>
        private void CreateSubscription(Guid instanceId, ActivityExecutionContext context, ICollection<CorrelationProperty> correlationValues)
        {
            WorkflowQueuingService queueSvcs = context.GetService<WorkflowQueuingService>();
            EventQueueName queueId = new EventQueueName(this.interfaceType, this.followerOperation, correlationValues);

            WorkflowQueue workflowQueue;
            if (queueSvcs.Exists(queueId))
            {
                workflowQueue = queueSvcs.GetWorkflowQueue(queueId);
            }
            else
            {
                WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "CorrelationTokenInvalidatedHandler: creating q {0} ", queueId.GetHashCode());
                workflowQueue = queueSvcs.CreateWorkflowQueue(queueId, true);
                // Remember ownership so DeleteSubscription only deletes queues created here.
                queueCreator = true;
            }

            if (this.eventHandler != null)
            {
                workflowQueue.RegisterForQueueItemAvailable(this.eventHandler);
            }

            WorkflowSubscriptionService subscriptionService = context.GetService<WorkflowSubscriptionService>();

            MessageEventSubscription subscription = new MessageEventSubscription(queueId, instanceId);
            this.queueName = queueId;
            this.subscriptionId = subscription.SubscriptionId;
            subscription.InterfaceType = this.interfaceType;
            subscription.MethodName = this.followerOperation;

            // The queue name now carries the interface type and method name, so the
            // separate copies are dropped.
            this.interfaceType = null;
            this.followerOperation = null;

            if (correlationValues != null)
            {
                foreach (CorrelationProperty property in correlationValues)
                {
                    subscription.CorrelationProperties.Add(property);
                }
            }

            // The subscription is only handed to the service when no queue listener
            // was supplied and the service exists.
            if (this.eventHandler == null && subscriptionService != null)
                subscriptionService.CreateSubscription(subscription);
        }

        /// <summary>
        /// Deletes the queue if this handler created it and, when no queue listener
        /// was supplied, removes the subscription through the
        /// <see cref="WorkflowSubscriptionService"/> if that service is available.
        /// Does nothing when no queue name has been recorded.
        /// </summary>
        /// <param name="context">Execution context used to look up services.</param>
        private void DeleteSubscription(ActivityExecutionContext context)
        {
            if (this.queueName == null)
                return;

            WorkflowQueuingService queueSvcs = context.GetService<WorkflowQueuingService>();
            if (queueCreator)
                queueSvcs.DeleteWorkflowQueue(this.queueName);

            // With a queue listener, CreateSubscription never created a service subscription.
            if (this.eventHandler != null)
                return;

            WorkflowSubscriptionService subscriptionService = context.GetService<WorkflowSubscriptionService>();
            if (subscriptionService != null)
                subscriptionService.DeleteSubscription(this.subscriptionId);

            // NOTE(review): this trace is written even when no subscription service was found.
            WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "CorrelationTokenInvalidatedHandler subscription deleted SubId {0} QueueId {1}", this.subscriptionId, this.queueName);
        }

    }
}