Skip to content

Commit

Permalink
[FIX] pre-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiocorato committed Sep 12, 2024
1 parent b4ac6eb commit d6ac49c
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 142 deletions.
5 changes: 1 addition & 4 deletions l10n_it_fatturapa_govway/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
{
"name": "ITA - Fattura elettronica - Supporto GovWay",
"version": "14.0.1.0.0",
"version": "16.0.1.0.0",
"category": "Localization/Italy",
"summary": "Invio fatture elettroniche tramite GovWay",
"author": "Odoo Community Association (OCA)",
Expand All @@ -20,9 +20,6 @@
# "views/fetchmail_view.xml",
"views/company_view.xml",
"views/sdi_view.xml",
"views/ir_mail_server.xml",
"data/fetchmail_data.xml",
"data/config_parameter.xml",
"data/sdi_channel_demo.xml",
],
"installable": True,
Expand Down
69 changes: 48 additions & 21 deletions l10n_it_fatturapa_govway/controllers/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging

from odoo.http import Controller, request, route

_logger = logging.getLogger()


class FatturaPAGovWay(Controller):
# incoming invoices
@route(["/fatturapa/govway/ricevi_fattura"], type="http", auth="user", website=True)
def ricevi_fattura(self):
def ricevi_fattura(self, *args, **post):
# headers
# - GovWay-SDI-FormatoArchivioBase64
# - GovWay-SDI-FormatoArchivioInvioFattura
Expand All @@ -16,46 +18,71 @@ def ricevi_fattura(self):
# - GovWay-SDI-NomeFile
# - GovWay-SDI-NomeFileMetadati
# - GovWay-Transaction-ID
identificativo_sdi = request.httprequest.headers.get('GovWay-SDI-IdentificativoSdI', '')
sdi_nomefile = request.httprequest.headers.get('GovWay-SDI-NomeFile', '')
transaction_id = request.httprequest.headers.get('GovWay-Transaction-ID', '')
identificativo_sdi = request.httprequest.headers.get(
"GovWay-SDI-IdentificativoSdI", ""
)
sdi_nomefile = request.httprequest.headers.get("GovWay-SDI-NomeFile", "")
transaction_id = request.httprequest.headers.get("GovWay-Transaction-ID", "")

sdi_formatoarchiviobase64 = request.httprequest.headers.get("GovWay-SDI-FormatoArchivioBase64", "")
sdi_formatoarchiviiinviofattura = request.httprequest.headers.get("GovWay-SDI-FormatoArchivioInvioFattura", "")
sdi_formartofatturapa = request.httprequest.headers.get("GovWay-SDI-FormatoFatturaPA", "")
sdi_messageid= request.httprequest.headers.get("GovWay-SDI-MessageId", "")
sdi_nomefile_metadati = request.httprequest.headers.get("GovWay-SDI-NomeFileMetadati", "")
request.httprequest.headers.get("GovWay-SDI-FormatoArchivioBase64", "")
request.httprequest.headers.get("GovWay-SDI-FormatoArchivioInvioFattura", "")
request.httprequest.headers.get("GovWay-SDI-FormatoFatturaPA", "")
request.httprequest.headers.get("GovWay-SDI-MessageId", "")
request.httprequest.headers.get("GovWay-SDI-NomeFileMetadati", "")

_logger.info("ricevi_fattura(): {} {} {}".format(identificativo_sdi, sdi_nomefile, transaction_id))
_logger.info(
"ricevi_fattura(): {} {} {}".format(
identificativo_sdi, sdi_nomefile, transaction_id
)
)
_logger.debug("ricevi_fattura(): args={}".format(repr(args)))
_logger.debug("ricevi_fattura(): post={}".format(repr(post)))

