Skip to content

Commit 956670c

Browse files
authored
Feat(sqlmesh_dbt): Implement --model and --resource-type (#5443)
1 parent a95955c commit 956670c

File tree

8 files changed

+306
-17
lines changed

8 files changed

+306
-17
lines changed

sqlmesh/core/selector.py

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from sqlmesh.core.dialect import normalize_model_name
1717
from sqlmesh.core.environment import Environment
1818
from sqlmesh.core.model import update_model_schemas
19+
from sqlmesh.core.audit import StandaloneAudit
1920
from sqlmesh.utils import UniqueKeyDict
2021
from sqlmesh.utils.dag import DAG
2122
from sqlmesh.utils.git import GitClient
@@ -25,6 +26,7 @@
2526
if t.TYPE_CHECKING:
2627
from typing_extensions import Literal as Lit # noqa
2728
from sqlmesh.core.model import Model
29+
from sqlmesh.core.node import Node
2830
from sqlmesh.core.state_sync import StateReader
2931

3032

@@ -167,7 +169,7 @@ def get_model(fqn: str) -> t.Optional[Model]:
167169
return models
168170

169171
def expand_model_selections(
170-
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Model]] = None
172+
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
171173
) -> t.Set[str]:
172174
"""Expands a set of model selections into a set of model fqns that can be looked up in the Context.
173175
@@ -180,7 +182,7 @@ def expand_model_selections(
180182

181183
node = parse(" | ".join(f"({s})" for s in model_selections))
182184

183-
all_models = models or self._models
185+
all_models: t.Dict[str, Node] = models or dict(self._models)
184186
models_by_tags: t.Dict[str, t.Set[str]] = {}
185187

186188
for fqn, model in all_models.items():
@@ -226,6 +228,13 @@ def evaluate(node: exp.Expression) -> t.Set[str]:
226228
if fnmatch.fnmatchcase(tag, pattern)
227229
}
228230
return models_by_tags.get(pattern, set())
231+
if isinstance(node, ResourceType):
232+
resource_type = node.name.lower()
233+
return {
234+
fqn
235+
for fqn, model in all_models.items()
236+
if self._matches_resource_type(resource_type, model)
237+
}
229238
if isinstance(node, Direction):
230239
selected = set()
231240

@@ -243,36 +252,49 @@ def evaluate(node: exp.Expression) -> t.Set[str]:
243252
return evaluate(node)
244253

245254
@abc.abstractmethod
246-
def _model_name(self, model: Model) -> str:
255+
def _model_name(self, model: Node) -> str:
247256
"""Given a model, return the name that a selector pattern containing wildcards should be fnmatch'd on"""
248257
pass
249258

250259
@abc.abstractmethod
251-
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -> t.Set[str]:
260+
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
252261
"""Given a pattern, return the keys of the matching models from :all_models"""
253262
pass
254263

264+
@abc.abstractmethod
265+
def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
266+
"""Indicate whether or not the supplied model matches the supplied resource type"""
267+
pass
268+
255269

256270
class NativeSelector(Selector):
257271
"""Implementation of selectors that matches objects based on SQLMesh native names"""
258272

259-
def _model_name(self, model: Model) -> str:
273+
def _model_name(self, model: Node) -> str:
260274
return model.name
261275

262-
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -> t.Set[str]:
276+
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
263277
fqn = normalize_model_name(pattern, self._default_catalog, self._dialect)
264278
return {fqn} if fqn in all_models else set()
265279

280+
def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
281+
if resource_type == "model":
282+
return model.is_model
283+
if resource_type == "audit":
284+
return isinstance(model, StandaloneAudit)
285+
286+
raise SQLMeshError(f"Unsupported resource type: {resource_type}")
287+
266288

267289
class DbtSelector(Selector):
268290
"""Implementation of selectors that matches objects based on the DBT names instead of the SQLMesh native names"""
269291

270-
def _model_name(self, model: Model) -> str:
292+
def _model_name(self, model: Node) -> str:
271293
if dbt_fqn := model.dbt_fqn:
272294
return dbt_fqn
273295
raise SQLMeshError("dbt node information must be populated to use dbt selectors")
274296

275-
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -> t.Set[str]:
297+
def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Node]) -> t.Set[str]:
276298
# a pattern like "staging.customers" should match a model called "jaffle_shop.staging.customers"
277299
# but not a model called "jaffle_shop.customers.staging"
278300
# also a pattern like "aging" should not match "staging" so we need to consider components; not substrings
@@ -306,6 +328,40 @@ def _pattern_to_model_fqns(self, pattern: str, all_models: t.Dict[str, Model]) -
306328
matches.add(fqn)
307329
return matches
308330

