
Releases: ArroyoSystems/arroyo

v0.10.3

11 Jun 16:03

0.10.3 is a patch release containing a couple of fixes on top of 0.10.2. It is a drop-in replacement for clusters running any patch release of 0.10.

What's Changed

  • Add table for kafka exactly-once state by @mwylde in #655
  • Add option to deploy kubernetes programs as configmaps rather than env vars by @mwylde in #654

Full Changelog: v0.10.2...v0.10.3

Note that to take advantage of the fix in #654, the helm chart must also be upgraded to 0.10.3 (the existing helm chart will work with the new image, but the fix will be disabled, as it requires additional k8s permissions).

v0.10.2

08 May 18:37

0.10.2 is a patch release containing several fixes on top of 0.10.1. It is a drop-in replacement for clusters running 0.10.0 or 0.10.1.

What's Changed

Full Changelog: v0.10.1...v0.10.2

v0.10.1

01 May 17:03

0.10.1 is a patch release containing several fixes on top of 0.10.0. It is a drop-in replacement for clusters running 0.10.0.

Full Changelog: v0.10.0...v0.10.1

v0.10.0

25 Apr 17:18

These release notes are also available on the Arroyo blog.

Arroyo 0.10.0 is a major release of the Arroyo stream processing engine, featuring an entirely new SQL engine that's >3x faster and ships as a single binary. Plus NATS and MQTT connectors, more SQL features, and more.

NATS

NATS is a messaging and queueing system designed for simplicity and performance. Arroyo 0.10 adds sources and sinks for Core NATS and NATS Jetstream, which layers on persistence and delivery guarantees.

For example, to consume from a NATS Jetstream subject, we can use this DDL:

CREATE TABLE logs (
    id BIGINT NOT NULL,
    time TIMESTAMP NOT NULL,
    host TEXT NOT NULL,
    level TEXT NOT NULL,
    message TEXT NOT NULL
) with (
    type = 'source',
    connector = 'nats',
    servers = 'localhost:4222',
    'nats.subject' = 'logs',
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json'
);

See the connector docs for more details.

Thanks to Quentin Gaborit for this incredible contribution!

  • Add NATS source and sink connectors by @gbto in #578

MQTT

MQTT is a lightweight messaging protocol widely used with low-power devices and in the Internet of Things (IoT) space. Arroyo 0.10 now ships with an MQTT source and sink to consume and produce from MQTT brokers.

For example, you can create an MQTT source with the following SQL

CREATE TABLE devices (
    device_id BIGINT NOT NULL,
    time TIMESTAMP NOT NULL,
    lat FLOAT NOT NULL,
    lng FLOAT NOT NULL,
    metadata JSON
) with (
    connector = 'mqtt',
    url = 'tcp://localhost:1883',
    type = 'source',
    topic = 'events',
    format = 'json'
);

See the connector docs for more details.

Thanks to community member Giovanny Gutiérrez (@bakjos) for this amazing contribution!

$ arroyo

With the new architecture in 0.10, the entire Arroyo system now builds as a single binary:

$ arroyo
Usage: arroyo <COMMAND>

Commands:
  api         Starts an Arroyo API server
  controller  Starts an Arroyo Controller
  cluster     Starts a complete Arroyo cluster
  worker      Starts an Arroyo worker
  compiler    Starts an Arroyo compiler
  node        Starts an Arroyo node server
  migrate     Runs database migrations on the configured Postgres database
  help        Print this message or the help of the given subcommand(s)

Options:
  -h, --help     Print help
  -V, --version  Print version

Running a complete local cluster is as easy as

$ arroyo cluster
INFO arroyo_controller: Using process scheduler
INFO arroyo_server_common: Starting cluster admin server on 0.0.0.0:8001
INFO arroyo_controller: Starting arroyo-controller on 0.0.0.0:9190
INFO arroyo_api: Starting API server on 0.0.0.0:8000
INFO arroyo_compiler_service: Starting compiler service at 0.0.0.0:9000

Today we are providing pre-compiled binaries on the release page for Linux and macOS, or you can build your own by following the dev instructions.

Note that Arroyo currently requires a running Postgres database for configuration, but in the next release we will be adding the option of using SQLite. Prometheus is also used to power the pipeline metrics in the Web UI, if available.

  • Add a new binary crate 'arroyo-bin' to package the entire system by @mwylde in #514

Performance

Arroyo 0.10 is significantly faster than 0.9.

In fact, it's so fast that for many benchmarks we've run on EC2 instances, it can saturate the network before using all the available CPU. We'll be following up with rigorous benchmarks against other systems, but in early tests it significantly beats other streaming systems in throughput.

How has it gotten so much faster? This is deserving of an entire blog post, but the short version is that Arroyo now operates internally on columnar data using carefully tuned vector kernels, thanks to the hard work of the Apache Arrow and DataFusion communities.

Columnar data is now the standard for OLAP (analytics-oriented) query engines like ClickHouse, Pinot, and Presto. There are a few reasons for this:

  • By storing all values in a column together, you can achieve better compression ratios and make better use of CPU cache
  • Only the columns actually referenced in a query need to be read, reducing disk and network IO
  • Columnar processing aligns well with the vector capabilities in modern CPUs, providing 3x or more speedups

However, row-oriented data remains the standard for streaming engines. There are some inherent tradeoffs between latency (how quickly an event can traverse through the pipeline) and throughput (how many events can be processed with a given amount of CPU). By batching data we can get higher throughput at the expense of latency. And columnar representations require that we batch a number of events together before we see performance improvements (in fact, with a small number of rows columnar processing will be much slower due to fixed overhead).

But there's a strong argument for moving to columnar representations for streaming: at any given batch size, the higher the throughput the less time we must wait to fill that batch. For example, if we want at least 100 records in our batch to overcome fixed costs, the amount of time we need to wait to receive 100 records will depend on our throughput:

  • At 10 events/second, it takes 10 seconds
  • At 1,000 events/second, it takes 0.1 seconds (100 ms)
  • At 1,000,000 events/second, it takes 0.0001 seconds (0.1 ms)

Or looking at it from a fixed latency perspective (say, waiting at most 100 ms):

  • At 10 events/second, our batch size is 1
  • At 1,000 — 100
  • At 1,000,000 — 100,000

In Arroyo 0.10, we perform batching at the source and via aggregating operators like windows and joins. The source batching behavior can be configured via two environment variables:

  • BATCH_SIZE which controls the maximum batch size
  • BATCH_LINGER_MS which controls the maximum amount of time to wait (in milliseconds) before emitting a batch

By configuring these values, users can choose their own tradeoff between latency and throughput.

SQL Improvements

Functions

We've added support for over 200 new scalar, aggregate, and window SQL functions. These unlock many powerful new capabilities that previously would have been inexpressible or would have required writing a UDF.

There are too many new functions to highlight them all, but some exciting additions include:

  • Nearly 200 scalar functions, covering math, conditions, strings, regex, JSON, times, and more
  • Statistical and approximate aggregate functions like approx_percentile_cont and approx_distinct, the latter of which uses HyperLogLog to produce memory-efficient estimates of distinct counts
  • Many new SQL window functions including rank, percent_rank, lag, and lead
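
For instance, the new approximate aggregates can be combined with a window in a single query. Here's a minimal sketch (the requests table and its user_id and latency_ms columns are hypothetical):

SELECT
    tumble(interval '1 minute') as window,
    approx_distinct(user_id) as unique_users,
    approx_percentile_cont(latency_ms, 0.95) as p95_latency
FROM requests
GROUP BY window;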

Check out the Scalar, Aggregate, and Window function docs for the complete list.

Arrays

Previously, Arroyo supported a limited set of operations on arrays, but only within certain contexts. Now we have complete array support, including a comprehensive set of array functions, support for indexing via square brackets (like a[1]), support for serializing arrays as JSON and Avro, the ability to convert aggregates into arrays (via array_agg), and the ability to unroll arrays into separate rows via unnest.

For example, we can write unique IPs over a tumbling window as an array to Kafka with a query like this:

CREATE TABLE sink (
  values TEXT[]
) with (
  connector = 'confluent',
  connection_profile = 'confluent-working',
  format = 'avro',
  'avro.confluent_schema_registry' = 'true',
  topic = 'list_output',
  type = 'sink'
);

INSERT INTO sink
SELECT array_agg(DISTINCT ip) FROM logs
GROUP BY tumble(interval '10 seconds');
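
Indexing and unnest compose with the rest of the new array support. A quick sketch, assuming a hypothetical batches table with a TEXT[] column named ips:

-- arrays are 1-indexed, so this selects the first IP in each batch
SELECT ips[1] FROM batches;

-- unroll each array into one row per element
SELECT unnest(ips) as ip FROM batches;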

Source Fusion

Arroyo 0.10 includes new optimizations that reduce the amount of data read from a single source when it's used across multiple queries. Now, each source will only be read once rather than once per subquery.

This same optimization also applies to views, so compute can be reused across outputs.
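
For example, in a pipeline like the following sketch, the events topic is read from Kafka once and its output is shared by both inserts (the schema and the two sinks, error_sink and user_sink, are hypothetical and assumed to be defined elsewhere):

CREATE TABLE events (
    user_id TEXT,
    status INT
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'json'
);

-- both queries consume the same source; in 0.10 it is only read once
INSERT INTO error_sink SELECT * FROM events WHERE status >= 500;
INSERT INTO user_sink SELECT user_id FROM events;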

Upgrading to 0.10

There are some backwards-incompatible changes in 0.10's SQL. Some pipelines written for 0.9 will need to be updated as described here.

Virtual fields

The syntax for virtual fields has changed to more closely match Postgres: we now require the STORED keyword. Previously:

event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date as TIMESTAMP))

Now:

event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date as TIMESTAMP)) STORED

UDFs

UDF definitions have changed slightly; now the UDF function must be annotated with an attribute macro #[arroyo_udf_plugin::udf], for example:

use arroyo_udf_plugin::udf;

#[udf]
fn plus_one(x: i64) -> i64 {
    x + 1
}

T...


v0.9.1

02 Feb 21:33

0.9.1 is a patch release containing several fixes on top of 0.9.0. It is a drop-in replacement for clusters running 0.9.0.

  • Upgrade framer-motion to fix UDF popover by @jbeisen in #499
  • Fix cases where kafka client_config setting was not used by @mwylde in #520

Full Changelog: v0.9.0...v0.9.1

v0.9.0

17 Jan 16:47

These release notes are also available on the Arroyo blog.

Arroyo 0.9.0 introduces async UDFs, which allow users to use databases, services, and models from within their pipelines. It also brings support for joining update tables, more control over bad data handling, a redesigned connection profile editor, and more.

For this release, we are thrilled to welcome two new contributors to the Arroyo project.

Thanks to all our contributors for this release.

Async UDFs

User-defined functions (UDFs) and user-defined aggregate functions (UDAFs) allow you to extend Arroyo with custom logic. New in Arroyo 0.9 is support for what we call async UDFs.

Existing (sync) UDFs are expected to implement simple, computational logic. Common use cases include parsing custom formats, implementing functions that are not natively supported, or implementing custom business logic that would be difficult to express in SQL. Because they are run synchronously with processing, if they take too long to run they can block the entire pipeline.

This isn't a hypothetical concern. A UDF that takes 10ms to run will be limited to processing just 100 events per second per subtask!

And there are many use cases where you want to run logic that is not a simple, stateless computation. You may need to do point lookups in a database to enrich events, make an API call to another service, or even perform inference against an AI model.

Async UDFs allow you to do all of these things, without blocking the pipeline. Async UDFs are defined as a Rust async fn, supporting non-blocking IO. Then within the Arroyo runtime, many instances of the UDF can be run in parallel, with a configurable concurrency limit.

What does this look like? Let's take an example of a UDF that enriches web events with GeoIP data by calling a GeoIP service. First, we define the UDF:

/*
[dependencies]
reqwest = { version = "0.11.23", features = ["json"] }
serde_json = "1"

[udfs]
async_max_concurrency = 1000
*/

pub async fn get_city(ip: String) -> Option<String> {
  let body: serde_json::Value =
    reqwest::get(
      format!("http://geoip-service:8000/{ip}"))
        .await
        .ok()?
        .json()
        .await
        .ok()?;

    body.pointer("/names/en").and_then(|t|
      t.as_str()
    ).map(|t| t.to_string())
}

Then we can use this UDF in a query, for example this one that finds the most common cities in the last 15 minutes:

create view cities as
select get_city(logs.ip) as city
from logs;

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) as row_num
    FROM (SELECT count(*) as count,
        city,
        hop(interval '5 seconds', interval '15 minutes') as window
            FROM cities
            WHERE city IS NOT NULL
            group by city, window)) WHERE row_num <= 5;

Async UDFs support several configuration options that control their behavior, including the maximum number of concurrent requests, timeouts, and whether event order should be preserved.

They also support defining a Context struct which is passed in to each invocation of the UDF. This allows you to do setup for expensive operations (like creating a database connection pool) just once, and share the result with all invocations.

For example, we can create a client for Postgres like this:

use async_trait::async_trait;
use tokio::sync::RwLock;
use tokio_postgres::{Client, NoTls};

pub struct Context {
    client: RwLock<Option<Client>>,
}

impl Context {
    pub fn new() -> Self {
        Self {
            client: RwLock::new(None),
        }
    }
}

#[async_trait]
impl arroyo_types::UdfContext for Context {
    async fn init(&self) {
        let conn_str = "host=localhost user=dbuser password=dbpassword dbname=my_db";

        let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();

        let mut c = self.client.write().await;
        *c = Some(client);

        tokio::spawn(async move {
            if let Err(e) = connection.await {
                println!("connection error: {}", e);
            }
        });
    }
}

See the docs for full details.

Joining Update tables

Arroyo has two semantics for streaming SQL, one based on event-time watermarks and another that we call "Update Tables," which allow for incremental computation of most analytical SQL constructs. However, previously there was a restriction on joins, which were only supported for the watermark semantic.

Now that restriction is gone, and update tables can be joined. Hurray!

For example, this query will count the number of page views by user, and when a transaction comes in, joins it against that accumulated count:

CREATE VIEW page_view_counts as
SELECT count(*) as count, user_id as user_id
FROM page_views
GROUP BY user_id;


SELECT T.user_id, amount, P.count as page_views
FROM transactions T
LEFT JOIN page_view_counts P on T.user_id = P.user_id;

Bad Data handling

It happens to the best of us. You've carefully built out your data architecture, performed rigorous code reviews, and diligently monitored your rollouts.

And yet it happens—somehow, invalid data got into your Kafka topic. Now in Arroyo 0.9, you can configure the source to drop bad data instead of causing a processing failure.

This behavior can be configured via the Web UI or with the bad_data option when creating a source in SQL (see the example after this list). Two options are currently available:

  • fail (default, and behavior in Arroyo 0.8) causes the pipeline to restart when bad data is encountered

  • drop causes the data to be dropped, while logging an error message to the console and incrementing the arroyo_worker_deserialization_errors metric

  • Add control for deserialization error behavior by @jbeisen in #443

  • Add bad data option to create connection form by @jbeisen in #452
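
For example, a Kafka source that drops undeserializable records instead of failing the pipeline might be declared like this (a sketch; the schema is hypothetical, and bad_data is set in the WITH clause):

-- bad_data = 'drop' logs and skips records that fail to deserialize
CREATE TABLE events (
    id BIGINT,
    time TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    bad_data = 'drop'
);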

Environment variable substitution

Some connectors need to be configured with authentication details or other secret data. Today in Arroyo those secrets are stored in the configuration database (Postgres), which is not very secure.

Now in Arroyo 0.9, we're introducing environment-variable substitution for sensitive configuration fields.

This feature allows you to use double curly braces ({{ }}) to reference environment variables, which will get substituted in at runtime. For example, if you have an environment variable called WEBHOOK_SECRET you can now include this as a header in a webhook sink like:

Authorization: Basic {{ WEBHOOK_SECRET }}

Env variable substitution can be used for both connections created via the Web UI and those created directly in SQL, like:

CREATE TABLE slack (
    value TEXT
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/{{ SLACK_KEY }}',
    method = 'POST',
    headers = 'Content-Type:application/json',
    format = 'json'
);
  • Support environment variable substitution by @jbeisen in #433

Confluent Cloud connector

Confluent is the company founded by the creators of Apache Kafka. They provide a cloud-native distribution of Kafka and a serverless Kafka platform. While Arroyo has always supported reading and writing from Kafka, Arroyo 0.9 makes it even easier to integrate with your Confluent Cloud topics via the new Confluent Cloud connector.

See the complete integration guide to get started.
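
As a sketch, a Confluent Cloud topic can then be used as a source by referencing a saved connection profile, much like the Confluent sink example in the 0.10 notes above (the profile name, topic, and schema here are hypothetical):

CREATE TABLE orders (
    id BIGINT,
    amount FLOAT
) WITH (
    connector = 'confluent',
    connection_profile = 'confluent-cloud',
    topic = 'orders',
    type = 'source',
    format = 'json'
);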

Connection Profile UI

Connection Profiles encapsulate common configuration that is shared across multiple connection tables. For example, a single Kafka cluster may have many topics that you would like to consume from or produce to from Arroyo.

We've upgraded the process of creating and managing these profiles. It's now possible to view and delete existing connection profiles, and the whole UI has gotten a little spiffier.

We're also now validating that the profile is good (like ensuring we can talk to Kafka with the provided address and credentials), so you don't discover you mistyped something at the end of the connection creation process.

  • Redesign cluster profile UI and add v...

v0.8.1

08 Dec 23:07

0.8.1 is a patch release, with several bug fixes over 0.8.0. It is a drop-in upgrade for clusters running 0.8.0.

Full Changelog: v0.8.0...v0.8.1

v0.8.0

29 Nov 06:12

These release notes are also available on our blog.

The Arroyo team is pleased to announce the release of Arroyo 0.8.0. This release includes a number of new connectors, including a FileSystem source, a Delta Lake sink, and a Redis sink. There are also other new features like Avro support, global UDFs, and more.

Arroyo is an open-source stream processing engine that allows anyone to build correct, reliable, and scalable real-time data pipelines using SQL.

Read on for more, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release.

FileSystem source

Arroyo 0.5 added a high-performance FileSystem sink, capable of writing JSON and Parquet files to local or remote filesystems and object stores like S3. Now in 0.8, we're adding a corresponding FileSystem source.

The FileSystem source can be configured with a directory (on a local filesystem, or a remote object store like S3). It will read all files in that directory and emit them as records for processing, with support for JSON and Parquet files and a variety of compression codecs.

Files are read in parallel, enabling a very high rate of ingestion.

A FileSystem source can be created in SQL like this:

CREATE TABLE logs (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'filesystem',
    type = 'source',
    path = 's3://my-bucket/inputs/events',
    compression_format = 'gzip',
    format = 'json'
);

select * from logs;

In the next release, we will be improving our support for bootstrapping using the FileSystem source, which is the process of creating the initial state of the pipeline from historical data.

For example, you may have a window over 30 days of data. If you start reading today from Kafka, that will take 30 days to fully fill the window. But if you have the historical data available on S3, as is common in many modern data architectures, you can read from both the FileSystem and Kafka sources in parallel with a SQL union.
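
A sketch of that pattern, unioning a FileSystem source over historical data with a Kafka source for live events (hypothetical schemas; the connector options mirror the examples elsewhere in these notes):

CREATE TABLE historical (
    id BIGINT,
    time TIMESTAMP
) WITH (
    connector = 'filesystem',
    type = 'source',
    path = 's3://my-bucket/inputs/events',
    format = 'json'
);

CREATE TABLE live (
    id BIGINT,
    time TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'json'
);

SELECT * FROM historical
UNION ALL
SELECT * FROM live;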

Full details on the FileSystem source are available in the docs.

Thanks to @rcjmurillo for this major contribution!

Delta Lake Sink

Delta Lake is a popular open-source storage format, with support for ACID transactions, schema enforcement, time travel, and more. It's a great choice for modern data lakes and is supported by many popular query engines.

In Arroyo 0.8 we've enhanced our existing FileSystem sink to support Delta Lake, allowing you to write data transactionally to Delta tables.

With Arroyo, a high-performance Kafka to Delta Lake pipeline can be created in SQL as easily as

CREATE TABLE events (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    format = 'json'
);

CREATE TABLE deltatable (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'delta',
    path = 's3://arroyo-deltalake/delta_uuid_remote_checkpoints',
    format = 'parquet',
    'filename.strategy' = 'uuid'
);

INSERT INTO deltatable
SELECT * FROM events;

See the Delta Lake docs for more details.

Redis Sink

For applications that rely on real-time data, Redis is a popular choice for a fast, in-memory state store. Now Arroyo can sink its results directly to Redis, providing a great option for serving state.

Redis supports a variety of data structures. In the initial release of the Redis sink, we support writing to String tables, Lists, and Hashes. Keys can be constructed dynamically from columns in the record.

For example, to write results to a Redis String table, you can use a SQL statement like

CREATE TABLE redis (
    user_id TEXT,
    count INT
) WITH (
    connector = 'redis',
    type = 'sink',
    format = 'json',
    'address' = 'redis://localhost:6379',
    target = 'string',
    'target.key_prefix' = 'counts.',
    'target.key_column' = 'user_id'
);

INSERT INTO redis
SELECT user_id, count(*)
FROM events
GROUP BY user_id, hop(interval '5 seconds', interval '1 hour');

This will write JSON-encoded values to keys like counts.fred for the user fred.

Writes are performed efficiently using Redis pipelines and can achieve throughputs of millions of writes per second on a single Redis node.

Both standalone and cluster modes are supported.

See the Redis connector docs for more details.

Avro

Arroyo today supports JSON, Parquet, and custom string formats. New in 0.8 is support for Avro, including deep integration with Confluent Schema Registry.

Avro-encoded Kafka sources can be easily created with the Confluent Schema Registry, using the Arroyo UI or API.

From SQL, those sources can be queried like any other:

select * from pizza_orders;

When reading from schema registry sources, Arroyo will use the schema registry to decode the data with full support for schema evolution.

It's also possible to specify your schema directly, although without a schema registry you will not be able to evolve your schemas.

Avro sinks are now supported as well, allowing you to write Avro-encoded data to Kafka and other Avro-compatible systems. When using the Schema Registry, Arroyo will automatically register new schemas as they are encountered.

create table output with (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' =
      'http://localhost:8081',
    format = 'avro',
    'avro.confluent_schema_registry' = 'true',
    'topic' = 'outputs'
);

insert into output
select * from source;

See the Avro format docs for more details.

Schema Registry improvements

Arroyo supports Confluent Schema Registry for loading and storing schemas of data read and written to Kafka.

We've made several improvements to our schema registry support in addition to adding Avro support (see above).

  • It's now possible to write JSON data to Kafka using a pre-existing schema that has been registered with the schema registry. This provides a more efficient way to write data to Kafka for use with Kafka Connect and other systems that rely on having a schema (previously Arroyo supported an option to embed the schema in the message, which is less efficient). See the sketch after this list.
  • We now support using schema registries that require authentication, for example Confluent Cloud.
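
A sketch of the first improvement, assuming the JSON option mirrors the Avro one shown above and is named 'json.confluent_schema_registry' (check the Kafka connector docs for the exact option names):

create table output with (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' = 'http://localhost:8081',
    format = 'json',
    'json.confluent_schema_registry' = 'true',
    topic = 'outputs'
);

insert into output
select * from source;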

See the Kafka connector docs for more details.

  • Implement json sink for schema registry, use connection types for sinks, add auth for schema registry by @jacksonrnewhouse in #416

UDFs

UDFs (user-defined functions) and UDAFs (user-defined aggregate functions) have become a widely-used feature of Arroyo, allowing users to extend Arroyo with custom logic written in Rust.

Arroyo 0.8 includes a number of improvements to UDFs, making them easier to use and more powerful.

Global UDFs

In earlier versions of Arroyo, UDFs could be defined within a single pipeline, but could not be easily shared. It's common to have a set of UDFs that are useful across an organization, for example to parse custom formats or implement business logic.

Now in 0.8, UDFs can be defined globally, and used across multiple pipelines. Along with this, we've also completely reworked the UDF editing experience.

Custom UDF dependencies

UDFs can now depend on a custom set of external Rust crates (libraries) by specifying them as a special comment in the UDF definition. Previously, UDFs had a few built-in dependencies (including serde_json and regex) but now any Rust crate can be used.

For example, the access_log_parser crate provides a comprehensive suite of web server log parsers. It's now possible to write a UDF that uses this crate to parse access logs and extract fields from them.

/*
[dependencies]
access_log_parser = "0.8"
serde_json = "1"
*/

use access_log_parser::{parse, AccessLogError, LogType, LogEntry};
use serde_json::json;
use std::time::{UNIX_EPOCH, Duration};
...

v0.7.1

24 Oct 18:19

0.7.1 is a patch release that includes one fix over v0.7.0:

  • Use udfs crate for compilation in addition to checking (#382 by @mwylde)

v0.7.0

17 Oct 16:36

These release notes are also available on our blog.

The Arroyo team is excited to announce the release of Arroyo 0.7.0, the latest version of our open-source stream processing engine. This release includes a number of new features, including custom partitioning for the filesystem sink, message framing and unnest support, unions, state compaction, and more. Our focus on quality also continues, with a more sophisticated correctness test suite that can now test checkpointing and restoration.

Read on for more, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release.

What's next

With the 0.7 release out, we're already working on the next one. Arroyo 0.8 is targeted for mid-November, and will include a number of new features, including support for Avro, Delta Lake integration, a FileSystem source, saved UDFs, and more. We've also been working on a new distributed state backend, which will allow Arroyo to scale to multi-TB state sizes while maintaining fast restarts and frequent checkpoints.

Anything you'd like to see? Let us know on Discord!

Now on to the details.


Custom partitioning for FileSystem sink

Arroyo 0.5 added a high-performance, transactional filesystem sink which enables ingestion into data warehouses on S3. The initial version did not support custom partitioning of data, so records were written to a single file per subtask (with time and size-based rollovers).

In many cases, you will get better query performance if you partition your data by a field (like an event type) or by time.

Arroyo 0.7 introduces support for field-based and time-based partitioning, allowing you to optimize your data layout according to your query patterns.

For example, you can now create a table like this:

CREATE TABLE file_sink (
    time TIMESTAMP,
    event_type TEXT,
    user_id TEXT,
    count INT
) WITH (
    connector = 'filesystem',
    format = 'parquet',
    path = 's3://arroyo-events/realtime',
    rollover_seconds = '3600',
    time_partition_pattern = 'year=%Y/month=%m/day=%d/hour=%H',
    partition_fields = 'event_type'
);

This will write data to a path like s3://arroyo-events/realtime/year=2023/month=10/day=19/hour=10/event_type=login.

See all of the available options in the docs.

Message Framing

Arroyo now supports defining a framing strategy for messages. This allows users to customize how messages read off of a source are split into records for processing, where previously there was always a one-to-one mapping. This is particularly useful for sources which do not have a built-in framing strategy, such as HTTP APIs.

As an example, Arroyo can now directly consume metrics from a Prometheus-compatible application—without using Prometheus! Here's a query that polls a Prometheus endpoint (for a node exporter) and computes the CPU usage over a 1 minute sliding window:

create table raw_metrics (
    value TEXT,
    parsed TEXT generated always as (parse_prom(value))
) WITH (
    connector = 'polling_http',
    endpoint = 'http://localhost:9100/metrics',
    format = 'raw_string',
    framing = 'newline',
    emit_behavior = 'changed',
    poll_interval_ms = '1000'
);

create table metrics as
    select extract_json_string(parsed, '$.name') as name,
        cast(extract_json_string(parsed, '$.value') as float) as value,
        get_first_json_object(parsed, '$.labels') as labels
    from raw_metrics;

create table cpu as
select
    extract_json_string(labels, '$.cpu') as cpu,
    extract_json_string(labels, '$.mode') as mode,
    value
from metrics
where name = 'node_cpu_seconds_total';

select sum(usage) from (
    select rate(value) as usage, cpu, mode,
        hop(interval '2 seconds', interval '60 seconds') as window
    from cpu
    where mode = 'user' or mode = 'system'
    group by cpu, mode, window);

This relies on the following UDFs:

fn parse_prom(s: String) -> Option<String> {
    let regex = regex::Regex::new(r"(?P<metric_name>\w+)\{(?P<labels>[^}]+)\}\s+(?P<metric_value>[\d.]+)").unwrap();
    let label_regex = regex::Regex::new(r##"(?P<label>[^,]+)="(?P<value>[^"]+)""##).unwrap();

    let captures = regex.captures(&s)?;

    let name = captures.name("metric_name").unwrap().as_str();
    let labels = captures.name("labels").unwrap().as_str();
    let value = captures.name("metric_value").unwrap().as_str();

    let labels: std::collections::HashMap<String, String> = label_regex.captures_iter(&labels)
        .map(|capture| (
            capture.name("label").unwrap().as_str().to_string(),
            capture.name("value").unwrap().as_str().to_string()
        ))
        .collect();


    Some(serde_json::json!({
        "name": name,
        "labels": labels,
        "value": value
    }).to_string())
}
fn rate(values: Vec<f32>) -> Option<f32> {
    let start = values.first()?;
    let end = values.last()?;

    Some((end - start) / 60.0)
}

Currently we support framing via newlines (specified as framing = 'newline' in SQL), although we plan to add support for other framing strategies in the future.

See the format docs for more.

  • Add support for framing in message deserialization by @mwylde in #339

SQL unnest

While framing allows you to split a single message into multiple records, this can only be applied at the source and for fairly simple framing rules. For other use cases you may want to do some computation or parsing that produces an array, and then unroll that array into multiple records.

This is what the new unnest operator does. It takes a column of type Array and produces a new record for each element in the array. This is often useful for dealing with JSON data, particularly from web APIs.

For example, the GitHub API doesn't provide a WebSocket feed of events, but it does provide a REST API endpoint. We can use the polling_http connector along with unnest to turn that into a stream:

CREATE TABLE raw_events (
    value TEXT
) WITH (
    connector = 'polling_http',
    endpoint = 'https://api.github.com/networks/arroyosystems/events',
    poll_interval_ms = '5000',
    emit_behavior = 'changed',
    headers = 'User-Agent:arroyo/0.7',
    format = 'json',
    'json.unstructured' = 'true'
);

create table events AS (
    select
        extract_json_string(event, '$.id') as id,
        extract_json_string(event, '$.type') as type,
        extract_json_string(event, '$.actor.login') as login,
        extract_json_string(event, '$.repo.name') as repo
    FROM
        (select unnest(extract_json(value, '$[*]'))
            as event from raw_events));

select concat(type, ' from ', login, ' in ', repo) FROM (
    select distinct(id), type, login, repo
    from events
);

SQL union

The union operator allows you to combine the results of multiple queries into a single stream. This is often useful for combining similar data from multiple sources (for example, two Kafka streams); it can also be very useful for bootstrapping, which is the process of processing historical data and then switching to a live stream.

For example, we can use union to combine two Kafka topics like this:

create table topic1 (
    value TEXT
) WITH (
    connector = 'kafka',
    topic = 'topic1',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'raw_string'
);

create table topic2 (
    value TEXT
) WITH (
    connector = 'kafka',
    topic = 'topic2',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'raw_string'
);

select value from topic1
union all select value from topic2;

State compaction

Arroyo pipelines are stateful; operators like windows and joins need to remember things in order to aggregate across time, and sources and sinks need to remember what data they've already read or written to provide exactly-once semantics. While state is stored in memory for processing, it also needs to be written durably so that it can be recovered in the event of a failure. Today we write this state to local disk or to a remote object store like S3 as Parquet files.

This pro...
