Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cmd set #16

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions internal/command/a.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ type RHash interface {
Values(key string) ([]core.Value, error)
}

// RSet is a set repository.
type RSet interface {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better not to rush things with the command package. Let's deal with rset first.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't I implement the internal/command part?

Add(key string, elems ...any) (int, error)
}

// Redka is an abstraction for *redka.DB and *redka.Tx.
// Used to execute commands in a unified way.
type Redka struct {
key RKey
str RStr
hash RHash
set RSet
}

// RedkaDB creates a new Redka instance for a database.
Expand All @@ -127,6 +133,7 @@ func RedkaDB(db *redka.DB) Redka {
key: db.Key(),
str: db.Str(),
hash: db.Hash(),
set: db.Set(),
}
}

Expand All @@ -136,6 +143,7 @@ func RedkaTx(tx *redka.Tx) Redka {
key: tx.Key(),
str: tx.Str(),
hash: tx.Hash(),
set: tx.Set(),
}
}

Expand All @@ -154,6 +162,10 @@ func (r Redka) Hash() RHash {
return r.hash
}

func (r Redka) Set() RSet {
return r.set
}

type baseCmd struct {
name string
args [][]byte
Expand Down Expand Up @@ -293,6 +305,10 @@ func Parse(args [][]byte) (Cmd, error) {
case "hvals":
return parseHVals(b)

// set
case "sadd":
return parseSAdd(b)

default:
return parseUnknown(b)
}
Expand Down
33 changes: 33 additions & 0 deletions internal/command/sadd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package command

// SAdd adds one or more elements to set.
// SADD key elem [elem...]
// For more information: https://redis.io/docs/latest/commands/sadd/
type SAdd struct {
baseCmd
key string
elems []any
}

// parseSAdd parses SAdd command and creates *SAdd object.
func parseSAdd(b baseCmd) (*SAdd, error) {
cmd := &SAdd{baseCmd: b}
if len(cmd.args) < 2 {
return nil, ErrInvalidArgNum
}
cmd.key = string(cmd.args[0])
for _, arg := range cmd.args[1:] {
cmd.elems = append(cmd.elems, arg)
}
return cmd, nil
}

func (cmd *SAdd) Run(w Writer, red Redka) (any, error) {
n, err := red.Set().Add(cmd.key, cmd.elems...)
if err != nil {
w.WriteError(cmd.Error(err))
return nil, err
}
w.WriteInt(n)
return n, nil
}
27 changes: 27 additions & 0 deletions internal/command/sadd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package command

import (
"reflect"
"testing"
)

func TestParseSAdd(t *testing.T) {
tests := []struct {
name string
err error
}{
{},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseSAdd(tt.args.b)
if (err != nil) != tt.wantErr {
t.Errorf("parseSAdd() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseSAdd() got = %v, want %v", got, tt.want)
}
})
}
}
25 changes: 25 additions & 0 deletions internal/rset/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package rset

import (
"database/sql"
"github.com/nalgeon/redka/internal/sqlx"
)

// DB is a database-backed set repository.
// A set is a slice of elements associated with a key.
// Use the set repository to work with individual sets
type DB struct {
*sqlx.DB[*Tx]
}

// New connects to the set repository.
// Does not create the database schema.
func New(db *sql.DB) *DB {
d := sqlx.New(db, NewTx)
return &DB{d}
}

func (d *DB) Add(key string, elems ...any) (int, error) {
tx := NewTx(d.SQL)
return tx.Add(key, elems...)
Comment on lines +23 to +24
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add modifies the database, so it should be called inside a transaction (see rzset.DB.Add for reference).

}
65 changes: 65 additions & 0 deletions internal/rset/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rset

import (
"database/sql"
"github.com/nalgeon/redka/internal/core"
"github.com/nalgeon/redka/internal/sqlx"
"time"
)

