-
Notifications
You must be signed in to change notification settings - Fork 376
Fix seed PR backfill interval overrides #5764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,7 @@ | |
| from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements | ||
| from sqlmesh.core.plan.definition import Plan | ||
| from sqlmesh.core.macros import MacroEvaluator, RuntimeStage | ||
| from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model | ||
| from sqlmesh.core.model import SeedModel, load_sql_based_model, model, SqlModel, Model | ||
| from sqlmesh.core.model.common import ParsableSql | ||
| from sqlmesh.core.model.cache import OptimizedQueryCache | ||
| from sqlmesh.core.renderer import render_statements | ||
|
|
@@ -1223,6 +1223,84 @@ def test_plan_seed_model_excluded_from_default_end(copy_to_temp_path: t.Callable | |
| context.close() | ||
|
|
||
|
|
||
| @pytest.mark.slow | ||
| def test_seed_model_pr_plan_filters_stale_end_override( | ||
| copy_to_temp_path: t.Callable, mocker: MockerFixture | ||
| ): | ||
| path = copy_to_temp_path("examples/sushi") | ||
|
Comment on lines
+1226
to
+1230
|
||
|
|
||
| with time_machine.travel("2024-06-01 00:00:00 UTC"): | ||
| context = Context(paths=path, gateway="duckdb_persistent") | ||
| context.plan("prod", no_prompts=True, auto_apply=True) | ||
| context.close() | ||
|
|
||
| with time_machine.travel("2026-04-13 00:00:00 UTC"): | ||
| context = Context(paths=path, gateway="duckdb_persistent") | ||
|
|
||
| model = t.cast(SeedModel, context.get_model("sushi.waiter_names").copy()) | ||
| model.seed.content += "10,Trey\n" | ||
| context.upsert_model(model) | ||
| context.upsert_model( | ||
| load_sql_based_model( | ||
| parse( | ||
| """ | ||
| MODEL ( | ||
| name sushi.waiter_rollup, | ||
| kind FULL, | ||
| cron '@daily' | ||
| ); | ||
|
|
||
| SELECT waiter_id, waiter_name, event_date | ||
| FROM sushi.waiter_as_customer_by_day | ||
| """ | ||
| ), | ||
| default_catalog=context.default_catalog, | ||
| ) | ||
| ) | ||
|
|
||
| waiter_as_customer_by_day = context.get_snapshot( | ||
| "sushi.waiter_as_customer_by_day", raise_if_missing=True | ||
| ) | ||
| orders = context.get_snapshot("sushi.orders", raise_if_missing=True) | ||
| original_get_max_interval_end_per_model = context._get_max_interval_end_per_model | ||
|
|
||
| def _mocked_max_interval_end_per_model( | ||
| snapshots: t.Dict[str, t.Any], backfill_models: t.Optional[t.Set[str]] | ||
| ) -> t.Dict[str, datetime]: | ||
| result = original_get_max_interval_end_per_model(snapshots, backfill_models) | ||
| # Keep the overall plan end recent via another affected model while making the old prod end for | ||
| # waiter_as_customer_by_day older than the PR start. Without filtering, that stale end_override | ||
| # causes the new waiter_as_customer_by_day snapshot to be skipped and waiter_rollup fails when it | ||
| # references the missing physical table. | ||
| result[waiter_as_customer_by_day.name] = to_datetime("2026-01-01") | ||
| result[orders.name] = to_datetime("2026-04-13") | ||
| return result | ||
|
|
||
| mocker.patch.object( | ||
| context, | ||
| "_get_max_interval_end_per_model", | ||
| side_effect=_mocked_max_interval_end_per_model, | ||
| ) | ||
|
|
||
| plan = context.plan("dev", start="2 months ago", no_prompts=True) | ||
| missing_interval_names = {si.snapshot_id.name for si in plan.missing_intervals} | ||
|
|
||
| assert plan.user_provided_flags == {"start": "2 months ago"} | ||
| assert to_timestamp(plan.start) == to_timestamp("2026-02-13") | ||
| assert to_timestamp(plan.end) == to_timestamp("2026-04-13") | ||
| assert any("waiter_as_customer_by_day" in name for name in missing_interval_names) | ||
| assert any("waiter_rollup" in name for name in missing_interval_names) | ||
|
|
||
| context.apply(plan) | ||
|
|
||
| environment = context.state_sync.get_environment("dev") | ||
| assert environment is not None | ||
| promoted_snapshot_names = {snapshot.name for snapshot in environment.promoted_snapshots} | ||
| assert any("waiter_as_customer_by_day" in name for name in promoted_snapshot_names) | ||
| assert any("waiter_rollup" in name for name in promoted_snapshot_names) | ||
| context.close() | ||
|
|
||
|
|
||
| @pytest.mark.slow | ||
| def test_schema_error_no_default(sushi_context_pre_scheduling) -> None: | ||
| context = sushi_context_pre_scheduling | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plan_execution_time = execution_time or now()will ignore valid falsyexecution_timevalues (e.g., epoch millis0) and silently replace them withnow(). Use an explicitNonecheck (e.g.,execution_time if execution_time is not None else now()) to preserve caller-provided values.