File: CorrelationService.cs

package info (click to toggle)
mono 6.12.0.199%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,296,836 kB
  • sloc: cs: 11,181,803; xml: 2,850,076; ansic: 699,709; cpp: 123,344; perl: 59,361; javascript: 30,841; asm: 21,853; makefile: 20,405; sh: 15,009; python: 4,839; pascal: 925; sql: 859; sed: 16; php: 1
file content (308 lines) | stat: -rw-r--r-- 16,348 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#region Using directives

using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Collections;
using System.Reflection;
using System.Runtime.Serialization;
using System.Workflow.ComponentModel;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using System.Runtime.Remoting.Messaging;
using System.Xml;
using System.Globalization;

#endregion

namespace System.Workflow.Activities
{
    internal interface ICorrelationProvider
    {
        ICollection<CorrelationProperty> ResolveCorrelationPropertyValues(Type interfaceType, string memberName, object[] methodArgs, bool provideInitializerTokens);
        bool IsInitializingMember(Type interfaceType, string memberName, object[] methodArgs);
    }

    [AttributeUsageAttribute(AttributeTargets.Interface | AttributeTargets.Class, AllowMultiple = false, Inherited = true)]
    internal sealed class CorrelationProviderAttribute : Attribute
    {
        private Type correlationProviderType;

        internal CorrelationProviderAttribute(Type correlationProviderType)
        {
            this.correlationProviderType = correlationProviderType;
        }

        internal Type CorrelationProviderType
        {
            get
            {
                return this.correlationProviderType;
            }
        }
    }

    internal static class CorrelationService
    {
        internal static void Initialize(IServiceProvider context, Activity activity, Type interfaceType, string methodName, Guid instanceId)
        {
            if (activity == null)
                throw new ArgumentNullException("activity");
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType");
            if (methodName == null)
                throw new ArgumentNullException("methodName");

            Subscribe(context, activity, interfaceType, methodName, null, instanceId);
            InitializeFollowers(context, interfaceType, methodName);
        }

