File: pwordfreq.py

package info (click to toggle)
ipyparallel 8.8.0-6
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,412 kB
  • sloc: python: 21,991; javascript: 267; makefile: 29; sh: 28
file content (85 lines) | stat: -rw-r--r-- 2,495 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python
"""Parallel word frequency counter.

This only works for a local cluster, because the filenames are local paths.
"""

import os
import time
from itertools import repeat

import requests
from wordfreq import print_wordfreq, wordfreq

import ipyparallel as ipp

davinci_url = "https://www.gutenberg.org/files/5000/5000-8.txt"


def pwordfreq(view, fnames):
    """Parallel word frequency counter.

    view - An IPython DirectView
    fnames - The filenames containing the split data.

    Returns a dict mapping each word to its total count summed over
    all engines' results.
    """
    # One filename per engine: scatter assigns each engine its own chunk file.
    assert len(fnames) == len(view.targets)
    view.scatter('fname', fnames, flatten=True)
    # Each engine runs wordfreq() on its local file.
    ar = view.apply(wordfreq, ipp.Reference('fname'))
    # Merge the per-engine counts in a single pass, instead of first
    # building the union of words and a zeroed dict as the original did.
    freqs = {}
    for engine_freqs in ar.get():
        for word, count in engine_freqs.items():
            freqs[word] = freqs.get(word, 0) + count
    return freqs


if __name__ == '__main__':
    # Create a Client and View
    rc = ipp.Client()

    view = rc[:]
    # Make every engine chdir to this process's working directory so the
    # chunk filenames resolve identically on all engines (local cluster only).
    view.apply_sync(os.chdir, os.getcwd())

    if not os.path.exists('davinci.txt'):
        # download from project gutenberg
        print("Downloading Da Vinci's notebooks from Project Gutenberg")
        r = requests.get(davinci_url)
        with open('davinci.txt', 'w', encoding='utf8') as f:
            f.write(r.text)

    # Run the serial version
    print("Serial word frequency count:")
    # Read with the same encoding we wrote with above.  (The original
    # wrote utf8 but read latin1, garbling any non-ASCII text.)
    with open('davinci.txt', encoding='utf8') as f:
        text = f.read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate" % (toc - tic))

    # The parallel version
    print("\nParallel word frequency count:")
    # split the davinci.txt into one file per engine:
    lines = text.splitlines()
    nlines = len(lines)
    n = len(rc)
    block = nlines // n
    for i in range(n):
        # Slice [i*block, (i+1)*block); the last engine also takes the
        # remainder so no lines are dropped.  (The original slice
        # `lines[i*block : i*(block+1)]` was an off-by-one that lost
        # most of the text.)
        end = (i + 1) * block if i < n - 1 else nlines
        chunk = lines[i * block : end]
        with open('davinci%i.txt' % i, 'w', encoding='utf8') as f:
            f.write('\n'.join(chunk))

    # Absolute paths, since the engines share our cwd (see chdir above).
    # (Dropped the dead Python-2 `os.getcwdu()` fallback.)
    cwd = os.path.abspath(os.getcwd())
    fnames = [os.path.join(cwd, 'davinci%i.txt' % i) for i in range(n)]
    tic = time.time()
    pfreqs = pwordfreq(view, fnames)
    toc = time.time()
    # Report the *parallel* result with the same top-10 as the serial run.
    # (The original printed the serial `freqs` again.)
    print_wordfreq(pfreqs, 10)
    print("Took %.3f s to calculate on %i engines" % (toc - tic, len(view.targets)))
    # cleanup split files.  map() is lazy in Python 3 — the original
    # `map(os.remove, fnames)` never actually removed anything.
    for fname in fnames:
        os.remove(fname)