6 changes: 1 addition & 5 deletions .github/workflows/codeql-analysis.yml
@@ -56,17 +56,13 @@ jobs:
- name: Install dependencies
run: |
uv sync --extra files --group test-spark-3.5 --group test-pydantic-2 --group dev
uv pip install -U flake8-commas

# Set the `CODEQL-PYTHON` environment variable to the Python executable
# that includes the dependencies
echo "CODEQL_PYTHON=$(which python)" >> $GITHUB_ENV

- name: Run flake8
run: python3 -m flake8 --config setup.cfg .

- name: Run mypy
run: python3 -m mypy --config-file setup.cfg onetl
run: python3 -m mypy onetl

codeql:
name: CodeQL
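Note on the mypy step: dropping the explicit --config-file setup.cfg flag means mypy falls back to its standard config discovery order (mypy.ini, .mypy.ini, pyproject.toml, then setup.cfg in the working directory); where the project's mypy settings now live is not shown in this diff.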
16 changes: 5 additions & 11 deletions .pre-commit-config.yaml
@@ -116,24 +116,19 @@ repos:
hooks:
- id: ruff-format
priority: 8
- id: ruff
args: [--fix]
priority: 9

- repo: local
hooks:
- id: flake8
name: flake8
entry: python3 -m flake8
language: system
types: [python]
files: ^(onetl|tests)/.*$
pass_filenames: true
priority: 9
- id: mypy
name: mypy
entry: python3 -m mypy --config-file setup.cfg onetl
entry: python3 -m mypy onetl
language: system
types: [python]
pass_filenames: false
priority: 9
priority: 10
- id: towncrier
name: towncrier
entry: towncrier build --draft
@@ -153,6 +148,5 @@ ci:
skip:
- chmod # failing in pre-commit.ci
- docker-compose-check # cannot run on pre-commit.ci
- flake8 # checked with Github Actions
- mypy # checked with Github Actions
- towncrier # checked with Github Actions
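In short, the flake8 hook disappears both from the local hooks and from the pre-commit.ci skip list, since the ruff hook with --fix now covers that linting, and mypy's priority is bumped to 10, presumably so it runs after ruff's formatting and autofixes. Assuming ruff is installed in the environment, the rough local equivalent of the removed flake8 invocation (whose files pattern was ^(onetl|tests)/.*$) is:

ruff check --fix onetl tests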
2 changes: 2 additions & 0 deletions onetl/__init__.py
@@ -5,6 +5,8 @@
from onetl.plugins import import_plugins
from onetl.version import __version__

__all__ = ["__version__"]


def plugins_auto_import():
"""
2 changes: 1 addition & 1 deletion onetl/_metrics/__init__.py
@@ -10,8 +10,8 @@
__all__ = [
"SparkCommandMetrics",
"SparkDriverMetrics",
"SparkMetricsRecorder",
"SparkExecutorMetrics",
"SparkInputMetrics",
"SparkMetricsRecorder",
"SparkOutputMetrics",
]
4 changes: 2 additions & 2 deletions onetl/_metrics/extract.py
@@ -29,7 +29,7 @@
def _get_int(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None:
try:
return int(data[key][0])
except Exception:
except (IndexError, KeyError, ValueError, TypeError):
return None


@@ -38,7 +38,7 @@ def _get_bytes(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | No
raw_value = data[key][0]
normalized_value = NON_BYTE_SIZE.sub("", raw_value)
return int(ByteSize.validate(normalized_value))
except Exception:
except (IndexError, KeyError, ValueError, TypeError):
return None


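To illustrate the narrowed exception handling above, here is a minimal, self-contained sketch (simplified to plain string keys instead of SparkSQLMetricNames, an assumption for illustration only): a missing key, an empty value list, or a value that cannot be parsed as an integer now maps to None, while any unrelated error propagates instead of being swallowed by the previous broad except Exception.

from __future__ import annotations

from typing import Any


def get_int(data: dict[Any, list[str]], key: Any) -> int | None:
    # same pattern as _get_int() above, with the concrete failure modes spelled out
    try:
        return int(data[key][0])
    except (IndexError, KeyError, ValueError, TypeError):
        return None


assert get_int({"rows": ["42"]}, "rows") == 42     # happy path
assert get_int({}, "rows") is None                 # KeyError -> None
assert get_int({"rows": []}, "rows") is None       # IndexError -> None
assert get_int({"rows": ["n/a"]}, "rows") is None  # ValueError -> None
assert get_int({"rows": [None]}, "rows") is None   # TypeError -> None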
16 changes: 8 additions & 8 deletions onetl/_metrics/listener/__init__.py
@@ -15,15 +15,15 @@
)

__all__ = [
"SparkListenerTask",
"SparkListenerTaskStatus",
"SparkListenerTaskMetrics",
"SparkListenerStage",
"SparkListenerStageStatus",
"SparkListenerJob",
"SparkListenerJobStatus",
"SparkListenerExecution",
"SparkListenerExecutionStatus",
"SparkSQLMetricNames",
"SparkListenerJob",
"SparkListenerJobStatus",
"SparkListenerStage",
"SparkListenerStageStatus",
"SparkListenerTask",
"SparkListenerTaskMetrics",
"SparkListenerTaskStatus",
"SparkMetricsListener",
"SparkSQLMetricNames",
]
87 changes: 44 additions & 43 deletions onetl/_metrics/listener/base.py
@@ -17,27 +17,28 @@ class BaseSparkListener:
"""Base no-op SparkListener implementation.

See `SparkListener <https://spark.apache.org/docs/3.5.7/api/java/org/apache/spark/scheduler/SparkListener.html>`_ interface.
"""
""" # noqa: E501

spark: SparkSession

def activate(self):
start_callback_server(self.spark)

# passing python listener object directly to addSparkListener or removeSparkListener leads to creating new java object each time.
# passing python listener object directly to addSparkListener or removeSparkListener
# leads to creating new java object each time.
# But removeSparkListener call has effect only on the same Java object passed to removeSparkListener.
# So we need to explicitly create Java object, and then pass it both calls.
gateway = get_java_gateway(self.spark)
java_list = gateway.jvm.java.util.ArrayList()
java_list.append(self)
self._java_listener = java_list[0]

spark_context = self.spark.sparkContext._jsc.sc() # noqa: WPS437
spark_context = self.spark.sparkContext._jsc.sc() # noqa: SLF001
spark_context.addSparkListener(self._java_listener)

def deactivate(self):
with suppress(Exception):
spark_context = self.spark.sparkContext._jsc.sc() # noqa: WPS437
spark_context = self.spark.sparkContext._jsc.sc() # noqa: SLF001
spark_context.removeSparkListener(self._java_listener)

with suppress(Exception):
@@ -50,7 +51,7 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.deactivate()

def __del__(self): # noqa: WPS603
def __del__(self):
# If current object is collected by GC, deactivate listener
# and free bind Java object
self.deactivate()
@@ -60,119 +61,119 @@ def equals(self, other):
# so we compare string representation which should contain some form of id
return other.toString() == self._java_listener.toString()

def toString(self):
def toString(self): # noqa: N802
return type(self).__qualname__ + "@" + hex(id(self))

def hashCode(self):
def hashCode(self): # noqa: N802
return hash(self)

# no cover: start
# method names are important for Java interface compatibility!
def onApplicationEnd(self, application):
def onApplicationEnd(self, application): # noqa: N802
pass

def onApplicationStart(self, application):
def onApplicationStart(self, application): # noqa: N802
pass

def onBlockManagerAdded(self, block_manager):
def onBlockManagerAdded(self, block_manager): # noqa: N802
pass

def onBlockManagerRemoved(self, block_manager):
def onBlockManagerRemoved(self, block_manager): # noqa: N802
pass

def onBlockUpdated(self, block):
def onBlockUpdated(self, block): # noqa: N802
pass

def onEnvironmentUpdate(self, environment):
def onEnvironmentUpdate(self, environment): # noqa: N802
pass

def onExecutorAdded(self, executor):
def onExecutorAdded(self, executor): # noqa: N802
pass

def onExecutorMetricsUpdate(self, executor):
def onExecutorMetricsUpdate(self, executor): # noqa: N802
pass

def onExecutorRemoved(self, executor):
def onExecutorRemoved(self, executor): # noqa: N802
pass

def onExecutorBlacklisted(self, event):
def onExecutorBlacklisted(self, event): # noqa: N802
pass

def onExecutorBlacklistedForStage(self, event):
def onExecutorBlacklistedForStage(self, event): # noqa: N802
pass

def onExecutorExcluded(self, event):
def onExecutorExcluded(self, event): # noqa: N802
pass

def onExecutorExcludedForStage(self, event):
def onExecutorExcludedForStage(self, event): # noqa: N802
pass

def onExecutorUnblacklisted(self, event):
def onExecutorUnblacklisted(self, event): # noqa: N802
pass

def onExecutorUnexcluded(self, event):
def onExecutorUnexcluded(self, event): # noqa: N802
pass

def onJobStart(self, event):
def onJobStart(self, event): # noqa: N802
pass

def onJobEnd(self, event):
def onJobEnd(self, event): # noqa: N802
pass

def onNodeBlacklisted(self, node):
def onNodeBlacklisted(self, node): # noqa: N802
pass

def onNodeBlacklistedForStage(self, stage):
def onNodeBlacklistedForStage(self, stage): # noqa: N802
pass

def onNodeExcluded(self, node):
def onNodeExcluded(self, node): # noqa: N802
pass

def onNodeExcludedForStage(self, node):
def onNodeExcludedForStage(self, node): # noqa: N802
pass

def onNodeUnblacklisted(self, node):
def onNodeUnblacklisted(self, node): # noqa: N802
pass

def onNodeUnexcluded(self, node):
def onNodeUnexcluded(self, node): # noqa: N802
pass

def onOtherEvent(self, event):
def onOtherEvent(self, event): # noqa: N802
pass

def onResourceProfileAdded(self, resource_profile):
def onResourceProfileAdded(self, resource_profile): # noqa: N802
pass

def onSpeculativeTaskSubmitted(self, task):
def onSpeculativeTaskSubmitted(self, task): # noqa: N802
pass

def onStageCompleted(self, event):
def onStageCompleted(self, event): # noqa: N802
pass

def onStageExecutorMetrics(self, metrics):
def onStageExecutorMetrics(self, metrics): # noqa: N802
pass

def onStageSubmitted(self, event):
def onStageSubmitted(self, event): # noqa: N802
pass

def onTaskEnd(self, event):
def onTaskEnd(self, event): # noqa: N802
pass

def onTaskGettingResult(self, task):
def onTaskGettingResult(self, task): # noqa: N802
pass

def onTaskStart(self, event):
def onTaskStart(self, event): # noqa: N802
pass

def onUnpersistRDD(self, rdd):
def onUnpersistRDD(self, rdd): # noqa: N802
pass

def onUnschedulableTaskSetAdded(self, task_set):
def onUnschedulableTaskSetAdded(self, task_set): # noqa: N802
pass

def onUnschedulableTaskSetRemoved(self, task_set):
def onUnschedulableTaskSetRemoved(self, task_set): # noqa: N802
pass

# no cover: stop
class Java:
implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
implements = ["org.apache.spark.scheduler.SparkListenerInterface"] # noqa: RUF012
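For orientation, a hedged usage sketch of the listener base class above: activate() and deactivate() are wired to __enter__/__exit__, so a small subclass can wrap a Spark action in a context manager. The import path, the pre-existing SparkSession, and the subclass itself are assumptions not shown in this diff; the camelCase callback name is kept because it is dictated by the Java SparkListenerInterface, which is exactly what the N802 suppressions acknowledge.

from pyspark.sql import SparkSession

from onetl._metrics.listener.base import BaseSparkListener  # assumed import path


class JobCountingListener(BaseSparkListener):
    """Hypothetical subclass for illustration: counts finished Spark jobs."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.finished_jobs = 0

    def onJobEnd(self, event):  # noqa: N802 - name is fixed by the Java interface
        self.finished_jobs += 1


spark = SparkSession.builder.getOrCreate()
listener = JobCountingListener(spark)

# __enter__ calls activate() (registers a Java-side listener object via py4j),
# __exit__ calls deactivate() (removes the listener from the SparkContext)
with listener:
    spark.range(10).count()

print(listener.finished_jobs)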
7 changes: 2 additions & 5 deletions onetl/_metrics/listener/execution.py
@@ -18,7 +18,7 @@ def __str__(self):
return self.value


class SparkSQLMetricNames(str, Enum): # noqa: WPS338
class SparkSQLMetricNames(str, Enum):
# Metric names passed to SQLMetrics.createMetric(...)
# But only those we're interested in.

@@ -60,10 +60,7 @@ class SparkListenerExecution:

@property
def jobs(self) -> list[SparkListenerJob]:
result = []
for job_id in sorted(self._jobs.keys()):
result.append(self._jobs[job_id])
return result
return [self._jobs[job_id] for job_id in sorted(self._jobs.keys())]

def on_execution_start(self, event):
# https://github.com/apache/spark/blob/v3.5.7/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58
7 changes: 2 additions & 5 deletions onetl/_metrics/listener/job.py
@@ -31,10 +31,7 @@ class SparkListenerJob:

@property
def stages(self) -> list[SparkListenerStage]:
result = []
for stage_id in sorted(self._stages.keys()):
result.append(self._stages[stage_id])
return result
return [self._stages[stage_id] for stage_id in sorted(self._stages.keys())]

@classmethod
def create(cls, event):
@@ -50,7 +47,7 @@ def create(cls, event):
stage_ids = scala_seq_to_python_list(event.stageIds())
stage_infos = scala_seq_to_python_list(event.stageInfos())
for stage_id, stage_info in zip(stage_ids, stage_infos):
result._stages[stage_id] = SparkListenerStage.create(stage_info) # noqa: WPS437
result._stages[stage_id] = SparkListenerStage.create(stage_info)

return result
