-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathmain.go
111 lines (96 loc) · 2.92 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"regexp"
"github.com/ossf/package-feeds/pkg/feeds"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/kafkapubsub"
"github.com/ossf/package-analysis/cmd/scheduler/proxy"
"github.com/ossf/package-analysis/internal/log"
"github.com/ossf/package-analysis/pkg/api/pkgecosystem"
)
// ManagerConfig holds the settings applied to a single package manager's
// feed: the ecosystem it belongs to and any version filters.
type ManagerConfig struct {
// ExcludeVersions is a list of regular expressions. If a package's version
// string matches any expression in this list, that package version is
// ignored. A nil or empty list excludes nothing.
ExcludeVersions []*regexp.Regexp
// Ecosystem is the internal name of the ecosystem.
Ecosystem pkgecosystem.Ecosystem
}
// SkipVersion reports whether the given package version should be ignored.
//
// It returns true when version matches any expression in m.ExcludeVersions,
// and false when none match or when there are no expressions at all. A nil
// receiver returns true, so every version is skipped when no config exists.
func (m *ManagerConfig) SkipVersion(version string) bool {
	if m == nil {
		return true
	}
	// Ranging over a nil or empty slice runs zero iterations, so no separate
	// emptiness guard is needed before the loop.
	for _, re := range m.ExcludeVersions {
		if re.MatchString(version) {
			return true
		}
	}
	return false
}
// supportedPkgManagers enumerates the package managers that Package Analysis
// is able to analyze. Each key is a package type as named by
// ossf/package-feeds; each value is the config for that package manager's
// feed.
var supportedPkgManagers = map[string]*ManagerConfig{
	"crates": {
		Ecosystem: pkgecosystem.CratesIO,
	},
	"npm": {
		Ecosystem: pkgecosystem.NPM,
	},
	"packagist": {
		Ecosystem: pkgecosystem.Packagist,
		// Versions starting with "dev-" or ending in ".x-dev" are ignored.
		ExcludeVersions: []*regexp.Regexp{
			regexp.MustCompile(`^dev-`),
			regexp.MustCompile(`\.x-dev$`),
		},
	},
	"pypi": {
		Ecosystem: pkgecosystem.PyPI,
	},
	"rubygems": {
		Ecosystem: pkgecosystem.RubyGems,
	},
}
// main is the program entry point. It reads the subscription and topic URLs
// from the OSSMALWARE_SUBSCRIPTION_URL and OSSMALWARE_WORKER_TOPIC environment
// variables, initializes logging from LOGGER_ENV, and runs listenLoop.
//
// If listenLoop returns an error, the error is logged and the process exits
// with status 1.
func main() {
	subscriptionURL := os.Getenv("OSSMALWARE_SUBSCRIPTION_URL")
	topicURL := os.Getenv("OSSMALWARE_WORKER_TOPIC")
	log.Initialize(os.Getenv("LOGGER_ENV"))

	if err := listenLoop(subscriptionURL, topicURL); err != nil {
		slog.Error("Error encountered", "error", err)
		// Exit non-zero so that supervisors and scripts can distinguish a
		// failure from a clean exit. main has no deferred calls to skip.
		os.Exit(1)
	}
}
// listenLoop proxies messages from the subscription at subURL to the topic at
// topicURL, blocking until the proxy's Listen call returns.
//
// Each incoming message body is decoded as a JSON feeds.Package. The handler
// returns an error instead of an outgoing message when the body fails to
// decode, when the package type is not in supportedPkgManagers, or when the
// version is filtered by that type's ManagerConfig. Otherwise it returns a
// message with an empty body whose metadata carries the package name,
// ecosystem and version.
//
// listenLoop returns an error if the subscription or topic cannot be opened;
// otherwise it returns whatever Listen returns. Anything that was opened is
// shut down before listenLoop returns.
func listenLoop(subURL, topicURL string) error {
	ctx := context.Background()

	sub, err := pubsub.OpenSubscription(ctx, subURL)
	if err != nil {
		return err
	}
	// Release the subscription on every return path, including a failure to
	// open the topic below.
	defer func() {
		if err := sub.Shutdown(ctx); err != nil {
			slog.WarnContext(ctx, "Error shutting down subscription", "error", err)
		}
	}()

	topic, err := pubsub.OpenTopic(ctx, topicURL)
	if err != nil {
		return err
	}
	// Deferred calls run last-in-first-out, so the topic is shut down before
	// the subscription.
	defer func() {
		if err := topic.Shutdown(ctx); err != nil {
			slog.WarnContext(ctx, "Error shutting down topic", "error", err)
		}
	}()

	srv := proxy.New(topic, sub)
	slog.InfoContext(ctx, "Listening for messages to proxy...")

	return srv.Listen(ctx, func(m *pubsub.Message) (*pubsub.Message, error) {
		slog.InfoContext(ctx, "Handling message", "body", string(m.Body))

		pkg := feeds.Package{}
		if err := json.Unmarshal(m.Body, &pkg); err != nil {
			return nil, fmt.Errorf("error unmarshalling json: %w", err)
		}

		config, ok := supportedPkgManagers[pkg.Type]
		if !ok {
			return nil, fmt.Errorf("package type is not supported: %v", pkg.Type)
		}
		if config.SkipVersion(pkg.Version) {
			return nil, fmt.Errorf("package version '%v' is filtered for type: %v", pkg.Version, pkg.Type)
		}

		// The outgoing message has an empty body; the package identity is
		// carried in the metadata.
		return &pubsub.Message{
			Body: []byte{},
			Metadata: map[string]string{
				"name":      pkg.Name,
				"ecosystem": config.Ecosystem.String(),
				"version":   pkg.Version,
			},
		}, nil
	})
}