[python] support chunk shuffle for planning and 3-layer shuffle for pytorch Dataset #8064

Open: steFaiz wants to merge 2 commits into apache:master from steFaiz:chunk_shuffle_support

@steFaiz
Contributor

@steFaiz steFaiz commented Jun 1, 2026

Purpose

This PR will close: #8010
I tested a data-evolution table with 100,000,000 records, several structured columns, and a blob column.
The results are below:

| Metric | Value |
|---|---|
| Plan | 49.78 s |
| Avg. read time per chunk | 1.199 s |
| Chunk size | 100 |
| Avg. chunk Arrow size | 41.14 MiB |
| Avg. files per chunk | 81 |
| Columns | length, image_name, conversations, width, height, image_count, dataset, image_bytes |

I tested reading tables directly on DFS. Planning is expensive, i.e. generating 1 million DataSplits and shuffling them, because creating 1 million objects in Python is heavy. The same work would complete within several hundred milliseconds in Java.
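To make the planning cost concrete: 100,000,000 records at a chunk size of 100 gives 1,000,000 chunks, which matches the 1 million DataSplits mentioned above. A minimal sketch of the idea follows, assuming each chunk is a contiguous row range and that the chunk order is shuffled with a seeded RNG. `plan_shuffled_chunks` is a hypothetical helper for illustration, not part of pypaimon.

```python
import random
from typing import List, Tuple


def plan_shuffled_chunks(total_rows: int, chunk_size: int, seed: int) -> List[Tuple[int, int]]:
    """Split [0, total_rows) into half-open row ranges and shuffle their order.

    Hypothetical illustration only: real planning works on file metadata,
    not on a bare row count.
    """
    if total_rows < 0 or chunk_size <= 0:
        raise ValueError("total_rows must be >= 0 and chunk_size must be > 0")
    # One (start, end) tuple per chunk; the last chunk may be shorter.
    chunks = [
        (start, min(start + chunk_size, total_rows))
        for start in range(0, total_rows, chunk_size)
    ]
    # A dedicated RNG keeps the order reproducible for a given seed.
    random.Random(seed).shuffle(chunks)
    return chunks
```

Every chunk is materialized as a Python object before shuffling, which is why the object count, rather than the shuffle itself, tends to dominate at this scale.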

As a next step, I'll try to add shuffle and buffered shuffle for the PyTorch Paimon Dataset.

Tests

See Unit Tests

@JingsongLi
Contributor

JingsongLi commented Jun 1, 2026

Hi @steFaiz Thanks for the contribution! Maybe just implementing shuffle in Pytorch Paimon Dataset in this PR? And document it.

Contributor

@JingsongLi JingsongLi left a comment

I found two issues worth addressing:

  1. paimon-python/pypaimon/read/scanner/chunk_shuffle_split_generator.py:358

The DE chunk-shuffle path silently drops files whose row_id_range() is None. _split_by_row_id_with_range() first builds list_ranges from only non-null ranges, then skips every file whose range is None. With normal DE planning, such metadata fails fast; with with_chunk_shuffle, the same table can return incomplete or even empty splits.

A minimal shape is a data-evolution table whose files have no first_row_id (for example, data-evolution.enabled=true without row tracking / legacy metadata). Normal new_scan().plan() raises, while new_scan().with_chunk_shuffle(...).plan() produces []. Please validate this invariant and raise instead of silently continuing, or explicitly fall back to the regular DE split generator.
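The fail-fast check requested above could look roughly like this. It is a minimal sketch, not the pypaimon implementation: `FileMeta` and `require_row_id_ranges` are hypothetical names, and only the `row_id_range()` and `first_row_id` concepts come from the review.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple


@dataclass
class FileMeta:
    """Hypothetical stand-in for a data file's metadata."""
    file_name: str
    first_row_id: Optional[int]
    row_count: int

    def row_id_range(self) -> Optional[Tuple[int, int]]:
        # Files written without row tracking have no first_row_id.
        if self.first_row_id is None:
            return None
        return (self.first_row_id, self.first_row_id + self.row_count - 1)


def require_row_id_ranges(files: Sequence[FileMeta]) -> List[Tuple[int, int]]:
    """Return every file's inclusive row-id range, or raise if any is missing."""
    ranges: List[Tuple[int, int]] = []
    missing: List[str] = []
    for f in files:
        r = f.row_id_range()
        if r is None:
            missing.append(f.file_name)
        else:
            ranges.append(r)
    if missing:
        # Raise instead of silently dropping the files from the plan.
        raise ValueError(
            "chunk shuffle requires row-id ranges; missing for: %s" % ", ".join(missing)
        )
    return ranges
```

Collecting all offending file names before raising makes the error message actionable when a table mixes legacy and row-tracked files.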

  2. paimon-python/pypaimon/read/table_scan.py:162

The new API is implemented as with_chunk_shuffle(seed, chunk_size), but the discussion PR it closes documents usage as with_chunk_shuffle(chunk_size, seed). Since positional calls are natural here, users can silently swap the values and get a tiny chunk_size or unexpected shuffle seed. The current tests all use keyword arguments, so they do not catch this. Please align the order with the documented API, or make the parameters keyword-only to prevent accidental misuse.
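The keyword-only option mentioned above can be sketched as follows. `TableScanSketch` is a hypothetical stand-in, not the real scan class in table_scan.py; the point is only the bare `*` in the signature, which makes a positional call fail loudly instead of swapping the two integers.

```python
class TableScanSketch:
    """Hypothetical stand-in for the scan object; not the pypaimon class."""

    def __init__(self) -> None:
        self.chunk_size = None
        self.seed = None

    def with_chunk_shuffle(self, *, chunk_size: int, seed: int) -> "TableScanSketch":
        # The bare `*` forces both arguments to be passed by name, so
        # with_chunk_shuffle(42, 1000) raises TypeError at the call site.
        if isinstance(chunk_size, bool) or not isinstance(chunk_size, int) or chunk_size <= 0:
            raise ValueError("chunk_size must be a positive int")
        self.chunk_size = chunk_size
        self.seed = seed
        return self
```

With this signature, argument order stops mattering: `with_chunk_shuffle(seed=42, chunk_size=1000)` and `with_chunk_shuffle(chunk_size=1000, seed=42)` are equivalent.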

I verified the added test file locally:

PYTHONPATH=. python -m pytest -q pypaimon/tests/scanner/chunk_shuffle_split_generator_test.py
# 36 passed

@steFaiz
Contributor Author

steFaiz commented Jun 2, 2026

> Hi @steFaiz Thanks for the contribution! Maybe just implementing shuffle in Pytorch Paimon Dataset in this PR? And document it.

Of course! I'll look into the existing shuffle and buffered-shuffle approaches in PyTorch and implement them in this PR (and address your review comments as well).

@steFaiz steFaiz changed the title [python] support chunk shuffle for append table [wip][python] support chunk shuffle for append table Jun 2, 2026
@steFaiz
Contributor Author

steFaiz commented Jun 2, 2026

I introduced a 3-layer shuffle in the PyTorch integration, consisting of:

  1. A chunk shuffle at the file-metadata layer. This relies on a random-access-optimized data format.
  2. Interleaving of several chunks.
  3. A shuffle buffer.

The usage is simple:

from torch.utils.data import DataLoader

seed = 42

# do chunk-shuffle in planning. This is optional
table_scan = read_builder.new_scan().with_chunk_shuffle(
    seed=seed,
    chunk_size=1000,
)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()

dataset = table_read.to_torch(
    splits,
    streaming=True,
    shuffle=True,
    seed=seed,
    # buffer shuffle
    buffer_size=1000,
    # interleave splits
    max_buffer_input_splits=10,
)

dataloader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=2,
    shuffle=False,
)

I referred to the HuggingFace IterableDataset implementation: https://github.com/huggingface/datasets/blob/main/src/datasets/iterable_dataset.py
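For readers unfamiliar with the second and third layers, here is a minimal sketch of round-robin interleaving followed by a fixed-size shuffle buffer. It illustrates the general technique only and is not the code in this PR: `interleave`, `buffer_shuffle`, and `max_open` are hypothetical names, with `max_open` playing a role similar to `max_buffer_input_splits` in the usage above.

```python
import random
from typing import Iterable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def interleave(sources: Sequence[Iterable[T]], max_open: int) -> Iterator[T]:
    """Round-robin over at most max_open sources at a time (max_open >= 1)."""
    pending = iter(sources)
    active: List[Iterator[T]] = []
    for src in pending:
        active.append(iter(src))
        if len(active) >= max_open:
            break
    while active:
        still_open: List[Iterator[T]] = []
        for it in active:
            try:
                item = next(it)
            except StopIteration:
                # Replace an exhausted source with the next pending one, if any.
                nxt = next(pending, None)
                if nxt is not None:
                    still_open.append(iter(nxt))
                continue
            still_open.append(it)
            yield item
        active = still_open


def buffer_shuffle(items: Iterable[T], buffer_size: int, rng: random.Random) -> Iterator[T]:
    """Emit a random buffered element each time a new element arrives."""
    buffer: List[T] = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item  # the newcomer takes the emitted slot
    # Input exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer
```

Interleaving mixes rows from different chunks before they reach the buffer, so a small buffer can still produce output that is not dominated by a single chunk.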

@steFaiz steFaiz changed the title [wip][python] support chunk shuffle for append table [python] support chunk shuffle for planning and 3-layer shuffle for pytorch Dataset Jun 2, 2026
raise ValueError("%s must be a positive int" % name)
return value

def set_epoch(self, epoch: int) -> "TorchShuffledIterDataset":
Contributor

set_epoch() does not work once the dataset is already held by persistent DataLoader workers. With DataLoader(..., num_workers>0, persistent_workers=True), the worker processes keep their own Dataset instances alive across epochs, so calling dataset.set_epoch(1) in the parent process only updates the parent copy. The workers still use the old self.epoch here when building the buffer-shuffle RNG, which makes the shuffle order repeat even though the docs say callers can set the epoch before iterating the DataLoader for the next epoch. Could we either propagate epoch through worker-visible shared state, or document/reject persistent workers and require rebuilding the DataLoader? It would be good to add a test for this mode as well.
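One way to propagate the epoch through worker-visible shared state is to keep it in a shared-memory integer rather than a plain attribute. The sketch below uses only the standard library and a hypothetical `EpochAwareShuffle` class. It assumes the dataset object reaches the worker processes when they are started, so that they attach to the same shared value, and it has not been tested against this PR's `TorchShuffledIterDataset`.

```python
import multiprocessing as mp
import random
from typing import Iterator, List, Sequence


class EpochAwareShuffle:
    """Hypothetical stand-in for a shuffled iterable dataset."""

    def __init__(self, items: Sequence[int], seed: int) -> None:
        self._items: List[int] = list(items)
        self._seed = seed
        # Shared integer: processes started with this object see later
        # updates made by the parent, unlike a plain int attribute.
        self._epoch = mp.Value("i", 0)

    def set_epoch(self, epoch: int) -> "EpochAwareShuffle":
        with self._epoch.get_lock():
            self._epoch.value = epoch
        return self

    def __iter__(self) -> Iterator[int]:
        # Read the epoch at iteration time, not at construction time.
        with self._epoch.get_lock():
            epoch = self._epoch.value
        # Mix seed and epoch so each epoch gets its own reproducible order.
        rng = random.Random(self._seed * 1_000_003 + epoch)
        order = list(self._items)
        rng.shuffle(order)
        return iter(order)
```

The key design choice is that `__iter__` reads the epoch each time it is called, so a persistent worker that keeps its dataset instance alive still picks up the new value on the next pass.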
