Skip to content

Commit

Permalink
Merge branch 'master' of github.com:flyteorg/flyte into flytectl-demo
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw committed May 13, 2024
2 parents 19b1b49 + 2f38d65 commit 6679fc7
Show file tree
Hide file tree
Showing 12 changed files with 4,025 additions and 828 deletions.
406 changes: 262 additions & 144 deletions deployment/stats/prometheus/flyteadmin-dashboard.json

Large diffs are not rendered by default.

3,407 changes: 2,997 additions & 410 deletions deployment/stats/prometheus/flytepropeller-dashboard.json

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions docs/_templates/base.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/user_guide/advanced_composition/map_tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def map_workflow_with_metadata(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 1
data_point=data
)
```
When `cache` and `cache_version` are used in `TaskMetadata` for a map task, the cache hits occur on individual tasks being mapped over, rather than the parent map task operation. This means that if one input item in a list changes, each previously executed task is read from cache and only the task for the changed item is actually executed, rather than the task being re-executed for every item. Note that this has the same effect as adding `cache` and `cache_version` in the `@task` decorator for a task being mapped over.

You can also configure `concurrency` and `min_success_ratio` for a map task:
- `concurrency` limits the number of mapped tasks that can run in parallel to the specified batch size.
Expand Down
17 changes: 15 additions & 2 deletions docs/user_guide/data_types_and_io/structureddataset.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ You can now use `numpy.ndarray` to deserialize the parquet file to NumPy and ser

```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/data_types_and_io/data_types_and_io/structured_dataset.py
:caption: data_types_and_io/structured_dataset.py
:lines: 134-147
:lines: 134-149
```

:::{note}
Expand All @@ -248,7 +248,20 @@ You can run the code locally as follows:

```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/data_types_and_io/data_types_and_io/structured_dataset.py
:caption: data_types_and_io/structured_dataset.py
:lines: 151-155
:lines: 153-157
```

### The nested typed columns

Like most storage formats (e.g. Avro, Parquet, and BigQuery), StructuredDataset support nested field structures.

:::{note}
Nested field StructuredDataset should be run when flytekit version > 1.11.0.
:::

```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/data_types_and_io/data_types_and_io/structured_dataset.py
:caption: data_types_and_io/structured_dataset.py
:lines: 159-270
```

[flytesnacks]: https://github.com/flyteorg/flytesnacks/tree/master/examples/data_types_and_io/
16 changes: 7 additions & 9 deletions flytectl/cmd/get/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ func ExecutionToProtoMessages(l []*admin.Execution) []proto.Message {
}

func getCallBack(ctx context.Context, cmdCtx cmdCore.CommandContext) bubbletea.DataCallback {
return func(filter filters.Filters) []proto.Message {
return func(filter filters.Filters) ([]proto.Message, error) {
executionList, err := cmdCtx.AdminFetcherExt().ListExecution(ctx, config.GetConfig().Project, config.GetConfig().Domain, filter)
if err != nil {
return nil
return nil, err
}
return ExecutionToProtoMessages(executionList.Executions)
return ExecutionToProtoMessages(executionList.Executions), nil
}
}

Expand Down Expand Up @@ -154,17 +154,15 @@ func getExecutionFunc(ctx context.Context, args []string, cmdCtx cmdCore.Command
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), executionColumns,
ExecutionToProtoMessages(executions)...)
}
if config.GetConfig().Interactive {
err := bubbletea.Paginator(executionColumns, getCallBack(ctx, cmdCtx), execution.DefaultConfig.Filter)
return err
}
executionList, err := cmdCtx.AdminFetcherExt().ListExecution(ctx, config.GetConfig().Project, config.GetConfig().Domain, execution.DefaultConfig.Filter)
if err != nil {
return err
}
logger.Infof(ctx, "Retrieved %v executions", len(executionList.Executions))

if config.GetConfig().Interactive {
bubbletea.Paginator(executionColumns, getCallBack(ctx, cmdCtx))
return nil
}

return adminPrinter.Print(config.GetConfig().MustOutputFormat(), executionColumns,
ExecutionToProtoMessages(executionList.Executions)...)
}
1 change: 1 addition & 0 deletions flytectl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/charmbracelet/lipgloss v0.10.0 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/containerd/containerd v1.6.26 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flytectl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ github.com/charmbracelet/bubbles v0.18.0 h1:PYv1A036luoBGroX6VWjQIE9Syf2Wby2oOl/
github.com/charmbracelet/bubbles v0.18.0/go.mod h1:08qhZhtIwzgrtBjAcJnij1t1H0ZRjwHyGsy6AL11PSw=
github.com/charmbracelet/bubbletea v0.25.0 h1:bAfwk7jRz7FKFl9RzlIULPkStffg5k6pNt5dywy4TcM=
github.com/charmbracelet/bubbletea v0.25.0/go.mod h1:EN3QDR1T5ZdWmdfDzYcqOCAps45+QIJbLOBxmVNWNNg=
github.com/charmbracelet/lipgloss v0.10.0 h1:KWeXFSexGcfahHX+54URiZGkBFazf70JNMtwg/AFW3s=
github.com/charmbracelet/lipgloss v0.10.0/go.mod h1:Wig9DSfvANsxqkRsqj6x87irdy123SR4dOXlKa91ciE=
github.com/checkpoint-restore/go-criu/v4 v4.1.0/go.mod h1:xUQBLp4RLc5zJtWY++yjOoMoB5lihDt7fai+75m+rGw=
github.com/checkpoint-restore/go-criu/v5 v5.0.0/go.mod h1:cfwC0EG7HMUenopBsUf9d89JlCLQIfgVcNsNN0t6T2M=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
Expand Down
123 changes: 113 additions & 10 deletions flytectl/pkg/bubbletea/bubbletea_pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,175 @@ package bubbletea
import (
"fmt"
"log"
"math"
"strings"

"github.com/charmbracelet/bubbles/key"
"github.com/charmbracelet/bubbles/paginator"
"github.com/charmbracelet/bubbles/spinner"
"github.com/charmbracelet/lipgloss"
"github.com/flyteorg/flyte/flytectl/pkg/filters"
"github.com/flyteorg/flyte/flytectl/pkg/printer"
"github.com/golang/protobuf/proto"

tea "github.com/charmbracelet/bubbletea"
)

var (
spin = false
// Avoid fetching multiple times while still fetching
fetchingBackward = false
fetchingForward = false
)

type pageModel struct {
items []proto.Message
items *[]proto.Message
paginator paginator.Model
spinner spinner.Model
}

func newModel(initMsg []proto.Message) pageModel {
p := paginator.New()
p.PerPage = msgPerPage
p.SetTotalPages(len(initMsg))
p.Page = int(filter.Page) - 1
// Set the upper bound of the page number
p.SetTotalPages(getLastMsgIdx())

s := spinner.New()
s.Style = lipgloss.NewStyle().Foreground(lipgloss.Color("56"))
s.Spinner = spinner.Points

return pageModel{
paginator: p,
items: initMsg,
spinner: s,
items: &initMsg,
}
}

func (m pageModel) Init() tea.Cmd {
return nil
return m.spinner.Tick
}

func (m pageModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
var cmd tea.Cmd
switch msg := msg.(type) {
case error:
return m, tea.Quit
case tea.KeyMsg:
switch msg.String() {
case "q", "esc", "ctrl+c":
return m, tea.Quit
}
switch {
case key.Matches(msg, m.paginator.KeyMap.PrevPage):
// If previous page will be out of the range of the first batch, don't update
if m.paginator.Page == firstBatchIndex*pagePerBatch {
return m, nil
}
}
case spinner.TickMsg:
m.spinner, cmd = m.spinner.Update(msg)
return m, cmd
case newDataMsg:
if msg.fetchDirection == forward {
// Update if current page is in the range of the last batch
// i.e. if user not in last batch when finished fetching, don't update
if m.paginator.Page/pagePerBatch >= lastBatchIndex {
*m.items = append(*m.items, msg.newItems...)
lastBatchIndex++
if lastBatchIndex-firstBatchIndex >= localBatchLimit {
*m.items = (*m.items)[batchLen[firstBatchIndex]:]
firstBatchIndex++
}
}
fetchingForward = false
} else {
// Update if current page is in the range of the first batch
// i.e. if user not in first batch when finished fetching, don't update
if m.paginator.Page/pagePerBatch <= firstBatchIndex {
*m.items = append(msg.newItems, *m.items...)
firstBatchIndex--
if lastBatchIndex-firstBatchIndex >= localBatchLimit {
*m.items = (*m.items)[:len(*m.items)-batchLen[lastBatchIndex]]
lastBatchIndex--
}
}
fetchingBackward = false
}
// Set the upper bound of the page number
m.paginator.SetTotalPages(getLastMsgIdx())
return m, nil
}
m.paginator, cmd = m.paginator.Update(msg)
preFetchBatch(&m)

m.paginator, _ = m.paginator.Update(msg)

switch msg := msg.(type) {
case tea.KeyMsg:
switch {
case key.Matches(msg, m.paginator.KeyMap.NextPage):
if (m.paginator.Page >= (lastBatchIndex+1)*pagePerBatch-prefetchThreshold) && !fetchingForward {
// If no more data, don't fetch again (won't show spinner)
value, ok := batchLen[lastBatchIndex+1]
if !ok || value != 0 {
fetchingForward = true
cmd = fetchDataCmd(lastBatchIndex+1, forward)
}
}
case key.Matches(msg, m.paginator.KeyMap.PrevPage):
if (m.paginator.Page <= firstBatchIndex*pagePerBatch+prefetchThreshold) && (firstBatchIndex > 0) && !fetchingBackward {
fetchingBackward = true
cmd = fetchDataCmd(firstBatchIndex-1, backward)
}
}
}

return m, cmd
}

func (m pageModel) View() string {
var b strings.Builder
table, err := getTable(&m)
if err != nil {
return ""
return "Error rendering table"
}
b.WriteString(table)
b.WriteString(fmt.Sprintf(" PAGE - %d\n", m.paginator.Page+1))
b.WriteString(fmt.Sprintf(" PAGE - %d ", m.paginator.Page+1))
if spin {
b.WriteString(fmt.Sprintf("%s%s", m.spinner.View(), " Loading new pages..."))
}
b.WriteString("\n\n h/l ←/→ page • q: quit\n")

return b.String()
}

func Paginator(_listHeader []printer.Column, _callback DataCallback) {
func Paginator(_listHeader []printer.Column, _callback DataCallback, _filter filters.Filters) error {
listHeader = _listHeader
callback = _callback
filter = _filter
filter.Page = int32(_max(int(filter.Page), 1))
firstBatchIndex = (int(filter.Page) - 1) / pagePerBatch
lastBatchIndex = firstBatchIndex

var msg []proto.Message
for i := firstBatchIndex; i < lastBatchIndex+1; i++ {
msg = append(msg, getMessageList(i)...)
newMessages, err := getMessageList(i)
if err != nil {
return err
}
if int(filter.Page)-(firstBatchIndex*pagePerBatch) > int(math.Ceil(float64(len(newMessages))/msgPerPage)) {
return fmt.Errorf("the specified page has no data, please enter a valid page number")
}
msg = append(msg, newMessages...)
}

p := tea.NewProgram(newModel(msg))
if _, err := p.Run(); err != nil {
log.Fatal(err)
}

if errMsg != nil {
return errMsg
}

return nil
}
Loading

0 comments on commit 6679fc7

Please sign in to comment.