From 0d764d981fbbb5b5284b61d3b8b2d0c1e5badfe8 Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Fri, 11 Oct 2024 11:16:06 +0300 Subject: [PATCH] feat: 1st release --- api/smtp/backend.go | 15 +- api/smtp/session.go | 30 +--- config/config.go | 3 +- config/config_test.go | 2 +- helm/int-email/templates/certificate.yaml | 6 +- helm/int-email/templates/deployment.yaml | 22 ++- .../templates/issuer-letsencrypt.yaml | 8 +- helm/int-email/values-awakari-com.yaml | 2 +- helm/int-email/values.yaml | 31 +++- main.go | 13 +- scripts/cover.sh | 2 +- service/converter/logging.go | 2 +- service/converter/service.go | 151 +++++++++++------- service/converter/service_test.go | 150 +++++++++++++++++ service/logging.go | 27 ++++ service/service.go | 39 +++++ service/service_test.go | 75 +++++++++ 17 files changed, 457 insertions(+), 121 deletions(-) create mode 100644 service/converter/service_test.go create mode 100644 service/logging.go create mode 100644 service/service.go create mode 100644 service/service_test.go diff --git a/api/smtp/backend.go b/api/smtp/backend.go index 66d6c38..55a5590 100644 --- a/api/smtp/backend.go +++ b/api/smtp/backend.go @@ -1,30 +1,25 @@ package smtp import ( - "github.com/awakari/int-email/service/converter" - "github.com/awakari/int-email/service/writer" + "github.com/awakari/int-email/service" "github.com/emersion/go-smtp" ) type backend struct { - svcWriter writer.Service rcpts map[string]bool dataLimit int64 - evtType string - conv converter.Service + svc service.Service } -func NewBackend(svcWriter writer.Service, rcpts map[string]bool, dataLimit int64, evtType string, conv converter.Service) smtp.Backend { +func NewBackend(rcpts map[string]bool, dataLimit int64, svc service.Service) smtp.Backend { return backend{ - svcWriter: svcWriter, rcpts: rcpts, dataLimit: dataLimit, - evtType: evtType, - conv: conv, + svc: svc, } } func (b backend) NewSession(c *smtp.Conn) (s smtp.Session, err error) { - s = newSession(b.svcWriter, b.rcpts, b.dataLimit, b.evtType, b.conv) + s = newSession(b.rcpts, b.dataLimit, b.svc) return } diff --git a/api/smtp/session.go b/api/smtp/session.go index 1f65f91..a2eed4f 100644 --- a/api/smtp/session.go +++ b/api/smtp/session.go @@ -2,40 +2,30 @@ package smtp import ( "context" - "github.com/awakari/int-email/service/converter" - "github.com/awakari/int-email/service/writer" - "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/awakari/int-email/service" "github.com/emersion/go-smtp" - "github.com/segmentio/ksuid" "io" ) type session struct { - svcWriter writer.Service rcptsAllowed map[string]bool dataLimit int64 - evtType string - conv converter.Service + svc service.Service // allowed bool from string - data []byte } -func newSession(svcWriter writer.Service, rcptsAllowed map[string]bool, dataLimit int64, evtType string, conv converter.Service) smtp.Session { +func newSession(rcptsAllowed map[string]bool, dataLimit int64, svc service.Service) smtp.Session { return &session{ - svcWriter: svcWriter, rcptsAllowed: rcptsAllowed, dataLimit: dataLimit, - evtType: evtType, - conv: conv, } } func (s *session) Reset() { s.allowed = false s.from = "" - s.data = nil return } @@ -59,18 +49,8 @@ func (s *session) Data(r io.Reader) (err error) { switch s.allowed { case true: r = io.LimitReader(r, s.dataLimit) - evt := &pb.CloudEvent{ - Id: ksuid.New().String(), - Source: s.from, - SpecVersion: "1.0", - Type: s.evtType, - Attributes: make(map[string]*pb.CloudEventAttributeValue), - } - err = s.conv.Convert(r, evt) - switch err { - case nil: - err = s.svcWriter.Write(context.TODO(), evt, "default", s.from) - default: + err = s.svc.Submit(context.TODO(), s.from, r) + if err != nil { err = &smtp.SMTPError{ Code: 554, EnhancedCode: smtp.EnhancedCode{ diff --git a/config/config.go b/config/config.go index 4a0ef1b..6d3b90a 100644 --- a/config/config.go +++ b/config/config.go @@ -31,10 +31,11 @@ type ApiConfig struct { Tls struct { CertPath string `envconfig:"API_SMTP_TLS_CERT_PATH" default:"/etc/smtp/tls/tls.crt" required:"true"` KeyPath string `envconfig:"API_SMTP_TLS_KEY_PATH" default:"/etc/smtp/tls/tls.key" required:"true"` - MinVersion uint16 `envconfig:"API_SMTP_TLS_MIN_VERSION" default:"769" required:"true"` + VersionMin uint16 `envconfig:"API_SMTP_TLS_VERSION_MIN" default:"769" required:"true"` ClientAuthType tls.ClientAuthType `envconfig:"API_SMTP_TLS_CLIENT_AUTH_TYPE" default:"4" required:"true"` } } + Group string `envconfig:"API_GROUP" default:"default" required:"true"` EventType EventTypeConfig Interests struct { Uri string `envconfig:"API_INTERESTS_URI" required:"true" default:"subscriptions-proxy:50051"` diff --git a/config/config_test.go b/config/config_test.go index 3a27300..5041dd6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -19,5 +19,5 @@ func TestConfig(t *testing.T) { assert.Equal(t, 23*time.Hour, cfg.Api.Writer.Backoff) assert.Equal(t, "writer:56789", cfg.Api.Writer.Uri) assert.Equal(t, slog.LevelWarn, slog.Level(cfg.Log.Level)) - assert.Equal(t, tls.VerifyClientCertIfGiven, cfg.Api.Smtp.Tls.ClientAuthType) + assert.Equal(t, tls.RequireAndVerifyClientCert, cfg.Api.Smtp.Tls.ClientAuthType) } diff --git a/helm/int-email/templates/certificate.yaml b/helm/int-email/templates/certificate.yaml index 9cd53cf..1830f4f 100644 --- a/helm/int-email/templates/certificate.yaml +++ b/helm/int-email/templates/certificate.yaml @@ -4,9 +4,11 @@ metadata: name: "{{ include "int-email.fullname" . }}-tls-cert" namespace: "{{ .Release.Namespace }}" spec: - secretName: "{{ include "int-email.fullname" . }}-tls-secret" + {{- range .Values.ingress.tls }} + secretName: "{{ .secretName }}" + {{- end }} issuerRef: - name: "{{ .Values.cert.issuer.name }}-int-email" + name: "{{ .Values.tls.cert.issuer.name }}-int-email" kind: Issuer dnsNames: {{- range .Values.ingress.hosts }} diff --git a/helm/int-email/templates/deployment.yaml b/helm/int-email/templates/deployment.yaml index 998a49c..8013b40 100644 --- a/helm/int-email/templates/deployment.yaml +++ b/helm/int-email/templates/deployment.yaml @@ -37,8 +37,26 @@ spec: - name: API_SMTP_HOST value: "{{ .host }}" {{- end }} - - name: API_EVENT_TYPE - value: "{{ .Values.api.event.type }}" + - name: API_SMTP_DATA_LIMIT + value: "{{ .Values.api.smtp.data.limit }}" + - name: API_SMTP_RECIPIENTS_NAMES + value: "{{ .Values.api.smtp.rcpt.names }}" + - name: API_SMTP_RECIPIENTS_LIMIT + value: "{{ .Values.api.smtp.rcpt.limit }}" + - name: API_SMTP_TIMEOUT_READ + value: "{{ .Values.api.smtp.timeout.read }}" + - name: API_SMTP_TIMEOUT_WRITE + value: "{{ .Values.api.smtp.timeout.write }}" + - name: API_SMTP_TLS_CERT_PATH + value: "{{ .Values.tls.cert.path }}" + - name: API_SMTP_TLS_KEY_PATH + value: "{{ .Values.tls.key.path }}" + - name: API_SMTP_TLS_VERSION_MIN + value: "{{ .Values.tls.version.min }}" + - name: API_SMTP_TLS_CLIENT_AUTH_TYPE + value: "{{ .Values.tls.client.auth.type }}" + - name: API_GROUP + value: "{{ .Values.api.group }}" - name: API_INTERESTS_URI value: "{{ .Values.api.interests.uri }}" - name: API_INTERESTS_DETAILS_URI_PREFIX diff --git a/helm/int-email/templates/issuer-letsencrypt.yaml b/helm/int-email/templates/issuer-letsencrypt.yaml index c68b7c6..f2b8b91 100644 --- a/helm/int-email/templates/issuer-letsencrypt.yaml +++ b/helm/int-email/templates/issuer-letsencrypt.yaml @@ -2,14 +2,14 @@ apiVersion: cert-manager.io/v1 kind: Issuer metadata: - name: "{{ .Values.cert.issuer.name }}-int-email" + name: "{{ .Values.tls.cert.issuer.name }}-int-email" namespace: "{{ .Release.Namespace }}" spec: acme: - server: {{ .Values.cert.acme.server }} - email: {{ .Values.cert.acme.email }} + server: {{ .Values.tls.cert.acme.server }} + email: {{ .Values.tls.cert.acme.email }} privateKeySecretRef: - name: {{ .Values.cert.issuer.name }} + name: {{ .Values.tls.cert.issuer.name }} solvers: - dns01: cloudDNS: # Example for Google Cloud DNS, use your appropriate provider diff --git a/helm/int-email/values-awakari-com.yaml b/helm/int-email/values-awakari-com.yaml index 10cdc6b..ee7d96b 100644 --- a/helm/int-email/values-awakari-com.yaml +++ b/helm/int-email/values-awakari-com.yaml @@ -7,7 +7,7 @@ api: uri: "api:50051" # prod -cert: +tls: acme: server: "https://acme-v02.api.letsencrypt.org/directory" issuer: diff --git a/helm/int-email/values.yaml b/helm/int-email/values.yaml index ce70106..4b25060 100644 --- a/helm/int-email/values.yaml +++ b/helm/int-email/values.yaml @@ -85,6 +85,16 @@ affinity: - "true" api: + group: "default" + smtp: + data: + limit: 1048576 + rcpt: + names: "publish" + limit: 100 + timeout: + read: "1m" + write: "1m" event: typ: self: "com_awakari_email_v1" @@ -109,12 +119,21 @@ backup: schedule: "0 0 31 2 *" # never, manually only volume: name: "backup-secrets" -cert: - acme: - email: "awakari@awakari.com" - server: "https://acme-staging-v02.api.letsencrypt.org/directory" - issuer: - name: letsencrypt-staging +tls: + version: + min: 769 + client: + auth: + type: 4 + key: + path: "/etc/smtp/tls/tls.key" + cert: + path: "/etc/smtp/tls/tls.crt" + acme: + email: "awakari@awakari.com" + server: "https://acme-staging-v02.api.letsencrypt.org/directory" + issuer: + name: letsencrypt-staging log: # https://pkg.go.dev/golang.org/x/exp/slog#Level level: -4 diff --git a/main.go b/main.go index 68cbab3..bc73597 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "github.com/awakari/client-sdk-go/api" apiSmtp "github.com/awakari/int-email/api/smtp" "github.com/awakari/int-email/config" + "github.com/awakari/int-email/service" "github.com/awakari/int-email/service/converter" "github.com/awakari/int-email/service/writer" "github.com/emersion/go-smtp" @@ -43,17 +44,17 @@ func main() { svcWriter := writer.NewService(clientAwk, cfg.Api.Writer.Backoff, cfg.Api.Writer.Cache, log) svcWriter = writer.NewLogging(svcWriter, log) + svcConv := converter.NewConverter(cfg.Api.EventType.Self) + svcConv = converter.NewLogging(svcConv, log) + svc := service.NewService(svcConv, svcWriter, cfg.Api.Group) + svc = service.NewLogging(svc, log) rcpts := map[string]bool{} for _, name := range cfg.Api.Smtp.Recipients.Names { rcpt := fmt.Sprintf("%s@%s", name, cfg.Api.Smtp.Host) rcpts[rcpt] = true } - - svcConv := converter.NewConverter() - svcConv = converter.NewLogging(svcConv, log) - - b := apiSmtp.NewBackend(svcWriter, rcpts, int64(cfg.Api.Smtp.Data.Limit), cfg.Api.EventType.Self, svcConv) + b := apiSmtp.NewBackend(rcpts, int64(cfg.Api.Smtp.Data.Limit), svc) b = apiSmtp.NewBackendLogging(b, log) srv := smtp.NewServer(b) @@ -76,7 +77,7 @@ func main() { cert, }, ClientAuth: cfg.Api.Smtp.Tls.ClientAuthType, - MinVersion: cfg.Api.Smtp.Tls.MinVersion, + MinVersion: cfg.Api.Smtp.Tls.VersionMin, } log.Info("starting to listen for emails...") diff --git a/scripts/cover.sh b/scripts/cover.sh index 390f86d..48596fe 100755 --- a/scripts/cover.sh +++ b/scripts/cover.sh @@ -1,7 +1,7 @@ #!/bin/bash COVERAGE=$(cat cover.tmp) -THRESHOLD=1 +THRESHOLD=35 if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \ then \ echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \ diff --git a/service/converter/logging.go b/service/converter/logging.go index 893966e..e014bb7 100644 --- a/service/converter/logging.go +++ b/service/converter/logging.go @@ -23,6 +23,6 @@ func NewLogging(svc Service, log *slog.Logger) Service { func (l logging) Convert(src io.Reader, dst *pb.CloudEvent) (err error) { err = l.svc.Convert(src, dst) - l.log.Log(context.TODO(), util.LogLevel(err), fmt.Sprintf("converter.Convert(objectUrl=%s, evtId=%s): %s", dst.Attributes[ceKeyObjectUrl], dst.Id, err)) + l.log.Log(context.TODO(), util.LogLevel(err), fmt.Sprintf("converter.Convert(source=%s, objectUrl=%s, evtId=%s): %s", dst.Source, dst.Attributes[ceKeyObjectUrl], dst.Id, err)) return } diff --git a/service/converter/service.go b/service/converter/service.go index 85cef16..a262052 100644 --- a/service/converter/service.go +++ b/service/converter/service.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "github.com/jhillyerd/enmime" + "github.com/segmentio/ksuid" "google.golang.org/protobuf/types/known/timestamppb" "io" "strings" @@ -16,6 +17,7 @@ type Service interface { } type svc struct { + evtType string } const ceKeyLenMax = 20 @@ -24,6 +26,7 @@ const ceKeyTime = "time" const ceKeyAttContentIds = "attachmentcids" const ceKeyAttContentTypes = "attachmentctypes" const ceKeyAttFileNames = "attachmentfilenames" +const ceSpecVersion = "1.0" var ErrParse = errors.New("failed to parse message") @@ -43,8 +46,10 @@ var headerBlacklist = map[string]bool{ "xreceived": true, } -func NewConverter() Service { - return svc{} +func NewConverter(evtType string) Service { + return svc{ + evtType: evtType, + } } func (c svc) Convert(src io.Reader, dst *pb.CloudEvent) (err error) { @@ -71,75 +76,99 @@ func (c svc) convert(src *enmime.Envelope, dst *pb.CloudEvent) (err error) { TextData: src.HTML, } } + if dst.Data == nil { + err = fmt.Errorf("%w: %s", ErrParse, "no text data") + } - for _, k := range src.GetHeaderKeys() { - v := src.GetHeader(k) - ceKey := c.convertHeaderKey(k) - switch ceKey { - case "date": - var t time.Time - t, err = time.Parse(time.RFC1123Z, v) - if err != nil { - dst.Attributes[ceKeyTime] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeTimestamp{ - CeTimestamp: timestamppb.New(t), - }, + if err == nil { + for _, k := range src.GetHeaderKeys() { + v := src.GetHeader(k) + ceKey := c.convertHeaderKey(k) + switch ceKey { + case "date": + var t time.Time + t, err = time.Parse(time.RFC1123Z, v) + switch err { + case nil: + dst.Attributes[ceKeyTime] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeTimestamp{ + CeTimestamp: timestamppb.New(t), + }, + } + default: + err = fmt.Errorf("%w: %s", ErrParse, err) } - } - case "messageid": - objectUrl := v - if strings.HasPrefix(objectUrl, "<") { - objectUrl = objectUrl[1:] - } - if strings.HasSuffix(objectUrl, ">") { - objectUrl = objectUrl[:len(objectUrl)-1] - } - dst.Attributes[ceKeyObjectUrl] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeUri{ - CeUri: objectUrl, - }, - } - default: - if !headerBlacklist[ceKey] { - dst.Attributes[ceKey] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: v, + case "messageid": + objectUrl := v + if strings.HasPrefix(objectUrl, "<") { + objectUrl = objectUrl[1:] + } + if strings.HasSuffix(objectUrl, ">") { + objectUrl = objectUrl[:len(objectUrl)-1] + } + dst.Attributes[ceKeyObjectUrl] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeUri{ + CeUri: objectUrl, }, } + default: + if !headerBlacklist[ceKey] { + dst.Attributes[ceKey] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: v, + }, + } + } + } + if err != nil { + break } - } - if err != nil { - break } } - var parts []*enmime.Part - parts = append(parts, src.Attachments...) - parts = append(parts, src.Inlines...) - parts = append(parts, src.OtherParts...) - var contentIds []string - var contentTypes []string - var fileNames []string - for _, p := range parts { - contentIds = append(contentIds, p.ContentID) - contentTypes = append(contentTypes, p.ContentType) - fileNames = append(fileNames, p.FileName) - } - if len(parts) > 0 { - dst.Attributes[ceKeyAttContentIds] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: strings.Join(contentIds, ", "), - }, + if err == nil { + if dst.Attributes[ceKeyTime] == nil { + err = fmt.Errorf("%w: %s", ErrParse, "no message date in the source data") } - dst.Attributes[ceKeyAttContentTypes] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: strings.Join(contentTypes, ", "), - }, + if dst.Attributes[ceKeyObjectUrl] == nil { + err = fmt.Errorf("%w: %s", ErrParse, "no message in the source data") + } + } + + if err == nil { + + dst.Id = ksuid.New().String() + dst.SpecVersion = ceSpecVersion + dst.Type = c.evtType + + var parts []*enmime.Part + parts = append(parts, src.Attachments...) + parts = append(parts, src.Inlines...) + parts = append(parts, src.OtherParts...) + var contentIds []string + var contentTypes []string + var fileNames []string + for _, p := range parts { + contentIds = append(contentIds, p.ContentID) + contentTypes = append(contentTypes, p.ContentType) + fileNames = append(fileNames, p.FileName) } - dst.Attributes[ceKeyAttFileNames] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: strings.Join(fileNames, ", "), - }, + if len(parts) > 0 { + dst.Attributes[ceKeyAttContentIds] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: strings.Join(contentIds, ", "), + }, + } + dst.Attributes[ceKeyAttContentTypes] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: strings.Join(contentTypes, ", "), + }, + } + dst.Attributes[ceKeyAttFileNames] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: strings.Join(fileNames, ", "), + }, + } } } diff --git a/service/converter/service_test.go b/service/converter/service_test.go new file mode 100644 index 0000000..c7ef452 --- /dev/null +++ b/service/converter/service_test.go @@ -0,0 +1,150 @@ +package converter + +import ( + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" + "io" + "log/slog" + "strings" + "testing" +) + +func TestSvc_Convert(t *testing.T) { + cases := map[string]struct { + r io.Reader + out *pb.CloudEvent + err error + }{ + "empty": { + r: strings.NewReader(``), + err: ErrParse, + }, + "no date": { + r: strings.NewReader(`From: John Doe +To: Jane Smith +Subject: Meeting Notes and Attachment +Message-ID: +MIME-Version: 1.0 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hi Jane, + +Please find attached the meeting notes and presentation slides. + +Best regards, +John`), + err: ErrParse, + }, + "no message id": { + r: strings.NewReader(`From: John Doe +To: Jane Smith +Subject: Meeting Notes and Attachment +Date: Thu, 10 Oct 2024 12:34:56 +0000 +MIME-Version: 1.0 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hi Jane, + +Please find attached the meeting notes and presentation slides. + +Best regards, +John`), + err: ErrParse, + }, + "parse fail": { + r: strings.NewReader(`?`), + err: ErrParse, + }, + "ok": { + r: strings.NewReader(`From: John Doe +To: Jane Smith +Subject: Meeting Notes and Attachment +Date: Thu, 10 Oct 2024 12:34:56 +0000 +Message-ID: +MIME-Version: 1.0 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hi Jane, + +Please find attached the meeting notes and presentation slides. + +Best regards, +John`), + out: &pb.CloudEvent{ + Attributes: map[string]*pb.CloudEventAttributeValue{ + "contenttype": { + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: "text/plain; charset=\"UTF-8\"", + }, + }, + "contenttransferencod": { + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: "7bit", + }, + }, + "objecturl": { + Attr: &pb.CloudEventAttributeValue_CeUri{ + CeUri: "unique-message-id@example.com", + }, + }, + "mimeversion": { + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: "1.0", + }, + }, + "subject": { + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: "Meeting Notes and Attachment", + }, + }, + "time": { + Attr: &pb.CloudEventAttributeValue_CeTimestamp{ + CeTimestamp: ×tamppb.Timestamp{ + Seconds: 1728563696, + }, + }, + }, + }, + }, + }, + "invalid date format": { + r: strings.NewReader(`From: John Doe +To: Jane Smith +Subject: Meeting Notes and Attachment +Date: Thu, 40 Oct 1024 12-34:56 +MIME-Version: 1.0 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hi Jane, + +Please find attached the meeting notes and presentation slides. + +Best regards, +John`), + err: ErrParse, + }, + } + conv := NewConverter("com_awakari_email_v1") + conv = NewLogging(conv, slog.Default()) + for k, c := range cases { + t.Run(k, func(t *testing.T) { + dst := &pb.CloudEvent{ + Attributes: make(map[string]*pb.CloudEventAttributeValue), + } + err := conv.Convert(c.r, dst) + if c.err == nil { + assert.NotZero(t, dst.Id) + for attrK, attrV := range c.out.Attributes { + assert.True(t, dst.Attributes[attrK] != nil, attrK) + assert.Equal(t, dst.Attributes[attrK].Attr, attrV.Attr, attrK) + } + } + assert.ErrorIs(t, err, c.err) + }) + } +} diff --git a/service/logging.go b/service/logging.go new file mode 100644 index 0000000..b217f8c --- /dev/null +++ b/service/logging.go @@ -0,0 +1,27 @@ +package service + +import ( + "context" + "fmt" + "github.com/awakari/int-email/util" + "io" + "log/slog" +) + +type logging struct { + svc Service + log *slog.Logger +} + +func NewLogging(svc Service, log *slog.Logger) Service { + return logging{ + svc: svc, + log: log, + } +} + +func (l logging) Submit(ctx context.Context, from string, r io.Reader) (err error) { + err = l.svc.Submit(ctx, from, r) + l.log.Log(ctx, util.LogLevel(err), fmt.Sprintf("service.Submit(from=%s): %s", from, err)) + return +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..7639839 --- /dev/null +++ b/service/service.go @@ -0,0 +1,39 @@ +package service + +import ( + "context" + "github.com/awakari/int-email/service/converter" + "github.com/awakari/int-email/service/writer" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "io" +) + +type Service interface { + Submit(ctx context.Context, from string, r io.Reader) (err error) +} + +type svc struct { + conv converter.Service + writer writer.Service + group string +} + +func NewService(conv converter.Service, writer writer.Service, group string) Service { + return svc{ + conv: conv, + writer: writer, + group: group, + } +} + +func (s svc) Submit(ctx context.Context, from string, r io.Reader) (err error) { + evt := &pb.CloudEvent{ + Source: from, + Attributes: make(map[string]*pb.CloudEventAttributeValue), + } + err = s.conv.Convert(r, evt) + if err == nil { + err = s.writer.Write(context.TODO(), evt, s.group, from) + } + return +} diff --git a/service/service_test.go b/service/service_test.go new file mode 100644 index 0000000..b31f4ad --- /dev/null +++ b/service/service_test.go @@ -0,0 +1,75 @@ +package service + +import ( + "context" + "github.com/awakari/int-email/service/converter" + "github.com/awakari/int-email/service/writer" + "github.com/stretchr/testify/assert" + "io" + "log/slog" + "strings" + "testing" +) + +func TestSvc_Submit(t *testing.T) { + cases := map[string]struct { + from string + in io.Reader + err error + }{ + "empty": { + in: strings.NewReader(""), + err: converter.ErrParse, + }, + "ok": { + from: "johndoe@example.com", + in: strings.NewReader(`From: John Doe +To: Jane Smith +Subject: Meeting Notes and Attachment +Date: Thu, 10 Oct 2024 12:34:56 +0000 +Message-ID: +MIME-Version: 1.0 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hi Jane, + +Please find attached the meeting notes and presentation slides. + +Best regards, +John`), + }, + "fail write": { + from: "fail", + in: strings.NewReader(`From: John Doe +To: Jane Smith +Subject: Meeting Notes and Attachment +Date: Thu, 10 Oct 2024 12:34:56 +0000 +Message-ID: +MIME-Version: 1.0 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hi Jane, + +Please find attached the meeting notes and presentation slides. + +Best regards, +John`), + err: writer.ErrWrite, + }, + } + log := slog.Default() + s := NewService( + converter.NewLogging(converter.NewConverter("com_awakari_email_v1"), log), + writer.NewLogging(writer.NewMock(), log), + "default", + ) + s = NewLogging(s, log) + for k, c := range cases { + t.Run(k, func(t *testing.T) { + err := s.Submit(context.TODO(), c.from, c.in) + assert.ErrorIs(t, err, c.err) + }) + } +}