3 changes: 2 additions & 1 deletion docs/file_df/file_formats/excel.rst
@@ -6,4 +6,5 @@ Excel
.. currentmodule:: onetl.file.format.excel

.. autoclass:: Excel
-   :members: get_packages
+   :members: get_packages,header,dataAddress,treatEmptyValuesAsNulls,setErrorCellsToFallbackValues,usePlainNumberFormat,inferSchema,timestampFormat,maxRowsInMemory,maxByteArraySize,tempFileThreshold,excerptSize,workbookPassword
+   :member-order: bysource
11 changes: 11 additions & 0 deletions onetl/file/format/__init__.py
@@ -8,3 +8,14 @@
from onetl.file.format.orc import ORC
from onetl.file.format.parquet import Parquet
from onetl.file.format.xml import XML

__all__ = [
"Avro",
"CSV",
"Excel",
"JSON",
"JSONLine",
"ORC",
"Parquet",
"XML",
]
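
A quick sketch of what the explicit ``__all__`` changes for consumers (imports illustrative):

    # named imports work as before
    from onetl.file.format import CSV, Excel

    # __all__ now pins exactly which names a star-import re-exports
    from onetl.file.format import *  # noqa: F403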
234 changes: 185 additions & 49 deletions onetl/file/format/excel.py
@@ -3,7 +3,12 @@
from __future__ import annotations

import logging
-from typing import TYPE_CHECKING, ClassVar
+from typing import TYPE_CHECKING, ClassVar, Optional

# pydantic v2 exposes the v1 API under ``pydantic.v1``; fall back for pydantic v1 installs
try:
    from pydantic.v1 import ByteSize, SecretStr
except (ImportError, AttributeError):
    from pydantic import ByteSize, SecretStr  # type: ignore[no-redef, assignment]

from onetl._util.java import try_import_java_class
from onetl._util.scala import get_default_scala_version
@@ -14,32 +19,7 @@
from onetl.hooks import slot, support_hooks

if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrameReader, SparkSession

-READ_OPTIONS = frozenset(
-    (
-        "dataAddress",
-        "treatEmptyValuesAsNulls",
-        "setErrorCellsToFallbackValues",
-        "usePlainNumberFormat",
-        "inferSchema",
-        "addColorColumns",
-        "timestampFormat",
-        "maxRowsInMemory",
-        "maxByteArraySize",
-        "tempFileThreshold",
-        "excerptSize",
-        "workbookPassword",
-    ),
-)
-
-WRITE_OPTIONS = frozenset(
-    (
-        "dataAddress",
-        "dateFormat",
-        "timestampFormat",
-    ),
-)

log = logging.getLogger(__name__)

@@ -67,46 +47,190 @@

See documentation from link above.

.. versionadded:: 0.9.4

Examples
--------

.. note ::

    You can pass any option mentioned in the `official documentation <https://github.com/crealytics/spark-excel>`_,
    even if it is not listed here. **Option names should be in** ``camelCase``!

    The set of supported options depends on the ``spark-excel`` package version.

.. tabs::

    .. code-tab:: py Read files

        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import Excel
        from pyspark.sql import SparkSession

        # Create Spark session with Excel package loaded
        maven_packages = Excel.get_packages(spark_version="3.5.1")
        spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
            .getOrCreate()
        )

        # Read file /some/file.xlsx from local file system
        excel = Excel(header=True, inferSchema=True)

        reader = FileDFReader(
            connection=SparkLocalFS(spark=spark),
            format=excel,
        )
        df = reader.run(["/some/file.xlsx"])

    .. code-tab:: py Write files

        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFWriter
        from onetl.file.format import Excel
        from pyspark.sql import SparkSession

        # Create Spark session with Excel package loaded
        maven_packages = Excel.get_packages(spark_version="3.5.1")
        spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
            .getOrCreate()
        )

        # DataFrame to be written
        df = ...

        # Write DataFrame as .xlsx files to /some/folder on local file system
        excel = Excel(header=True, dataAddress="'Sheet1'!A1")

        writer = FileDFWriter(
            connection=SparkLocalFS(spark=spark),
            format=excel,
            target_path="/some/folder",
        )
        writer.run(df)
"""

name: ClassVar[str] = "excel"

header: bool = False
"""
If ``True``, the first row of the file is treated as a header.
Default ``False``.
"""

dataAddress: Optional[str] = None
"""
Cell address to use as a starting point.
For example: ``A1`` or ``'Sheet1'!A1``.
"""

timestampFormat: Optional[str] = None
"""
Format string used for parsing or serializing timestamp values.
Default ``yyyy-mm-dd hh:mm:ss[.fffffffff]``.
"""

dateFormat: Optional[str] = None
"""
Format string used for parsing or serializing date values.
Default ``yyyy-MM-dd``.
"""

treatEmptyValuesAsNulls: Optional[bool] = None
"""
If ``True``, empty cells are parsed as ``null`` values.
If ``False``, empty cells are parsed as empty strings.
Default ``True``.

.. note::

Used only for reading files.
"""

setErrorCellsToFallbackValues: Optional[bool] = None
"""
If ``True``, cells containing the ``#N/A`` value are replaced with the default value for the column type,
e.g. ``0`` for ``IntegerType()``. If ``False``, ``#N/A`` values are replaced with ``null``.
Default ``False``.

.. note::

Used only for reading files.
"""

usePlainNumberFormat: Optional[bool] = None
"""
If ``True``, numeric values are read and written in plain format, without scientific notation or rounding.
Default ``False``.
"""

inferSchema: Optional[bool] = None
"""
If ``True``, infer DataFrame schema based on cell content.
If ``False`` and no explicit DataFrame schema is passed, all columns are ``StringType()``.

.. note::

Used only for reading files.
"""

workbookPassword: Optional[SecretStr] = None
"""
If the Excel file is encrypted, provide a password to open it.

.. note::

Used only for reading files. Cannot be used to write files.
"""

maxRowsInMemory: Optional[int] = None
"""
If set, use the streaming reader and fetch only the specified number of rows per iteration,
reducing memory usage for large files.
Default ``None``, which means the entire file content is read into memory.

.. warning::

Can be used only with ``.xlsx`` files; reading ``.xls`` files with this option set fails.

.. note::

Used only for reading files.
"""

maxByteArraySize: Optional[ByteSize] = None
"""
If set, overrides the memory limit (in bytes) for byte arrays used while reading rows from the input file.
Default ``0``, which means the default limit is used.

See `IOUtils.setByteArrayMaxOverride <https://poi.apache.org/apidocs/5.0/org/apache/poi/util/IOUtils.html#setByteArrayMaxOverride-int->`_
documentation.

.. note::

Used only for reading files.
"""

tempFileThreshold: Optional[ByteSize] = None
"""
If greater than 0, zip entries larger than this threshold (in bytes) are written to temporary files.
If 0, all zip entries are written to temporary files.
If -1, no temp files are created, which may cause errors if a zip entry is larger than 2GiB.

.. note::

Used only for reading files.
"""

excerptSize: Optional[int] = None
"""
If ``inferSchema=True``, the number of rows used to infer the schema.
Default ``10``.

.. note::

Used only for reading files.
"""

class Config:
-    known_options = READ_OPTIONS | WRITE_OPTIONS
+    known_options = frozenset()
    extra = "allow"

@slot
Expand Down Expand Up @@ -209,3 +333,15 @@
if log.isEnabledFor(logging.DEBUG):
log.debug("Missing Java class", exc_info=e, stack_info=True)
raise ValueError(msg) from e

@slot
def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader:
    options = self.dict(by_alias=True, exclude_none=True)
    if self.workbookPassword:
        # unwrap SecretStr so spark-excel receives the actual password
        options["workbookPassword"] = self.workbookPassword.get_secret_value()
    return reader.format(self.name).options(**options)

def __repr__(self):
    # show only explicitly set options; SecretStr values stay masked
    options_dict = self.dict(by_alias=True, exclude_none=True)
    options_kwargs = ", ".join(f"{k}={v!r}" for k, v in options_dict.items())
    return f"{self.__class__.__name__}({options_kwargs})"
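
A usage sketch of the new typed options (illustrative only; the printed repr is indicative, assuming pydantic v1 semantics where ``ByteSize`` parses human-readable size strings and ``SecretStr`` masks its value):

    from onetl.file.format import Excel

    excel = Excel(
        header=True,
        inferSchema=True,
        workbookPassword="s3cret",  # coerced to SecretStr
        maxByteArraySize="1GiB",  # ByteSize parses human-readable sizes into bytes
    )

    # Only explicitly set options appear, and the password stays masked:
    print(repr(excel))
    # Excel(header=True, inferSchema=True, workbookPassword=SecretStr('**********'), maxByteArraySize=1073741824)

    # apply_to_reader() later unwraps the secret via get_secret_value()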
4 changes: 2 additions & 2 deletions onetl/file/format/file_format.py
@@ -60,10 +60,10 @@ class Config:

@slot
def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader:
-    options = self.dict(by_alias=True)
+    options = self.dict(by_alias=True, exclude_none=True)
return reader.format(self.name).options(**options)

@slot
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
-    options = self.dict(by_alias=True)
+    options = self.dict(by_alias=True, exclude_none=True)
return writer.format(self.name).options(**options)
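
The switch to ``exclude_none=True`` matters because an unset ``Optional`` field would otherwise be serialized as ``None`` and passed to Spark as an explicit option value, overriding the format's built-in default. A minimal sketch with a hypothetical pydantic v1 model:

    from typing import Optional

    try:
        from pydantic.v1 import BaseModel
    except ImportError:
        from pydantic import BaseModel  # type: ignore[no-redef]


    class Demo(BaseModel):
        header: bool = False
        dataAddress: Optional[str] = None


    demo = Demo(header=True)
    print(demo.dict(by_alias=True))  # {'header': True, 'dataAddress': None}
    print(demo.dict(by_alias=True, exclude_none=True))  # {'header': True}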
3 changes: 3 additions & 0 deletions setup.cfg
@@ -355,6 +355,9 @@ per-file-ignores =
F401,
# WPS442 Found outer scope names shadowing: KerberosClient
WPS442,
onetl/file/format/*.py:
# N815 variable 'rootTag' in class scope should not be mixedCase
N815,
onetl/hooks/slot.py:
# WPS210 Found too many local variables
WPS210,
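
The ``N815`` exemption is needed because format option fields deliberately mirror spark-excel's camelCase option names, so the attribute name itself becomes the option key. A sketch of the naming pattern that trips the linter (stub class, illustrative only):

    from typing import Optional


    class ExcelStub:
        # pep8-naming flags these as N815 (mixedCase in class scope),
        # but they must match spark-excel option keys exactly:
        dataAddress: Optional[str] = None
        inferSchema: Optional[bool] = None
        workbookPassword: Optional[str] = None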
Binary file not shown.
Binary file not shown.
@@ -67,8 +67,9 @@ def test_excel_reader_with_infer_schema(
("without_header", {}),
("with_header", {"header": True}),
("with_data_address", {"dataAddress": "'ABC'!K6"}),
("with_encryption", {"workbookPassword": "1234"}),
],
ids=["without_header", "with_header", "with_data_address"],
ids=["without_header", "with_header", "with_data_address", "with_encryption"],
)
def test_excel_reader_with_options(
spark,