Skip to content

Commit 3c6b10e

Browse files
committed
Upload "hit graphlet" and "hit value" lookup result entries
The next step will be to write initial lookup result entries, which describe the dependencies of a key.
1 parent 272df39 commit 3c6b10e

2 files changed

Lines changed: 194 additions & 6 deletions

File tree

pkg/model/evaluation/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
"@org_golang_google_protobuf//proto",
4747
"@org_golang_google_protobuf//types/known/anypb",
4848
"@org_golang_google_protobuf//types/known/timestamppb",
49+
"@org_golang_x_sync//errgroup",
4950
],
5051
)
5152

pkg/model/evaluation/recursive_computer.go

Lines changed: 193 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/buildbarn/bb-storage/pkg/clock"
2828
"github.com/buildbarn/bb-storage/pkg/util"
2929

30+
"golang.org/x/sync/errgroup"
3031
"google.golang.org/grpc/codes"
3132
"google.golang.org/grpc/status"
3233
"google.golang.org/protobuf/proto"
@@ -1400,6 +1401,7 @@ func (vs *variableDependenciesComputedValueState[TReference, TMetadata]) buildGr
14001401
variableDependencyValueState: vs.variableDependencyValueState,
14011402
graphlet: graphlet,
14021403
hoistedVariableDependencies: vs.hoistedVariableDependencies,
1404+
directVariableDependencies: vs.directVariableDependencies,
14031405
}, graphlet, nil
14041406
}
14051407

@@ -1409,6 +1411,7 @@ type variableDependenciesMarshaledValueState[TReference object.BasicReference, T
14091411

14101412
graphlet model_core.Message[*model_evaluation_pb.Graphlet, TReference]
14111413
hoistedVariableDependencies []*KeyState[TReference, TMetadata]
1414+
directVariableDependencies []*KeyState[TReference, TMetadata]
14121415
}
14131416

