Skip to content

Commit

Permalink
limit max consecutive errors in sharepoint (#7870)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 386b098dd1e7a6c72ab32422484690938b357ebd
  • Loading branch information
zxqfd555-pw authored and Manul from Pathway committed Dec 16, 2024
1 parent cea3767 commit de221ce
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### Changed
- `pw.io.sharepoint.read` now explicitly terminates with an error if it fails to read the data the specified number of times per row (the default is `8`).

## [0.16.1] - 2024-12-12

### Changed
Expand Down
19 changes: 15 additions & 4 deletions python/pathway/xpacks/connectors/sharepoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import json
import logging
import sys
import time

import pathway as pw
Expand Down Expand Up @@ -154,6 +153,7 @@ def __init__(
with_metadata,
recursive,
object_size_limit,
max_failed_attempts_in_row,
):
_check_entitlements("xpack-sharepoint")
super().__init__()
Expand All @@ -165,6 +165,7 @@ def __init__(
self._recursive = recursive
self._object_size_limit = object_size_limit
self._stored_metadata = {}
self._max_failed_attempts_in_row = max_failed_attempts_in_row

@property
def _session_type(self) -> api.SessionType:
Expand All @@ -183,6 +184,7 @@ def _context(self):
return self._context_wrapper.context

def run(self) -> None:
n_failed_attempts_in_row = 0
while True:
try:
_url = urlparse(self._context_wrapper._url)
Expand All @@ -195,11 +197,15 @@ def run(self) -> None:
common_metadata={"base_url": f"{_url.scheme}://{_url.netloc}"},
)
diff = scanner.get_snapshot_diff()
n_failed_attempts_in_row = 0
except Exception as e:
print(
f"Failed to get snapshot diff: {e}. Retrying in {self._refresh_interval} seconds...",
file=sys.stderr,
n_failed_attempts_in_row += 1
if n_failed_attempts_in_row == self._max_failed_attempts_in_row:
raise
logging.error(
f"Failed to get snapshot diff: {e}. Retrying in {self._refresh_interval} seconds..."
)
time.sleep(self._refresh_interval)
continue

for deleted_path in diff.deleted_entries:
Expand Down Expand Up @@ -259,6 +265,7 @@ def read(
object_size_limit: int | None = None,
with_metadata: bool = False,
refresh_interval: int = 30,
max_failed_attempts_in_row: int | None = 8,
) -> Table:
"""Reads a table from a directory or a file in Microsoft SharePoint site.
Requires a valid Pathway Scale license key.
Expand Down Expand Up @@ -291,6 +298,9 @@ def read(
as UNIX timestamps;
refresh_interval: Time in seconds between scans. Applicable if mode is set to\
'streaming'.
max_failed_attempts_in_row: The maximum number of consecutive read errors before\
the connector terminates with an error. If set to ``None``, the connector tries to read\
data indefinitely, regardless of possible errors in the provided credentials.
Returns:
The table read.
Expand Down Expand Up @@ -360,6 +370,7 @@ def read(
with_metadata=with_metadata,
recursive=recursive,
object_size_limit=object_size_limit,
max_failed_attempts_in_row=max_failed_attempts_in_row,
)

return pw.io.python.read(subject, format="binary", name="sharepoint")

0 comments on commit de221ce

Please sign in to comment.