Fix Cloud Run deferrable trigger handling of transient gRPC errors #67219
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds resilience to the Cloud Run job-finished trigger by retrying transient Cloud Run API 503s during polling, preventing deferred tasks from failing and re-submitting jobs unnecessarily.
Changes:
- Retry `get_operation` polling when `google.api_core.exceptions.ServiceUnavailable` occurs.
- Add unit tests to verify retry-on-503 behavior and propagation of non-retryable exceptions.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py | Catch and retry ServiceUnavailable during operation polling with a logged warning and sleep. |
| providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py | Add tests covering retryable 503 behavior and non-retryable exception propagation. |
6c5bd38 to
d162324
The CloudRunJobFinishedTrigger polls the long-running operation via CloudRunAsyncHook.get_operation in its loop. When that gRPC call fails with a transient 503 ServiceUnavailable (typical of a regional Cloud Run API blip while the underlying job is still progressing), the exception propagates out of the trigger, the triggerer logs the failure, and the deferred task is failed with TaskDeferralError. The worker's task-level retry then re-runs the operator from scratch, which re-submits a brand new Cloud Run execution rather than waiting on the in-flight one.
Catch ServiceUnavailable inside the polling loop, log a warning, sleep polling_period_seconds, and continue, mirroring the equivalent fix in DataflowJobStatusTrigger (apache#66293). Other exceptions still propagate so Airflow's task-level retry can take over for genuinely terminal errors.
Tests cover the new retry behavior (one ServiceUnavailable followed by a successful poll yields the SUCCESS TriggerEvent) and lock in that unexpected exceptions are not silently swallowed.
Signed-off-by: Akshet Pandey <argetlam.akshet@gmail.com>
8c8c77c to
bdd4389
@shahar1 Updated the PR to address your comments.
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
related: #66293
Problem
`CloudRunJobFinishedTrigger.run()` polls the long-running operation via `CloudRunAsyncHook.get_operation` in its loop. When that gRPC call fails with a transient error (typical of a regional Cloud Run API blip while the underlying job is still progressing fine), the exception propagates out of the trigger:
- The triggerer logs the failure.
- The deferred task is failed with `TaskDeferralError`.
- The worker's task-level retry re-runs the operator from scratch, which submits a brand new Cloud Run execution rather than waiting on the in-flight one.
So a 1-second transient API blip turns into a duplicate (and billed) job run.
This mirrors the same class of bug fixed for Dataflow in #66293.
How to fix
Catch the full set of retryable gRPC errors inside the Cloud Run get-operation polling loop, log a warning, sleep `polling_period_seconds`, and continue polling. The retryable tuple is:
- `ServiceUnavailable` (503 / UNAVAILABLE)
- `InternalServerError` (500 / INTERNAL)
- `DeadlineExceeded` (504 / DEADLINE_EXCEEDED)
- `GatewayTimeout`
- `ResourceExhausted` (429 / RESOURCE_EXHAUSTED)
- `Aborted`
Anything outside that tuple (`PermissionDenied`, `NotFound`, auth failures, unexpected `RuntimeError`s) still propagates, so Airflow's task-level retry remains the safety net for genuinely terminal failures.
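The approach described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the exception classes are local stand-ins named after their `google.api_core.exceptions` counterparts so the snippet runs without that package, and `poll_until_done`, its `sleep` parameter, and the dict-shaped operation are hypothetical names introduced only for this example.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


# Local stand-ins (assumption): in the real trigger these would come from
# google.api_core.exceptions rather than being defined here.
class ServiceUnavailable(Exception): ...
class InternalServerError(Exception): ...
class DeadlineExceeded(Exception): ...
class GatewayTimeout(Exception): ...
class ResourceExhausted(Exception): ...
class Aborted(Exception): ...
class PermissionDenied(Exception): ...


# Errors treated as transient; anything else propagates to the caller.
RETRYABLE = (
    ServiceUnavailable,
    InternalServerError,
    DeadlineExceeded,
    GatewayTimeout,
    ResourceExhausted,
    Aborted,
)


async def poll_until_done(get_operation, operation_name, polling_period_seconds,
                          sleep=asyncio.sleep):
    """Poll `get_operation` until the operation reports done.

    `get_operation` is any async callable returning a dict with a "done" key
    (a hypothetical shape used only in this sketch).
    """
    while True:
        try:
            operation = await get_operation(operation_name)
        except RETRYABLE as exc:
            # Transient failure: warn, wait one polling period, then poll again.
            log.warning("Transient error polling %s: %s", operation_name, exc)
            await sleep(polling_period_seconds)
            continue
        if operation["done"]:
            return operation
        # Not finished yet: wait before the next poll.
        await sleep(polling_period_seconds)
```

Keeping the `except` clause limited to an explicit tuple is what lets a `PermissionDenied` or an unexpected `RuntimeError` still surface immediately instead of being retried forever.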
Tests
Two parametrized tests cover the new behavior:
- `test_trigger_continues_polling_after_retryable_grpc_error`: runs once per entry in the retryable tuple. The first `get_operation` raises the parametrized exception; the second returns a successfully completed operation. The trigger must yield the `SUCCESS` `TriggerEvent` and `asyncio.sleep` must be awaited exactly once with `polling_period_seconds`.
- `test_trigger_propagates_unexpected_polling_exception`: parametrized over `PermissionDenied` and a bare `RuntimeError`. Both must still propagate out of `run()`, locking in that only the retryable tuple is swallowed.
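The shape of those two tests can be illustrated with a small standalone sketch. Everything here is an assumption made for illustration rather than the PR's test code: `MiniTrigger` is a hypothetical stand-in for the real trigger, the exception classes are local placeholders, the retryable tuple is shortened, and plain helper functions replace pytest parametrization so the snippet runs on the standard library alone.

```python
import asyncio
from unittest import mock


# Local placeholder exceptions (assumption), not the google.api_core classes.
class ServiceUnavailable(Exception): ...
class Aborted(Exception): ...
class PermissionDenied(Exception): ...


RETRYABLE = (ServiceUnavailable, Aborted)  # shortened stand-in tuple


class MiniTrigger:
    """Hypothetical stand-in for the trigger, with an injectable sleep."""

    def __init__(self, hook, polling_period_seconds, sleep):
        self.hook = hook
        self.polling_period_seconds = polling_period_seconds
        self.sleep = sleep

    async def run(self):
        while True:
            try:
                operation = await self.hook.get_operation("op-name")
            except RETRYABLE:
                await self.sleep(self.polling_period_seconds)
                continue
            if operation.done:
                yield {"status": "success"}
                return
            await self.sleep(self.polling_period_seconds)


async def first_event(trigger):
    # Return the first event the trigger yields.
    async for event in trigger.run():
        return event


def check_retry(exc_type):
    hook = mock.Mock()
    # First poll raises the transient error, second returns a finished operation.
    hook.get_operation = mock.AsyncMock(
        side_effect=[exc_type("transient"), mock.Mock(done=True)]
    )
    sleep = mock.AsyncMock()
    event = asyncio.run(first_event(MiniTrigger(hook, 10, sleep)))
    assert event == {"status": "success"}
    sleep.assert_awaited_once_with(10)


def check_propagates(exc_type):
    hook = mock.Mock()
    hook.get_operation = mock.AsyncMock(side_effect=exc_type("terminal"))
    sleep = mock.AsyncMock()
    try:
        asyncio.run(first_event(MiniTrigger(hook, 10, sleep)))
    except exc_type:
        sleep.assert_not_awaited()  # no retry was attempted
        return True
    return False
```

Calling `check_retry` once per entry in `RETRYABLE` plays the role of the parametrized retry test, while `check_propagates(PermissionDenied)` and `check_propagates(RuntimeError)` correspond to the propagation cases.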
All pre-existing tests in `test_cloud_run.py` continue to pass.
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Opus 4.7 following the guidelines