Skip to content

Commit

Permalink
Merge pull request #167 from moov-io/future-actions
Browse files Browse the repository at this point in the history
Future actions
  • Loading branch information
adamdecaf authored Aug 3, 2023
2 parents 1d63203 + 9e9d352 commit 5a78997
Show file tree
Hide file tree
Showing 15 changed files with 953 additions and 199 deletions.
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ This project is used in production at an early stage and might undergo breaking

We publish a [public Docker image `moov/ach-test-harness`](https://hub.docker.com/r/moov/ach-test-harness/) from Docker Hub or use this repository. No configuration is required to serve on `:2222` and metrics at `:3333/metrics` in Prometheus format. <!-- We also have Docker images for [OpenShift](https://quay.io/repository/moov/ach-test-harness?tab=tags) published as `quay.io/moov/ach-test-harness`. -->

### Docker image

Pull & start the Docker image:
```
$ docker-compose up
Expand All @@ -46,6 +48,8 @@ harness_1 | ts=2021-03-24T20:36:10Z msg="listening on [::]:3333" level=info app

You can then use an FTP client that connects to `localhost:2222` with a username of `admin` and password of `secret`. Upload files to the `outbound/` directory and watch for any responses.

### config.yml

After setup inspect the configuration file in `./examples/config.yml` and setup some scenarios to match uploaded files.

```yaml
Expand Down Expand Up @@ -78,8 +82,18 @@ ACHTestHarness:
action:
return:
code: "R03"

- match:
amount:
value: 12357 # $123.57
action:
delay: "12h"
return:
code: "R10"
```
#### config schema
The full config for Responses is below:
```yaml
Expand All @@ -95,8 +109,22 @@ match:
routingNumber: <string> # Exact match of ABA routing number (RDFIIdentification and CheckDigit)
traceNumber: <string> # Exact match of TraceNumber
entryType: <string> # Checks TransactionCode. Accepted values: credit, debit or prenote.
# Matching will find at most two Actions in the config file order. One Copy Action and one Return/Correction Action.
# Both actions will be executed if the Return/Correction Action has a delay.
# Valid combinations include:
# 1. Copy
# 2. Return/Correction with Delay
# 3. Return/Correction without Delay
# 4. Copy and Return/Correction with Delay
# 5. Nothing
# Invalid combinations are:
# 1. Copy and Return/Correction without Delay
# 2. Copy with Delay (validated when reading configuration)
action:
# Copy the EntryDetail to another directory
# How long into the future should we wait before making the correction/return available?
delay: <duration>

# Copy the EntryDetail to another directory (not valid with a delay)
copy:
path: <string> # Filepath on the FTP server

Expand Down
1 change: 1 addition & 0 deletions examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ACHTestHarness:
# This matches ./examples/utility-bill.ach
accountNumber: "744-5678-99"
action:
delay: "12h"
correction:
code: "C01"
data: "744567899"
Expand Down
36 changes: 36 additions & 0 deletions pkg/filedrive/mtime_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package filedrive

import (
"time"

"goftp.io/server/core"
)

type MTimeFilter struct {
core.Driver
}

func (mtf MTimeFilter) ListDir(path string, callback func(core.FileInfo) error) error {
now := time.Now()

return mtf.Driver.ListDir(path, func(info core.FileInfo) error {
if info.ModTime().Before(now) {
return callback(info)
}
return nil
})
}

type Factory struct {
DriverFactory core.DriverFactory
}

func (f *Factory) NewDriver() (core.Driver, error) {
dd, err := f.DriverFactory.NewDriver()
if err != nil {
return nil, err
}
return MTimeFilter{
Driver: dd,
}, nil
}
2 changes: 1 addition & 1 deletion pkg/response/batch_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (bm *batchMirror) saveFiles() error {
if filename, err := bm.filename(); err != nil {
return fmt.Errorf("unable to get filename: %v", err)
} else {
bm.writer.Write(filepath.Join(path, filename), &buf)
bm.writer.Write(filepath.Join(path, filename), &buf, nil)
}
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/response/entry_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

type EntryTransformer interface {
MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action service.Action) (*ach.EntryDetail, error)
MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action *service.Action) (*ach.EntryDetail, error)
}

type EntryTransformers []EntryTransformer