@route(["/fatturapa/govway/ricevi_ndt"], type="http", auth="user", website=True)
def ricevi_ndt(self):
def ricevi_ndt(self, *args, **post):
# headers
# - GovWay-SDI-IdentificativoSdI
# - GovWay-SDI-NomeFile
# - GovWay-Transaction-ID
identificativo_sdi = request.httprequest.headers.get('GovWay-SDI-IdentificativoSdI', '')
sdi_nomefile = request.httprequest.headers.get('GovWay-SDI-NomeFile', '')
transaction_id = request.httprequest.headers.get('GovWay-Transaction-ID', '')
identificativo_sdi = request.httprequest.headers.get(
"GovWay-SDI-IdentificativoSdI", ""
)
sdi_nomefile = request.httprequest.headers.get("GovWay-SDI-NomeFile", "")
transaction_id = request.httprequest.headers.get("GovWay-Transaction-ID", "")

_logger.info("ricevi_ndt(): {} {} {}".format(identificativo_sdi, sdi_nomefile, transaction_id))
_logger.info(
"ricevi_ndt(): {} {} {}".format(
identificativo_sdi, sdi_nomefile, transaction_id
)
)
_logger.debug("ricevi_ndt(): args={}".format(repr(args)))
_logger.debug("ricevi_ndt(): post={}".format(repr(post)))

# outgoing invoices
@route(["/fatturapa/govway/ricevi_notifica"], type="http", auth="user", methods=['POST'], website=True)
@route(
["/fatturapa/govway/ricevi_notifica"],
type="http",
auth="user",
methods=["POST"],
website=True,
)
def ricevi_notifica(self, *args, **post):
# headers:
# - GovWay-SDI-IdentificativoSdI
# - GovWay-SDI-NomeFile
# - GovWay-Transaction-ID
identificativo_sdi = request.httprequest.headers.get('GovWay-SDI-IdentificativoSdI', '')
sdi_nomefile = request.httprequest.headers.get('GovWay-SDI-NomeFile', '')
transaction_id = request.httprequest.headers.get('GovWay-Transaction-ID', '')
identificativo_sdi = request.httprequest.headers.get(
"GovWay-SDI-IdentificativoSdI", ""
)
sdi_nomefile = request.httprequest.headers.get("GovWay-SDI-NomeFile", "")
transaction_id = request.httprequest.headers.get("GovWay-Transaction-ID", "")

_logger.info("ricevi_notifica(): {} {} {}".format(identificativo_sdi, sdi_nomefile, transaction_id))
_logger.info(
"ricevi_notifica(): {} {} {}".format(
identificativo_sdi, sdi_nomefile, transaction_id
)
)
_logger.debug("ricevi_notifica(): args={}".format(repr(args)))
_logger.debug("ricevi_notifica(): post={}".format(repr(post)))
#request.env["sdi.channel"].sdi_channel_model.receive_notification({ sdi_nomefile: post })
# request.env["sdi.channel"].sdi_channel_model.receive_notification(
# { sdi_nomefile: post })
1 change: 1 addition & 0 deletions l10n_it_fatturapa_govway/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
# from . import ir_mail_server
# from . import fetchmail
from . import company

# from . import sdi
222 changes: 110 additions & 112 deletions l10n_it_fatturapa_govway/models/sdi.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
from urllib.parse import urljoin

from odoo import _, api, exceptions, fields, models
from odoo.tools import config
from odoo.exceptions import UserError
import requests
from requests import HTTPError, Timeout, TooManyRedirects
from urllib.parse import urljoin
from requests import Timeout, TooManyRedirects

from odoo import _, api, fields, models
from odoo.exceptions import UserError


class SdiChannel(models.Model):
Expand All @@ -14,68 +15,68 @@ class SdiChannel(models.Model):
selection_add=[("govway", "GovWay")], ondelete={"govway": "cascade"}
)

