Skip to content

Commit 74e72c4

Browse files
committed
Fixes 4934: module streams for introspected repositories
1 parent f3dc32a commit 74e72c4

21 files changed

Lines changed: 896 additions & 70 deletions

.mockery.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ packages:
6060
TemplateDao:
6161
config:
6262
filename: "templates_mock.go"
63-
ModuleStreamsDao:
63+
ModuleStreamDao:
6464
config:
6565
filename: "modules_streams_mock.go"

db/migrations.latest

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
20241203143614
1+
20250107150808
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
BEGIN;
2+
3+
DROP TABLE IF EXISTS module_streams, repositories_module_streams;
4+
5+
COMMIT;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
BEGIN;
2+
3+
CREATE TABLE IF NOT EXISTS module_streams (
4+
uuid UUID UNIQUE NOT NULL PRIMARY KEY,
5+
created_at TIMESTAMP WITH TIME ZONE,
6+
updated_at TIMESTAMP WITH TIME ZONE,
7+
name text NOT NULL,
8+
stream text NOT NULL,
9+
version text NOT NULL,
10+
context text NOT NULL,
11+
arch text NOT NULL,
12+
summary text NOT NULL,
13+
description text NOT NULL,
14+
package_names text[] NOT NULL,
15+
packages text[] NOT NULL,
16+
hash_value text NOT NULL,
17+
profiles jsonb NOT NULL DEFAULT '{}'::jsonb
18+
);
19+
20+
CREATE TABLE IF NOT EXISTS repositories_module_streams (
21+
repository_uuid UUID NOT NULL,
22+
module_stream_uuid UUID NOT NULL
23+
);
24+
25+
CREATE INDEX IF NOT EXISTS module_streams_pkgs_idx ON module_streams USING GIN (package_names);
26+
CREATE INDEX IF NOT EXISTS module_streams_name_idx ON module_streams (uuid, name);
27+
28+
ALTER TABLE ONLY repositories_module_streams
29+
DROP CONSTRAINT IF EXISTS repositories_module_streams_pkey,
30+
ADD CONSTRAINT repositories_module_streams_pkey PRIMARY KEY (repository_uuid, module_stream_uuid);
31+
32+
ALTER TABLE ONLY repositories_module_streams
33+
DROP CONSTRAINT IF EXISTS fk_repositories_module_streams_mstream,
34+
ADD CONSTRAINT fk_repositories_module_streams_mstream
35+
FOREIGN KEY (module_stream_uuid) REFERENCES module_streams(uuid)
36+
ON DELETE CASCADE;
37+
38+
ALTER TABLE ONLY repositories_module_streams
39+
DROP CONSTRAINT IF EXISTS fk_repositories_module_streams_repository,
40+
ADD CONSTRAINT fk_repositories_module_streams_repository
41+
FOREIGN KEY (repository_uuid) REFERENCES repositories(uuid)
42+
ON DELETE CASCADE;
43+
44+
ALTER TABLE ONLY module_streams
45+
DROP CONSTRAINT IF EXISTS fk_module_streams_uniq,
46+
ADD CONSTRAINT fk_module_streams_uniq UNIQUE (hash_value);
47+
48+
COMMIT;

go.mod

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
module github.com/content-services/content-sources-backend
22

3-
go 1.22.7
3+
go 1.23
4+
5+
toolchain go1.23.4
6+
7+
replace github.com/content-services/yummy => /home/jlsherri/git/yummy/
8+
9+
replace github.com/content-services/tang => /home/jlsherri/git/tang/
410

