Skip to content

Commit

Permalink
🚨 Low code CDK: Add session token authenticator (#28050)
Browse files Browse the repository at this point in the history
This PR adds a new authenticator: The SessionTokenAuthenticator. The existing authenticator under the same name is renamed to LegacySessionTokenAuthenticator.
  • Loading branch information
Joe Reuter authored Jul 19, 2023
1 parent a83774a commit 58cc540
Show file tree
Hide file tree
Showing 14 changed files with 1,329 additions and 164 deletions.
45 changes: 21 additions & 24 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import base64
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
from typing import Any, Mapping, Union

import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.auth.token_provider import TokenProvider
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.types import Config
Expand All @@ -29,20 +30,19 @@ class ApiKeyAuthenticator(DeclarativeAuthenticator):
`"Authorization": "Bearer hello"`
Attributes:
header (Union[InterpolatedString, str]): Header key to set on the HTTP requests
api_token (Union[InterpolatedString, str]): Header value to set on the HTTP requests
request_option (RequestOption): request option how to inject the token into the request
token_provider (TokenProvider): Provider of the token
config (Config): The user-provided configuration as specified by the source's spec
parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
"""

request_option: RequestOption
api_token: Union[InterpolatedString, str]
token_provider: TokenProvider
config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_name = InterpolatedString.create(self.request_option.field_name, parameters=parameters)
self._token = InterpolatedString.create(self.api_token, parameters=parameters)

@property
def auth_header(self) -> str:
Expand All @@ -51,21 +51,21 @@ def auth_header(self) -> str:

@property
def token(self) -> str:
return self._token.eval(self.config)
return self.token_provider.get_token()

def _get_request_options(self, option_type: RequestOptionType):
def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self.request_option.inject_into == option_type:
options[self._field_name.eval(self.config)] = self.token
return options

def get_request_params(self) -> Optional[Mapping[str, Any]]:
def get_request_params(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.request_parameter)

def get_request_body_data(self) -> Optional[Union[Mapping[str, Any], str]]:
def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
return self._get_request_options(RequestOptionType.body_data)

def get_request_body_json(self) -> Optional[Mapping[str, Any]]:
def get_request_body_json(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)


Expand All @@ -78,25 +78,22 @@ class BearerAuthenticator(DeclarativeAuthenticator):
`"Authorization": "Bearer <token>"`
Attributes:
api_token (Union[InterpolatedString, str]): The bearer token
token_provider (TokenProvider): Provider of the token
config (Config): The user-provided configuration as specified by the source's spec
parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
"""

api_token: Union[InterpolatedString, str]
token_provider: TokenProvider
config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]):
self._token = InterpolatedString.create(self.api_token, parameters=parameters)

@property
def auth_header(self) -> str:
return "Authorization"

@property
def token(self) -> str:
return f"Bearer {self._token.eval(self.config)}"
return f"Bearer {self.token_provider.get_token()}"


@dataclass
Expand All @@ -120,7 +117,7 @@ class BasicHttpAuthenticator(DeclarativeAuthenticator):
parameters: InitVar[Mapping[str, Any]]
password: Union[InterpolatedString, str] = ""

def __post_init__(self, parameters):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._username = InterpolatedString.create(self.username, parameters=parameters)
self._password = InterpolatedString.create(self.password, parameters=parameters)

Expand All @@ -143,7 +140,7 @@ def token(self) -> str:
i.e. by adding another item the cache would exceed its maximum size, the cache must choose which item(s) to discard
ttl=86400 means that cached token will live for 86400 seconds (one day)
"""
cacheSessionTokenAuthenticator = TTLCache(maxsize=1000, ttl=86400)
cacheSessionTokenAuthenticator: TTLCache[str, str] = TTLCache(maxsize=1000, ttl=86400)


@cached(cacheSessionTokenAuthenticator)
Expand All @@ -168,11 +165,11 @@ def get_new_session_token(api_url: str, username: str, password: str, response_k
response.raise_for_status()
if not response.ok:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
return response.json()[response_key]
return str(response.json()[response_key])


@dataclass
class SessionTokenAuthenticator(DeclarativeAuthenticator):
class LegacySessionTokenAuthenticator(DeclarativeAuthenticator):
"""
Builds auth based on session tokens.
A session token is a random value generated by a server to identify
Expand Down Expand Up @@ -205,7 +202,7 @@ class SessionTokenAuthenticator(DeclarativeAuthenticator):
validate_session_url: Union[InterpolatedString, str]
password: Union[InterpolatedString, str] = ""

def __post_init__(self, parameters):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._username = InterpolatedString.create(self.username, parameters=parameters)
self._password = InterpolatedString.create(self.password, parameters=parameters)
self._api_url = InterpolatedString.create(self.api_url, parameters=parameters)
Expand All @@ -219,13 +216,13 @@ def __post_init__(self, parameters):

@property
def auth_header(self) -> str:
return self._header.eval(self.config)
return str(self._header.eval(self.config))

@property
def token(self) -> str:
if self._session_token.eval(self.config):
if self.is_valid_session_token():
return self._session_token.eval(self.config)
return str(self._session_token.eval(self.config))
if self._password.eval(self.config) and self._username.eval(self.config):
username = self._username.eval(self.config)
password = self._password.eval(self.config)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import datetime
from abc import abstractmethod
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional, Union

