Skip to content

Commit

Permalink
Merge pull request #288 from trozet/implement_wait_method
Browse files Browse the repository at this point in the history
Implement support for wait method in server
  • Loading branch information
dcbw authored Jan 27, 2022
2 parents c5d9765 + d082757 commit a619f0f
Show file tree
Hide file tree
Showing 2 changed files with 350 additions and 2 deletions.
99 changes: 97 additions & 2 deletions server/transact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"fmt"
"reflect"
"time"

"github.com/google/uuid"
"github.com/ovn-org/libovsdb/cache"
Expand Down Expand Up @@ -514,8 +515,102 @@ func (t *Transaction) Delete(database, table string, where []ovsdb.Condition) (o
}
}

func (t *Transaction) Wait(database, table string, timeout *int, conditions []ovsdb.Condition, columns []string, until string, rows []ovsdb.Row) ovsdb.OperationResult {
e := ovsdb.NotSupported{}
func (t *Transaction) Wait(database, table string, timeout *int, where []ovsdb.Condition, columns []string, until string, rows []ovsdb.Row) ovsdb.OperationResult {
start := time.Now()

if until != "!=" && until != "==" {
e := ovsdb.NotSupported{}
return ovsdb.OperationResult{Error: e.Error()}
}

dbModel := t.Model
realTable := dbModel.Schema.Table(table)
if realTable == nil {
e := ovsdb.NotSupported{}
return ovsdb.OperationResult{Error: e.Error()}
}
model, err := dbModel.NewModel(table)
if err != nil {
panic(err)
}

Loop:
for {
var filteredRows []ovsdb.Row
foundRowModels, err := t.rowsFromTransactionCacheAndDatabase(table, where)
if err != nil {
panic(err)
}

m := dbModel.Mapper
for _, rowModel := range foundRowModels {
info, err := dbModel.NewModelInfo(rowModel)
if err != nil {
panic(err)
}

foundMatch := true
for _, column := range columns {
columnSchema := info.Metadata.TableSchema.Column(column)
for _, r := range rows {
i, err := dbModel.NewModelInfo(model)
if err != nil {
panic(err)
}
err = dbModel.Mapper.GetRowData(&r, i)
if err != nil {
panic(err)
}
x, err := i.FieldByColumn(column)
if err != nil {
panic(err)
}

// check to see if field value is default for given rows
// if it is default (not provided) we shouldn't try to compare
// for equality
if ovsdb.IsDefaultValue(columnSchema, x) {
continue
}
y, err := info.FieldByColumn(column)
if err != nil {
panic(err)
}
if !reflect.DeepEqual(x, y) {
foundMatch = false
}
}
}

if foundMatch {
resultRow, err := m.NewRow(info)
if err != nil {
panic(err)
}
filteredRows = append(filteredRows, resultRow)
}

}

if until == "==" && len(filteredRows) == len(rows) {
return ovsdb.OperationResult{}
} else if until == "!=" && len(filteredRows) != len(rows) {
return ovsdb.OperationResult{}
}

if timeout != nil {
// TODO(trozet): this really shouldn't just break and loop on a time interval
// Really this client handler should pause, wait for another handler to update the DB
// and then try again. However the server is single threaded for now and not capable of
// doing something like that.
if time.Since(start) > time.Duration(*timeout)*time.Millisecond {
break Loop
}
}
time.Sleep(200 * time.Millisecond)
}

e := ovsdb.TimedOut{}
return ovsdb.OperationResult{Error: e.Error()}
}

Expand Down
253 changes: 253 additions & 0 deletions server/transact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/ovn-org/libovsdb/mapper"
Expand All @@ -11,6 +12,258 @@ import (
"github.com/stretchr/testify/require"
)

func TestWaitOpEquals(t *testing.T) {
defDB, err := model.NewClientDBModel("Open_vSwitch", map[string]model.Model{
"Open_vSwitch": &ovsType{},
"Bridge": &bridgeType{}})
if err != nil {
t.Fatal(err)
}
schema, err := getSchema()
if err != nil {
t.Fatal(err)
}
ovsDB := NewInMemoryDatabase(map[string]model.ClientDBModel{"Open_vSwitch": defDB})
dbModel, errs := model.NewDatabaseModel(schema, defDB)
require.Empty(t, errs)
o, err := NewOvsdbServer(ovsDB, dbModel)
require.Nil(t, err)

ovsUUID := uuid.NewString()
bridgeUUID := uuid.NewString()

m := mapper.NewMapper(schema)

ovs := ovsType{}
info, err := dbModel.NewModelInfo(&ovs)
require.NoError(t, err)
ovsRow, err := m.NewRow(info)
require.Nil(t, err)

bridge := bridgeType{
Name: "foo",
ExternalIds: map[string]string{
"foo": "bar",
"baz": "quux",
"waldo": "fred",
},
}
bridgeInfo, err := dbModel.NewModelInfo(&bridge)
require.NoError(t, err)
bridgeRow, err := m.NewRow(bridgeInfo)
require.Nil(t, err)

transaction := o.NewTransaction(dbModel, "Open_vSwitch", o.db)

res, updates := transaction.Insert("Open_vSwitch", ovsUUID, ovsRow)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{res}, []ovsdb.Operation{{Op: "insert"}})
require.Nil(t, err)

res, update2 := transaction.Insert("Bridge", bridgeUUID, bridgeRow)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{res}, []ovsdb.Operation{{Op: "insert"}})
require.Nil(t, err)