func (et EntryTransformers) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action service.Action) (*ach.EntryDetail, error) {
func (et EntryTransformers) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action *service.Action) (*ach.EntryDetail, error) {
var err error
for i := range et {
ed, err = et[i].MorphEntry(fh, ed, action)
Expand All @@ -27,7 +27,7 @@ func (et EntryTransformers) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, a

type CorrectionTransformer struct{}

func (t *CorrectionTransformer) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action service.Action) (*ach.EntryDetail, error) {
func (t *CorrectionTransformer) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action *service.Action) (*ach.EntryDetail, error) {
if action.Correction == nil {
return ed, nil
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func generateCorrectedData(cor *service.Correction) string {

type ReturnTransformer struct{}

func (t *ReturnTransformer) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action service.Action) (*ach.EntryDetail, error) {
func (t *ReturnTransformer) MorphEntry(fh ach.FileHeader, ed *ach.EntryDetail, action *service.Action) (*ach.EntryDetail, error) {
if action.Return == nil {
return ed, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/response/entry_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestMorphEntry__Correction(t *testing.T) {
},
}
ed := file.Batches[0].GetEntries()[0]
out, err := xform.MorphEntry(file.Header, ed, action)
out, err := xform.MorphEntry(file.Header, ed, &action)
require.NoError(t, err)

if out.Addenda98 == nil {
Expand All @@ -49,7 +49,7 @@ func TestMorphEntry__Return(t *testing.T) {
},
}
ed := file.Batches[0].GetEntries()[0]
out, err := xform.MorphEntry(file.Header, ed, action)
out, err := xform.MorphEntry(file.Header, ed, &action)
require.NoError(t, err)

if out.Addenda98 != nil {
Expand Down
150 changes: 109 additions & 41 deletions pkg/response/file_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,11 @@ func NewFileTransformer(logger log.Logger, cfg *service.Config, responses []serv
func (ft *FileTransfomer) Transform(file *ach.File) error {
out := ach.NewFile()
out.SetValidation(ft.ValidateOpts)

out.Header = ach.NewFileHeader()
out.Header.SetValidation(ft.ValidateOpts)

out.Header.ImmediateDestination = file.Header.ImmediateOrigin
out.Header.ImmediateDestinationName = file.Header.ImmediateOriginName
out.Header.ImmediateOrigin = file.Header.ImmediateDestination
out.Header.ImmediateOriginName = file.Header.ImmediateDestinationName
out.Header.FileCreationDate = time.Now().Format("060102")
out.Header.FileCreationTime = time.Now().Format("1504")
out.Header.FileIDModifier = "A"

if err := out.Header.Validate(); err != nil {
return fmt.Errorf("file transform: header validate: %v", err)
if err := createOutHeader(out, file, ft.ValidateOpts); err != nil {
return err
}

var entrySaved = false
for i := range file.Batches {
mirror := newBatchMirror(ft.Writer, file.Batches[i])
batch, err := ach.NewBatch(file.Batches[i].GetHeader())
Expand All @@ -64,57 +53,136 @@ func (ft *FileTransfomer) Transform(file *ach.File) error {
}
entries := file.Batches[i].GetEntries()
for j := range entries {
// Check if there's a matching Action and perform it
action := ft.Matcher.FindAction(entries[j])
if action != nil {
entry, err := ft.Entry.MorphEntry(file.Header, entries[j], *action)
// Check if there's a matching Action and perform it. There may also be a future-dated action to execute.
copyAction, processAction := ft.Matcher.FindAction(entries[j])
if copyAction != nil {
logger := ft.Matcher.Logger.With(copyAction)
logger.Log("Processing matched action")

mirror.saveEntry(copyAction.Copy, entries[j])
entrySaved = true
}
if processAction != nil {
logger := ft.Matcher.Logger.With(processAction)
logger.Log("Processing matched action")

entry, err := ft.Entry.MorphEntry(file.Header, entries[j], processAction)
if err != nil {
return fmt.Errorf("transform batch[%d] morph entry[%d] error: %v", i, j, err)
}

// When the entry is corrected we need to change the SEC code
if entry.Category == ach.CategoryNOC {
bh := batch.GetHeader()
bh.StandardEntryClassCode = ach.COR
if b, err := ach.NewBatch(bh); b != nil {
batch = b // replace entire Batch
if processAction.Delay != nil {
// need to save off the future-dated entry
futOut := ach.NewFile()
futOut.SetValidation(ft.ValidateOpts)
if futErr := createOutHeader(futOut, file, ft.ValidateOpts); futErr != nil {
return futErr
}

futMirror := newBatchMirror(ft.Writer, file.Batches[i])
var futBatch ach.Batcher
var futErr error

// When the entry is corrected we need to change the SEC code
if entry.Category == ach.CategoryNOC {
bh := *file.Batches[i].GetHeader()
bh.StandardEntryClassCode = ach.COR
futBatch, futErr = ach.NewBatch(&bh)
} else {
return fmt.Errorf("transform batch[%d] NOC entry[%d] error: %v", i, j, err)
futBatch, futErr = ach.NewBatch(file.Batches[i].GetHeader())
}
if futErr != nil {
return fmt.Errorf("transform batch[%d] problem creating Batch: %v", i, futErr)
}

// Add the transformed entry onto the batch
if entry != nil {
futBatch.AddEntry(entry)
}
}

// Save this Entry
if action.Copy != nil {
mirror.saveEntry(action.Copy, entries[j])
// Save off the entries as requested
if futErr = futMirror.saveFiles(); futErr != nil {
return fmt.Errorf("problem saving entries: %v", futErr)
}
// Create our Batch's Control and other fields
if futErr = futBatch.Create(); futErr != nil {
return fmt.Errorf("transform batch[%d] create error: %v", i, futErr)
}
futOut.AddBatch(futBatch)

if futErr = writeOutFile(futOut, ft, processAction.Delay); futErr != nil {
return futErr
}
} else {
// When the entry is corrected we need to change the SEC code
if entry.Category == ach.CategoryNOC {
bh := batch.GetHeader()
bh.StandardEntryClassCode = ach.COR
if b, err := ach.NewBatch(bh); b != nil {
batch = b // replace entire Batch
} else {
return fmt.Errorf("transform batch[%d] NOC entry[%d] error: %v", i, j, err)
}
}

// Add the transformed entry onto the batch
if entry != nil {
batch.AddEntry(entry)
}
entrySaved = true
}
}
}
// Save off the entries as requested
if err := mirror.saveFiles(); err != nil {
return fmt.Errorf("problem saving entries: %v", err)
}
// Create our Batch's Control and other fields
if entries := batch.GetEntries(); len(entries) > 0 {
if err := batch.Create(); err != nil {
return fmt.Errorf("transform batch[%d] create error: %v", i, err)

if entrySaved {
// Save off the entries as requested
if err := mirror.saveFiles(); err != nil {
return fmt.Errorf("problem saving entries: %v", err)
}
// Create our Batch's Control and other fields
if entries := batch.GetEntries(); len(entries) > 0 {
if err := batch.Create(); err != nil {
return fmt.Errorf("transform batch[%d] create error: %v", i, err)
}
out.AddBatch(batch)
}
out.AddBatch(batch)
}
}

if err := writeOutFile(out, ft, nil); err != nil {
return err
}
return nil
}

func createOutHeader(out *ach.File, file *ach.File, opts *ach.ValidateOpts) error {
out.Header = ach.NewFileHeader()
out.Header.SetValidation(opts)

out.Header.ImmediateDestination = file.Header.ImmediateOrigin
out.Header.ImmediateDestinationName = file.Header.ImmediateOriginName
out.Header.ImmediateOrigin = file.Header.ImmediateDestination
out.Header.ImmediateOriginName = file.Header.ImmediateDestinationName
out.Header.FileCreationDate = time.Now().Format("060102")
out.Header.FileCreationTime = time.Now().Format("1504")
out.Header.FileIDModifier = "A"

if err := out.Header.Validate(); err != nil {
return fmt.Errorf("file transform: header validate: %v", err)
}

return nil
}

func writeOutFile(out *ach.File, ft *FileTransfomer, delay *time.Duration) error {
if out != nil && len(out.Batches) > 0 {
if err := out.Create(); err != nil {
return fmt.Errorf("transform out create: %v", err)
}
if err := out.Validate(); err == nil {
filepath := filepath.Join(ft.returnPath, generateFilename(out)) // TODO(adam): need to determine return path
if err := ft.Writer.WriteFile(filepath, out); err != nil {
return fmt.Errorf("transform write %s: %v", filepath, err)
generatedFilePath := filepath.Join(ft.returnPath, generateFilename(out)) // TODO(adam): need to determine return path
if err := ft.Writer.WriteFile(generatedFilePath, out, delay); err != nil {
return fmt.Errorf("transform write %s: %v", generatedFilePath, err)
}
} else {
return fmt.Errorf("transform validate out file: %v", err)
Expand Down
Loading

0 comments on commit 5a78997

Please sign in to comment.