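# Minimal stand-ins for the torch and pyspark APIs used by the cases below;
# they only need to support the attribute chains this fixture exercises,
# not real behavior.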
class torch:
    @staticmethod
    def ones(*args):
        return torch

    @staticmethod
    def long():
        return torch

    @staticmethod
    def to(device: str):
        return torch.Tensor()

    class Tensor:
        pass

def transform(x):
    return x

class spark:
    class read:
        @staticmethod
        def parquet(file_name: str):
            return spark.DataFrame()

    class Column:
        # Stub column object so the `.cast(...)` calls below don't raise
        # AttributeError (the original stub returned a bare string).
        def cast(self, to_type: str):
            return self

    class functions:
        @staticmethod
        def lit(constant):
            return constant

        @staticmethod
        def col(col_name):
            return spark.Column()

    class DataFrame:
        @staticmethod
        def withColumnRenamed(col_in, col_out):
            return spark.DataFrame()

        @staticmethod
        def withColumn(col_in, col_out):
            return spark.DataFrame()

        @staticmethod
        def select(*args):
            return spark.DataFrame()

class F:
    @staticmethod
    def lit(value):
        return value

# these will match
def get_tensors(device: str) -> torch.Tensor:
    a = torch.ones(2, 1)
    a = a.long()
    a = a.to(device)
    return a

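# Illustrative only (not one of the fixture's cases): the single chain that
# the repeated reassignments of `a` in `get_tensors` collapse into.
def get_tensors_chained(device: str) -> torch.Tensor:
    return torch.ones(2, 1).long().to(device)
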
def process(file_name: str):
    common_columns = ["col1_renamed", "col2_renamed", "custom_col"]
    df = spark.read.parquet(file_name)
    df = df \
        .withColumnRenamed('col1', 'col1_renamed') \
        .withColumnRenamed('col2', 'col2_renamed')
    df = df \
        .select(common_columns) \
        .withColumn('service_type', spark.functions.lit('green'))
    return df

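# Illustrative only: `process` rewritten as one parenthesized chain, the
# shape such consecutive reassignments typically reduce to.
def process_chained(file_name: str):
    common_columns = ["col1_renamed", "col2_renamed", "custom_col"]
    return (
        spark.read.parquet(file_name)
        .withColumnRenamed('col1', 'col1_renamed')
        .withColumnRenamed('col2', 'col2_renamed')
        .select(common_columns)
        .withColumn('service_type', spark.functions.lit('green'))
    )
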
def projection(df_in: spark.DataFrame) -> spark.DataFrame:
    df = (
        df_in.select(["col1", "col2"])
        .withColumnRenamed("col1", "col1a")
    )
    return df.withColumn("col2a", spark.functions.col("col2").cast("date"))

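# Illustrative only: the equivalent of `projection` as a single expression.
def projection_chained(df_in: spark.DataFrame) -> spark.DataFrame:
    return (
        df_in.select(["col1", "col2"])
        .withColumnRenamed("col1", "col1a")
        .withColumn("col2a", spark.functions.col("col2").cast("date"))
    )
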
def assign_multiple(df):
    df = df.select("column")
    result_df = df.select("another_column")
    final_df = result_df.withColumn("column2", F.lit("abc"))
    return final_df

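# Illustrative only: `assign_multiple` collapsed. Each intermediate name is
# used exactly once, so the three assignments form a single chain.
def assign_multiple_chained(df):
    return (
        df.select("column")
        .select("another_column")
        .withColumn("column2", F.lit("abc"))
    )
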
# not yet supported
def assign_alternating(df, df2):
    df = df.select("column")
    df2 = df2.select("another_column")
    df = df.withColumn("column2", F.lit("abc"))
    return df, df2

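# Above, the reassignments of `df` are interrupted by an assignment to `df2`,
# so neither variable's chain is contiguous in the statement list.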
# these will not match
def ignored(x):
    _ = x.op1()
    _ = _.op2()
    return _

def _(x):
    y = x.m()
    return y.operation(*[v for v in y])

def assign_multiple_referenced(df, df2):
    df = df.select("column")
    result_df = df.select("another_column")
    return df, result_df

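# No match above: both `df` and `result_df` escape via the return, so the
# intermediate assignments cannot be folded into one chain.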
def invalid(df_in: spark.DataFrame, alternative_df: spark.DataFrame) -> spark.DataFrame:
    df = (
        df_in.select(["col1", "col2"])
        .withColumnRenamed("col1", "col1a")
    )
    return alternative_df.withColumn("col2a", spark.functions.col("col2").cast("date"))

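# No match above: the returned chain starts from `alternative_df`, not from
# the `df` built in the preceding statement, so `df` is simply unused.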
def no_match():
    y = 10
    y = transform(y)
    return y

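# Reassignment patterns that interact with control flow
# (if/else, try/except, for/else):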
def f(x):
    if x:
        name = "alice"
        stripped = name.strip()
        print(stripped)
    else:
        name = "bob"
        print(name)

def g(x):
    try:
        name = "alice"
        stripped = name.strip()
        print(stripped)
    except ValueError:
        name = "bob"
        print(name)

def h(x):
    for _ in (1, 2, 3):
        name = "alice"
        stripped = name.strip()
        print(stripped)
    else:
        name = "bob"
        print(name)

def assign_multiple_try(df):
    try:
        df = df.select("column")
        result_df = df.select("another_column")
        final_df = result_df.withColumn("column2", F.lit("abc"))
        return final_df
    except ValueError:
        return None