@api.constrains("fetch_pec_server_id")
def check_fetch_pec_server_id(self):
pec_channels = self.filtered(lambda c: c.channel_type == "pec")
for channel in pec_channels:
domain = [
("fetch_pec_server_id", "=", channel.fetch_pec_server_id.id),
("id", "in", pec_channels.ids),
]
elements = self.search(domain)
if len(elements) > 1:
raise exceptions.ValidationError(
_("The channel %s with pec server %s already exists")
% (channel.name, channel.fetch_pec_server_id.name)
)
# @api.constrains("fetch_pec_server_id")
# def check_fetch_pec_server_id(self):
# pec_channels = self.filtered(lambda c: c.channel_type == "pec")
# for channel in pec_channels:
# domain = [
# ("fetch_pec_server_id", "=", channel.fetch_pec_server_id.id),
# ("id", "in", pec_channels.ids),
# ]
# elements = self.search(domain)
# if len(elements) > 1:
# raise exceptions.ValidationError(
# _("The channel %s with pec server %s already exists")
# % (channel.name, channel.fetch_pec_server_id.name)
# )
#
# @api.constrains("pec_server_id")
# def check_pec_server_id(self):
# pec_channels = self.filtered(lambda c: c.channel_type == "pec")
# for channel in pec_channels:
# domain = [
# ("pec_server_id", "=", channel.pec_server_id.id),
# ("id", "in", pec_channels.ids),
# ]
# elements = self.search(domain)
# if len(elements) > 1:
# raise exceptions.ValidationError(
# _("The channel %s with pec server %s already exists")
# % (channel.name, channel.pec_server_id.name)
# )

@api.constrains("pec_server_id")
def check_pec_server_id(self):
pec_channels = self.filtered(lambda c: c.channel_type == "pec")
for channel in pec_channels:
domain = [
("pec_server_id", "=", channel.pec_server_id.id),
("id", "in", pec_channels.ids),
]
elements = self.search(domain)
if len(elements) > 1:
raise exceptions.ValidationError(
_("The channel %s with pec server %s already exists")
% (channel.name, channel.pec_server_id.name)
)
# @api.constrains("email_exchange_system")
# def check_email_validity(self):
# if self.env.context.get("skip_check_email_validity"):
# return
# pec_channels = self.filtered(lambda c: c.channel_type == "pec")
# for channel in pec_channels:
# if not extract_rfc2822_addresses(channel.email_exchange_system):
# raise exceptions.ValidationError(
# _("Email %s is not valid") % channel.email_exchange_system
# )

@api.constrains("email_exchange_system")
def check_email_validity(self):
if self.env.context.get("skip_check_email_validity"):
return
pec_channels = self.filtered(lambda c: c.channel_type == "pec")
for channel in pec_channels:
if not extract_rfc2822_addresses(channel.email_exchange_system):
raise exceptions.ValidationError(
_("Email %s is not valid") % channel.email_exchange_system
)
# def check_first_pec_sending(self):
# if not self.first_invoice_sent:
# sdi_address = self.env["ir.config_parameter"].get_param(
# "sdi.pec.first.address",
# )
# self.email_exchange_system = sdi_address
# else:
# if not self.email_exchange_system:
# raise exceptions.UserError(
# _(
# "SDI PEC address not set. Please update it with the "
# "address indicated by SDI after the first sending"
# )
# )

def check_first_pec_sending(self):
if not self.first_invoice_sent:
sdi_address = self.env["ir.config_parameter"].get_param(
"sdi.pec.first.address",
)
self.email_exchange_system = sdi_address
else:
if not self.email_exchange_system:
raise exceptions.UserError(
_(
"SDI PEC address not set. Please update it with the "
"address indicated by SDI after the first sending"
)
)

def update_after_first_pec_sending(self):
if not self.first_invoice_sent:
self.first_invoice_sent = True
self.with_context(
skip_check_email_validity=True
).email_exchange_system = False
# def update_after_first_pec_sending(self):
# if not self.first_invoice_sent:
# self.first_invoice_sent = True
# self.with_context(
# skip_check_email_validity=True
# ).email_exchange_system = False

