Skip to content

Commit

Permalink
refactor: refactor promote and demote into method (#3421)
Browse files Browse the repository at this point in the history
  • Loading branch information
benfdking authored Nov 25, 2024
1 parent 002f846 commit c9057d4
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sqlmesh.core import analytics
from sqlmesh.core import constants as c
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.notification_target import (
NotificationTarget,
)
Expand All @@ -32,6 +33,8 @@
SnapshotEvaluator,
SnapshotIntervals,
SnapshotId,
SnapshotInfoLike,
SnapshotTableInfo,
)
from sqlmesh.core.state_sync import StateSync
from sqlmesh.core.state_sync.base import PromotionResult
Expand Down Expand Up @@ -306,14 +309,16 @@ def _update_views(

completed = False
try:
self.snapshot_evaluator.promote(
self._promote_snapshots(
plan,
[snapshots[s.snapshot_id] for s in promotion_result.added],
environment.naming_info,
deployability_index=deployability_index,
on_complete=lambda s: self.console.update_promotion_progress(s, True),
)
if promotion_result.removed_environment_naming_info:
self.snapshot_evaluator.demote(
self._demote_snapshots(
plan,
promotion_result.removed,
promotion_result.removed_environment_naming_info,
on_complete=lambda s: self.console.update_promotion_progress(s, False),
Expand All @@ -323,6 +328,32 @@ def _update_views(
finally:
self.console.stop_promotion_progress(success=completed)

def _promote_snapshots(
self,
plan: EvaluatablePlan,
target_snapshots: t.Iterable[Snapshot],
environment_naming_info: EnvironmentNamingInfo,
deployability_index: t.Optional[DeployabilityIndex] = None,
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
) -> None:
self.snapshot_evaluator.promote(
target_snapshots,
environment_naming_info,
deployability_index=deployability_index,
on_complete=on_complete,
)

def _demote_snapshots(
self,
plan: EvaluatablePlan,
target_snapshots: t.Iterable[SnapshotTableInfo],
environment_naming_info: EnvironmentNamingInfo,
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
) -> None:
self.snapshot_evaluator.demote(
target_snapshots, environment_naming_info, on_complete=on_complete
)

def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]) -> None:
if not plan.restatements:
return
Expand Down

0 comments on commit c9057d4

Please sign in to comment.