        internal static bool Subscribe(IServiceProvider context, Activity activity, Type interfaceType, string methodName, IActivityEventListener<QueueEventArgs> eventListener, Guid instanceId)
        {
            if (activity == null)
                throw new ArgumentNullException("activity");
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType");
            if (methodName == null)
                throw new ArgumentNullException("methodName");

            WorkflowQueuingService queueService = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService));
            IComparable queueName = ResolveQueueName(activity, interfaceType, methodName);
            if (queueName != null)
            {
                // initializer
                WorkflowQueue queue = null;
                if (queueService.Exists(queueName))
                {
                    queue = queueService.GetWorkflowQueue(queueName);
                    queue.Enabled = true;
                }
                else
                {
                    queue = queueService.CreateWorkflowQueue(queueName, true);
                }

                if (eventListener != null)
                {
                    queue.RegisterForQueueItemAvailable(eventListener, activity.QualifiedName);
                    WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "CorrelationService: activity '{0}' subscribing to QueueItemAvailable", activity.QualifiedName);
                    return true;
                }
                return false;
            }

            SubscribeForCorrelationTokenInvalidation(activity, interfaceType, methodName, eventListener, instanceId);
            return false;
        }

        internal static bool Unsubscribe(IServiceProvider context, Activity activity, Type interfaceType, string methodName, IActivityEventListener<QueueEventArgs> eventListener)
        {
            if (activity == null)
                throw new ArgumentException("activity");
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType");
            if (methodName == null)
                throw new ArgumentNullException("methodName");

            WorkflowQueuingService queueService = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService));
            IComparable queueName = ResolveQueueName(activity, interfaceType, methodName);
            if (queueName != null)
            {
                if (queueService.Exists(queueName))
                {
                    queueService.GetWorkflowQueue(queueName).UnregisterForQueueItemAvailable(eventListener);
                    return true;
                }
            }
            return false;
        }

        internal static IComparable ResolveQueueName(Activity activity, Type interfaceType, string methodName)
        {
            if (activity == null)
                throw new ArgumentNullException("activity");
            if (interfaceType == null)
                throw new ArgumentNullException("interfaceType");
            if (methodName == null)
                throw new ArgumentNullException("methodName");

            // resolver will check for an explicit correlation provider, 
            // if none present this will return an uncorrelated provider.
            // note, an uncorrelated methodName will always be an initializer
            if (CorrelationResolver.IsInitializingMember(interfaceType, methodName, null))
            {
                ICollection<CorrelationProperty> corrvalues = CorrelationResolver.ResolveCorrelationValues(interfaceType, methodName, null, true);
                return new EventQueueName(interfaceType, methodName, corrvalues);
            }

            CorrelationToken reference = GetCorrelationToken(activity);
            if (!reference.Initialized)
                return null;

            return new EventQueueName(interfaceType, methodName, reference.Properties);
        }

        internal static void InvalidateCorrelationToken(Activity activity, Type interfaceType, string methodName, object[] messageArgs)
        {
            object correlationProvider = CorrelationResolver.GetCorrelationProvider(interfaceType);
            if (correlationProvider is NonCorrelatedProvider)
                return;

            CorrelationToken reference = GetCorrelationToken(activity);
            ICollection<CorrelationProperty> correlationvalues = CorrelationResolver.ResolveCorrelationValues(interfaceType, methodName, messageArgs, false);

            if (!CorrelationResolver.IsInitializingMember(interfaceType, methodName, messageArgs))
            {
                if (!reference.Initialized)
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationNotInitialized, reference.Name, activity.QualifiedName));
                ValidateCorrelation(reference.Properties, correlationvalues, reference.Name, activity);
                return;
            }

            // invalidate correlation token if methodName is an initializer
            reference.Initialize(activity, correlationvalues);
        }

        private static CorrelationToken GetCorrelationToken(Activity activity)
        {
            DependencyProperty dependencyProperty = DependencyProperty.FromName("CorrelationToken", activity.GetType());
            if (dependencyProperty == null)
                dependencyProperty = DependencyProperty.FromName("CorrelationToken", activity.GetType().BaseType);
            CorrelationToken reference = activity.GetValue(dependencyProperty) as CorrelationToken;
            if (reference == null)
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationTokenMissing, activity.Name));

            CorrelationToken correlator = CorrelationTokenCollection.GetCorrelationToken(activity, reference.Name, reference.OwnerActivityName);
            if (correlator == null)
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationTokenMissing, activity.Name));

            return correlator;
        }

        private static void ValidateCorrelation(ICollection<CorrelationProperty> initializerProperties, ICollection<CorrelationProperty> followerProperties, string memberName, Activity activity)
        {
            if (followerProperties == null && initializerProperties == null)
                return;

            if (followerProperties == null || initializerProperties == null)
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));

            if (initializerProperties.Count != followerProperties.Count)
                throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));

            IEnumerator<CorrelationProperty> initializerValues = initializerProperties.GetEnumerator();
            IEnumerator<CorrelationProperty> followerValues = followerProperties.GetEnumerator();
            while (initializerValues.MoveNext() && followerValues.MoveNext())
            {
                IComparable initializerValue = initializerValues.Current.Value as IComparable;
                object followerValue = followerValues.Current.Value;

                // Bug DevDiv2\DevDiv 552322 - http://vstfdevdiv.redmond.corp.microsoft.com:8080/DevDiv2/DevDiv/_workitems#_a=edit&id=552322
                // Reflection does not guarantee ordering, so the two collections (arrays) of
                // CorrelationProperties may not be in the same order, based on Name. So we need to check all the
                // elements of the followerProperties for the Name of the current initializerValue.
                // The collections MIGHT be in the same order. Before searching the followerProperties collection for an element with a matching
                // name, see if the current element in the initializerValues and followerValues enumerators have a matching name.
                // If they do match, just fall thru because followerValue is already set to followerValues.Current.Value;
                if (!initializerValues.Current.Name.Equals(followerValues.Current.Name, StringComparison.OrdinalIgnoreCase))
                {
                    CorrelationProperty followerProperty = null;
                    IEnumerator<CorrelationProperty> followerEnumerator = followerProperties.GetEnumerator();
                    while (followerEnumerator.MoveNext())
                    {
                        // We don't need to be concerned with culture here because the names we are comparing
                        // are parameter names on methods in an interface.
                        if (initializerValues.Current.Name.Equals(followerEnumerator.Current.Name, StringComparison.OrdinalIgnoreCase))
                        {
                            // We found a matching Name in the follower collection.
                            // Saving the followerProperty rather than followerEnumerator.Current.Value here
                            // because the latter could be null and still be correct. I need something
                            // that indicates that we actually found a matching CorrelationProperty in the
                            // collection. So instead of having a separate bool, I just have a reference
                            // to the matching CorrelationProperty.
                            followerProperty = followerEnumerator.Current;
                            break;
                        }
                        // If we get here, the name of the parameter doesn't match, so just move to the next element in the 
                        // followerEnumerator.
                    }
                    // If we found a followerProperty with a matching name, use it.
                    // In the highly, possibly impossible, event that we didn't find an element in the
                    // followerProperties collection with a matching name, we fall thru with
                    // followerValue = followerValues.Current.Value, which is exactly what the previous
                    // code had, and we act just like we did before.
                    if (followerProperty != null)
                    {
                        followerValue = followerProperty.Value;
                    }
                }

                if (initializerValue != null && (initializerValue.CompareTo(followerValue) != 0))
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));
                else if (initializerValues.Current.Value == null && followerValue == null)
                    return;
                else if (initializerValue == null && followerValue != null && !followerValue.Equals(initializerValues.Current.Value))
                    throw new InvalidOperationException(SR.GetString(SR.Error_CorrelationViolationException, memberName, activity.QualifiedName));
            }
        }

        private static void SubscribeForCorrelationTokenInvalidation(Activity activity, Type interfaceType, string followermethodName, IActivityEventListener<QueueEventArgs> eventListener, Guid instanceId)
        {
            CorrelationToken reference = GetCorrelationToken(activity);
            CorrelationTokenInvalidatedHandler dataChangeEventListener = new CorrelationTokenInvalidatedHandler(interfaceType, followermethodName, eventListener, instanceId);
            reference.SubscribeForCorrelationTokenInitializedEvent(activity, dataChangeEventListener);
        }

        private static void InitializeFollowers(IServiceProvider context, Type interfaceType, string followermethodName)
        {
            if (CorrelationResolver.IsInitializingMember(interfaceType, followermethodName, null))
                return;

            EventInfo[] events = interfaceType.GetEvents();
            foreach (EventInfo e in events)
            {
                CreateFollowerEntry(context, interfaceType, followermethodName, e.Name);
            }
        }

        private static void CreateFollowerEntry(IServiceProvider context, Type interfaceType, string followermethodName, string initializermethodName)
        {
            if (!CorrelationResolver.IsInitializingMember(interfaceType, initializermethodName, null))
                return;

            WorkflowQueuingService queueSvcs = (WorkflowQueuingService)context.GetService(typeof(WorkflowQueuingService));
            FollowerQueueCreator follower = new FollowerQueueCreator(followermethodName);
            WorkflowActivityTrace.Activity.TraceEvent(TraceEventType.Information, 0, "Creating follower {0} on initializer {1}", interfaceType.Name + followermethodName, interfaceType.Name + initializermethodName);

            ICollection<CorrelationProperty> corrValues = CorrelationResolver.ResolveCorrelationValues(interfaceType, initializermethodName, null, true);
            EventQueueName key = new EventQueueName(interfaceType, initializermethodName, corrValues);
            WorkflowQueue initializerQueue = null;
            if (queueSvcs.Exists(key))
            {
                initializerQueue = queueSvcs.GetWorkflowQueue(key);
            }
            else
            {
                // traversed follower before initializer
                initializerQueue = queueSvcs.CreateWorkflowQueue(key, true);
                initializerQueue.Enabled = false;
            }

            initializerQueue.RegisterForQueueItemArrived(follower);
        }

        internal static void UninitializeFollowers(Type interfaceType, string initializer, WorkflowQueue initializerQueue)
        {
            if (!CorrelationResolver.IsInitializingMember(interfaceType, initializer, null))
                return;

            EventInfo[] events = interfaceType.GetEvents();
            foreach (EventInfo e in events)
            {
                string follower = e.Name;
                if (!CorrelationResolver.IsInitializingMember(interfaceType, e.Name, null))
                    initializerQueue.UnregisterForQueueItemArrived(new FollowerQueueCreator(follower));
            }
        }
    }
}