DeltaTable
DeltaTable
dataclass
DeltaTable(
table_uri: Union[str, Path],
version: Optional[int] = None,
storage_options: Optional[Dict[str, str]] = None,
without_files: bool = False,
log_buffer_size: Optional[int] = None,
)
Represents a Delta Table
Create the Delta Table from a path with an optional version.
Multiple storage backends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS), and local URIs.
Depending on the storage backend used, you can provide option values through the storage_options parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_uri | Union[str, Path] | the path of the DeltaTable | required |
| version | Optional[int] | version of the DeltaTable | None |
| storage_options | Optional[Dict[str, str]] | a dictionary of the options to use for the storage backend | None |
| without_files | bool | If True, will load table without tracking files. Some append-only applications might have no need of tracking any files. So, the DeltaTable will be loaded with a significant memory reduction. | False |
| log_buffer_size | Optional[int] | Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus. | None |
delete
delete(predicate: Optional[str] = None) -> Dict[str, Any]
Delete records from a Delta Table that satisfy a predicate.
When a predicate is not provided then all records are deleted from the Delta Table. Otherwise a scan of the Delta table is performed to mark any files that contain records that satisfy the predicate. Once files are determined they are rewritten without the records.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predicate | Optional[str] | a SQL where clause. If not passed, will delete all rows. | None |

Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | the metrics from delete. |
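
The delete behaviour described above (find the files that contain matching records, then rewrite them without those records) can be pictured with a small pure-Python model. This is only an illustrative sketch, not the library's implementation: the `files` dictionary, the `delete_rows` helper, the `.rewritten` suffix, and the callable predicate are all hypothetical stand-ins. The real method takes a SQL string and operates on Parquet files.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, int]


def delete_rows(
    files: Dict[str, List[Row]],
    predicate: Optional[Callable[[Row], bool]] = None,
) -> Dict[str, List[Row]]:
    """Return a new file mapping with matching rows removed (hypothetical model)."""
    if predicate is None:
        # No predicate: every record is deleted, so no files remain.
        return {}
    result: Dict[str, List[Row]] = {}
    for name, rows in files.items():
        if not any(predicate(row) for row in rows):
            # The file holds no matching records, so it is kept untouched.
            result[name] = rows
            continue
        kept = [row for row in rows if not predicate(row)]
        if kept:
            # The file is rewritten (here: under a new name) without the deleted records.
            result[name + ".rewritten"] = kept
    return result


files = {
    "part-0.parquet": [{"x": 1}, {"x": 2}],
    "part-1.parquet": [{"x": 3}],
}
after = delete_rows(files, lambda row: row["x"] == 1)
```

Only `part-0.parquet` contains a matching record, so it is the only file that gets rewritten; `part-1.parquet` is carried over unchanged.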
file_uris
file_uris(
partition_filters: Optional[
List[Tuple[str, str, Any]]
] = None
) -> List[str]
Get the list of files as absolute URIs, including the scheme (e.g. "s3://").
Local files will be just plain absolute paths, without a scheme. (That is, no 'file://' prefix.)
Use the partition_filters parameter to retrieve a subset of files that match the given filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition_filters | Optional[List[Tuple[str, str, Any]]] | the partition filters that will be used for getting the matched files | None |

Returns:
| Type | Description |
|---|---|
| List[str] | list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable |
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective predicate
over multiple partitions. Each tuple has the format (key, op, value) and compares
the key with the value. The supported operators are =, !=, in, and not in. If
the op is in or not in, the value must be a collection such as a list, a set, or a tuple.
The supported type for value is str. Use the empty string '' for a null partition value.
Examples:
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])
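
To make the filter semantics concrete, the sketch below evaluates a list of `(key, op, value)` tuples against one partition's values, treating the list as a conjunction. It is a hedged illustration only: `matches` is a hypothetical helper, not part of the library, and representing a missing partition value as `''` is an assumption taken from the description above.

```python
from typing import Any, Dict, List, Tuple

PartitionFilter = Tuple[str, str, Any]


def matches(partition: Dict[str, str], filters: List[PartitionFilter]) -> bool:
    """Return True if the partition values satisfy every (key, op, value) tuple."""
    for key, op, value in filters:
        # Assumption: a null partition value is represented by the empty string.
        actual = partition.get(key, "")
        if op == "=":
            ok = actual == value
        elif op == "!=":
            ok = actual != value
        elif op == "in":
            ok = actual in value
        elif op == "not in":
            ok = actual not in value
        else:
            raise ValueError(f"unsupported op: {op!r}")
        if not ok:
            # The tuples are ANDed together, so one failure rejects the partition.
            return False
    return True


partitions = [
    {"x": "a", "y": "a"},
    {"x": "a", "y": "d"},
    {"x": "b", "y": "b"},
]
filters = [("x", "=", "a"), ("y", "in", ["a", "b", "c"])]
selected = [p for p in partitions if matches(p, filters)]
```

Here only the first partition satisfies both tuples, so `selected` contains a single entry.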
files
files(
partition_filters: Optional[
List[Tuple[str, str, Any]]
] = None
) -> List[str]
Get the .parquet files of the DeltaTable.
The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition_filters | Optional[List[Tuple[str, str, Any]]] | the partition filters that will be used for getting the matched files | None |

Returns:
| Type | Description |
|---|---|
| List[str] | list of the .parquet files referenced for the current version of the DeltaTable |
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective predicate
over multiple partitions. Each tuple has the format (key, op, value) and compares
the key with the value. The supported operators are =, !=, in, and not in. If
the op is in or not in, the value must be a collection such as a list, a set, or a tuple.
The supported type for value is str. Use the empty string '' for a null partition value.
Examples:
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])
from_data_catalog
classmethod
from_data_catalog(
data_catalog: DataCatalog,
database_name: str,
table_name: str,
data_catalog_id: Optional[str] = None,
version: Optional[int] = None,
log_buffer_size: Optional[int] = None,
) -> DeltaTable
Create the Delta Table from a Data Catalog.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_catalog | DataCatalog | the Catalog to use for getting the storage location of the Delta Table | required |
| database_name | str | the database name inside the Data Catalog | required |
| table_name | str | the table name inside the Data Catalog | required |
| data_catalog_id | Optional[str] | the identifier of the Data Catalog | None |
| version | Optional[int] | version of the DeltaTable | None |
| log_buffer_size | Optional[int] | Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus. | None |
get_add_actions
get_add_actions(
flatten: bool = False,
) -> pyarrow.RecordBatch
Return a dataframe with all current add actions.
Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| flatten | bool | whether to flatten the schema. Partition values columns are given the prefix | False |

Returns:
| Type | Description |
|---|---|
| RecordBatch | a PyArrow RecordBatch containing the add action data. |
Example:
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data, partition_by=["x"])
dt = DeltaTable("tmp")
dt.get_add_actions().to_pandas()
path size_bytes modification_time data_change partition_values num_records null_count min max
0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 2} 1 {'y': 0} {'y': 5} {'y': 5}
1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 3} 1 {'y': 0} {'y': 6} {'y': 6}
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 1} 1 {'y': 0} {'y': 4} {'y': 4}
dt.get_add_actions(flatten=True).to_pandas()
path size_bytes modification_time data_change partition.x num_records null_count.y min.y max.y
0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 2 1 0 5 5
1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 3 1 0 6 6
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4
history
history(
limit: Optional[int] = None,
) -> List[Dict[str, Any]]
Run the history command on the DeltaTable. The operations are returned in reverse chronological order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | Optional[int] | the maximum number of commit infos to return | None |

Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | list of the commit infos registered in the transaction log |
load_version
load_version(version: int) -> None
Load a DeltaTable with a specified version.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| version | int | the identifier of the version of the DeltaTable to load | required |
load_with_datetime
load_with_datetime(datetime_string: str) -> None
Time travel the Delta table to the latest version that was created at or before the provided datetime_string argument.
The datetime_string argument should be a date and time string that conforms to both RFC 3339 and ISO 8601.
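
One way to produce strings in this format is the standard library's `datetime.isoformat()` on timezone-aware values. The snippet below is a minimal sketch that uses only the standard library; the variable names are illustrative and the resulting strings would then be passed as `datetime_string`.

```python
from datetime import datetime, timedelta, timezone

# A UTC timestamp; isoformat() renders the offset as +00:00 rather than Z.
utc_stamp = datetime(2018, 1, 26, 18, 30, 9, tzinfo=timezone.utc).isoformat()

# A timestamp with a fixed -08:00 offset.
minus_eight = timezone(timedelta(hours=-8))
offset_stamp = datetime(2018, 12, 19, 16, 39, 57, tzinfo=minus_eight).isoformat()

# A timestamp with millisecond precision.
millis_stamp = datetime(
    2018, 1, 26, 18, 30, 9, 453000, tzinfo=timezone.utc
).isoformat(timespec="milliseconds")

print(utc_stamp)     # 2018-01-26T18:30:09+00:00
print(offset_stamp)  # 2018-12-19T16:39:57-08:00
print(millis_stamp)  # 2018-01-26T18:30:09.453+00:00
```

Using timezone-aware `datetime` objects matters here: a naive `datetime` would serialize without an offset, which RFC 3339 requires.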
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| datetime_string | str | the identifier of the datetime point of the DeltaTable to load | required |
Examples:
"2018-01-26T18:30:09Z"
"2018-12-19T16:39:57-08:00"
"2018-01-26T18:30:09.453+00:00"
merge
merge(
source: Union[
pyarrow.Table,
pyarrow.RecordBatch,
pyarrow.RecordBatchReader,
],
predicate: str,
source_alias: Optional[str] = None,
target_alias: Optional[str] = None,
error_on_type_mismatch: bool = True,
) -> TableMerger
Pass the source data that you want to merge into the target Delta table, providing a predicate in a SQL-like format. You can also specify what to do when the source data types do not match those of the target table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | Union[pyarrow.Table, pyarrow.RecordBatch, pyarrow.RecordBatchReader] | source data | required |
| predicate | str | SQL-like predicate on how to merge | required |
| source_alias | Optional[str] | Alias for the source table | None |
| target_alias | Optional[str] | Alias for the target table | None |
| error_on_type_mismatch | bool | specify whether merge will return an error if data types are mismatching | True |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
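
The clauses that can be chained on the returned TableMerger distinguish three groups of rows: matched, not matched (present only in the source), and not matched by source (present only in the target). The pure-Python sketch below models that classification for a single-column join key. It is an illustration under simplifying assumptions, not the library's implementation: `classify_keys` and `upsert` are hypothetical helpers, and each table is reduced to a key-to-value dictionary.

```python
from typing import Dict, Set, Tuple


def classify_keys(
    target: Dict[int, int], source: Dict[int, int]
) -> Tuple[Set[int], Set[int], Set[int]]:
    """Split join keys into matched, not matched, and not matched by source."""
    matched = set(target.keys() & source.keys())
    not_matched = set(source.keys() - target.keys())
    not_matched_by_source = set(target.keys() - source.keys())
    return matched, not_matched, not_matched_by_source


def upsert(target: Dict[int, int], source: Dict[int, int]) -> Dict[int, int]:
    """Model of an update-all-on-match plus insert-all-on-no-match merge."""
    matched, not_matched, _ = classify_keys(target, source)
    merged = dict(target)
    for key in matched:
        merged[key] = source[key]  # corresponds to when_matched_update_all
    for key in not_matched:
        merged[key] = source[key]  # corresponds to when_not_matched_insert_all
    return merged


target = {1: 4, 2: 5, 3: 6}
source = {2: 50, 4: 70}
groups = classify_keys(target, source)
merged = upsert(target, source)
```

Rows with keys 1 and 3 fall into the not-matched-by-source group and are left alone by this upsert; they would only change if a `when_not_matched_by_source_*` clause were added.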
metadata
metadata() -> Metadata
Get the current metadata of the DeltaTable.
Returns:
| Type | Description |
|---|---|
| Metadata | the current Metadata registered in the transaction log |
protocol
protocol() -> ProtocolVersions
Get the reader and writer protocol versions of the DeltaTable.
Returns:
| Type | Description |
|---|---|
| ProtocolVersions | the current ProtocolVersions registered in the transaction log |
restore
restore(
target: Union[int, datetime, str],
*,
ignore_missing_files: bool = False,
protocol_downgrade_allowed: bool = False
) -> Dict[str, Any]
Run the Restore command on the Delta Table: restore table to a given version or datetime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target | Union[int, datetime, str] | the version to restore to, given as an int version number, a date string, or a datetime. | required |
| ignore_missing_files | bool | whether the operation should carry on when some data files are missing. | False |
| protocol_downgrade_allowed | bool | whether to allow the operation when the protocol version was upgraded, that is, to allow a protocol downgrade. | False |

Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | the metrics from restore. |
schema
schema() -> Schema
Get the current schema of the DeltaTable.
Returns:
| Type | Description |
|---|---|
| Schema | the current Schema registered in the transaction log |
to_pandas
to_pandas(
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
filesystem: Optional[
Union[str, pa_fs.FileSystem]
] = None,
filters: Optional[FilterType] = None,
) -> pandas.DataFrame
Build a pandas dataframe using data from the DeltaTable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | Optional[List[Tuple[str, str, Any]]] | A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax | None |
| columns | Optional[List[str]] | The columns to project. This can be a list of column names to include (order and duplicates will be preserved) | None |
| filesystem | Optional[Union[str, FileSystem]] | A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem | None |
| filters | Optional[FilterType] | A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass | None |
to_pyarrow_dataset
to_pyarrow_dataset(
partitions: Optional[List[Tuple[str, str, Any]]] = None,
filesystem: Optional[
Union[str, pa_fs.FileSystem]
] = None,
parquet_read_options: Optional[
ParquetReadOptions
] = None,
) -> pyarrow.dataset.Dataset
Build a PyArrow Dataset using data from the DeltaTable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | Optional[List[Tuple[str, str, Any]]] | A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax | None |
| filesystem | Optional[Union[str, FileSystem]] | A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem | None |
| parquet_read_options | Optional[ParquetReadOptions] | Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31 | None |

More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html
Returns:
| Type | Description |
|---|---|
| Dataset | the PyArrow Dataset |
to_pyarrow_table
to_pyarrow_table(
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
filesystem: Optional[
Union[str, pa_fs.FileSystem]
] = None,
filters: Optional[FilterType] = None,
) -> pyarrow.Table
Build a PyArrow Table using data from the DeltaTable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | Optional[List[Tuple[str, str, Any]]] | A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax | None |
| columns | Optional[List[str]] | The columns to project. This can be a list of column names to include (order and duplicates will be preserved) | None |
| filesystem | Optional[Union[str, FileSystem]] | A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem | None |
| filters | Optional[FilterType] | A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass | None |
update
update(
updates: Dict[str, str],
predicate: Optional[str] = None,
writer_properties: Optional[Dict[str, int]] = None,
error_on_type_mismatch: bool = True,
) -> Dict[str, Any]
Update records in the Delta Table that match an optional predicate.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| updates | Dict[str, str] | a mapping of column name to update SQL expression. | required |
| predicate | Optional[str] | a logical expression, defaults to None | None |
| writer_properties | Optional[Dict[str, int]] | Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html, only the following fields are supported: | None |
| error_on_type_mismatch | bool | specify whether update will return an error if data types are mismatching | True |

Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | the metrics from update |
Examples:
Update some row values with a SQL predicate. This is equivalent to UPDATE table SET deleted = true WHERE id = '5'
from deltalake import DeltaTable
dt = DeltaTable("tmp")
# Values in updates are SQL expressions, so they are passed as strings.
dt.update(predicate="id = '5'", updates={"deleted": "True"})
Update all row values. This is equivalent to UPDATE table SET deleted = true, id = concat(id, '_old').
from deltalake import DeltaTable
dt = DeltaTable("tmp")
dt.update(updates={"deleted": "True", "id": "concat(id, '_old')"})
update_incremental
update_incremental() -> None
Updates the DeltaTable to the latest version by incrementally applying newer versions.
vacuum
vacuum(
retention_hours: Optional[int] = None,
dry_run: bool = True,
enforce_retention_duration: bool = True,
) -> List[str]
Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retention_hours | Optional[int] | the retention threshold in hours, if none then the value from | None |
| dry_run | bool | when activated, list only the files, delete otherwise | True |
| enforce_retention_duration | bool | when disabled, accepts retention hours smaller than the value from | True |

Returns:
| Type | Description |
|---|---|
| List[str] | the list of files that are no longer referenced by the Delta Table and are older than the retention threshold. |
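
The selection rule stated above (a file must be both unreferenced and older than the retention threshold) can be sketched in plain Python. This is a simplified model rather than the library's code: `vacuum_candidates`, the file names, and the 168-hour threshold are hypothetical, and modification times are supplied by hand instead of being read from storage.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set


def vacuum_candidates(
    all_files: Dict[str, datetime],
    referenced: Set[str],
    retention_hours: int,
    now: datetime,
) -> List[str]:
    """List files that are unreferenced and older than the retention threshold."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        path
        for path, modified in all_files.items()
        if path not in referenced and modified < cutoff
    )


now = datetime(2024, 1, 10)
all_files = {
    "a.parquet": datetime(2024, 1, 1),      # old, but still referenced
    "b.parquet": datetime(2024, 1, 1),      # old and unreferenced
    "c.parquet": datetime(2024, 1, 9, 12),  # unreferenced, but too recent
}
candidates = vacuum_candidates(all_files, {"a.parquet"}, 168, now)
```

In this model a dry run corresponds to returning `candidates` without deleting anything, which is the behaviour the `dry_run=True` default describes.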
version
version() -> int
Get the version of the DeltaTable.
Returns:
| Type | Description |
|---|---|
| int | The current version of the DeltaTable |
Metadata
dataclass
Metadata(table: RawDeltaTable)
Create a Metadata instance.
configuration
property
configuration: Dict[str, str]
Return the DeltaTable properties.
created_time
property
created_time: int
Return the time when this metadata action was created, in milliseconds since the Unix epoch.
description
property
description: str
Return the user-provided description of the DeltaTable.
id
property
id: int
Return the unique identifier of the DeltaTable.
name
property
name: str
Return the user-provided identifier of the DeltaTable.
partition_columns
property
partition_columns: List[str]
Return an array containing the names of the partitioned columns of the DeltaTable.
TableMerger
TableMerger(
table: DeltaTable,
source: pyarrow.RecordBatchReader,
predicate: str,
source_alias: Optional[str] = None,
target_alias: Optional[str] = None,
safe_cast: bool = True,
)
API for various table MERGE commands.
execute
execute() -> Dict[str, Any]
Executes MERGE with the previously provided settings in Rust, using the Apache DataFusion query engine.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | metrics |
when_matched_delete
when_matched_delete(
predicate: Optional[str] = None,
) -> TableMerger
Delete a matched row from the table only if the given predicate (if specified) is
true for the matched row. If not specified it deletes all matches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predicate | Optional[str] | SQL-like predicate on when to delete. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
Examples:
Delete on a predicate
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_matched_delete(predicate="source.deleted = true")
    .execute()
)
Delete all records that were matched
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_matched_delete()
    .execute()
)
when_matched_update
when_matched_update(
updates: Dict[str, str], predicate: Optional[str] = None
) -> TableMerger
Update a matched table row based on the rules defined by updates.
If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| updates | dict | a mapping of column name to update SQL expression. | required |
| predicate | Optional[str] | SQL-like predicate on when to update. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
Examples:
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_matched_update(
        updates={
            "x": "source.x",
            "y": "source.y",
        }
    )
    .execute()
)
when_matched_update_all
when_matched_update_all(
predicate: Optional[str] = None,
) -> TableMerger
Update all target fields with the values of the corresponding source fields. Source and target are required to have the same field names.
If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predicate | Optional[str] | SQL-like predicate on when to update all columns. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
Examples:
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_matched_update_all()
    .execute()
)
when_not_matched_by_source_delete
when_not_matched_by_source_delete(
predicate: Optional[str] = None,
) -> TableMerger
Delete a target row that has no matches in the source from the table only if the given
predicate (if specified) is true for the target row.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predicate | Optional[str] | SQL-like predicate on when to delete when not matched by source. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
when_not_matched_by_source_update
when_not_matched_by_source_update(
updates: Dict[str, str], predicate: Optional[str] = None
) -> TableMerger
Update a target row that has no matches in the source based on the rules defined by updates.
If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| updates | dict | a mapping of column name to update SQL expression. | required |
| predicate | Optional[str] | SQL-like predicate on when to update. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
Examples:
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_not_matched_by_source_update(
        predicate="y > 3",
        updates={
            "y": "0",
        },
    )
    .execute()
)
when_not_matched_insert
when_not_matched_insert(
updates: Dict[str, str], predicate: Optional[str] = None
) -> TableMerger
Insert a new row to the target table based on the rules defined by updates. If a
predicate is specified, then it must evaluate to true for the new row to be inserted.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| updates | dict | a mapping of column name to insert SQL expression. | required |
| predicate | Optional[str] | SQL-like predicate on when to insert. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
Examples:
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_not_matched_insert(
        updates={
            "x": "source.x",
            "y": "source.y",
        }
    )
    .execute()
)
when_not_matched_insert_all
when_not_matched_insert_all(
predicate: Optional[str] = None,
) -> TableMerger
Insert a new row into the target table, copying all source fields into the corresponding target fields. Source and target are
required to have the same field names. If a predicate is specified, then it must evaluate to true for
the new row to be inserted.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predicate | Optional[str] | SQL-like predicate on when to insert. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
Examples:
from deltalake import DeltaTable
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
dt = DeltaTable("tmp")
(
    dt.merge(source=data, predicate='target.x = source.x', source_alias='source', target_alias='target')
    .when_not_matched_insert_all()
    .execute()
)
with_writer_properties
with_writer_properties(
data_page_size_limit: Optional[int] = None,
dictionary_page_size_limit: Optional[int] = None,
data_page_row_count_limit: Optional[int] = None,
write_batch_size: Optional[int] = None,
max_row_group_size: Optional[int] = None,
) -> TableMerger
Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_page_size_limit | Optional[int] | Limit DataPage size to this in bytes. Defaults to None. | None |
| dictionary_page_size_limit | Optional[int] | Limit the size of each dictionary page to this amount in bytes. Defaults to None. | None |
| data_page_row_count_limit | Optional[int] | Limit the number of rows in each DataPage. Defaults to None. | None |
| write_batch_size | Optional[int] | Splits internally to smaller batch size. Defaults to None. | None |
| max_row_group_size | Optional[int] | Max number of rows in row group. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TableMerger | TableMerger | TableMerger Object |
TableOptimizer
TableOptimizer(table: DeltaTable)
API for various table optimization commands.
compact
compact(
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[
Union[int, timedelta]
] = None,
) -> Dict[str, Any]
Compacts small files to reduce the total number of files in the table.
This operation is idempotent; if run twice on the same table (assuming it has not been updated) it will do nothing the second time.
If this operation happens concurrently with any operations other than append, it will fail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition_filters | Optional[FilterType] | the partition filters that will be used for getting the matched files | None |
| target_size | Optional[int] | desired file size after bin-packing files, in bytes. If not provided, will attempt to read the table configuration value | None |
| max_concurrent_tasks | Optional[int] | the maximum number of concurrent tasks to use for file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. | None |
| min_commit_interval | Optional[Union[int, timedelta]] | minimum interval in seconds or as timedeltas before a new commit is created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition. | None |

Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | the metrics from optimize |
Example:
# Use a timedelta object to specify the seconds, minutes or hours of the interval.
from deltalake import DeltaTable
from datetime import timedelta
dt = DeltaTable("tmp")
time_delta = timedelta(minutes=10)
dt.optimize.compact(min_commit_interval=time_delta)
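
Because min_commit_interval accepts either a number of seconds or a timedelta, it can help to normalize the value in calling code before passing it on. The helper below is a hypothetical sketch of such a normalization; `as_timedelta` is not part of the library, and how the library itself converts the value is not stated here.

```python
from datetime import timedelta
from typing import Optional, Union


def as_timedelta(interval: Optional[Union[int, timedelta]]) -> Optional[timedelta]:
    """Normalize an interval given in seconds or as a timedelta (hypothetical helper)."""
    if interval is None:
        return None
    if isinstance(interval, timedelta):
        return interval
    if isinstance(interval, bool) or not isinstance(interval, int):
        # bool is a subclass of int, so it is rejected explicitly.
        raise TypeError("interval must be an int (seconds) or a timedelta")
    return timedelta(seconds=interval)


ten_minutes = as_timedelta(600)
per_partition = as_timedelta(0)
```

Both `600` and `timedelta(minutes=10)` normalize to the same interval, and `0` normalizes to `timedelta(0)`, the value described above as producing a commit per partition.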
z_order
z_order(
columns: Iterable[str],
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
min_commit_interval: Optional[
Union[int, timedelta]
] = None,
) -> Dict[str, Any]
Reorders the data using a Z-order curve to improve data skipping.
This also performs compaction, so the same parameters as compact() apply.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| columns | Iterable[str] | the columns to use for Z-ordering. There must be at least one column. | required |
| partition_filters | Optional[FilterType] | the partition filters that will be used for getting the matched files | None |
| target_size | Optional[int] | desired file size after bin-packing files, in bytes. If not provided, will attempt to read the table configuration value | None |
| max_concurrent_tasks | Optional[int] | the maximum number of concurrent tasks to use for file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. | None |
| max_spill_size | int | the maximum number of bytes to spill to disk. Defaults to 20GB. | 20 * 1024 * 1024 * 1024 |
| min_commit_interval | Optional[Union[int, timedelta]] | minimum interval in seconds or as timedeltas before a new commit is created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition. | None |

Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | the metrics from optimize |
Example:
# Use a timedelta object to specify the seconds, minutes or hours of the interval.
from deltalake import DeltaTable
from datetime import timedelta
dt = DeltaTable("tmp")
time_delta = timedelta(minutes=10)
dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta)
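
For intuition about what a Z-order curve does, the sketch below computes the textbook Morton code of two small non-negative integers by interleaving their bits, then sorts points by that code. This is a generic illustration of the idea only: `interleave_bits` is a hypothetical helper, and the way the library actually derives its Z-order values from column data is not described in this reference.

```python
from typing import List, Tuple


def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Return the Morton code of (x, y): bits of x in even positions, y in odd."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code


points: List[Tuple[int, int]] = [(x, y) for x in range(2) for y in range(2)]
# Sorting by the interleaved code keeps points that are close in both
# dimensions close together in the resulting order.
ordered = sorted(points, key=lambda p: interleave_bits(*p))
print(ordered)  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

Rows that are neighbours in this ordering tend to have similar values in every chosen column, which is what lets per-file min/max statistics skip more files at read time.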
Writing Delta Tables
Write to a Delta Lake table
If the table does not already exist, it will be created.
This function only supports writer protocol version 2 currently. When attempting to write to an existing table with a higher min_writer_version, this function will throw DeltaProtocolError.
Note that this function does NOT register this table in a data catalog.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_or_uri | Union[str, Path, DeltaTable] | URI of a table or a DeltaTable object. | required |
| data | Union[DataFrame, Table, RecordBatch, Iterable[RecordBatch], RecordBatchReader] | Data to write. If passing iterable, the schema must also be given. | required |
| schema | Optional[Schema] | Optional schema to write. | None |
| partition_by | Optional[Union[List[str], str]] | List of columns to partition the table by. Only required when creating a new table. | None |
| filesystem | Optional[FileSystem] | Optional filesystem to pass to PyArrow. If not provided will be inferred from uri. The file system has to be rooted in the table root. Use the pyarrow.fs.SubTreeFileSystem, to adopt the root of pyarrow file systems. | None |
| mode | Literal['error', 'append', 'overwrite', 'ignore'] | How to handle existing data. Default is to error if table already exists. If 'append', will add new data. If 'overwrite', will replace table with new data. If 'ignore', will not write anything if table already exists. | 'error' |
| file_options | Optional[ParquetFileWriteOptions] | Optional write options for Parquet (ParquetFileWriteOptions). Can be provided with defaults using ParquetFileWriteOptions().make_write_options(). Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533 for the list of available options | None |
| max_partitions | Optional[int] | the maximum number of partitions that will be used. | None |
| max_open_files | int | Limits the maximum number of files that can be left open while writing. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your data into many small files. | 1024 |
| max_rows_per_file | int | Maximum number of rows per file. If greater than 0 then this will limit how many rows are placed in any single file. Otherwise there will be no limit and one file will be created in each output directory unless files need to be closed to respect max_open_files | 10 * 1024 * 1024 |
| min_rows_per_group | int | Minimum number of rows per group. When the value is set, the dataset writer will batch incoming data and only write the row groups to the disk when sufficient rows have accumulated. | 64 * 1024 |
| max_rows_per_group | int | Maximum number of rows per group. If the value is set, then the dataset writer may split up large incoming batches into multiple row groups. If this value is set, then min_rows_per_group should also be set. | 128 * 1024 |
| name | Optional[str] | User-provided identifier for this table. | None |
| description | Optional[str] | User-provided description for this table. | None |
| configuration | Optional[Mapping[str, Optional[str]]] | A map containing configuration options for the metadata action. | None |
| overwrite_schema | bool | If True, allows updating the schema of the table. | False |
| storage_options | Optional[Dict[str, str]] | options passed to the native delta filesystem. Unused if 'filesystem' is defined. | None |
| partition_filters | Optional[List[Tuple[str, str, Any]]] | the partition filters that will be used for partition overwrite. | None |
| large_dtypes | bool | If True, the table schema is checked against large_dtypes | False |
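
The four mode values, combined with the rule that a missing table is created, amount to a small decision table. The sketch below spells it out in plain Python. It is a model of the documented behaviour only: `plan_write`, its returned labels, and the use of `FileExistsError` are hypothetical choices made for illustration, not the library's API or its actual exception type.

```python
from typing import Literal

Mode = Literal["error", "append", "overwrite", "ignore"]


def plan_write(table_exists: bool, mode: Mode = "error") -> str:
    """Return the action a write would take for a given mode (hypothetical model)."""
    if not table_exists:
        # A table that does not exist yet is created regardless of mode.
        return "create"
    if mode == "error":
        raise FileExistsError("table already exists")
    if mode == "append":
        return "append"
    if mode == "overwrite":
        return "overwrite"
    if mode == "ignore":
        # Nothing is written when the table already exists.
        return "skip"
    raise ValueError(f"unknown mode: {mode!r}")


actions = {mode: plan_write(True, mode) for mode in ("append", "overwrite", "ignore")}
```

The default mode is the only one that refuses to touch an existing table, which makes accidental overwrites an explicit opt-in.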