Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, config: DBTCloudConnection):

self.job_ids = self.config.jobIds
self.project_ids = self.config.projectIds
self.environment_ids = self.config.environmentIds

client_config: ClientConfig = ClientConfig(
base_url=clean_uri(self.config.host),
Expand All @@ -72,6 +73,7 @@ def _get_jobs(
self,
job_id: str = None,
project_id: str = None,
environment_id: str = None,
) -> Iterable[DBTJob]:
"""
Fetch jobs for an account in dbt cloud
Expand All @@ -80,9 +82,17 @@ def _get_jobs(

try:
job_path = f"{job_id}/" if job_id else ""
project_path = f"?project_id={project_id}" if project_id else ""

# Build query string for filters
filters = []
if project_id:
filters.append(f"project_id={project_id}")
if environment_id:
filters.append(f"environment_id={environment_id}")
query_string = "?" + "&".join(filters) if filters else ""

result = self.client.get(
f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}",
f"/accounts/{self.config.accountId}/jobs/{job_path}{query_string}",
data=query_params,
)

Expand All @@ -101,7 +111,7 @@ def _get_jobs(

query_params["offset"] += query_params["limit"]
result = self.client.get(
f"/accounts/{self.config.accountId}/jobs/{job_path}{project_path}",
f"/accounts/{self.config.accountId}/jobs/{job_path}{query_string}",
data=query_params,
)
job_list_response = DBTJobList.model_validate(result)
Expand All @@ -110,7 +120,8 @@ def _get_jobs(
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Failed to get job info for project_id: `{project_id}` or job_id: `{job_id}` : {exc}"
f"Failed to get job info for project_id: `{project_id}`, "
f"environment_id: `{environment_id}` or job_id: `{job_id}` : {exc}"
)

def test_get_jobs(self) -> List[DBTJob]:
Expand All @@ -132,21 +143,31 @@ def get_jobs(self) -> Iterable[DBTJob]:
"""
List jobs for an account in dbt cloud using generator pattern.
yields job one at a time for memory efficiency.

Filter priority:
1. If jobIds specified - fetch specific jobs directly (highest priority)
2. If projectIds and/or environmentIds specified - fetch by those filters
3. If nothing specified - fetch all jobs
"""
try:
# case when job_ids are specified and project_ids are not
if self.job_ids and not self.project_ids:
# Case 1: jobIds specified - fetch specific jobs directly (highest priority)
if self.job_ids:
for job_id in self.job_ids:
yield from self._get_jobs(job_id=job_id)
# case when project_ids are specified or both are specified
elif self.project_ids:
for project_id in self.project_ids:
for job in self._get_jobs(project_id=project_id):
if self.job_ids:
if str(job.id) in self.job_ids:
yield job
else:
yield job

# Case 2: projectIds and/or environmentIds specified (no jobIds)
elif self.project_ids or self.environment_ids:
project_list = self.project_ids or [None]
env_list = self.environment_ids or [None]

for project_id in project_list:
for environment_id in env_list:
yield from self._get_jobs(
project_id=project_id,
environment_id=environment_id,
)

# Case 3: No filters specified - fetch all jobs
else:
yield from self._get_jobs()
except Exception as exc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ def yield_pipeline_lineage_details(
}

for model in dbt_models or []:
if not model.runGeneratedAt:
logger.debug(
f"Skipping model with missing runGeneratedAt: name={getattr(model, 'name', None)}"
)
continue

if not all([model.name, model.database, model.dbtschema]):
logger.debug(
f"Skipping model with missing attributes: name={getattr(model, 'name', None)}, "
Expand Down Expand Up @@ -269,6 +275,15 @@ def yield_pipeline_lineage_details(
if not parent:
continue

# Check runGeneratedAt for models and seeds (not sources)
# Sources are auto-generated and don't have runGeneratedAt
is_source = unique_id.startswith("source.")
if not is_source and not parent.runGeneratedAt:
logger.debug(
f"Skipping parent with missing runGeneratedAt: uniqueId={unique_id}"
)
continue

if not all([parent.name, parent.database, parent.dbtschema]):
logger.debug(
f"Skipping parent with missing attributes: name={getattr(parent, 'name', None)}, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DBTJob(BaseModel):
job_type: Optional[str] = None
schedule: Optional[DBTSchedule] = None
project_id: int
environment_id: Optional[int] = None


class Pagination(BaseModel):
Expand Down Expand Up @@ -69,6 +70,7 @@ class DBTSources(BaseModel):
name: Optional[str] = None
dbtschema: Optional[str] = Field(None, alias="schema")
database: Optional[str] = None
runGeneratedAt: Optional[str] = None
extra: Optional[Extra] = None


Expand All @@ -77,6 +79,7 @@ class DBTModel(BaseModel):
name: Optional[str] = None
dbtschema: Optional[str] = Field(None, alias="schema")
database: Optional[str] = None
runGeneratedAt: Optional[str] = None
dependsOn: Optional[List[str]] = None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
database
schema
dependsOn
runGeneratedAt
}
seeds {
uniqueId
name
schema
database
runGeneratedAt
}
sources {
uniqueId
Expand Down
Loading
Loading