Skip to content

Commit

Permalink
Merge branch 'feat/pulse'
Browse files Browse the repository at this point in the history
  • Loading branch information
bcho committed Jan 9, 2017
2 parents b7e4b9d + 719a832 commit bd33798
Show file tree
Hide file tree
Showing 190 changed files with 34,432 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pulse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# pulse

Repository pulse... with more power.

## Usage

```
$ ./pulse
```
17 changes: 17 additions & 0 deletions pulse/app/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package app

import (
"context"
"log"
"net/http"

"github.com/bearyinnovative/radagast/config"
)

func Serve(ctx context.Context) {
config := config.FromContext(ctx).Get("pulse.app").Config()

bind := config.Get("bind").String()
log.Printf("pulse.app listening on %s", bind)
log.Fatal(http.ListenAndServe(bind, nil))
}
42 changes: 42 additions & 0 deletions pulse/cmd/op/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"context"
"flag"
"log"

"github.com/bearyinnovative/radagast/config"
"github.com/bearyinnovative/radagast/pulse/db"
)

var (
reindex bool
)

func main() {
flag.Parse()

ctx := context.Background()
ctx = config.MustMakeContext(ctx, "./radagast.toml")
ctx = db.MustMakeContext(ctx)

dbClient := db.ClientFromContext(ctx)

if reindex {
if _, err := dbClient.DeleteIndex(db.PULSE_INDEX).Do(ctx); err != nil {
log.Fatalf("delete index failed: %+v", err)
} else {
log.Printf("index deleted")
}
}

if err := db.CreateMapping(ctx, dbClient); err == nil {
log.Printf("created mapping")
} else {
log.Fatalf("create index failed: %+v", err)
}
}

func init() {
flag.BoolVar(&reindex, "reindex", false, "delete current index?")
}
40 changes: 40 additions & 0 deletions pulse/cmd/pulse/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"context"
"log"

"github.com/bearyinnovative/radagast/config"
"github.com/bearyinnovative/radagast/github"
"github.com/bearyinnovative/radagast/pulse/app"
"github.com/bearyinnovative/radagast/pulse/db"
"github.com/bearyinnovative/radagast/pulse/metric"
"github.com/bearyinnovative/radagast/pulse/worker"
)

func main() {
ctx := context.Background()
ctx = config.MustMakeContext(ctx, "./radagast.toml")
ctx = github.MustMakeContext(ctx)
ctx = db.MustMakeContext(ctx)

config := config.FromContext(ctx).Get("pulse").Config()
for _, r := range config.GetSlice("repos") {
repoConfig := r.Config()
repo := metric.NewRepoFromString(
repoConfig.Get("owner").String(),
repoConfig.Get("name").String(),
)
go indexRepo(ctx, *repo)

}

app.Serve(ctx)
}

func indexRepo(ctx context.Context, repo metric.Repo) {
githubClient := github.ClientFromContext(ctx)
esClient := db.ClientFromContext(ctx)

log.Fatal(worker.RunPullRequestIndexer(repo, githubClient, esClient))
}
80 changes: 80 additions & 0 deletions pulse/cmd/sync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"log"
"sync"
"time"

"github.com/bearyinnovative/radagast/config"
"github.com/bearyinnovative/radagast/github"
"github.com/bearyinnovative/radagast/pulse/db"
"github.com/bearyinnovative/radagast/pulse/metric"
gogithub "github.com/google/go-github/github"
)

func main() {
ctx := context.Background()
ctx = config.MustMakeContext(ctx, "./radagast.toml")
ctx = github.MustMakeContext(ctx)
ctx = db.MustMakeContext(ctx)

config := config.FromContext(ctx).Get("pulse").Config()

var wg sync.WaitGroup

for _, r := range config.GetSlice("repos") {
repoConfig := r.Config()
repo := metric.NewRepoFromString(
repoConfig.Get("owner").String(),
repoConfig.Get("name").String(),
)
wg.Add(1)
go indexRepo(ctx, *repo, &wg)

}

wg.Wait()
log.Printf("all repo synced")
}

