Updates to Teradata Provider #39217

Open · wants to merge 9 commits into base: main
67 changes: 67 additions & 0 deletions airflow/providers/teradata/hooks/teradata.py
@@ -32,6 +32,17 @@
if TYPE_CHECKING:
    from airflow.models.connection import Connection

PARAM_TYPES = {bool, float, int, str}


def _map_param(value):
    if value in PARAM_TYPES:
        # In this branch, value is a Python type; calling it produces
        # an instance of the type, which is understood by the Teradata driver
        # in the out-parameter mapping mechanism.
        value = value()
    return value
Comment on lines +38 to +44
Contributor:

Not sure that I understand this function.

Contributor (Author):

This function translates stored procedure OUT parameters into a format the driver understands. For instance, str is converted to an empty string (''). Stored procedures can be invoked with output parameters in various ways, as illustrated below.

TeradataStoredProcedureOperator(
    task_id="opr_sp_types",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, int, str],
)

This will result in the statement {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, 0, ''].
If we omitted this function, the statement would still be {CALL TEST_PROCEDURE(?,?,?,?)}, but with parameters [3, 1, <class 'int'>, <class 'str'>], which fails with an error.

Similarly, consider another invocation of the TeradataStoredProcedureOperator:

TeradataStoredProcedureOperator(
    task_id="opr_sp_place_holder",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, "?", "?"],
)

This will translate to the statement: {CALL TEST_PROCEDURE(?,?,?,?)}, with parameters: [3, 1, ?, ?].

Example DAG - https://github.com/apache/airflow/blob/1747e64f51f53a50a62ed31550be9ecf0c5e4ac7/tests/system/providers/teradata/example_teradata_call_sp.py
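As an illustrative aside (not part of the PR), the mapping behaves roughly like this; importing the module-level helper directly is only for demonstration:

from airflow.providers.teradata.hooks.teradata import _map_param

# Python types become neutral placeholder instances; ordinary values pass through unchanged.
mapped = [_map_param(value) for value in [3, 1, int, str]]
print(mapped)  # [3, 1, 0, '']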



class TeradataHook(DbApiHook):
"""General hook for interacting with Teradata SQL Database.
@@ -187,3 +198,59 @@ def get_ui_field_behaviour() -> dict:
"password": "dbc",
},
}

    def callproc(
        self,
        identifier: str,
        autocommit: bool = False,
        parameters: list | dict | None = None,
    ) -> list | dict | tuple | None:
        """
        Call the stored procedure identified by the provided string.

        Any OUT parameters must be provided with a value of either the
        expected Python type (e.g., `int`) or an instance of that type.

        :param identifier: stored procedure name
        :param autocommit: What to set the connection's autocommit setting to
            before executing the query.
        :param parameters: The `IN`, `OUT` and `INOUT` parameters for Teradata
            stored procedure

        The return value is a list or mapping that includes parameters in
        both directions; the actual return type depends on the type of the
        provided `parameters` argument.
        """
        if parameters is None:
            parameters = []

        args = ",".join("?" for name in parameters)

        sql = f"{{CALL {identifier}({args})}}"

        def handler(cursor):
            records = cursor.fetchall()

            if records is None:
                return
            if isinstance(records, list):
                return [row for row in records]
            if isinstance(records, dict):
                return {n: v for (n, v) in records.items()}
            raise TypeError(f"Unexpected results: {records}")

        result = self.run(
            sql,
            autocommit=autocommit,
            parameters=(
                {name: _map_param(value) for (name, value) in parameters.items()}
                if isinstance(parameters, dict)
                else [_map_param(value) for value in parameters]
            ),
            handler=handler,
        )

        return result
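For orientation, a hedged sketch of how this method could be called directly on a hook; the connection id and procedure name are placeholders, and the exact contents of the result depend on the procedure:

from airflow.providers.teradata.hooks.teradata import TeradataHook

hook = TeradataHook(teradata_conn_id="teradata_default")

# Two IN values followed by two OUT placeholders expressed as Python types.
result = hook.callproc("TEST_PROCEDURE", autocommit=True, parameters=[3, 1, int, str])
# result mirrors the parameter list, with the OUT slots filled in by the procedure.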
43 changes: 41 additions & 2 deletions airflow/providers/teradata/operators/teradata.py
@@ -17,11 +17,15 @@
# under the License.
from __future__ import annotations

from typing import Sequence
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class TeradataOperator(SQLExecuteQueryOperator):
"""
@@ -41,8 +45,8 @@ class TeradataOperator(SQLExecuteQueryOperator):
"""

template_fields: Sequence[str] = (
"parameters",
"sql",
"parameters",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}
@@ -62,3 +66,38 @@ def __init__(
        }
        super().__init__(**kwargs)
        self.conn_id = conn_id


class TeradataStoredProcedureOperator(BaseOperator):
    """
    Executes stored procedure in a specific Teradata database.

    :param procedure: name of stored procedure to call (templated)
    :param conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param parameters: (optional, templated) the parameters provided in the call
    """

    template_fields: Sequence[str] = (
        "procedure",
        "parameters",
    )
    ui_color = "#ededed"

    def __init__(
        self,
        *,
        procedure: str,
        conn_id: str = TeradataHook.default_conn_name,
Contributor:

I thought it would be better to use teradata_conn_id rather than a generic conn_id.

Contributor (Author):

Changed conn_id to teradata_conn_id. This required updating the system tests, which have been changed accordingly.

        parameters: dict | list | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.procedure = procedure
        self.parameters = parameters

    def execute(self, context: Context):
        hook = TeradataHook(teradata_conn_id=self.conn_id)
        return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters)
10 changes: 10 additions & 0 deletions airflow/providers/teradata/provider.yaml
@@ -33,6 +33,8 @@ dependencies:
  - apache-airflow-providers-common-sql>=1.3.1
  - teradatasqlalchemy>=17.20.0.0
  - teradatasql>=17.20.0.28
  - apache-airflow-providers-microsoft-azure
  - apache-airflow-providers-amazon
Contributor:

Setting this means that anyone who is using the teradata provider will be forced to also install azure and amazon.
I don't think that's right, as these integrations are not part of the core functionality that this provider offers (unlike, for example, common-sql). I suggest moving them to optional dependencies, like:

additional-extras:
  - name: apache.beam
    dependencies:
      - apache-beam[gcp]
  - name: cncf.kubernetes
    dependencies:
      - apache-airflow-providers-cncf-kubernetes>=7.2.0
  - name: leveldb
    dependencies:
      - plyvel
  - name: oracle
    dependencies:
      - apache-airflow-providers-oracle>=3.1.0
  - name: facebook
    dependencies:
      - apache-airflow-providers-facebook>=2.2.0
  - name: amazon
    dependencies:
      - apache-airflow-providers-amazon>=2.6.0

This will allow users of teradata to choose whether they want to add the optional dependencies to their installation.
You can also wrap the imports with AirflowOptionalProviderFeatureException to prevent cases where someone uses an operator without having the underlying provider installed (similar to what I suggested in #39366 (comment)).

Contributor (Author):

@eladkal thank you. Will change it.

Contributor (Author):

Modified as suggested
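As a rough illustration of the import-guard pattern mentioned above (a sketch, not the PR's actual code; the error message and extra name are assumptions):

from airflow.exceptions import AirflowOptionalProviderFeatureException

try:
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
except ImportError:
    # Surface a clear error if the optional amazon provider is not installed.
    raise AirflowOptionalProviderFeatureException(
        "S3ToTeradataOperator requires apache-airflow-providers-amazon; "
        "install it, e.g. apache-airflow-providers-teradata[amazon]."
    )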


integrations:
  - integration-name: Teradata
@@ -57,6 +59,14 @@ transfers:
    target-integration-name: Teradata
    python-module: airflow.providers.teradata.transfers.teradata_to_teradata
    how-to-guide: /docs/apache-airflow-providers-teradata/operators/teradata_to_teradata.rst
  - source-integration-name: Microsoft Azure Blob Storage
    target-integration-name: Teradata
    python-module: airflow.providers.teradata.transfers.azure_blob_to_teradata
    how-to-guide: /docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst
  - source-integration-name: Amazon Simple Storage Service (S3)
    target-integration-name: Teradata
    python-module: airflow.providers.teradata.transfers.s3_to_teradata
    how-to-guide: /docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst

connection-types:
  - hook-class-name: airflow.providers.teradata.hooks.teradata.TeradataHook
95 changes: 95 additions & 0 deletions airflow/providers/teradata/transfers/azure_blob_to_teradata.py
@@ -0,0 +1,95 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class AzureBlobStorageToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToTeradataOperator`

    :param blob_source_key: The URI format specifying the location of the Azure blob object store. (templated)
        The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param azure_conn_id: The Airflow WASB connection used for azure blob credentials.
    :param teradata_table: The name of the teradata table to which the data is transferred. (templated)
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`

    Note that ``blob_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        blob_source_key: str,
        azure_conn_id: str = "azure_default",
        teradata_table: str,
        teradata_conn_id: str = "teradata_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.blob_source_key = blob_source_key
        self.azure_conn_id = azure_conn_id
        self.teradata_table = teradata_table
        self.teradata_conn_id = teradata_conn_id

    def execute(self, context: Context) -> None:
        self.log.info(
            "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table
        )
        azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
        conn = azure_hook.get_connection(self.azure_conn_id)
        # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
        access_id = conn.login if conn.login is not None else ""
        access_secret = conn.password if conn.password is not None else ""
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        sql = f"""
            CREATE MULTISET TABLE {self.teradata_table} AS
            (
                SELECT * FROM (
                    LOCATION = '{self.blob_source_key}'
                    ACCESS_ID= '{access_id}'
                    ACCESS_KEY= '{access_secret}'
Contributor:

Won't this expose the secret in the logs?

Contributor (Author):

Access_Id_Key_Log
The secret is logged as masked in the log.

                ) AS d
            ) WITH DATA
        """
        try:
            teradata_hook.run(sql, True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from Azure Blob to Teradata was successful")
100 changes: 100 additions & 0 deletions airflow/providers/teradata/transfers/s3_to_teradata.py
@@ -0,0 +1,100 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class S3ToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToTeradataOperator`

    :param s3_source_key: The URI format specifying the location of the S3 object store. (templated)
        The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param teradata_table: The name of the teradata table to which the data is transferred. (templated)
    :param aws_conn_id: The Airflow AWS connection used for AWS credentials.
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`.

    Note that ``s3_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("s3_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        s3_source_key: str,
        teradata_table: str,
        aws_conn_id: str = "aws_default",
        teradata_conn_id: str = "teradata_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.s3_source_key = s3_source_key
        self.teradata_table = teradata_table
        self.aws_conn_id = aws_conn_id
        self.teradata_conn_id = teradata_conn_id

    def execute(self, context: Context) -> None:
        self.log.info(
            "transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table
        )

        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        access_key = (
            s3_hook.conn_config.aws_access_key_id if s3_hook.conn_config.aws_access_key_id is not None else ""
        )
        access_secret = (
            s3_hook.conn_config.aws_secret_access_key
            if s3_hook.conn_config.aws_secret_access_key is not None
            else ""
        )
Contributor:

This one is not semantically correct in the case of an AWS connection.

The connection itself might not contain aws_access_key_id / aws_secret_access_key; the credentials could be obtained through the botocore / boto3 credential strategy.

You should call the get_credentials method, which returns a named tuple with frozen credentials:

ReadOnlyCredentials = namedtuple(
    'ReadOnlyCredentials', ['access_key', 'secret_key', 'token']
)

Please note that if the token is not None you should also provide it, because without the STS session token the credentials are not valid.

I also can't find how to do this within Teradata, because the manual doesn't contain such information: https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Authentication-for-External-Object-Stores/Using-AWS-Assume-Role/Setting-Up-Assume-Role-on-Analytics-Database. However, this KB shows that it is somehow supported.

Finally, anonymous access is pretty difficult to handle because there is no out-of-the-box solution for it in the AWS hooks, so I would recommend adding a separate parameter for that, so we could skip obtaining the connection entirely if this kind of access is required.

Contributor (Author):

  1. Utilizing the get_credentials function to fetch the access key and secret from the AWS connection.
  2. Introduced a new parameter, "public_bucket", to indicate whether the bucket is publicly accessible; get_credentials is accessed only when the bucket is not public.
  3. Concerning the STS session token, we have a JIRA task pending to integrate the Teradata authorization object as a parameter for the cloud transfer operators. That will be implemented as part of resolving the JIRA task.
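A rough sketch of the credential handling described in points 1–2 (the helper name and the public_bucket flag are assumptions based on this discussion, not the PR's final code):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def _resolve_s3_credentials(aws_conn_id: str, public_bucket: bool) -> tuple[str, str]:
    """Return (access_key, secret_key), or empty strings for a public bucket."""
    if public_bucket:
        # Public bucket: skip resolving the AWS connection entirely.
        return "", ""
    credentials = S3Hook(aws_conn_id=aws_conn_id).get_credentials()
    # Note: credentials.token (the STS session token) is not passed on here;
    # see the limitation discussed in this thread.
    return credentials.access_key, credentials.secret_key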

Contributor:

Concerning the STS session token, we had a JIRA task pending to integrate Teradata authorization object as a parameter for cloud transfer operators.

I would recommend adding a note about this current limitation to the operator documentation:


.. note::
   Fo Bar Spam Egg

Contributor (Author):

Added documentation as suggested at docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst https://github.com/apache/airflow/pull/39217/files#diff-763c72396c181b27414fffe4f0cd1a2c97c01868759a119099b56867a03d9e8b


        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        sql = f"""
            CREATE MULTISET TABLE {self.teradata_table} AS
            (
                SELECT * FROM (
                    LOCATION = '{self.s3_source_key}'
                    ACCESS_ID= '{access_key}'
                    ACCESS_KEY= '{access_secret}'
                ) AS d
            ) WITH DATA
        """
        try:
            teradata_hook.run(sql, True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from S3 to Teradata was successful")