Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support a general nak_delay for all msgs and a header to set the nak_delay individually for each msg on NATS Jetstream input #2556

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

mfamador
Copy link
Member

@mfamador mfamador commented Apr 30, 2024

Add an optional parameter to make NATS Jetstream to delay the redelivery of all msgs when negatively acknowledged. It can be used as a retry mechanism when a transient error occurs.

input:
  nats_jetstream:
    urls:
      - localhost:4222
    subject: foo
    durable: bar
    nak_delay: 5m

pipeline:
  processors:
  -  . . . 

output:
  switch:
    cases:
    - check: 'errored()'
      output:
        reject: "rejecting due to processing error: ${! error() }"
    - output:
        resource: output_ok

Additionally, we can use a header on each msg to define the unix epoch timestamp until when we want to delay the msg processing. By default is nak_delay_until but we can set it with another name using the parameter nak_delay_until_header

input:
  nats_jetstream:
    subject: foo
    durable: bar

pipeline:
  processors:
  - mapping: throw("transient error")
    
output:
  switch:
    cases:
    - check: '@nak_delay_until.number() - timestamp_unix() > 0'
      output:
        reject: "not time to process it yet ${! @time_to_process }"
              
    - check: 'errored()'
      output:
        nats_jetstream:
          subject: foo
          headers:
            nak_delay_until: ${! (timestamp_unix() + @num_retries * 10).int64() }
            num_retries: ${! @num_retries }
        processors:
        - mapping: meta num_retries = @num_retries.number().or(0) + 1
            
    - output:
        drop: { } # our regular output
        processors:

@mfamador mfamador requested a review from Jeffail as a code owner April 30, 2024 12:59
@mfamador mfamador marked this pull request as draft April 30, 2024 14:20
@mfamador mfamador changed the title Support a nak_delay on NATS Jetstream input Support a general nak_delay for all msgs and a header to set the nak_delay individually for each msg on NATS Jetstream input Apr 30, 2024
…elay a msg for a custom period of time.

Signed-off-by: Marco Amador <amador.marco@gmail.com>
…elay a msg for a custom period of time.

Signed-off-by: Marco Amador <amador.marco@gmail.com>
@mfamador mfamador marked this pull request as ready for review April 30, 2024 15:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant