[python] support chunk shuffle for planning and 3-layer shuffle for pytorch Dataset #8064
steFaiz wants to merge 2 commits into
Hi @steFaiz, thanks for the contribution! Maybe just implement shuffle in the PyTorch Paimon Dataset in this PR, and document it?
JingsongLi left a comment:
I found two issues worth addressing:
paimon-python/pypaimon/read/scanner/chunk_shuffle_split_generator.py:358
The DE chunk-shuffle path silently drops files whose row_id_range() is None. _split_by_row_id_with_range() first builds list_ranges from only non-null ranges, then skips every file whose range is None. With normal DE planning, such metadata fails fast; with with_chunk_shuffle, the same table can return incomplete or even empty splits.
A minimal shape is a data-evolution table whose files have no first_row_id (for example, data-evolution.enabled=true without row tracking / legacy metadata). Normal new_scan().plan() raises, while new_scan().with_chunk_shuffle(...).plan() produces []. Please validate this invariant and raise instead of silently continuing, or explicitly fall back to the regular DE split generator.
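The fail-fast option requested above could look roughly like the following. This is only a sketch: `FakeFile` and `require_row_id_ranges` are hypothetical names invented for illustration, and the only detail taken from the review is that a file's `row_id_range()` may return `None`.

```python
from typing import Iterable, List, Optional, Tuple


class FakeFile:
    """Hypothetical stand-in for a data file's metadata entry."""

    def __init__(self, name: str, row_range: Optional[Tuple[int, int]]):
        self.name = name
        self._row_range = row_range

    def row_id_range(self) -> Optional[Tuple[int, int]]:
        return self._row_range


def require_row_id_ranges(files: Iterable[FakeFile]) -> List[Tuple[int, int]]:
    """Return every file's range, raising if any file has no row-id range."""
    files = list(files)
    missing = [f.name for f in files if f.row_id_range() is None]
    if missing:
        # Fail fast instead of silently skipping the files.
        raise ValueError(
            "chunk shuffle requires a row-id range on every file; missing for: %s"
            % ", ".join(missing)
        )
    return [f.row_id_range() for f in files]
```

Running such a check before building the list of ranges would turn the silent empty result into an explicit error.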
paimon-python/pypaimon/read/table_scan.py:162
The new API is implemented as with_chunk_shuffle(seed, chunk_size), but the discussion PR it closes documents usage as with_chunk_shuffle(chunk_size, seed). Since positional calls are natural here, users can silently swap the values and get a tiny chunk_size or unexpected shuffle seed. The current tests all use keyword arguments, so they do not catch this. Please align the order with the documented API, or make the parameters keyword-only to prevent accidental misuse.
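To illustrate the keyword-only option, here is a minimal sketch. `SketchTableScan` is a hypothetical stand-in, not the pypaimon class; only the method name and the two parameter names come from the review.

```python
class SketchTableScan:
    """Hypothetical stand-in for the scan builder; not the pypaimon class."""

    def __init__(self):
        self.shuffle_seed = None
        self.chunk_size = None

    def with_chunk_shuffle(self, *, seed: int, chunk_size: int) -> "SketchTableScan":
        # The bare `*` makes both parameters keyword-only, so the values
        # cannot be swapped by a positional call.
        if isinstance(chunk_size, bool) or not isinstance(chunk_size, int) or chunk_size <= 0:
            raise ValueError("chunk_size must be a positive int")
        self.shuffle_seed = seed
        self.chunk_size = chunk_size
        return self
```

With this signature, `with_chunk_shuffle(42, 1000)` raises `TypeError`, while `with_chunk_shuffle(seed=42, chunk_size=1000)` works regardless of the declared order.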
I verified the added test file locally:
PYTHONPATH=. python -m pytest -q pypaimon/tests/scanner/chunk_shuffle_split_generator_test.py
# 36 passed
Of course! I'll look into the existing shuffle and buffer shuffle approaches in PyTorch and implement them in this PR (as well as address your comments).
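I introduced a 3-layer shuffle in the PyTorch integration, including chunk shuffle in planning, split interleaving, and buffer shuffle (see the comments in the usage example).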
The usage is simple:

from torch.utils.data import DataLoader

seed = 42
# do chunk-shuffle in planning. This is optional
table_scan = read_builder.new_scan().with_chunk_shuffle(
    seed=seed,
    chunk_size=1000,
)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
dataset = table_read.to_torch(
    splits,
    streaming=True,
    shuffle=True,
    seed=seed,
    # buffer shuffle
    buffer_size=1000,
    # interleave splits
    max_buffer_input_splits=10,
)
dataloader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=2,
    shuffle=False,
)

I referred to the HuggingFace Iterable Dataset: https://github.com/huggingface/datasets/blob/main/src/datasets/iterable_dataset.py
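For readers unfamiliar with buffer shuffle, the general technique can be sketched in plain Python as below. This is an illustration of the idea only, not the code in this PR: the function name `buffer_shuffle` is hypothetical, and `buffer_size` and `seed` merely mirror the parameter names in the usage example.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def buffer_shuffle(items: Iterable[T], buffer_size: int, seed: int) -> Iterator[T]:
    """Yield items in a locally shuffled order using a fixed-size buffer."""
    if buffer_size <= 0:
        raise ValueError("buffer_size must be a positive int")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)
            continue
        # Buffer is full: emit a random slot and refill it with the new item.
        index = rng.randrange(buffer_size)
        yield buffer[index]
        buffer[index] = item
    # Input exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer
```

A larger `buffer_size` gives a more thorough shuffle at the cost of memory. With `buffer_size=1` the input order is preserved, which is why a coarser shuffle over chunks or splits is usually combined with it.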
        raise ValueError("%s must be a positive int" % name)
    return value

    def set_epoch(self, epoch: int) -> "TorchShuffledIterDataset":
set_epoch() does not work once the dataset is already held by persistent DataLoader workers. With DataLoader(..., num_workers>0, persistent_workers=True), the worker processes keep their own Dataset instances alive across epochs, so calling dataset.set_epoch(1) in the parent process only updates the parent copy. The workers still use the old self.epoch here when building the buffer-shuffle RNG, which makes the shuffle order repeat even though the docs say callers can set the epoch before iterating the DataLoader for the next epoch. Could we either propagate epoch through worker-visible shared state, or document/reject persistent workers and require rebuilding the DataLoader? It would be good to add a test for this mode as well.
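One possible shape for the "worker-visible shared state" option is sketched below. It assumes a `multiprocessing.Value` handed to worker processes stays shared with the parent; whether that holds for a given DataLoader start method has not been verified here. `EpochAwareItems` is a hypothetical class, not the PR's `TorchShuffledIterDataset`, and deriving the RNG seed as `seed + epoch` is an assumption made for illustration.

```python
import multiprocessing
import random
from typing import Iterator, List


class EpochAwareItems:
    """Hypothetical iterable; not the PR's TorchShuffledIterDataset."""

    def __init__(self, items: List[int], seed: int):
        self.items = list(items)
        self.seed = seed
        # Shared integer so processes holding this object can observe updates.
        self._epoch = multiprocessing.Value("i", 0)

    def set_epoch(self, epoch: int) -> "EpochAwareItems":
        with self._epoch.get_lock():
            self._epoch.value = epoch
        return self

    def __iter__(self) -> Iterator[int]:
        # Read the epoch at iteration time, not at construction time.
        rng = random.Random(self.seed + self._epoch.value)
        order = list(self.items)
        rng.shuffle(order)
        return iter(order)
```

The key design point is that the epoch lives in a shared holder and is read when iteration starts, so a copy of the object created earlier does not keep a stale plain-integer epoch.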
Purpose
This PR will close: #8010
I tested on a data-evolution table with 100,000,000 records, several structured columns, and a blob column.
The result is as below:
I tested reading tables directly on DFS. Planning is expensive, i.e. generating 1 million DataSplits and shuffling them, because creating 1 million objects in Python is heavy. The same work would complete within a few hundred milliseconds in Java.
As a next step, I'll try to add shuffle and buffered shuffle for the PyTorch Paimon Dataset.
Tests
See Unit Tests