diff --git a/plugins/application/elasticsearch/pkg/lib/client.go b/plugins/application/elasticsearch/pkg/lib/client.go index 9b7ccc7d..78900a6b 100644 --- a/plugins/application/elasticsearch/pkg/lib/client.go +++ b/plugins/application/elasticsearch/pkg/lib/client.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -82,7 +83,15 @@ func (esc *Client) Connect(cfg *AppConfig) error { return fmt.Errorf("failed to initialize connection: %s", err.Error()) } - _, err = esc.conn.Info() + res, err := esc.conn.Info() + if err != nil { + return fmt.Errorf("failed to get info from connection: %s", err.Error()) + } + defer res.Body.Close() + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + return fmt.Errorf("failed to discard response body: %s", err.Error()) + } return err } @@ -92,6 +101,11 @@ func (esc *Client) IndicesExists(indices []string) (bool, error) { if err != nil { return false, err } + defer res.Body.Close() + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + return false, fmt.Errorf("failed to discard response body: %s", err.Error()) + } if res.StatusCode == http.StatusOK { return true, nil } @@ -107,10 +121,15 @@ func (esc *Client) IndicesDelete(indices []string) error { if err != nil { return err } + defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNotFound { body, _ := ioutil.ReadAll(res.Body) return fmt.Errorf("failed to delete indices [%d]: %s", res.StatusCode, body) } + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + return fmt.Errorf("failed to discard response body: %s", err.Error()) + } return nil } @@ -121,6 +140,8 @@ func (esc *Client) IndicesCreate(indices []string) error { if err != nil { return err } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { body, _ := ioutil.ReadAll(res.Body) msg := string(body) @@ -129,6 +150,10 @@ func (esc *Client) IndicesCreate(indices []string) error { } return fmt.Errorf("failed to create index [%d]: %s", res.StatusCode, msg) } + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + return fmt.Errorf("failed to discard response body: %s", err.Error()) + } } return nil } @@ -141,20 +166,30 @@ func (esc *Client) Index(index string, documents []string, bulk bool) error { if err != nil { return err } + defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { body, _ := ioutil.ReadAll(res.Body) return fmt.Errorf("failed to index document[%d]: %s", res.StatusCode, body) } + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + return fmt.Errorf("failed to discard response body: %s", err.Error()) + } } } else { res, err := esc.conn.Bulk(strings.NewReader(formatBulkRequest(index, documents))) if err != nil { return err } + defer res.Body.Close() if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { body, _ := ioutil.ReadAll(res.Body) return fmt.Errorf("failed to index document(s)[%d]: %s", res.StatusCode, body) } + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + return fmt.Errorf("failed to discard response body: %s", err.Error()) + } } return nil }