[MAINTENANCE] Remove batching regex from FilePathDataConnector #9898
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

```
@@           Coverage Diff            @@
##           develop    #9898     +/-  ##
==========================================
- Coverage    77.85%   77.83%   -0.02%
==========================================
  Files          454      454
  Lines        39517    39533      +16
==========================================
+ Hits         30766    30771       +5
- Misses        8751     8762      +11
```

Flags with carried forward coverage won't be shown.
```python
@pytest.mark.big
@pytest.mark.xfail(
    reason="Accessing objects on azure.storage.blob using Pandas is not working, due to local credentials issues (this test is conducted using Jupyter notebook manually)."  # noqa: E501
)
def test_get_batch_list_from_fully_specified_batch_request(
    monkeypatch: pytest.MonkeyPatch,
    pandas_abs_datasource: PandasAzureBlobStorageDatasource,
):
    azure_client: azure.BlobServiceClient = cast(azure.BlobServiceClient, MockBlobServiceClient())

    def instantiate_azure_client_spy(self) -> None:
        self._azure_client = azure_client

    monkeypatch.setattr(
        great_expectations.execution_engine.pandas_execution_engine.PandasExecutionEngine,
        "_instantiate_s3_client",
        instantiate_azure_client_spy,
        raising=True,
    )
    asset = pandas_abs_datasource.add_csv_asset(
        name="csv_asset",
        batching_regex=r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv",
        abs_container="my_container",
    )

    request = asset.build_batch_request({"name": "alex", "timestamp": "20200819", "price": "1300"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 1
    batch = batches[0]
    assert batch.batch_request.datasource_name == pandas_abs_datasource.name
    assert batch.batch_request.data_asset_name == asset.name
    assert batch.batch_request.options == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.metadata == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.id == "pandas_abs_datasource-csv_asset-name_alex-timestamp_20200819-price_1300"

    request = asset.build_batch_request({"name": "alex"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 2
```
it looks like this test was never completed; I don't see any issue with "local credentials" as mentioned in the `pytest.mark.xfail` reason, it just looks like the mock ABS backend was never finished/implemented correctly.
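If it helps, here is a minimal sketch of what a finished mock might look like. The method names mirror the real `azure.storage.blob.BlobServiceClient` API; this is illustrative only, not the existing `MockBlobServiceClient`:

```python
# Illustrative sketch only: a mock that actually returns blob listings,
# using method names that mirror azure.storage.blob.BlobServiceClient.
class _FakeBlob:
    def __init__(self, name: str) -> None:
        self.name = name


class _FakeContainerClient:
    def list_blobs(self, name_starts_with: str = "", **kwargs):
        # Return names the batching regex can match, so the data connector
        # finds data references during test_connection().
        return iter(
            [
                _FakeBlob("alex_20200819_1300.csv"),
                _FakeBlob("alex_20200820_1300.csv"),
            ]
        )


class _FakeBlobServiceClient:
    def get_container_client(self, container: str) -> _FakeContainerClient:
        return _FakeContainerClient()
```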
it raises a `TestConnectionError` because the mock ABS client isn't returning anything:

```python
    def test_connection(self) -> None:
        """Test the connection for the DataAsset.

        Raises:
            TestConnectionError: If the connection test fails.
        """
        try:
            if self._data_connector.test_connection():
                return None
        except Exception as e:
            raise TestConnectionError(  # noqa: TRY003
                f"Could not connect to asset using {type(self._data_connector).__name__}: Got {type(e).__name__}"  # noqa: E501
            ) from e
>       raise TestConnectionError(self._test_connection_error_message)
E       great_expectations.datasource.fluent.interfaces.TestConnectionError: No file in bucket "test_bucket" with prefix "" and recursive file discovery set to "False" matched regular expressions pattern "(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv" using delimiter "/" for DataAsset "csv_asset".
```
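As a quick sanity check (illustrative, not part of this change): the batching regex itself does match the file name the test asserts on, so the failure is in the mock listing rather than the pattern:

```python
import re

# The pattern and file name below are copied from the test itself.
pattern = re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv")
match = pattern.match("alex_20200819_1300.csv")
assert match is not None
assert match.groupdict() == {"name": "alex", "timestamp": "20200819", "price": "1300"}
```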
```python
@pytest.mark.big
@pytest.mark.xfail(
    reason="Accessing objects on google.cloud.storage using Pandas is not working, due to local credentials issues (this test is conducted using Jupyter notebook manually)."  # noqa: E501
)
def test_get_batch_list_from_fully_specified_batch_request(
    monkeypatch: pytest.MonkeyPatch,
    pandas_gcs_datasource: PandasGoogleCloudStorageDatasource,
):
    gcs_client: google.Client = cast(google.Client, MockGCSClient())

    def instantiate_gcs_client_spy(self) -> None:
        self._gcs = gcs_client

    monkeypatch.setattr(
        great_expectations.execution_engine.pandas_execution_engine.PandasExecutionEngine,
        "_instantiate_s3_client",
        instantiate_gcs_client_spy,
        raising=True,
    )
    asset = pandas_gcs_datasource.add_csv_asset(
        name="csv_asset",
        batching_regex=r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv",
    )

    request = asset.build_batch_request({"name": "alex", "timestamp": "20200819", "price": "1300"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 1
    batch = batches[0]
    assert batch.batch_request.datasource_name == pandas_gcs_datasource.name
    assert batch.batch_request.data_asset_name == asset.name
    assert batch.batch_request.options == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.metadata == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.id == "pandas_gcs_datasource-csv_asset-name_alex-timestamp_20200819-price_1300"

    request = asset.build_batch_request({"name": "alex"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 2
```
I didn't dig into this test as deeply as I did the pandas ABS test above, but the `pytest.mark.xfail` message is exactly the same, the implementation looks almost identical, and it also doesn't pass, so it seems safe to assume that this test was also never completed.
```python
@pytest.mark.big
@pytest.mark.xfail(
    reason="Accessing objects on azure.storage.blob using Spark is not working, due to local credentials issues (this test is conducted using Jupyter notebook manually)."  # noqa: E501
)
def test_get_batch_list_from_fully_specified_batch_request(
    monkeypatch: pytest.MonkeyPatch,
    spark_abs_datasource: SparkAzureBlobStorageDatasource,
):
    azure_client: azure.BlobServiceClient = cast(azure.BlobServiceClient, MockBlobServiceClient())

    def instantiate_azure_client_spy(self) -> None:
        self._azure_client = azure_client

    monkeypatch.setattr(
        great_expectations.execution_engine.sparkdf_execution_engine.SparkDFExecutionEngine,
        "_instantiate_s3_client",
        instantiate_azure_client_spy,
        raising=True,
    )
    asset_specified_metadata = {"asset_level_metadata": "my_metadata"}
    asset = spark_abs_datasource.add_csv_asset(
        name="csv_asset",
        batching_regex=r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv",
        abs_container="my_container",
        batch_metadata=asset_specified_metadata,
    )

    request = asset.build_batch_request({"name": "alex", "timestamp": "20200819", "price": "1300"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 1
    batch = batches[0]
    assert batch.batch_request.datasource_name == spark_abs_datasource.name
    assert batch.batch_request.data_asset_name == asset.name
    assert batch.batch_request.options == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.metadata == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
        **asset_specified_metadata,
    }
    assert batch.id == "spark_abs_datasource-csv_asset-name_alex-timestamp_20200819-price_1300"

    request = asset.build_batch_request({"name": "alex"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 2
```
this test fails because the `SparkDFExecutionEngine` doesn't have the attribute that we're trying to patch:

```python
>       monkeypatch.setattr(
            great_expectations.execution_engine.sparkdf_execution_engine.SparkDFExecutionEngine,
            "_instantiate_s3_client",
            instantiate_azure_client_spy,
            raising=True,
        )
E       AttributeError: <class 'great_expectations.execution_engine.sparkdf_execution_engine.SparkDFExecutionEngine'> has no attribute '_instantiate_s3_client'

test_spark_azure_blob_storage_datasource.py:354: AttributeError
```
As far as I can tell this test was never completed.
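For what it's worth, `raising=True` is what surfaces this: `monkeypatch.setattr` refuses to create an attribute that doesn't already exist on the target. A standalone sketch reproducing the failure mode (hypothetical class, not GX code):

```python
import pytest


class FakeEngine:
    """Hypothetical stand-in for an engine with no _instantiate_s3_client attribute."""


def test_setattr_on_missing_attribute_raises(monkeypatch: pytest.MonkeyPatch) -> None:
    # raising=True (also the default) makes setattr fail fast instead of
    # silently attaching a new attribute that production code never calls.
    with pytest.raises(AttributeError):
        monkeypatch.setattr(FakeEngine, "_instantiate_s3_client", lambda self: None, raising=True)
```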
```python
@pytest.mark.big
@pytest.mark.xfail(
    reason="Accessing objects on google.cloud.storage using Spark is not working, due to local credentials issues (this test is conducted using Jupyter notebook manually)."  # noqa: E501
)
def test_get_batch_list_from_fully_specified_batch_request(
    monkeypatch: pytest.MonkeyPatch,
    spark_gcs_datasource: SparkGoogleCloudStorageDatasource,
):
    gcs_client: google.Client = cast(google.Client, MockGCSClient())

    def instantiate_gcs_client_spy(self) -> None:
        self._gcs = gcs_client

    monkeypatch.setattr(
        great_expectations.execution_engine.sparkdf_execution_engine.SparkDFExecutionEngine,
        "_instantiate_s3_client",
        instantiate_gcs_client_spy,
        raising=True,
    )
    asset_specified_metadata = {"asset_level_metadata": "my_metadata"}
    asset = spark_gcs_datasource.add_csv_asset(
        name="csv_asset",
        batching_regex=r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv",
        batch_metadata=asset_specified_metadata,
    )

    request = asset.build_batch_request({"name": "alex", "timestamp": "20200819", "price": "1300"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 1
    batch = batches[0]
    assert batch.batch_request.datasource_name == spark_gcs_datasource.name
    assert batch.batch_request.data_asset_name == asset.name
    assert batch.batch_request.options == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.metadata == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
        **asset_specified_metadata,
    }
    assert batch.id == "spark_gcs_datasource-csv_asset-name_alex-timestamp_20200819-price_1300"

    request = asset.build_batch_request({"name": "alex"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 2
```
this test fails because the `SparkDFExecutionEngine` doesn't have the attribute that we're trying to patch:

```python
>       monkeypatch.setattr(
            great_expectations.execution_engine.sparkdf_execution_engine.SparkDFExecutionEngine,
            "_instantiate_s3_client",
            instantiate_gcs_client_spy,
            raising=True,
        )
E       AttributeError: <class 'great_expectations.execution_engine.sparkdf_execution_engine.SparkDFExecutionEngine'> has no attribute '_instantiate_s3_client'
```
As far as I can tell this test was never completed.
the execution engine also doesn't have `_instantiate_gcs_client`, which would make more sense than `_instantiate_s3_client` as written.
```python
@pytest.mark.big
@pytest.mark.xfail(
    reason="Accessing objects on moto.mock_s3 using Spark is not working (this test is conducted using Jupyter notebook manually)."  # noqa: E501
)
def test_get_batch_list_from_fully_specified_batch_request(
    spark_s3_datasource: SparkS3Datasource,
):
    asset_specified_metadata = {"asset_level_metadata": "my_metadata"}
    asset = spark_s3_datasource.add_csv_asset(
        name="csv_asset",
        batching_regex=r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>\d{4})\.csv",
        header=True,
        infer_schema=True,
        batch_metadata=asset_specified_metadata,
    )
    asset.build_batch_request({"year": "2024", "month": 5})

    request = asset.build_batch_request({"name": "alex", "timestamp": "20200819", "price": "1300"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 1
    batch = batches[0]
    assert batch.batch_request.datasource_name == spark_s3_datasource.name
    assert batch.batch_request.data_asset_name == asset.name
    assert batch.batch_request.options == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
    }
    assert batch.metadata == {
        "path": "alex_20200819_1300.csv",
        "name": "alex",
        "timestamp": "20200819",
        "price": "1300",
        **asset_specified_metadata,
    }
    assert batch.id == "spark_s3_datasource-csv_asset-name_alex-timestamp_20200819-price_1300"

    request = asset.build_batch_request({"name": "alex"})
    batches = asset.get_batch_list_from_batch_request(request)
    assert len(batches) == 2
```
this test fails with an opaque `Py4JJavaError`, which makes me wonder if this datasource is actually functional:

```python
        request = asset.build_batch_request({"name": "alex", "timestamp": "20200819", "price": "1300"})
>       batches = asset.get_batch_list_from_batch_request(request)

test_spark_s3_datasource.py:214:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../great_expectations/datasource/fluent/data_asset/path/path_data_asset.py:144: in get_batch_list_from_batch_request
    data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec)
../../../great_expectations/execution_engine/sparkdf_execution_engine.py:515: in get_batch_data_and_markers
    batch_data = reader_fn(path)
../../../env/lib/python3.10/site-packages/pyspark/sql/readwriter.py:727: in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
../../../env/lib/python3.10/site-packages/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
../../../env/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:169: in deco
    return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

answer = 'xro29'
gateway_client = <py4j.clientserver.JavaClient object at 0x2898baef0>
target_id = 'o26', name = 'csv'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
>                   raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o26.csv.
E                   : java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
E                       at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
E                       at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
E                       at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
E                       at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
E                       at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
E                       at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
E                       at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
E                       at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
E                       at scala.collection.immutable.List.map(List.scala:293)
E                       at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
E                       at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
E
```
I believe one needs to install an AWS-provided JAR (the `hadoop-aws` package, which supplies the missing `org.apache.hadoop.fs.s3a.S3AFileSystem` class) to get Spark to be able to read from S3.
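A minimal sketch of what that configuration might look like; the `hadoop-aws` version shown is an assumed example (it must match the Hadoop build Spark ships with), and the bucket name is illustrative:

```python
from pyspark.sql import SparkSession

# Pull in the S3A filesystem implementation (org.apache.hadoop.fs.s3a.S3AFileSystem)
# via the hadoop-aws package; the version below is an assumed example.
spark = (
    SparkSession.builder.appName("s3-read-example")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .getOrCreate()
)

# With the S3A classes on the classpath, s3a:// paths become readable:
df = spark.read.csv("s3a://my-bucket/alex_20200819_1300.csv", header=True)
```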
I left a comment and a question but nothing that is blocking. Thanks!
```python
            return []
        regex_parser = RegExParser(
            regex_pattern=partitioner.regex,
            unnamed_regex_group_prefix=self._unnamed_regex_param_prefix,
```
Do we need this `unnamed_regex_param_prefix` anymore?
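For context, a quick illustration of what the prefix does (the prefix value below is an assumption about the default): unnamed regex groups are keyed by prefix plus group index instead of by a group name:

```python
import re

pattern = re.compile(r"(.+)_(\d{4})\.csv")  # two unnamed groups
prefix = "batch_request_param_"  # assumed default for _unnamed_regex_param_prefix

match = pattern.match("alex_2024.csv")
assert match is not None
# Keys are synthesized from the prefix and 1-based group index:
params = {f"{prefix}{i}": group for i, group in enumerate(match.groups(), start=1)}
assert params == {"batch_request_param_1": "alex", "batch_request_param_2": "2024"}
```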
```diff
@@ -143,7 +136,10 @@ def build_batch_spec(self, batch_definition: LegacyBatchDefinition) -> PathBatchSpec:
     # Interface Method
     @override
     def get_data_reference_count(self) -> int:
-        data_references = self._get_data_references_cache(batching_regex=self._batching_regex)
+        # todo: in the world of BatchDefinition, this method must accept a BatchRequest.
```
Can we file a ticket for this?
This PR updates `FileDataAsset`s and `DirectoryDataAsset`s to not require a `batching_regex` parameter on instantiation.

Changes

Other things to note

- `FilePathDataConnector` has been slightly refactored to make it clearer that its behavior differs significantly based on what sort of Asset it is serving.
- `FileDataAsset`s did preprocessing of their batching regex inside their init method; that has now been moved to the `_preprocess_batching_regex` method.
- `test_connection` logic for `FileDataAsset` and `DirectoryDataAsset` is not that useful in V1, since the specific `batching_regex` used to identify files is no longer owned by the asset. The majority of those tests have been removed, and future work will add `test_connection` functionality to the `BatchDefinition`.
- `test_get_batch_list_from_fully_specified_batch_request` tests for specific datasources/assets were marked as xfail for reasons that proved not to be accurate. They've been removed, and we should verify that we have adequate test coverage for those code paths. They've all been annotated with comments in this PR.
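For context, a hypothetical sketch of the direction this enables; the accessor and method names below are illustrative assumptions about the post-migration API, not its finalized form:

```python
import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas_filesystem(  # accessor name assumed
    name="my_datasource", base_directory="./data"
)

# The asset no longer needs a batching_regex at creation time:
asset = datasource.add_csv_asset(name="csv_asset")

# The pattern that identifies files travels with the batch definition instead
# (method name is an assumption):
batch_definition = asset.add_batch_definition_path(name="single file", path="alex_20200819_1300.csv")
```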