diff --git a/proto/api/beam_api.proto b/proto/api/beam_api.proto index 55c655a..a26b759 100644 --- a/proto/api/beam_api.proto +++ b/proto/api/beam_api.proto @@ -35,7 +35,6 @@ service BeamFactStore { rpc queryFacts(QueryFactsRequest) returns (stream QueryFactsResult); rpc lookup_sp(LookupSPRequest) returns (LookupSPResult); rpc lookup_po(LookupPORequest) returns (LookupPOResult); - rpc insertFacts(InsertFactsRequest) returns (InsertFactsResult); } // QueryRequest is used to specify a SELECT/ASK-style query that returns a @@ -301,48 +300,3 @@ message LookupPOResult { repeated Items results = 1 [(gogoproto.nullable)=false]; int64 index = 2; } - -// InsertFactsRequest is the request for the deprecated insertFacts RPC. -message InsertFactsRequest { - option (gogoproto.stringer)=false; - // Each entry in this array will cause a new subjectID to be created, and - // assigned to the named var. You can use the named var in the - // InsertFact.subject.var field to bind it to a fact being inserted. You - // must use each subject ID created. - repeated string newSubjectVars = 1; - repeated InsertFact facts = 2 [(gogoproto.nullable)=false]; -} - -message InsertFact { - option (gogoproto.stringer)=false; - // Specify a var if you need to bind the created fact ID to a subsequent - // fact in the same insert request. - string factIDVar = 1; - KIDOrVar subject = 2 [(gogoproto.nullable)=false]; - KIDOrVar predicate = 3 [(gogoproto.nullable)=false]; - KGObjectOrVar object = 4 [(gogoproto.nullable)=false]; -} - -message KIDOrVar { - option (gogoproto.stringer)=false; - oneof value { - uint64 kid = 1; - string var = 2; - } -} - -message KGObjectOrVar { - option (gogoproto.stringer)=false; - oneof value { - KGObject object = 1; - string var = 2; - } -} - -// InsertFactsResult is the reply to the deprecated insertFacts RPC. -message InsertFactsResult { - option (gogoproto.stringer)=false; - repeated uint64 varResults = 1; - repeated uint64 factIds = 2; - int64 index = 3; -} diff --git a/src/github.com/ebay/beam/api/beam_api.go b/src/github.com/ebay/beam/api/beam_api.go index 10c69a6..284c7b0 100644 --- a/src/github.com/ebay/beam/api/beam_api.go +++ b/src/github.com/ebay/beam/api/beam_api.go @@ -41,65 +41,6 @@ func (m QueryFactsRequest) String() string { return buf.String() } -func (m InsertFactsResult) String() string { - var buf strings.Builder - fmt.Fprintf(&buf, "InsertFactsResult{") - fmt.Fprintf(&buf, "\n Index: %d", m.Index) - fmt.Fprint(&buf, "\n VarResults:") - for i, id := range m.VarResults { - if i > 0 { - fmt.Fprint(&buf, ",") - } - fmt.Fprintf(&buf, " %d", id) - } - fmt.Fprint(&buf, "\n FactIDs:") - for _, factID := range m.FactIds { - fmt.Fprintf(&buf, "\n %d", factID) - } - fmt.Fprintf(&buf, "\n}") - return buf.String() -} - -func (m InsertFactsRequest) String() string { - return m.string("") -} - -func (m InsertFactsRequest) string(indent string) string { - var buf strings.Builder - fmt.Fprintf(&buf, "InsertFactsRequest{\n") - fmt.Fprintf(&buf, "%s NewSubjectVars: %s\n", indent, strings.Join(m.NewSubjectVars, ", ")) - fmt.Fprintf(&buf, "%s Facts:\n", indent) - for _, fact := range m.Facts { - fmt.Fprintf(&buf, "%s %s\n", indent, fact) - } - fmt.Fprintf(&buf, "%s}", indent) - return buf.String() -} - -func (f InsertFact) String() string { - var buf strings.Builder - if f.FactIDVar != "" { - fmt.Fprintf(&buf, "?%s: ", f.FactIDVar) - } - fmt.Fprintf(&buf, "%s ", f.Subject) - fmt.Fprintf(&buf, "%s ", f.Predicate) - fmt.Fprintf(&buf, "%s", f.Object) - return buf.String() -} - -func (m KIDOrVar) String() string { - switch m.Value.(type) { - case *KIDOrVar_Kid: - return fmt.Sprintf("#%d", m.GetKid()) - case *KIDOrVar_Var: - return fmt.Sprintf("?%s", m.GetVar()) - case nil: - return "(nil)" - default: - panic(fmt.Sprintf("Unknown kid/var: %T", m.Value)) - } -} - // String returns a string version of the KGValue in the same format that // the query parser expects. func (m KGValue) String() string { @@ -154,19 +95,6 @@ func (m KGObject) String() string { } } -func (m KGObjectOrVar) String() string { - switch m.Value.(type) { - case *KGObjectOrVar_Object: - return m.GetObject().String() - case *KGObjectOrVar_Var: - return fmt.Sprintf("?%s", m.GetVar()) - case nil: - return "(nil)" - default: - panic(fmt.Sprintf("Unknown KGObjectOrVar value type: %T", m.Value)) - } -} - func (m KGTimestamp) String() string { return m.Value.Format(patterns[m.Precision]) } diff --git a/src/github.com/ebay/beam/api/beam_api_test.go b/src/github.com/ebay/beam/api/beam_api_test.go index b76c82c..dd65d92 100644 --- a/src/github.com/ebay/beam/api/beam_api_test.go +++ b/src/github.com/ebay/beam/api/beam_api_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" ) -var ifb = &insertFactBuilder{} var ts = time.Date(2018, time.Month(8), 8, 16, 44, 6, 999999999, time.UTC) func Test_String(t *testing.T) { @@ -73,61 +72,6 @@ func Test_String(t *testing.T) { exp: `QueryFactsRequest{ Index: 12345 Query: '?s ?p ?o; #12345 rdfs:type "literal"' -}`, - }, { - obj: &InsertFactsResult{ - Index: 12345, - VarResults: []uint64{678, 901, 234, 567}, - FactIds: []uint64{890, 123, 456, 789, 123, 456}, - }, - exp: `InsertFactsResult{ - Index: 12345 - VarResults: 678, 901, 234, 567 - FactIDs: - 890 - 123 - 456 - 789 - 123 - 456 -}`, - }, { - obj: &InsertFactsRequest{ - NewSubjectVars: []string{"a", "b", "c"}, - Facts: []InsertFact{ - ifb.captureFactID("id1").sVar("s").pVar("p").oVar("o").build(), - ifb.sID(123).pID(456).oBool(true).build(), - ifb.sID(123).pID(456).oBool(true).oUID(12).build(), - ifb.sID(123).pID(456).oBool(false).build(), - ifb.sID(123).pID(456).oBool(false).oUID(12).build(), - ifb.sID(123).pID(456).oFloat64(1).build(), - ifb.sID(123).pID(456).oFloat64(1).oUID(12).build(), - ifb.sID(123).pID(456).oInt64(1).build(), - ifb.sID(123).pID(456).oInt64(1).oUID(12).build(), - ifb.sID(123).pID(456).oID(789).build(), - ifb.sID(123).pID(456).oString("Literal String").build(), - ifb.sID(123).pID(456).oString("Literal String").oLID(34).build(), - ifb.sID(123).pID(456).oTime(ts, Nanosecond).build(), - ifb.sID(123).pID(456).oTime(ts, Nanosecond).oUID(12).build(), - }, - }, - exp: `InsertFactsRequest{ - NewSubjectVars: a, b, c - Facts: - ?id1: ?s ?p ?o - #123 #456 true - #123 #456 true - #123 #456 false - #123 #456 false - #123 #456 1.000000 - #123 #456 1.000000 - #123 #456 1 - #123 #456 1 - #123 #456 #789 - #123 #456 'Literal String' - #123 #456 'Literal String' - #123 #456 2018-08-08T16:44:06.999999999 - #123 #456 2018-08-08T16:44:06.999999999 }`, }, { obj: Year, @@ -177,124 +121,3 @@ func Test_String(t *testing.T) { }) } } - -type insertFactBuilder struct { - fact InsertFact -} - -func (b *insertFactBuilder) captureFactID(name string) *insertFactBuilder { - b.fact.FactIDVar = name - return b -} - -func (b *insertFactBuilder) sVar(name string) *insertFactBuilder { - b.fact.Subject.Value = &KIDOrVar_Var{Var: name} - return b -} - -func (b *insertFactBuilder) sID(kid uint64) *insertFactBuilder { - b.fact.Subject.Value = &KIDOrVar_Kid{Kid: kid} - return b -} - -func (b *insertFactBuilder) pVar(name string) *insertFactBuilder { - b.fact.Predicate.Value = &KIDOrVar_Var{Var: name} - return b -} - -func (b *insertFactBuilder) pID(kid uint64) *insertFactBuilder { - b.fact.Predicate.Value = &KIDOrVar_Kid{Kid: kid} - return b -} - -func (b *insertFactBuilder) oVar(name string) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Var{Var: name} - return b -} - -func (b *insertFactBuilder) oID(kid uint64) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Object{ - Object: &KGObject{ - Value: &KGObject_AKID{AKID: kid}, - }, - } - return b -} - -func (b *insertFactBuilder) oString(value string) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Object{ - Object: &KGObject{ - Value: &KGObject_AString{AString: value}, - }, - } - return b -} - -func (b *insertFactBuilder) oFloat64(value float64) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Object{ - Object: &KGObject{ - Value: &KGObject_AFloat64{AFloat64: value}, - }, - } - return b -} - -func (b *insertFactBuilder) oInt64(value int64) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Object{ - Object: &KGObject{ - Value: &KGObject_AInt64{AInt64: value}, - }, - } - return b -} - -func (b *insertFactBuilder) oTime(value time.Time, precision Precision) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Object{ - Object: &KGObject{ - Value: &KGObject_ATimestamp{ATimestamp: &KGTimestamp{Precision: precision, Value: value}}, - }, - } - return b -} - -func (b *insertFactBuilder) oBool(value bool) *insertFactBuilder { - b.fact.Object.Value = &KGObjectOrVar_Object{ - Object: &KGObject{ - Value: &KGObject_ABool{ABool: value}, - }, - } - return b -} - -func (b *insertFactBuilder) oUID(value uint64) *insertFactBuilder { - v, ok := b.fact.Object.Value.(*KGObjectOrVar_Object) - if !ok { - panic(fmt.Sprintf("OVar does not include unit id")) - } - v.Object.UnitID = value - return b -} - -func (b *insertFactBuilder) oLID(value uint64) *insertFactBuilder { - v, ok := b.fact.Object.Value.(*KGObjectOrVar_Object) - if !ok { - panic(fmt.Sprintf("OVar does not include lang id")) - } - v.Object.LangID = value - return b -} - -func (b *insertFactBuilder) build() InsertFact { - if b.fact.Subject.Value == nil { - panic("Attempted to build fact with nil subject") - } - if b.fact.Predicate.Value == nil { - panic("Attempted to build fact with nil predicate") - } - if b.fact.Object.Value == nil { - panic("Attempted to build fact with nil object") - } - f := b.fact - b.fact = InsertFact{} - return f -} diff --git a/src/github.com/ebay/beam/api/impl/impl_kg.go b/src/github.com/ebay/beam/api/impl/impl_kg.go index a53d4c1..a4f4a92 100644 --- a/src/github.com/ebay/beam/api/impl/impl_kg.go +++ b/src/github.com/ebay/beam/api/impl/impl_kg.go @@ -25,179 +25,14 @@ import ( "github.com/ebay/beam/blog" "github.com/ebay/beam/logentry" "github.com/ebay/beam/logentry/logencoder" - "github.com/ebay/beam/logentry/logread" "github.com/ebay/beam/rpc" "github.com/ebay/beam/util/cmp" "github.com/ebay/beam/util/parallel" "github.com/ebay/beam/util/tracing" - "github.com/ebay/beam/viewclient/lookups" opentracing "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" ) -const ( - maxInsertRetry = 32 -) - -// equalKeys returns true if the 2 maps have the same -// set of keys in them. It does not compare any values. -func equalKeys(a, b map[int]uint64) bool { - if len(a) != len(b) { - return false - } - for k := range a { - if _, exists := b[k]; !exists { - return false - } - } - return true -} - -// InsertFacts implements the gRPC InsertFacts function of the BeamFactStore service. This will -// upsert 1 or more facts described in the request, using a transaction to safely determine if -// the fact(s) already exist. -// The result includes the log index that the facts were committed at, subsequent requests to -// Read may not yet be at the supplied log index. Callers should include the returned log index -// in the subsequent read if they expect to see these write(s). -func (s *Server) InsertFacts(ctx context.Context, req *api.InsertFactsRequest) (*api.InsertFactsResult, error) { - return insertFacts(ctx, s.source, s.beamLog, req) -} - -// logAppendSingle provides an abstraction for the log interactions used by -// Insert. This makes it easier to write unit tests. -type logAppendSingle interface { - AppendSingle(ctx context.Context, data []byte) (blog.Index, error) -} - -// insertFacts provides the implementation of the InsertFacts public API. It's -// extracted out this way to aid in testing. -func insertFacts(ctx context.Context, - source lookups.SPO, - logAppender logAppendSingle, - req *api.InsertFactsRequest) (*api.InsertFactsResult, error) { - - log.Debugf("InsertFacts %+v", req) - - applyUnicodeNormalizationToInsertFactsRequest(req) - if err := validateInsertFactsRequest(req); err != nil { - return nil, err - } - // insert has to determine which facts in the request exist at the time of the write, we use a transaction - // to do this safely. Insert performs a loop of - // 1. write an InsertTxCommand of the facts - // 2. use the log from (1) to execute a LookupSPO of all facts in the request that could exist - // 3. if there was an existing fact in the InsertTxCommand then - // write a decide(abort) log message - // build a filtered InsertTxCommand removing all the facts we found as existing - // goto back to 1 and try again - // 4. if 2/3 didn't discover a different set of existing facts since the previous loop - // we can commit and we're done - // 5. if all the facts in the request are existing, then we're done - // - lookupForExisting := rpc.LookupSPORequest{ - Lookups: make([]rpc.LookupSPORequest_Item, 0, len(req.Facts)-len(req.NewSubjectVars)), - } - lookupOffsetToFactOffset := make([]int, 0, len(req.Facts)-len(req.NewSubjectVars)) - for i, f := range req.Facts { - if f.Subject.GetVar() == "" && f.Predicate.GetVar() == "" && f.Object.GetVar() == "" { - lookupOffsetToFactOffset = append(lookupOffsetToFactOffset, i) - lookupForExisting.Lookups = append(lookupForExisting.Lookups, rpc.LookupSPORequest_Item{ - Subject: f.Subject.GetKid(), - Predicate: f.Predicate.GetKid(), - Object: rpc.KGObjectFromAPI(*f.Object.GetObject()), - }) - } - } - // lookup facts from the request at the log index. Returns a map containing the IDs of the existing facts - // the map has the offset of the InsertFact in the request as the key. - lookup := func(idx blog.Index) (map[int]uint64, error) { - // its possible that all facts in the insert reference a variable and are therefore - // going to be new facts, and so there are zero existing facts to look for, in which - // case we don't have anything to do - if len(lookupForExisting.Lookups) == 0 { - return nil, nil - } - span, ctx := opentracing.StartSpanFromContext(ctx, "lookup existing facts") - span.SetTag("index", idx) - span.SetTag("lookup_count", len(lookupForExisting.Lookups)) - defer span.Finish() - existing := make(map[int]uint64) - resCh := make(chan *rpc.LookupChunk, 4) - wait := parallel.Go(func() { - for chunk := range resCh { - for _, f := range chunk.Facts { - reqFactOffset := lookupOffsetToFactOffset[f.Lookup] - existing[reqFactOffset] = f.Fact.Id - } - } - }) - lookupForExisting.Index = idx - err := source.LookupSPO(ctx, &lookupForExisting, resCh) - wait() - if err != nil { - return nil, err - } - span.SetTag("existing_count", len(existing)) - return existing, nil - } - logReq, vars := convertAPIInsertToLogCommand(req) - existingLastAttempt := map[int]uint64{} - filteredLogReq := logReq - var insertIdx blog.Index - - for attempt := 0; attempt < maxInsertRetry; attempt++ { - attemptSpan, attemptCtx := opentracing.StartSpanFromContext(ctx, "insert_attempt") - attemptSpan.SetTag("attempt", attempt) - var err error - insertIdx, err = appendInsertTxBegin(attemptCtx, logAppender, &filteredLogReq) - if err != nil { - attemptSpan.Finish() - return nil, fmt.Errorf("unable to write tx message: %v", err) - } - existing, err := lookup(insertIdx - 1) - commit := err == nil && equalKeys(existing, existingLastAttempt) - - if _, decideErr := appendTxDecide(attemptCtx, logAppender, insertIdx, commit); decideErr != nil { - attemptSpan.Finish() - return nil, fmt.Errorf("unable to write decide message: %v", decideErr) - } - if err != nil { - attemptSpan.Finish() - return nil, fmt.Errorf("unable to lookup existing facts: %v", err) - } - // if all the facts in the insert exist then commit will be false - // but there's no more work to do, so we're done at that point - if commit || len(req.Facts) == len(existing) { - attemptSpan.Finish() - return insertResult(req, &logReq, existing, insertIdx, vars), nil - } - existingLastAttempt = existing - filteredLogReq = excludeExisting(logReq, existing) - attemptSpan.Finish() - } - // give up - return nil, fmt.Errorf("insert exhausted all retries (%d) while attempting to resolve which facts do/don't exist", maxInsertRetry) -} - -// appendInsertTxBegin appends a single transaction message to the log containing req, and returns the index of the entry. -func appendInsertTxBegin(ctx context.Context, logAppender logAppendSingle, req *logentry.InsertTxCommand) (blog.Index, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "append startTx") - span.SetTag("facts", len(req.Facts)) - defer span.Finish() - enc := logencoder.Encode(req) - return logAppender.AppendSingle(ctx, enc) -} - -// appendTxDecide appends a single tx decision message to the log and returns the index of the entry. -func appendTxDecide(ctx context.Context, logAppender logAppendSingle, idx blog.Index, commit bool) (blog.Index, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "append decideTx") - span.SetTag("commit", commit) - defer span.Finish() - dm := &logentry.TxDecisionCommand{Tx: idx, Commit: commit} - enc := logencoder.Encode(dm) - return logAppender.AppendSingle(ctx, enc) -} - // Wipe implements the Wipe function in the BeamFactStore gRPC API. If Wiping is enabled in the // configuration this will delete all facts in the store. It is expected that this is only enabled // in development environments. This blocks until all active views have processed the Wipe request. @@ -391,41 +226,3 @@ func (s *Server) fetchLatestLogIndex(ctx context.Context, log beamLogInfo) (blog s.lock.Unlock() return info.LastIndex, nil } - -// insertResult builds an insert facts result from the request, existing ids and -// log index. logReq should be the InsertTxCommand that was first created from -// the Insert request, it should not be the version that has the existing facts -// filtered out of it. -func insertResult(apiReq *api.InsertFactsRequest, logReq *logentry.InsertTxCommand, existing map[int]uint64, idx blog.Index, vars map[string]int32) *api.InsertFactsResult { - res := &api.InsertFactsResult{ - VarResults: make([]uint64, len(apiReq.NewSubjectVars)), - FactIds: make([]uint64, len(apiReq.Facts)), - Index: int64(idx), - } - for i := range apiReq.Facts { - if id, isExisting := existing[i]; isExisting { - res.FactIds[i] = id - } else { - res.FactIds[i] = logread.KID(idx, logReq.Facts[i].FactIDOffset) - } - } - for i, vn := range apiReq.NewSubjectVars { - res.VarResults[i] = logread.KID(idx, vars[vn]) - } - return res -} - -// excludeExisting builds a new InsertTxCommand excluding facts where there's an existing fact. -// This does not filter in place as it requires the full InsertTxCommand as the starting point -// and that is used later on, so we don't want to mutate it. -func excludeExisting(req logentry.InsertTxCommand, existing map[int]uint64) logentry.InsertTxCommand { - n := logentry.InsertTxCommand{ - Facts: make([]logentry.InsertFact, 0, len(req.Facts)-len(existing)), - } - for i, f := range req.Facts { - if _, isExisting := existing[i]; !isExisting { - n.Facts = append(n.Facts, f) - } - } - return n -} diff --git a/src/github.com/ebay/beam/api/impl/impl_kg_test.go b/src/github.com/ebay/beam/api/impl/impl_kg_test.go index c17ac3e..4fb44c9 100644 --- a/src/github.com/ebay/beam/api/impl/impl_kg_test.go +++ b/src/github.com/ebay/beam/api/impl/impl_kg_test.go @@ -17,580 +17,15 @@ package impl import ( "context" - "errors" "fmt" "testing" "github.com/ebay/beam/api" "github.com/ebay/beam/blog" - "github.com/ebay/beam/blog/mockblog" "github.com/ebay/beam/logentry" - "github.com/ebay/beam/logentry/logencoder" - "github.com/ebay/beam/logentry/logwrite" - "github.com/ebay/beam/rpc" - "github.com/ebay/beam/viewclient/lookups/mocklookups" "github.com/stretchr/testify/assert" ) -func Test_EqualKeys(t *testing.T) { - assert.True(t, equalKeys(map[int]uint64{}, map[int]uint64{})) - assert.True(t, equalKeys(map[int]uint64{1: 1}, map[int]uint64{1: 2})) - assert.True(t, equalKeys(map[int]uint64{1: 1, 50: 2}, map[int]uint64{1: 2, 50: 3})) - assert.False(t, equalKeys(map[int]uint64{1: 1, 51: 2}, map[int]uint64{1: 1, 50: 2})) - assert.False(t, equalKeys(map[int]uint64{1: 1, 51: 2}, map[int]uint64{51: 2})) -} - -func Test_ExcludeExisting(t *testing.T) { - // msg is a helper to generate a InsertTxCommand with the list of FactIDOffsets - msg := func(offsets ...int32) logentry.InsertTxCommand { - r := logentry.InsertTxCommand{ - Facts: make([]logentry.InsertFact, len(offsets)), - } - for i, offset := range offsets { - r.Facts[i].FactIDOffset = offset - } - return r - } - type tc struct { - in logentry.InsertTxCommand - existing map[int]uint64 // offset of Fact in Insert -> existing FactID - exp logentry.InsertTxCommand - } - tests := []tc{ - {msg(3, 1, 2), map[int]uint64{}, msg(3, 1, 2)}, - {msg(3, 1, 2), map[int]uint64{0: 11}, msg(1, 2)}, - {msg(3, 1, 2), map[int]uint64{1: 11}, msg(3, 2)}, - {msg(3, 1, 2), map[int]uint64{2: 12}, msg(3, 1)}, - {msg(3, 1, 2), map[int]uint64{0: 11, 2: 12}, msg(1)}, - {msg(3, 1, 2), map[int]uint64{0: 11, 1: 11, 2: 12}, msg()}, - {msg(3, 1, 2), map[int]uint64{0: 11, 4: 11}, msg(1, 2)}, - } - for idx, test := range tests { - t.Run(fmt.Sprintf("tc[%d]", idx), func(t *testing.T) { - before := test.in - filtered := excludeExisting(test.in, test.existing) - assert.Equal(t, test.exp, filtered) - assert.Equal(t, before, test.in, "The input InsertTxCommand to excludeExisting should not be mutated") - }) - } -} - -func Test_InsertResults(t *testing.T) { - apiReq := api.InsertFactsRequest{ - NewSubjectVars: []string{"bob"}, - Facts: []api.InsertFact{ - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "bob"}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 4}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AString{AString: "Bob"}}}}}, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 6}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 41}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 7}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 0x5}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 0x41}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Var{Var: "bob"}}, - }, - }, - } - logReq, varOffsets := convertAPIInsertToLogCommand(&apiReq) - existing := map[int]uint64{1: 1001} - result := insertResult(&apiReq, &logReq, existing, 2, varOffsets) - assert.Equal(t, []uint64{2001}, result.VarResults) - assert.Equal(t, []uint64{2002, 1001, 2004}, result.FactIds) - assert.Equal(t, int64(2), result.Index) -} - -func Test_appendInsertTxBegin(t *testing.T) { - beamLog := mockblog.New(context.Background()) - cmd := &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{{ - FactIDOffset: 3, - }}, - } - idx, err := appendInsertTxBegin(context.Background(), beamLog, cmd) - assert.NoError(t, err) - assert.Equal(t, uint64(1), idx) - beamLog.AssertCommands(t, rpc.LogPosition{ - Index: 1, - Version: logencoder.DefaultLogVersion, - }, []logencoder.ProtobufCommand{cmd}) -} - -func Test_appendInsertTxBegin_reportsError(t *testing.T) { - beamLog := mockblog.New(context.Background()) - beamLog.SetNextAppendError(errors.New("Unable to append")) - idx, err := appendInsertTxBegin(context.Background(), beamLog, - &logentry.InsertTxCommand{}) - assert.EqualError(t, err, "Unable to append") - assert.Equal(t, uint64(0), idx) -} - -func Test_appendTxDecide(t *testing.T) { - beamLog := mockblog.New(context.Background()) - idx, err := appendTxDecide(context.Background(), beamLog, 5, true) - assert.NoError(t, err) - assert.Equal(t, uint64(1), idx) - beamLog.AssertCommands(t, rpc.LogPosition{ - Index: 1, - Version: logencoder.DefaultLogVersion, - }, []logencoder.ProtobufCommand{ - &logentry.TxDecisionCommand{Tx: 5, Commit: true}, - }) -} - -func Test_appendTxDecide_reportsError(t *testing.T) { - beamLog := mockblog.New(context.Background()) - beamLog.SetNextAppendError(errors.New("Unable to append")) - idx, err := appendTxDecide(context.Background(), beamLog, 5, true) - assert.EqualError(t, err, "Unable to append") - assert.Equal(t, uint64(0), idx) -} - -func makeInsertFact(s, p, o uint64) api.InsertFact { - return api.InsertFact{ - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: s}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: p}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: o}}}}, - } -} - -func Test_Insert(t *testing.T) { - lfb := new(logwrite.InsertFactBuilder).SetAutoFactID(false) - - // tc defines a single insert test case. It takes the inbound API request - // and runs it though InsertFacts using a mock log and a mock LookupSPO - // endpoint. The test case can control what lookupSPO returns. It will - // verify both the expected result of InsertFacts, as well as the what - // LookupSPO requests were made, and what Commands were written to the log - type tc struct { - name string - req api.InsertFactsRequest - // if not empty will assert that the insertFacts call returned this error - expectedError string - // the expected results of the insertFacts call if there was no error - expectedResult api.InsertFactsResult - // the first lookupSPO request returns mockSPO.results[0], the seconds returns [1] etc. - lookupSPO []mocklookups.Expected - // the list of commands expected to be written to the log - expectedLog []logencoder.ProtobufCommand - } - // startingLogIndex is the log index the first log entry created by the test will - // be placed at. - const startingLogIndex = 5 - tests := []tc{ - { - name: "Insert should validate the request", - req: api.InsertFactsRequest{ - NewSubjectVars: []string{"bob"}, - Facts: []api.InsertFact{ - makeInsertFact(1, 2, 3), - }, - }, - expectedError: "the following variables were declared but not used, all variables must get used: [bob]", - }, - { - name: "Insert with no existing facts, no variables", - req: api.InsertFactsRequest{ - Facts: []api.InsertFact{ - makeInsertFact(1001, 1002, 1003), - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2001}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2002}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{ - Value: &api.KGObject_AInt64{AInt64: 42}, - UnitID: 1, - }, - }}, - }, - }, - }, - expectedResult: api.InsertFactsResult{ - Index: startingLogIndex, - FactIds: []uint64{5001, 5002}, - }, - lookupSPO: []mocklookups.Expected{ - mocklookups.OK(&rpc.LookupSPORequest{ - Index: startingLogIndex - 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AInt64(42, 1)}, - }, - }, - // this is a slice with one item in it that's empty, i.e. no facts found - rpc.LookupChunk{}), - }, - expectedLog: []logencoder.ProtobufCommand{ - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - lfb.SID(2001).PID(2002).OInt64(42, 1).FactID(2).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: 5, Commit: true}, - }, - }, - { - name: "Insert with Variables", - req: api.InsertFactsRequest{ - NewSubjectVars: []string{"?bob"}, - Facts: []api.InsertFact{ - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "?bob"}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 4}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}, - }}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 123}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 4}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Var{Var: "?bob"}}, - }, - }, - }, - expectedResult: api.InsertFactsResult{ - Index: startingLogIndex, - VarResults: []uint64{5001}, - FactIds: []uint64{5002, 5003}, - }, - // as all the facts reference a newSubjectVar, they can't - // possibly exist, so there should be no lookupSPO call made - lookupSPO: nil, - expectedLog: []logencoder.ProtobufCommand{ - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SOffset(1).PID(4).OKID(1).FactID(2).Fact(), - lfb.SID(123).PID(4).OOffset(1).FactID(3).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: 5, Commit: true}, - }, - }, - { - name: "Insert multiple facts with one existing", - req: api.InsertFactsRequest{ - Facts: []api.InsertFact{ - makeInsertFact(1001, 1002, 1003), - makeInsertFact(2001, 2002, 2003), // this one exists and has factID 2004 - }, - }, - expectedResult: api.InsertFactsResult{ - Index: startingLogIndex + 2, - FactIds: []uint64{7001, 2004}, - }, - lookupSPO: []mocklookups.Expected{ - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex - 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 1, - Fact: rpc.Fact{Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003), Index: 2, Id: 2004}, - }}, - }, - ), - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex + 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 1, - Fact: rpc.Fact{Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003), Index: 2, Id: 2004}, - }}, - }, - ), - }, - expectedLog: []logencoder.ProtobufCommand{ - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - lfb.SID(2001).PID(2002).OKID(2003).FactID(2).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex, Commit: false}, - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex + 2, Commit: true}, - }, - }, - { - name: "Insert where all facts already exist", - req: api.InsertFactsRequest{ - Facts: []api.InsertFact{ - makeInsertFact(1001, 1002, 1003), - makeInsertFact(2001, 2002, 2003), - }, - }, - expectedResult: api.InsertFactsResult{ - Index: startingLogIndex, - FactIds: []uint64{1004, 2004}, - }, - lookupSPO: []mocklookups.Expected{ - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex - 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - // one result with the 2 existing facts - rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 0, - Fact: rpc.Fact{Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003), Id: 1004, Index: 1}, - }, { - Lookup: 1, - Fact: rpc.Fact{Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003), Id: 2004, Index: 2}, - }}, - }, - ), - }, - expectedLog: []logencoder.ProtobufCommand{ - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - lfb.SID(2001).PID(2002).OKID(2003).FactID(2).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex, Commit: false}, - }, - }, - { - name: "insert that requires more than 1 attempt to get stable existing state", - req: api.InsertFactsRequest{ - Facts: []api.InsertFact{ - makeInsertFact(1001, 1002, 1003), - makeInsertFact(2001, 2002, 2003), - }, - }, - expectedResult: api.InsertFactsResult{ - Index: startingLogIndex + 4, - FactIds: []uint64{1004, 9002}, - }, - lookupSPO: []mocklookups.Expected{ - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex - 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 1, - Fact: rpc.Fact{Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003), Id: 2004, Index: 2}, - }}, - }, - ), - // after finding that this fact exists, it'll loop around - // and write another insertTx with it removed, and then - // check again that its still the only existing fact, but in - // that time other writers could have created and/or deleted - // facts from the original request. The eventual state should - // be that Id:1004 exists, and Id:2004 does not. - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex + 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 0, - Fact: rpc.Fact{Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003), Id: 1004, Index: 1}, - }}, - }, - ), - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex + 3, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 0, - Fact: rpc.Fact{Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003), Id: 1004, Index: 1}, - }}, - }, - ), - }, - expectedLog: []logencoder.ProtobufCommand{ - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - lfb.SID(2001).PID(2002).OKID(2003).FactID(2).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex, Commit: false}, - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex + 2, Commit: false}, - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(2001).PID(2002).OKID(2003).FactID(2).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex + 4, Commit: true}, - }, - }, - { - name: "Abort log entry should be created when lookup fails", - req: api.InsertFactsRequest{ - Facts: []api.InsertFact{ - makeInsertFact(1001, 1002, 1003), - }, - }, - expectedError: "unable to lookup existing facts: LookupSPO Failed", - lookupSPO: []mocklookups.Expected{ - mocklookups.Err( - &rpc.LookupSPORequest{ - Index: startingLogIndex - 1, - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - }, - }, - errors.New("LookupSPO Failed"), - ), - }, - expectedLog: []logencoder.ProtobufCommand{ - &logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - }, - }, - &logentry.TxDecisionCommand{Tx: startingLogIndex, Commit: false}, - }, - }, - } - // this builds a test case where the lookups return different results each time - // (e.g. if there were high levels of concurrent writes/deletes of the same or - // overlapping sets of facts). Eventually this insert gives up and returns an error. - maxAttemptsTestCase := tc{ - name: "Exhaust lookups attempts", - req: api.InsertFactsRequest{ - Facts: []api.InsertFact{ - makeInsertFact(1001, 1002, 1003), - makeInsertFact(2001, 2002, 2003), - }, - }, - expectedError: "insert exhausted all retries (32) while attempting to resolve which facts do/don't exist", - } - for i := 0; i < maxInsertRetry; i++ { - // in reality we'd expect the index of the results to change for each request, but that - // doesn't affect the processing, so we don't set that up. - var result rpc.LookupChunk - if i%2 == 0 { - result = rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 0, - Fact: rpc.Fact{Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003), Id: 1004, Index: 1}, - }}, - } - } else { - result = rpc.LookupChunk{ - Facts: []rpc.LookupChunk_Fact{{ - Lookup: 1, - Fact: rpc.Fact{Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003), Id: 2004, Index: 2}, - }}, - } - } - // every insert attempt consists of a insertTx followed by an decideTx - // so the read's will be done at intervals of 2. (at least in the test - // where nothing else is writing). - maxAttemptsTestCase.lookupSPO = append(maxAttemptsTestCase.lookupSPO, - mocklookups.OK( - &rpc.LookupSPORequest{ - Index: startingLogIndex - 1 + 2*uint64(i), - Lookups: []rpc.LookupSPORequest_Item{ - {Subject: 1001, Predicate: 1002, Object: rpc.AKID(1003)}, - {Subject: 2001, Predicate: 2002, Object: rpc.AKID(2003)}, - }, - }, - result)) - } - expLog := make([]logencoder.ProtobufCommand, 0, maxInsertRetry) - firstInsert := logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1001).PID(1002).OKID(1003).FactID(1).Fact(), - lfb.SID(2001).PID(2002).OKID(2003).FactID(2).Fact(), - }, - } - expLog = append(expLog, &firstInsert) - expLog = append(expLog, &logentry.TxDecisionCommand{Tx: startingLogIndex, Commit: false}) - // after the first attempt, the log entries will flip between 2 different attempts - // based on the fliping lookup results - insertA := logentry.InsertTxCommand{Facts: firstInsert.Facts[1:]} - insertB := logentry.InsertTxCommand{Facts: firstInsert.Facts[:1]} - var tx uint64 = startingLogIndex + 2 - for i := 0; i < maxInsertRetry-1; i++ { // -1 because firstInsert will be the first attempt - if i%2 == 0 { - expLog = append(expLog, &insertA, &logentry.TxDecisionCommand{Tx: tx}) - } else { - expLog = append(expLog, &insertB, &logentry.TxDecisionCommand{Tx: tx}) - } - tx += 2 - } - maxAttemptsTestCase.expectedLog = expLog - tests = append(tests, maxAttemptsTestCase) - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ctx := context.Background() - beamLog := mockblog.New(ctx) - assert.NoError(t, beamLog.Discard(ctx, startingLogIndex)) - mockSPO, assertDone := mocklookups.New(t, test.lookupSPO...) - res, err := insertFacts(ctx, mockSPO, beamLog, &test.req) - if test.expectedError != "" { - assert.EqualError(t, err, test.expectedError) - } else { - assert.NoError(t, err) - // assert.Equal doesn't consider []foo{} & nil equal, even though nil - // is treated by an empty slice by everything. so normalize these up - // to what the is generated for the output - if test.expectedResult.VarResults == nil { - test.expectedResult.VarResults = []uint64{} - } - assert.Equal(t, &test.expectedResult, res) - } - assertDone() - beamLog.AssertCommands(t, rpc.LogPosition{ - Index: startingLogIndex, - Version: logencoder.DefaultLogVersion, - }, test.expectedLog) - }) - } -} - func Test_ResolveIndexConstraint(t *testing.T) { type test struct { recent blog.Index @@ -778,3 +213,7 @@ func (m *mockInfo) Info(context.Context) (*blog.Info, error) { LastIndex: m.latestIndex, }, nil } + +func Test_TimestampPrecisionSame(t *testing.T) { + assert.Equal(t, logentry.TimestampPrecision_value, api.Precision_value, "API & Logentry defintions are expected to have the same names/values for Precision") +} diff --git a/src/github.com/ebay/beam/api/impl/insert_facts.go b/src/github.com/ebay/beam/api/impl/insert_facts.go deleted file mode 100644 index 1a794fa..0000000 --- a/src/github.com/ebay/beam/api/impl/insert_facts.go +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2019 eBay Inc. -// Primary authors: Simon Fell, Diego Ongaro, -// Raymond Kroeker, and Sathish Kandasamy. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package impl - -import ( - "fmt" - "sort" - - "github.com/ebay/beam/api" - "github.com/ebay/beam/logentry" - "github.com/ebay/beam/logentry/logread" - "github.com/ebay/beam/logentry/logwrite" - "github.com/ebay/beam/util/unicode" - log "github.com/sirupsen/logrus" -) - -type varStatus uint8 - -const ( - varDeclared varStatus = 1 << iota - varUsed - varFactID -) - -// validateInsertFactsRequest performs basic validation on the insert facts -// structure. It's just cross-checking data inside the request struct, it's not -// doing any validations that require access to the currently stored data. -// It will ensure that the number of new KIDs required does not exceed the -// maximum for a single log entry, and it will verify that the usage of variables -// is valid -// 1) every variable in newSubjectVars is used in a fact. -// 2) every variable that captures a FactID is used in some other fact. -// 3) a field that is a variable doesn't have an empty variable name. -// 4) a variable that's in NewSubjectVars isn't also used to capture a FactID. -func validateInsertFactsRequest(i *api.InsertFactsRequest) error { - if len(i.Facts)+len(i.NewSubjectVars) >= logread.MaxOffset { - return fmt.Errorf("a single insert is limited to %d new Ids, this request has %d new Subjects & %d new Facts which is too many", logread.MaxOffset-1, len(i.NewSubjectVars), len(i.Facts)) - } - vars := make(map[string]varStatus) - for _, v := range i.NewSubjectVars { - vars[v] = varDeclared - } - checkVar := func(factIdx int, varName, field string, canBeFactID bool) error { - if varName == "" { - return nil - } - if _, exists := vars[varName]; !exists { - return fmt.Errorf("fact[%d] specifies a %s variable '%s', but that isn't delcared", factIdx, field, varName) - } - vars[varName] |= varUsed - if !canBeFactID { - if vars[varName]&varFactID != 0 { - return fmt.Errorf("fact[%d] specifies a %s variable '%s', but that variable is a fact_id and can't be bound to %s", factIdx, field, varName, field) - } - } - return nil - } - // check that used variables exist. - for idx, f := range i.Facts { - if f.FactIDVar != "" { - // the FactID can only be bound for reading later on, each fact that declares a var must declare a new unique var - if _, exists := vars[f.FactIDVar]; exists { - return fmt.Errorf("fact[%d] specifies a FactID Variable '%s', but that was already declared as variable", idx, f.FactIDVar) - } - vars[f.FactIDVar] = varDeclared + varFactID - } - if err := checkVar(idx, f.Subject.GetVar(), "subject", true); err != nil { - return err - } - if err := checkVar(idx, f.Predicate.GetVar(), "predicate", false); err != nil { - return err - } - if err := checkVar(idx, f.Object.GetVar(), "object", false); err != nil { - return err - } - if f.Object.GetVar() == "" && f.Object.GetObject() == nil { - return fmt.Errorf("fact[%d] Object doesn't specify a variable or an Object value, one or the other is required", idx) - } - } - // ensure that all declared vars were actually used. - unused := make([]string, 0, 4) - for v, st := range vars { - if st&varUsed == 0 { - unused = append(unused, v) - } - } - if len(unused) > 0 { - sort.Strings(unused) - return fmt.Errorf("the following variables were declared but not used, all variables must get used: %v", unused) - } - return nil -} - -// convertAPIInsertToLogCommand will take an api.InsertFactsRequest and convert -// it into a logentry.InsertTxCommand. This function assumes that -// validateInsertFactsRequest has already been called on the supplied -// InsertFactsRequest, and that it reported no errors. -// -// In the logentry version variables have been replaced with offsets which can -// be resolved to a KID given a log index that the message was written to. -// -// We do this so that we only assign ids in a single place, and record that in -// the log entry. This means if we need to change how this assignment happens in -// the future we won't have to deal with trying to get the API & DiskView -// instances to switch calculation at the same time. -// -// In addition it returns a map of variable name to offset for all the variables -// used in the insert. This map can be used in conjunction with the KID function -// to determine the final KIDs assigned to the variables once you know the log -// index the InsertTxCommand was written at. -func convertAPIInsertToLogCommand(i *api.InsertFactsRequest) (logentry.InsertTxCommand, map[string]int32) { - // dont access this directly, use the nextOffset function - nextOffsetToUse := int32(1) - nextOffset := func() int32 { - r := nextOffsetToUse - nextOffsetToUse++ - return r - } - - // The offsets are assigned sequentially from the insert request. We assign - // all new subjects first, then assign offsets for each factID. If - // subsequently existing facts are filtered out of the resulting - // InsertTxCommand, that'll leave holes in the used offsets, and thats ok. - vars := make(map[string]int32, len(i.NewSubjectVars)) - for _, v := range i.NewSubjectVars { - vars[v] = nextOffset() - } - - res := logentry.InsertTxCommand{ - Facts: make([]logentry.InsertFact, len(i.Facts)), - } - for idx, f := range i.Facts { - factIDOffset := nextOffset() - if f.FactIDVar != "" { - vars[f.FactIDVar] = factIDOffset - } - logFact := logentry.InsertFact{ - FactIDOffset: factIDOffset, - Subject: convertKidOrVar(&f.Subject, vars), - Predicate: convertKidOrVar(&f.Predicate, vars), - Object: convertKGObjectOrVar(&f.Object, vars), - } - res.Facts[idx] = logFact - } - if nextOffsetToUse >= logread.MaxOffset { - // validateInsertFactsRequest should of already caught this condition - // so something has gone wrong somewhere. - log.Panicf("convertAPIInsertToLogCommand consumed too many offsets: %d, which should of been caught by validateInsertFactsRequest", nextOffsetToUse-1) - } - return res, vars -} - -// convertKidOrVar maps from the api to log types for the KidOrVar, primarily by -// converting vars to offsets. The offset describes a KID relative to a later -// assigned log index. -func convertKidOrVar(in *api.KIDOrVar, vars map[string]int32) logentry.KIDOrOffset { - switch t := in.Value.(type) { - case *api.KIDOrVar_Kid: - return logentry.KIDOrOffset{Value: &logentry.KIDOrOffset_Kid{Kid: t.Kid}} - case *api.KIDOrVar_Var: - varOffset, exists := vars[t.Var] - if !exists { - log.Panicf("convertKidOrVar called with var '%s', which doesn't exist, this should never happen", t.Var) - } - return logentry.KIDOrOffset{Value: &logentry.KIDOrOffset_Offset{Offset: varOffset}} - } - log.Panicf("Received unexpected type for api.KIDOrVar value %T %v", in.Value, in.Value) - return logentry.KIDOrOffset{} // never gets here -} - -// convertKGObjectOrVar maps from the api to log types for the KGObjectOrVar, -// primarily by converting vars to offsets. The offset describes a KID relative -// to a later assigned log index. -func convertKGObjectOrVar(in *api.KGObjectOrVar, vars map[string]int32) logentry.KGObject { - switch t := in.Value.(type) { - case *api.KGObjectOrVar_Var: - return logwrite.AKIDOffset(vars[t.Var]) - - case *api.KGObjectOrVar_Object: - switch lv := t.Object.Value.(type) { - case *api.KGObject_AString: - return logwrite.AString(lv.AString, t.Object.LangID) - case *api.KGObject_AInt64: - return logwrite.AInt64(lv.AInt64, t.Object.UnitID) - case *api.KGObject_AFloat64: - return logwrite.AFloat64(lv.AFloat64, t.Object.UnitID) - case *api.KGObject_ABool: - return logwrite.ABool(lv.ABool, t.Object.UnitID) - case *api.KGObject_AKID: - return logwrite.AKID(lv.AKID) - case *api.KGObject_ATimestamp: - // Precision is currently defined identically in both api & - // logentry. This function will need to change if that changes. The - // unit test Test_PrecisionSame verifies that the values are - // currently equal. - return logwrite.ATimestamp(lv.ATimestamp.Value, logentry.TimestampPrecision(lv.ATimestamp.Precision), t.Object.UnitID) - } - log.Panicf("Received unexpected type for api.KGObject value %T %v", t.Object.Value, t.Object.Value) - } - log.Panicf("Received unexpected type for api.KGObjectOrVar value %T %v", in.Value, in.Value) - return logentry.KGObject{} // never gets here -} - -// applyUnicodeNormalizationToInsertFactsRequest rewrites string variables and -// Object string literals fields to Unicode normalized string to the -// InsertFactRequest -func applyUnicodeNormalizationToInsertFactsRequest(req *api.InsertFactsRequest) { - for i := range req.NewSubjectVars { - req.NewSubjectVars[i] = unicode.Normalize(req.NewSubjectVars[i]) - } - for i, fact := range req.Facts { - req.Facts[i].FactIDVar = unicode.Normalize(fact.FactIDVar) - if subject, ok := fact.Subject.Value.(*api.KIDOrVar_Var); ok { - subject.Var = unicode.Normalize(subject.Var) - } - if predicate, ok := fact.Predicate.Value.(*api.KIDOrVar_Var); ok { - predicate.Var = unicode.Normalize(predicate.Var) - } - switch object := fact.Object.Value.(type) { - case *api.KGObjectOrVar_Var: - object.Var = unicode.Normalize(object.Var) - case *api.KGObjectOrVar_Object: - if object, ok := object.Object.Value.(*api.KGObject_AString); ok { - object.AString = unicode.Normalize(object.AString) - } - } - } -} diff --git a/src/github.com/ebay/beam/api/impl/insert_facts_test.go b/src/github.com/ebay/beam/api/impl/insert_facts_test.go deleted file mode 100644 index a5ed8ce..0000000 --- a/src/github.com/ebay/beam/api/impl/insert_facts_test.go +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright 2019 eBay Inc. -// Primary authors: Simon Fell, Diego Ongaro, -// Raymond Kroeker, and Sathish Kandasamy. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package impl - -import ( - "testing" - "time" - - "github.com/ebay/beam/api" - "github.com/ebay/beam/logentry" - "github.com/ebay/beam/logentry/logwrite" - "github.com/ebay/beam/msg/kgobject" - "github.com/stretchr/testify/assert" -) - -func Test_TimestampPrecisionSame(t *testing.T) { - assert.Equal(t, logentry.TimestampPrecision_value, api.Precision_value, "API & Logentry defintions are expected to have the same names/values for Precision") -} - -func Test_IdLimit(t *testing.T) { - i := api.InsertFactsRequest{ - NewSubjectVars: make([]string, 1000), - } - assert.EqualError(t, validateInsertFactsRequest(&i), "a single insert is limited to 999 new Ids, this request has 1000 new Subjects & 0 new Facts which is too many") - i = api.InsertFactsRequest{ - NewSubjectVars: make([]string, 500), - Facts: make([]api.InsertFact, 500), - } - assert.EqualError(t, validateInsertFactsRequest(&i), "a single insert is limited to 999 new Ids, this request has 500 new Subjects & 500 new Facts which is too many") -} - -func Test_ObjectRequire(t *testing.T) { - i := api.InsertFactsRequest{ - Facts: []api.InsertFact{ - {}, - }, - } - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[0] Object doesn't specify a variable or an Object value, one or the other is required") - i.Facts[0].Object.Value = &api.KGObjectOrVar_Var{Var: "?bob"} - i.NewSubjectVars = []string{"?bob"} - assert.NoError(t, validateInsertFactsRequest(&i)) -} - -func Test_NewSubjectVarUsed(t *testing.T) { - i := api.InsertFactsRequest{ - NewSubjectVars: []string{"bob"}, - Facts: []api.InsertFact{ - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "bob"}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - }, - } - assert.NoError(t, validateInsertFactsRequest(&i)) - - i.NewSubjectVars[0] = "alice" - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[0] specifies a subject variable 'bob', but that isn't delcared") - - i.Facts[0].Subject = api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 1}} - assert.EqualError(t, validateInsertFactsRequest(&i), "the following variables were declared but not used, all variables must get used: [alice]") -} - -func Test_FactVarsUnique(t *testing.T) { - i := api.InsertFactsRequest{ - Facts: []api.InsertFact{ - { - FactIDVar: "?fact", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - { - FactIDVar: "?fact", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "?fact"}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - }} - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[1] specifies a FactID Variable '?fact', but that was already declared as variable") - i.Facts[1].FactIDVar = "" - assert.NoError(t, validateInsertFactsRequest(&i)) - - i.NewSubjectVars = []string{"?fact"} - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[0] specifies a FactID Variable '?fact', but that was already declared as variable") -} - -func Test_VarUsed(t *testing.T) { - i := api.InsertFactsRequest{ - Facts: []api.InsertFact{ - { - FactIDVar: "?fact", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - }} - assert.EqualError(t, validateInsertFactsRequest(&i), "the following variables were declared but not used, all variables must get used: [?fact]") -} - -func Test_FactIDVarCantBeNewSubjectVar(t *testing.T) { - i := api.InsertFactsRequest{ - NewSubjectVars: []string{"bob"}, - Facts: []api.InsertFact{ - { - FactIDVar: "bob", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 1}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 3}}}}, - }, - }, - } - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[0] specifies a FactID Variable 'bob', but that was already declared as variable") -} - -func Test_PredicateObjectVarNotFact(t *testing.T) { - i := api.InsertFactsRequest{ - Facts: []api.InsertFact{ - { - FactIDVar: "?fact", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "?fact"}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - }} - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[1] specifies a predicate variable '?fact', but that variable is a fact_id and can't be bound to predicate") - - i = api.InsertFactsRequest{ - Facts: []api.InsertFact{ - { - FactIDVar: "?fact", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 1}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 3}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Var{Var: "?fact"}}, - }, - }} - assert.EqualError(t, validateInsertFactsRequest(&i), "fact[1] specifies a object variable '?fact', but that variable is a fact_id and can't be bound to object") -} - -func Test_ConvertAPIInsertToLogCommandNoVars(t *testing.T) { - i := api.InsertFactsRequest{ - Facts: []api.InsertFact{ - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 1}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 3}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 4}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 5}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 4}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 5}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 6}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{ - LangID: 5, - Value: &api.KGObject_AString{AString: "Bob"}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 6}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 7}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{ - UnitID: 8, - Value: &api.KGObject_AInt64{AInt64: 42}}}}, - }, - }, - } - logEntry, vars := convertAPIInsertToLogCommand(&i) - lfb := new(logwrite.InsertFactBuilder) - exp := logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.SID(1).PID(2).OKID(3).Fact(), - lfb.SID(4).PID(5).OKID(4).Fact(), - lfb.SID(5).PID(6).OString("Bob", 5).Fact(), - lfb.SID(6).PID(7).OInt64(42, 8).Fact(), - }} - assert.Equal(t, exp, logEntry) - assert.Empty(t, vars) -} - -func Test_ConvertAPIInsertToLogCommandWithVars(t *testing.T) { - i := api.InsertFactsRequest{ - NewSubjectVars: []string{"?bob"}, - Facts: []api.InsertFact{ - { - FactIDVar: "?f", - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "?bob"}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 3}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: "?f"}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 4}}}}, - }, - { - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 5}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Var{Var: "?bob"}}, - }, - }, - } - logEntry, vars := convertAPIInsertToLogCommand(&i) - lfb := new(logwrite.InsertFactBuilder) - exp := logentry.InsertTxCommand{ - Facts: []logentry.InsertFact{ - lfb.FactID(2).SOffset(1).PID(2).OKID(3).Fact(), - lfb.FactID(3).SOffset(2).PID(2).OKID(4).Fact(), - lfb.FactID(4).SID(5).PID(2).OOffset(1).Fact(), - }} - assert.Equal(t, exp, logEntry) - assert.Equal(t, map[string]int32{"?bob": 1, "?f": 2}, vars) -} - -func Test_TooManyOffsets(t *testing.T) { - insert := api.InsertFactsRequest{ - Facts: make([]api.InsertFact, 1000), - } - for i := range insert.Facts { - insert.Facts[i] = api.InsertFact{ - Subject: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 1}}, - Predicate: api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: 2}}, - Object: api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{ - Object: &api.KGObject{Value: &api.KGObject_AKID{AKID: 3}}}}, - } - } - assert.Panics(t, func() { convertAPIInsertToLogCommand(&insert) }) -} - -func Test_ConvertKGObject(t *testing.T) { - type tc struct { - in api.KGObjectOrVar - exp logentry.KGObject - } - obj := func(obj api.KGObject) api.KGObjectOrVar { - return api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{Object: &obj}} - } - vr := func(name string) api.KGObjectOrVar { - return api.KGObjectOrVar{Value: &api.KGObjectOrVar_Var{Var: name}} - } - vars := map[string]int32{ - "bob": 3, - } - tests := []tc{ - {vr("bob"), logwrite.AKIDOffset(3)}, - {obj(kgobject.AKID(44)), logwrite.AKID(44)}, - {obj(kgobject.AString("Bob", 0)), logwrite.AString("Bob", 0)}, - {obj(kgobject.AString("Bob", 5)), logwrite.AString("Bob", 5)}, - {obj(kgobject.AInt64(42, 0)), logwrite.AInt64(42, 0)}, - {obj(kgobject.AInt64(42, 3)), logwrite.AInt64(42, 3)}, - {obj(kgobject.AFloat64(42.2, 0)), logwrite.AFloat64(42.2, 0)}, - {obj(kgobject.AFloat64(42.2, 4)), logwrite.AFloat64(42.2, 4)}, - {obj(kgobject.ABool(true, 0)), logwrite.ABool(true, 0)}, - {obj(kgobject.ABool(true, 3)), logwrite.ABool(true, 3)}, - {obj(kgobject.ATimestampYM(2018, 6, 44)), logwrite.ATimestamp(time.Date(2018, time.Month(6), 1, 0, 0, 0, 0, time.UTC), logentry.Month, 44)}, - {obj(kgobject.ATimestampYM(2018, 6, 0)), logwrite.ATimestamp(time.Date(2018, time.Month(6), 1, 0, 0, 0, 0, time.UTC), logentry.Month, 0)}, - } - for _, test := range tests { - t.Run(test.in.String(), func(t *testing.T) { - act := convertKGObjectOrVar(&test.in, vars) - assert.Equal(t, test.exp, act) - }) - } -} diff --git a/src/github.com/ebay/beam/tools/beam-client/bc.go b/src/github.com/ebay/beam/tools/beam-client/bc.go index 45ffef7..291d753 100644 --- a/src/github.com/ebay/beam/tools/beam-client/bc.go +++ b/src/github.com/ebay/beam/tools/beam-client/bc.go @@ -45,7 +45,6 @@ const usage = `beam-client is a command-line tool for calling the Beam API servi Usage: beam-client [--api=HOST -t=DUR --trace=HOST] query [-i=NUM] FILE beam-client [--api=HOST -t=DUR --trace=HOST --format=FORMAT] insert FILE - beam-client [--api=HOST -t=DUR --trace=HOST] insertfact [--new=VAR] SUBJECT PREDICATE OBJECT beam-client [--api=HOST -t=DUR --trace=HOST] queryfacts [-i=NUM] [--noextids] [QUERYSTRING] beam-client [--api=HOST -t=DUR --trace=HOST] lookupsp [-i=NUM] SUBJECT PREDICATE beam-client [--api=HOST -t=DUR --trace=HOST] lookuppo [-i=NUM] PREDICATE OBJECT @@ -54,7 +53,6 @@ Usage: Options: --api=HOST Host and port of Beam API server to connect to [default: localhost:9987] -t=DUR, --timeout=DUR Timeout for RPC calls to Beam API tier [default: 10s] - --new=VAR Create a new entity to reference in the inserted fact. -i=NUM, --index=NUM Log index to specify in request. -w=WAITFOR, --waitfor=WAITFOR Duration to wait for wipe to complete. --noextids Don't resolve/print KID externalIDs in fact results. @@ -62,13 +60,6 @@ Options: --format=FORMAT File format for inserting facts [default: tsv] Examples: - # To create a fact ' ', first create KIDs for 'car1', 'fits' and 'part1' - # by using HasExternalID(#4) predicate, and then create a fact with the KIDs. - beam-client insertfact --new=c '?c' '#4' '"car1"' # returned VarResults: 1430001 - beam-client insertfact --new=f '?f' '#4' '"fits"' # returned VarResults: 1454001 - beam-client insertfact --new=p '?p' '#4' '"part1"' # returned VarResults: 1487001 - beam-client insertfact '#1430001' '#1454001' '#1487001' - # Multiple line insert fact usage. beam-client insert - < @@ -127,15 +118,6 @@ type options struct { Filename string `docopt:"FILE"` Format string - // InsertFact - InsertFact bool `docopt:"insertfact"` - LangID uint64 `docopt:"--langid"` - UnitID uint64 `docopt:"--unitid"` - NewSubjectVar string `docopt:"--new"` - InsertSubject api.KIDOrVar - InsertPredicate api.KIDOrVar - InsertObject api.KGObjectOrVar - // Wipe Wipe bool WipeWaitFor time.Duration @@ -166,28 +148,6 @@ func parseArgs() *options { if options.Timeout == 0 { options.Timeout = time.Hour } - if options.InsertFact { - options.InsertSubject, err = parseKIDOrVar(options.SubjectString) - } else { - options.Subject, err = parseLiteralID(options.SubjectString) - } - if err != nil { - log.Fatalf("Unable to parse subject value: %v", err) - } - if options.InsertFact { - options.InsertPredicate, err = parseKIDOrVar(options.PredicateString) - } else { - options.Predicate, err = parseLiteralID(options.PredicateString) - } - if err != nil { - log.Fatalf("Unable to parse predicate value: %v", err) - } - if options.InsertFact { - options.InsertObject, err = parseKGObjectOrVar(options.ObjectString) - } - if err != nil { - log.Fatalf("Unable to parse object value: %v", err) - } if options.Wipe { if options.WipeWaitForString != "" { options.WipeWaitFor, err = time.ParseDuration(options.WipeWaitForString) @@ -199,44 +159,6 @@ func parseArgs() *options { return &options } -// parseKIDOrVar parses the query term and returns the api object or variable for 'insert' -func parseKIDOrVar(in string) (api.KIDOrVar, error) { - term, err := parser.ParseTerm(in) - if err != nil { - return api.KIDOrVar{}, err - } - if variable, ok := term.(*parser.Variable); ok { - return api.KIDOrVar{Value: &api.KIDOrVar_Var{Var: variable.Name}}, nil - } - if literalID, ok := term.(*parser.LiteralID); ok { - return api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: literalID.Value}}, nil - } - if literalInt, ok := term.(*parser.LiteralInt); ok { - if literalInt.Value > 0 { - return api.KIDOrVar{Value: &api.KIDOrVar_Kid{Kid: uint64(literalInt.Value)}}, nil - } - } - return api.KIDOrVar{}, fmt.Errorf("invalid kid or var: '%s' %T", in, term) -} - -// parseKGObjectOrVar parses the query term and returns the api object or variable for 'insert' -func parseKGObjectOrVar(in string) (api.KGObjectOrVar, error) { - term, err := parser.ParseTerm(in) - if err != nil { - return api.KGObjectOrVar{}, err - } - if variable, ok := term.(*parser.Variable); ok { - return api.KGObjectOrVar{Value: &api.KGObjectOrVar_Var{Var: variable.Name}}, nil - } - // TODO Once language and units semantics are finalized and implemented, - // need to make it available at the client side. - if literal, ok := term.(parser.Literal); ok { - obj := literal.Literal() - return api.KGObjectOrVar{Value: &api.KGObjectOrVar_Object{Object: obj}}, nil - } - return api.KGObjectOrVar{}, fmt.Errorf("invalid kgobject or var: '%s'", in) -} - func main() { debuglog.Configure(debuglog.Options{}) options := parseArgs() @@ -277,11 +199,6 @@ func main() { if err != nil { log.Fatalf("Error executing Insert: %v", err) } - case options.InsertFact: - err := insertFact(timeoutCtx, factStore, options) - if err != nil { - log.Fatalf("Error executing InsertFact: %v", err) - } case options.LookupPO: err := lookupPO(timeoutCtx, factStore, options) if err != nil { @@ -410,24 +327,3 @@ func (k *kidPrinter) format(kid uint64) string { } return fmtr.Sprintf("%d", kid) } - -// parseLiteralID parses an input string into a KID or returns an error. -func parseLiteralID(in string) (parser.LiteralID, error) { - if in == "" { - return parser.LiteralID{}, nil - } - literal, err := parser.ParseLiteral(in) - if err != nil { - return parser.LiteralID{}, err - } - if lint, ok := literal.(*parser.LiteralInt); ok { - if lint.Value > 0 { - return parser.LiteralID{Value: uint64(lint.Value)}, nil - } - return parser.LiteralID{}, fmt.Errorf("unable to parse literal id: '%s'", in) - } - if lid, ok := literal.(*parser.LiteralID); ok { - return *lid, nil - } - return parser.LiteralID{}, fmt.Errorf("invalid literal id: '%s'", in) -} diff --git a/src/github.com/ebay/beam/tools/beam-client/insert.go b/src/github.com/ebay/beam/tools/beam-client/insert.go index 38ab8ce..1921efe 100644 --- a/src/github.com/ebay/beam/tools/beam-client/insert.go +++ b/src/github.com/ebay/beam/tools/beam-client/insert.go @@ -62,32 +62,3 @@ func readFile(filename string) (string, error) { input, err := ioutil.ReadFile(filename) return string(input), err } - -func insertFact(ctx context.Context, store api.BeamFactStoreClient, options *options) error { - var v []string - if options.NewSubjectVar != "" { - v = []string{options.NewSubjectVar} - } - req := api.InsertFactsRequest{ - NewSubjectVars: v, - Facts: []api.InsertFact{ - { - Subject: options.InsertSubject, - Predicate: options.InsertPredicate, - Object: options.InsertObject, - }, - }, - } - log.Infof("Invoking InsertFacts (%d vars, 1 fact): %+v", - len(req.NewSubjectVars), req) - - start := time.Now() - resp, err := store.InsertFacts(ctx, &req) - if err != nil { - return err - } - log.Infof("InsertFacts returned: %+v", resp) - log.Infof("InsertFacts took %s", time.Since(start)) - - return nil -}