import dpath.util
import pendulum
import requests
from airbyte_cdk.models import Level
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.http_logger import format_http_message
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from isodate import Duration
from pendulum import DateTime


class TokenProvider:
@abstractmethod
def get_token(self) -> str:
pass


@dataclass
class SessionTokenProvider(TokenProvider):
login_requester: Requester
session_token_path: List[str]
expiration_duration: Optional[Union[datetime.timedelta, Duration]]
parameters: InitVar[Mapping[str, Any]]
message_repository: MessageRepository = NoopMessageRepository()

_decoder: Decoder = JsonDecoder(parameters={})
_next_expiration_time: Optional[DateTime] = None
_token: Optional[str] = None

def get_token(self) -> str:
self._refresh_if_necessary()
if self._token is None:
raise ReadException("Failed to get session token, token is None")
return self._token

def _refresh_if_necessary(self) -> None:
if self._next_expiration_time is None or self._next_expiration_time < pendulum.now():
self._refresh()

def _refresh(self) -> None:
response = self.login_requester.send_request()
if response is None:
raise ReadException("Failed to get session token, response got ignored by requester")
self._log_response(response)
session_token = dpath.util.get(self._decoder.decode(response), self.session_token_path)
if self.expiration_duration is not None:
self._next_expiration_time = pendulum.now() + self.expiration_duration
self._token = session_token

def _log_response(self, response: requests.Response) -> None:
self.message_repository.log_message(
Level.DEBUG,
lambda: format_http_message(
response,
"Login request",
"Obtains session token",
None,
is_auxiliary=True,
),
)


@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
config: Config
api_token: Union[InterpolatedString, str]
parameters: Mapping[str, Any]

def __post_init__(self) -> None:
self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)

def get_token(self) -> str:
return str(self._token.eval(self.config))
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,81 @@ definitions:
$parameters:
type: object
additionalProperties: true
SessionTokenAuthenticator:
type: object
required:
- type
- login_requester
- session_token_path
- request_authentication
properties:
type:
type: string
enum: [SessionTokenAuthenticator]
login_requester:
title: Login Requester
description: Description of the request to perform to obtain a session token to perform data requests. The response body is expected to be a JSON object with a session token property.
"$ref": "#/definitions/HttpRequester"
examples:
- type: HttpRequester
url_base: "https://my_api.com"
path: "/login"
authenticator:
type: BasicHttpAuthenticator
username: "{{ config.username }}"
password: "{{ config.password }}"
session_token_path:
title: Session Token Path
description: The path in the response body returned from the login requester to the session token.
examples:
- ["access_token"]
- ["result", "token"]
type: array
items:
type: string
expiration_duration:
title: Expiration Duration
description: The duration in ISO 8601 duration notation after which the session token expires, starting from the time it was obtained. Omitting it will result in the session token being refreshed for every request.
type: string
examples:
- "PT1H"
- "P1D"
request_authentication:
title: Data request authentication
description: Authentication method to use for requests sent to the API, specifying how to inject the session token.
anyOf:
- "$ref": "#/definitions/SessionTokenRequestApiKeyAuthenticator"
- "$ref": "#/definitions/SessionTokenRequestBearerAuthenticator"
$parameters:
type: object
additionalProperties: true
SessionTokenRequestApiKeyAuthenticator:
type: object
title: API Key Authenticator
description: Authenticator for requests using the session token as an API key that's injected into the request.
required:
- type
- inject_into
properties:
type:
enum: [ApiKey]
inject_into:
title: Inject API Key Into Outgoing HTTP Request
description: Configure how the API Key will be sent in requests to the source API.
"$ref": "#/definitions/RequestOption"
examples:
- inject_into: header
field_name: Authorization
- inject_into: request_parameter
field_name: authKey
SessionTokenRequestBearerAuthenticator:
title: Bearer Authenticator
description: Authenticator for requests using the session token as a standard bearer token.
required:
- type
properties:
type:
enum: [Bearer]
HttpRequester:
title: HTTP Requester
description: Requester submitting HTTP requests and extracting records from the response.
Expand Down Expand Up @@ -1057,6 +1132,7 @@ definitions:
- "$ref": "#/definitions/OAuthAuthenticator"
- "$ref": "#/definitions/NoAuth"
- "$ref": "#/definitions/SessionTokenAuthenticator"
- "$ref": "#/definitions/LegacySessionTokenAuthenticator"
error_handler:
title: Error Handler
description: Error handler component that defines how to handle errors.
Expand Down Expand Up @@ -1740,9 +1816,9 @@ definitions:
description: The stream schemas representing the shape of the data emitted by the stream.
type: object
additionalProperties: true
SessionTokenAuthenticator:
LegacySessionTokenAuthenticator:
title: Session Token Authenticator
description: Authenticator for requests authenticated using session tokens. A session token is a random value generated by a server to identify a specific user for the duration of one interaction session.
description: Deprecated - use SessionTokenAuthenticator instead. Authenticator for requests authenticated using session tokens. A session token is a random value generated by a server to identify a specific user for the duration of one interaction session.
type: object
required:
- type
Expand All @@ -1753,7 +1829,7 @@ definitions:
properties:
type:
type: string
enum: [SessionTokenAuthenticator]
enum: [LegacySessionTokenAuthenticator]
header:
title: Session Request Header
description: The name of the session token header that will be injected in the request
Expand Down
Loading

0 comments on commit 58cc540

Please sign in to comment.