The [`datapipes`](https://github.com/pytorch/pytorch/tree/master/torch/utils/data/datapipes) folder holds the implementations of `IterDataPipe` and `MapDataPipe`.
This document serves as an entry point for DataPipe implementation.
## Implementing DataPipe
As a guiding example, let us implement an `IterDataPipe` that applies a callable over data, stored under [`iter`](https://github.com/pytorch/pytorch/tree/master/torch/utils/data/datapipes/iter).
For `MapDataPipe`, please refer to the files in the [`map`](https://github.com/pytorch/pytorch/tree/master/torch/utils/data/datapipes/map) folder and implement the corresponding `__getitem__` method.
### Naming
The naming convention for DataPipes is Operation-er with the suffix `IterDataPipe`, because each DataPipe behaves like a container that applies an operation to data yielded from its source DataPipe.
When a DataPipe is imported into the `iter` module under `datapipes`, it is aliased as Op-er without the `IterDataPipe` suffix.
Please check [`__init__.py`](https://github.com/pytorch/pytorch/blob/master/torch/utils/data/datapipes/iter/__init__.py) in the `iter` module to see how each DataPipe class is aliased.
For our example of an `IterDataPipe` that maps a function, we name it `MapperIterDataPipe` and alias it as `iter.Mapper` under `datapipes`.
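For reference, each alias is a plain `import ... as` statement; a minimal sketch of the relevant line (the `callable` submodule name reflects the current layout and may change):
```py
# In torch/utils/data/datapipes/iter/__init__.py (simplified sketch)
from torch.utils.data.datapipes.iter.callable import MapperIterDataPipe as Mapper
```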
### Constructor
As datasets are now constructed from stacks of DataPipes, each DataPipe normally takes a source DataPipe as its first argument.
```py
class MapperIterDataPipe(IterDataPipe):
    def __init__(self, dp, fn):
        super().__init__()
        self.dp = dp
        self.fn = fn
```
Note:
- Avoid loading data from the source DataPipe in the `__init__` function, in order to support lazy data loading and save memory.
- If an `IterDataPipe` instance holds data in memory, beware of in-place modification of that data: when a second iterator is created from the instance, the data may already have changed. Please take the [`IterableWrapper`](https://github.com/pytorch/pytorch/blob/master/torch/utils/data/datapipes/iter/utils.py) class as a reference for `deepcopy`-ing data for each iterator, as sketched below.
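The pattern, roughly, is to copy the held data whenever a new iterator is created; a simplified sketch of `IterableWrapper` (see the linked `utils.py` for the full implementation):
```py
from copy import deepcopy

class IterableWrapperIterDataPipe(IterDataPipe):
    def __init__(self, iterable, deepcopy=True):
        self.iterable = iterable
        self.deepcopy = deepcopy

    def __iter__(self):
        source_data = self.iterable
        if self.deepcopy:
            # Copy per iterator so in-place changes made while iterating
            # do not corrupt the data seen by later iterators
            source_data = deepcopy(self.iterable)
        yield from source_data
```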
### Iterator
For an `IterDataPipe`, an `__iter__` method is needed to consume data from the source `IterDataPipe` and then apply the operation to each element before yielding it.
```py
class MapperIterDataPipe(IterDataPipe):
    ...

    def __iter__(self):
        for d in self.dp:
            yield self.fn(d)
```
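As a quick usage check, using the `IterableWrapperIterDataPipe` sketch above as an in-memory source:
```py
>>> source = IterableWrapperIterDataPipe(range(5))
>>> mapped = MapperIterDataPipe(source, fn=lambda x: x * 2)
>>> list(mapped)
[0, 2, 4, 6, 8]
```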
### Length
In the most common case, as in the `MapperIterDataPipe` example above, the `__len__` method of a DataPipe returns the length of the source DataPipe.
```py
class MapperIterDataPipe(IterDataPipe):
    ...

    def __len__(self):
        return len(self.dp)
```
Note that the `__len__` method is optional for `IterDataPipe`.
Like `CSVParserIterDataPipe` in the [Using DataPipe section](#using-datapipe), `__len__` is not implemented there because the number of rows in each file stream is unknown before loading it.
Besides, in some special cases, the `__len__` method can be provided, but it would either return an integer length or raise an Error depending on the arguments of the DataPipe.
The Error is required to be a `TypeError` to support Python's built-in functions like `list(dp)`.
Please check NOTE [ Lack of Default `__len__` in Python Abstract Base Classes ] in the PyTorch codebase for the detailed reason.
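A minimal sketch of such a conditional `__len__` (the `RepeaterIterDataPipe` class and its `count` parameter are hypothetical, purely for illustration):
```py
class RepeaterIterDataPipe(IterDataPipe):
    def __init__(self, dp, count=None):
        self.dp = dp
        self.count = count  # None means repeat indefinitely

    def __iter__(self):
        ...

    def __len__(self):
        if self.count is None:
            # No valid length for an infinite pipeline; TypeError lets
            # built-ins like list(dp) fall back to plain iteration
            raise TypeError(f"{type(self).__name__} instance doesn't have valid length")
        return self.count * len(self.dp)
```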
### Registering DataPipe with functional API
Each DataPipe can be registered to support the functional API using the decorator `functional_datapipe`.
```py
@functional_datapipe("map")
class MapperIterDataPipe(IterDataPipe):
    ...
```
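Conceptually, the decorator attaches a constructor for the class as a method on the DataPipe base class. The following is a simplified illustration of the idea only, not PyTorch's actual implementation:
```py
def functional_datapipe(name):
    def decorator(cls):
        def functional_form(self, *args, **kwargs):
            # `self` is the source DataPipe, so `source.map(fn)`
            # becomes `MapperIterDataPipe(source, fn)`
            return cls(self, *args, **kwargs)
        setattr(IterDataPipe, name, functional_form)
        return cls
    return decorator
```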
Then, a stack of DataPipes can be constructed in a functional-programming manner.
```py
>>> import torch.utils.data.datapipes as dp
>>> datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
>>> datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
>>> datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
>>> datapipes2 = dp.iter.Shuffler(datapipes2)
>>> datapipes2 = dp.iter.Batcher(datapipes2, 2)
```
In the above example, `datapipes1` and `datapipes2` represent the exact same stack of `IterDataPipe`-s.
## Using DataPipe
For example, suppose we want to load data from CSV files with the following data pipeline:
- List all CSV files
- Load CSV files
- Parse CSV files and yield rows
To support the above pipeline, `CSVParser` is registered as `parse_csv_files` to consume file streams and expand them as rows.
```py
import csv

@functional_datapipe("parse_csv_files")
class CSVParserIterDataPipe(IterDataPipe):
    def __init__(self, dp, **fmtparams):
        self.dp = dp
        self.fmtparams = fmtparams

    def __iter__(self):
        for filename, stream in self.dp:
            reader = csv.reader(stream, **self.fmtparams)
            for row in reader:
                yield filename, row
```
Then, the pipeline can be assembled as follows:
```py
>>> import torch.utils.data.datapipes as dp
>>> FOLDER = 'path/2/csv/folder'
>>> datapipe = dp.iter.FileLister([FOLDER]).filter(fn=lambda filename: filename.endswith('.csv'))
>>> datapipe = dp.iter.FileOpener(datapipe, mode='rt')
>>> datapipe = datapipe.parse_csv_files(delimiter=' ')
>>> for d in datapipe: # Start loading data
... pass
```
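Each element yielded by the final `datapipe` is a tuple of the file path and one parsed row, i.e. `(filename, row)`, where `row` is a list of strings produced by `csv.reader`.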