func indexRepo(ctx context.Context, repo metric.Repo, wg *sync.WaitGroup) {
defer wg.Done()

githubClient := github.ClientFromContext(ctx)
esClient := db.ClientFromContext(ctx)

listOpts := &gogithub.PullRequestListOptions{
State: "all",
Sort: "updated",
Direction: "desc",
ListOptions: gogithub.ListOptions{
PerPage: 25,
},
}

for {
prs, resp, err := githubClient.PullRequests.List(*repo.Owner, *repo.Name, listOpts)
if err != nil {
log.Printf("%s sync failed: %+v", repo, err)
return
}

for _, pr := range prs {
pullRequest := metric.NewPullRequest(pr)
err := metric.IndexPullRequest(ctx, esClient, pullRequest)
if err != nil {
log.Printf("%s sync failed: %+v", repo, err)
}
log.Printf("pull request %s indexed", pullRequest)
}

if resp.NextPage == 0 {
log.Printf("%s sync finished", repo)
return
}

listOpts.ListOptions.Page = resp.NextPage
time.Sleep(3 * time.Second)
}
}
42 changes: 42 additions & 0 deletions pulse/db/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package db

import (
"context"

"github.com/bearyinnovative/radagast/config"

"gopkg.in/olivere/elastic.v5"
)

const CLIENT_KEY = "db_client"

func ClientFromContext(c context.Context) *elastic.Client {
client, ok := c.Value(CLIENT_KEY).(*elastic.Client)

if !ok {
panic("db.client required")
}

return client
}

func ClientToContext(c context.Context, client *elastic.Client) context.Context {
return context.WithValue(c, CLIENT_KEY, client)
}

func MustMakeContext(c context.Context) context.Context {
config := config.FromContext(c).Get("pulse.db").Config()
dbUrl := config.Get("url").String()
if dbUrl == "" {
panic("`pulse.db` required")
}

client, err := elastic.NewClient(
elastic.SetURL(config.Get("url").String()),
)
if err != nil {
panic(err)
}

return ClientToContext(c, client)
}
75 changes: 75 additions & 0 deletions pulse/db/mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package db

import (
"context"
"errors"

"gopkg.in/olivere/elastic.v5"
)

const mapping = `{
"mappings": {
"pullrequests": {
"properties": {
"id": {"type": "long"},
"number": {"type": "long"},
"url": {"type": "string", "index": "not_analyzed"},
"state": {"type": "string", "index": "not_analyzed"},
"title": {"type": "string"},
"body": {"type": "string"},
"created_at": {"type": "date"},
"updated_at": {"type": "date"},
"closed_at": {"type": "date"},
"merged_at": {"type": "date"},
"additions": {"type": "long"},
"deletions": {"type": "long"},
"changed_files": {"type": "long"},
"repo": {
"type": "nested",
"properties": {
"owner": {"type": "string", "index": "not_analyzed"},
"name": {"type": "string", "index": "not_analyzed"}
}
},
"user": {
"type": "nested",
"properties": {
"login": {"type": "string", "index": "not_analyzed"},
"name": {"type": "string", "index": "not_analyzed"}
}
},
"merged_by": {
"type": "nested",
"properties": {
"login": {"type": "string", "index": "not_analyzed"},
"name": {"type": "string", "index": "not_analyzed"}
}
},
"assignees": {
"type": "nested",
"properties": {
"login": {"type": "string", "index": "not_analyzed"},
"name": {"type": "string", "index": "not_analyzed"}
}
}
}
}
}
}`

const PULSE_INDEX = "beary_pulse"

var errCreateFailed = errors.New("create mapping failed")

func CreateMapping(c context.Context, client *elastic.Client) error {
rv, err := client.CreateIndex(PULSE_INDEX).BodyString(mapping).Do(c)
if err != nil {
return err
}

if !rv.Acknowledged {
return errCreateFailed
}

return nil
}
1 change: 1 addition & 0 deletions pulse/github/pullrequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package github
40 changes: 40 additions & 0 deletions pulse/metric/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package metric

import (
"context"
"errors"

"github.com/bearyinnovative/radagast/pulse/db"

"gopkg.in/olivere/elastic.v5"
)

var (
errNotIndexed = errors.New("not indexed")
)

func IndexPullRequest(ctx context.Context, esClient *elastic.Client, pr *PullRequest) error {
if pr == nil {
return errNotIndexed
}

_, err := esClient.DeleteByQuery().
Index(db.PULSE_INDEX).
Type(TypePullRequest).
Query(elastic.NewTermQuery("id", pr.ID)).
Do(ctx)
if err != nil {
return err
}

_, err = esClient.Index().
Index(db.PULSE_INDEX).
Type(TypePullRequest).
BodyJson(pr).
Do(ctx)
if err != nil {
return err
}

return nil
}
Loading

0 comments on commit bd33798

Please sign in to comment.