14141417
func (vs *variableDependenciesMarshaledValueState[TReference, TMetadata]) getHoistedDependencies() []*KeyState[TReference, TMetadata] {
@@ -1568,7 +1571,7 @@ func (vs initialMessageValueState[TReference, TMetadata]) evaluate(ctx context.C
15681571
// dependencies. This means that the initial lookup
15691572
// returned a value immediately.
15701573
return &noDependenciesUploadedMessageValueState[TReference, TMetadata]{
1571-
lookupResultReference: rootLookupResultReference,
1574+
hitValueLookupResultReference: rootLookupResultReference,
15721575
}, nil
15731576
default:
15741577
// Malformed cache entry.
@@ -1732,7 +1735,7 @@ func (vs *noDependenciesComputedMessageValueState[TReference, TMetadata]) upload
17321735
// of the LookupResult. This prevents the need for keeping the
17331736
// value in memory.
17341737
return &noDependenciesUploadedMessageValueState[TReference, TMetadata]{
1735-
lookupResultReference: model_core.CopyDecodable(rootTagKeyHash, rootLookupResultReference),
1738+
hitValueLookupResultReference: model_core.CopyDecodable(rootTagKeyHash, rootLookupResultReference),
17361739
}, nil
17371740
}
17381741

@@ -1776,8 +1779,142 @@ type variableDependenciesMarshaledMessageValueState[TReference object.BasicRefer
17761779
}
17771780

17781781
func (vs *variableDependenciesMarshaledMessageValueState[TReference, TMetadata]) upload(ctx context.Context, rc *RecursiveComputer[TReference, TMetadata], ks *KeyState[TReference, TMetadata]) (valueState[TReference, TMetadata], error) {
1779-
// TODO: Implement uploading!
1780-
return vs, nil
1782+
nextValueState := valueState[TReference, TMetadata](vs)
1783+
group, groupCtx := errgroup.WithContext(ctx)
1784+
1785+
// Upload the initial lookup result entry, and missing
1786+
// dependencies lookup result entries if needed.
1787+
group.Go(func() error {
1788+
rootTagKeyHash, err := rc.getCacheLookupTagKeyHash(ks, nil)
1789+
if err != nil {
1790+
return err
1791+
}
1792+
if _, err = model_tag.ResolveDecodableTag(groupCtx, rc.tagStore, rootTagKeyHash); err == nil {
1793+
return errors.New("merging of lookup results is not implemented yet")
1794+
} else if status.Code(err) != codes.NotFound {
1795+
return err
1796+
}
1797+
1798+
// TODO: Implement!
1799+
return nil
1800+
})
1801+
1802+
// Upload the "hit graphlet" lookup result entry.
1803+
group.Go(func() error {
1804+
hoistedDependenciesHasher := lthash.NewHasher()
1805+
rc.lock.RLock()
1806+
for _, ksDep := range vs.hoistedVariableDependencies {
1807+
hoistedDependenciesHasher.Add(ksDep.valueState.getDependenciesHashRecordReference().GetRawReference())
1808+
}
1809+
rc.lock.RUnlock()
1810+
dependenciesHash := hoistedDependenciesHasher.Sum()
1811+
hitGraphletTagKeyHash, err := rc.getCacheLookupTagKeyHash(ks, &model_evaluation_cache_pb.LookupTagKeyData_SubsequentLookup{
1812+
Scope: model_evaluation_cache_pb.LookupTagKeyData_SubsequentLookup_GRAPHLET,
1813+
DependenciesHash: dependenciesHash[:],
1814+
})
1815+
if err != nil {
1816+
return err
1817+
}
1818+
1819+
createdHitGraphletLookupResult, err := model_core.MarshalAndEncodeKeyed(
1820+
model_core.MustBuildPatchedMessage(
1821+
func(patcher *model_core.ReferenceMessagePatcher[TMetadata]) encoding.BinaryMarshaler {
1822+
return model_core.NewProtoBinaryMarshaler(
1823+
&model_evaluation_cache_pb.LookupResult{
1824+
Result: &model_evaluation_cache_pb.LookupResult_HitGraphlet{
1825+
HitGraphlet: model_core.Patch(
1826+
rc.objectManager,
1827+
vs.graphlet,
1828+
).Merge(patcher),
1829+
},
1830+
},
1831+
)
1832+
},
1833+
),
1834+
rc.referenceFormat,
1835+
rc.cacheKeyedEncoder,
1836+
hitGraphletTagKeyHash.GetDecodingParameters(),
1837+
)
1838+
if err != nil {
1839+
return err
1840+
}
1841+
hitGraphletLookupResultMetadataEntry, err := createdHitGraphletLookupResult.Capture(groupCtx, rc.objectManager)
1842+
if err != nil {
1843+
return err
1844+
}
1845+
hitGraphletLookupResultReference := rc.objectManager.ReferenceObject(hitGraphletLookupResultMetadataEntry)
1846+
1847+
// Subsequent attempts to access the graphlet may use
1848+
// the copy that has been written to storage.
1849+
nextValueState = &variableDependenciesUploadedMessageValueState[TReference, TMetadata]{
1850+
variableDependencyValueState: vs.variableDependencyValueState,
1851+
hitGraphletLookupResultReference: model_core.CopyDecodable(hitGraphletTagKeyHash, hitGraphletLookupResultReference),
1852+
hoistedVariableDependencies: vs.hoistedVariableDependencies,
1853+
}
1854+
return rc.tagStore.UpdateTag(groupCtx, hitGraphletTagKeyHash.Value, hitGraphletLookupResultReference)
1855+
})
1856+
1857+
// Upload the "hit value" lookup result entry.
1858+
group.Go(func() error {
1859+
// Only write it if the value's dependencies differ from
1860+
// the graphlet's. Given that we always attempt to look
1861+
// up the graphlet first, a "hit value" lookup result
1862+
// entry having the same dependencies would never be
1863+
// requested.
1864+
if slices.Equal(vs.hoistedVariableDependencies, vs.directVariableDependencies) {
1865+
return nil
1866+
}
1867+
directDependenciesHasher := lthash.NewHasher()
1868+
rc.lock.RLock()
1869+
for _, ksDep := range vs.directVariableDependencies {
1870+
directDependenciesHasher.Add(ksDep.valueState.getDependenciesHashRecordReference().GetRawReference())
1871+
}
1872+
rc.lock.RUnlock()
1873+
dependenciesHash := directDependenciesHasher.Sum()
1874+
hitValuetTagKeyHash, err := rc.getCacheLookupTagKeyHash(ks, &model_evaluation_cache_pb.LookupTagKeyData_SubsequentLookup{
1875+
Scope: model_evaluation_cache_pb.LookupTagKeyData_SubsequentLookup_VALUE,
1876+
DependenciesHash: dependenciesHash[:],
1877+
})
1878+
if err != nil {
1879+
return err
1880+
}
1881+
1882+
evaluation, err := GraphletGetEvaluation(ctx, rc.evaluationReader, vs.graphlet)
1883+
if err != nil {
1884+
return err
1885+
}
1886+
createdHitValuetLookupResult, err := model_core.MarshalAndEncodeKeyed(
1887+
model_core.MustBuildPatchedMessage(
1888+
func(patcher *model_core.ReferenceMessagePatcher[TMetadata]) encoding.BinaryMarshaler {
1889+
return model_core.NewProtoBinaryMarshaler(
1890+
&model_evaluation_cache_pb.LookupResult{
1891+
Result: &model_evaluation_cache_pb.LookupResult_HitValue{
1892+
HitValue: model_core.Patch(
1893+
rc.objectManager,
1894+
model_core.Nested(evaluation, evaluation.Message.Value),
1895+
).Merge(patcher),
1896+
},
1897+
},
1898+
)
1899+
},
1900+
),
1901+
rc.referenceFormat,
1902+
rc.cacheKeyedEncoder,
1903+
hitValuetTagKeyHash.GetDecodingParameters(),
1904+
)
1905+
if err != nil {
1906+
return err
1907+
}
1908+
hitValuetLookupResultMetadataEntry, err := createdHitValuetLookupResult.Capture(groupCtx, rc.objectManager)
1909+
if err != nil {
1910+
return err
1911+
}
1912+
hitValuetLookupResultReference := rc.objectManager.ReferenceObject(hitValuetLookupResultMetadataEntry)
1913+
return rc.tagStore.UpdateTag(groupCtx, hitValuetTagKeyHash.Value, hitValuetLookupResultReference)
1914+
})
1915+
1916+
err := group.Wait()
1917+
return nextValueState, err
17811918
}
17821919

17831920
func (vs *variableDependenciesMarshaledMessageValueState[TReference, TMetadata]) getGraphlet(ctx context.Context, rc *RecursiveComputer[TReference, TMetadata], ks *KeyState[TReference, TMetadata]) (valueState[TReference, TMetadata], model_core.Message[*model_evaluation_pb.Graphlet, TReference], error) {
@@ -1803,15 +1940,15 @@ func (variableDependenciesMarshaledMessageValueState[TReference, TMetadata]) get
18031940
type noDependenciesUploadedMessageValueState[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] struct {
18041941
noDependenciesValueState[TReference, TMetadata]
18051942

1806-
lookupResultReference model_core.Decodable[TReference]
1943+
hitValueLookupResultReference model_core.Decodable[TReference]
18071944
}
18081945

18091946
func (vs *noDependenciesUploadedMessageValueState[TReference, TMetadata]) upload(ctx context.Context, rc *RecursiveComputer[TReference, TMetadata], ks *KeyState[TReference, TMetadata]) (valueState[TReference, TMetadata], error) {
18101947
return vs, nil
18111948
}
18121949

18131950
func (vs *noDependenciesUploadedMessageValueState[TReference, TMetadata]) getMessageValue(ctx context.Context, rc *RecursiveComputer[TReference, TMetadata]) (model_core.TopLevelMessage[*anypb.Any, TReference], error) {
1814-
rootLookupResult, err := rc.lookupResultReader.ReadObject(ctx, vs.lookupResultReference)
1951+
rootLookupResult, err := rc.lookupResultReader.ReadObject(ctx, vs.hitValueLookupResultReference)
18151952
if err != nil {
18161953
return model_core.TopLevelMessage[*anypb.Any, TReference]{}, err
18171954
}
@@ -1831,6 +1968,56 @@ func (noDependenciesUploadedMessageValueState[TReference, TMetadata]) getError()
18311968
return nil
18321969
}
18331970

1971+
type variableDependenciesUploadedMessageValueState[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] struct {
1972+
uploadedValueState[TReference, TMetadata]
1973+
variableDependencyValueState[TReference, TMetadata]
1974+
1975+
hitGraphletLookupResultReference model_core.Decodable[TReference]
1976+
hoistedVariableDependencies []*KeyState[TReference, TMetadata]
1977+
}
1978+
1979+
func (vs *variableDependenciesUploadedMessageValueState[TReference, TMetadata]) getMessageValue(ctx context.Context, rc *RecursiveComputer[TReference, TMetadata]) (model_core.TopLevelMessage[*anypb.Any, TReference], error) {
1980+
hitGraphletLookupResultReference, err := rc.lookupResultReader.ReadObject(ctx, vs.hitGraphletLookupResultReference)
1981+
if err != nil {
1982+
return model_core.TopLevelMessage[*anypb.Any, TReference]{}, err
1983+
}
1984+
evaluation, err := GraphletGetEvaluation(
1985+
ctx,
1986+
rc.evaluationReader,
1987+
model_core.Nested(
1988+
hitGraphletLookupResultReference,
1989+
hitGraphletLookupResultReference.Message.Result.(*model_evaluation_cache_pb.LookupResult_HitGraphlet).HitGraphlet,
1990+
),
1991+
)
1992+
if err != nil {
1993+
return model_core.TopLevelMessage[*anypb.Any, TReference]{}, err
1994+
}
1995+
return model_core.FlattenAny(model_core.Nested(evaluation, evaluation.Message.Value))
1996+
}
1997+
1998+
func (variableDependenciesUploadedMessageValueState[TReference, TMetadata]) getNativeValue() (any, error) {
1999+
return nil, errors.New("key does not yield a native value")
2000+
}
2001+
2002+
func (variableDependenciesUploadedMessageValueState[TReference, TMetadata]) getError() error {
2003+
return nil
2004+
}
2005+
2006+
func (vs *variableDependenciesUploadedMessageValueState[TReference, TMetadata]) getGraphlet(ctx context.Context, rc *RecursiveComputer[TReference, TMetadata], ks *KeyState[TReference, TMetadata]) (valueState[TReference, TMetadata], model_core.Message[*model_evaluation_pb.Graphlet, TReference], error) {
2007+
hitGraphletLookupResultReference, err := rc.lookupResultReader.ReadObject(ctx, vs.hitGraphletLookupResultReference)
2008+
if err != nil {
2009+
return vs, model_core.Message[*model_evaluation_pb.Graphlet, TReference]{}, err
2010+
}
2011+
return vs, model_core.Nested(
2012+
hitGraphletLookupResultReference,
2013+
hitGraphletLookupResultReference.Message.Result.(*model_evaluation_cache_pb.LookupResult_HitGraphlet).HitGraphlet,
2014+
), nil
2015+
}
2016+
2017+
func (vs *variableDependenciesUploadedMessageValueState[TReference, TMetadata]) getHoistedDependencies() []*KeyState[TReference, TMetadata] {
2018+
return vs.hoistedVariableDependencies
2019+
}
2020+
18342021
type overriddenMessageValueState[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] struct {
18352022
uploadedValueState[TReference, TMetadata]
18362023
variableDependencyValueState[TReference, TMetadata]

0 commit comments

Comments
 (0)