File: err_184.py

package info (click to toggle)
python-refurb 1.27.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,700 kB
  • sloc: python: 9,468; makefile: 40; sh: 6
file content (163 lines) | stat: -rw-r--r-- 3,440 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class torch:
    @staticmethod
    def ones(*args):
        return torch

    @staticmethod
    def long():
        return torch

    @staticmethod
    def to(device: str):
        return torch.Tensor()

    class Tensor:
        pass


def transform(x):
    return x


class spark:
    class read:
        @staticmethod
        def parquet(file_name: str):
            return spark.DataFrame()

    class functions:
        @staticmethod
        def lit(constant):
            return constant

        @staticmethod
        def col(col_name):
            return col_name

    class DataFrame:
        @staticmethod
        def withColumnRenamed(col_in, col_out):
            return spark.DataFrame()

        @staticmethod
        def withColumn(col_in, col_out):
            return spark.DataFrame()

        @staticmethod
        def select(*args):
            return spark.DataFrame()

class F:
    @staticmethod
    def lit(value):
        return value


# these will match
def get_tensors(device: str) -> torch.Tensor:
    a = torch.ones(2, 1)
    a = a.long()
    a = a.to(device)
    return a


def process(file_name: str):
    common_columns = ["col1_renamed", "col2_renamed", "custom_col"]
    df = spark.read.parquet(file_name)
    df = df \
        .withColumnRenamed('col1', 'col1_renamed') \
        .withColumnRenamed('col2', 'col2_renamed')
    df = df \
        .select(common_columns) \
        .withColumn('service_type', spark.functions.lit('green'))
    return df


def projection(df_in: spark.DataFrame) -> spark.DataFrame:
    df = (
        df_in.select(["col1", "col2"])
        .withColumnRenamed("col1", "col1a")
    )
    return df.withColumn("col2a", spark.functions.col("col2").cast("date"))


def assign_multiple(df):
    df = df.select("column")
    result_df = df.select("another_column")
    final_df = result_df.withColumn("column2", F.lit("abc"))
    return final_df


# not yet supported
def assign_alternating(df, df2):
    df = df.select("column")
    df2 = df2.select("another_column")
    df = df.withColumn("column2", F.lit("abc"))
    return df, df2


# these will not
def ignored(x):
    _ = x.op1()
    _ = _.op2()
    return _

def _(x):
    y = x.m()
    return y.operation(*[v for v in y])


def assign_multiple_referenced(df, df2):
    df = df.select("column")
    result_df = df.select("another_column")
    return df, result_df


def invalid(df_in: spark.DataFrame, alternative_df: spark.DataFrame) -> spark.DataFrame:
    df = (
        df_in.select(["col1", "col2"])
        .withColumnRenamed("col1", "col1a")
    )
    return alternative_df.withColumn("col2a", spark.functions.col("col2").cast("date"))


def no_match():
    y = 10
    y = transform(y)
    return y

def f(x):
    if x:
        name = "alice"
        stripped = name.strip()
        print(stripped)
    else:
        name = "bob"
    print(name)

def g(x):
    try:
        name = "alice"
        stripped = name.strip()
        print(stripped)
    except ValueError:
        name = "bob"
    print(name)

def h(x):
    for _ in (1, 2, 3):
        name = "alice"
        stripped = name.strip()
        print(stripped)
    else:
        name = "bob"
    print(name)

def assign_multiple_try(df):
    try:
        df = df.select("column")
        result_df = df.select("another_column")
        final_df = result_df.withColumn("column2", F.lit("abc"))
        return final_df
    except ValueError:
        return None