Graph Managers
Previous Chapter: Crawl Managers
Let's suppose we have the following spider:
from scrapy import Request, Spider


class MySiteArticlesSpider(Spider):

    name = 'mysite.com'
    url = None

    def start_requests(self):
        # The start url is provided as a spider argument by the scheduler.
        assert self.url is not None, "Missing required argument 'url'."
        yield Request(self.url, dont_filter=True)

    def parse(self, response):
        # (... link discovery logic ...)
        yield Request(..., callback=self.parse_article)

    def parse_article(self, response):
        # (... actual implementation of the article parsing method ...)
and we need to schedule it multiple times with different start URLs. We can use a GeneratorCrawlManager instance for this, as we did in the previous chapter. The URLs may come from an external source like S3, HS collections, HCF (Hubstorage Crawl Frontier, see next chapters), etc., or may be hardcoded in the crawl manager.
In many use cases the crawl manager and the spider are enough to fulfill the project requirements. But let's suppose we want to deliver a single final file with all the items from all the scheduled spider jobs.
Here the GraphManager comes to the rescue. Graph managers allow you to define arbitrary workflows of spiders and scripts running in ScrapyCloud, and to create dependencies between them.
For our case, we need to define a workflow where the crawl manager runs first, and a deliver script runs next, once the crawl manager finishes execution:
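import time
from typing import Tuple

from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task


class MyArticlesGraphManager(GraphManager):

    loop_mode = 120

    def configure_workflow(self) -> Tuple[Task, ...]:
        # Task that runs the crawl manager script for the target spider.
        crawlTask = Task("crawl", "py:crawlmanager.py", init_args=["mysite.com"])
        # Timestamp based file name, so each workflow run delivers to a new file.
        myfilename = str(int(time.time()))
        deliverTask = Task("deliver", "py:deliver.py", init_args=["mysite.com", f"--output-file=s3://mybucket/myfolder/{myfilename}.jl"])
        # The deliver task runs once the crawl task has finished.
        crawlTask.add_next_task(deliverTask)
        # Only the root tasks are returned.
        return (crawlTask,)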
Let's save this code into scripts/flowmanager.py. The workflow is defined via the configure_workflow() method, which here defines two tasks: crawl and deliver. The crawl task instructs the graph manager to schedule the crawl manager with the appropriate arguments. The deliver task does the same for the deliver script (see below on how to implement the deliver script). The line crawlTask.add_next_task(deliverTask) instructs the graph manager to schedule the deliver task immediately after the crawl task is completed. The return value of the configure_workflow() method is a tuple of tasks indicating the root tasks. In our case, that is just the crawl task.
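To make the idea of root tasks and next tasks more concrete, here is a minimal, self-contained sketch of how such a dependency chain could be walked. It does not use shub-workflow: SketchTask and run_order are hypothetical names introduced only for illustration, and the real graph manager schedules jobs in ScrapyCloud and waits for them to finish instead of walking the graph in memory. The sketch also ignores tasks that must wait for several parents.

```python
from collections import deque
from typing import List, Tuple


class SketchTask:
    """Hypothetical stand-in for a workflow task (not the shub-workflow Task class)."""

    def __init__(self, task_id: str, command: str):
        self.task_id = task_id
        self.command = command
        self.next_tasks: List["SketchTask"] = []

    def add_next_task(self, task: "SketchTask") -> None:
        # Declare that `task` may only start after this task has finished.
        self.next_tasks.append(task)


def run_order(root_tasks: Tuple[SketchTask, ...]) -> List[str]:
    """Return task ids in the order a very simple scheduler would start them."""
    order: List[str] = []
    pending = deque(root_tasks)
    while pending:
        task = pending.popleft()
        if task.task_id in order:
            continue  # already started through another path
        order.append(task.task_id)  # pretend the task runs to completion here
        pending.extend(task.next_tasks)  # its dependents become eligible
    return order


crawl = SketchTask("crawl", "py:crawlmanager.py")
deliver = SketchTask("deliver", "py:deliver.py")
crawl.add_next_task(deliver)

# Only the root task is handed to the scheduler; deliver is reached through it.
print(run_order((crawl,)))  # prints ['crawl', 'deliver']
```

The point of returning only root tasks is visible here: every other task is discovered by following the dependencies declared with add_next_task().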
In order to execute this workflow, the script must be invoked in this way:
$ python flowmanager.py --root-jobs
The --root-jobs flag instructs the graph manager to start execution with the root jobs defined in the return value of configure_workflow(). This starting point can be changed if needed (see the graph manager command line help).
We haven't defined the deliver.py script yet. It may be any script that reads the items from all the jobs of this specific workflow instance, merges them, and delivers the result. The shub-workflow library also provides a convenient class for easy implementation of deliver scripts. It takes advantage of the fact that all jobs within the same workflow instance share the same FLOW_ID tag, and that this value is different from the one used by other workflow instances, even if they were initiated by the same graph manager script. This ensures that all the spider jobs belonging to the same crawl, and only those, are read and delivered by the deliver script, even when multiple crawls and deliver scripts are running at the same time.
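The selection by FLOW_ID can be pictured with a small, self-contained sketch. It assumes that each job carries a list of string tags and that the flow id is stored as a tag of the form FLOW_ID=<value>. The job dictionaries, the jobs_in_flow helper and the flow id values below are hypothetical, and the real deliver class reads job metadata from ScrapyCloud instead of an in-memory list.

```python
from typing import Dict, Iterable, List


def jobs_in_flow(jobs: Iterable[Dict], flow_id: str) -> List[Dict]:
    """Keep only the jobs tagged with the given flow id."""
    wanted_tag = f"FLOW_ID={flow_id}"  # assumed tag format
    return [job for job in jobs if wanted_tag in job.get("tags", [])]


# Hypothetical job metadata from two runs of the same graph manager script.
jobs = [
    {"key": "1/2/10", "spider": "mysite.com", "tags": ["FLOW_ID=aaa"]},
    {"key": "1/2/11", "spider": "mysite.com", "tags": ["FLOW_ID=bbb"]},
    {"key": "1/2/12", "spider": "mysite.com", "tags": ["FLOW_ID=aaa"]},
]

selected = jobs_in_flow(jobs, "aaa")
print([job["key"] for job in selected])  # prints ['1/2/10', '1/2/12']
```

Because the filter depends only on the tag, two workflow instances for the same spider can run at the same time without their items being mixed.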
Let's suppose we want to deliver our job items into S3 (the script also supports GCS and local file storage). For this, we add the script scripts/deliver.py to our project:
import json
import logging
from tempfile import mktemp

from shub_workflow.deliver import BaseDeliverScript
from shub_workflow.utils.futils import mv_file


class MyDeliverScript(BaseDeliverScript):

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--output-file")

    def on_item(self, item: dict, scrapername: str):
        # Write each item as one JSON line into the temporary file.
        print(json.dumps(item), file=self.tempfile)

    def run(self):
        if self.args.output_file is not None:
            self.tempfile = open(mktemp(), "w", encoding="utf8")
            super().run()

    def on_close(self):
        # Move the temporary file to its final destination only if there is something to deliver.
        if self.args.output_file and self.total_items_count > 0:
            self.tempfile.close()
            mv_file(self.tempfile.name, self.args.output_file)
        super().on_close()


if __name__ == '__main__':
    from shub_workflow.utils import get_kumo_loglevel
    logging.basicConfig(format="%(asctime)s %(name)s [%(levelname)s]: %(message)s", level=get_kumo_loglevel())
    deliver = MyDeliverScript()
    deliver.run()
The above subclass is very simple, and it is easy to see what it does. The base class, however, is designed so that many attributes and methods can be overridden, which allows flexible customization for the specific needs of a project (see DeliverScript code).
Notice that in the graph manager we passed the spider name and the output file to the deliver script. The first is a standard argument of the base deliver class that lets the script select the target spiders to deliver from. The output file option is specific to our case and is easily implemented with the help of the shub-workflow futils tools.
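The pattern behind the output file option (write all items to a temporary file, then move it to its final destination in one step) can be tried locally with the standard library alone. The sketch below is only an illustration under that assumption: deliver_items is a hypothetical helper, shutil.move stands in for mv_file, and the destination is a local path instead of an S3 URL.

```python
import json
import os
import shutil
import tempfile
from typing import Dict, Iterable


def deliver_items(items: Iterable[Dict], output_file: str) -> int:
    """Write items as JSON lines to a temp file, then move it to output_file."""
    count = 0
    with tempfile.NamedTemporaryFile("w", encoding="utf8", suffix=".jl", delete=False) as tmp:
        for item in items:
            print(json.dumps(item), file=tmp)  # one JSON document per line
            count += 1
    if count > 0:
        shutil.move(tmp.name, output_file)  # local stand-in for mv_file
    else:
        os.remove(tmp.name)  # nothing to deliver; discard the empty file
    return count


target = os.path.join(tempfile.mkdtemp(), "articles.jl")
written = deliver_items([{"title": "a"}, {"title": "b"}], target)

with open(target, encoding="utf8") as f:
    lines = f.read().splitlines()
print(written, lines)
```

Writing to a temporary file first means the final location never holds a partially written file, which matters when another process consumes the delivered file.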
Let's explore more complex approaches in the following chapters.
Next Chapter: Managing Hubstorage Crawl Frontiers