#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Implementations of algorithms related to itemsets.
"""
import itertools
import numbers
import typing
import collections
from dataclasses import field, dataclass
import collections.abc


@dataclass
class ItemsetCount:
    itemset_count: int = 0
    members: set = field(default_factory=set)


class TransactionManager:
    # The brilliant transaction manager idea is due to:
    # https://github.com/ymoch/apyori/blob/master/apyori.py

    def __init__(self, transactions: typing.Iterable[typing.Iterable[typing.Hashable]]):
        # A lookup that returns the indices of the transactions containing each item
        self.indices_by_item = collections.defaultdict(set)

        # Populate
        i = -1
        for i, transaction in enumerate(transactions):
            for item in transaction:
                self.indices_by_item[item].add(i)

        # Total number of transactions
        self._transactions = i + 1

    @property
    def items(self):
        return set(self.indices_by_item.keys())

    def __len__(self):
        return self._transactions

    def transaction_indices(self, transaction: typing.Iterable[typing.Hashable]):
        """Return the indices of the transactions containing every item in `transaction`."""
        transaction = set(transaction)  # Copy
        item = transaction.pop()
        indices = self.indices_by_item[item]
        while transaction:
            item = transaction.pop()
            indices = indices.intersection(self.indices_by_item[item])
        return indices

    def transaction_indices_sc(self, transaction: typing.Iterable[typing.Hashable], min_support: float = 0):
        """Return the indices of the transactions containing the itemset, with short-circuiting.

        Returns a tuple (over_or_equal_to_min_support, set_of_indices).
        """
        # Sort the items by the number of transaction rows each item appears in,
        # starting with the item belonging to the most transactions
        transaction = sorted(transaction, key=lambda item: len(self.indices_by_item[item]), reverse=True)

        # Pop the item appearing in the fewest transactions
        item = transaction.pop()
        indices = self.indices_by_item[item]
        support = len(indices) / len(self)
        if support < min_support:
            return False, None

        # The support is a non-increasing function as more items are intersected.
        # Sorting by the number of transactions the items appear in is a heuristic
        # to make the support drop as quickly as possible.
        while transaction:
            item = transaction.pop()
            indices = indices.intersection(self.indices_by_item[item])
            support = len(indices) / len(self)
            if support < min_support:
                return False, None

        # No short-circuit happened
        return True, indices
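

# Minimal usage sketch: `_example_transaction_manager` is a hypothetical helper
# added only for illustration (verified as a doctest by the pytest invocation at
# the bottom of this file). It shows how TransactionManager maps each item to the
# indices of the transactions containing it, and how support checks short-circuit.
def _example_transaction_manager():
    """
    >>> manager = TransactionManager([("a", "b"), ("b", "c"), ("a", "b", "c")])
    >>> len(manager)
    3
    >>> sorted(manager.items)
    ['a', 'b', 'c']
    >>> sorted(manager.transaction_indices({"a", "b"}))
    [0, 2]
    >>> over, indices = manager.transaction_indices_sc({"b", "c"}, min_support=0.5)
    >>> over, sorted(indices)
    (True, [1, 2])
    >>> manager.transaction_indices_sc({"a", "c"}, min_support=0.75)
    (False, None)
    """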


def join_step(itemsets: typing.List[tuple]):
    """
    Join k length itemsets into k + 1 length itemsets.

    This algorithm assumes that the list of itemsets is sorted, and that the
    itemsets themselves are sorted tuples. Instead of always enumerating all
    n^2 combinations, the algorithm only has n^2 runtime for each block of
    itemsets with the first k - 1 items equal.

    Parameters
    ----------
    itemsets : list of itemsets
        A list of itemsets of length k, to be joined to k + 1 length
        itemsets.

    Examples
    --------
    >>> # This is an example from the 1994 paper by Agrawal et al.
    >>> itemsets = [(1, 2, 3), (1, 2, 4), (1, 3, 4), (1, 3, 5), (2, 3, 4)]
    >>> list(join_step(itemsets))
    [(1, 2, 3, 4), (1, 3, 4, 5)]
    """
    i = 0
    # Iterate over every itemset in the itemsets
    while i < len(itemsets):

        # The number of rows to skip in the while-loop, initially set to 1
        skip = 1

        # Get all but the last item in the itemset, and the last item
        *itemset_first, itemset_last = itemsets[i]

        # We now iterate over every itemset following this one, stopping
        # if the first k - 1 items are not equal. If we're at (1, 2, 3),
        # we'll consider (1, 2, 4) and (1, 2, 7), but not (1, 3, 1)

        # Keep a list of all last elements, i.e. tail elements, to perform
        # 2-combinations on later on
        tail_items = [itemset_last]
        tail_items_append = tail_items.append  # Micro-optimization

        # Iterate over every itemset following this itemset
        for j in range(i + 1, len(itemsets)):

            # Get all but the last item in the itemset, and the last item
            *itemset_n_first, itemset_n_last = itemsets[j]

            # If the first k - 1 items are equal, collect the last item and
            # skip this itemset in the while-loop
            if itemset_first == itemset_n_first:

                # Micro-optimization
                tail_items_append(itemset_n_last)
                skip += 1

            # If they are not equal, break out of the for-loop
            else:
                break

        # For every 2-combination in the tail items, yield a new candidate
        # itemset, which is sorted.
        itemset_first_tuple = tuple(itemset_first)
        for a, b in itertools.combinations(tail_items, 2):
            yield itemset_first_tuple + (a,) + (b,)

        # Increment the while-loop counter
        i += skip
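

# Hypothetical sketch (illustration only) of the prefix-block behaviour of
# `join_step`: itemsets sharing the first k - 1 items are joined pairwise, while
# itemsets from a different block (here the prefix (2,)) are never combined.
def _example_join_step():
    """
    >>> list(join_step([(1, 2), (1, 3), (1, 4), (2, 3)]))
    [(1, 2, 3), (1, 2, 4), (1, 3, 4)]
    """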


def prune_step(itemsets: typing.Iterable[tuple], possible_itemsets: typing.List[tuple]):
    """
    Prune possible itemsets whose subsets are not in the list of itemsets.

    Parameters
    ----------
    itemsets : list of itemsets
        A list of itemsets of length k.
    possible_itemsets : list of itemsets
        A list of possible itemsets of length k + 1 to be pruned.

    Examples
    --------
    >>> itemsets = [('a', 'b', 'c'), ('a', 'b', 'd'),
    ...             ('b', 'c', 'd'), ('a', 'c', 'd')]
    >>> possible_itemsets = list(join_step(itemsets))
    >>> list(prune_step(itemsets, possible_itemsets))
    [('a', 'b', 'c', 'd')]
    """
    # For faster lookups
    itemsets = set(itemsets)

    # Go through every possible itemset
    for possible_itemset in possible_itemsets:

        # Remove one item at a time, generating the length-k subsets.
        # The subsets created by removing either of the last two items in the
        # possible itemset must be part of the itemsets by definition,
        # due to the way the `join_step` function merges the sorted itemsets,
        # so only the first k - 1 removals need to be checked.
        for i in range(len(possible_itemset) - 2):
            removed = possible_itemset[:i] + possible_itemset[i + 1 :]

            # If every length-k subset exists in the set of itemsets,
            # yield the possible itemset. If one does not exist, then its
            # support cannot be large enough, since supp(A) >= supp(AB) for
            # all B, and if supp(S) is large enough, then supp(s) must be large
            # enough for every s which is a subset of S.
            # This is the downward-closure property of the support function.
            if removed not in itemsets:
                break

        # If the for-loop did not break, every subset was found
        else:
            yield possible_itemset
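

# Hypothetical sketch (illustration only) of pruning via downward closure:
# (1, 2, 4) and (1, 3, 4) are discarded because their subsets (2, 4) and (3, 4)
# are not among the frequent itemsets.
def _example_prune_step():
    """
    >>> itemsets = [(1, 2), (1, 3), (1, 4), (2, 3)]
    >>> candidates = list(join_step(itemsets))
    >>> candidates
    [(1, 2, 3), (1, 2, 4), (1, 3, 4)]
    >>> list(prune_step(itemsets, candidates))
    [(1, 2, 3)]
    """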


def apriori_gen(itemsets: typing.List[tuple]):
    """
    Compute all possible k + 1 length supersets from k length itemsets.

    This is done efficiently by using the downward-closure property of the
    support function, which states that if support(S) exceeds a threshold,
    then so does support(s) for every subset s of S.

    Parameters
    ----------
    itemsets : list of itemsets
        A list of itemsets of length k.

    Examples
    --------
    >>> # This is an example from the 1994 paper by Agrawal et al.
    >>> itemsets = [(1, 2, 3), (1, 2, 4), (1, 3, 4), (1, 3, 5), (2, 3, 4)]
    >>> possible_itemsets = list(join_step(itemsets))
    >>> list(prune_step(itemsets, possible_itemsets))
    [(1, 2, 3, 4)]
    """
    possible_extensions = join_step(itemsets)
    yield from prune_step(itemsets, possible_extensions)
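

# Hypothetical sketch (illustration only) calling `apriori_gen` directly; it is
# equivalent to running `join_step` followed by `prune_step` on the same input.
def _example_apriori_gen():
    """
    >>> itemsets = [(1, 2, 3), (1, 2, 4), (1, 3, 4), (1, 3, 5), (2, 3, 4)]
    >>> list(apriori_gen(itemsets))
    [(1, 2, 3, 4)]
    """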


def itemsets_from_transactions(
    transactions: typing.Iterable[typing.Union[set, tuple, list]],
    min_support: float,
    max_length: int = 8,
    verbosity: int = 0,
    output_transaction_ids: bool = False,
):
    """
    Compute itemsets from transactions by building the itemsets bottom up and
    iterating over the transactions to compute the support repeatedly. This is
    the heart of the Apriori algorithm by Agrawal et al. in the 1994 paper.

    Parameters
    ----------
    transactions : a list of itemsets (tuples/sets/lists with hashable entries)
    min_support : float
        The minimum support of the itemsets, i.e. the minimum frequency as a
        fraction between 0 and 1.
    max_length : int
        The maximum length of the itemsets.
    verbosity : int
        The level of detail printed while the algorithm runs. Either 0, 1 or 2.
    output_transaction_ids : bool
        If set to True, the output contains the ids of the transactions that
        contain a frequent itemset. The ids are the enumeration of the
        transactions in the sequence they appear.

    Examples
    --------
    >>> # This is an example from the 1994 paper by Agrawal et al.
    >>> transactions = [(1, 3, 4), (2, 3, 5), (1, 2, 3, 5), (2, 5)]
    >>> itemsets, _ = itemsets_from_transactions(transactions, min_support=2/5)
    >>> itemsets[1] == {(1,): 2, (2,): 3, (3,): 3, (5,): 3}
    True
    >>> itemsets[2] == {(1, 3): 2, (2, 3): 2, (2, 5): 3, (3, 5): 2}
    True
    >>> itemsets[3] == {(2, 3, 5): 2}
    True
    """
    # STEP 0 - Sanitize user inputs
    # -----------------------------
    if not (isinstance(min_support, numbers.Number) and (0 <= min_support <= 1)):
        raise ValueError("`min_support` must be a number between 0 and 1.")

    # Store the transactions in a transaction manager
    manager = TransactionManager(transactions)

    # If no transactions are present
    transaction_count = len(manager)
    if transaction_count == 0:
        return dict(), 0  # large_itemsets, num_transactions

    # STEP 1 - Generate all large itemsets of size 1
    # ----------------------------------------------
    if verbosity > 0:
        print("Generating itemsets.")
        print(" Counting itemsets of length 1.")

    candidates: typing.Dict[tuple, int] = {(item,): len(indices) for item, indices in manager.indices_by_item.items()}
    large_itemsets: typing.Dict[int, typing.Dict[tuple, int]] = {
        1: {item: count for (item, count) in candidates.items() if (count / len(manager)) >= min_support}
    }

    if verbosity > 0:
        print(" Found {} candidate itemsets of length 1.".format(len(manager.items)))
        print(" Found {} large itemsets of length 1.".format(len(large_itemsets.get(1, dict()))))
    if verbosity > 1:
        print(" {}".format(list(item for item in large_itemsets.get(1, dict()).keys())))

    # If no large itemsets of length 1 were found, return early
    if not large_itemsets.get(1, dict()):
        return dict(), 0  # large_itemsets, num_transactions

    # STEP 2 - Build up the size of the itemsets
    # ------------------------------------------

    # While there are itemsets of the previous size
    k = 2
    while large_itemsets[k - 1] and (max_length != 1):
        if verbosity > 0:
            print(" Counting itemsets of length {}.".format(k))

        # STEP 2a) - Build up candidates of larger itemsets

        # Retrieve the itemsets of the previous size, i.e. of size k - 1.
        # They must be sorted to maintain the invariant when joining/pruning.
        itemsets_list = sorted(item for item in large_itemsets[k - 1].keys())

        # Generate candidates of length k by joining and pruning.
        # This algorithm assumes that the list of itemsets is sorted,
        # and that the itemsets themselves are sorted tuples.
        C_k: typing.List[tuple] = list(apriori_gen(itemsets_list))

        if verbosity > 0:
            print(" Found {} candidate itemsets of length {}.".format(len(C_k), k))
        if verbosity > 1:
            print(" {}".format(C_k))

        # If no candidate itemsets were found, break out of the loop
        if not C_k:
            break

        # Prepare counts of candidate itemsets (from the prune step)
        if verbosity > 1:
            print(" Iterating over transactions.")

        # Keep only the candidates whose support clears the threshold
        found_itemsets: typing.Dict[tuple, int] = dict()
        for candidate in C_k:
            over_min_support, indices = manager.transaction_indices_sc(candidate, min_support=min_support)
            if over_min_support:
                found_itemsets[candidate] = len(indices)

        # If no itemsets were found, break out of the loop
        if not found_itemsets:
            break

        # Candidate itemsets were found, add them
        large_itemsets[k] = {i: counts for (i, counts) in found_itemsets.items()}

        if verbosity > 0:
            num_found = len(large_itemsets[k])
            print(" Found {} large itemsets of length {}.".format(num_found, k))
        if verbosity > 1:
            print(" {}".format(list(large_itemsets[k].keys())))

        k += 1

        # Break out if we are about to consider larger itemsets than the max
        if k > max_length:
            break

    if verbosity > 0:
        print("Itemset generation terminated.\n")

    if output_transaction_ids:
        itemsets_out = {
            length: {
                item: ItemsetCount(itemset_count=count, members=manager.transaction_indices(set(item)))
                for (item, count) in itemsets.items()
            }
            for (length, itemsets) in large_itemsets.items()
        }
        return itemsets_out, len(manager)

    return large_itemsets, len(manager)
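

# Hypothetical sketch (illustration only) of `output_transaction_ids=True`:
# counts are wrapped in ItemsetCount objects whose `members` attribute holds the
# indices of the transactions containing each frequent itemset.
def _example_itemsets_with_transaction_ids():
    """
    >>> transactions = [(1, 3, 4), (2, 3, 5), (1, 2, 3, 5), (2, 5)]
    >>> itemsets, num = itemsets_from_transactions(
    ...     transactions, min_support=2/5, output_transaction_ids=True
    ... )
    >>> num
    4
    >>> count = itemsets[3][(2, 3, 5)]
    >>> count.itemset_count
    2
    >>> sorted(count.members)
    [1, 2]
    """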


if __name__ == "__main__":
    import pytest

    pytest.main(args=[".", "--doctest-modules", "-v"])