const sqlAdd1 = `
insert into rkey (key, type, version, etime, mtime)
values (:key, :type, :version, :etime, :mtime)
on conflict (key) do update set
version = version + 1,
type = excluded.type,
etime = excluded.etime,
mtime = excluded.mtime
;`

const sqlAdd2 = `insert into rset (key_id, elem)
values ((select id from rkey where key = :key), :value)`

// Tx is a set repository transaction.
type Tx struct {
tx sqlx.Tx
}

func NewTx(tx sqlx.Tx) *Tx {
return &Tx{tx}
}

// Add adds key elems to set.
func (t *Tx) Add(key string, elems ...any) (int, error) {
return t.add(key, elems...)

}

func (t *Tx) add(key string, elems ...any) (int, error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the need for this method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we may need to reuse the same method for other commands, but I might be overengineering. I can remove it.

now := time.Now()

var args [][]any
for _, elem := range elems {
args = append(args, []any{
sql.Named("key", key),
sql.Named("type", core.TypeSet),
sql.Named("version", core.InitialVersion),
sql.Named("elem", elem),
sql.Named("etime", now),
sql.Named("mtime", now.UnixMilli()),
})
}

for _, arg := range args {
_, err := t.tx.Exec(sqlAdd1, arg)
if err != nil {
return 0, sqlx.TypedError(err)
}
_, err = t.tx.Exec(sqlAdd2, arg)
if err != nil {
return 0, err
}
}

return len(args), nil
}
24 changes: 24 additions & 0 deletions internal/sqlx/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,27 @@ select
from rkey join rzset on rkey.id = rzset.key_id
where rkey.type = 5
and (rkey.etime is null or rkey.etime > unixepoch('subsec'));

-- set

create table if not exists
rset (
key_id integer not null,
elem blob not null,

foreign key (key_id) references rkey (id)
on delete cascade
);

create unique index if not exists
rset_pk_idx on rset (key_id, elem);

create view if not exists
vset as
select
rkey.id as key_id, rkey.key, rset.elem,
datetime(etime/1000, 'unixepoch') as etime,
datetime(mtime/1000, 'unixepoch') as mtime
from rkey join rset on rkey.id = rset.key_id
where rkey.type = 3
and (rkey.etime is null or rkey.etime > unixepoch('subsec'))
12 changes: 11 additions & 1 deletion redka.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package redka
import (
"context"
"database/sql"
"github.com/nalgeon/redka/internal/rset"
"io"
"log/slog"
"time"
Expand Down Expand Up @@ -64,6 +65,7 @@ type DB struct {
keyDB *rkey.DB
stringDB *rstring.DB
hashDB *rhash.DB
setDB *rset.DB
zsetDB *rzset.DB
bg *time.Ticker
log *slog.Logger
Expand Down Expand Up @@ -98,7 +100,6 @@ func Open(path string, opts *Options) (*DB, error) {
keyDB: rkey.New(db),
stringDB: rstring.New(db),
hashDB: rhash.New(db),
zsetDB: rzset.New(db),
log: opts.Logger,
}
rdb.bg = rdb.startBgManager()
Expand Down Expand Up @@ -137,6 +138,10 @@ func (db *DB) Key() *rkey.DB {
return db.keyDB
}

func (db *DB) Set() *rset.DB {
return db.setDB
}

// Update executes a function within a writable transaction.
// See the [tx] example for details.
//
Expand Down Expand Up @@ -216,6 +221,7 @@ type Tx struct {
keyTx *rkey.Tx
strTx *rstring.Tx
hashTx *rhash.Tx
setTx *rset.Tx
zsetTx *rzset.Tx
}

Expand All @@ -234,6 +240,10 @@ func (tx *Tx) Str() *rstring.Tx {
return tx.strTx
}

func (tx *Tx) Set() *rset.Tx {
return tx.setTx
}

// Keys returns the key transaction.
func (tx *Tx) Key() *rkey.Tx {
return tx.keyTx
Expand Down