dlt.destinations.impl.filesystem.iceberg_adapter
PartitionSpec Objects
@dataclass(frozen=True)
class PartitionSpec()
get_transform
def get_transform() -> Transform[S, Any]
Get the PyIceberg Transform object for this partition.
Returns:
A PyIceberg Transform object
Raises:
ValueError - If the transform is not recognized
iceberg_partition Objects
class iceberg_partition()
Helper class with factory methods for creating partition specs.
identity
@staticmethod
def identity(column_name: str) -> PartitionSpec
Create an identity partition on a column.
Arguments:
column_name - The name of the column to partition on
Returns:
A PartitionSpec for identity partitioning
year
@staticmethod
def year(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a year partition on a timestamp/date column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for year partitioning
month
@staticmethod
def month(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a month partition on a timestamp/date column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for month partitioning
day
@staticmethod
def day(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a day partition on a timestamp/date column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for day partitioning
hour
@staticmethod
def hour(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create an hour partition on a timestamp column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for hour partitioning
bucket
@staticmethod
def bucket(num_buckets: int,
column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a bucket partition on a column.
Arguments:
num_buckets - The number of buckets to create
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for bucket partitioning
truncate
@staticmethod
def truncate(width: int,
column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a truncate partition on a string column.
Arguments:
width - The width to truncate to
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for truncate partitioning
iceberg_adapter
def iceberg_adapter(
data: Any,
partition: Union[str, PartitionSpec, Sequence[Union[str,
PartitionSpec]]] = None
) -> DltResource
Prepares data or a DltResource for loading into Apache Iceberg table.
Takes raw data or an existing DltResource and configures it for Iceberg, primarily by defining partitioning strategies via the DltResource's hints.
Arguments:
data - The data to be transformed. This can be raw data (e.g., a list of dicts) or an instance of DltResource. If raw data is provided, it will be encapsulated into a DltResource instance.
partition - Defines how the Iceberg table should be partitioned. Must be provided. It accepts:
- A single column name (string): Defaults to an identity transform.
- A PartitionSpec object: Allows for detailed partition configuration, including transformation types (year, month, day, hour, bucket, truncate). Use the iceberg_partition helper class to create these specs.
- A sequence of the above: To define multiple partition columns.
Returns:
A DltResource instance configured with Iceberg-specific partitioning hints,
ready for loading.
Raises:
ValueError - If partition is not specified or if an invalid partition transform is requested within a PartitionSpec.
Examples:
data = [{"id": 1, "event_time": "2023-03-15T10:00:00Z", "category": "A"}]
resource = iceberg_adapter(
...     data,
...     partition=[
...         "category",  # Identity partition on category
...         iceberg_partition.year("event_time"),
...     ],
... )
# The resource's hints now contain the Iceberg partition specs:
# resource.compute_table_schema().get('x-iceberg-partition')
# [
# {'transform': 'identity', 'source_column': 'category'},
# {'transform': 'year', 'source_column': 'event_time'},
# ]
#
# Or in case of using an existing DltResource
@dlt.resource
... def my_data():
...     yield [{"value": "abc"}]
iceberg_adapter(my_data, partition="value")
parse_partition_hints
def parse_partition_hints(
table_schema: PreparedTableSchema) -> List[PartitionSpec]
Parse PARTITION_HINT from table schema into PartitionSpec list.
Arguments:
table_schema - dlt table schema containing partition hints
Returns:
List of PartitionSpec objects from hints, empty list if no hints found
create_identity_specs
def create_identity_specs(column_names: List[str]) -> List[PartitionSpec]
Create identity partition specs from column names.
Arguments:
column_names - List of column names to partition by identity
Returns:
List of PartitionSpec objects with identity transform
build_iceberg_partition_spec
def build_iceberg_partition_spec(
arrow_schema: pa.Schema, spec_list: Sequence[PartitionSpec]
) -> tuple[IcebergPartitionSpec, IcebergSchema]
Turn a dlt PartitionSpec list into a PyIceberg PartitionSpec. Returns the PartitionSpec and the IcebergSchema derived from the Arrow schema.