File: rules.py

package info (click to toggle)
python-efficient-apriori 2.0.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,736 kB
  • sloc: python: 889; sh: 10; makefile: 10
file content (437 lines) | stat: -rw-r--r-- 14,701 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Implementations of algorithms related to association rules.
"""

import typing
import numbers
import itertools
from efficient_apriori.itemsets import apriori_gen


class Rule:
    """
    A class for a rule.
    """

    # Number of decimals used for printing
    _decimals = 3

    def __init__(
        self,
        lhs: tuple,
        rhs: tuple,
        count_full: int = 0,
        count_lhs: int = 0,
        count_rhs: int = 0,
        num_transactions: int = 0,
    ):
        """
        Initialize a new rule. This call is a thin wrapper around some data.

        Parameters
        ----------
        lhs : tuple
            The left hand side (antecedent) of the rule. Each item in the tuple
            must be hashable, e.g. a string or an integer.
        rhs : tuple
            The right hand side (consequent) of the rule.
        count_full : int
            The count of the union of the lhs and rhs in the dataset.
        count_lhs : int
            The count of the lhs in the dataset.
        count_rhs : int
            The count of the rhs in the dataset.
        num_transactions : int
            The number of transactions in the dataset.

        Examples
        --------
        >>> r = Rule(('a', 'b'), ('c',), 50, 100, 150, 200)
        >>> r.confidence  # Probability of 'c', given 'a' and 'b'
        0.5
        >>> r.support  # Probability of ('a', 'b', 'c') in the data
        0.25
        >>> # Ratio of observed over expected support if lhs, rhs = independent
        >>> r.lift == 2 / 3
        True
        >>> print(r)
        {a, b} -> {c} (conf: 0.500, supp: 0.250, lift: 0.667, conv: 0.500)
        >>> r
        {a, b} -> {c}
        """
        self.lhs = lhs  # antecedent
        self.rhs = rhs  # consequent
        self.count_full = count_full
        self.count_lhs = count_lhs
        self.count_rhs = count_rhs
        self.num_transactions = num_transactions

    @property
    def confidence(self):
        """
        The confidence of a rule is the probability of the rhs given the lhs.
        If X -> Y, then the confidence is P(Y|X).
        """
        try:
            return self.count_full / self.count_lhs
        except ZeroDivisionError:
            return None
        except AttributeError:
            return None

    @property
    def support(self):
        """
        The support of a rule is the frequency of which the lhs and rhs appear
        together in the dataset. If X -> Y, then the support is P(Y and X).
        """
        try:
            return self.count_full / self.num_transactions
        except ZeroDivisionError:
            return None
        except AttributeError:
            return None

    @property
    def lift(self):
        """
        The lift of a rule is the ratio of the observed support to the expected
        support if the lhs and rhs were independent.If X -> Y, then the lift is
        given by the fraction P(X and Y) / (P(X) * P(Y)).
        """
        try:
            observed_support = self.count_full / self.num_transactions
            prod_counts = self.count_lhs * self.count_rhs
            expected_support = prod_counts / self.num_transactions**2
            return observed_support / expected_support
        except ZeroDivisionError:
            return None
        except AttributeError:
            return None

    @property
    def conviction(self):
        """
        The conviction of a rule X -> Y is the ratio P(not Y) / P(not Y | X).
        It's the proportion of how often Y does not appear in the data to how
        often Y does not appear in the data, given X. If the ratio is large,
        then the confidence is large and Y appears often.
        """
        try:
            eps = 10e-10  # Avoid zero division
            prob_not_rhs = 1 - self.count_rhs / self.num_transactions
            prob_not_rhs_given_lhs = 1 - self.confidence
            return prob_not_rhs / (prob_not_rhs_given_lhs + eps)
        except ZeroDivisionError:
            return None
        except AttributeError:
            return None

    @property
    def rpf(self):
        """
        The RPF (Rule Power Factor) is the confidence times the support.
        """
        try:
            return self.confidence * self.support
        except ZeroDivisionError:
            return None
        except AttributeError:
            return None

    @staticmethod
    def _pf(s):
        """
        Pretty formatting of an iterable.
        """
        return "{" + ", ".join(str(k) for k in s) + "}"

    def __repr__(self):
        """
        Representation of a rule.
        """
        return "{} -> {}".format(self._pf(self.lhs), self._pf(self.rhs))

    def __str__(self):
        """
        Printing of a rule.
        """
        conf = "conf: {0:.3f}".format(self.confidence)
        supp = "supp: {0:.3f}".format(self.support)
        lift = "lift: {0:.3f}".format(self.lift)
        conv = "conv: {0:.3f}".format(self.conviction)

        return "{} -> {} ({}, {}, {}, {})".format(self._pf(self.lhs), self._pf(self.rhs), conf, supp, lift, conv)

    def __eq__(self, other):
        """
        Equality of two rules.
        """
        return (set(self.lhs) == set(other.lhs)) and (set(self.rhs) == set(other.rhs))

    def __hash__(self):
        """
        Hashing a rule for efficient set and dict representation.
        """
        return hash(frozenset(self.lhs + self.rhs))

    def __len__(self):
        """
        The length of a rule, defined as the number of items in the rule.
        """
        return len(self.lhs + self.rhs)


def generate_rules_simple(
    itemsets: typing.Dict[int, typing.Dict],
    min_confidence: float,
    num_transactions: int,
):
    """
    DO NOT USE. This is a simple top-down algorithm for generating association
    rules. It is included here for testing purposes, and because it is
    mentioned in the 1994 paper by Agrawal et al. It is slow because it does
    not enumerate the search space efficiently: it produces duplicates, and it
    does not prune the search space efficiently.

    Simple algorithm for generating association rules from itemsets.
    """

    # Iterate over every size
    for size in itemsets.keys():
        # Do not consider itemsets of size 1
        if size < 2:
            continue

        # This algorithm returns duplicates, so we keep track of items yielded
        # in a set to avoid yielding duplicates
        yielded: set = set()
        yielded_add = yielded.add

        # Iterate over every itemset of the prescribed size
        for itemset in itemsets[size].keys():
            # Generate rules
            for result in _genrules(itemset, itemset, itemsets, min_confidence, num_transactions):
                # If the rule has been yieded, keep going, else add and yield
                if result in yielded:
                    continue

                yielded_add(result)
                yield result


def _genrules(l_k, a_m, itemsets, min_conf, num_transactions):
    """
    DO NOT USE. This is the gen-rules algorithm from the 1994 paper by Agrawal
    et al. It's a subroutine called by `generate_rules_simple`. However, the
    algorithm `generate_rules_simple` should not be used.
    The naive algorithm from the original paper.

    Parameters
    ----------
    l_k : tuple
        The itemset containing all elements to be considered for a rule.
    a_m : tuple
        The itemset to take m-length combinations of, an move to the left of
        l_k. The itemset a_m is a subset of l_k.
    """

    def count(itemset):
        """
        Helper function to retrieve the count of the itemset in the dataset.
        """
        return itemsets[len(itemset)][itemset]

    # Iterate over every k - 1 combination of a_m to produce
    # rules of the form a -> (l - a)
    for a_m in itertools.combinations(a_m, len(a_m) - 1):
        # Compute the count of this rule, which is a_m -> (l_k - a_m)
        confidence = count(l_k) / count(a_m)

        # Keep going if the confidence level is too low
        if confidence < min_conf:
            continue

        # Create the right hand set: rhs = (l_k - a_m) , and keep it sorted
        rhs = set(l_k).difference(set(a_m))
        rhs = tuple(sorted(rhs))

        # Create new rule object and yield it
        yield Rule(a_m, rhs, count(l_k), count(a_m), count(rhs), num_transactions)

        # If the left hand side has one item only, do not recurse the function
        if len(a_m) <= 1:
            continue
        yield from _genrules(l_k, a_m, itemsets, min_conf, num_transactions)


def generate_rules_apriori(
    itemsets: typing.Dict[int, typing.Dict[tuple, int]],
    min_confidence: float,
    num_transactions: int,
    verbosity: int = 0,
):
    """
    Bottom up algorithm for generating association rules from itemsets, very
    similar to the fast algorithm proposed in the original 1994 paper by
    Agrawal et al.

    The algorithm is based on the observation that for {a, b} -> {c, d} to
    hold, both {a, b, c} -> {d} and {a, b, d} -> {c} must hold, since in
    general conf( {a, b, c} -> {d} ) >= conf( {a, b} -> {c, d} ).
    In other words, if either of the two one-consequent rules do not hold, then
    there is no need to ever consider the two-consequent rule.

    Parameters
    ----------
    itemsets : dict of dicts
        The first level of the dictionary is of the form (length, dict of item
        sets). The second level is of the form (itemset, count_in_dataset)).
    min_confidence :  float
        The minimum confidence required for the rule to be yielded.
    num_transactions : int
        The number of transactions in the data set.
    verbosity : int
        The level of detail printing when the algorithm runs. Either 0, 1 or 2.

    Examples
    --------
    >>> itemsets = {1: {('a',): 3, ('b',): 2, ('c',): 1},
    ...             2: {('a', 'b'): 2, ('a', 'c'): 1}}
    >>> list(generate_rules_apriori(itemsets, 1.0, 3))
    [{b} -> {a}, {c} -> {a}]
    """
    # Validate user inputs
    if not ((0 <= min_confidence <= 1) and isinstance(min_confidence, numbers.Number)):
        raise ValueError("`min_confidence` must be a number between 0 and 1.")

    if not ((num_transactions >= 0) and isinstance(num_transactions, numbers.Number)):
        raise ValueError("`num_transactions` must be a number greater than 0.")

    def count(itemset):
        """
        Helper function to retrieve the count of the itemset in the dataset.
        """
        return itemsets[len(itemset)][itemset]

    if verbosity > 0:
        print("Generating rules from itemsets.")

    # For every itemset of a perscribed size
    for size in itemsets.keys():
        # Do not consider itemsets of size 1
        if size < 2:
            continue

        if verbosity > 0:
            print(" Generating rules of size {}.".format(size))

        # For every itemset of this size
        for itemset in itemsets[size].keys():
            # Generate combinations to start off of. These 1-combinations will
            # be merged to 2-combinations in the function `_ap_genrules`
            H_1 = []
            # Special case to capture rules such as {others} -> {1 item}
            for removed in itertools.combinations(itemset, 1):
                # Compute the left hand side
                remaining = set(itemset).difference(set(removed))
                lhs = tuple(sorted(remaining))

                # If the confidence is high enough, yield the rule
                conf = count(itemset) / count(lhs)
                if conf >= min_confidence:
                    yield Rule(
                        lhs,
                        removed,
                        count(itemset),
                        count(lhs),
                        count(removed),
                        num_transactions,
                    )

                    # Consider the removed item for 2-combinations in the function `_ap_genrules`
                    H_1.append(removed)

            # If H_1 is empty, there is nothing for passing to _ap_genrules, so continue to the next itemset
            if len(H_1) == 0:
                continue

            yield from _ap_genrules(itemset, H_1, itemsets, min_confidence, num_transactions)

    if verbosity > 0:
        print("Rule generation terminated.\n")


def _ap_genrules(
    itemset: tuple,
    H_m: typing.List[tuple],
    itemsets: typing.Dict[int, typing.Dict[tuple, int]],
    min_conf: float,
    num_transactions: int,
):
    """
    Recursively build up rules by adding more items to the right hand side.

    This algorithm is called `ap-genrules` in the original paper. It is
    called by the `generate_rules_apriori` generator above. See it's docs.

    Parameters
    ----------
    itemset : tuple
        The itemset under consideration.
    H_m : tuple
        Subsets of the itemset of length m, to be considered for rhs of a rule.
    itemsets : dict of dicts
        All itemsets and counts for in the data set.
    min_conf : float
        The minimum confidence for a rule to be returned.
    num_transactions : int
        The number of transactions in the data set.
    """

    def count(itemset):
        """
        Helper function to retrieve the count of the itemset in the dataset.
        """
        return itemsets[len(itemset)][itemset]

    # If H_1 is so large that calling `apriori_gen` will produce right-hand
    # sides as large as `itemset`, there will be no right hand side.
    # This should not happen happen, so we return.
    if len(itemset) <= (len(H_m[0]) + 1):
        return

    # Generate right-hand itemsets of length k + 1 if H is of length k
    H_m = list(apriori_gen(H_m))
    H_m_copy = H_m.copy()

    # For every possible right hand side
    for h_m in H_m:
        # Compute the left hand side of the rule
        lhs = tuple(sorted(set(itemset).difference(set(h_m))))

        # If the confidence is high enough, yield the rule, else remove from
        # the upcoming recursive generator call
        if (count(itemset) / count(lhs)) >= min_conf:
            yield Rule(
                lhs,
                h_m,
                count(itemset),
                count(lhs),
                count(h_m),
                num_transactions,
            )
        else:
            H_m_copy.remove(h_m)

    # Unless the list of right-hand sides is empty, recurse the generator call
    if H_m_copy:
        yield from _ap_genrules(itemset, H_m_copy, itemsets, min_conf, num_transactions)


if __name__ == "__main__":
    import pytest

    pytest.main(args=[".", "--doctest-modules", "-v"])