This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Change how read/write args are read from metadata #15

Open
mjkanji opened this issue Jul 6, 2023 · 4 comments

Comments


mjkanji commented Jul 6, 2023

The current implementation is unnecessarily susceptible to breaking changes. Say a new version of Polars (or pyarrow.dataset) removes/renames a given write_parquet argument. Because the arguments are currently hard-coded in the PolarsParquetIOManager, this would break user scripts if they upgrade to the newer version of Polars.

I think a good solution would be switching to a dictionary (called something along the lines of polars_io_args -- I'm sure there's a better name) as follows:

@asset(
    metadata={
        "polars_io_args": {
            "compression": "snappy"
        }
    }
)
def some_asset():
    ...

The dump_df_to_path and scan_df_from_path methods then forward these args to Polars without hard-coding their names:

from typing import Union

import fsspec
import polars as pl
import pyarrow.dataset as ds
from dagster import InputContext, OutputContext
from dagster_polars import BasePolarsUPathIOManager  # import path may vary by version
from upath import UPath


class PolarsParquetIOManager(BasePolarsUPathIOManager):
    ...

    def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
        assert context.metadata is not None
        io_args = context.metadata.get("polars_io_args", {})

        with path.open("wb") as file:
            df.write_parquet(
                file,
                **io_args,
            )

    def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
        assert context.metadata is not None
        # copy so the metadata dict stored on the context isn't mutated
        io_args = dict(context.metadata.get("polars_io_args", {}))
        io_args.setdefault("format", "parquet")  # default to Parquet if not already defined

        fs: Union[fsspec.AbstractFileSystem, None] = None

        try:
            fs = path._accessor._fs
        except AttributeError:
            pass

        return pl.scan_pyarrow_dataset(
            ds.dataset(
                str(path),
                filesystem=fs,
                **io_args,
            ),
            allow_pyarrow_filter=context.metadata.get("allow_pyarrow_filter", True),
        )

This

  • helps reduce the amount of "coupling" we have in terms of expected arguments
  • is overall "cleaner", as we're separating IO-related metadata into its own 'namespace', distinct from other metadata
  • keeps the metadata section in Dagit less 'cluttered' with IO minutiae by hiding it all under a single JSON entry.

Finally, this might also enable (in the future) 'hybrid' IO managers that save the same asset in two ways, because each IO manager's args can live under a different key, even if the names of the child args clash/overlap.

The only problem with the above solution is that I can't think of a "clean" way to also pass allow_pyarrow_filter values, because io_args is unpacked in a different place.
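One rough idea (not particularly clean either, just a sketch) would be to pop such arguments out of the dictionary inside scan_df_from_path before unpacking the rest into ds.dataset, so a single metadata key can still carry both:

io_args = dict(context.metadata.get("polars_io_args", {}))
io_args.setdefault("format", "parquet")
# arguments not meant for ds.dataset() are pulled out before unpacking
allow_pyarrow_filter = io_args.pop("allow_pyarrow_filter", True)

return pl.scan_pyarrow_dataset(
    ds.dataset(str(path), filesystem=fs, **io_args),
    allow_pyarrow_filter=allow_pyarrow_filter,
)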


mjkanji commented Jul 7, 2023

Following up on polars_io_args from above: instead of having a hard-coded name, we could leave this up to the user by adding an optional metadata_args_key field (again, there's probably a better name) to PolarsParquetIOManager (or potentially to BasePolarsUPathIOManager, so all child classes inherit the same functionality?).

And we then access the args as follows:

assert context.metadata is not None
io_args = context.metadata.get(self.metadata_args_key, {})
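A rough sketch of what this could look like, assuming the base class supports Dagster's Pythonic config fields (the field name and default are just placeholders):

class PolarsParquetIOManager(BasePolarsUPathIOManager):
    # configurable key under which per-asset IO args live in the metadata
    metadata_args_key: str = "polars_io_args"

    def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
        assert context.metadata is not None
        io_args = context.metadata.get(self.metadata_args_key, {})
        with path.open("wb") as file:
            df.write_parquet(file, **io_args)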

danielgafni (Owner) commented

I like your ideas, I'll probably implement them.

However, I'd like to wait a little to see whether dagster-io/dagster#15125 gets any traction.


ei-grad commented Jul 10, 2023

Please consider keeping the IOManager-related config in the configuration of the IOManager itself. There is already the partition_expr metadata field "convention", which looks nice especially for RDBMS IOManagers, but it doesn't play well with IOManagers for pandas/spark/polars, which may need a more complex way to manage output dataset partitioning than the single table column that is usually enough for an RDBMS.
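As a rough illustration of that convention (the asset, column name, and partition range here are just examples):

import polars as pl
from dagster import DailyPartitionsDefinition, asset

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    # column the DB IO manager uses to map partitions to rows
    metadata={"partition_expr": "event_date"},
)
def events() -> pl.DataFrame:
    return pl.DataFrame({"event_date": ["2023-01-01"], "value": [1]})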


danielgafni commented Jul 17, 2023

Hey @ei-grad, thanks for your interest in the lib!

I don't think all parameters should go into the config. I see two reasons for putting a parameter there:

  1. The parameter may only be provided at runtime. For example, overwrite_schema for deltalake: I would only need to set it in very specific situations, and I don't want it in the metadata because it would be hard-coded there (will fix soon: Move some parameters to the IOManager config #17). The ability to provide values at runtime is the main difference between config and metadata (see the sketch after this list).
  2. The parameter is not asset-specific, e.g. storage bucket, connection parameters, etc. Including asset-specific parameters (like partitioning, compression, etc.) in the IOManager config would require creating multiple IOManagers with different parameters for different assets, which seems unnecessary to me.
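To illustrate that difference (the class and field names below are hypothetical, not the actual dagster-polars API): a config field can be left open at definition time and supplied per run via run config, while metadata is fixed when the asset is defined:

from dagster import ConfigurableIOManager, InputContext, OutputContext, asset, materialize

class MyIOManager(ConfigurableIOManager):
    # hypothetical runtime-configurable parameter; as metadata it would be fixed at definition time
    overwrite_schema: bool = False

    def handle_output(self, context: OutputContext, obj) -> None:
        # write `obj` somewhere, honoring self.overwrite_schema
        ...

    def load_input(self, context: InputContext):
        ...

@asset
def my_asset() -> int:
    return 1

# The field is left unset at definition time and provided per run via run config:
materialize(
    [my_asset],
    resources={"io_manager": MyIOManager.configure_at_launch()},
    run_config={"resources": {"io_manager": {"config": {"overwrite_schema": True}}}},
)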

Now, you are saying some parameters can't be expressed in terms of metadata but can be expressed in terms of configuration. This is a bit surprising to me, because as I understand it, similar constraints (for example, being JSON-like) apply in both cases.

Would you be able to explain how exactly metadata "doesn't play well" in the cases you brought up above, please?

By the way, in terms of partitioning I was thinking of reusing the partition_expr approach, because Polars can execute SQL, so the DBIOManager approach should work here too.
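For illustration, a minimal sketch of the kind of SQL filtering Polars supports (the frame, table name, and predicate are made up):

import polars as pl

lf = pl.LazyFrame({"event_date": ["2023-07-01", "2023-07-02"], "value": [1, 2]})

# register the frame and run a partition_expr-style predicate against it
ctx = pl.SQLContext(events=lf)
partition = ctx.execute("SELECT * FROM events WHERE event_date = '2023-07-01'").collect()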

Thanks again!
