Skip to content

Commit

Permalink
Add a reader for harvesting directly from purl-fetcher HTTP API
Browse files Browse the repository at this point in the history
This adds a traject reader that can be useful in development when
you want to quickly index many records from purl-fetcher without
resorting to Kafka. It is intended for dev use only.

It can point at any release target (searchworks, earthworks) and
index all of the items currently released to that target.

This PR also modifies PublicCocinaRecord and PublicXmlRecord to
optionally accept a connection object, so that a single Faraday
connection can be shared amongst the reader and records, which
enables parallelizing record-fetching from purl to match the
number of traject threads.

This setup allows indexing everything released to Earthworks in
a little under 5 minutes with 4 threads on my machine.
  • Loading branch information
thatbudakguy committed Aug 30, 2024
1 parent d192ad7 commit c6a538f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 14 deletions.
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ gem 'stanford-geo', '0.2.0'
gem 'mutex_m'

gem 'match_map', '~> 3.0'
gem 'faraday', '~> 2.9'
gem 'faraday-net_http_persistent', '~> 2.1'
gem 'progress_bar'
21 changes: 21 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ GEM
erubi (1.13.0)
factory_bot (6.4.6)
activesupport (>= 5.0.0)
faraday (2.9.1)
faraday-net_http (>= 2.0, < 3.2)
faraday-net_http (3.1.0)
net-http
faraday-net_http_persistent (2.1.0)
faraday (~> 2.5)
net-http-persistent (~> 4.0)
ffi (1.17.0-arm64-darwin)
ffi (1.17.0-x86_64-darwin)
ffi (1.17.0-x86_64-linux-gnu)
Expand All @@ -154,6 +161,8 @@ GEM
rake
hashdiff (1.1.1)
hashie (5.0.0)
highline (3.1.0)
reline
honeybadger (5.15.6)
http (5.2.0)
addressable (~> 2.8)
Expand Down Expand Up @@ -205,6 +214,10 @@ GEM
view_component
multi_json (1.15.0)
mutex_m (0.2.0)
net-http (0.4.1)
uri
net-http-persistent (4.0.2)
connection_pool (~> 2.2)
net-scp (4.0.0)
net-ssh (>= 2.6.5, < 8.0.0)
net-sftp (4.0.0)
Expand All @@ -223,13 +236,17 @@ GEM
commonmarker (>= 1.0)
openapi_parser (1.0.0)
optimist (3.1.0)
options (2.3.2)
parallel (1.26.3)
parser (3.3.4.2)
ast (~> 2.4.1)
racc
patience_diff (1.2.0)
optimist (~> 3.0)
pg (1.5.7)
progress_bar (1.3.4)
highline (>= 1.6)
options (~> 2.3.0)
psych (5.1.2)
stringio
public_suffix (6.0.1)
Expand Down Expand Up @@ -346,6 +363,7 @@ GEM
concurrent-ruby (~> 1.0)
unf (0.2.0)
unicode-display_width (2.5.0)
uri (0.13.0)
useragent (0.16.10)
view_component (3.13.0)
activesupport (>= 5.2.0, < 8.0)
Expand Down Expand Up @@ -382,6 +400,8 @@ DEPENDENCIES
dor-event-client
dor-rights-auth
factory_bot (~> 6.2)
faraday (~> 2.9)
faraday-net_http_persistent (~> 2.1)
honeybadger
http
i18n
Expand All @@ -391,6 +411,7 @@ DEPENDENCIES
mutex_m
parallel
pg
progress_bar
rake
retriable
rspec
Expand Down
10 changes: 4 additions & 6 deletions lib/public_cocina_record.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
# frozen_string_literal: true

require 'http'

class PublicCocinaRecord
attr_reader :public_cocina_doc, :druid, :purl_url

def self.fetch(druid, purl_url: 'https://purl.stanford.edu')
response = HTTP.get("#{purl_url}/#{druid}.json")
new(druid, response.body, purl_url:) if response.status.ok?
def self.fetch(druid, purl_url: 'https://purl.stanford.edu', client: Faraday.new)
response = client.get("#{purl_url}/#{druid}.json")
new(druid, response.body, purl_url:) if response.success?
end

def initialize(druid, public_cocina, purl_url: 'https://purl.stanford.edu')
@druid = druid
@purl_url = purl_url
@public_cocina_doc = JSON.parse(public_cocina)
@public_cocina_doc = public_cocina
end

def cocina_access
Expand Down
7 changes: 3 additions & 4 deletions lib/public_xml_record.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
# frozen_string_literal: true

require 'http'
require 'mods_display'
require 'dor/rights_auth'

class PublicXmlRecord
attr_reader :public_xml_doc, :druid, :purl_url

def self.fetch(druid, purl_url: 'https://purl.stanford.edu')
response = HTTP.get("#{purl_url}/#{druid}.xml")
new(druid, response.body, purl_url:) if response.status.ok?
def self.fetch(druid, purl_url: 'https://purl.stanford.edu', client: Faraday.new)
response = client.get("#{purl_url}/#{druid}.xml")
new(druid, response.body, purl_url:) if response.success?
end

def initialize(druid, public_xml, purl_url: 'https://purl.stanford.edu')
Expand Down
9 changes: 5 additions & 4 deletions lib/purl_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
require 'active_support/core_ext/module/delegation'

class PurlRecord
attr_reader :druid, :purl_url
attr_reader :druid, :purl_url, :client

def initialize(druid, purl_url: 'https://purl.stanford.edu')
def initialize(druid, purl_url: 'https://purl.stanford.edu', client: nil)
@druid = druid
@purl_url = purl_url
@client = client
end

def searchworks_id
Expand All @@ -21,11 +22,11 @@ def druid_tree
end

def public_xml
@public_xml ||= PublicXmlRecord.fetch(druid, purl_url:)
@public_xml ||= PublicXmlRecord.fetch(druid, purl_url:, client:)
end

def public_cocina
@public_cocina ||= PublicCocinaRecord.fetch(druid, purl_url:)
@public_cocina ||= PublicCocinaRecord.fetch(druid, purl_url:, client:)
end

def public_meta_json
Expand Down
43 changes: 43 additions & 0 deletions lib/traject/readers/purl_fetcher_reader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

require 'faraday'
require 'progress_bar'

module Traject
# A reader that fetches all items released to a target from purl-fetcher
class PurlFetcherReader
attr_reader :input_stream, :settings

def initialize(input_stream, settings)
@settings = Traject::Indexer::Settings.new settings
@input_stream = input_stream
end

def each
return to_enum(:each) unless block_given?

response = client.get("/released/#{target}.json")
bar = ProgressBar.new(response.body.length)
response.body.each do |obj|
yield PurlRecord.new(obj['druid'].gsub('druid:', ''), purl_url: @settings['purl.url'], client:)
bar.increment!
end
end

private

def target
@settings['purl_fetcher.target'] || 'Searchworks'
end

def host
@settings['purl_fetcher.url'] || 'https://purl-fetcher.stanford.edu'
end

def client
@client ||= Faraday.new(url: host) do |builder|
builder.adapter(:net_http_persistent, pool_size: @settings['processing_thread_pool'])
end
end
end
end

0 comments on commit c6a538f

Please sign in to comment.