Skip to content

Commit

Permalink
fix(image): props read issues
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Jun 24, 2024
1 parent 969c069 commit 12ecea0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 80 deletions.
82 changes: 36 additions & 46 deletions extensions/sinks/image/ext/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,59 +26,46 @@ import (
"time"

"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
)

type c struct {
Path string `json:"path"`
ImageFormat string `json:"imageFormat"`
MaxAge int `json:"maxAge"`
MaxCount int `json:"maxCount"`
}

type imageSink struct {
path string
format string
maxAge int
maxCount int
cancel context.CancelFunc
c *c
cancel context.CancelFunc
}

func (m *imageSink) Configure(props map[string]interface{}) error {
if i, ok := props["imageFormat"]; ok {
if i, ok := i.(string); ok {
if "png" != i && "jpeg" != i {
return fmt.Errorf("%s image type is not currently supported", i)
}
m.format = i
}
} else {
return fmt.Errorf("Field not found format.")
conf := &c{
MaxAge: 72,
MaxCount: 1000,
}

if i, ok := props["path"]; ok {
if ii, ok := i.(string); ok {
m.path = ii
} else {
return fmt.Errorf("%v image type is not supported", i)
}
} else {
return fmt.Errorf("Field not found path.")
err := cast.MapToStruct(props, conf)
if err != nil {
return err
}

m.maxAge = 72
if i, ok := props["maxAge"]; ok {
if i, ok := i.(int); ok {
m.maxAge = i
}
if conf.Path == "" {
return fmt.Errorf("path is required")
}
m.maxCount = 1000
if i, ok := props["maxCount"]; ok {
if i, ok := i.(int); ok {
m.maxCount = i
}
if conf.ImageFormat != "png" && conf.ImageFormat != "jpeg" {
return fmt.Errorf("%s image type is not currently supported", conf.ImageFormat)
}
m.c = conf
return nil
}

func (m *imageSink) Open(ctx api.StreamContext) error {
logger := ctx.GetLogger()
logger.Debug("Opening image sink")

if _, err := os.Stat(m.path); os.IsNotExist(err) {
if err := os.MkdirAll(m.path, os.ModePerm); nil != err {
if _, err := os.Stat(m.c.Path); os.IsNotExist(err) {
if err := os.MkdirAll(m.c.Path, os.ModePerm); nil != err {
return fmt.Errorf("fail to open image sink for %v", err)
}
}
Expand All @@ -102,8 +89,10 @@ func (m *imageSink) Open(ctx api.StreamContext) error {
}

func (m *imageSink) delFile(logger api.Logger) error {
dirEntries, err := os.ReadDir(m.path)
logger.Debugf("deleting images")
dirEntries, err := os.ReadDir(m.c.Path)
if nil != err || 0 == len(dirEntries) {
logger.Error("read dir fail")
return err
}

Expand All @@ -116,8 +105,8 @@ func (m *imageSink) delFile(logger api.Logger) error {
files = append(files, info)
}

pos := m.maxCount
delTime := time.Now().Add(time.Duration(0-m.maxAge) * time.Hour)
pos := m.c.MaxCount
delTime := time.Now().Add(time.Duration(0-m.c.MaxAge) * time.Hour)
for i := 0; i < len(files); i++ {
for j := i + 1; j < len(files); j++ {
if files[i].ModTime().Before(files[j].ModTime()) {
Expand All @@ -129,11 +118,12 @@ func (m *imageSink) delFile(logger api.Logger) error {
break
}
}

logger.Debugf("pos is %d, and file len is %d", pos, len(files))
for i := pos; i < len(files); i++ {
fname := files[i].Name()
if strings.HasSuffix(fname, m.format) {
fpath := filepath.Join(m.path, fname)
logger.Debugf("try to delete %s", fname)
if strings.HasSuffix(fname, m.c.ImageFormat) {
fpath := filepath.Join(m.c.Path, fname)
os.Remove(fpath)
}
}
Expand All @@ -150,7 +140,7 @@ func (m *imageSink) getSuffix() string {

func (m *imageSink) saveFile(b []byte, fpath string) error {
reader := bytes.NewReader(b)
switch m.format {
switch m.c.ImageFormat {
case "png":
img, err := png.Decode(reader)
if err != nil {
Expand Down Expand Up @@ -182,7 +172,7 @@ func (m *imageSink) saveFile(b []byte, fpath string) error {
return err
}
default:
return fmt.Errorf("unsupported format %s", m.format)
return fmt.Errorf("unsupported format %s", m.c.ImageFormat)
}
return nil
}
Expand All @@ -194,8 +184,8 @@ func (m *imageSink) saveFiles(images map[string]interface{}) error {
return fmt.Errorf("found none bytes data %v for path %s", image, k)
}
suffix := m.getSuffix()
fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
fpath := filepath.Join(m.path, fname)
fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.c.ImageFormat)
fpath := filepath.Join(m.c.Path, fname)
err := m.saveFile(image, fpath)
if err != nil {
return err
Expand Down
125 changes: 91 additions & 34 deletions extensions/sinks/image/ext/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,131 @@ import (
"github.com/stretchr/testify/assert"
)

func TestConfigure(t *testing.T) {
tests := []struct {
name string
props map[string]any
c *c
err string
}{
{
name: "wrong type",
props: map[string]any{
"maxAge": "0.11",
},
err: "1 error(s) decoding:\n\n* 'maxAge' expected type 'int', got unconvertible type 'string', value: '0.11'",
},
{
name: "missing path",
props: map[string]any{
"imageFormat": "jpeg",
},
err: "path is required",
},
{
name: "wrong format",
props: map[string]any{
"path": "data",
"imageFormat": "abc",
},
err: "abc image type is not currently supported",
},
{
name: "default age",
props: map[string]any{
"path": "data",
"imageFormat": "png",
"maxCount": 1,
},
c: &c{
Path: "data",
ImageFormat: "png",
MaxCount: 1,
MaxAge: 72,
},
},
{
name: "default count",
props: map[string]any{
"path": "data",
"imageFormat": "png",
"maxAge": 0.11,
},
c: &c{
Path: "data",
ImageFormat: "png",
MaxCount: 1000,
MaxAge: 0,
},
},
}
s := &imageSink{}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := s.Configure(test.props)
if test.err == "" {
assert.NoError(t, err)
assert.Equal(t, test.c, s.c)
} else {
assert.EqualError(t, err, test.err)
}
})
}
}

func TestSave(t *testing.T) {
tests := []struct {
name string
sink *imageSink
props map[string]any
image string
err string
}{
{
name: "normal",
sink: &imageSink{
path: "data",
format: "png",
maxAge: 0,
maxCount: 0,
props: map[string]any{
"path": "data",
"imageFormat": "png",
},
image: "../../../../docs/en_US/wechat.png",
},
{
name: "wrong format",
sink: &imageSink{
path: "data",
format: "jpeg",
maxAge: 0,
maxCount: 0,
props: map[string]any{
"path": "data",
"imageFormat": "jpeg",
},
image: "../../../../docs/en_US/wechat.png",
err: "invalid JPEG format: missing SOI marker",
},
{
name: "normal jpeg",
sink: &imageSink{
path: "data",
format: "jpeg",
maxAge: 0,
maxCount: 0,
props: map[string]any{
"path": "data",
"imageFormat": "jpeg",
},
image: "ekuiper.jpg",
},
{
name: "wrong png",
sink: &imageSink{
path: "data",
format: "png",
maxAge: 0,
maxCount: 0,
props: map[string]any{
"path": "data",
"imageFormat": "png",
},
image: "ekuiper.jpg",
err: "png: invalid format: not a PNG file",
},
{
name: "unsupported format",
sink: &imageSink{
path: "data",
format: "abc",
maxAge: 0,
maxCount: 0,
},
image: "../../../../docs/cover.jpg",
err: "unsupported format abc",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := os.MkdirAll("data", os.ModePerm)
assert.NoError(t, err)
b, err := os.ReadFile(tt.image)
assert.NoError(t, err)
err = tt.sink.saveFiles(map[string]any{
s := &imageSink{}
err = s.Configure(tt.props)
assert.NoError(t, err)

err = s.saveFiles(map[string]any{
"self": b,
})
if tt.err == "" {
Expand All @@ -102,7 +159,7 @@ func TestSave(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, entries, 0)
}
//_ = os.RemoveAll("data")
_ = os.RemoveAll("data")
})
}
}

0 comments on commit 12ecea0

Please sign in to comment.