File: word_count_task.py

package info (click to toggle)
python-avro 1.8.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,436 kB
  • ctags: 1,407
  • sloc: python: 9,113; xml: 3,822; java: 384; sh: 249; makefile: 44
file content (96 lines) | stat: -rw-r--r-- 3,117 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
"""

__all__=["WordCountTask"]

from avro.tether import TetherTask

import logging

#TODO::Make the logging level a parameter we can set
#logging.basicConfig(level=logging.INFO)
class WordCountTask(TetherTask):
  """
  Implements the mappper and reducer for the word count example
  """

  def __init__(self):
    """
    """

    inschema="""{"type":"string"}"""
    midschema="""{"type":"record", "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
              {"name":"key","type":"string"},
              {"name":"value","type":"long","order":"ignore"}]
              }"""
    outschema=midschema
    TetherTask.__init__(self,inschema,midschema,outschema)


    #keep track of the partial sums of the counts
    self.psum=0


  def map(self,record,collector):
    """Implement the mapper for the word count example

    Parameters
    ----------------------------------------------------------------------------
    record - The input record
    collector - The collector to collect the output
    """

    words=record.split()

    for w in words:
      logging.info("WordCountTask.Map: word={0}".format(w))
      collector.collect({"key":w,"value":1})

  def reduce(self,record, collector):
    """Called with input values to generate reducer output. Inputs are sorted by the mapper
    key.

    The reduce function is invoked once for each value belonging to a given key outputted
    by the mapper.

    Parameters
    ----------------------------------------------------------------------------
    record - The mapper output
    collector - The collector to collect the output
    """

    self.psum+=record["value"]

  def reduceFlush(self,record, collector):
    """
    Called with the last intermediate value in each equivalence run.
    In other words, reduceFlush is invoked once for each key produced in the reduce
    phase. It is called after reduce has been invoked on each value for the given key.

    Parameters
    ------------------------------------------------------------------
    record - the last record on which reduce was invoked.
    """

    #collect the current record
    logging.info("WordCountTask.reduceFlush key={0} value={1}".format(record["key"],self.psum))

    collector.collect({"key":record["key"],"value":self.psum})

    #reset the sum
    self.psum=0