File: word_count_task.py

package info (click to toggle)
python-avro 1.11.1%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 4,724 kB
  • sloc: python: 7,196; xml: 4,238; sh: 784; java: 386; makefile: 74
file content (95 lines) | stat: -rw-r--r-- 3,356 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python3

##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import avro.tether.tether_task

__all__ = ["WordCountTask"]


# TODO::Make the logging level a parameter we can set
# logging.basicConfig(level=logging.INFO)
class WordCountTask(avro.tether.tether_task.TetherTask):
    """
    Implements the mapper and reducer for the word count example
    """

    def __init__(self):
        """ """

        inschema = """{"type":"string"}"""
        midschema = """{"type":"record", "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
              {"name":"key","type":"string"},
              {"name":"value","type":"long","order":"ignore"}]
              }"""
        outschema = midschema
        avro.tether.tether_task.TetherTask.__init__(self, inschema, midschema, outschema)

        # keep track of the partial sums of the counts
        self.psum = 0

    def map(self, record, collector):
        """Implement the mapper for the word count example

        Parameters
        ----------------------------------------------------------------------------
        record - The input record
        collector - The collector to collect the output
        """

        words = record.split()

        for w in words:
            logging.info("WordCountTask.Map: word=%s", w)
            collector.collect({"key": w, "value": 1})

    def reduce(self, record, collector):
        """Called with input values to generate reducer output. Inputs are sorted by the mapper
        key.

        The reduce function is invoked once for each value belonging to a given key outputted
        by the mapper.

        Parameters
        ----------------------------------------------------------------------------
        record - The mapper output
        collector - The collector to collect the output
        """

        self.psum += record["value"]

    def reduceFlush(self, record, collector):
        """
        Called with the last intermediate value in each equivalence run.
        In other words, reduceFlush is invoked once for each key produced in the reduce
        phase. It is called after reduce has been invoked on each value for the given key.

        Parameters
        ------------------------------------------------------------------
        record - the last record on which reduce was invoked.
        """

        # collect the current record
        logging.info("WordCountTask.reduceFlush key=%s value=%s", record["key"], self.psum)

        collector.collect({"key": record["key"], "value": self.psum})

        # reset the sum
        self.psum = 0