File: word-count.rst

Word count in HDFS
==================

Setup
-----

In this example, we'll use ``distributed`` with the ``hdfs3`` library to count
the number of words in text files (Enron email dataset, 6.4 GB) stored in HDFS.

Copy the text data from Amazon S3 into HDFS on the cluster:

.. code-block:: bash

   $ hadoop distcp s3n://AWS_SECRET_ID:AWS_SECRET_KEY@blaze-data/enron-email hdfs:///tmp/enron

where ``AWS_SECRET_ID`` and ``AWS_SECRET_KEY`` are valid AWS credentials.

Start the ``distributed`` scheduler and workers on the cluster.
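
One way to do this is with the ``dask-scheduler`` and ``dask-worker``
command-line tools (the addresses below are placeholders; 8786 is the default
scheduler port):

.. code-block:: bash

   $ dask-scheduler                   # on the scheduler machine
   $ dask-worker SCHEDULER_IP:8786    # on each worker machine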

Code example
------------

Import ``hdfs3``, ``distributed``, and the standard-library modules used in this example:

.. code-block:: python

   >>> import hdfs3
   >>> from collections import defaultdict, Counter
   >>> from distributed import Client, progress

Initialize a connection to HDFS, replacing ``NAMENODE_HOSTNAME`` and
``NAMENODE_PORT`` with the hostname and port (default: 8020) of the HDFS
namenode.

.. code-block:: python

   >>> hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)

Connect a ``distributed`` client to the scheduler, replacing
``SCHEDULER_IP`` and ``SCHEDULER_PORT`` with the IP address and port of the
``distributed`` scheduler.

.. code-block:: python

   >>> client = Client('SCHEDULER_IP:SCHEDULER_PORT')

Generate a list of filenames from the text data in HDFS:

.. code-block:: python

   >>> filenames = hdfs.glob('/tmp/enron/*/*')
   >>> print(filenames[:5])

   ['/tmp/enron/edrm-enron-v2_nemec-g_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_ring-r_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_bailey-s_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_fischer-m_xml.zip/merged.txt',
    '/tmp/enron/edrm-enron-v2_geaccone-t_xml.zip/merged.txt']

Print the first 1024 bytes of the first text file:

.. code-block:: python

   >>> print(hdfs.head(filenames[0]))

   b'Date: Wed, 29 Nov 2000 09:33:00 -0800 (PST)\r\nFrom: Xochitl-Alexis Velasc
   o\r\nTo: Mark Knippa, Mike D Smith, Gerald Nemec, Dave S Laipple, Bo Barnwel
   l\r\nCc: Melissa Jones, Iris Waser, Pat Radford, Bonnie Shumaker\r\nSubject:
    Finalize ECS/EES Master Agreement\r\nX-SDOC: 161476\r\nX-ZLID: zl-edrm-enro
   n-v2-nemec-g-2802.eml\r\n\r\nPlease plan to attend a meeting to finalize the
    ECS/EES  Master Agreement \r\ntomorrow 11/30/00 at 1:30 pm CST.\r\n\r\nI wi
   ll email everyone tomorrow with location.\r\n\r\nDave-I will also email you
   the call in number tomorrow.\r\n\r\nThanks\r\nXochitl\r\n\r\n***********\r\n
   EDRM Enron Email Data Set has been produced in EML, PST and NSF format by ZL
    Technologies, Inc. This Data Set is licensed under a Creative Commons Attri
   bution 3.0 United States License <http://creativecommons.org/licenses/by/3.0
   /us/> . To provide attribution, please cite to "ZL Technologies, Inc. (http:
   //www.zlti.com)."\r\n***********\r\nDate: Wed, 29 Nov 2000 09:40:00 -0800 (P
   ST)\r\nFrom: Jill T Zivley\r\nTo: Robert Cook, Robert Crockett, John Handley
   , Shawna'

Create a function to count words in each file:

.. code-block:: python

   >>> def count_words(fn):
   ...     word_counts = defaultdict(int)
   ...     with hdfs.open(fn) as f:
   ...         for line in f:
   ...             for word in line.split():
   ...                 word_counts[word] += 1
   ...     return word_counts

Before we process all of the text files using the distributed workers, let's
test our function locally by counting the number of words in the first text
file:

.. code-block:: python

   >>> counts = count_words(filenames[0])
   >>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   [(b'the', 144873),
    (b'of', 98122),
    (b'to', 97202),
    (b'and', 90575),
    (b'or', 60305),
    (b'in', 53869),
    (b'a', 43300),
    (b'any', 31632),
    (b'by', 31515),
    (b'is', 30055)]

We can perform the same word count on the first text file, this time using
``client.submit`` to execute the computation on a ``distributed`` worker:

.. code-block:: python

   >>> future = client.submit(count_words, filenames[0])
   >>> counts = future.result()
   >>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   [(b'the', 144873),
    (b'of', 98122),
    (b'to', 97202),
    (b'and', 90575),
    (b'or', 60305),
    (b'in', 53869),
    (b'a', 43300),
    (b'any', 31632),
    (b'by', 31515),
    (b'is', 30055)]

We are ready to count the number of words in all of the text files using
``distributed`` workers. Note that the ``map`` operation is non-blocking, and
you can continue to work in the Python shell/notebook while the computations
are running.

.. code-block:: python

   >>> futures = client.map(count_words, filenames)

We can check the status of some ``futures`` while all of the text files are
being processed:

.. code-block:: python

   >>> len(futures)

   161

   >>> futures[:5]

   [<Future: status: finished, key: count_words-5114ab5911de1b071295999c9049e941>,
    <Future: status: pending, key: count_words-d9e0d9daf6a1eab4ca1f26033d2714e7>,
    <Future: status: pending, key: count_words-d2f365a2360a075519713e9380af45c5>,
    <Future: status: pending, key: count_words-bae65a245042325b4c77fc8dde1acf1e>,
    <Future: status: pending, key: count_words-03e82a9b707c7e36eab95f4feec1b173>]

   >>> progress(futures)

   [########################################] | 100% Completed |  3min  0.2s

When the ``futures`` finish reading all of the text files and counting
words, the results stay distributed across the workers rather than being
copied back to the local process. On a cluster with three worker machines,
each with 4 cores and 16 GB RAM, this operation took about 3 minutes.
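
If you want to confirm which workers hold the per-file results, one option
(not used in the rest of this example) is ``Client.who_has``, which maps each
future's key to the addresses of the workers storing it:

.. code-block:: python

   >>> client.who_has(futures[:3])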

Note that because this computation is bound by Python's GIL, adding threads
to a single worker process does not help; we can speed it up by starting the
``distributed`` workers with multiple processes, for example with the
``--nworkers 4`` option.
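
As a sketch, a worker started with four single-threaded processes might look
like the following (adjust the address and process count for your cluster):

.. code-block:: bash

   $ dask-worker SCHEDULER_IP:SCHEDULER_PORT --nworkers 4 --nthreads 1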

To sum the word counts for all of the text files, we need to gather some
information from the ``distributed`` workers. To reduce the amount of data
that we gather from the workers, we can define a function that only returns the
top 10,000 words from each text file.

.. code-block:: python

   >>> def top_items(d):
   ...     items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
   ...     return dict(items)

We can then ``map`` this culling function over the futures from the previous
step. Submitting functions on futures like this is a convenient way to build
a pipeline of computations:

.. code-block:: python

   >>> futures2 = client.map(top_items, futures)

We can ``gather`` the culled word counts for each text file back to the
local process. Wrapping ``futures2`` in ``iter`` makes ``gather`` return an
iterator of results rather than a list:

.. code-block:: python

   >>> results = client.gather(iter(futures2))

To sum the word counts across all of the text files, we iterate over the
gathered results and update a local ``Counter`` that accumulates the word
counts from every file.

.. code-block:: python

   >>> all_counts = Counter()
   >>> for result in results:
   ...     all_counts.update(result)

Finally, we print the number of distinct words in the results and the most
frequent words across all of the text files:

.. code-block:: python

   >>> print(len(all_counts))

   8797842

   >>> print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   [(b'0', 67218380),
    (b'the', 19586868),
    (b'-', 14123768),
    (b'to', 11893464),
    (b'N/A', 11814665),
    (b'of', 11724827),
    (b'and', 10253753),
    (b'in', 6684937),
    (b'a', 5470371),
    (b'or', 5227805)]

The complete Python script for this example is shown below:

.. code-block:: python

   # word-count.py

   import hdfs3
   from collections import defaultdict, Counter
   from distributed import Client
   from distributed.diagnostics.progressbar import progress

   hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
   client = Client('SCHEDULER_IP:SCHEDULER_PORT')

   filenames = hdfs.glob('/tmp/enron/*/*')
   print(filenames[:5])
   print(hdfs.head(filenames[0]))


   def count_words(fn):
       word_counts = defaultdict(int)
       with hdfs.open(fn) as f:
           for line in f:
               for word in line.split():
                   word_counts[word] += 1
       return word_counts

   counts = count_words(filenames[0])
   print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   future = client.submit(count_words, filenames[0])
   counts = future.result()
   print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

   futures = client.map(count_words, filenames)
   print(len(futures))
   print(futures[:5])
   progress(futures)


   def top_items(d):
       items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
       return dict(items)

   futures2 = client.map(top_items, futures)
   results = client.gather(iter(futures2))

   all_counts = Counter()
   for result in results:
       all_counts.update(result)

   print(len(all_counts))

   print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
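
With the scheduler and workers already running, the script can be executed
like any other Python program:

.. code-block:: bash

   $ python word-count.py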