diff --git a/conduit/plugins/exporters/filewriter/README.md b/conduit/plugins/exporters/filewriter/README.md
index 21098847..c91e5c6e 100644
--- a/conduit/plugins/exporters/filewriter/README.md
+++ b/conduit/plugins/exporters/filewriter/README.md
@@ -2,7 +2,10 @@
Write block data to files. This plugin works with the file reader plugin to create a simple file-based pipeline.
+The genesis file is always exported to a plain JSON file named `genesis.json` regardless of the `FilenamePattern`.
+
## Configuration
+
```yml @sample.yaml
name: file_writer
config:
@@ -13,9 +16,11 @@ config:
# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
- # If the file has a '.gz' extension, blocks will be gzipped.
- # Default: "%[1]d_block.json"
- filename-pattern: "%[1]d_block.json"
+ # To specify JSON encoding, add a '.json' extension to the filename.
+ # To specify MessagePack encoding, add a '.msgp' extension to the filename.
+ # If the file has a '.gz' extension, blocks will be gzipped regardless of encoding.
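+ # For example, "%[1]d_block.json" writes plain JSON files such as "5_block.json",
+ # while "%[1]d_block.msgp.gz" writes gzipped MessagePack files such as "5_block.msgp.gz".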
+ # Default: "%[1]d_block.msgp.gz"
+ filename-pattern: "%[1]d_block.msgp.gz"
# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
diff --git a/conduit/plugins/exporters/filewriter/file_exporter.go b/conduit/plugins/exporters/filewriter/file_exporter.go
index bcb3f7f6..28862372 100644
--- a/conduit/plugins/exporters/filewriter/file_exporter.go
+++ b/conduit/plugins/exporters/filewriter/file_exporter.go
@@ -18,13 +18,19 @@ import (
const (
// PluginName to use when configuring.
PluginName = "file_writer"
+
// FilePattern is used to name the output files.
- FilePattern = "%[1]d_block.json"
+ FilePattern = "%[1]d_block.msgp.gz"
+
+ // GenesisFilename is the name of the genesis file.
+ GenesisFilename = "genesis.json"
)
type fileExporter struct {
round uint64
cfg Config
+ gzip bool
+ format EncodingFormat
logger *logrus.Logger
}
@@ -51,20 +57,34 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
+ exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
+ if err != nil {
+ return fmt.Errorf("Init() error: %w", err)
+ }
+
// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
}
// create block directory
err = os.Mkdir(exp.cfg.BlocksDir, 0755)
- if err != nil && errors.Is(err, os.ErrExist) {
- // Ignore mkdir if the dir exists
- err = nil
- } else if err != nil {
+ // Ignore the error if the directory already exists.
+ if err != nil && !errors.Is(err, os.ErrExist) {
return fmt.Errorf("Init() error: %w", err)
}
+
exp.round = uint64(initProvider.NextDBRound())
- return err
+
+ genesis := initProvider.GetGenesis()
+ genesisPath := path.Join(exp.cfg.BlocksDir, GenesisFilename)
+
+ // the genesis is always exported as plain JSON:
+ err = EncodeToFile(genesisPath, genesis, JSONFormat, false)
+ if err != nil {
+ return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
+ }
+
+ return nil
}
func (exp *fileExporter) Close() error {
@@ -87,10 +107,12 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
}
blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
- err := EncodeJSONToFile(blockFile, exportData, true)
+
+ err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}
+
exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}
diff --git a/conduit/plugins/exporters/filewriter/file_exporter_test.go b/conduit/plugins/exporters/filewriter/file_exporter_test.go
index 555df398..9ee054f6 100644
--- a/conduit/plugins/exporters/filewriter/file_exporter_test.go
+++ b/conduit/plugins/exporters/filewriter/file_exporter_test.go
@@ -9,7 +9,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
@@ -21,29 +20,47 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)
+const (
+ defaultEncodingFormat = MessagepackFormat
+ defaultIsGzip = true
+)
+
var logger *logrus.Logger
var fileCons = exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
})
-var configTemplate = "block-dir: %s/blocks\n"
+var configTemplatePrefix = "block-dir: %s/blocks\n"
var round = sdk.Round(2)
func init() {
logger, _ = test.NewNullLogger()
}
-func getConfig(t *testing.T) (config, tempdir string) {
+func getConfigWithoutPattern(t *testing.T) (config, tempdir string) {
tempdir = t.TempDir()
- config = fmt.Sprintf(configTemplate, tempdir)
+ config = fmt.Sprintf(configTemplatePrefix, tempdir)
+ return
+}
+
+func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) {
+ config, tempdir = getConfigWithoutPattern(t)
+ config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, pattern)
return
}
+func TestDefaults(t *testing.T) {
+ format, gzip, err := ParseFilenamePattern(FilePattern)
+ require.NoError(t, err)
+ require.Equal(t, defaultEncodingFormat, format)
+ require.Equal(t, defaultIsGzip, gzip)
+}
+
func TestExporterMetadata(t *testing.T) {
fileExp := fileCons.New()
meta := fileExp.Metadata()
- assert.Equal(t, metadata.Name, meta.Name)
- assert.Equal(t, metadata.Description, meta.Description)
- assert.Equal(t, metadata.Deprecated, meta.Deprecated)
+ require.Equal(t, metadata.Name, meta.Name)
+ require.Equal(t, metadata.Description, meta.Description)
+ require.Equal(t, metadata.Deprecated, meta.Deprecated)
}
func TestExporterInitDefaults(t *testing.T) {
@@ -87,18 +104,18 @@ func TestExporterInitDefaults(t *testing.T) {
}
func TestExporterInit(t *testing.T) {
- config, _ := getConfig(t)
+ config, _ := getConfigWithPattern(t, "%[1]d_block.json")
fileExp := fileCons.New()
defer fileExp.Close()
// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
- assert.NoError(t, err)
+ require.NoError(t, err)
fileExp.Close()
// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
- assert.NoError(t, err)
+ require.NoError(t, err)
fileExp.Close()
}
@@ -155,55 +172,67 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds
}
func TestExporterReceive(t *testing.T) {
- config, tempdir := getConfig(t)
- fileExp := fileCons.New()
- numRounds := 5
- sendData(t, fileExp, config, numRounds)
-
- // block data is valid
- for i := 0; i < 5; i++ {
- filename := fmt.Sprintf(FilePattern, i)
- path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
- require.FileExists(t, path)
-
- blockBytes, err := os.ReadFile(path)
- require.NoError(t, err)
- assert.NotContains(t, string(blockBytes), " 0: ")
+ patterns := []string{
+ "%[1]d_block.json",
+ "%[1]d_block.json.gz",
+ "%[1]d_block.msgp",
+ "%[1]d_block.msgp.gz",
+ }
+ for _, pattern := range patterns {
+ pattern := pattern
+ t.Run(pattern, func(t *testing.T) {
+ t.Parallel()
- var blockData data.BlockData
- err = DecodeJSONFromFile(path, &blockData, true)
- require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
- require.NoError(t, err)
- require.NotNil(t, blockData.Certificate)
+ format, isGzip, err := ParseFilenamePattern(pattern)
+ require.NoError(t, err)
+ config, tempdir := getConfigWithPattern(t, pattern)
+ fileExp := fileCons.New()
+ numRounds := 5
+ sendData(t, fileExp, config, numRounds)
+
+ // block data is valid
+ for i := 0; i < 5; i++ {
+ filename := fmt.Sprintf(pattern, i)
+ path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
+ require.FileExists(t, path)
+
+ blockBytes, err := os.ReadFile(path)
+ require.NoError(t, err)
+ require.NotContains(t, string(blockBytes), " 0: ")
+
+ var blockData data.BlockData
+ err = DecodeFromFile(path, &blockData, format, isGzip)
+ require.NoError(t, err)
+ require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
+ require.NotNil(t, blockData.Certificate)
+ }
+ })
}
}
func TestExporterClose(t *testing.T) {
- config, _ := getConfig(t)
+ config, _ := getConfigWithoutPattern(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}
-func TestPatternOverride(t *testing.T) {
- config, tempdir := getConfig(t)
+func TestPatternDefault(t *testing.T) {
+ config, tempdir := getConfigWithoutPattern(t)
fileExp := fileCons.New()
- patternOverride := "PREFIX_%[1]d_block.json"
- config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, patternOverride)
-
numRounds := 5
sendData(t, fileExp, config, numRounds)
// block data is valid
for i := 0; i < 5; i++ {
- filename := fmt.Sprintf(patternOverride, i)
+ filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
- assert.FileExists(t, path)
+ require.FileExists(t, path)
var blockData data.BlockData
- err := DecodeJSONFromFile(path, &blockData, true)
+ err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
@@ -227,10 +256,10 @@ func TestDropCertificate(t *testing.T) {
for i := 0; i < numRounds; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/%s", tempdir, filename)
- assert.FileExists(t, path)
+ require.FileExists(t, path)
var blockData data.BlockData
- err := DecodeJSONFromFile(path, &blockData, true)
- assert.NoError(t, err)
- assert.Nil(t, blockData.Certificate)
+ err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
+ require.NoError(t, err)
+ require.Nil(t, blockData.Certificate)
}
}
diff --git a/conduit/plugins/exporters/filewriter/sample.yaml b/conduit/plugins/exporters/filewriter/sample.yaml
index 59895661..ea012a34 100644
--- a/conduit/plugins/exporters/filewriter/sample.yaml
+++ b/conduit/plugins/exporters/filewriter/sample.yaml
@@ -7,9 +7,11 @@ config:
# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
- # If the file has a '.gz' extension, blocks will be gzipped.
- # Default: "%[1]d_block.json"
- filename-pattern: "%[1]d_block.json"
+ # To specify JSON encoding, add a '.json' extension to the filename.
+ # To specify MessagePack encoding, add a '.msgp' extension to the filename.
+ # If the file has a '.gz' extension, blocks will be gzipped regardless of encoding.
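+ # For example, "%[1]d_block.json" writes plain JSON files such as "5_block.json",
+ # while "%[1]d_block.msgp.gz" writes gzipped MessagePack files such as "5_block.msgp.gz".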
+ # Default: "%[1]d_block.msgp.gz"
+ filename-pattern: "%[1]d_block.msgp.gz"
# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
diff --git a/conduit/plugins/exporters/filewriter/util.go b/conduit/plugins/exporters/filewriter/util.go
index 313bd323..0e787ecd 100644
--- a/conduit/plugins/exporters/filewriter/util.go
+++ b/conduit/plugins/exporters/filewriter/util.go
@@ -1,7 +1,6 @@
package filewriter
import (
- "bytes"
"compress/gzip"
"fmt"
"io"
@@ -9,44 +8,69 @@ import (
"strings"
"github.com/algorand/go-algorand-sdk/v2/encoding/json"
+ "github.com/algorand/go-algorand-sdk/v2/encoding/msgpack"
"github.com/algorand/go-codec/codec"
)
-var prettyHandle *codec.JsonHandle
-var jsonStrictHandle *codec.JsonHandle
+// EncodingFormat enumerates the acceptable encoding formats for Conduit file-based plugins.
+type EncodingFormat byte
+
+const (
+ // MessagepackFormat indicates the file is encoded using MessagePack.
+ MessagepackFormat EncodingFormat = iota
+
+ // JSONFormat indicates the file is encoded using JSON.
+ JSONFormat
+
+ // UnrecognizedFormat indicates the file's encoding is unknown to Conduit.
+ UnrecognizedFormat
+)
+
+var jsonPrettyHandle *codec.JsonHandle
func init() {
- prettyHandle = new(codec.JsonHandle)
- prettyHandle.ErrorIfNoField = json.CodecHandle.ErrorIfNoField
- prettyHandle.ErrorIfNoArrayExpand = json.CodecHandle.ErrorIfNoArrayExpand
- prettyHandle.Canonical = json.CodecHandle.Canonical
- prettyHandle.RecursiveEmptyCheck = json.CodecHandle.RecursiveEmptyCheck
- prettyHandle.Indent = json.CodecHandle.Indent
- prettyHandle.HTMLCharsAsIs = json.CodecHandle.HTMLCharsAsIs
- prettyHandle.MapKeyAsString = true
- prettyHandle.Indent = 2
-
- jsonStrictHandle = new(codec.JsonHandle)
- jsonStrictHandle.ErrorIfNoField = prettyHandle.ErrorIfNoField
- jsonStrictHandle.ErrorIfNoArrayExpand = prettyHandle.ErrorIfNoArrayExpand
- jsonStrictHandle.Canonical = prettyHandle.Canonical
- jsonStrictHandle.RecursiveEmptyCheck = prettyHandle.RecursiveEmptyCheck
- jsonStrictHandle.Indent = prettyHandle.Indent
- jsonStrictHandle.HTMLCharsAsIs = prettyHandle.HTMLCharsAsIs
- jsonStrictHandle.MapKeyAsString = true
+ jsonPrettyHandle = new(codec.JsonHandle)
+ jsonPrettyHandle.ErrorIfNoField = json.CodecHandle.ErrorIfNoField
+ jsonPrettyHandle.ErrorIfNoArrayExpand = json.CodecHandle.ErrorIfNoArrayExpand
+ jsonPrettyHandle.Canonical = json.CodecHandle.Canonical
+ jsonPrettyHandle.RecursiveEmptyCheck = json.CodecHandle.RecursiveEmptyCheck
+ jsonPrettyHandle.Indent = json.CodecHandle.Indent
+ jsonPrettyHandle.HTMLCharsAsIs = json.CodecHandle.HTMLCharsAsIs
+ jsonPrettyHandle.MapKeyAsString = true
+ jsonPrettyHandle.Indent = 2
}
-// EncodeJSONToFile is used to encode an object to a file. If the file ends in .gz it will be gzipped.
-func EncodeJSONToFile(filename string, v interface{}, pretty bool) error {
- var writer io.Writer
+// ParseFilenamePattern parses a filename pattern into an EncodingFormat and gzip flag.
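+//
+// For example (illustrative of the suffix rules below):
+//
+//	ParseFilenamePattern("%[1]d_block.json")    // JSONFormat, gzip=false
+//	ParseFilenamePattern("%[1]d_block.msgp.gz") // MessagepackFormat, gzip=true
+//	ParseFilenamePattern("%[1]d_block.txt")     // UnrecognizedFormat, non-nil error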
+func ParseFilenamePattern(pattern string) (EncodingFormat, bool, error) {
+ originalPattern := pattern
+ gzip := false
+ if strings.HasSuffix(pattern, ".gz") {
+ gzip = true
+ pattern = pattern[:len(pattern)-3]
+ }
+
+ var blockFormat EncodingFormat
+ if strings.HasSuffix(pattern, ".msgp") {
+ blockFormat = MessagepackFormat
+ } else if strings.HasSuffix(pattern, ".json") {
+ blockFormat = JSONFormat
+ } else {
+ return UnrecognizedFormat, false, fmt.Errorf("unrecognized export format: %s", originalPattern)
+ }
+
+ return blockFormat, gzip, nil
+}
+// EncodeToFile encodes an object to a file using a given format and possible gzip compression.
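+//
+// A minimal usage sketch (illustrative; blockData is a placeholder value),
+// mirrored by DecodeFromFile below:
+//
+//	err := EncodeToFile("5_block.msgp.gz", &blockData, MessagepackFormat, true)
+//	...
+//	err = DecodeFromFile("5_block.msgp.gz", &blockData, MessagepackFormat, true)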
+func EncodeToFile(filename string, v interface{}, format EncodingFormat, isGzip bool) error {
file, err := os.Create(filename)
if err != nil {
- return fmt.Errorf("EncodeJSONToFile(): failed to create %s: %w", filename, err)
+ return fmt.Errorf("EncodeToFile(): failed to create %s: %w", filename, err)
}
defer file.Close()
- if strings.HasSuffix(filename, ".gz") {
+ var writer io.Writer
+ if isGzip {
gz := gzip.NewWriter(file)
gz.Name = filename
defer gz.Close()
@@ -55,41 +79,51 @@ func EncodeJSONToFile(filename string, v interface{}, pretty bool) error {
writer = file
}
- var handle *codec.JsonHandle
- if pretty {
- handle = prettyHandle
- } else {
- handle = jsonStrictHandle
+ return Encode(format, writer, v)
+}
+
+// Encode writes an object to the writer using the given EncodingFormat.
+func Encode(format EncodingFormat, writer io.Writer, v interface{}) error {
+ var handle codec.Handle
+ switch format {
+ case JSONFormat:
+ handle = jsonPrettyHandle
+ case MessagepackFormat:
+ handle = msgpack.LenientCodecHandle
+ default:
+ return fmt.Errorf("Encode(): unhandled format %d", format)
}
- enc := codec.NewEncoder(writer, handle)
- return enc.Encode(v)
+ return codec.NewEncoder(writer, handle).Encode(v)
}
-// DecodeJSONFromFile is used to decode a file to an object.
-func DecodeJSONFromFile(filename string, v interface{}, strict bool) error {
- // Streaming into the decoder was slow.
- fileBytes, err := os.ReadFile(filename)
+// DecodeFromFile decodes a file to an object using a given format and possible gzip compression.
+func DecodeFromFile(filename string, v interface{}, format EncodingFormat, isGzip bool) error {
+ file, err := os.Open(filename)
if err != nil {
- return fmt.Errorf("DecodeJSONFromFile(): failed to read %s: %w", filename, err)
+ return fmt.Errorf("DecodeFromFile(): failed to open %s: %w", filename, err)
}
+ defer file.Close()
- var reader io.Reader = bytes.NewReader(fileBytes)
-
- if strings.HasSuffix(filename, ".gz") {
- gz, err := gzip.NewReader(reader)
+ var reader io.Reader
+ if isGzip {
+ gz, err := gzip.NewReader(file)
if err != nil {
- return fmt.Errorf("DecodeJSONFromFile(): failed to make gzip reader: %w", err)
+ return fmt.Errorf("DecodeFromFile(): failed to make gzip reader: %w", err)
}
defer gz.Close()
reader = gz
- }
- var handle *codec.JsonHandle
- if strict {
- handle = json.CodecHandle
} else {
- handle = json.LenientCodecHandle
+ reader = file
}
- enc := codec.NewDecoder(reader, handle)
- return enc.Decode(v)
+ var handle codec.Handle
+ switch format {
+ case JSONFormat:
+ handle = json.LenientCodecHandle
+ case MessagepackFormat:
+ handle = msgpack.LenientCodecHandle
+ default:
+ return fmt.Errorf("DecodeFromFile(): unhandled format %d", format)
+ }
+ return codec.NewDecoder(reader, handle).Decode(v)
}
diff --git a/conduit/plugins/exporters/filewriter/util_test.go b/conduit/plugins/exporters/filewriter/util_test.go
index ba017e28..8891710f 100644
--- a/conduit/plugins/exporters/filewriter/util_test.go
+++ b/conduit/plugins/exporters/filewriter/util_test.go
@@ -1,13 +1,109 @@
package filewriter
import (
- "io/ioutil"
+ "os"
"path"
"testing"
"github.com/stretchr/testify/require"
)
+func TestParseFilenamePattern(t *testing.T) {
+ testCases := []struct {
+ name string
+ format string
+ gzip bool
+ blockFormat EncodingFormat
+ err string
+ }{
+ {
+ name: "messagepack vanilla",
+ format: "%d_block.msgp",
+ gzip: false,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "messagepack gzip",
+ format: "%d_block.msgp.gz",
+ gzip: true,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "json vanilla",
+ format: "%d_block.json",
+ gzip: false,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "json gzip",
+ format: "%d_block.json.gz",
+ gzip: true,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "messagepack vanilla 2",
+ format: "%[1]d_block round%[1]d.msgp",
+ gzip: false,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "messagepack gzip 2",
+ format: "%[1]d_block round%[1]d.msgp.gz",
+ gzip: true,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "json vanilla 2",
+ format: "%[1]d_block round%[1]d.json",
+ gzip: false,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "json gzip 2",
+ format: "%[1]d_block round%[1]d.json.gz",
+ gzip: true,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "invalid - gzip",
+ format: "%d_block.msgp.gzip",
+ gzip: false,
+ blockFormat: UnrecognizedFormat,
+ err: "unrecognized export format",
+ },
+ {
+ name: "invalid - no extension",
+ format: "%d_block",
+ gzip: false,
+ blockFormat: UnrecognizedFormat,
+ err: "unrecognized export format",
+ },
+ }
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ blockFormat, gzip, err := ParseFilenamePattern(tc.format)
+ if tc.err == "" {
+ require.NoError(t, err)
+ require.Equal(t, tc.gzip, gzip)
+ require.Equal(t, tc.blockFormat, blockFormat)
+ } else {
+ require.ErrorContains(t, err, tc.err)
+ }
+ })
+ }
+}
+
func TestEncodeToAndFromFile(t *testing.T) {
tempdir := t.TempDir()
@@ -25,16 +121,17 @@ func TestEncodeToAndFromFile(t *testing.T) {
}
{
- pretty := path.Join(tempdir, "pretty.json")
- err := EncodeJSONToFile(pretty, data, true)
+ jsonFile := path.Join(tempdir, "json.json")
+ err := EncodeToFile(jsonFile, data, JSONFormat, false)
require.NoError(t, err)
- require.FileExists(t, pretty)
+ require.FileExists(t, jsonFile)
var testDecode test
- err = DecodeJSONFromFile(pretty, &testDecode, false)
+ err = DecodeFromFile(jsonFile, &testDecode, JSONFormat, false)
+ require.NoError(t, err)
require.Equal(t, data, testDecode)
// Check the pretty printing
- bytes, err := ioutil.ReadFile(pretty)
+ bytes, err := os.ReadFile(jsonFile)
require.NoError(t, err)
require.Contains(t, string(bytes), " \"one\": \"one\",\n")
require.Contains(t, string(bytes), `"0": "int-key"`)
@@ -42,22 +139,24 @@ func TestEncodeToAndFromFile(t *testing.T) {
{
small := path.Join(tempdir, "small.json")
- err := EncodeJSONToFile(small, data, false)
+ err := EncodeToFile(small, data, JSONFormat, false)
require.NoError(t, err)
require.FileExists(t, small)
var testDecode test
- err = DecodeJSONFromFile(small, &testDecode, false)
+ err = DecodeFromFile(small, &testDecode, JSONFormat, false)
+ require.NoError(t, err)
require.Equal(t, data, testDecode)
}
// gzip test
{
small := path.Join(tempdir, "small.json.gz")
- err := EncodeJSONToFile(small, data, false)
+ err := EncodeToFile(small, data, JSONFormat, true)
require.NoError(t, err)
require.FileExists(t, small)
var testDecode test
- err = DecodeJSONFromFile(small, &testDecode, false)
+ err = DecodeFromFile(small, &testDecode, JSONFormat, true)
+ require.NoError(t, err)
require.Equal(t, data, testDecode)
}
}
diff --git a/conduit/plugins/importers/filereader/README.md b/conduit/plugins/importers/filereader/README.md
index cd023d71..3bda336e 100644
--- a/conduit/plugins/importers/filereader/README.md
+++ b/conduit/plugins/importers/filereader/README.md
@@ -2,7 +2,10 @@
Read files from a directory and import them as blocks. This plugin works with the file exporter plugin to create a simple file-based pipeline.
+The genesis must be provided as a plain JSON file named `genesis.json`, regardless of the `FilenamePattern`.
+
## Configuration
+
```yml @sample.yaml
name: file_reader
config:
@@ -10,17 +13,9 @@ config:
# The directory is created if it doesn't exist. If no directory is provided
# blocks are written to the Conduit data directory.
#block-dir: "/path/to/directory"
-
- # RetryDuration controls the delay between checks when the importer has
- # caught up and is waiting for new blocks to appear.
- retry-duration: "5s"
-
- # RetryCount controls the number of times to check for a missing block
- # before generating an error. The retry count and retry duration should
- # be configured according the expected round time.
- retry-count: 5
-
+
# FilenamePattern is the format used to find block files. It uses go string
# formatting and should accept one number for the round.
- filename-pattern: "%[1]d_block.json"
+ # The pattern should match the extension of the files to be read.
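+ # For example, blocks exported with "%[1]d_block.json.gz" should be read back
+ # with the same "%[1]d_block.json.gz" pattern.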
+ filename-pattern: "%[1]d_block.msgp.gz"
```
diff --git a/conduit/plugins/importers/filereader/fileReadWrite_test.go b/conduit/plugins/importers/filereader/fileReadWrite_test.go
new file mode 100644
index 00000000..85c5da5c
--- /dev/null
+++ b/conduit/plugins/importers/filereader/fileReadWrite_test.go
@@ -0,0 +1,240 @@
+package fileimporter
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "fmt"
+ "os"
+ "path"
+ "strings"
+ "testing"
+
+ logrusTest "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v3"
+
+ "github.com/algorand/conduit/conduit"
+ "github.com/algorand/conduit/conduit/data"
+ "github.com/algorand/conduit/conduit/plugins"
+ "github.com/algorand/conduit/conduit/plugins/exporters"
+ "github.com/algorand/conduit/conduit/plugins/exporters/filewriter"
+ "github.com/algorand/conduit/conduit/plugins/importers"
+ sdk "github.com/algorand/go-algorand-sdk/v2/types"
+)
+
+const (
+ conduitDataDir = "test_resources/conduit_data"
+ filePattern = "%[1]d_block.msgp.gz"
+ importerBlockDir = "test_resources/filereader_blocks"
+ exporterBlockDir = "test_resources/conduit_data/exporter_file_writer"
+)
+
+func cleanArtifacts(t *testing.T) {
+ err := os.RemoveAll(exporterBlockDir)
+ require.NoError(t, err)
+}
+
+// numGzippedFiles returns the number of files in the importerBlockDir
+// whose filename ends in .gz
+func numGzippedFiles(t *testing.T) uint64 {
+ files, err := os.ReadDir(importerBlockDir)
+ require.NoError(t, err)
+
+ gzCount := uint64(0)
+ for _, file := range files {
+ if strings.HasSuffix(file.Name(), ".gz") {
+ gzCount++
+ }
+ }
+
+ return gzCount
+}
+
+func fileBytes(t *testing.T, path string) []byte {
+ file, err := os.Open(path)
+ require.NoError(t, err, "error opening file %s", path)
+ defer file.Close()
+
+ var buf bytes.Buffer
+ _, err = buf.ReadFrom(file)
+ require.NoError(t, err, "error reading file %s", path)
+
+ return buf.Bytes()
+}
+
+func identicalFiles(t *testing.T, path1, path2 string) {
+ bytes1 := fileBytes(t, path1)
+ bytes2 := fileBytes(t, path2)
+ require.Equal(t, len(bytes1), len(bytes2), "files %s and %s have different lengths", path1, path2)
+
+ for i, b1 := range bytes1 {
+ b2 := bytes2[i]
+ require.Equal(t, b1, b2, "files %s and %s differ at byte %d (%s) v (%s)", path1, path2, i, string(b1), string(b2))
+ }
+}
+
+func uncompressBytes(t *testing.T, path string) []byte {
+ file, err := os.Open(path)
+ require.NoError(t, err, "error opening file %s", path)
+ defer file.Close()
+
+ gr, err := gzip.NewReader(file)
+ require.NoError(t, err, "error creating gzip reader for file %s", path)
+ defer gr.Close()
+
+ var buf bytes.Buffer
+ _, err = buf.ReadFrom(gr)
+ require.NoError(t, err, "error reading file %s", path)
+
+ return buf.Bytes()
+}
+func identicalFilesUncompressed(t *testing.T, path1, path2 string) {
+ bytes1 := uncompressBytes(t, path1)
+ bytes2 := uncompressBytes(t, path2)
+ require.Equal(t, len(bytes1), len(bytes2), "files %s and %s have different lengths", path1, path2)
+
+ for i, b1 := range bytes1 {
+ b2 := bytes2[i]
+ require.Equal(t, b1, b2, "files %s and %s differ at byte %d (%s) v (%s)", path1, path2, i, string(b1), string(b2))
+ }
+}
+
+func getConfig(t *testing.T, pt plugins.PluginType, cfg data.NameConfigPair, dataDir string) plugins.PluginConfig {
+ configs, err := yaml.Marshal(cfg.Config)
+ require.NoError(t, err)
+
+ var config plugins.PluginConfig
+ config.Config = string(configs)
+ if dataDir != "" {
+ config.DataDir = path.Join(dataDir, fmt.Sprintf("%s_%s", pt, cfg.Name))
+ err = os.MkdirAll(config.DataDir, os.ModePerm)
+ require.NoError(t, err)
+ }
+
+ return config
+}
+
+// TestRoundTrip tests that blocks read by the filereader importer
+// under the msgp.gz encoding are written back to byte-identical files by the
+// filewriter exporter. The fixtures include a plain-JSON genesis file and a
+// round-0 block, which have different data and encodings.
+func TestRoundTrip(t *testing.T) {
+ cleanArtifacts(t)
+ defer cleanArtifacts(t)
+
+ round := sdk.Round(0)
+	lastRound := numGzippedFiles(t) - 1 // rounds are 0-indexed, so the last round is the gzipped block-file count minus 1
+ require.GreaterOrEqual(t, lastRound, uint64(1))
+ require.LessOrEqual(t, lastRound, uint64(1000)) // overflow sanity check
+
+ ctx := context.Background()
+
+ plineConfig, err := data.MakePipelineConfig(&data.Args{
+ ConduitDataDir: conduitDataDir,
+ })
+ require.NoError(t, err)
+
+ logger, _ := logrusTest.NewNullLogger()
+
+ // Assert configurations:
+ require.Equal(t, "file_reader", plineConfig.Importer.Name)
+ require.Equal(t, importerBlockDir, plineConfig.Importer.Config["block-dir"])
+ require.Equal(t, filePattern, plineConfig.Importer.Config["filename-pattern"])
+
+ require.Equal(t, "file_writer", plineConfig.Exporter.Name)
+ require.Equal(t, filePattern, plineConfig.Exporter.Config["filename-pattern"])
+ require.False(t, plineConfig.Exporter.Config["drop-certificate"].(bool))
+
+ // Simulate the portions of the pipeline's Init() that interact
+ // with the importer and exporter
+ initProvider := conduit.MakePipelineInitProvider(&round, nil, nil)
+
+ // Importer init
+ impCtor, err := importers.ImporterConstructorByName(plineConfig.Importer.Name)
+ require.NoError(t, err)
+ importer := impCtor.New()
+ impConfig := getConfig(t, plugins.Importer, plineConfig.Importer, conduitDataDir)
+ require.NoError(t, err)
+ require.Equal(t, path.Join(conduitDataDir, "importer_file_reader"), impConfig.DataDir)
+
+ err = importer.Init(ctx, initProvider, impConfig, logger)
+ require.NoError(t, err)
+
+ impGenesis, err := importer.GetGenesis()
+ require.NoError(t, err)
+ require.Equal(t, "generated-network", impGenesis.Network)
+
+	genesisFile := filewriter.GenesisFilename
+	require.Equal(t, "genesis.json", genesisFile)
+
+	// The imported genesis should equal the genesis unmarshalled directly from
+	// the expected path.
+ impGenesisPath := path.Join(importerBlockDir, genesisFile)
+ genesis := &sdk.Genesis{}
+
+ err = filewriter.DecodeFromFile(impGenesisPath, genesis, filewriter.JSONFormat, false)
+ require.NoError(t, err)
+
+ require.Equal(t, impGenesis, genesis)
+
+ initProvider.SetGenesis(impGenesis)
+
+ // Construct the exporter
+ expCtor, err := exporters.ExporterConstructorByName(plineConfig.Exporter.Name)
+ require.NoError(t, err)
+ exporter := expCtor.New()
+ expConfig := getConfig(t, plugins.Exporter, plineConfig.Exporter, conduitDataDir)
+ require.NoError(t, err)
+ require.Equal(t, path.Join(conduitDataDir, "exporter_file_writer"), expConfig.DataDir)
+
+ err = exporter.Init(ctx, initProvider, expConfig, logger)
+ require.NoError(t, err)
+
+ // It should have persisted the genesis which ought to be identical
+ // to the importer's.
+ expGenesisPath := path.Join(exporterBlockDir, genesisFile)
+ identicalFiles(t, impGenesisPath, expGenesisPath)
+
+ // Simulate the pipeline
+ require.Equal(t, sdk.Round(0), round)
+ for ; uint64(round) <= lastRound; round++ {
+ blk, err := importer.GetBlock(uint64(round))
+ require.NoError(t, err)
+
+ expBlockPath := path.Join(exporterBlockDir, fmt.Sprintf(filePattern, round))
+ _, err = os.OpenFile(expBlockPath, os.O_RDONLY, 0)
+ require.ErrorIs(t, err, os.ErrNotExist)
+
+ err = exporter.Receive(blk)
+ require.NoError(t, err)
+
+ _, err = os.OpenFile(expBlockPath, os.O_RDONLY, 0)
+ require.NoError(t, err)
+
+		impBlockPath := path.Join(importerBlockDir, fmt.Sprintf(filePattern, round))
+
+		identicalFilesUncompressed(t, impBlockPath, expBlockPath)
+ }
+}
diff --git a/conduit/plugins/importers/filereader/filereader.go b/conduit/plugins/importers/filereader/filereader.go
index 874e1185..ea363a9c 100644
--- a/conduit/plugins/importers/filereader/filereader.go
+++ b/conduit/plugins/importers/filereader/filereader.go
@@ -3,9 +3,7 @@ package fileimporter
import (
"context"
_ "embed" // used to embed config
- "errors"
"fmt"
- "io/fs"
"path"
"time"
@@ -25,10 +23,19 @@ const PluginName = "file_reader"
type fileReader struct {
logger *logrus.Logger
cfg Config
+ gzip bool
+ format filewriter.EncodingFormat
ctx context.Context
cancel context.CancelFunc
}
+// package-wide init function
+func init() {
+ importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer {
+ return &fileReader{}
+ }))
+}
+
// New initializes a file importer
func New() importers.Importer {
return &fileReader{}
@@ -48,13 +55,6 @@ func (r *fileReader) Metadata() plugins.Metadata {
return metadata
}
-// package-wide init function
-func init() {
- importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer {
- return &fileReader{}
- }))
-}
-
func (r *fileReader) Init(ctx context.Context, _ data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error {
r.ctx, r.cancel = context.WithCancel(ctx)
r.logger = logger
@@ -66,14 +66,22 @@ func (r *fileReader) Init(ctx context.Context, _ data.InitProvider, cfg plugins.
if r.cfg.FilenamePattern == "" {
r.cfg.FilenamePattern = filewriter.FilePattern
}
+ r.format, r.gzip, err = filewriter.ParseFilenamePattern(r.cfg.FilenamePattern)
+ if err != nil {
+ return fmt.Errorf("Init() error: %w", err)
+ }
return nil
}
+// GetGenesis returns the genesis. It is assumed that the genesis file is available as `genesis.json`
+// regardless of the chosen encoding format and gzip flag.
+// It is also assumed that there is a separate round 0 block file adhering to the expected filename pattern with encoding.
+// This is because genesis and round 0 block have different data and encodings,
+// and the official network genesis files are plain uncompressed JSON.
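+//
+// For example (illustrative layout), a block directory using the default pattern
+// might contain genesis.json alongside 0_block.msgp.gz, 1_block.msgp.gz, ...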
func (r *fileReader) GetGenesis() (*sdk.Genesis, error) {
- genesisFile := path.Join(r.cfg.BlocksDir, "genesis.json")
var genesis sdk.Genesis
- err := filewriter.DecodeJSONFromFile(genesisFile, &genesis, false)
+ err := filewriter.DecodeFromFile(path.Join(r.cfg.BlocksDir, filewriter.GenesisFilename), &genesis, filewriter.JSONFormat, false)
if err != nil {
return nil, fmt.Errorf("GetGenesis(): failed to process genesis file: %w", err)
}
@@ -88,31 +96,15 @@ func (r *fileReader) Close() error {
}
func (r *fileReader) GetBlock(rnd uint64) (data.BlockData, error) {
- attempts := r.cfg.RetryCount
- for {
- filename := path.Join(r.cfg.BlocksDir, fmt.Sprintf(r.cfg.FilenamePattern, rnd))
- var blockData data.BlockData
- start := time.Now()
- err := filewriter.DecodeJSONFromFile(filename, &blockData, false)
- if err != nil && errors.Is(err, fs.ErrNotExist) {
- // If the file read failed because the file didn't exist, wait before trying again
- if attempts == 0 {
- return data.BlockData{}, fmt.Errorf("GetBlock(): block not found after (%d) attempts", r.cfg.RetryCount)
- }
- attempts--
-
- select {
- case <-time.After(r.cfg.RetryDuration):
- case <-r.ctx.Done():
- return data.BlockData{}, fmt.Errorf("GetBlock() context finished: %w", r.ctx.Err())
- }
- } else if err != nil {
- // Other error, return error to pipeline
- return data.BlockData{}, fmt.Errorf("GetBlock(): unable to read block file '%s': %w", filename, err)
- } else {
- r.logger.Infof("Block %d read time: %s", rnd, time.Since(start))
- // The read was fine, return the data.
- return blockData, nil
- }
+ filename := path.Join(r.cfg.BlocksDir, fmt.Sprintf(r.cfg.FilenamePattern, rnd))
+ var blockData data.BlockData
+ start := time.Now()
+
+ // Read file content
+ err := filewriter.DecodeFromFile(filename, &blockData, r.format, r.gzip)
+ if err != nil {
+ return data.BlockData{}, fmt.Errorf("GetBlock(): unable to read block file '%s': %w", filename, err)
}
+ r.logger.Infof("Block %d read time: %s", rnd, time.Since(start))
+ return blockData, nil
}
diff --git a/conduit/plugins/importers/filereader/filereader_config.go b/conduit/plugins/importers/filereader/filereader_config.go
index bd4f7352..34e6e8c7 100644
--- a/conduit/plugins/importers/filereader/filereader_config.go
+++ b/conduit/plugins/importers/filereader/filereader_config.go
@@ -3,23 +3,13 @@ package fileimporter
//go:generate go run ../../../../cmd/conduit-docs/main.go ../../../../conduit-docs/
//go:generate go run ../../../../cmd/readme_config_includer/generator.go
-import "time"
-
//Name: conduit_importers_filereader
// Config specific to the file importer
type Config struct {
	// block-dir is the path to a directory where block data is stored.
BlocksDir string `yaml:"block-dir"`
- /* retry-duration
- controls the delay between checks when the importer has caught up and is waiting for new blocks to appear.
- The input duration will be interpreted in nanoseconds.
- */
- RetryDuration time.Duration `yaml:"retry-duration"`
- /* retry-count
- controls the number of times to check for a missing block
- before generating an error. The retry count and retry duration should
- be configured according the expected round time.
- */
- RetryCount uint64 `yaml:"retry-count"`
+
/* filename-pattern
is the format used to find block files. It uses go string formatting and should accept one number for the round.
The default pattern is
diff --git a/conduit/plugins/importers/filereader/filereader_test.go b/conduit/plugins/importers/filereader/filereader_test.go
index 42cafc99..09a1ff57 100644
--- a/conduit/plugins/importers/filereader/filereader_test.go
+++ b/conduit/plugins/importers/filereader/filereader_test.go
@@ -6,7 +6,6 @@ import (
"os"
"path"
"testing"
- "time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -22,10 +21,13 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)
+const (
+ defaultEncodingFormat = filewriter.MessagepackFormat
+ defaultIsGzip = true
+)
+
var (
logger *logrus.Logger
- ctx context.Context
- cancel context.CancelFunc
testImporter importers.Importer
pRound sdk.Round
)
@@ -37,6 +39,13 @@ func init() {
pRound = sdk.Round(1)
}
+func TestDefaults(t *testing.T) {
+ format, gzip, err := filewriter.ParseFilenamePattern(filewriter.FilePattern)
+ require.NoError(t, err)
+ require.Equal(t, defaultEncodingFormat, format)
+ require.Equal(t, defaultIsGzip, gzip)
+}
+
func TestImporterorterMetadata(t *testing.T) {
testImporter = New()
m := testImporter.Metadata()
@@ -57,7 +66,7 @@ func initializeTestData(t *testing.T, dir string, numRounds int) sdk.Genesis {
Timestamp: 1234,
}
- err := filewriter.EncodeJSONToFile(path.Join(dir, "genesis.json"), genesisA, true)
+ err := filewriter.EncodeToFile(path.Join(dir, filewriter.GenesisFilename), genesisA, filewriter.JSONFormat, false)
require.NoError(t, err)
for i := 0; i < numRounds; i++ {
@@ -70,7 +79,7 @@ func initializeTestData(t *testing.T, dir string, numRounds int) sdk.Genesis {
Certificate: nil,
}
blockFile := path.Join(dir, fmt.Sprintf(filewriter.FilePattern, i))
- err = filewriter.EncodeJSONToFile(blockFile, block, true)
+ err = filewriter.EncodeToFile(blockFile, block, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
}
@@ -82,8 +91,7 @@ func initializeImporter(t *testing.T, numRounds int) (importer importers.Importe
genesisExpected := initializeTestData(t, tempdir, numRounds)
importer = New()
cfg := Config{
- BlocksDir: tempdir,
- RetryDuration: 0,
+ BlocksDir: tempdir,
}
data, err := yaml.Marshal(cfg)
require.NoError(t, err)
@@ -122,54 +130,3 @@ func TestGetBlockSuccess(t *testing.T) {
require.Equal(t, sdk.Round(i), block.BlockHeader.Round)
}
}
-
-func TestRetryAndDuration(t *testing.T) {
- tempdir := t.TempDir()
- initializeTestData(t, tempdir, 0)
- importer := New()
- cfg := Config{
- BlocksDir: tempdir,
- RetryDuration: 10 * time.Millisecond,
- RetryCount: 3,
- }
- data, err := yaml.Marshal(cfg)
- require.NoError(t, err)
- err = importer.Init(context.Background(), conduit.MakePipelineInitProvider(&pRound, nil, nil), plugins.MakePluginConfig(string(data)), logger)
- assert.NoError(t, err)
-
- start := time.Now()
- _, err = importer.GetBlock(0)
- assert.ErrorContains(t, err, "GetBlock(): block not found after (3) attempts")
-
- expectedDuration := cfg.RetryDuration*time.Duration(cfg.RetryCount) + 10*time.Millisecond
- assert.WithinDuration(t, start, time.Now(), expectedDuration, "Error should generate after retry count * retry duration")
-}
-
-func TestRetryWithCancel(t *testing.T) {
- tempdir := t.TempDir()
- initializeTestData(t, tempdir, 0)
- importer := New()
- cfg := Config{
- BlocksDir: tempdir,
- RetryDuration: 1 * time.Hour,
- RetryCount: 3,
- }
- data, err := yaml.Marshal(cfg)
- ctx, cancel := context.WithCancel(context.Background())
- require.NoError(t, err)
- err = importer.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil, nil), plugins.MakePluginConfig(string(data)), logger)
- assert.NoError(t, err)
-
- // Cancel after delay
- delay := 5 * time.Millisecond
- go func() {
- time.Sleep(delay)
- cancel()
- }()
- start := time.Now()
- _, err = importer.GetBlock(0)
- assert.ErrorContains(t, err, "GetBlock() context finished: context canceled")
-
- // within 1ms of the expected time (but much less than the 3hr configuration.
- assert.WithinDuration(t, start, time.Now(), 2*delay)
-}
diff --git a/conduit/plugins/importers/filereader/sample.yaml b/conduit/plugins/importers/filereader/sample.yaml
index d0473c26..b9eaff21 100644
--- a/conduit/plugins/importers/filereader/sample.yaml
+++ b/conduit/plugins/importers/filereader/sample.yaml
@@ -4,16 +4,8 @@ config:
# The directory is created if it doesn't exist. If no directory is provided
# blocks are written to the Conduit data directory.
#block-dir: "/path/to/directory"
-
- # RetryDuration controls the delay between checks when the importer has
- # caught up and is waiting for new blocks to appear.
- retry-duration: "5s"
-
- # RetryCount controls the number of times to check for a missing block
- # before generating an error. The retry count and retry duration should
- # be configured according the expected round time.
- retry-count: 5
-
+
# FilenamePattern is the format used to find block files. It uses go string
# formatting and should accept one number for the round.
- filename-pattern: "%[1]d_block.json"
+ # The pattern should match the extension of the files to be read.
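+ # For example, blocks exported with "%[1]d_block.json.gz" should be read back
+ # with the same "%[1]d_block.json.gz" pattern.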
+ filename-pattern: "%[1]d_block.msgp.gz"
diff --git a/conduit/plugins/importers/filereader/test_resources/conduit_data/conduit.yml b/conduit/plugins/importers/filereader/test_resources/conduit_data/conduit.yml
new file mode 100644
index 00000000..a8d125e1
--- /dev/null
+++ b/conduit/plugins/importers/filereader/test_resources/conduit_data/conduit.yml
@@ -0,0 +1,71 @@
+# Log verbosity: PANIC, FATAL, ERROR, WARN, INFO, DEBUG, TRACE
+# log-level: INFO
+
+# If no log file is provided logs are written to stdout.
+#log-file:
+
+# Number of retries to perform after a pipeline plugin error.
+# Set to 0 to retry forever.
+# retry-count: 10
+
+# Time duration to wait between retry attempts.
+# retry-delay: "1s"
+
+# Optional filepath to use for pidfile.
+#pid-filepath: /path/to/pidfile
+
+# Whether or not to print the conduit banner on startup.
+# hide-banner: false
+
+# When enabled prometheus metrics are available on '/metrics'
+# metrics:
+# mode: OFF
+# addr: ":9999"
+# prefix: "conduit"
+
+
+# The importer is typically an algod follower node.
+importer:
+ name: file_reader
+ config:
+ # for purposes of fileReadWrite_test.go we have
+ # CWD = conduit/plugins/importers/filereader
+ # so `test_resources` is immediately available
+ block-dir: "test_resources/filereader_blocks"
+
+ # FilenamePattern is the format used to find block files. It uses go string
+ # formatting and should accept one number for the round.
+ filename-pattern: "%[1]d_block.msgp.gz"
+
+# Zero or more processors may be defined to manipulate what data
+# reaches the exporter.
+processors:
+
+# An exporter is defined to do something with the data.
+exporter:
+ name: file_writer
+ config:
+ # BlocksDir is the path to a directory where block data should be stored.
+ # The directory is created if it doesn't exist. If no directory is provided
+ # blocks are written to the Conduit data directory.
+ #block-dir: "/path/to/block/files"
+
+ # FilenamePattern is the format used to write block files. It uses go
+ # string formatting and should accept one number for the round.
+ # If the file has a '.gz' extension, blocks will be gzipped.
+ # Default: "%[1]d_block.msgp.gz"
+ filename-pattern: "%[1]d_block.msgp.gz"
+
+ # DropCertificate is used to remove the vote certificate from the block data before writing files.
+ drop-certificate: false
+
+
+# Enable telemetry for conduit
+# telemetry:
+# enabled: false
+ # By default the following fields will be configured to send data to Algorand.
+ # To store your own telemetry events, they can be overridden.
+ # uri: ""
+ # index: ""
+ # username: ""
+ # password: ""
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/0_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/0_block.msgp.gz
new file mode 100644
index 00000000..6ad7f275
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/0_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/1_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/1_block.msgp.gz
new file mode 100644
index 00000000..947f7190
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/1_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/2_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/2_block.msgp.gz
new file mode 100644
index 00000000..a8ec23cf
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/2_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/3_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/3_block.msgp.gz
new file mode 100644
index 00000000..6eb23030
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/3_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/4_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/4_block.msgp.gz
new file mode 100644
index 00000000..888e0853
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/4_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/5_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/5_block.msgp.gz
new file mode 100644
index 00000000..293070bf
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/5_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/6_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/6_block.msgp.gz
new file mode 100644
index 00000000..d78db3c1
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/6_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/7_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/7_block.msgp.gz
new file mode 100644
index 00000000..ee609a06
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/7_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/8_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/8_block.msgp.gz
new file mode 100644
index 00000000..f1db8be9
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/8_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/9_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/9_block.msgp.gz
new file mode 100644
index 00000000..16d521f1
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/9_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/genesis.json b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/genesis.json
new file mode 100644
index 00000000..4d171267
--- /dev/null
+++ b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/genesis.json
@@ -0,0 +1,30 @@
+{
+ "alloc": [
+ {
+ "addr": "AEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAKE3PRHE",
+ "comment": "",
+ "state": {
+ "algo": 1000000000000
+ }
+ },
+ {
+ "addr": "AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4",
+ "comment": "",
+ "state": {
+ "algo": 1000000000000
+ }
+ },
+ {
+ "addr": "AMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAANVWEXNA",
+ "comment": "",
+ "state": {
+ "algo": 1000000000000
+ }
+ }
+ ],
+ "fees": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAVIOOBQA",
+ "id": "v1",
+ "network": "generated-network",
+ "proto": "future",
+ "rwd": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABFFF5B2Y"
+}
\ No newline at end of file
diff --git a/conduit/plugins/metadata.go b/conduit/plugins/metadata.go
index 99adf929..46eb0a43 100644
--- a/conduit/plugins/metadata.go
+++ b/conduit/plugins/metadata.go
@@ -13,11 +13,11 @@ type PluginType string
const (
// Exporter PluginType
- Exporter = "exporter"
+ Exporter PluginType = "exporter"
// Processor PluginType
- Processor = "processor"
+ Processor PluginType = "processor"
// Importer PluginType
- Importer = "importer"
+ Importer PluginType = "importer"
)