Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1890 deleted entries #1929

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion embedded/document/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SPDX-License-Identifier: BUSL-1.1
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://mariadb.com/bsl11/
https://mariadb.com/bsl11/

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -1107,6 +1107,10 @@ func (e *Engine) AuditDocument(ctx context.Context, collectionName string, docID

for _, valRef := range valRefs {
docAtRevision, err := e.getDocument(searchKey, valRef, includePayload)
if errors.Is(err, store.ErrDeletedEntry) {
continue
}

if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions embedded/document/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SPDX-License-Identifier: BUSL-1.1
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://mariadb.com/bsl11/
https://mariadb.com/bsl11/

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -628,7 +628,7 @@ func TestDocumentAudit(t *testing.T) {

res, err = engine.AuditDocument(context.Background(), collectionName, docID, false, 0, 10, true)
require.NoError(t, err)
require.Len(t, res, 3)
require.Len(t, res, 1)
}

func TestQueryDocuments(t *testing.T) {
Expand Down
222 changes: 222 additions & 0 deletions embedded/store/deleted_entries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package store

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestDeletedEntryIsNotAccesibleFromReadEntry(t *testing.T) {
var key = []byte("test-key")

immuStore, err := Open(t.TempDir(), DefaultOptions())
require.NoError(t, err)
require.NotNil(t, immuStore)

defer immustoreClose(t, immuStore)

setKeys(t, immuStore, key, 1)

entry, _, err := immuStore.ReadTxEntry(1, key, true)
require.NoError(t, err)

val, err := immuStore.ReadValue(entry)
require.NoError(t, err)
require.Equal(t, val, []byte("test-value-0"))

deleteKey(t, immuStore, key)

val, err = immuStore.ReadValue(entry)
require.ErrorIs(t, err, ErrDeletedEntry)
require.Nil(t, val)
}

func TestDeletedEntriesAreNotAccessibleFromGetBetween(t *testing.T) {
var (
nRecordsBeforeDelete = 100
key = []byte("test-key")
)

immuStore, err := Open(t.TempDir(), DefaultOptions())
require.NoError(t, err)
require.NotNil(t, immuStore)

defer immustoreClose(t, immuStore)

setKeys(t, immuStore, key, nRecordsBeforeDelete)
deleteKey(t, immuStore, key)

for i := 0; i < nRecordsBeforeDelete; i++ {
valRef, err := immuStore.GetBetween(context.Background(), key, 0, uint64(i))
require.NoError(t, err)

val, err := valRef.Resolve()
require.Nil(t, val)
require.ErrorIs(t, err, ErrDeletedEntry)
}

valRef, err := immuStore.GetBetween(context.Background(), key, 0, uint64(nRecordsBeforeDelete+1))
require.NoError(t, err)

val, err := valRef.Resolve()
require.NoError(t, err)
require.Nil(t, val)

md := valRef.KVMetadata()
require.NotNil(t, md)
require.True(t, md.Deleted())
}

func TestDeletedEntriesAreNotAccessibleFromHistory(t *testing.T) {
var (
nRecordsBeforeDelete = 100
nRecordsAfterDelete = 10
key = []byte("test-key")
)

immuStore, err := Open(t.TempDir(), DefaultOptions())
require.NoError(t, err)
require.NotNil(t, immuStore)

defer immustoreClose(t, immuStore)

setKeys(t, immuStore, key, nRecordsBeforeDelete)

valRefs, _, err := immuStore.History(key, 0, false, nRecordsBeforeDelete)
require.NoError(t, err)
assertValuesAreVisible(t, valRefs, false)

deleteKey(t, immuStore, key)
setKeys(t, immuStore, key, nRecordsAfterDelete)

t.Run("requesting deleted records only", func(t *testing.T) {
valRefs, _, err = immuStore.History(key, uint64(nRecordsBeforeDelete/2), false, nRecordsBeforeDelete/2)
require.NoError(t, err)

assertValuesAreDeleted(t, valRefs)

valRefs, _, err = immuStore.History(key, uint64(nRecordsAfterDelete+1), true, nRecordsBeforeDelete)
require.NoError(t, err)

assertValuesAreDeleted(t, valRefs)
})

t.Run("requesting deleted and non deleted records", func(t *testing.T) {
valRefs, _, err = immuStore.History(key, uint64(nRecordsBeforeDelete/2), false, nRecordsBeforeDelete+1+nRecordsAfterDelete)
require.NoError(t, err)

assertValuesAreDeleted(t, valRefs[:(nRecordsBeforeDelete/2)])
assertIsDeleteEntry(t, valRefs[nRecordsBeforeDelete/2])
assertValuesAreVisible(t, valRefs[(nRecordsBeforeDelete/2+1):], false)

valRefs, _, err = immuStore.History(key, 0, true, nRecordsBeforeDelete+1+nRecordsAfterDelete)
require.NoError(t, err)

assertValuesAreVisible(t, valRefs[:nRecordsAfterDelete], true)
assertIsDeleteEntry(t, valRefs[nRecordsAfterDelete])
assertValuesAreDeleted(t, valRefs[nRecordsAfterDelete+1:])
})
}

func assertIsDeleteEntry(t *testing.T, valRef ValueRef) {
require.NotNil(t, valRef.KVMetadata())
require.True(t, valRef.KVMetadata().Deleted())

value, err := valRef.Resolve()
require.NoError(t, err)
require.Nil(t, value)
}

func TestExpiredEntryIsNotAccessibleFromHistory(t *testing.T) {
var (
key = []byte("test-key")
nRecords = 100
)

immuStore, err := Open(t.TempDir(), DefaultOptions())
require.NoError(t, err)
require.NotNil(t, immuStore)

defer immustoreClose(t, immuStore)

expiredRecordIndex := rand.Intn(nRecords)
setKeys(t, immuStore, key, expiredRecordIndex)

tx, err := immuStore.NewWriteOnlyTx(context.Background())
require.NoError(t, err)

md := NewKVMetadata()
err = md.ExpiresAt(time.Now().Add(-time.Second))
require.NoError(t, err)

err = tx.Set(key, md, []byte("expired-value"))
require.NoError(t, err)

_, err = tx.Commit(context.Background())
require.NoError(t, err)

setKeys(t, immuStore, key, nRecords-expiredRecordIndex-1)

valRefs, _, err := immuStore.History(key, 0, false, nRecords)
require.NoError(t, err)
require.Len(t, valRefs, nRecords)

for i, valRef := range valRefs {
value, err := valRef.Resolve()
if i == expiredRecordIndex {
require.ErrorIs(t, err, ErrExpiredEntry)
require.Nil(t, value)
} else {
require.NoError(t, err)
}
}
}

func assertValuesAreVisible(t *testing.T, valRefs []ValueRef, reverse bool) {
for i, valRef := range valRefs {
value, err := valRef.Resolve()
require.NoError(t, err)

expectedIdx := i
if reverse {
expectedIdx = len(valRefs) - 1 - i
}
require.Equal(t, []byte(fmt.Sprintf("test-value-%d", expectedIdx)), value)
}
}

func assertValuesAreDeleted(t *testing.T, valRefs []ValueRef) {
for i := 0; i < len(valRefs)-1; i++ {
value, err := valRefs[i].Resolve()
require.Nil(t, value)
require.ErrorIs(t, err, ErrDeletedEntry)
}
}

func setKeys(t *testing.T, store *ImmuStore, key []byte, nRecords int) {
for i := 0; i < nRecords; i++ {
tx, err := store.NewWriteOnlyTx(context.Background())
require.NoError(t, err)

err = tx.Set(key, nil, []byte(fmt.Sprintf("test-value-%d", i)))
require.NoError(t, err)

_, err = tx.Commit(context.Background())
require.NoError(t, err)
}
}

func deleteKey(t *testing.T, store *ImmuStore, key []byte) {
tx, err := store.NewTx(context.Background(), DefaultTxOptions())
require.NoError(t, err)

err = tx.Delete(context.Background(), key)
require.NoError(t, err)

_, err = tx.Commit(context.Background())
require.NoError(t, err)
}
54 changes: 44 additions & 10 deletions embedded/store/immustore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var ErrTxSizeGreaterThanMaxTxSize = errors.New("tx size greater than max tx size
var ErrCorruptedAHtree = errors.New("appendable hash tree is corrupted")
var ErrKeyNotFound = tbtree.ErrKeyNotFound // TODO: define error in store layer
var ErrExpiredEntry = fmt.Errorf("%w: expired entry", ErrKeyNotFound)
var ErrDeletedEntry = fmt.Errorf("%w: deleted entry", ErrKeyNotFound)
var ErrKeyAlreadyExists = errors.New("key already exists")
var ErrTxNotFound = errors.New("tx not found")
var ErrNoMoreEntries = tbtree.ErrNoMoreEntries // TODO: define error in store layer
Expand Down Expand Up @@ -917,12 +918,20 @@ func (s *ImmuStore) GetBetween(ctx context.Context, key []byte, initialTxID uint
return nil, err
}

indexedVal, tx, hc, err := indexer.GetBetween(key, initialTxID, finalTxID)
indexedVal, tx, lastDeleteAtTx, hc, err := indexer.GetBetween(key, initialTxID, finalTxID)
if err != nil {
return nil, err
}

return s.valueRefFrom(tx, hc, indexedVal)
valRef, err = s.valueRefFrom(tx, hc, indexedVal)
if err != nil {
return nil, err
}

if lastDeleteAtTx > finalTxID {
return &unreadableValueRef{ValueRef: valRef}, nil
}
return valRef, nil
}

func (s *ImmuStore) Get(ctx context.Context, key []byte) (valRef ValueRef, err error) {
Expand All @@ -938,7 +947,7 @@ func (s *ImmuStore) GetWithFilters(ctx context.Context, key []byte, filters ...F
return nil, err
}

indexedVal, tx, hc, err := indexer.Get(key)
indexedVal, tx, _, hc, err := indexer.Get(key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1010,11 +1019,10 @@ func (s *ImmuStore) History(key []byte, offset uint64, descOrder bool, limit int
if errors.Is(err, ErrIndexNotFound) {
return nil, 0, ErrKeyNotFound
}

return nil, 0, err
}

timedValues, hCount, err := indexer.History(key, offset, descOrder, limit)
timedValues, lastDeleteAtTx, hCount, err := indexer.History(key, offset, descOrder, limit)
if err != nil {
return nil, 0, err
}
Expand All @@ -1041,6 +1049,15 @@ func (s *ImmuStore) History(key []byte, offset uint64, descOrder bool, limit int
}
}

for i, valRef := range valRefs {
md := valRef.KVMetadata()
isExpired := md != nil && md.ExpiredAt(time.Now())
isDeleted := lastDeleteAtTx > valRef.Tx()

if isDeleted || isExpired {
valRefs[i] = &unreadableValueRef{ValueRef: valRef}
}
}
return valRefs, hCount, nil
}

Expand Down Expand Up @@ -3063,7 +3080,7 @@ func (s *ImmuStore) ReadTxEntry(txID uint64, key []byte, skipIntegrityCheck bool
return nil, nil, err
}

e := &TxEntry{k: make([]byte, s.maxKeyLen)}
e := &TxEntry{txID: txID, k: make([]byte, s.maxKeyLen)}

for i := 0; i < header.NEntries; i++ {
err = tdr.readEntry(e)
Expand All @@ -3078,7 +3095,7 @@ func (s *ImmuStore) ReadTxEntry(txID uint64, key []byte, skipIntegrityCheck bool
ret = e

// Allocate new placeholder for scanning the rest of entries
e = &TxEntry{k: make([]byte, s.maxKeyLen)}
e = &TxEntry{txID: txID, k: make([]byte, s.maxKeyLen)}
}
}
if ret == nil {
Expand Down Expand Up @@ -3113,6 +3130,25 @@ func (s *ImmuStore) ReadValue(entry *TxEntry) ([]byte, error) {
return nil, ErrExpiredEntry
}

indexer, err := s.getIndexerFor(entry.key())
if err != nil && !errors.Is(err, ErrIndexNotFound) {
return nil, err
}

if !errors.Is(err, ErrIndexNotFound) {
_, _, lastDeleteAtTx, _, err := indexer.Get(entry.key())
if err != nil {
if errors.Is(err, ErrKeyNotFound) {
return nil, nil
}
return nil, err
}

if lastDeleteAtTx > entry.txID {
return nil, ErrDeletedEntry
}
}

if entry.vLen == 0 {
// while not required, nil is returned instead of an empty slice

Expand All @@ -3124,11 +3160,9 @@ func (s *ImmuStore) ReadValue(entry *TxEntry) ([]byte, error) {

b := make([]byte, entry.vLen)

_, err := s.readValueAt(b, entry.vOff, entry.hVal, false)
if err != nil {
if _, err := s.readValueAt(b, entry.vOff, entry.hVal, false); err != nil {
return nil, err
}

return b, nil
}

Expand Down