updates.Merge(update2)
err = o.db.Commit("Open_vSwitch", uuid.New(), updates)
require.NoError(t, err)

timeout := 0
// Attempt to wait for row with name foo to appear
gotResult := transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name"},
"==",
[]ovsdb.Row{{"name": "foo"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.Nil(t, err)

// Attempt to wait for 2 rows, where one does not exist
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name"},
"==",
[]ovsdb.Row{{"name": "foo"}, {"name": "blah"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.NotNil(t, err)

extIDs, err := ovsdb.NewOvsMap(map[string]string{
"foo": "bar",
"baz": "quux",
"waldo": "fred",
})
require.Nil(t, err)
// Attempt to wait for a row, with multiple columns specified
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name", "external_ids"},
"==",
[]ovsdb.Row{{"name": "foo", "external_ids": extIDs}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.Nil(t, err)

// Attempt to wait for a row, with multiple columns, but not specified in row filtering
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name", "external_ids"},
"==",
[]ovsdb.Row{{"name": "foo"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.Nil(t, err)

// Attempt to get something with a non-zero timeout that will fail
timeout = 400
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name", "external_ids"},
"==",
[]ovsdb.Row{{"name": "doesNotExist"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.NotNil(t, err)

}

func TestWaitOpNotEquals(t *testing.T) {
defDB, err := model.NewClientDBModel("Open_vSwitch", map[string]model.Model{
"Open_vSwitch": &ovsType{},
"Bridge": &bridgeType{}})
if err != nil {
t.Fatal(err)
}
schema, err := getSchema()
if err != nil {
t.Fatal(err)
}
ovsDB := NewInMemoryDatabase(map[string]model.ClientDBModel{"Open_vSwitch": defDB})
dbModel, errs := model.NewDatabaseModel(schema, defDB)
require.Empty(t, errs)
o, err := NewOvsdbServer(ovsDB, dbModel)
require.Nil(t, err)

ovsUUID := uuid.NewString()
bridgeUUID := uuid.NewString()

m := mapper.NewMapper(schema)

ovs := ovsType{}
info, err := dbModel.NewModelInfo(&ovs)
require.NoError(t, err)
ovsRow, err := m.NewRow(info)
require.Nil(t, err)

bridge := bridgeType{
Name: "foo",
ExternalIds: map[string]string{
"foo": "bar",
"baz": "quux",
"waldo": "fred",
},
}
bridgeInfo, err := dbModel.NewModelInfo(&bridge)
require.NoError(t, err)
bridgeRow, err := m.NewRow(bridgeInfo)
require.Nil(t, err)

transaction := o.NewTransaction(dbModel, "Open_vSwitch", o.db)

res, updates := transaction.Insert("Open_vSwitch", ovsUUID, ovsRow)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{res}, []ovsdb.Operation{{Op: "insert"}})
require.Nil(t, err)

res, update2 := transaction.Insert("Bridge", bridgeUUID, bridgeRow)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{res}, []ovsdb.Operation{{Op: "insert"}})
require.Nil(t, err)

updates.Merge(update2)
err = o.db.Commit("Open_vSwitch", uuid.New(), updates)
require.NoError(t, err)

timeout := 0
// Attempt a wait where no entry with name blah should exist
gotResult := transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name"},
"!=",
[]ovsdb.Row{{"name": "blah"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.Nil(t, err)

// Attempt another wait with multiple rows specified, one that would match, and one that doesn't
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name"},
"!=",
[]ovsdb.Row{{"name": "blah"}, {"name": "foo"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.Nil(t, err)

// Attempt another wait where name would match, but ext ids would not match
NoMatchExtIDs, err := ovsdb.NewOvsMap(map[string]string{
"foo": "bar",
"baz": "quux",
"waldo": "is_different",
})
require.Nil(t, err)
// Attempt to wait for a row, with multiple columns specified and one is not a match
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name", "external_ids"},
"!=",
[]ovsdb.Row{{"name": "foo", "external_ids": NoMatchExtIDs}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
require.Nil(t, err)

// Check to see if a non match takes around the timeout
start := time.Now()
timeout = 200
gotResult = transaction.Wait(
"Open_vSwitch",
"Bridge",
&timeout,
[]ovsdb.Condition{ovsdb.NewCondition("name", ovsdb.ConditionEqual, "foo")},
[]string{"name"},
"!=",
[]ovsdb.Row{{"name": "foo"}},
)
_, err = ovsdb.CheckOperationResults([]ovsdb.OperationResult{gotResult}, []ovsdb.Operation{{Op: "wait"}})
ts := time.Since(start)
if ts < time.Duration(timeout)*time.Millisecond {
t.Fatalf("Should have taken at least %d milliseconds to return, but it took %d instead", timeout, ts)
}
require.NotNil(t, err)
}

func TestMutateOp(t *testing.T) {
defDB, err := model.NewClientDBModel("Open_vSwitch", map[string]model.Model{
"Open_vSwitch": &ovsType{},
Expand Down

0 comments on commit a619f0f

Please sign in to comment.