From 0c421d0ef4435c2eb59b8a9641128f59658d7a21 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Mon, 13 Apr 2026 14:51:04 -0700 Subject: [PATCH 1/2] Fix seed PR backfill interval overrides Signed-off-by: eakmanrq <6326532+eakmanrq@users.noreply.github.com> --- sqlmesh/core/context.py | 33 +++++++++++++++++++++++++++++++-- tests/core/test_context.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index dc51aad2a7..24ed80dfb1 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1666,6 +1666,7 @@ def plan_builder( # This ensures that no models outside the impacted sub-DAG(s) will be backfilled unexpectedly. backfill_models = modified_model_names or None + plan_execution_time = execution_time or now() max_interval_end_per_model = None default_start, default_end = None, None if not run: @@ -1680,17 +1681,31 @@ def plan_builder( max_interval_end_per_model, backfill_models, modified_model_names, - execution_time or now(), + plan_execution_time, ) + if ( + start + and default_end + and to_datetime(start, relative_base=to_datetime(plan_execution_time)) + > to_datetime(default_end) + ): + # If the requested start is newer than prod's latest interval end, fall back to execution time + # instead of forcing an invalid [start, default_end] range. + default_start, default_end = None, None + # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values()) + max_interval_end_per_model = self._filter_stale_end_overrides( + max_interval_end_per_model, + context_diff.snapshots_by_name, + ) start_override_per_model = self._calculate_start_override_per_model( min_intervals, start or default_start, end or default_end, - execution_time or now(), + plan_execution_time, backfill_models, snapshots, max_interval_end_per_model, @@ -3181,6 +3196,20 @@ def _get_max_interval_end_per_model( ).items() } + @staticmethod + def _filter_stale_end_overrides( + max_interval_end_per_model: t.Dict[str, datetime], + snapshots_by_name: t.Dict[str, Snapshot], + ) -> t.Dict[str, datetime]: + # Drop stale interval ends for snapshots whose new versions have no intervals yet. Otherwise the old + # prod end is reused as an end_override, causing missing_intervals() to skip the new snapshot entirely + # when the requested start is newer than that stale end. + return { + model_fqn: end + for model_fqn, end in max_interval_end_per_model.items() + if model_fqn not in snapshots_by_name or snapshots_by_name[model_fqn].intervals + } + @staticmethod def _get_models_for_interval_end( snapshots: t.Dict[str, Snapshot], backfill_models: t.Set[str] diff --git a/tests/core/test_context.py b/tests/core/test_context.py index c3d88e205e..88b6e42772 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1223,6 +1223,42 @@ def test_plan_seed_model_excluded_from_default_end(copy_to_temp_path: t.Callable context.close() +@pytest.mark.slow +def test_seed_model_pr_plan_with_stale_prod_intervals(copy_to_temp_path: t.Callable): + path = copy_to_temp_path("examples/sushi") + + with time_machine.travel("2024-06-01 00:00:00 UTC"): + context = Context(paths=path, gateway="duckdb_persistent") + context.plan("prod", no_prompts=True, auto_apply=True) + context.close() + + with time_machine.travel("2026-04-13 00:00:00 UTC"): + context = Context(paths=path, gateway="duckdb_persistent") + + model = context.get_model("sushi.waiter_names").copy() + model.seed.content += "10,Trey\n" + context.upsert_model(model) + + plan = context.plan("dev", start="2 months ago", no_prompts=True) + missing_interval_names = {si.snapshot_id.name for si in plan.missing_intervals} + + assert plan.user_provided_flags == {"start": "2 months ago"} + assert plan.provided_end is None + assert to_timestamp(plan.start) == to_timestamp("2026-02-13") + assert to_timestamp(plan.end) == to_timestamp("2026-04-13") + assert any("waiter_names" in name for name in missing_interval_names) + assert any("waiter_as_customer_by_day" in name for name in missing_interval_names) + + context.apply(plan) + + promoted_snapshot_names = { + snapshot.name for snapshot in context.state_sync.get_environment("dev").promoted_snapshots + } + assert any("waiter_names" in name for name in promoted_snapshot_names) + assert any("waiter_as_customer_by_day" in name for name in promoted_snapshot_names) + context.close() + + @pytest.mark.slow def test_schema_error_no_default(sushi_context_pre_scheduling) -> None: context = sushi_context_pre_scheduling From 60086e2e5a5c0bf21243260c8cfe4faadc83213c Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:14:37 -0700 Subject: [PATCH 2/2] Narrow seed PR stale override fix Signed-off-by: eakmanrq <6326532+eakmanrq@users.noreply.github.com> --- sqlmesh/core/context.py | 10 ------- tests/core/test_context.py | 60 ++++++++++++++++++++++++++++++++------ 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 24ed80dfb1..0c49b557b7 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1684,16 +1684,6 @@ def plan_builder( plan_execution_time, ) - if ( - start - and default_end - and to_datetime(start, relative_base=to_datetime(plan_execution_time)) - > to_datetime(default_end) - ): - # If the requested start is newer than prod's latest interval end, fall back to execution time - # instead of forcing an invalid [start, default_end] range. - default_start, default_end = None, None - # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values()) max_interval_end_per_model = self._filter_stale_end_overrides( diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 88b6e42772..f4dbde2f25 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -38,7 +38,7 @@ from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements from sqlmesh.core.plan.definition import Plan from sqlmesh.core.macros import MacroEvaluator, RuntimeStage -from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model +from sqlmesh.core.model import SeedModel, load_sql_based_model, model, SqlModel, Model from sqlmesh.core.model.common import ParsableSql from sqlmesh.core.model.cache import OptimizedQueryCache from sqlmesh.core.renderer import render_statements @@ -1224,7 +1224,9 @@ def test_plan_seed_model_excluded_from_default_end(copy_to_temp_path: t.Callable @pytest.mark.slow -def test_seed_model_pr_plan_with_stale_prod_intervals(copy_to_temp_path: t.Callable): +def test_seed_model_pr_plan_filters_stale_end_override( + copy_to_temp_path: t.Callable, mocker: MockerFixture +): path = copy_to_temp_path("examples/sushi") with time_machine.travel("2024-06-01 00:00:00 UTC"): @@ -1235,27 +1237,67 @@ def test_seed_model_pr_plan_with_stale_prod_intervals(copy_to_temp_path: t.Calla with time_machine.travel("2026-04-13 00:00:00 UTC"): context = Context(paths=path, gateway="duckdb_persistent") - model = context.get_model("sushi.waiter_names").copy() + model = t.cast(SeedModel, context.get_model("sushi.waiter_names").copy()) model.seed.content += "10,Trey\n" context.upsert_model(model) + context.upsert_model( + load_sql_based_model( + parse( + """ + MODEL ( + name sushi.waiter_rollup, + kind FULL, + cron '@daily' + ); + + SELECT waiter_id, waiter_name, event_date + FROM sushi.waiter_as_customer_by_day + """ + ), + default_catalog=context.default_catalog, + ) + ) + + waiter_as_customer_by_day = context.get_snapshot( + "sushi.waiter_as_customer_by_day", raise_if_missing=True + ) + orders = context.get_snapshot("sushi.orders", raise_if_missing=True) + original_get_max_interval_end_per_model = context._get_max_interval_end_per_model + + def _mocked_max_interval_end_per_model( + snapshots: t.Dict[str, t.Any], backfill_models: t.Optional[t.Set[str]] + ) -> t.Dict[str, datetime]: + result = original_get_max_interval_end_per_model(snapshots, backfill_models) + # Keep the overall plan end recent via another affected model while making the old prod end for + # waiter_as_customer_by_day older than the PR start. Without filtering, that stale end_override + # causes the new waiter_as_customer_by_day snapshot to be skipped and waiter_rollup fails when it + # references the missing physical table. + result[waiter_as_customer_by_day.name] = to_datetime("2026-01-01") + result[orders.name] = to_datetime("2026-04-13") + return result + + mocker.patch.object( + context, + "_get_max_interval_end_per_model", + side_effect=_mocked_max_interval_end_per_model, + ) plan = context.plan("dev", start="2 months ago", no_prompts=True) missing_interval_names = {si.snapshot_id.name for si in plan.missing_intervals} assert plan.user_provided_flags == {"start": "2 months ago"} - assert plan.provided_end is None assert to_timestamp(plan.start) == to_timestamp("2026-02-13") assert to_timestamp(plan.end) == to_timestamp("2026-04-13") - assert any("waiter_names" in name for name in missing_interval_names) assert any("waiter_as_customer_by_day" in name for name in missing_interval_names) + assert any("waiter_rollup" in name for name in missing_interval_names) context.apply(plan) - promoted_snapshot_names = { - snapshot.name for snapshot in context.state_sync.get_environment("dev").promoted_snapshots - } - assert any("waiter_names" in name for name in promoted_snapshot_names) + environment = context.state_sync.get_environment("dev") + assert environment is not None + promoted_snapshot_names = {snapshot.name for snapshot in environment.promoted_snapshots} assert any("waiter_as_customer_by_day" in name for name in promoted_snapshot_names) + assert any("waiter_rollup" in name for name in promoted_snapshot_names) context.close()