511
require (
612
github.com/ProtonMail/go-crypto v1.1.3

pkg/api/module_streams.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ type SearchSnapshotModuleStreamsRequest struct {
77
Search string `json:"search"` // Search string to search module names
88
}
99

10+
// SearchModuleStreamsRequest is the request body used to search the module
// streams of one or more repositories, identified by UUID and/or URL.
type SearchModuleStreamsRequest struct {
	UUIDs    []string `json:"uuids" validate:"required"`     // List of repository UUIDs to search
	URLs     []string `json:"urls" validate:"required"`      // List of repository URLs to search
	RpmNames []string `json:"rpm_names" validate:"required"` // List of rpm names; only module streams containing at least one of them are returned
	SortBy   string   `json:"sort_by"`                       // SortBy sets the sort order of the result
	Search   string   `json:"search"`                        // Search string to search module names
}
17+
1018
type Stream struct {
1119
Name string `json:"name"` // Name of the module
1220
Stream string `json:"stream"` // Module stream version

pkg/dao/interfaces.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ type DaoRegistry struct {
2222
AdminTask AdminTaskDao
2323
Domain DomainDao
2424
PackageGroup PackageGroupDao
25+
ModuleStream ModuleStreamDao
2526
Environment EnvironmentDao
2627
Template TemplateDao
2728
Uploads UploadDao
28-
ModuleStreams ModuleStreamsDao
2929
}
3030

3131
func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
@@ -38,9 +38,9 @@ func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
3838
Rpm: &rpmDaoImpl{
3939
db: db,
4040
},
41-
ModuleStreams: &moduleStreamsImpl{db: db},
42-
Repository: repositoryDaoImpl{db: db},
43-
Metrics: metricsDaoImpl{db: db},
41+
ModuleStream: &moduleStreamsImpl{db: db},
42+
Repository: repositoryDaoImpl{db: db},
43+
Metrics: metricsDaoImpl{db: db},
4444
Snapshot: &snapshotDaoImpl{
4545
db: db,
4646
pulpClient: pulp_client.GetPulpClientWithDomain(""),
@@ -82,8 +82,11 @@ type RepositoryConfigDao interface {
8282
BulkImport(ctx context.Context, reposToImport []api.RepositoryRequest) ([]api.RepositoryImportResponse, []error)
8383
}
8484

85-
type ModuleStreamsDao interface {
85+
type ModuleStreamDao interface {
86+
SearchRepositoryModuleStreams(ctx context.Context, orgID string, request api.SearchModuleStreamsRequest) ([]api.SearchModuleStreams, error)
8687
SearchSnapshotModuleStreams(ctx context.Context, orgID string, request api.SearchSnapshotModuleStreamsRequest) ([]api.SearchModuleStreams, error)
88+
InsertForRepository(ctx context.Context, repoUuid string, pkgGroups []yum.ModuleMD) (int64, error)
89+
OrphanCleanup(ctx context.Context) error
8790
}
8891

8992
type RpmDao interface {

pkg/dao/module_streams.go

Lines changed: 231 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,115 @@ package dao
33
import (
44
"context"
55
"fmt"
6+
"strings"
67

78
"github.com/content-services/content-sources-backend/pkg/api"
89
"github.com/content-services/content-sources-backend/pkg/config"
910
ce "github.com/content-services/content-sources-backend/pkg/errors"
11+
"github.com/content-services/content-sources-backend/pkg/models"
1012
"github.com/content-services/tang/pkg/tangy"
13+
"github.com/content-services/yummy/pkg/yum"
14+
"github.com/lib/pq"
15+
"golang.org/x/exp/slices"
1116
"gorm.io/gorm"
17+
"gorm.io/gorm/clause"
1218
)
1319

20+
func GetModuleStreamsDao(db *gorm.DB) ModuleStreamDao {
21+
// Return DAO instance
22+
return &moduleStreamsImpl{db: db}
23+
}
24+
1425
type moduleStreamsImpl struct {
1526
db *gorm.DB
1627
}
1728

18-
func GetModuleStreamsDao(db *gorm.DB) ModuleStreamsDao {
19-
// Return DAO instance
20-
return &moduleStreamsImpl{
21-
db: db,
29+
func (r *moduleStreamsImpl) SearchRepositoryModuleStreams(ctx context.Context, orgID string, request api.SearchModuleStreamsRequest) (resp []api.SearchModuleStreams, err error) {
30+
if orgID == "" {
31+
return resp, fmt.Errorf("orgID can not be an empty string")
32+
}
33+
dbWithCtx := r.db.WithContext(ctx)
34+
if request.RpmNames == nil {
35+
request.RpmNames = []string{}
36+
}
37+
if len(request.UUIDs) == 0 && len(request.URLs) == 0 {
38+
return resp, &ce.DaoError{
39+
BadValidation: true,
40+
Message: "must contain at least 1 Repository UUID or URL",
41+
}
42+
}
43+
44+
uuids := []string{}
45+
if request.UUIDs != nil {
46+
uuids = request.UUIDs
47+
}
48+
49+
urls := []string{}
50+
for _, url := range request.URLs {
51+
url = models.CleanupURL(url)
52+
urls = append(urls, url)
53+
}
54+
55+
uuidsValid, urlsValid, uuid, url := checkForValidRepoUuidsUrls(ctx, uuids, urls, r.db)
56+
if !uuidsValid {
57+
return resp, &ce.DaoError{
58+
NotFound: true,
59+
Message: "Could not find repository with UUID: " + uuid,
60+
}
61+
}
62+
if !urlsValid {
63+
return resp, &ce.DaoError{
64+
NotFound: true,
65+
Message: "Could not find repository with URL: " + url,
66+
}
67+
}
68+
69+
streams := []models.ModuleStream{}
70+
71+
newestStreams := dbWithCtx.Model(&models.ModuleStream{}).
72+
Select("DISTINCT ON (name, stream) uuid").
73+
Joins("inner join repositories_module_streams on module_streams.uuid = repositories_module_streams.module_stream_uuid").
74+
Where("repositories_module_streams.repository_uuid in (?)", readableRepositoryQuery(dbWithCtx, orgID, urls, uuids))
75+
76+
if len(request.RpmNames) > 0 {
77+
// we are checking if two arrays have things in common, so we have to conver to pq array type
78+
newestStreams = newestStreams.Where("module_streams.package_names && ?", pq.Array(request.RpmNames))
79+
}
80+
if request.Search != "" {
81+
newestStreams = newestStreams.Where("module_streams.name ilike ?", fmt.Sprintf("%%%s%%", request.Search))
82+
}
83+
newestStreams = newestStreams.Order("name, stream, version DESC")
84+
85+
order := convertSortByToSQL(request.SortBy, map[string]string{"name": "name"}, "name asc")
86+
result := dbWithCtx.Model(&models.ModuleStream{}).Where("uuid in (?)", newestStreams).Order(fmt.Sprintf("%v, stream", order)).Find(&streams)
87+
88+
if result.Error != nil {
89+
return resp, result.Error
90+
}
91+
return ModuleStreamsToCollectionResponse(streams), nil
92+
}
93+
94+
func ModuleStreamsToCollectionResponse(modules []models.ModuleStream) (response []api.SearchModuleStreams) {
95+
mapping := make(map[string][]api.Stream)
96+
for _, mod := range modules {
97+
mapping[mod.Name] = append(mapping[mod.Name], api.Stream{
98+
Name: mod.Name,
99+
Stream: mod.Stream,
100+
Context: mod.Context,
101+
Arch: mod.Arch,
102+
Version: mod.Version,
103+
Description: mod.Description,
104+
Profiles: mod.Profiles,
105+
})
106+
}
107+
108+
for k, v := range mapping {
109+
response = append(response, api.SearchModuleStreams{
110+
ModuleName: k,
111+
Streams: v,
112+
})
22113
}
114+
return response
23115
}
24116

25117
func (r *moduleStreamsImpl) SearchSnapshotModuleStreams(ctx context.Context, orgID string, request api.SearchSnapshotModuleStreamsRequest) ([]api.SearchModuleStreams, error) {
@@ -95,3 +187,138 @@ func (r *moduleStreamsImpl) SearchSnapshotModuleStreams(ctx context.Context, org
95187

96188
return response, nil
97189
}
190+
191+
func (r moduleStreamsImpl) fetchRepo(ctx context.Context, uuid string) (models.Repository, error) {
192+
found := models.Repository{}
193+
if err := r.db.WithContext(ctx).
194+
Where("UUID = ?", uuid).
195+
First(&found).
196+
Error; err != nil {
197+
return found, err
198+
}
199+
return found, nil
200+
}
201+
202+
// extractRpmName reduces an rpm NVREA to just the package name by cutting off
// the last two dash-separated fields (version and release.arch), e.g.
//
//	rubygem-bson-debugsource-0:4.3.0-2.module+el8.1.0+3656+f80bfa1d.x86_64
//	-> rubygem-bson-debugsource
//
// Input containing fewer than two dashes is returned unchanged.
func extractRpmName(nvrea string) string {
	releaseDash := strings.LastIndex(nvrea, "-")
	if releaseDash < 0 {
		return nvrea
	}
	versionDash := strings.LastIndex(nvrea[:releaseDash], "-")
	if versionDash < 0 {
		return nvrea
	}
	return nvrea[:versionDash]
}
212+
213+
func ModuleMdToModuleStreams(moduleMds []yum.ModuleMD) (moduleStreams []models.ModuleStream) {
214+
for _, m := range moduleMds {
215+
mStream := models.ModuleStream{
216+
Name: m.Data.Name,
217+
Stream: m.Data.Stream,
218+
Version: m.Data.Version,
219+
Context: m.Data.Context,
220+
Arch: m.Data.Arch,
221+
Summary: m.Data.Summary,
222+
Description: m.Data.Description,
223+
Profiles: map[string][]string{},
224+
PackageNames: []string{},
225+
Packages: m.Data.Artifacts.Rpms,
226+
}
227+
for _, p := range m.Data.Artifacts.Rpms {
228+
mStream.PackageNames = append(mStream.PackageNames, extractRpmName(p))
229+
}
230+
slices.Sort(mStream.PackageNames) // Sort the package names so the hash is consistent
231+
mStream.HashValue = generateHash(mStream.ToHashString())
232+
for pName, p := range m.Data.Profiles {
233+
mStream.Profiles[pName] = p.Rpms
234+
}
235+
236+
moduleStreams = append(moduleStreams, mStream)
237+
}
238+
return moduleStreams
239+
}
240+
241+
// InsertForRepository inserts a set of yum module streams for a given repository
242+
// and removes any that are not in the list. This will involve inserting the package groups
243+
// if not present, and adding or removing any associations to the Repository
244+
// Returns a count of new package groups added to the system (not the repo), as well as any error
245+
func (r moduleStreamsImpl) InsertForRepository(ctx context.Context, repoUuid string, modules []yum.ModuleMD) (int64, error) {
246+
var (
247+
err error
248+
repo models.Repository
249+
)
250+
ctxDb := r.db.WithContext(ctx)
251+
252+
// Retrieve Repository record
253+
if repo, err = r.fetchRepo(ctx, repoUuid); err != nil {
254+
return 0, fmt.Errorf("failed to fetchRepo: %w", err)
255+
}
256+
257+
moduleStreams := ModuleMdToModuleStreams(modules)
258+
259+
err = ctxDb.Model(&models.ModuleStream{}).Clauses(clause.OnConflict{
260+
Columns: []clause.Column{{Name: "hash_value"}},
261+
DoNothing: true}).
262+
Create(moduleStreams).Error
263+
if err != nil {
264+
return 0, fmt.Errorf("failed to insert module streams: %w", err)
265+
}
266+
267+
hashes := make([]string, len(moduleStreams))
268+
for _, m := range moduleStreams {
269+
hashes = append(hashes, m.HashValue)
270+
}
271+
uuids := make([]string, len(moduleStreams))
272+
273+
// insert any modules streams, ignoring any hash conflicts
274+
if err = r.db.WithContext(ctx).
275+
Where("hash_value in (?)", hashes).
276+
Model(&models.ModuleStream{}).
277+
Pluck("uuid", &uuids).Error; err != nil {
278+
return 0, fmt.Errorf("failed retrieving existing ids in module_streams: %w", err)
279+
}
280+
281+
// Delete repository module stream entries not needed
282+
err = r.deleteUnneeded(ctx, repo, uuids)
283+
if err != nil {
284+
return 0, fmt.Errorf("failed to delete unneeded module streams: %w", err)
285+
}
286+
287+
// Add any needed repo module stream entries
288+
repoModStreams := make([]models.RepositoryModuleStream, len(moduleStreams))
289+
for i, uuid := range uuids {
290+
repoModStreams[i] = models.RepositoryModuleStream{
291+
RepositoryUUID: repo.UUID,
292+
ModuleStreamUUID: uuid,
293+
}
294+
}
295+
err = ctxDb.Clauses(clause.OnConflict{
296+
Columns: []clause.Column{{Name: "repository_uuid"}, {Name: "module_stream_uuid"}},
297+
DoNothing: true}).
298+
Create(repoModStreams).Error
299+
if err != nil {
300+
return 0, fmt.Errorf("failed to insert repo module streams: %w", err)
301+
}
302+
return int64(len(repoModStreams)), nil
303+
}
304+
305+
// deleteUnneeded removes any RepositoryPackageGroup entries that are not in the list of package_group_uuids
306+
func (r moduleStreamsImpl) deleteUnneeded(ctx context.Context, repo models.Repository, moduleStreamUUIDs []string) error {
307+
if err := r.db.WithContext(ctx).Model(&models.RepositoryModuleStream{}).
308+
Where("repository_uuid = ?", repo.UUID).
309+
Where("module_stream_uuid NOT IN (?)", moduleStreamUUIDs).
310+
Error; err != nil {
311+
return err
312+
}
313+
return nil
314+
}
315+
316+
func (r moduleStreamsImpl) OrphanCleanup(ctx context.Context) error {
317+
if err := r.db.WithContext(ctx).
318+
Model(&models.ModuleStream{}).
319+
Where("NOT EXISTS (select from repositories_module_streams where module_streams.uuid = repositories_module_streams.module_stream_uuid )").
320+
Delete(&models.ModuleStream{}).Error; err != nil {
321+
return err
322+
}
323+
return nil
324+
}

0 commit comments

Comments
 (0)