Add Expectations, env config and pipelines
sbusso committed Mar 23, 2024
1 parent 37706eb commit aa48a26
Showing 7 changed files with 128 additions and 82 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -18,6 +18,13 @@ cd scrapework
poetry install
```

### Spider Configuration

- `start_urls`: a list of URLs the spider starts scraping from.
- `pipelines`: a list of pipelines that process and export the scraped items (JSON file, S3, metadata).
- `extractors`: comes with various extractors (plain body, smart extractors, markdown).
- `middlewares`: comes with various middlewares (default headers, logging, proxy, ScrapeOps).

### Creating a Spider

A Spider is a class that defines how to navigate a website and extract data. Here's how you can create a Spider:
34 changes: 18 additions & 16 deletions scrapework/config.py
@@ -1,31 +1,33 @@
import enum
import os
from typing import Optional

from dotenv import load_dotenv
from pydantic import BaseModel
from pydantic import BaseModel, Field

load_dotenv()


class Config(BaseModel):
    # S3_ENDPOINT: str = Field(default=os.environ["AWS_ENDPOINT_URL"])
    # S3_BUCKET: str = Field(default=os.environ["S3_BUCKET"])
    # S3_ACCESS_KEY_ID: str = Field(default=os.environ["AWS_ACCESS_KEY_ID"])
    # S3_SECRET_ACCESS_KEY: str = Field(default=os.environ["AWS_SECRET_ACCESS_KEY"])
    # TELEGRAM_SENDER_TOKEN: str = Field(default=os.environ["TELEGRAM_SENDER_TOKEN"])
    pass
class EnvConfig(BaseModel):
    @classmethod
    def create_config(cls):
        fields = {}
        for field_name, field_value in cls.__fields__.items():
            if field_name in os.environ:
                fields[field_name] = os.environ[field_name]
            else:
                raise ValueError(
                    f"Required environment variable '{field_name}' not set"
                )

        return cls(**fields)

config = Config()

config = EnvConfig()

class BackendType(enum.Enum):
    FILE = "file"
    S3 = "s3"
    META = "meta"
SCRAPEOPS_API_KEY: Optional[str] = Field(default=os.environ.get("SCRAPEOPS_API_KEY"))


class PipelineConfig(BaseModel):
    base_url: str
    backend: BackendType
    s3_bucket: str

    filename: str
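
The new `EnvConfig.create_config` classmethod walks the model's declared fields and requires an environment variable of the same name for each, raising if one is missing. A minimal usage sketch, assuming a hypothetical `MyConfig` subclass and `API_TOKEN` variable (neither is part of this commit), and assuming every declared field, including inherited ones, has a matching variable set:

```python
import os

from scrapework.config import EnvConfig


class MyConfig(EnvConfig):
    API_TOKEN: str  # hypothetical field; filled from os.environ by create_config()


os.environ["API_TOKEN"] = "secret"   # normally set outside the process
config = MyConfig.create_config()    # raises ValueError if API_TOKEN is unset
print(config.API_TOKEN)
```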
43 changes: 42 additions & 1 deletion scrapework/middleware.py
@@ -1,4 +1,19 @@
class Middleware:
from abc import abstractmethod
from typing import List, Optional
from urllib.parse import urlencode

import requests
from pydantic import BaseModel


class Proxy(BaseModel):
    url: str


class Middleware(BaseModel):
    proxies: Optional[List[Proxy]] = None

    @abstractmethod
    def process_request(self, request):
        raise NotImplementedError

@@ -13,3 +28,29 @@ class MiddlewareDefaultHeaders(Middleware):
    def process_request(self, request):
        request.headers.update({"User-Agent": "Mozilla/5.0"})
        return request


class MiddlewareLogging(Middleware):
    def process_request(self, request):
        print(f"Making request to {request.url}")
        return request


class MiddlewareProxy(Middleware):
    def process_request(self, request):
        if self.proxies:
            request.proxies = self.proxies

        return request


class MiddlewareScrapeOps(Middleware):
    api_key: str

    def process_request(self, request):

        payload = {"api_key": self.api_key, "url": request.url}
        request = requests.get(
            "https://proxy.scrapeops.io/v1/", params=urlencode(payload)
        )
        return request
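
A custom middleware follows the same pattern as the built-ins: subclass `Middleware` and implement `process_request`. A small sketch mirroring `MiddlewareDefaultHeaders` (the `MiddlewareAcceptLanguage` class and header value are illustrative, not part of this commit):

```python
from scrapework.middleware import Middleware


class MiddlewareAcceptLanguage(Middleware):
    language: str = "en-US"

    def process_request(self, request):
        # Mutate the outgoing request's headers, like MiddlewareDefaultHeaders does.
        request.headers.update({"Accept-Language": self.language})
        return request
```

A spider would register it the same way as the built-ins, e.g. `spider.use(MiddlewareAcceptLanguage(language="fr-FR"))`.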
7 changes: 7 additions & 0 deletions scrapework/monitors.py
@@ -0,0 +1,7 @@
from pydantic import BaseModel


# This is a generic class to manage output expectations, like number of items, format, etc.
class Expectations(BaseModel):
    def is_met(self):
        raise NotImplementedError
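
Since `Expectations` only defines the `is_met` contract, a concrete check might look like the following sketch (the `MinItemsExpectation` name and its fields are made up for illustration):

```python
from scrapework.monitors import Expectations


class MinItemsExpectation(Expectations):
    min_items: int = 1
    items_count: int = 0

    def is_met(self) -> bool:
        # The run meets the expectation if it produced at least min_items items.
        return self.items_count >= self.min_items


expectation = MinItemsExpectation(min_items=10, items_count=12)
assert expectation.is_met()
```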
56 changes: 39 additions & 17 deletions scrapework/pipelines.py
@@ -1,30 +1,52 @@
import json
from typing import Dict, Iterable, List, Union
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Union

import boto3
from pydantic import BaseModel
from pydantic import Field

from scrapework.config import BackendType, PipelineConfig

class Pipeline(ABC):
    @abstractmethod
    def process_items(
        self,
        items: Union[Dict[str, Any], Iterable[Dict[str, Any]]],
        filename: str | None,
    ):
        pass

class ItemPipeline(BaseModel):
    def process_items(self, items: Union[Dict, Iterable], config: PipelineConfig):
        if config.backend == BackendType.FILE:
            self.export_to_json(list(items), config)
        elif config.backend == BackendType.S3:
            self.export_to_s3(items, config)

    def export_to_json(self, items: Union[Dict, List], config: PipelineConfig):
        file_name = config.filename
        with open(file_name, "w") as f:
class JsonFilePipeline(Pipeline):
    def process_items(
        self, items: Union[Dict[str, Any], Iterable[Dict[str, Any]]], filename: str
    ):

        with open(filename, "w") as f:
            json.dump(items, f)

    def export_to_s3(self, items: Union[Dict, Iterable], config: PipelineConfig):
        if not config.s3_bucket:
            raise ValueError("S3 bucket name not provided in the configuration.")

class S3Pipeline(Pipeline):
    s3_bucket: str = Field(default_factory=str)

    def process_items(
        self,
        items: Union[Dict[str, Any], Iterable[Dict[str, Any]]],
        filename: str,
    ):

        s3_client = boto3.client("s3")
        file_name = config.filename

        s3_client.put_object(
            Body=json.dumps(items), Bucket=config.s3_bucket, Key=file_name
            Body=json.dumps(items), Bucket=self.s3_bucket, Key=filename
        )


class MetadataPipeline(Pipeline):
    def process_items(
        self,
        items: Union[Dict[str, Any], Iterable[Dict[str, Any]]],
    ):
        if isinstance(items, dict):
            return {"items_count": 1}
        else:
            return {"items_count": len(list(items))}
36 changes: 14 additions & 22 deletions scrapework/spider.py
@@ -3,43 +3,43 @@
from typing import Any, Callable, ClassVar, Dict, Iterable, List, Optional, Union

import requests
from pydantic import BaseModel
from pydantic import BaseModel, Field

from scrapework.config import BackendType, PipelineConfig
from scrapework.config import EnvConfig
from scrapework.extractors import Extractor
from scrapework.logger import logger
from scrapework.middleware import Middleware
from scrapework.pipelines import ItemPipeline
from scrapework.pipelines import Pipeline


class Spider(BaseModel, ABC):
    name: ClassVar[str] = "base_spider"
    start_urls: List[str] = []
    pipeline: Optional[ItemPipeline] = None
    pipelines: List[Pipeline] = []
    base_url: str = ""
    backend: BackendType = BackendType.FILE
    s3_bucket: str = ""
    filename: str = ""
    callback: Optional[
        Callable[[requests.Response], Union[Dict[str, Any], Iterable[Dict[str, Any]]]]
    ] = None
    middlewares: List[Middleware] = []
    logger: ClassVar[logging.Logger] = logger
    config: EnvConfig = Field(default_factory=EnvConfig)

    def __init__(self, **args):

        if not self.__class__.name:
            raise ValueError("Subclass must provide a name attribute")
        super().__init__(**args)
        self.config = self.SpiderConfig.create_config()
        self.callback = self.parse
        if not self.base_url and self.start_urls:
            self.base_url = self.start_urls[0]
        if not self.s3_bucket:
            self.s3_bucket = self.name

        if not self.filename:
            self.filename = f"{self.name}.json"
        if not self.pipeline:
            self.pipeline = ItemPipeline()

    class SpiderConfig(EnvConfig):
        pass

    class Config:
        arbitrary_types_allowed = True
@@ -51,15 +51,6 @@ def use(self, middleware: Middleware):
    def parse(self, response) -> Union[Dict[str, Any], Iterable[Dict[str, Any]]]:
        Extractor().extract_body(response)

    @property
    def pipeline_config(self) -> PipelineConfig:
        return PipelineConfig(
            base_url=self.base_url,
            backend=self.backend,
            s3_bucket=self.s3_bucket,
            filename=self.filename,
        )

    def run(self):
        for url in self.start_urls:
            response = self.make_request(url)
@@ -77,10 +68,11 @@ def run(self):

            items = self.callback(response)

            if not self.pipeline:
                raise ValueError("Pipeline not defined")
            if items is None:
                raise ValueError("Items not returned")

            self.pipeline.process_items(items, self.pipeline_config)
            for pipeline in self.pipelines:
                pipeline.process_items(items, self.filename)

    def make_request(self, url: str) -> Optional[requests.Response]:
        request = requests.Request("GET", url)
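
Putting the pieces together: a spider on this revision declares a `pipelines` list instead of a single `pipeline`, and `run()` hands the parsed items, along with `filename`, to each pipeline in turn. A minimal sketch, assuming the spider's `SpiderConfig` environment requirements are satisfied (the `QuotesSpider` name, URL, and parsed fields are illustrative):

```python
from scrapework.middleware import MiddlewareDefaultHeaders, MiddlewareLogging
from scrapework.pipelines import JsonFilePipeline
from scrapework.spider import Spider


class QuotesSpider(Spider):
    name = "quotes_spider"
    start_urls = ["https://quotes.toscrape.com"]
    pipelines = [JsonFilePipeline()]  # run() calls process_items(items, self.filename)

    def parse(self, response):
        # Return a dict or an iterable of dicts for the pipelines to process.
        return {"url": response.url, "size": len(response.text)}


spider = QuotesSpider()
spider.use(MiddlewareDefaultHeaders())
spider.use(MiddlewareLogging())
spider.run()
```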
27 changes: 1 addition & 26 deletions tests/test_pipelines.py
@@ -1,8 +1,7 @@
import json
from unittest.mock import MagicMock, patch

from scrapework.config import BackendType, PipelineConfig
from scrapework.pipelines import ItemPipeline
from scrapework.config import PipelineConfig
from scrapework.spider import Spider


@@ -14,30 +13,6 @@ def parse(self):
        pass


def test_pipeline_config_defaults():
    spider = ConcreteSpider()
    config = spider.pipeline_config
    assert isinstance(config, PipelineConfig)
    assert config.backend == BackendType.FILE
    assert config.s3_bucket == "concrete_spider"
    # Use the updated default filename here
    assert config.filename == "concrete_spider.json"


def test_pipeline_config_with_values():
    spider = ConcreteSpider()
    spider.base_url = "https://example.com"
    spider.backend = BackendType.FILE
    spider.s3_bucket = "my-bucket"
    spider.filename = "data.csv"
    config = spider.pipeline_config
    assert isinstance(config, PipelineConfig)
    assert config.base_url == "https://example.com"
    assert config.backend == BackendType.FILE
    assert config.s3_bucket == "my-bucket"
    assert config.filename == "data.csv"


def test_process_items_with_file_backend():
    items = [{"name": "item1"}, {"name": "item2"}]
    config = PipelineConfig(
