Skip to content

Commit

Permalink
Merge pull request #57 from Avokadoen/revisit-proxy-fix
Browse files Browse the repository at this point in the history
Implement merging of revisit records through proxy
  • Loading branch information
maeb authored Mar 25, 2022
2 parents b35190f + cbf1668 commit bbdebd0
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 89 deletions.
17 changes: 10 additions & 7 deletions cmd/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ package proxy

import (
"fmt"
"net/http"
"os"
"time"

gHandlers "github.com/gorilla/handlers"
"github.com/julienschmidt/httprouter"
"github.com/nlnwa/gowarcserver/internal/server"
"github.com/nlnwa/gowarcserver/internal/server/handlers"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"net/http"
"os"
"time"
)

func NewCommand() *cobra.Command {
Expand Down Expand Up @@ -54,8 +55,8 @@ func NewCommand() *cobra.Command {
}

func proxyCmd(_ *cobra.Command, _ []string) error {
childUrls := ParseUrls(viper.GetStringSlice("childUrls"))
childQueryTimeout := viper.GetDuration("childQueryTimeout")
childUrls := ParseUrls(viper.GetStringSlice("child-urls"))
childQueryTimeout := viper.GetDuration("child-query-timeout")
port := viper.GetInt("port")
r := httprouter.New()

Expand All @@ -65,9 +66,11 @@ func proxyCmd(_ *cobra.Command, _ []string) error {

indexHandler := handlers.AggregatedHandler(childUrls, childQueryTimeout)
resourceHandler := handlers.FirstHandler(childUrls, childQueryTimeout)
contentHandler := handlers.FirstHandler(childUrls, childQueryTimeout)

r.Handler("GET", "/cdx", middleware(indexHandler))
r.Handler("GET", "/web", middleware(resourceHandler))
r.Handler("GET", "/warcserver/cdx", middleware(indexHandler))
r.Handler("GET", "/warcserver/web", middleware(resourceHandler))
r.Handler("GET", "/id/:id", middleware(contentHandler))

return server.Serve(port, r)
}
23 changes: 18 additions & 5 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ package serve
import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"regexp"
"runtime"
"time"

"github.com/dgraph-io/badger/v3/options"
"github.com/gorilla/handlers"
"github.com/julienschmidt/httprouter"
Expand All @@ -32,11 +39,6 @@ import (
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"net/http"
"os"
"regexp"
"runtime"
"time"
)

func NewCommand() *cobra.Command {
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewCommand() *cobra.Command {
logRequests := false

cmd.Flags().IntP("port", "p", port, "server port")
cmd.Flags().String("proxy-url", "", "url to a gowarc server proxy that will be used to resolve records")
cmd.Flags().StringSlice("include", nil, "only include files matching these regular expressions")
cmd.Flags().StringSlice("exclude", nil, "exclude files matching these regular expressions")
cmd.Flags().BoolP("index", "a", enableIndexing, "enable indexing")
Expand Down Expand Up @@ -128,6 +131,15 @@ func serveCmd(cmd *cobra.Command, args []string) error {
excludes = append(excludes, re)
}
}
// parse proxy url
var proxyUrl *url.URL
proxyStr := viper.GetString("proxy-url")
if proxyStr != "" {
proxyUrl, err = url.Parse(proxyStr)
if err != nil {
return err
}
}
// optionally start autoindexer
if viper.GetBool("index") {
log.Info().Msg("Starting auto indexer")
Expand Down Expand Up @@ -157,6 +169,7 @@ func serveCmd(cmd *cobra.Command, args []string) error {
return fileInfo.Path, err
}},
NoUnpack: false,
ProxyUrl: proxyUrl,
}

// middleware chain
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ require (
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/fsnotify/fsnotify v1.4.9
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/julienschmidt/httprouter v1.3.0
github.com/mitchellh/mapstructure v1.4.1
github.com/nlnwa/gowarc v1.0.0-beta.2
github.com/nlnwa/gowarc v1.0.0-beta.3
github.com/nlnwa/whatwg-url v0.1.0
github.com/rs/zerolog v1.26.1
github.com/spf13/cobra v1.2.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -260,8 +258,8 @@ github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/nlnwa/gowarc v1.0.0-beta.2 h1:9FyTTDTLP/Bxv7qL6J+zqDx6NLn1X8iuJiTl/d2VNVc=
github.com/nlnwa/gowarc v1.0.0-beta.2/go.mod h1:37HyH1JP9GpGS5WZsBDTbZ8rwvCCDdsSDOr+3gYGEE4=
github.com/nlnwa/gowarc v1.0.0-beta.3 h1:kCuAUTg/+T66Nx4V6k++NzkCWvteCmfX1Su5bQh46cg=
github.com/nlnwa/gowarc v1.0.0-beta.3/go.mod h1:37HyH1JP9GpGS5WZsBDTbZ8rwvCCDdsSDOr+3gYGEE4=
github.com/nlnwa/whatwg-url v0.0.0-20200306110950-d1a95e2e8fc3/go.mod h1:v3hJLcAdjhIn7PA89dVhJ9GSWooX0z2/qPgwlhz0HD8=
github.com/nlnwa/whatwg-url v0.1.0 h1:nJcUTPO+K/jjP7ZsrALylQ8a7XtDDvh0aqGDMdKO4co=
github.com/nlnwa/whatwg-url v0.1.0/go.mod h1:L97nLsTBZQV+fZTyMl1z6RdDhqgGzZTMmrpTkZDEdts=
Expand Down
5 changes: 3 additions & 2 deletions internal/loader/filestorageloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package loader
import (
"context"
"fmt"
"github.com/nlnwa/gowarc"
"github.com/rs/zerolog/log"
"strconv"
"strings"

"github.com/nlnwa/gowarc"
"github.com/rs/zerolog/log"
)

type FileStorageLoader struct {
Expand Down
54 changes: 48 additions & 6 deletions internal/loader/loader.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 National Library of Norway.
* Copyright 2022 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,15 @@
package loader

import (
"bufio"
"context"
"errors"
"fmt"
"net/http"
"net/url"
"path"

"github.com/dgraph-io/badger/v3"
"github.com/nlnwa/gowarc"
"github.com/rs/zerolog/log"
)
Expand All @@ -36,6 +42,7 @@ type Loader struct {
Resolver StorageRefResolver
Loader RecordLoader
NoUnpack bool
ProxyUrl *url.URL
}

type ErrResolveRevisit struct {
Expand Down Expand Up @@ -79,13 +86,48 @@ func (l *Loader) Load(ctx context.Context, warcId string) (gowarc.WarcRecord, er
Date: record.WarcHeader().Get(gowarc.WarcRefersToDate),
}
}

var revisitOf gowarc.WarcRecord
storageRef, err = l.Resolver.Resolve(warcRefersTo)
if err != nil {
// if the record is missing from out DB and a proxy is configured, then we should
// ask the proxy to get the revisitOf record for us
if errors.Is(err, badger.ErrKeyNotFound) && l.ProxyUrl != nil {
reqUrl := *l.ProxyUrl
reqUrl.Path = path.Join(reqUrl.Path, "id", warcRefersTo)

log.Debug().Msgf("attempt to get record from proxy url %s", reqUrl.String())
req, err := http.NewRequestWithContext(ctx, "GET", reqUrl.String(), nil)
if err != nil {
return nil, err
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("resolve revisit from proxy expected %d got %d", http.StatusOK, resp.StatusCode)
}

warcUnmarshaler := gowarc.NewUnmarshaler(
gowarc.WithSyntaxErrorPolicy(gowarc.ErrIgnore),
gowarc.WithSpecViolationPolicy(gowarc.ErrIgnore),
)
bodyIoReader := bufio.NewReader(resp.Body)
revisitOf, _, _, err = warcUnmarshaler.Unmarshal(bodyIoReader)
if err != nil {
return nil, err
}
} else if err != nil {
return nil, fmt.Errorf("unable to resolve referred Warc-Record-ID [%s]: %w", warcRefersTo, err)
}
revisitOf, err := l.Loader.Load(ctx, storageRef)
if err != nil {
return nil, err
} else {
// in the event that it managed to load record locally we do that instead
revisitOf, err = l.Loader.Load(ctx, storageRef)
if err != nil {
return nil, err
}
}
rtrRecord, err = record.Merge(revisitOf)
if err != nil {
Expand Down
73 changes: 23 additions & 50 deletions internal/server/coreserver/contenthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,50 @@
package coreserver

import (
"context"
"fmt"
"github.com/gorilla/mux"
"net/http"

"github.com/julienschmidt/httprouter"
"github.com/nlnwa/gowarc"
"github.com/nlnwa/gowarcserver/internal/loader"
"io"
"net/http"
)

type contentHandler struct {
loader loader.RecordLoader
}

func (h contentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
warcId, ok := mux.Vars(r)["id"]
if !ok {
http.NotFound(w, r)
params := httprouter.ParamsFromContext(r.Context())
warcId := params.ByName("id")
if warcId == "" {
http.Error(w, `missing required parameter "id"`, http.StatusBadRequest)
return
}
if len(warcId) > 0 && warcId[0] != '<' {
warcId = "<" + warcId + ">"
if warcId[0] != '<' {
warcId = "<" + warcId
}
if warcId[len(warcId)-1] != '>' {
warcId = warcId + ">"
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

record, err := h.loader.Load(ctx, warcId)
record, err := h.loader.Load(r.Context(), warcId)
if err != nil {
msg := fmt.Sprintf("Failed to load record: %v", err)
msg := fmt.Sprintf("failed to load record: %v", err)
http.Error(w, msg, http.StatusInternalServerError)
return
}
defer func() {
_ = record.Close()
}()
defer record.Close()

switch v := record.Block().(type) {
case gowarc.PayloadBlock:
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
_, err = record.WarcHeader().Write(w)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
byteReader, err := v.PayloadBytes()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = io.Copy(w, byteReader)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
default:
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
_, err = record.WarcHeader().Write(w)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Add("content-type", "application/octet-stream")
marshaler := gowarc.NewMarshaler()

rb, err := v.RawBytes()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = io.Copy(w, rb)
// write records until all relevant records are written
continuation := record
for continuation != nil {
continuation, _, err = marshaler.Marshal(w, continuation, 0)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
msg := fmt.Sprintf("failed to marshal record: %v", err)
http.Error(w, msg, http.StatusInternalServerError)
}
}
}
5 changes: 3 additions & 2 deletions internal/server/coreserver/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
package coreserver

import (
"net/http"

"github.com/julienschmidt/httprouter"
"github.com/nlnwa/gowarcserver/internal/database"
"github.com/nlnwa/gowarcserver/internal/loader"
"net/http"
)

func Register(r *httprouter.Router, middleware func(http.Handler) http.Handler, pathPrefix string, loader *loader.Loader, db *database.CdxDbIndex) {
indexHandler := IndexHandler{db}
r.Handler("GET", pathPrefix+"/ids", http.HandlerFunc(indexHandler.ListIds))
r.Handler("GET", pathPrefix+"/files", http.HandlerFunc(indexHandler.ListFileNames))
r.Handler("GET", pathPrefix+"/search", http.HandlerFunc(indexHandler.Search))
r.Handler("GET", pathPrefix+"/id/{id}", contentHandler{loader})
r.Handler("GET", pathPrefix+"/id/:id", contentHandler{loader})
}
15 changes: 11 additions & 4 deletions internal/server/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package handlers

import (
"context"
"github.com/rs/zerolog/log"
"io"
"net/http"
"net/url"
"sync"
"time"

"github.com/rs/zerolog/log"
)

var client = &http.Client{
Expand Down Expand Up @@ -49,9 +50,14 @@ func FirstHandler(children []*url.URL, timeout time.Duration) http.Handler {
resp := response
if resp.StatusCode < 400 {
if !written {
for k, vv := range resp.Header {
for _, v := range vv {
w.Header().Add(k, v)
// Write headers
for key, values := range resp.Header {
for i, value := range values {
if i == 0 {
w.Header().Set(key, value)
} else {
w.Header().Add(key, value)
}
}
}
w.WriteHeader(resp.StatusCode)
Expand Down Expand Up @@ -93,6 +99,7 @@ func ChildHandler(children []*url.URL, timeout time.Duration, responseHandler Re
req := r.Clone(ctx)
req.RequestURI = ""
req.URL = buildChildURLString(childUrl, req.URL)
log.Debug().Msgf("request to child url %s", req.URL.String())
go func() {
defer wg.Done()

Expand Down
Loading

0 comments on commit bbdebd0

Please sign in to comment.