Skip to content

onyx-platform/onyx-amazon-sqs

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

onyx-amazon-sqs

Onyx plugin for Amazon SQS.

Installation

In your project file:

[org.onyxplatform/onyx-amazon-sqs "0.14.5.0"]

In your peer boot-up namespace:

(:require [onyx.plugin.sqs-input]
          [onyx.plugin.sqs-output])

Limitations

This plugin currently only supports at least once behaviour.

It is neccessary to respond to failed async writes. For reads that have timed out, they will automatically retry. For failed writes we have to re-align the barriers, which resets to the last checkpoint.

Functions

Input Task

Catalog entry:

{:onyx/name <<TASK_NAME>>
 :onyx/plugin :onyx.plugin.sqs-input/input
 :onyx/type :input
 :onyx/medium :sqs
 :onyx/batch-size 10
 :onyx/batch-timeout 1000
 :sqs/region "us-east-1"
 :sqs/queue-name queue-name
 :sqs/deserializer-fn :clojure.edn/read-string
 :sqs/attribute-names []
 :sqs/message-attribute-names []
 ; :sqs/max-batch 10
 ; :sqs/max-inflight-receive-batches 10
 :onyx/doc "Reads segments from an SQS queue"}

Lifecycle entry:

{:lifecycle/task <<TASK_NAME>>
 :lifecycle/calls :onyx.plugin.sqs-input/input-calls}

In lieu of :sqs/queue-name, the url of the queue can be suppied via :sqs/queue-url.

SQS only supports batching up to 10 messages, which limits :onyx/batch-size to a maximum of 10.

Attributes

key type description
:sqs/deserializer-fn keyword A keyword pointing to a fully qualified function that will deserialize the :body of the queue entry from a string
:sqs/region string The SQS region to use
:sqs/queue-name string The SQS queue name
:sqs/queue-url string The SQS queue url, in lieu of sqs/queue-name
:sqs/attribute-names [string] A list of attributes to fetch for the queue entry. Default is to fetch no attributes. You can override it to fetch specific attributes or all (["All])
:sqs/message-attribute-names [string] A list of attributes to fetch for each message. Default is to fetch no message attributes. You can override it to fetch specific message attributes or all (["All])
:sqs/max-batch string Maximum number of messages read in a given batch.
:sqs/max-inflight-receive-batches string Maximum number of batches to buffer up. :sqs/max-batch * :sqs/max-inflight-receive-batches = number of messages buffered by client before read by onyx.

Possible attribute names can be found in the API documentation. And more about message attributes at API documentation.

Output Task

Catalog entry:

{:onyx/name <<TASK_NAME>>
 :onyx/plugin :onyx.plugin.sqs-output/output
 :sqs/queue-name <<OPTIONAL_QUEUE_NAME>>
 :sqs/region "us-east-1"
 :sqs/serializer-fn :clojure.core/pr-str
 :onyx/type :output
 :onyx/medium :sqs
 :onyx/batch-size 10
 :onyx/doc "Writes segments to SQS queues"}

Lifecycle entry:

{:lifecycle/task <<TASK_NAME>>
 :lifecycle/calls :onyx.plugin.sqs-output/output-calls}

Segments received at this task must have a body key in string form, which will be written to the queue defined in the key :sqs/queue-name, OR :sqs/queue-url in the task-map, or via the key :queue-url in the segment. Note that queue-name and queue-url are in different formats, with the queue-name being in the form "yourqueuename" and queue-url in the form https://sqs.us-east-1.amazonaws.com/039384834151/e3668c38-4.

SQS only supports batching up to 10 messages, which limits :onyx/batch-size to a maximum of 10.

Attributes

key type description
:sqs/region string The SQS region to use
:sqs/queue-name string Optional SQS queue name. If not present, segments will be routed via the :queue-url key of the segment
:sqs/queue-url string Optional SQS queue url. If not present, segments will be routed via the :queue-url key of the segment
:sqs/serializer-fn keyword A keyword pointing to a fully qualified function that will serialize the :body key of the segment to a string

AWS Role Privileges

The AWS role running Onyx should have the following AWS privileges:

  • sqs:SendMessage*
  • sqs:ReceiveMessage
  • sqs:DeleteMessage*
  • sqs:ChangeMessageVisibility*
  • sqs:GetQueueAttributes
  • sqs:GetQueueUrl

Acknowledgments

Many thanks to LockedOn for allowing this work to be open sourced and contributed back to the Onyx Platform community.

Contributing

Pull requests into the master branch are welcomed.

License

Copyright © 2016 Distributed Masonry LLC

Distributed under the Eclipse Public License, the same as Clojure.