Skip to content

Commit

Permalink
Merge pull request #1833 from TEAMSchools/600-slack-alerts-for-integr…
Browse files Browse the repository at this point in the history
…ation-job-errors

600 slack alerts for integration job errors
  • Loading branch information
cbini authored Sep 23, 2024
2 parents 3896279 + 869de18 commit c3087b6
Show file tree
Hide file tree
Showing 16 changed files with 119 additions and 58 deletions.
8 changes: 8 additions & 0 deletions .k8s/1password/items.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,11 @@ metadata:
namespace: dagster-cloud
spec:
itemPath: vaults/Data Team/items/Overgrad API - Camden
---
apiVersion: onepassword.com/v1
kind: OnePasswordItem
metadata:
name: op-slack-api
namespace: dagster-cloud
spec:
itemPath: vaults/Data Team/items/Slack API - Teamster
35 changes: 31 additions & 4 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"scikit-learn>=1.4.0",
"tableauserverclient>=0.25",
"tenacity>=8.2.3",
"dagster-slack>=0.24.8",
]
requires-python = ">=3.12,<3.13"
license = { text = "GPL-3.0-or-later" }
Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Please do not edit it manually.

agate==1.9.1
alembic==1.13.2
alembic==1.13.3
annotated-types==0.7.0
attrs==24.2.0
avro==1.12.0
Expand Down Expand Up @@ -31,6 +31,7 @@ dagster-gcp==0.24.8
dagster-k8s==0.24.8
dagster-pandas==0.24.8
dagster-pipes==1.8.8
dagster-slack==0.24.8
dagster-ssh==0.24.8
db-dtypes==1.3.0
dbt-adapters==1.7.0
Expand Down Expand Up @@ -137,6 +138,7 @@ scipy==1.14.1
setuptools==75.1.0
shellingham==1.5.4
six==1.16.0
slack-sdk==3.33.1
soupsieve==2.6
sqlalchemy==2.0.35
sqlglot[rs]==25.22.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ def members(context: AssetExecutionContext, google_directory: GoogleDirectoryRes
]

assets = [
groups,
members,
orgunits,
role_assignments,
roles,
users,
*google_directory_nonpartitioned_assets,
*google_directory_partitioned_assets,
]
4 changes: 2 additions & 2 deletions src/teamster/code_locations/kipptaf/_google/directory/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


@job(
name=f"{CODE_LOCATION}_google_directory_user_sync_job",
name=f"{CODE_LOCATION}__google__directory__user_sync_job",
config=RunConfig(
ops={
"bigquery_get_table_op": BigQueryGetTableOpConfig(
Expand All @@ -35,7 +35,7 @@ def google_directory_user_sync_job():


@job(
name=f"{CODE_LOCATION}_google_directory_role_assignments_sync_job",
name=f"{CODE_LOCATION}__google__directory__role_assignments_sync_job",
config=RunConfig(
ops={
"bigquery_get_table_op": BigQueryGetTableOpConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

google_directory_nonpartitioned_asset_schedule = ScheduleDefinition(
job=define_asset_job(
name=f"{CODE_LOCATION}_google_directory_nonpartitioned_asset_job",
name=f"{CODE_LOCATION}__google__directory__nonpartitioned_asset_job",
selection=[a.key for a in google_directory_nonpartitioned_assets],
),
cron_schedule="0 1 * * *",
Expand Down
5 changes: 5 additions & 0 deletions src/teamster/code_locations/kipptaf/dagster-cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -709,3 +709,8 @@ locations:
secretKeyRef:
name: op-overgrad-api-kippnewark
key: credential
- name: SLACK_TOKEN
valueFrom:
secretKeyRef:
name: op-slack-api
key: credential
2 changes: 2 additions & 0 deletions src/teamster/code_locations/kipptaf/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
BIGQUERY_RESOURCE,
GCS_RESOURCE,
OVERGRAD_RESOURCE,
SLACK_RESOURCE,
SSH_COUCHDROP,
get_dbt_cli_resource,
get_io_manager_gcs_avro,
Expand Down Expand Up @@ -110,6 +111,7 @@
"overgrad": OVERGRAD_RESOURCE,
"ps_enrollment": resources.POWERSCHOOL_ENROLLMENT_RESOURCE,
"schoolmint_grow": resources.SCHOOLMINT_GROW_RESOURCE,
"slack": SLACK_RESOURCE,
"smartrecruiters": resources.SMARTRECRUITERS_RESOURCE,
"ssh_adp_workforce_now": resources.SSH_RESOURCE_ADP_WORKFORCE_NOW,
"ssh_clever": resources.SSH_RESOURCE_CLEVER,
Expand Down
3 changes: 3 additions & 0 deletions src/teamster/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dagster import EnvVar
from dagster_dbt import DbtCliResource
from dagster_gcp import BigQueryResource, GCSResource
from dagster_slack import SlackResource

from teamster import GCS_PROJECT_NAME
from teamster.core.io_managers.gcs import GCSIOManager
Expand Down Expand Up @@ -28,6 +29,8 @@

OVERGRAD_RESOURCE = OvergradResource(api_key=EnvVar("OVERGRAD_API_KEY"), page_limit=100)

SLACK_RESOURCE = SlackResource(token=EnvVar("SLACK_TOKEN"))

SSH_COUCHDROP = SSHResource(
remote_host=EnvVar("COUCHDROP_SFTP_HOST"),
username=EnvVar("COUCHDROP_SFTP_USERNAME"),
Expand Down
7 changes: 2 additions & 5 deletions src/teamster/libraries/adp/workforce_now/api/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def setup_for_execution(self, context: InitResourceContext) -> None:
self._session.cert = (self.cert_filepath, self.key_filepath)

# authorize client
# trunk-ignore(bandit/B106)
token_dict = self._session.fetch_token(
token_url="https://accounts.adp.com/auth/oauth/v2/token",
auth=HTTPBasicAuth(username=self.client_id, password=self.client_secret),
Expand All @@ -45,12 +44,10 @@ def setup_for_execution(self, context: InitResourceContext) -> None:
@retry(
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)
)
def _request(self, method, url, **kwargs):
response = Response()
def _request(self, method, url, **kwargs) -> Response:
response = self._session.request(method=method, url=url, **kwargs)

try:
response = self._session.request(method=method, url=url, **kwargs)

response.raise_for_status()
return response
except HTTPError as e:
Expand Down
28 changes: 4 additions & 24 deletions src/teamster/libraries/google/directory/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,7 @@ def batch_insert_users(self, users):
def callback(request_id, response, exception):
if exception is not None:
self._log.error(exception)
if exception.status_code == 403:
raise exception
elif exception.status_code == 409 and exception.reason not in [
"Entity already exists.",
"Invalid Given/Family Name: GivenName",
]:
raise exception
raise exception
else:
self._log.info(
msg=(
Expand Down Expand Up @@ -200,8 +194,7 @@ def batch_update_users(self, users):
def callback(request_id, response, exception):
if exception is not None:
self._log.error(exception)
if exception.status_code in [403, 409]:
raise exception
raise exception
else:
self._log.info(
msg=(
Expand Down Expand Up @@ -238,13 +231,7 @@ def batch_insert_members(self, members):
def callback(request_id, response, exception):
if exception is not None:
self._log.error(exception)
if exception.status_code == 403:
raise exception
elif (
exception.status_code == 409
and exception.reason != "Member already exists."
):
raise exception
raise exception

# Queries per minute per user == 2400 (40/sec)
batches = self._batch_list(list=members, size=40)
Expand Down Expand Up @@ -272,14 +259,7 @@ def batch_insert_role_assignments(self, role_assignments, customer=None):
def callback(request_id, response, exception):
if exception is not None:
self._log.error(exception)
if exception.status_code == 403:
raise exception
elif (
exception.status_code == 409
and exception.reason
!= "Role assignment already exists for the role"
):
raise exception
raise exception

batches = self._batch_list(list=role_assignments, size=10)

Expand Down
48 changes: 40 additions & 8 deletions src/teamster/libraries/schoolmint/grow/ops.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,47 @@
from dagster import OpExecutionContext, op
from dagster_slack import SlackResource

from teamster.libraries.schoolmint.grow.resources import SchoolMintGrowResource


@op
def schoolmint_grow_user_update_op(
context: OpExecutionContext, schoolmint_grow: SchoolMintGrowResource, users
context: OpExecutionContext,
schoolmint_grow: SchoolMintGrowResource,
slack: SlackResource,
users,
):
exceptions = []
slack_client = slack.get_client()

for u in users:
if u["surrogate_key_source"] == u["surrogate_key_destination"]:
continue

request_args = ["users"]

user_id = u["user_id"]
inactive = u["inactive"]
user_email = u["user_email"]

exception_str = [user_email]

# restore
if inactive == 0 and u["archived_at"] is not None:
try:
context.log.info(f"RESTORING\t{user_email}")
request_args.extend([user_id, "restore"])
exception_str.extend([*request_args, "PUT"])

schoolmint_grow.put(
"users",
user_id,
"restore",
params={"district": schoolmint_grow.district_id},
*request_args, params={"district": schoolmint_grow.district_id}
)
except Exception as e:
context.log.exception(e)
exception_str.append(e)

exceptions.append("\t".join(exception_str))

continue

# build user payload
Expand All @@ -49,21 +64,38 @@ def schoolmint_grow_user_update_op(
# create
if inactive == 0 and user_id is None:
context.log.info(f"CREATING\t{user_email}")
create_response = schoolmint_grow.post("users", json=payload)
exception_str.extend([*request_args, "POST"])

create_response = schoolmint_grow.post(*request_args, json=payload)

u["user_id"] = create_response["_id"]
# update
elif inactive == 0 and user_id is not None:
context.log.info(f"UPDATING\t{user_email}")
schoolmint_grow.put("users", user_id, json=payload)
request_args.append(user_id)
exception_str.extend([*request_args, "PUT"])

schoolmint_grow.put(*request_args, json=payload)
# archive
elif inactive == 1 and user_id is not None and u["archived_at"] is None:
context.log.info(f"ARCHIVING\t{user_email}")
schoolmint_grow.delete("users", user_id)
request_args.append(user_id)
exception_str.extend([*request_args, "DELETE"])

schoolmint_grow.delete(*request_args, "users", user_id)
except Exception as e:
context.log.exception(e)
exception_str.append(e)

exceptions.append("\t".join(exception_str))

continue

if exceptions:
slack_client.chat_postMessage(
channel="#dagster-alerts", text="\n".join(exceptions)
)

return users


Expand Down
Loading

0 comments on commit c3087b6

Please sign in to comment.