Skip to content

Commit

Permalink
Merge pull request #54 from Ritchie6/memo6
Browse files Browse the repository at this point in the history
Added the append functionality for issue #21
  • Loading branch information
rtb-12 authored Jan 14, 2025
2 parents d84d87a + ca09b07 commit f13e4ec
Showing 1 changed file with 91 additions and 32 deletions.
123 changes: 91 additions & 32 deletions memoria.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ func (m *Memoria) transform(key string) (pathkey *PathKey) {
// Write synchronously the key-value pair to the disk making it immedialtely avaialble for
// reads. If you need stronger sync gaurantess see WriteStream
func (m *Memoria) Write(key string, val []byte) error {
return m.WriteStream(key, bytes.NewReader(val), false)
return m.WriteStream(key, bytes.NewReader(val), false, false)
}

func (m *Memoria) WriteWithAppend(key string, val []byte) error {
return m.WriteStream(key, bytes.NewReader(val), true, false)
}

// WriteString is a wrapper for Write that takes a string and writes it as bytes
Expand All @@ -109,7 +113,7 @@ func (m *Memoria) WriteString(key, val string) error {

// writes the data given by the io.reader performs explicit sync if mentioned otherwise
// depedning on the physical media it sync
func (m *Memoria) WriteStream(key string, r io.Reader, sync bool) error {
func (m *Memoria) WriteStream(key string, r io.Reader, append bool, sync bool) error { //adding the append bool

if len(key) <= 0 {
return fmt.Errorf("Empty key")
Expand All @@ -126,49 +130,88 @@ func (m *Memoria) WriteStream(key string, r io.Reader, sync bool) error {
return fmt.Errorf("Cannot create directory: %s", err)
}

f, err := m.createKeyFile(pathKey)
if !append {

if err != nil {
return fmt.Errorf("cannot create key file: %s", err)
}
f, err := m.createKeyFile(pathKey)

wc := io.WriteCloser(&nopWriteCloser{f})
if err != nil {
return fmt.Errorf("cannot create key file: %s", err)
}

//TODO: replace wc with compression writer when implementing compression
wc := io.WriteCloser(&nopWriteCloser{f})

// this is the place where data transfers actually happens when
// we transfer a read buffer to a writer
if _, err := io.Copy(wc, r); err != nil {
return cleanUp(f, fmt.Errorf("Cannot copy from read buffer %s", err))
}
//TODO: replace wc with compression writer when implementing compression

if err := wc.Close(); err != nil {
return cleanUp(f, fmt.Errorf("Cannot close compression error %s", err))
}
// this is the place where data transfers actually happens when
// we transfer a read buffer to a writer
if _, err := io.Copy(wc, r); err != nil {
return cleanUp(f, fmt.Errorf("Cannot copy from read buffer %s", err))
}

if sync {
if err := f.Sync(); err != nil {
cleanUp(f, fmt.Errorf("Cannot Sync: %s", err))
if err := wc.Close(); err != nil {
return cleanUp(f, fmt.Errorf("Cannot close compression error %s", err))
}
}
if err := f.Close(); err != nil {
return fmt.Errorf("Cannot close file: %s", err)

if sync {
if err := f.Sync(); err != nil {
cleanUp(f, fmt.Errorf("Cannot Sync: %s", err))
}
}
if err := f.Close(); err != nil {
return fmt.Errorf("Cannot close file: %s", err)
}

//Atomic Writes: uncomment the following code when implemented atomic writes
// fullPath := m.completePath(pathKey)

// if f.Name() != fullPath {
// if err := os.Rename(f.Name(), fullPath); err != nil {
// os.Remove(f.Name())
// return fmt.Errorf("Cannot rename files: %s", err)
// }
// }

// empty the cache for original key
m.emptyCacheFor(pathKey.originalKey) // cache is read only
}

//Atomic Writes: uncomment the following code when implemented atomic writes
// fullPath := m.completePath(pathKey)
if append {
f, err := m.createKeyFileWithAppend(pathKey, append)
if err != nil {
return fmt.Errorf("cannot create key file: %s", err)
}

// if f.Name() != fullPath {
// if err := os.Rename(f.Name(), fullPath); err != nil {
// os.Remove(f.Name())
// return fmt.Errorf("Cannot rename files: %s", err)
// }
// }
// Use the writer directly, handling compression or other transformations here if necessary
wc := io.WriteCloser(&nopWriteCloser{f})

// empty the cache for original key
m.emptyCacheFor(pathKey.originalKey) // cache is read only
// Perform the data copy operation
if _, err := io.Copy(wc, r); err != nil {
return cleanUp(f, fmt.Errorf("Cannot copy from read buffer: %s", err))
}

// Close the write closer
if err := wc.Close(); err != nil {
return cleanUp(f, fmt.Errorf("Cannot close file after writing: %s", err))
}

// Sync if required
if sync {
if err := f.Sync(); err != nil {
cleanUp(f, fmt.Errorf("Cannot sync file: %s", err))
}
}

if err := f.Close(); err != nil {
return fmt.Errorf("Cannot close file after sync: %s", err)
}

// Empty cache after write if necessary
m.emptyCacheFor(pathKey.originalKey)

}

return nil

}

func (m *Memoria) emptyCacheFor(key string) {
Expand Down Expand Up @@ -198,6 +241,22 @@ func (m *Memoria) createKeyFile(pathKey *PathKey) (*os.File, error) {

}

func (m *Memoria) createKeyFileWithAppend(pathKey *PathKey, append bool) (*os.File, error) {
var mode int
if append {
mode = os.O_APPEND | os.O_WRONLY
} else {
mode = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
}

f, err := os.OpenFile(m.completePath(pathKey), mode, m.filePerm)
if err != nil {
return nil, fmt.Errorf("failed to open file %s: %s", m.completePath(pathKey), err)
}

return f, nil
}

func (m *Memoria) Read(key string) ([]byte, error) {
rc, err := m.ReadStream(key, false)
if err != nil {
Expand Down

0 comments on commit f13e4ec

Please sign in to comment.