@api.model
def _check_server_govway(self):
Expand All @@ -84,10 +85,8 @@ def _check_server_govway(self):
url = urljoin(govway_url, "/status")
try:
response = requests.get(
url=url,
params=params,
headers=headers,
timeout=60)
url=url
) # , params=params, headers=headers, timeout=60)
response_data = json.loads(response.text)
status = response_data.get("status")
if status and status.get("error_code", False):
Expand All @@ -99,52 +98,51 @@ def _check_server_govway(self):
% (status.get("error_code"), status.get("error_message"))
)
except (ConnectionError, Timeout, TooManyRedirects) as e:
raise UserError(_(
"GovWay server not available for %s. Please configure it.")
% str(e)
raise UserError(
_("GovWay server not available for %s. Please configure it.") % str(e)
)
return response_data.get("data", {})

def send_via_govway(self, attachment_out_ids):
self._check_server_govway()
self.check_first_pec_sending()
user = self.env.user
for att in attachment_out_ids:
if not att.datas or not att.name:
raise UserError(_("File content and file name are mandatory"))
company = att.company_id
mail_message = self.env["mail.message"].create(
{
"model": att._name,
"res_id": att.id,
"subject": att.name,
"body": "XML file for FatturaPA {} sent to Exchange System to "
"the email address {}.".format(
att.name, company.email_exchange_system
),
"attachment_ids": [(6, 0, att.ir_attachment_id.ids)],
"email_from": company.email_from_for_fatturaPA,
"reply_to": company.email_from_for_fatturaPA,
"mail_server_id": company.sdi_channel_id.pec_server_id.id,
}
)

mail = self.env["mail.mail"].create(
{
"mail_message_id": mail_message.id,
"body_html": mail_message.body,
"email_to": company.email_exchange_system,
"headers": {"Return-Path": company.email_from_for_fatturaPA},
}
)

if mail:
try:
mail.send(raise_exception=True)
att.state = "sent"
att.sending_date = fields.Datetime.now()
att.sending_user = user.id
company.sdi_channel_id.update_after_first_pec_sending()
except MailDeliveryException as e:
att.state = "sender_error"
mail.body = str(e)
# self.check_first_pec_sending()
# user = self.env.user
# for att in attachment_out_ids:
# if not att.datas or not att.name:
# raise UserError(_("File content and file name are mandatory"))
# company = att.company_id
# mail_message = self.env["mail.message"].create(
# {
# "model": att._name,
# "res_id": att.id,
# "subject": att.name,
# "body": "XML file for FatturaPA {} sent to Exchange System to "
# "the email address {}.".format(
# att.name, company.email_exchange_system
# ),
# "attachment_ids": [(6, 0, att.ir_attachment_id.ids)],
# "email_from": company.email_from_for_fatturaPA,
# "reply_to": company.email_from_for_fatturaPA,
# "mail_server_id": company.sdi_channel_id.pec_server_id.id,
# }
# )
#
# mail = self.env["mail.mail"].create(
# {
# "mail_message_id": mail_message.id,
# "body_html": mail_message.body,
# "email_to": company.email_exchange_system,
# "headers": {"Return-Path": company.email_from_for_fatturaPA},
# }
# )
#
# if mail:
# try:
# mail.send(raise_exception=True)
# att.state = "sent"
# att.sending_date = fields.Datetime.now()
# att.sending_user = user.id
# company.sdi_channel_id.update_after_first_pec_sending()
# except MailDeliveryException as e:
# att.state = "sender_error"
# mail.body = str(e)
2 changes: 1 addition & 1 deletion l10n_it_fatturapa_govway/models/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ curl -k -X POST -basic -u "SdIRiceviFile:cq7dP4axLyA3RfCtwwH94FNz" \

https://testgovway.pretecno.com/govway/Pretecno/

https://testgovway.pretecno.com/govway/sdi/in/Pretecno/TrasmissioneFatture/v1
https://testgovway.pretecno.com/govway/sdi/in/Pretecno/TrasmissioneFatture/v1
Loading

0 comments on commit d6ac49c

Please sign in to comment.