331+
def _matches_resource_type(self, resource_type: str, model: Node) -> bool:
332+
"""
333+
ref: https://docs.getdbt.com/reference/node-selection/methods#resource_type
334+
335+
# supported by SQLMesh
336+
"model"
337+
"seed"
338+
"source" # external model
339+
"test" # standalone audit
340+
341+
# not supported by SQLMesh yet; listed here for reference only, an error is raised if someone tries to use them
342+
"analysis"
343+
"exposure"
344+
"metric"
345+
"saved_query"
346+
"semantic_model"
347+
"snapshot"
348+
"unit_test"
349+
"""
350+
if resource_type not in ("model", "seed", "source", "test"):
351+
raise SQLMeshError(f"Unsupported resource type: {resource_type}")
352+
353+
if isinstance(model, StandaloneAudit):
354+
return resource_type == "test"
355+
356+
if resource_type == "model":
357+
return model.is_model and not model.kind.is_external and not model.kind.is_seed
358+
if resource_type == "source":
359+
return model.kind.is_external
360+
if resource_type == "seed":
361+
return model.kind.is_seed
362+
363+
return False
364+
309365

310366
class SelectorDialect(Dialect):
311367
IDENTIFIERS_CAN_START_WITH_DIGIT = True
@@ -336,6 +392,10 @@ class Tag(exp.Expression):
336392
pass
337393

338394

395+
class ResourceType(exp.Expression):
396+
pass
397+
398+
339399
class Direction(exp.Expression):
340400
pass
341401

@@ -388,7 +448,8 @@ def _parse_var() -> exp.Expression:
388448
upstream = _match(TokenType.PLUS)
389449
downstream = None
390450
tag = _parse_kind("tag")
391-
git = False if tag else _parse_kind("git")
451+
resource_type = False if tag else _parse_kind("resource_type")
452+
git = False if resource_type else _parse_kind("git")
392453
lstar = "*" if _match(TokenType.STAR) else ""
393454
directions = {}
394455

@@ -414,6 +475,8 @@ def _parse_var() -> exp.Expression:
414475

415476
if tag:
416477
this = Tag(this=this)
478+
if resource_type:
479+
this = ResourceType(this=this)
417480
if git:
418481
this = Git(this=this)
419482
if directions:

sqlmesh_dbt/cli.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,39 @@ def _cleanup() -> None:
3333

3434
select_option = click.option(
3535
"-s",
36-
"-m",
3736
"--select",
37+
multiple=True,
38+
help="Specify the nodes to include.",
39+
)
40+
model_option = click.option(
41+
"-m",
3842
"--models",
3943
"--model",
4044
multiple=True,
41-
help="Specify the nodes to include.",
45+
help="Specify the model nodes to include; other nodes are excluded.",
4246
)
4347
exclude_option = click.option("--exclude", multiple=True, help="Specify the nodes to exclude.")
4448

49+
# TODO: expand this out into --resource-type/--resource-types and --exclude-resource-type/--exclude-resource-types
50+
resource_types = [
51+
"metric",
52+
"semantic_model",
53+
"saved_query",
54+
"source",
55+
"analysis",
56+
"model",
57+
"test",
58+
"unit_test",
59+
"exposure",
60+
"snapshot",
61+
"seed",
62+
"default",
63+
"all",
64+
]
65+
resource_type_option = click.option(
66+
"--resource-type", type=click.Choice(resource_types, case_sensitive=False)
67+
)
68+
4569

