File: count-lines-4

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (131 lines) | stat: -rwxr-xr-x 4,301 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
#
# This source file is part of the Swift.org open source project
#
# Copyright (c) 2014 - 2015 Apple Inc. and the Swift project authors
# Licensed under Apache License v2.0 with Runtime Library Exception
#
# See http://swift.org/LICENSE.txt for license information
# See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors

import os
import subprocess
import sys

import util
from simplebuild import *

class GetFileInfoRule(SimpleAsyncRule):
    """
    Rule which tracks the state of an external file.

    The value produced by the rule is whatever ``util.get_stat_info`` reports
    for the path. Data handed to ``is_data_valid`` is considered valid only
    when it equals a freshly computed stat info.
    """

    def __init__(self, path):
        # The path is always kept as a plain string.
        self.path = str(path)

    def run(self):
        # Produce the rule's value: the current stat info of the file.
        util.note("getting file info: %r" % (self.path,))
        info = util.get_stat_info(self.path)
        return info

    def is_data_valid(self, engine, data):
        # Re-stat the file and compare the fresh info with the given data.
        util.note("checking file info: %r" % (self.path,))
        current_info = util.get_stat_info(self.path)
        return current_info == data
        
class GetDirectoryContentsRule(SimpleAsyncRule):
    """
    Rule which produces the sorted list of entry names in a directory.

    A directory that cannot be listed (``os.listdir`` raises ``OSError``)
    yields an empty list.
    """

    def __init__(self, path):
        self.path = path

    def start(self, engine):
        # Request the file info of the directory itself, to ensure the
        # external directory is treated as an input of this rule.
        info_key = GetFileInfoRule.asKey(self.path)
        engine.task_needs_input(self, info_key)

    def provide_data(self, engine, input_id, data):
        # The file info is requested only as a dependency; its value is not
        # used here.
        pass

    def run(self):
        util.note("listing directory contents: %r" % (str(self.path),))
        try:
            entries = os.listdir(self.path)
        except OSError:
            # Best effort: report an unlistable directory as empty.
            return []
        return sorted(entries)

class SpawnJobRule(SimpleAsyncRule):
    """
    Rule which runs an external command and reports on its output files.

    The value produced by the rule is a dictionary:

    * ``{"error": True}`` when running the command raised an exception
      (for example a non-zero exit status or a missing executable), or
    * ``{"output_info": [...]}`` holding ``util.get_stat_info`` for every
      output file, in the order the output files were given.
    """

    def __init__(self, args, input_files, output_files, other_inputs=()):
        # NOTE: `other_inputs` is part of the signature but is not used by
        # this rule.
        self.args = list(args)
        self.input_files = list(input_files)
        self.output_files = list(output_files)

    def start(self, engine):
        # Request the file info of all of the inputs.
        for path in self.input_files:
            engine.task_needs_input(self, GetFileInfoRule.asKey(path))

    def provide_data(self, engine, input_id, data):
        # The input file infos are requested only as dependencies; their
        # values are not used here.
        pass

    def run(self):
        command = ' '.join(self.args)
        try:
            util.message(command)
            subprocess.check_call(self.args)
        except Exception:
            # Catch `Exception` rather than using a bare `except:` so that
            # KeyboardInterrupt and SystemExit still propagate instead of
            # being recorded as a failed command.
            import traceback
            traceback.print_exc()
            util.error("error: command failed: %s" % (command,))
            return { "error" : True }

        # Get the file info on all of the output files.
        output_infos = [util.get_stat_info(path)
                        for path in self.output_files]
        return { "output_info" : output_infos }
    
class CountSourceLinesRule(SimpleAsyncRule):
    """
    Rule which counts the total number of lines of the ``.c`` files located
    directly inside a directory.

    The rule first requests the directory listing. For every ``.c`` entry it
    then requests a job running ``wc -l`` on the file, which writes the count
    to ``<file>.wc`` next to it. ``run`` sums the numbers stored in those
    ``.wc`` files and returns the total.
    """

    # Identifiers used to tell apart the inputs delivered to provide_data().
    get_dir_input_ID = 0
    source_count_input_ID = 1

    def __init__(self, path):
        super(CountSourceLinesRule, self).__init__()
        self.path = str(path)
        # Paths of the per-file ".wc" outputs; set once the directory listing
        # has been provided.
        self.output_paths = None
        self.result = None

    def start(self, engine):
        engine.task_needs_input(self, GetDirectoryContentsRule.asKey(self.path),
                                self.get_dir_input_ID)

    def provide_data(self, engine, input_id, data):
        # Only the directory result needs handling here; the results of the
        # individual count jobs are read back from their output files in
        # run().
        if input_id != self.get_dir_input_ID:
            return

        # shlex.quote is the Python 3 spelling, pipes.quote the Python 2 one.
        try:
            from shlex import quote
        except ImportError:
            from pipes import quote

        # Scan the directory listing and request a count for each C source.
        self.output_paths = []
        for name in data:
            if not name.endswith(".c"):
                continue

            input_path = os.path.join(self.path, name)
            output_path = os.path.join(self.path, name + ".wc")
            self.output_paths.append(output_path)

            # The redirections require a shell, so quote both paths to keep
            # file names with spaces or shell metacharacters from breaking
            # (or injecting into) the command. Paths made only of safe
            # characters are left unchanged by quote().
            cmd = ['/bin/sh', '-c', "wc -l < %s > %s" % (quote(input_path),
                                                      quote(output_path))]
            engine.task_needs_input(
                self, SpawnJobRule.asKey(cmd, [input_path],
                                         [output_path]),
                self.source_count_input_ID)

    def run(self):
        # Gather all the results: each output file holds a single line count.
        self.result = 0
        for path in self.output_paths:
            with open(path) as count_file:
                self.result += int(count_file.read())
        return self.result

# Get the input path: exactly one argument, the directory to scan, is
# expected. Exit with a usage message instead of failing on tuple unpacking
# when the argument count is wrong.
if len(sys.argv) != 2:
    sys.exit("usage: %s <directory>" % (os.path.basename(sys.argv[0]),))
path = sys.argv[1]

# Run the job. The engine is given this module's globals and is attached to
# a database file in the current working directory.
engine = DataDrivenEngine(globals())
engine.attach_db(".count-source-lines-with-wc.db")

# Build the line-count rule for the given directory and print the total.
result = engine.build(CountSourceLinesRule.asKey(path))
print(result)