Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,49 @@ def _date_ref_from_field(
],
)

@staticmethod
def _build_parent_reference_sources(
    definition: CustomDatasetDefinition,
) -> list[CatalogDeclarativeReferenceSource]:
    """Build the declarative reference sources for the parent-dataset join.

    The composite-friendly ``parent_dataset_references`` list wins whenever
    it is set and non-empty. Otherwise the legacy single-column trio is
    used, wrapped in a one-element list. If any legacy field is missing the
    result is an empty list, which the GoodData API rejects downstream.
    """

    def _source(
        column: str,
        data_type: ColumnDataType,
        attribute_id: str,
    ) -> CatalogDeclarativeReferenceSource:
        # One declarative source: binds a local column to a grain
        # attribute on the parent dataset.
        return CatalogDeclarativeReferenceSource(
            column=column,
            data_type=data_type.value,
            target=CatalogGrainIdentifier(
                id=attribute_id,
                type=CustomFieldType.ATTRIBUTE.value,
            ),
        )

    refs = definition.parent_dataset_references
    if refs:
        return [
            _source(ref.source_column, ref.data_type, ref.attribute_id)
            for ref in refs
        ]

    legacy = (
        definition.dataset_reference_source_column,
        definition.dataset_reference_source_column_data_type,
        definition.parent_dataset_reference_attribute_id,
    )
    if all(field is not None for field in legacy):
        # All three legacy fields present: emit the single-column join.
        return [_source(*legacy)]

    return []

