[CI] Fix Playground CI Nightly flakiness caused by API rate limiting and backend startup races #38752
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces the initial implementation of the Apache Beam DeltaIO connector for the Java SDK. It provides the capability to read data from Delta Lake tables by implementing the
Highlights
Code Review
This pull request implements the DeltaIO.ReadRows expansion to read Delta Lake tables using the Delta Kernel API, introducing file descriptors, a file-reading DoFn, schema inference, and row conversion utilities, alongside integration and performance tests. The reviewer feedback highlights critical performance and correctness issues: re-listing scan files inside the read loop causes O(N^2) complexity, and creating Hadoop configurations and engines per element is highly inefficient. Additionally, convertVectorValue lacks support for complex types, which can cause correctness bugs, and the use of SerializableCoder should be optimized using Beam's schema-based coders.
try (CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(engine)) {
  while (scanFileIter.hasNext()) {
    FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
    try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = scanFilesBatch.getRows()) {
      while (scanFileRows.hasNext()) {
        io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
        FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
        if (fileStatus.getPath().equals(desc.getFilePath())) {
Re-listing all scan files in the table for every file descriptor processed by ReadFileFn leads to O(N^2) complexity.
Consider refactoring the reading logic to avoid re-listing all files on every element. For example, you could serialize the necessary metadata of scanFileRow directly into DeltaFileDescriptor and reconstruct a lightweight Row wrapper on the worker, or group the file reads so that a single worker task can process multiple files from the same scan iterator without re-listing.
public void processElement(ProcessContext c) throws Exception {
  DeltaFileDescriptor desc = c.element();
  org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(desc.getHadoopConfig());
  Engine engine = DefaultEngine.create(hadoopConfig);
  Table table = Table.forPath(engine, desc.getTablePath());
Creating a Hadoop Configuration and a Delta Engine inside processElement for every single file is highly inefficient as these are expensive operations.
Instead, pass the Hadoop configuration map to the ReadFileFn constructor, initialize the Engine once in @Setup, and store it in a transient field to reuse it across all elements in the bundle.
private static Object convertVectorValue(ColumnVector vec, int rowId, DataType dataType) {
  if (dataType instanceof IntegerType) {
    return vec.getInt(rowId);
  } else if (dataType instanceof LongType) {
    return vec.getLong(rowId);
  } else if (dataType instanceof StringType) {
    return vec.getString(rowId);
  } else if (dataType instanceof DoubleType) {
    return vec.getDouble(rowId);
  } else if (dataType instanceof FloatType) {
    return vec.getFloat(rowId);
  } else if (dataType instanceof BooleanType) {
    return vec.getBoolean(rowId);
  } else if (dataType instanceof ShortType) {
    return vec.getShort(rowId);
  } else if (dataType instanceof ByteType) {
    return vec.getByte(rowId);
  } else if (dataType instanceof BinaryType) {
    return vec.getBinary(rowId);
  } else if (dataType instanceof DecimalType) {
    return vec.getDecimal(rowId);
  } else {
    return vec.toString();
  }
}
The current implementation of convertVectorValue does not handle complex types such as TimestampType, DateType, StructType, ArrayType, or MapType. If any of these types are present inside an array or map, it will fall back to vec.toString(), which returns an internal class name or debug string instead of the actual value, leading to correctness bugs.
Update convertVectorValue to recursively handle all supported Delta types, similar to how convertValue is implemented.
private static Object convertVectorValue(ColumnVector vec, int rowId, DataType dataType) {
  if (vec.isNullAt(rowId)) {
    return null;
  }
  if (dataType instanceof IntegerType) {
    return vec.getInt(rowId);
  } else if (dataType instanceof LongType) {
    return vec.getLong(rowId);
  } else if (dataType instanceof StringType) {
    return vec.getString(rowId);
  } else if (dataType instanceof DoubleType) {
    return vec.getDouble(rowId);
  } else if (dataType instanceof FloatType) {
    return vec.getFloat(rowId);
  } else if (dataType instanceof BooleanType) {
    return vec.getBoolean(rowId);
  } else if (dataType instanceof ShortType) {
    return vec.getShort(rowId);
  } else if (dataType instanceof ByteType) {
    return vec.getByte(rowId);
  } else if (dataType instanceof BinaryType) {
    return vec.getBinary(rowId);
  } else if (dataType instanceof DecimalType) {
    return vec.getDecimal(rowId);
  } else if (dataType instanceof TimestampType) {
    long micros = vec.getLong(rowId);
    return org.joda.time.Instant.ofEpochMilli(micros / 1000);
  } else if (dataType instanceof DateType) {
    int days = vec.getInt(rowId);
    return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000);
  } else if (dataType instanceof StructType) {
    io.delta.kernel.data.Row structRow = vec.getStruct(rowId);
    return convertKernelRowToBeamRow(structRow, inferBeamSchema((StructType) dataType));
  } else if (dataType instanceof ArrayType) {
    ArrayValue arrayVal = vec.getArray(rowId);
    int size = arrayVal.getSize();
    List<Object> list = new ArrayList<>(size);
    DataType elemType = ((ArrayType) dataType).getElementType();
    ColumnVector elementsVec = arrayVal.getElements();
    for (int j = 0; j < size; j++) {
      if (elementsVec.isNullAt(j)) {
        list.add(null);
      } else {
        list.add(convertVectorValue(elementsVec, j, elemType));
      }
    }
    return list;
  } else if (dataType instanceof MapType) {
    MapValue mapVal = vec.getMap(rowId);
    int size = mapVal.getSize();
    Map<Object, Object> map = new HashMap<>(size);
    DataType keyType = ((MapType) dataType).getKeyType();
    DataType valueType = ((MapType) dataType).getValueType();
    ColumnVector keysVec = mapVal.getKeys();
    ColumnVector valuesVec = mapVal.getValues();
    for (int j = 0; j < size; j++) {
      Object key = convertVectorValue(keysVec, j, keyType);
      Object val = valuesVec.isNullAt(j) ? null : convertVectorValue(valuesVec, j, valueType);
      map.put(key, val);
    }
    return map;
  } else {
    return vec.toString();
  }
}
return input
    .apply("CreateFileDescriptors", Create.of(fileDescriptors)
        .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
Using SerializableCoder for pipeline elements is highly discouraged in Apache Beam as it is slow and produces large serialized payloads. Since DeltaFileDescriptor is a simple POJO, you can annotate it with @DefaultSchema(JavaFieldSchema.class) and let Beam automatically infer and use a highly optimized SchemaCoder instead.
return input
    .apply("CreateFileDescriptors", Create.of(fileDescriptors))
Closes #38693
Summary
This PR improves the reliability of the Playground CI Nightly workflow by addressing two recurring sources of flakiness observed in scheduled runs.
Root Causes
1. GitHub API rate limiting
The workflow retrieves the latest Beam release using an unauthenticated GitHub API request:
https://api.github.com/repos/apache/beam/releases/latest
Under heavy CI activity, unauthenticated requests may hit GitHub REST API rate limits, causing invalid responses or null release tags. This results in downstream failures while resolving SDK versions and Docker images.
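One possible way to harden this lookup is sketched here. It is an illustration under stated assumptions, not the code from this PR: the function names, the `GITHUB_TOKEN` environment variable, and the use of `jq` are all assumed. The idea is to send the request with a token and to reject an empty or `null` tag before it reaches later steps.

```bash
#!/usr/bin/env bash
# Sketch only: function names and the GITHUB_TOKEN variable are assumptions,
# not code taken from this PR.

# Succeeds only if the given tag is non-empty and not the literal "null"
# that jq prints when the field is missing (for example, on an error body).
validate_release_tag() {
  local tag="$1"
  [ -n "$tag" ] && [ "$tag" != "null" ]
}

# Fetches the latest release tag with an authenticated request.
# Requires curl and jq; prints the tag on success, returns non-zero otherwise.
fetch_latest_release_tag() {
  local url="https://api.github.com/repos/apache/beam/releases/latest"
  local tag
  # With --fail, curl prints no body on an HTTP error, so jq yields an
  # empty string and the validation below rejects it.
  tag="$(curl --fail --silent --show-error \
    -H "Authorization: Bearer ${GITHUB_TOKEN}" \
    -H "Accept: application/vnd.github+json" \
    "$url" | jq -r '.tag_name')" || return 1
  validate_release_tag "$tag" || return 1
  printf '%s\n' "$tag"
}
```

A workflow step could then call `fetch_latest_release_tag` and stop early with a clear message when it returns non-zero, rather than passing a null tag to the SDK version and Docker image resolution.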
2. Backend startup race condition
The workflow launches Playground backend containers in detached mode and immediately starts the CI runner against port 8080.
Under slower runner conditions or higher system load, the backend service may not yet be ready to accept connections, causing intermittent connection failures and flaky CI runs.
Changes
GitHub API hardening
Backend readiness validation
Observability improvements
Validation
Validated by:
The readiness polling exits immediately once the backend becomes available, minimizing runtime overhead on successful runs.
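The polling behaviour described above might look roughly like the following sketch. The function name `wait_until_ready`, its parameters, and the log messages are hypothetical and are not taken from the PR; the loop returns as soon as the check command succeeds and gives up after a bounded number of attempts.

```bash
#!/usr/bin/env bash
# Sketch only: wait_until_ready and its parameters are hypothetical names.

# Runs the given check command until it succeeds or attempts run out.
# Usage: wait_until_ready MAX_ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
wait_until_ready() {
  local max_attempts="$1" delay="$2"
  shift 2
  local attempt
  for (( attempt = 1; attempt <= max_attempts; attempt++ )); do
    if "$@"; then
      # Return immediately on the first successful check.
      echo "ready after ${attempt} attempt(s)"
      return 0
    fi
    echo "attempt ${attempt}/${max_attempts} failed" >&2
    # Sleep only when another attempt will follow.
    if (( attempt < max_attempts )); then
      sleep "$delay"
    fi
  done
  echo "backend not ready after ${max_attempts} attempts" >&2
  return 1
}
```

Assuming a TCP probe such as `nc -z` is available on the runner, the workflow could call `wait_until_ready 30 2 nc -z localhost 8080` between starting the containers and launching the CI runner. The attempt count and delay here are placeholder values.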
Expected Impact
This change reduces intermittent Playground CI Nightly failures caused by GitHub API rate limiting and backend startup races.
The implementation improves determinism and observability while keeping workflow changes minimal and maintainable.
Testing
Verified:
No production runtime code paths were modified outside CI workflows.