Fastapi-mail is a simple, lightweight mail system for sending emails and attachments (individually or in bulk).
$ pip install fastapi-mail
uvicorn examples.main:app --reload --port 8001
from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form
from starlette.responses import JSONResponse
from starlette.requests import Request
from fastapi_mail import FastMail, MessageSchema,ConnectionConfig
from pydantic import EmailStr
from pydantic import EmailStr, BaseModel
from typing import List
# Request payload for the mail endpoints: the list of recipient addresses.
class EmailSchema(BaseModel):
    # One or more addresses; each entry is typed as EmailStr.
    email: List[EmailStr]
# Connection settings passed to every FastMail instance built in the handlers.
# All values below are placeholders; substitute your own account and server.
conf = ConnectionConfig(
    MAIL_USERNAME="YourUsername",
    MAIL_PASSWORD="strong_password",
    # Sender address. NOTE(review): presumably validated as an email address by
    # ConnectionConfig, so the placeholder must at least be well-formed - confirm.
    MAIL_FROM="your@email.com",
    MAIL_PORT=587,
    MAIL_SERVER="your mail server",
    MAIL_TLS=True,
    MAIL_SSL=False,
)
# Application instance that the route decorators below register against.
app = FastAPI()
# HTML body used by the plain /email endpoint.
html = """
<p>Hi this test mail, thanks for using Fastapi-mail</p>
"""
# Alternative HTML body mentioning BackgroundTasks; not referenced by the
# handlers visible in this file.
template = """
<p>Hi this test mail using BackgroundTasks, thanks for using Fastapi-mail</p>
"""
@app.post("/email")
async def simple_send(email: EmailSchema) -> JSONResponse:
    # Send the module-level ``html`` body to every address in the payload.
    # The recipient list may hold as many addresses as the caller passes.
    recipients = email.dict().get("email")
    message = MessageSchema(
        subject="Fastapi-Mail module",
        recipients=recipients,
        body=html,
        subtype="html",
    )

    # Awaited inline: the response is built only after send_message returns.
    await FastMail(conf).send_message(message)
    return JSONResponse(status_code=200, content={"message": "email has been sent"})
@app.post("/emailbackground")
async def send_in_background(
    background_tasks: BackgroundTasks,
    email: EmailSchema,
) -> JSONResponse:
    # Build the message now, but hand the actual send to BackgroundTasks
    # instead of awaiting it inside the handler.
    recipients = email.dict().get("email")
    message = MessageSchema(
        subject="Fastapi mail module",
        recipients=recipients,
        body="Simple background task ",
    )

    mailer = FastMail(conf)
    background_tasks.add_task(mailer.send_message, message)
    return JSONResponse(status_code=200, content={"message": "email has been sent"})
@app.post("/file")
async def send_file(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    email: EmailStr = Form(...),
) -> JSONResponse:
    # Mail the uploaded file as an attachment to the single address supplied
    # in the form; the send itself is queued on BackgroundTasks.
    message = MessageSchema(
        subject="Fastapi mail module",
        recipients=[email],
        body="Simple background task ",
        attachments=[file],
    )

    mailer = FastMail(conf)
    background_tasks.add_task(mailer.send_message, message)
    return JSONResponse(status_code=200, content={"message": "email has been sent"})
The email template folder must be present within your application's working directory.
When sending HTML emails, the CSS expected by mail servers (Outlook, Google, etc.) must be inline CSS. Fastapi-mail passes "body" to the rendered template, so when creating an email template, reference dynamic values on the assumption that the variable is named "body" and that it is a Python dict.
Check out Jinja2 for more details: https://jinja.palletsprojects.com/en/2.11.x/
# Request payload for templated mail: recipients plus the values for the template.
# NOTE(review): Dict and Any are not among the typing imports visible in this
# file (only List is) - confirm they are imported where this snippet is used.
class EmailSchema(BaseModel):
    # One or more addresses; each entry is typed as EmailStr.
    email: List[EmailStr]
    # String-keyed mapping forwarded as the message body.
    body: Dict[str, Any]
# Connection settings for templated mail. All values are placeholders;
# substitute your own account, server and template directory.
conf = ConnectionConfig(
    MAIL_USERNAME="YourUsername",
    MAIL_PASSWORD="strong_password",
    # Sender address. NOTE(review): presumably validated as an email address by
    # ConnectionConfig, so the placeholder must at least be well-formed - confirm.
    MAIL_FROM="your@email.com",
    MAIL_PORT=587,
    MAIL_SERVER="your mail server",
    MAIL_TLS=True,
    MAIL_SSL=False,
    # Directory holding the template files named via ``template_name``.
    TEMPLATE_FOLDER='./email templates folder',
)
@app.post("/email")
async def send_with_template(email: EmailSchema) -> JSONResponse:
    # Send a templated HTML mail: the payload's "body" mapping becomes the
    # message body and the recipient list may hold any number of addresses.
    payload = email.dict()
    message = MessageSchema(
        subject="Fastapi-Mail module",
        recipients=payload.get("email"),
        body=payload.get("body"),
        subtype="html",
    )

    # ``template_name`` is optional: it names the HTML (Jinja) file to use
    # from the email template folder.
    mailer = FastMail(conf)
    await mailer.send_message(message, template_name="email_template.html")
    return JSONResponse(status_code=200, content={"message": "email has been sent"})
