1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import logging
logger = logging.getLogger("panwid.datatable")
import raccoon as rc
import collections
class DataTableDataFrame(rc.DataFrame):
    """Raccoon ``DataFrame`` subclass used as the backing store of a data table.

    Every instance gets the names in ``DATA_TABLE_COLUMNS`` appended to its
    column list, and the class adds helpers for bulk-updating or appending
    rows supplied as a sequence of mappings (or attribute-bearing objects).
    """

    # Bookkeeping columns appended to the column list of every frame.
    DATA_TABLE_COLUMNS = [
        "_dirty",
        "_focus_position",
        "_value_fn",
        "_cls",
        "_details",
        "_rendered_row",
    ]

    def __init__(self, data=None, columns=None, index=None, index_name="index", sort=None):
        """Create the frame, extending ``columns`` with the bookkeeping columns.

        If ``columns`` is non-empty and lacks ``index_name``, the index name
        is inserted as the first column.  ``DATA_TABLE_COLUMNS`` is always
        appended.  All arguments are then forwarded to ``rc.DataFrame``.

        :param data: initial data, passed through to the parent constructor
        :param columns: column names; ``None`` is treated like an empty list
        :param index: index values, passed through to the parent constructor
        :param index_name: name of the index (also kept as a column)
        :param sort: passed through to the parent constructor
        """
        # Work on a private copy: the caller's list must not grow each time a
        # frame is built from it, and ``None`` must not crash on ``+=``.
        columns = list(columns) if columns else []
        if columns and index_name not in columns:
            columns.insert(0, index_name)
        columns += self.DATA_TABLE_COLUMNS
        super(DataTableDataFrame, self).__init__(
            data=data,
            columns=columns,
            index=index,
            index_name=index_name,
            sort=sort,
        )

    def _validate_index(self, indexes):
        """Delegate to the parent's index validation, logging on failure.

        When the parent raises ``ValueError``, the values that occur more
        than once in ``indexes`` are logged at ERROR level and the original
        exception is re-raised unchanged.
        """
        try:
            return super(DataTableDataFrame, self)._validate_index(indexes)
        except ValueError:
            duplicates = [
                item
                for item, count in collections.Counter(indexes).items()
                if count > 1
            ]
            logger.error("duplicates in index: %s", duplicates)
            raise

    def log_dump(self, n=5, columns=None, label=None):
        """Log a short summary of the frame at INFO level.

        The message contains the frame length, the index name, the first
        ``n`` index values and ``head(n)`` of the frame.

        :param n: number of index values / rows to include
        :param columns: optional column name or list of names to restrict
            the dumped rows to
        :param label: optional prefix for the log message
        """
        frame = self
        if columns:
            if not isinstance(columns, list):
                columns = [columns]
            frame = frame[columns]
        logger.info(
            "%slength: %d, index: %s [%s%s]\n%s",
            "%s, " % (label,) if label else "",
            len(self),
            self.index_name,
            ",".join(str(x) for x in self.index[:n]),
            "..." if len(self.index) > n else "",
            frame.head(n),
        )

    def transpose_data(self, rows):
        """Turn row-oriented records into a column-oriented dict of lists.

        The result has one key per column: every key found in ``rows``
        (first-seen order), then every frame column that is neither the
        index nor a bookkeeping column, then ``"_cls"`` and ``"_details"``.
        Missing values are ``None``, except ``"_details"`` which defaults to
        a fresh ``{"open": False, "disabled": False}`` per row.

        :param rows: iterable of records; each must provide ``keys()``.
            Mappings are read with ``get``, anything else with ``getattr``.
        :return: ``{column: [value per row]}``; ``{}`` when ``rows`` is empty
        """
        # Materialise once so one-shot iterables survive the two passes below.
        rows = list(rows)
        if not rows:
            return {}

        # Union of all row keys, in first-seen order so the column order is
        # deterministic rather than dependent on set iteration order.
        data_columns = list(
            dict.fromkeys(key for row in rows for key in row.keys())
        )
        data_columns += [
            c for c in self.columns
            if c not in data_columns
            and c != self.index_name
            and c not in self.DATA_TABLE_COLUMNS
        ]
        data_columns += [c for c in ("_cls", "_details") if c not in data_columns]

        def cell(row, key):
            # The default is built per call so rows never share one
            # mutable "_details" dict.
            default = {"open": False, "disabled": False} if key == "_details" else None
            # Any Mapping (including read-only ones) is read by key; other
            # objects are read by attribute.
            if isinstance(row, collections.abc.Mapping):
                return row.get(key, default)
            return getattr(row, key, default)

        return {key: [cell(row, key) for row in rows] for key in data_columns}

    def update_rows(self, rows, limit=None):
        """Write ``rows`` into the frame, replacing its current contents.

        Unless ``limit`` is truthy, existing rows whose index is absent from
        ``rows`` are deleted first (all rows when ``rows`` is empty).  When
        the records carry no index column, consecutive integers starting at
        the current frame length are used as index values.

        :param rows: sized sequence of records (see ``transpose_data``)
        :param limit: if truthy, existing rows are kept
        :return: list of index values that were written (``[]`` if no rows)
        :raises Exception: wrapping a ``ValueError`` raised while setting a
            column; args are ``(column, len(index), len(column data))``
        """
        data = self.transpose_data(rows)
        if not limit:
            if len(rows):
                # Set membership keeps the stale-row scan linear.
                incoming = set(data.get(self.index_name, []))
                stale = [x for x in self.index if x not in incoming]
                if stale:
                    self.delete_rows(stale)
            else:
                self.delete_all_rows()
        if not len(rows):
            return []
        if self.index_name not in data:
            data[self.index_name] = list(range(len(self), len(self) + len(rows)))
        for c in data:
            try:
                self.set(data[self.index_name], c, data[c])
            except ValueError as e:
                logger.error(e)
                logger.info(f"update_rows: {self.index}, {data}")
                raise Exception(c, len(self.index), len(data[c])) from e
        return data[self.index_name]

    def append_rows(self, rows):
        """Append ``rows`` to the end of the frame.

        Columns present in the records but not yet in the frame are added to
        the frame (filled with ``None``); frame columns absent from the
        records are filled with ``None`` for the new rows.  Does nothing when
        ``rows`` is empty.

        :param rows: sized sequence of records (see ``transpose_data``)
        :raises Exception: wrapping a ``ValueError`` raised while building
            the temporary frame or while appending it
        """
        length = len(rows)
        if not length:
            return

        colnames = list(self.columns) + [
            c for c in self.DATA_TABLE_COLUMNS if c not in self.columns
        ]
        data = self.transpose_data(rows)
        colnames += [c for c in data if c not in colnames]

        for c in self.columns:
            if c not in data:
                data[c] = [None] * length

        for c in colnames:
            if c not in self.columns:
                self[c] = None

        kwargs = dict(
            columns=colnames,
            data=data,
            sort=False,
            index=data[self.index_name],
            index_name=self.index_name,
        )

        try:
            newdata = DataTableDataFrame(**kwargs)
        except ValueError as e:
            raise Exception(kwargs) from e

        try:
            self.append(newdata)
        except ValueError as e:
            raise Exception(f"{self.index}, {newdata}") from e

    def clear(self):
        """Delete all rows from the frame via ``delete_all_rows``."""
        self.delete_all_rows()
|