Word count in HDFS
==================

Setup
-----

In this example, we'll use ``distributed`` with the ``hdfs3`` library to count
the number of words in text files (Enron email dataset, 6.4 GB) stored in HDFS.
Copy the text data from Amazon S3 into HDFS on the cluster:

.. code-block:: bash

   $ hadoop distcp s3n://AWS_SECRET_ID:AWS_SECRET_KEY@blaze-data/enron-email hdfs:///tmp/enron

where ``AWS_SECRET_ID`` and ``AWS_SECRET_KEY`` are valid AWS credentials.

Start the ``distributed`` scheduler and workers on the cluster.
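
For example, with recent versions of ``dask`` the scheduler and workers can be
launched from the shell as shown below (older releases used the hyphenated
``dask-scheduler`` and ``dask-worker`` commands); the addresses are
placeholders:

.. code-block:: bash

   # On the scheduler machine (listens on port 8786 by default)
   $ dask scheduler

   # On each worker machine, pointing at the scheduler's address
   $ dask worker SCHEDULER_IP:8786
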

Code example
------------

Import ``distributed``, ``hdfs3``, and other standard libraries used in this example:

.. code-block:: python

   >>> import hdfs3
   >>> from collections import defaultdict, Counter
   >>> from distributed import Client, progress

Initialize a connection to HDFS, replacing ``NAMENODE_HOSTNAME`` and
``NAMENODE_PORT`` with the hostname and port (default: 8020) of the HDFS
namenode.

.. code-block:: python

   >>> hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)

Initialize a connection to the ``distributed`` client, replacing
``SCHEDULER_IP`` and ``SCHEDULER_PORT`` with the IP address and port of the
``distributed`` scheduler.

.. code-block:: python

   >>> client = Client('SCHEDULER_IP:SCHEDULER_PORT')

Generate a list of filenames from the text data in HDFS:

.. code-block:: python

   >>> filenames = hdfs.glob('/tmp/enron/*/*')
   >>> print(filenames[:5])
   ['/tmp/enron/edrm-enron-v2_nemec-g_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_ring-r_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_bailey-s_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_fischer-m_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_geaccone-t_xml.zip/merged.txt']

Print the first 1024 bytes of the first text file:

.. code-block:: python

   >>> print(hdfs.head(filenames[0]))
   b'Date: Wed, 29 Nov 2000 09:33:00 -0800 (PST)\r\nFrom: Xochitl-Alexis Velasc
   o\r\nTo: Mark Knippa, Mike D Smith, Gerald Nemec, Dave S Laipple, Bo Barnwel
   l\r\nCc: Melissa Jones, Iris Waser, Pat Radford, Bonnie Shumaker\r\nSubject:
   Finalize ECS/EES Master Agreement\r\nX-SDOC: 161476\r\nX-ZLID: zl-edrm-enro
   n-v2-nemec-g-2802.eml\r\n\r\nPlease plan to attend a meeting to finalize the
   ECS/EES Master Agreement \r\ntomorrow 11/30/00 at 1:30 pm CST.\r\n\r\nI wi
   ll email everyone tomorrow with location.\r\n\r\nDave-I will also email you
   the call in number tomorrow.\r\n\r\nThanks\r\nXochitl\r\n\r\n***********\r\n
   EDRM Enron Email Data Set has been produced in EML, PST and NSF format by ZL
   Technologies, Inc. This Data Set is licensed under a Creative Commons Attri
   bution 3.0 United States License <http://creativecommons.org/licenses/by/3.0
   /us/> . To provide attribution, please cite to "ZL Technologies, Inc. (http:
   //www.zlti.com)."\r\n***********\r\nDate: Wed, 29 Nov 2000 09:40:00 -0800 (P
   ST)\r\nFrom: Jill T Zivley\r\nTo: Robert Cook, Robert Crockett, John Handley
   , Shawna'

Create a function to count words in each file:

.. code-block:: python

   >>> def count_words(fn):
   ...     word_counts = defaultdict(int)
   ...     with hdfs.open(fn) as f:
   ...         for line in f:
   ...             for word in line.split():
   ...                 word_counts[word] += 1
   ...     return word_counts

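Because ``hdfs3`` opens files in binary mode, the keys of the resulting
dictionary are ``bytes`` objects (visible as ``b'...'`` in the output below).
A minimal variant, assuming the files are UTF-8 text, that produces lowercase
``str`` keys instead:

.. code-block:: python

   >>> def count_words_text(fn):
   ...     word_counts = defaultdict(int)
   ...     with hdfs.open(fn) as f:
   ...         for line in f:
   ...             # decode each bytes line before splitting into words
   ...             for word in line.decode('utf-8', 'replace').lower().split():
   ...                 word_counts[word] += 1
   ...     return word_counts
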
Before we process all of the text files using the distributed workers, let's
test our function locally by counting the number of words in the first text
file:

.. code-block:: python

   >>> counts = count_words(filenames[0])
   >>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
   [(b'the', 144873),
    (b'of', 98122),
    (b'to', 97202),
    (b'and', 90575),
    (b'or', 60305),
    (b'in', 53869),
    (b'a', 43300),
    (b'any', 31632),
    (b'by', 31515),
    (b'is', 30055)]

We can perform the same word count on the first text file, this time using
``client.submit`` to execute the computation on a ``distributed`` worker:

.. code-block:: python

   >>> future = client.submit(count_words, filenames[0])
   >>> counts = future.result()
   >>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
   [(b'the', 144873),
    (b'of', 98122),
    (b'to', 97202),
    (b'and', 90575),
    (b'or', 60305),
    (b'in', 53869),
    (b'a', 43300),
    (b'any', 31632),
    (b'by', 31515),
    (b'is', 30055)]

We are ready to count the number of words in all of the text files using
``distributed`` workers. Note that the ``map`` operation is non-blocking, and
you can continue to work in the Python shell/notebook while the computations
are running.

.. code-block:: python

   >>> futures = client.map(count_words, filenames)

We can check the status of some ``futures`` while all of the text files are
being processed:

.. code-block:: python

   >>> len(futures)
   161
   >>> futures[:5]
   [<Future: status: finished, key: count_words-5114ab5911de1b071295999c9049e941>,
    <Future: status: pending, key: count_words-d9e0d9daf6a1eab4ca1f26033d2714e7>,
    <Future: status: pending, key: count_words-d2f365a2360a075519713e9380af45c5>,
    <Future: status: pending, key: count_words-bae65a245042325b4c77fc8dde1acf1e>,
    <Future: status: pending, key: count_words-03e82a9b707c7e36eab95f4feec1b173>]
   >>> progress(futures)
   [########################################] | 100% Completed | 3min 0.2s

When the ``futures`` finish reading in all of the text files and counting
words, the results will exist on each worker. This operation required about
3 minutes to run on a cluster with three worker machines, each with 4 cores
and 16 GB RAM.
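
As an alternative to waiting on all of the futures at once, you can consume
results as they finish with ``as_completed``. A minimal sketch (not part of
the original pipeline):

.. code-block:: python

   >>> from distributed import as_completed
   >>> for future in as_completed(futures):
   ...     counts = future.result()
   ...     print(len(counts))  # number of distinct words in that file
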
Note that because the previous computation is bound by the GIL in Python,
threads within a single worker process cannot run it in parallel. We can
speed it up instead by starting the ``distributed`` workers with several
processes each, using the ``--nworkers 4`` option.
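
For example, with recent ``dask`` versions (older releases spelled this
option ``--nprocs``):

.. code-block:: bash

   $ dask worker SCHEDULER_IP:8786 --nworkers 4
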
To sum the word counts for all of the text files, we need to gather some
information from the ``distributed`` workers. To reduce the amount of data
that we gather from the workers, we can define a function that only returns the
top 10,000 words from each text file.

.. code-block:: python

   >>> def top_items(d):
   ...     items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
   ...     return dict(items)

We can then ``map`` the futures from the previous step to this culling
function. This is a convenient way to construct a pipeline of computations
using futures:

.. code-block:: python

   >>> futures2 = client.map(top_items, futures)

We can ``gather`` the resulting culled word count data for each text file to
the local process:

.. code-block:: python

   >>> results = client.gather(iter(futures2))

To sum the word counts across all of the text files, we can iterate over the
gathered results and update a local ``Counter`` that accumulates all of the
word counts.

.. code-block:: python

   >>> all_counts = Counter()
   >>> for result in results:
   ...     all_counts.update(result)

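Summing locally works well here because the culled dictionaries are small.
For larger results, the merging can be performed on the cluster itself; below
is a sketch of a simple tree reduction over the futures, using a hypothetical
``merge_counts`` helper of our own (not part of ``distributed``):

.. code-block:: python

   >>> def merge_counts(*dicts):
   ...     total = Counter()
   ...     for d in dicts:
   ...         total.update(d)
   ...     return dict(total)

   >>> futures3 = futures2
   >>> while len(futures3) > 1:  # merge futures in groups of 8 until one remains
   ...     futures3 = [client.submit(merge_counts, *futures3[i:i + 8])
   ...                 for i in range(0, len(futures3), 8)]
   >>> merged = futures3[0].result()  # same totals as all_counts above
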
Finally, we print the number of distinct words in the results and the words
with the highest frequency across all of the text files:

.. code-block:: python

   >>> print(len(all_counts))
   8797842
   >>> print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
   [(b'0', 67218380),
    (b'the', 19586868),
    (b'-', 14123768),
    (b'to', 11893464),
    (b'N/A', 11814665),
    (b'of', 11724827),
    (b'and', 10253753),
    (b'in', 6684937),
    (b'a', 5470371),
    (b'or', 5227805)]

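The most frequent tokens include entries such as ``b'0'``, ``b'-'``, and
``b'N/A'`` that are not really words. As a local post-processing sketch, the
regular expression below (our choice, not part of the dataset) keeps only
alphabetic tokens:

.. code-block:: python

   >>> import re
   >>> word_re = re.compile(rb'^[A-Za-z]+$')  # bytes pattern, since keys are bytes
   >>> real_words = {w: c for w, c in all_counts.items() if word_re.match(w)}
   >>> print(sorted(real_words.items(), key=lambda kv: kv[1], reverse=True)[:10])
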
The complete Python script for this example is shown below:

.. code-block:: python

   # word-count.py
   import hdfs3
   from collections import defaultdict, Counter
   from distributed import Client, progress

   hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
   client = Client('SCHEDULER_IP:SCHEDULER_PORT')

   filenames = hdfs.glob('/tmp/enron/*/*')
   print(filenames[:5])
   print(hdfs.head(filenames[0]))


   def count_words(fn):
       word_counts = defaultdict(int)
       with hdfs.open(fn) as f:
           for line in f:
               for word in line.split():
                   word_counts[word] += 1
       return word_counts


   counts = count_words(filenames[0])
   print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   future = client.submit(count_words, filenames[0])
   counts = future.result()
   print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   futures = client.map(count_words, filenames)
   print(len(futures))
   print(futures[:5])
   progress(futures)


   def top_items(d):
       items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
       return dict(items)


   futures2 = client.map(top_items, futures)
   results = client.gather(iter(futures2))

   all_counts = Counter()
   for result in results:
       all_counts.update(result)

   print(len(all_counts))
   print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

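After filling in the ``NAMENODE_HOSTNAME``, ``NAMENODE_PORT``,
``SCHEDULER_IP``, and ``SCHEDULER_PORT`` placeholders, the script can be run
directly:

.. code-block:: bash

   $ python word-count.py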