diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index f34eab2c46b..e343c2b108f 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -144,12 +144,28 @@ func (c *crawler) startInput( c.inputs[id] = runner - c.log.Infof("Starting input (ID: %d)", id) + idFields := getRunnerID(runner) + + c.log.Infow(fmt.Sprintf("Starting input (ID: %d)", id), idFields...) runner.Start() return nil } +func getRunnerID(r cfgfile.Runner) []any { + type inputIDer interface { + InputID() string + } + + idFields := []any{} + idRunner, ok := r.(inputIDer) + if ok { + idFields = append(idFields, "input_id", idRunner.InputID()) + } + + return idFields +} + func (c *crawler) Stop() { c.log.Info("Stopping Crawler") diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index b5d0ad5a2a3..86c7199f3d2 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -380,8 +380,6 @@ func (inp *filestream) readFromSource( } ctx.UpdateStatus(status.Failed, "Failed on purpose") - return errors.New("Failed on purpose") - s.Offset += int64(message.Bytes) metrics.MessagesRead.Inc()