The utility allows you to check for temporary email addresses and to block any email address or domain. You can connect Redis to save and check email addresses. If you do not provide a Redis configuration, the utility saves them in a list or set by default.
from fastapi import FastAPI, Query, Body
from starlette.responses import JSONResponse
from pydantic import EmailStr
from typing import List
from fastapi_mail.email_utils import DefaultChecker
from fastapi import Depends
# Application instance that the checker endpoints below register against.
app = FastAPI()
async def default_checker():
    # Dependency provider used through Depends(...) by the endpoints below.
    # A ``source`` argument can be passed to supply your own email domains.
    domain_checker = DefaultChecker()
    # Required step: fetch the temporary email domains before handing it out.
    await domain_checker.fetch_temp_email_domains()
    return domain_checker
@app.get('/email/dispasoble')
async def simple_send(
    domain: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Answer 400 when the checker flags the domain, otherwise fall through to
    # the (elided) sending logic and answer 200.
    rejected = await checker.is_dispasoble(domain)
    if rejected:
        status, text = 400, 'this is dispasoble domain'
    else:
        ...
        status, text = 200, 'email has been sent'
    return JSONResponse(status_code=status, content={'message': text})
@app.post('/email/dispasoble')
async def add_disp_domain(
    domains: list = Body(..., embed=True),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Pass the submitted domains to add_temp_domain and echo back its result.
    outcome = await checker.add_temp_domain(domains)
    payload = {'result': outcome}
    return JSONResponse(status_code=200, content=payload)
@app.post('/email/blocked/domains')
async def block_domain(
    domain: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Add the domain to the checker's blacklist and confirm it by name.
    await checker.blacklist_add_domain(domain)
    confirmation = f'{domain} added to blacklist'
    return JSONResponse(status_code=200, content={'message': confirmation})
@app.get('/email/blocked/domains')
async def get_blocked_domain(
    domain: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Report whatever is_blocked_domain returns for the queried domain.
    blocked = await checker.is_blocked_domain(domain)
    payload = {"result": blocked}
    return JSONResponse(status_code=200, content=payload)
@app.post('/email/blocked/address')
async def block_address(
    email: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Add the address to the checker's blacklist; the reply is always True.
    await checker.blacklist_add_email(email)
    payload = {"result": True}
    return JSONResponse(status_code=200, content=payload)
@app.get('/email/blocked/address')
async def get_block_address(
    email: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Report whatever is_blocked_address returns for the queried address.
    blocked = await checker.is_blocked_address(email)
    payload = {"result": blocked}
    return JSONResponse(status_code=200, content=payload)
@app.get('/email/mx')
async def test_mx(
    email: EmailStr = Query(...),
    full_result: bool = Query(False),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Run the checker's MX lookup on the part of the address after the last
    # "@" and return the lookup result as the response body.
    _, _, domain = email.rpartition("@")
    mx_info = await checker.check_mx_record(domain, full_result)
    return JSONResponse(status_code=200, content=mx_info)
@app.delete('/email/blocked/address')
async def del_blocked_address(
    email: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Remove the address from the blacklist and echo back the checker's result.
    removed = await checker.blacklist_rm_email(email)
    payload = {"result": removed}
    return JSONResponse(status_code=200, content=payload)
@app.delete('/email/blocked/domains')
async def del_blocked_domain(
    domain: str = Query(...),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Remove the domain from the blacklist and echo back the checker's result.
    removed = await checker.blacklist_rm_domain(domain)
    payload = {"result": removed}
    return JSONResponse(status_code=200, content=payload)
@app.delete('/email/dispasoble')
async def del_disp_domain(
    domains: list = Body(..., embed=True),
    checker: DefaultChecker = Depends(default_checker),
) -> JSONResponse:
    # Pass the submitted domains to blacklist_rm_temp and echo back its result.
    removed = await checker.blacklist_rm_temp(domains)
    payload = {'result': removed}
    return JSONResponse(status_code=200, content=payload)
Feel free to open an issue or send a pull request.
Thanks goes to these wonderful people (π§):
Sabuhi Shukurov π¬ π π§ |
Tural Muradov π π π§ |
Hasan Aliyev π π§ π |
Ashwani π§ |
Leon Xu π§ |
Gabriel Oliveira π π§ |
Onothoja Marho π π§ π§ |
Tim Kiely π§ |
This project follows the all-contributors specification. Contributions of any kind are welcome!
Before you start, please read CONTRIBUTING.