4670
@click.group(cls=ErrorHandlingGroup, invoke_without_command=True)
4771
@click.option("--profile", help="Which existing profile to load. Overrides output.profile")
@@ -86,7 +110,9 @@ def dbt(
86110

87111
@dbt.command()
88112
@select_option
113+
@model_option
89114
@exclude_option
115+
@resource_type_option
90116
@click.option(
91117
"-f",
92118
"--full-refresh",
@@ -116,7 +142,9 @@ def run(
116142

117143
@dbt.command(name="list")
118144
@select_option
145+
@model_option
119146
@exclude_option
147+
@resource_type_option
120148
@vars_option
121149
@click.pass_context
122150
def list_(ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], **kwargs: t.Any) -> None:

sqlmesh_dbt/operations.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ def list_(
2626
self,
2727
select: t.Optional[t.List[str]] = None,
2828
exclude: t.Optional[t.List[str]] = None,
29+
models: t.Optional[t.List[str]] = None,
30+
resource_type: t.Optional[str] = None,
2931
) -> None:
3032
# dbt list prints:
3133
# - models
3234
# - "data tests" (audits) for those models
3335
# it also applies selectors which is useful for testing selectors
34-
selected_models = list(self._selected_models(select, exclude).values())
36+
selected_models = list(
37+
self._selected_models(select, exclude, models, resource_type).values()
38+
)
3539
self.console.list_models(
3640
selected_models, {k: v.node for k, v in self.context.snapshots.items()}
3741
)
@@ -41,13 +45,19 @@ def run(
4145
environment: t.Optional[str] = None,
4246
select: t.Optional[t.List[str]] = None,
4347
exclude: t.Optional[t.List[str]] = None,
48+
models: t.Optional[t.List[str]] = None,
49+
resource_type: t.Optional[str] = None,
4450
full_refresh: bool = False,
4551
empty: bool = False,
4652
) -> Plan:
53+
consolidated_select, consolidated_exclude = selectors.consolidate(
54+
select or [], exclude or [], models or [], resource_type
55+
)
56+
4757
plan_builder = self._plan_builder(
4858
environment=environment,
49-
select=select,
50-
exclude=exclude,
59+
select=consolidated_select,
60+
exclude=consolidated_exclude,
5161
full_refresh=full_refresh,
5262
empty=empty,
5363
)
@@ -86,9 +96,15 @@ def _plan_builder(
8696
)
8797

8898
def _selected_models(
89-
self, select: t.Optional[t.List[str]] = None, exclude: t.Optional[t.List[str]] = None
99+
self,
100+
select: t.Optional[t.List[str]] = None,
101+
exclude: t.Optional[t.List[str]] = None,
102+
models: t.Optional[t.List[str]] = None,
103+
resource_type: t.Optional[str] = None,
90104
) -> t.Dict[str, Model]:
91-
if sqlmesh_selector := selectors.to_sqlmesh(select or [], exclude or []):
105+
if sqlmesh_selector := selectors.to_sqlmesh(
106+
*selectors.consolidate(select or [], exclude or [], models or [], resource_type)
107+
):
92108
if self.debug:
93109
self.console.print(f"dbt --select: {select}")
94110
self.console.print(f"dbt --exclude: {exclude}")

sqlmesh_dbt/selectors.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,45 @@
44
logger = logging.getLogger(__name__)
55

66

7-
def to_sqlmesh(dbt_select: t.Collection[str], dbt_exclude: t.Collection[str]) -> t.Optional[str]:
7+
def consolidate(
8+
select: t.List[str],
9+
exclude: t.List[str],
10+
models: t.List[str],
11+
resource_type: t.Optional[str],
12+
) -> t.Tuple[t.List[str], t.List[str]]:
13+
"""
14+
Given a bunch of dbt CLI arguments that may or may not be defined:
15+
--select, --exclude, --models, --resource-type
16+
17+
Combine them into a single set of --select/--exclude node selectors, throwing an error if mutually exclusive combinations are provided
18+
Note that the returned value is still in dbt format; pass it to to_sqlmesh() to create a selector for the sqlmesh selector engine
19+
"""
20+
if models and select:
21+
raise ValueError('"models" and "select" are mutually exclusive arguments')
22+
23+
if models and resource_type:
24+
raise ValueError('"models" and "resource_type" are mutually exclusive arguments')
25+
26+
if models:
27+
# --models implies resource_type:model
28+
resource_type = "model"
29+
30+
if resource_type:
31+
resource_type_selector = f"resource_type:{resource_type}"
32+
all_selectors = [*select, *models]
33+
select = (
34+
[
35+
f"resource_type:{resource_type},{original_selector}"
36+
for original_selector in all_selectors
37+
]
38+
if all_selectors
39+
else [resource_type_selector]
40+
)
41+
42+
return select, exclude
43+
44+
45+
def to_sqlmesh(dbt_select: t.List[str], dbt_exclude: t.List[str]) -> t.Optional[str]:
846
"""
947
Given selectors defined in the format of the dbt cli --select and --exclude arguments, convert them into a selector expression that
1048
the SQLMesh selector engine can understand.

0 commit comments

Comments
 (0)