@staticmethod
def _get_sources(
dataset: CustomDataset,
Expand Down Expand Up @@ -253,6 +296,41 @@ def datasets_to_ldm(
# Get the data source info
dataset_source_table_id, dataset_sql = self._get_sources(dataset)

# Build the parent reference source list. The composite-friendly
# `parent_dataset_references` list takes precedence when set and
# non-empty; otherwise fall back to the legacy single-column fields.
parent_reference_sources = self._build_parent_reference_sources(
dataset.definition
)

# Workspace data filter fields are optional and must be set together
# (validated on the input model). Emit columns/references only when
# both are provided.
wdf_columns: list[CatalogDeclarativeWorkspaceDataFilterColumn] = []
wdf_references: list[
CatalogDeclarativeWorkspaceDataFilterReferences
] = []
if (
dataset.definition.workspace_data_filter_id is not None
and dataset.definition.workspace_data_filter_column_name
is not None
):
wdf_columns.append(
CatalogDeclarativeWorkspaceDataFilterColumn(
name=dataset.definition.workspace_data_filter_column_name,
data_type=ColumnDataType.STRING.value,
)
)
wdf_references.append(
CatalogDeclarativeWorkspaceDataFilterReferences(
filter_id=CatalogDatasetWorkspaceDataFilterIdentifier(
id=dataset.definition.workspace_data_filter_id
),
filter_column=dataset.definition.workspace_data_filter_column_name,
filter_column_data_type=ColumnDataType.STRING.value,
)
)

# Construct the declarative dataset object and append it to the list.
declarative_datasets.append(
CatalogDeclarativeDataset(
Expand All @@ -265,16 +343,7 @@ def datasets_to_ldm(
id=dataset.definition.parent_dataset_reference,
),
multivalue=True,
sources=[
CatalogDeclarativeReferenceSource(
column=dataset.definition.dataset_reference_source_column,
data_type=dataset.definition.dataset_reference_source_column_data_type.value,
target=CatalogGrainIdentifier(
id=dataset.definition.parent_dataset_reference_attribute_id,
type=CustomFieldType.ATTRIBUTE.value,
),
)
],
sources=parent_reference_sources,
),
]
+ date_references,
Expand All @@ -283,21 +352,8 @@ def datasets_to_ldm(
facts=facts,
data_source_table_id=dataset_source_table_id,
sql=dataset_sql,
workspace_data_filter_columns=[
CatalogDeclarativeWorkspaceDataFilterColumn(
name=dataset.definition.workspace_data_filter_column_name,
data_type=ColumnDataType.STRING.value,
)
],
workspace_data_filter_references=[
CatalogDeclarativeWorkspaceDataFilterReferences(
filter_id=CatalogDatasetWorkspaceDataFilterIdentifier(
id=dataset.definition.workspace_data_filter_id
),
filter_column=dataset.definition.workspace_data_filter_column_name,
filter_column_data_type=ColumnDataType.STRING.value,
)
],
workspace_data_filter_columns=wdf_columns or None,
workspace_data_filter_references=wdf_references or None,
tags=_effective_dataset_tags(dataset.definition),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,43 @@ def check_ids_not_equal(self) -> "CustomFieldDefinition":
return self


class ParentDatasetReference(BaseModel):
    """One column of a (possibly composite) join to the parent dataset.

    A list of these on ``CustomDatasetDefinition.parent_dataset_references``
    supports multi-column foreign keys. Each entry binds a source column on the
    new dataset to a grain attribute on the parent.
    """

    # Join target: ID of the grain attribute on the parent dataset.
    attribute_id: str = Field(
        description="Attribute ID on the parent dataset that this column joins to.",
    )
    # Join source: name of the column on this (new) dataset.
    source_column: str = Field(
        description="Column name on this dataset used to join to the parent.",
    )
    # Declared type of the source column; serialized via `.value` when the
    # declarative reference source is built.
    data_type: ColumnDataType = Field(
        description="Data type of the source column.",
    )


class CustomDatasetDefinition(BaseModel):
"""Input model for custom dataset definition."""
"""Input model for custom dataset definition.

The reference to the parent dataset can be expressed in two ways:

* The legacy single-column form via ``parent_dataset_reference_attribute_id``,
``dataset_reference_source_column`` and ``dataset_reference_source_column_data_type``.
All three must be provided together.
* The composite-friendly form via ``parent_dataset_references``: a list of
``ParentDatasetReference`` entries, one per join column.

When ``parent_dataset_references`` is set and non-empty, it takes precedence
and the legacy fields are ignored. Otherwise the legacy fields are used.

Workspace data filter fields are optional. Both must be set together or
both left unset; when set, a single-column WDF binding is emitted on the
declarative dataset.
"""

workspace_id: str
dataset_id: str
Expand All @@ -71,11 +106,36 @@ class CustomDatasetDefinition(BaseModel):
dataset_source_table: str | None
dataset_source_sql: str | None
parent_dataset_reference: str
parent_dataset_reference_attribute_id: str
dataset_reference_source_column: str
dataset_reference_source_column_data_type: ColumnDataType
workspace_data_filter_id: str
workspace_data_filter_column_name: str
parent_dataset_reference_attribute_id: str | None = Field(
default=None,
deprecated=(
"Use `parent_dataset_references` for richer (composite-key) joins. "
"This field will be removed in a future release."
),
)
dataset_reference_source_column: str | None = Field(
default=None,
deprecated=(
"Use `parent_dataset_references` for richer (composite-key) joins. "
"This field will be removed in a future release."
),
)
dataset_reference_source_column_data_type: ColumnDataType | None = Field(
default=None,
deprecated=(
"Use `parent_dataset_references` for richer (composite-key) joins. "
"This field will be removed in a future release."
),
)
parent_dataset_references: list[ParentDatasetReference] | None = Field(
default=None,
description=(
"Composite-key reference to the parent dataset. When provided and "
"non-empty, supersedes the legacy single-column reference fields."
),
)
workspace_data_filter_id: str | None = None
workspace_data_filter_column_name: str | None = None
dataset_description: str | None = Field(
default=None,
description="Declarative description on the custom dataset.",
Expand All @@ -98,6 +158,41 @@ def check_source(self) -> "CustomDatasetDefinition":
)
return self

@model_validator(mode="after")
def check_reference_form_exclusive(self) -> "CustomDatasetDefinition":
    """Reject mixing the legacy single-column fields with ``parent_dataset_references``.

    Callers must pick exactly one form. Without this guard, supplying both
    would silently prefer the new list and ignore the legacy values — a
    precedence surprise that is easy to miss when debugging.
    """
    legacy_fields = (
        self.parent_dataset_reference_attribute_id,
        self.dataset_reference_source_column,
        self.dataset_reference_source_column_data_type,
    )
    uses_legacy = any(field is not None for field in legacy_fields)
    if self.parent_dataset_references and uses_legacy:
        raise ValueError(
            "Set either `parent_dataset_references` or the legacy single-column "
            "fields (`parent_dataset_reference_attribute_id`, "
            "`dataset_reference_source_column`, "
            "`dataset_reference_source_column_data_type`), not both."
        )
    return self

@model_validator(mode="after")
def check_wdf_pair(self) -> "CustomDatasetDefinition":
    """Workspace data filter id and column name must be provided together or both omitted."""
    # XOR on presence: exactly one of the pair set is the invalid state.
    if (self.workspace_data_filter_id is None) != (
        self.workspace_data_filter_column_name is None
    ):
        raise ValueError(
            "workspace_data_filter_id and workspace_data_filter_column_name "
            "must both be set or both be omitted"
        )
    return self


class CustomDataset(BaseModel):
"""Custom dataset with its definition and custom fields."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,50 @@ def test_datasets_to_ldm(mock_custom_dataset):
assert ds.workspace_data_filter_references[0].filter_id.id == "wdf1"
assert len(ldm.date_instances) == 1
assert ldm.date_instances[0].id == "date1"


def test_datasets_to_ldm_parent_dataset_references_composite():
    """Multi-column references via `parent_dataset_references` produce N sources.

    Pins not only the source columns but also each source's target attribute
    id and data type, so a regression that pairs a column with the wrong
    parent attribute (or drops the per-column type) is caught.
    """
    from gooddata_pipelines.ldm_extension.models.custom_data_object import (
        CustomDatasetDefinition,
        ParentDatasetReference,
    )

    definition = CustomDatasetDefinition(
        workspace_id="workspace1",
        dataset_id="ds_composite",
        dataset_name="Composite Dataset",
        dataset_source_table="table1",
        dataset_datasource_id="ds_source",
        dataset_source_sql=None,
        parent_dataset_reference="parent_ds",
        parent_dataset_references=[
            ParentDatasetReference(
                attribute_id="parent_pk1",
                source_column="src_col1",
                data_type=ColumnDataType.STRING,
            ),
            ParentDatasetReference(
                attribute_id="parent_pk2",
                source_column="src_col2",
                data_type=ColumnDataType.INT,
            ),
        ],
    )
    ds = CustomDataset(definition=definition, custom_fields=[])
    processor = LdmExtensionDataProcessor()
    model = processor.datasets_to_ldm({"ds_composite": ds})
    parent_ref = model.ldm.datasets[0].references[0]
    assert len(parent_ref.sources) == 2
    assert [s.column for s in parent_ref.sources] == ["src_col1", "src_col2"]
    # Each source must target its own parent grain attribute, in input order.
    assert [s.target.id for s in parent_ref.sources] == [
        "parent_pk1",
        "parent_pk2",
    ]
    # Data types are serialized via `.value` by the processor.
    assert [s.data_type for s in parent_ref.sources] == [
        ColumnDataType.STRING.value,
        ColumnDataType.INT.value,
    ]


def test_datasets_to_ldm_legacy_reference_fallback(mock_dataset_definition):
    """When `parent_dataset_references` is not set, fall back to legacy fields."""
    mock_dataset_definition.parent_dataset_references = None
    dataset = CustomDataset(
        definition=mock_dataset_definition, custom_fields=[]
    )
    model = LdmExtensionDataProcessor().datasets_to_ldm({"ds1": dataset})
    sources = model.ldm.datasets[0].references[0].sources
    # Exactly one source, built from the legacy single-column fields.
    assert [src.column for src in sources] == ["ref_col"]
Loading
Loading