1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
#!/usr/bin/env python
"""Parallel word frequency counter.
This only works for a local cluster, because the filenames are local paths.
"""
import os
import time
from itertools import repeat
import requests
from wordfreq import print_wordfreq, wordfreq
import ipyparallel as ipp
# Project Gutenberg URL fetched by the script section below and cached as
# 'davinci.txt' when that file does not already exist.
davinci_url = "https://www.gutenberg.org/files/5000/5000-8.txt"
def pwordfreq(view, fnames):
    """Count word frequencies across the engines of a view and merge them.

    view - An IPython DirectView
    fnames - The filenames containing the split data; there must be exactly
        one per target of the view.

    Returns a single dict mapping each word to its count summed over all
    per-engine results.
    """
    assert len(fnames) == len(view.targets)

    # Hand each engine its own entry of fnames under the remote name 'fname'.
    # NOTE(review): flatten=True presumably makes that a bare string rather
    # than a one-element list -- confirm against the ipyparallel docs.
    view.scatter('fname', fnames, flatten=True)

    # Run wordfreq on every engine against its 'fname' and collect the
    # per-engine frequency dicts.
    per_engine = view.apply(wordfreq, ipp.Reference('fname')).get()

    # Fold the partial counts into one table.
    merged = {}
    for partial in per_engine:
        for word, count in partial.items():
            merged[word] = merged.get(word, 0) + count
    return merged
if __name__ == '__main__':
    # Create a Client and a View over all of its engines
    rc = ipp.Client()
    view = rc[:]
    # Switch every engine to this process's working directory.
    view.apply_sync(os.chdir, os.getcwd())

    if not os.path.exists('davinci.txt'):
        # download from project gutenberg
        print("Downloading Da Vinci's notebooks from Project Gutenberg")
        r = requests.get(davinci_url)
        # Fail loudly instead of caching an HTTP error page as the corpus.
        r.raise_for_status()
        with open('davinci.txt', 'w', encoding='utf8') as f:
            f.write(r.text)

    # Run the serial version
    print("Serial word frequency count:")
    # NOTE(review): the download above writes utf8 but this reads latin1, so
    # non-ASCII characters are likely mis-decoded. Left unchanged because a
    # pre-existing davinci.txt may really be latin1 -- confirm and align.
    with open('davinci.txt', encoding='latin1') as f:
        text = f.read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate" % (toc - tic))

    # The parallel version
    print("\nParallel word frequency count:")
    # split the davinci.txt into one file per engine:
    lines = text.splitlines()
    nlines = len(lines)
    n = len(rc)
    block = nlines // n
    # Absolute paths, one split file per engine.
    cwd = os.path.abspath(os.getcwd())
    fnames = [os.path.join(cwd, 'davinci%i.txt' % i) for i in range(n)]
    for i, fname in enumerate(fnames):
        start = i * block
        # The last chunk also takes the nlines % n leftover lines so that no
        # part of the text is dropped from the parallel count.
        stop = nlines if i == n - 1 else start + block
        chunk = lines[start:stop]
        with open(fname, 'w', encoding='utf8') as f:
            f.write('\n'.join(chunk))

    tic = time.time()
    pfreqs = pwordfreq(view, fnames)
    toc = time.time()
    # Report the parallel result (not the serial one computed earlier).
    print_wordfreq(pfreqs)
    print("Took %.3f s to calculate on %i engines" % (toc - tic, len(view.targets)))

    # cleanup split files (an explicit loop: map() is lazy in Python 3 and
    # would never call os.remove unless consumed)
    for fname in fnames:
        os.remove(fname)
|