
Provide an ion Pebble function (similar to json but that takes ION as input) #3661

Closed
brian-mulier-p opened this issue May 2, 2024 · 13 comments
Labels
enhancement New feature or request
Milestone

Comments

@brian-mulier-p
Member

Feature description

The idea is to convert an ION string to an object on which we can retrieve attributes easily, as we do with the json() function.

@brian-mulier-p brian-mulier-p added the enhancement New feature or request label May 2, 2024
@brian-mulier-p brian-mulier-p added this to the v0.17.0 milestone May 2, 2024
@anna-geller
Member

anna-geller commented May 5, 2024

Can you share a use case and an example of how you imagine this would look? ION is a list of rows, so it's not really meant to be transformed with Pebble, but rather with Transform tasks row by row (e.g. Groovy, Jython).

At first look, it seems that someone has a use case for ForEachItem (break a file into chunks and process row by row)

@tchiotludo tchiotludo modified the milestones: v0.17.0, v0.18.0 May 5, 2024
@brian-mulier-p
Member Author

We should auto-detect if it's ION-L (maybe also add that to the json function).
Basically the use case is simple: a user gets some ION-formatted data (in the use case a user had, from a Kafka consumer) which is multiple ION objects, one per line.

The user wants to insert row by row by handcrafting the INSERT query, accessing object attributes individually. Currently it's not possible to get an ION object attribute without first adding a JsonWriter, for example, to get a JSON object from which you can access attributes.
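The row-by-row insert described above could be sketched like this. This is a hypothetical illustration, not a Kestra API: the rows are plain Python dicts standing in for parsed ION objects (real ION parsing would need a library such as amazon.ion, not shown here).

```python
def build_insert(table, row):
    """Build one parameterized INSERT statement from a record's attributes."""
    columns = ", ".join(row.keys())
    placeholders = ", ".join("?" for _ in row)
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    return sql, tuple(row.values())

# Dicts standing in for ION objects, one per line of the input file.
rows = [
    {"id": 1, "employeeName": "Alice"},
    {"id": 2, "employeeName": "Bob"},
]
statements = [build_insert("employees", r) for r in rows]
```

Each record yields one statement plus its bound values, which is the per-row handcrafting the user wants to do directly from ION attributes.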

@anna-geller
Member

Thank you. Initially, I'd think about adding Batch tasks for the database the user wanted. I think here it was Cassandra, right?

We have Batch tasks for many databases so it seems nice to follow the same approach for consistency e.g.:

@brian-mulier-p
Member Author

I agree, but I believe it's nonsense that we don't have an ion function to parse it when every task output is in that format. Even though it's not a particularly fun format to deal with, we'd better support accessing attributes directly from ION rather than forcing users to transform to JSON first 🤔

@anna-geller
Member

This week is full for you, so let me schedule something next week to discuss this further.

@johnkm516

I agree but I believe it's a non-sense that we don't have a ion function to parse it while every task output is in such format. Even though it's not a particularly fun format to deal with, we better support access attributes directly from ion rather than forcing users to do some transform to json before 🤔

@anna-geller
Came to GitHub to suggest the exact same thing. My company is currently doing a PoC of Kestra Enterprise, and I'm running into the same issue. It makes no sense that Kestra's internal storage is formatted in ION by default but provides no native way to access attributes of the data once it has been stored.

The Batch task for DBs is too simplistic for most use cases; it doesn't leverage Kestra's strong points.
For example, one of my PoC flows syncs DB data, but it's not as simple as a batch insert. For each row, I want to run a procedure / complicated query that is not a simple insert. Once the procedure has actually completed successfully, for each row I want to update the origin DB table to acknowledge that the row has been synced (set the Interface_Update field to 'Y' when it was originally 'N' or null). The Batch task does not support such use cases, and in case of error I want to replay the sync for that specific row, not the entire bulk.

Right now I'm forced to store the entire batch data in the execution context in order to access it.

@johnkm516

johnkm516 commented May 8, 2024

Sorry for the double post, but I would like to add some more specific suggestions on exactly how ION files should be used in Kestra:

Pebble functions and jq should natively support loading ION files directly. That is, ION files should be treated exactly the same as Kestra's execution context, accessible anywhere in any task. For example, if I were manipulating and accessing an output object within Kestra's execution context like this:

{{ outputs.myquery.rows | jq('.[:100] | .employeeName') }} //any jq or pebble function is applicable here

I should be able to access data in an ION file in almost exactly the same way, like this:

{{ ion(outputs.myquery.uri) | jq('.[:100] | .employeeName') }} //any jq or pebble function is applicable here

The whole point of storing data in ION format is to avoid putting large amounts of data into the execution context (i.e., into the DB). Instead of storing data in the DB, the data should be stored as ION and accessed directly as above, without saving it into the execution context.
The above example expression would:

  1. load the entire ION data into memory temporarily to run the expression, but NOT save it into the execution context
  2. slice the loaded data in memory and select the employeeName attribute
  3. save the result of the expression in the execution context wherever it's used
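The three steps above can be modeled with a toy sketch. The assumptions: the proposed ion() function does not exist yet, records are already parsed into dicts, and the jq selection '.[:100] | .employeeName' is reduced to a slice plus an attribute projection.

```python
def ion_expression(records, limit=100, attribute="employeeName"):
    # Step 1: the records are held in memory only for this call;
    # nothing is written back to the execution context.
    # Step 2: slice the loaded data and project the requested attribute.
    selected = [r[attribute] for r in records[:limit]]
    # Step 3: only this (small) result would be stored wherever the
    # expression is used.
    return selected

# 250 records standing in for the rows of an ION file.
data = [{"employeeName": f"emp{i}"} for i in range(250)]
result = ion_expression(data)
```

Only the 100 projected names would land in the execution context; the 250-record payload stays out of the DB, which is the point of the proposal.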

For cases where the ION data is very large and cannot be held in memory, there should be a transformIon task:

Required input parameters :

  • File URI of the input Ion file
  • Batch (split and batch process the file)
  • Transform (using Pebble functions, like this:
    {{ this | jq('.[] | .employeeName') }} or this {{ ? | jq('.[] | .employeeName') }}, whatever you want to denote the loaded batch data)

Output parameters :

  • array of output ion file URIs (for each transformation batch)

In theory, treating ion files the same as Kestra's execution context should be fine as long as the file URI is stored in the execution context. The flow would therefore still be replayable. The data can be loaded into memory and accessed during runtime, and even accessed dynamically in the Kestra UI similar to how ion files are accessed in the logs currently. This would take the load off the database and allow us to rely more on the Kestra internal storage.
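The batching behaviour the proposed transformIon task would need can be sketched as follows. The task name and parameters are the commenter's proposal, not an existing Kestra API; this only shows streaming an input in fixed-size batches, each of which a transform expression (and, in the proposal, a separate output file) would apply to.

```python
from itertools import islice

def batches(items, batch_size):
    """Yield successive lists of at most batch_size items,
    without materializing the whole input in memory."""
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# Ten records standing in for the lines of a large ION file.
chunks = list(batches(range(10), 3))
```

A real implementation would stream lines from the input file URI and write one output file per batch, yielding the array of output URIs the proposal describes.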

@anna-geller
Member

anna-geller commented May 8, 2024

thanks, @johnkm516. Have you tried ForEachItem? ForEachItem is intended for that use case: splitting the ION file into batches of 1 so each row can be processed separately. We could consider adding new functionality to process a single-row ION file split with ForEachItem.

Feel free to share more high-level context about the kind of data transformations you are trying to do; this would help a lot in figuring out what Pebble (or other) functionality we should consider.

@johnkm516

johnkm516 commented May 8, 2024

Hi @anna-geller,

The data transformations can be as simple as merging fields with other data sources and inputting the result into a database, although as the number of fields increases, such simple transformations can quickly become complicated.

For data warehouse tasks, bulk inserts, batched data transformations using Python scripts, etc., are all good solutions that Kestra offers. In my case these flows aren't mission-critical; it's fine if a couple of rows go missing in a lake of millions of rows of data.

However, my use cases for Kestra are far more mission-critical tasks: moving design files, interfacing master data such as line plans and order sheets from a separate PLM system to the ERP, interfacing data between the WMS and the ERP, interfacing B2B and B2C orders to the ERP, etc. These are all flows where each row of data matters, and a failure can mean a shipment gets delayed, an order goes missing, etc.

Kestra, IMO, is a perfect solution because in cases of failure it allows us to replay the flows that failed using their execution context, modify flows if needed before replaying them, alert my team if a flow fails, etc.

Let's take the ForEachItem task. Say I queried a set of 1000 rows with a DB query and used the store option to keep the data in ION format. Then I use ForEachItem to split the ION data per row. What can I do afterwards? I now have 1000 file URIs, one per row, in ION format, but I have to parse each one manually, because something like
{{ json(read(outputs.foreachitem[0].uri)) }} does not work: I get a string that looks like JSON but doesn't have double quotes around the keys. That forces some sort of workaround, like building a script that parses ION into JSON, or using IonToCsv and other tasks to somehow convert it into usable JSON.

Furthermore, splitting the single ION file into 1000 separate ION files, one per row, seems like a waste. Instead of splitting it into files, I want to read the single 1000-row ION file into memory, run a Pebble transformation on that data directly in the ForEachItem, pass each row into a subflow to do mission-critical tasks, and keep the result of the transformation in the execution context. Processing each row of data as a separate subflow execution is important for us, as it lets us replay a single row that might have failed. Processing in bulk does not work for our use cases because much of our data is transactional in nature and is not idempotent: the subflow processing each row of data should only run once.
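The "looks like JSON but without quoted keys" workaround mentioned above could look like this toy Python sketch, which quotes bare keys in a simple, flat ION text row. It is deliberately naive: real ION has timestamps, symbols, annotations, and other types a regex cannot handle, which is exactly why a native ion() function would help.

```python
import json
import re

def naive_ion_row_to_json(line):
    """Quote bare identifiers that appear before a colon, e.g. {id: 1},
    then parse the result as JSON. Only works for simple flat rows."""
    quoted = re.sub(r'([{,]\s*)(\w+)\s*:', r'\1"\2":', line)
    return json.loads(quoted)

# A single ION-style row as it comes back from read() on a split file.
row = naive_ion_row_to_json('{id: 1, employeeName: "Alice"}')
```

Such scripts are brittle glue; the thread's point is that users should not need them at all.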

Pebble and jq, as used on any object in Kestra's execution context, already fit 99% of all data-transformation use cases. jq lets us transform objects into an array, map the original object to different fields, even create nested fields, filter out data, etc. I just want to be able to use ION data stored in Kestra's internal storage exactly the same way I use an object in Kestra's execution context, as described in my previous post.

Edit: I want to emphasize that you cannot cover all use cases simply by adding an option to ForEachItem to parse ION. Rather than modifying that single task, a Pebble function such as ion() with functionality identical to json() should be implemented, as @brian-mulier-p suggested. Pebble expressions can be used anywhere in any task, and the ability to read ION should be available anywhere too, just like the json() function.

@anna-geller
Member

anna-geller commented May 8, 2024

Totally understandable. We have some pain points in the implementation. If the function processes a single row, we could add that quickly, no issue there. However, we cannot process a 1000-row ION file in Pebble, as it would break the Executor. This is why we have the FileTransform tasks: the data processing is offloaded to the Worker, and you can do transformations row by row over the ION file in a friendly Python-like syntax, e.g. https://kestra.io/blueprints/103-extract-data-transform-it-and-load-it-in-parallel-to-s3-and-postgres-in-less-than-7-seconds. Could you DM me some example flows you are using via Slack? We could then figure out a reliable solution, e.g. a dedicated task that can take ION and process it in a friendly syntax (e.g. we were exploring JSONata for JSON in #3148). You can also schedule something for next week if that's easier: https://calendly.com/anna__geller

For now, I would recommend transforming the ION file from the query task to JSON before feeding it into ForEachItem, using the JsonWriter task. This way, you can work with JSON and jq everywhere.

@johnkm516

johnkm516 commented May 13, 2024

Totally understandable. We have some pain in the implementation. If the function processes a single row, we could add that quickly, no issue there. However, we cannot process 1000-row-ION file in Pebble as it would break the executor. This is why we have the FileTransform tasks because then the data processing is offloaded to the Worker and you can do transformations row by row over the ION file in a friendly Python-like syntax e.g.: https://kestra.io/blueprints/103-extract-data-transform-it-and-load-it-in-parallel-to-s3-and-postgres-in-less-than-7-seconds. Could you DM me some example flows you are using via Slack? We could then figure out a reliable solution, e.g. a dedicated task that can take ION and process it in a friendly syntax (e.g. we were exploring JSONata for JSON #3148. You can also schedule something for next week if easier https://calendly.com/anna__geller

For now, I would recommend transforming the ION file from the query task to JSON before feeding it into ForEachItem using the JsonWriter task. This way, you can work with JSON and jq everywhere

@anna-geller

Hi Anna, sorry for the late reply; I was away since Thursday. The flow is relatively simple. It's just a query that gets batches of the top 100 rows (currently without using store for the query), and then uses EachParallel like this:

  - id: parallel
    type: io.kestra.core.tasks.flows.EachParallel
    tasks:
      - id: subflow
        type: io.kestra.core.tasks.flows.Subflow
        namespace: dev
        inputs:
          masterDataRow: "{{ taskrun.value }}"
        flowId: Centric_Sync_Files_Worker
        retry:
          type: constant
          interval: PT3M
          maxAttempt: 5
        transmitFailed: true
        wait: true
    value: "{{trigger.rows | jq('.[]') | chunk(1)}}"

The above works fine, but if I were to convert the parallel to ForEachItem using ION instead, there are a lot of inefficiencies, as we've discussed:

  1. ForEachItem will split the ION into a file for each row. I'd rather just load it into memory in batches and do the transformation in memory; it's a waste of drive space.
  2. I have to use JsonWriter on top of 1), which makes me create even more files.
  3. Even if I delete the files afterwards, if I have a lot of these types of flows running at once, I think it will busy the drive, making it the bottleneck in most flows. Currently I have other production systems using the drive, and I don't want to have to buy a new storage drive specifically for Kestra's internal storage.
  4. Log the full child execution URLs in the ForEachItem task logs #2481: I can't access the subflow directly from the ForEachItem logs like I can with Parallel.

I understand now that the problem is that large amounts of data would break the Executor.
If there were a way to transform the ION file as it's being read, with a hard limit on the batch size, it would be extremely useful. For my use case it would be per row anyway. Given what you've told me, I think #3148 is a great solution. Some additional required input parameters for #3148 would be batch size (with a hard limit) and store (save as a JSON file or load into the execution context).

@anna-geller
Member

@johnkm516 again, let's please schedule a meeting with me, it will be easier https://calendly.com/anna__geller

@anna-geller
Member

as discussed offline via Slack, @johnkm516 I'm closing this issue and we'll implement:

  1. The ION-processing function as part of Improve support for ION/JSON processing in Pebble expressions #3715
  2. The JSONata transform task as discussed Feature: provide core task for easily manipulating JSON structured data #3148

@anna-geller anna-geller closed this as not planned May 13, 2024