Skip to content

Commit

Permalink
libuv
Browse files Browse the repository at this point in the history
  • Loading branch information
TimWhiting committed Jan 22, 2024
1 parent 90a2bf9 commit ffbbfd9
Show file tree
Hide file tree
Showing 29 changed files with 3,364 additions and 74 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ bench
test/bench
.koka
scratch
.cache
4 changes: 4 additions & 0 deletions kklib/include/kklib.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ typedef struct kk_context_s {
kk_yield_t yield; // inlined yield structure (for efficiency)
int32_t marker_unique; // unique marker generation
kk_block_t* delayed_free; // list of blocks that still need to be freed
void* loop; // a reference to an event loop (e.g. uv_loop_t* or NULL)
kk_integer_t unique; // thread local unique number generation
size_t thread_id; // unique thread id
kk_box_any_t kk_box_any; // used when yielding as a value of any type
Expand All @@ -444,6 +445,9 @@ typedef struct kk_context_s {
kk_decl_export kk_context_t* kk_get_context(void);
kk_decl_export void kk_free_context(void);


kk_decl_export void kk_debugger_break(kk_context_t* ctx);

// The current context is passed as a _ctx parameter in the generated code
#define kk_context() _ctx

Expand Down
3 changes: 3 additions & 0 deletions kklib/src/string.c
Original file line number Diff line number Diff line change
Expand Up @@ -865,20 +865,23 @@ kk_string_t kk_string_trim_right(kk_string_t str, kk_context_t* ctx) {
kk_unit_t kk_println(kk_string_t s, kk_context_t* ctx) {
// TODO: set locale to utf-8?
puts(kk_string_cbuf_borrow(s, NULL, ctx)); // todo: allow printing embedded 0 characters?
fflush(stdout);
kk_string_drop(s, ctx);
return kk_Unit;
}

kk_unit_t kk_print(kk_string_t s, kk_context_t* ctx) {
// TODO: set locale to utf-8?
fputs(kk_string_cbuf_borrow(s, NULL, ctx), stdout); // todo: allow printing embedded 0 characters?
fflush(stdout);
kk_string_drop(s, ctx);
return kk_Unit;
}

kk_unit_t kk_trace(kk_string_t s, kk_context_t* ctx) {
fputs(kk_string_cbuf_borrow(s, NULL, ctx), stderr); // todo: allow printing embedded 0 characters?
fputs("\n", stderr);
fflush(stdout);
kk_string_drop(s, ctx);
return kk_Unit;
}
Expand Down
99 changes: 49 additions & 50 deletions lib/std/async.kk
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,22 @@ import std/data/array
import std/num/int32
import std/num/ddouble // for C# backend
import std/time/duration
import std/debug
import std/os/null
import std/core/unsafe
pub import std/os/uv
// We utilize some timer functions from it's header file.
pub import std/time/timer

extern import
c file "async/async-inline.h"
js file "async/async-inline.js"
cs file "async/async-inline.cs"

// A type alias for asynchronous operations that can raise exceptions non-deterministically.
// This is common for almost all `:async` operations since `cancel` and `timeout` can
// cancel operations non-deterministically which raises the `Cancel` exception and cancels
// outstanding asynchronous requests.
pub alias asyncx = <async,exn,ndet>
pub alias asyncx = <async,io>


// ----------------------------------------------------------------------------
Expand All @@ -51,7 +59,7 @@ abstract struct promise<a>
state : ref<global,promise-state<a>>


abstract type promise-state<a>
abstract value type promise-state<a>
Resolved( value : a )
Awaiting( listeners : list<a -> io ()> )

Expand All @@ -61,7 +69,7 @@ pub fun promise() : async promise<a>

// Await a promise; returns immediately if the promise was already resolved and otherwise
// waits asynchronously.
pub fun await( p : promise<a> ) : asyncx a
pub fun promise/await( p : promise<a> ) : asyncx a
fun setup(cb : _ -> io-noexn ())
val r = p.state
match (!r)
Expand Down Expand Up @@ -96,7 +104,7 @@ pub fun resolve( p : promise<a>, value : a ) : asyncx ()

// A _channel_ of values of type `:a`. Values can be asynchronously `emit`ed into
// a channel, and asynchronously `receive`d.
abstract struct channel<a>(
abstract value struct channel<a>(
chid : int,
state : ref<global,channel-state<a>>
)
Expand Down Expand Up @@ -191,7 +199,7 @@ fun trace-anyx( s : string, x : a ) : async ()
// abstraction can reliably time out over any composition of asynchronous operations
// and is therefore quite expressive.

pub fun timeout( secs : duration, action : () -> <async,exn|e> a ) : <async,exn|e> maybe<a>
pub fun timeout( secs : duration, action : () -> <async,io|e> a ) : <async,io|e> maybe<a>
firstof { wait(secs); Nothing} { Just(action()) }

// Execute `a` and `b` interleaved. As soon as one of them finishes,
Expand All @@ -208,20 +216,20 @@ pub fun firstof( a : () -> <async,exn|e> a, b : () -> <async,exn|e> a ) : <asyn

// Wait (asynchronously) for `secs` seconds as a `:double`.
// Use `yield()` to yield to other asynchronous operations.
pub fun wait( secs : double ) : <async,exn> ()
pub fun float/wait( secs : float64 ) : <async,io> ()
wait(secs.duration)

// Wait (asynchronously) for optional `secs` seconds `:duration` (`= 0.seconds`).
// Use `yield()` to yield generally to other asynchronous operations.
pub fun wait( secs : duration = zero ) : <async,exn> ()
pub fun wait( secs : duration = zero ) : <async,io> ()
if secs <= zero then return yield()
val msecs = max(zero:int32,secs.milli-seconds.int32)
await fn(cb)
val tid = set-timeout( fn(){ cb(Ok(())) }, msecs )
Just( { clear-timeout(tid) } )

// Yield to other asynchronous operations. Same as `wait(0)`.
pub fun yield() : <async,exn> ()
pub fun yield() : <async,io> ()
await0 fn(cb)
set-timeout( cb, zero )
()
Expand All @@ -237,13 +245,15 @@ fun set-timeout( cb : () -> io-noexn (), ms : int32 ) : io-noexn timeout-id
extern set-timeoutx( cb : () -> io-noexn (), ms : int32 ) : io-noexn any
cs "_Async.SetTimeout"
js "setTimeout"
c "kk_set_timeout"

fun clear-timeout( tid : timeout-id ) : io-noexn ()
clear-timeoutx(tid.timer)

extern clear-timeoutx( tid : any) : io-noexn ()
cs "_Async.ClearTimeout"
js "clearTimeout"
c "kk_clear_timeout"


// ----------------------------------------------------------------------------
Expand All @@ -257,7 +267,7 @@ pub fun interleaved( action1 : () -> <async,exn|e> a, action2 : () -> <async,exn
(ra.untry,rb.untry)

// Interleave a list of actions around their asynchronous operations.
pub fun interleaved( xs : list<() -> <async,exn|e> a> ) : <async,exn|e> list<a>
pub fun list/interleaved( xs : list<() -> <async,exn|e> a> ) : <async,exn|e> list<a>
val ress = xs.map( fn(f) { return { mask behind<exn>(f) } } ).interleavedx
//ress.map(maybe).ordered_throw
ress.map(untry)
Expand Down Expand Up @@ -331,7 +341,7 @@ fun insert( xs : list<(int,a)>, idx : int, value : a, n : int = 0 ) : list<(int

// Interleave a list actions around their asynchronous operations and explicitly returning either
// either their result or their exception.
pub fun interleavedx( xs : list<() -> <async,exn|e> a> ) : <async|e> list<error<a>>
pub fun list/interleavedx( xs : list<() -> <async,exn|e> a> ) : <async|e> list<error<a>>
val n = xs.length
if n==0 then []
elif n==1 then xs.map(unsafe-try-all)
Expand Down Expand Up @@ -359,12 +369,12 @@ fun unsafe-no-ndet-div( action : () -> <ndet,div|e> a ) : e a
inline extern inject-effects : forall<a,h,e> (() -> e a) -> total (() -> <strands<a>,ndet,div|e> a)
inline "#1"

fun is-finalize( t : error<a> ) : bool
fun error/is-finalize( t : error<a> ) : bool
match t
Error(exn) -> exn.is-finalize
_ -> False

fun is-cancel( t : error<a> ) : bool
fun error/is-cancel( t : error<a> ) : bool
match t
Error(exn) -> exn.is-cancel
_ -> False
Expand Down Expand Up @@ -408,13 +418,13 @@ fun interleaved-div( xs : list<() -> <async,exn|e> a> ) : <async,ndet,div,strand
// ----------------------------------------------------------------------------

// Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception.
pub fun await-exn0( setup : (cb : (null<exception>) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,exn> ()
pub fun await-exn0( setup : (cb : (null<exception>) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,io> ()
await fn(cb)
setup( fn(nexn) cb(nexn.unnull(())) )

// Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception
// and the second argument the possible result value.
pub fun await-exn1( setup : (cb : (null<exception>,a) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,exn> a
pub fun await-exn1( setup : (cb : (null<exception>,a) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,io> a
await fn(cb)
setup( fn(nexn,x) cb(nexn.unnull(x)) )

Expand All @@ -424,13 +434,13 @@ fun unnull( nexn : null<exception>, x : a ) : error<a>
Just(exn) -> Error(exn)

// Convenience function for awaiting a zero argument callback.
pub fun await0( setup : (cb : () -> io-noexn () ) -> io () ) : <async,exn> ()
pub fun await0( setup : (cb : () -> io-noexn () ) -> io () ) : <async,io> ()
await fn(cb)
setup( fn() cb(Ok(())) )
Nothing

// Convenience function for awaiting a single argument callback.
pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : <async,exn> a
pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : <async,io> a
await fn(cb)
setup( fn(x) cb(Ok(x)) )
Nothing
Expand All @@ -440,7 +450,7 @@ pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : <async,exn> a
// value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`).
// The callback should be invoked exactly once -- when that happens `await` is resumed with the result using `untry`
// either raise an exception or return the plain result.
pub fun await( setup : (cb : error<a> -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,exn> a
pub fun setup/await( setup : (cb : error<a> -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : <async,io> a
await-exn(setup).untry


Expand All @@ -463,7 +473,7 @@ pub effect async
// The `cancel` operations cancels any outstanding asynchronous operation under the innermost
// `cancelable` handler by returning the `Cancel` exception. The `cancel` operation itself returns normally
// without raising a `Cancel` exception.
pub fun cancel() : async ()
pub fun noscope/cancel() : async ()
cancel(empty-scope)

// Primitive: Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback:
Expand Down Expand Up @@ -525,33 +535,34 @@ pub fun cancelable( action : () -> <async|e> a ) : <async|e> a
// this might be needed for `no-await` operations.
cancel(empty-scope.extend)
x
ctl do-await(setup,scope,c) -> resume(do-await(setup,scope.extend,c))
ctl no-await(setup,scope,c,f) -> resume(no-await(setup,scope.extend,c,f))
ctl cancel(scope) -> resume(cancel(scope.extend))
ctl async-iox(f) -> resume(async-iox(f))
fun do-await(setup,scope,c) -> do-await(setup,scope.extend,c)
fun no-await(setup,scope,c,f) -> no-await(setup,scope.extend,c,f)
fun cancel(scope) -> cancel(scope.extend)
fun async-iox(f) -> async-iox(f)

// ----------------------------------------------------------------------------
// Async handle
// ----------------------------------------------------------------------------

pub fun ".default-async"(action)
async-handle(action);
pub fun @default-async(action)
with handle-uv
async/handle(action);

fun nodispose() : io-noexn ()
()

// The outer `:async` effect handler. This is automatically applied by the compiler
// around the `main` function if it has an `:async` effect.
pub fun async-handle(action : () -> <async,io-noexn> () ) : io-noexn ()
val callbacks : ref<global,list<(scope,() -> io-noexn ())>> = ref([])
pub fun async/handle(action : () -> <async,io-noexn> () ) : io-noexn ()
val callbacks : ref<global,list<(scope,() -> io-noexn ())>> = unsafe-total{ref([])}
fun handle-await( setup : await-setup<a>, scope : scope, f : error<a> -> io-noexn (), cancelable : bool) : io-noexn ()
val cscope = child-scope(unique(),scope)
val dispose = ref(nodispose)
fun cb( res : error<_>, is-done : bool ) : io-noexn ()
if ((!callbacks).contains(cscope)) then
if is-done then
callbacks := (!callbacks).remove(cscope)
if res.is-error then try-default((),!dispose)
if res.is-error then try(!dispose).default(())
f(res)

// trace("register: " + cscope.show)
Expand All @@ -573,12 +584,12 @@ pub fun async-handle(action : () -> <async,io-noexn> () ) : io-noexn ()
handle(action)
raw ctl do-await( setup, scope, c )
handle-await(setup,scope, fn(x) rcontext.resume(x), c) // returns to outer event loop
ctl no-await( setup, scope, c, f )
resume(handle-await(setup,scope,f,c))
ctl cancel( scope )
resume(handle-cancel(scope))
ctl async-iox( f )
resume(f())
fun no-await( setup, scope, c, f )
handle-await(setup,scope,f,c)
fun cancel( scope )
handle-cancel(scope)
fun async-iox( f )
f()

fun io-noexn( f : () -> io-noexn a ) : io a
f()
Expand All @@ -602,7 +613,7 @@ fun child-scope( id : int, scope : scope ) : scope
match scope
Scope(cids) -> Scope(cids ++ [id])

fun in-scope-of( child : list<int>, parent : list<int> ) : bool
fun ids/in-scope-of( child : list<int>, parent : list<int> ) : bool
match parent
Nil -> True
Cons(p,ps) -> match child
Expand All @@ -614,22 +625,11 @@ fun in-scope-of( child : scope, parent : scope ) : bool
Scope(pids) -> match child
Scope(cids) -> in-scope-of(cids,pids)

fun (==)( ids1 : list<int>, ids2 : list<int> ) : bool
match ids1
Nil -> ids2.is-nil
Cons(i1,is1) -> match ids2
Cons(i2,is2) -> (i1 == i2 && is1 == is2)
Nil -> False

fun (==)(scope1 : scope, scope2 : scope ) : bool
fun scope/(==)(scope1 : scope, scope2 : scope ) : bool
match scope1
Scope(ids1) -> match scope2
Scope(ids2) -> ids1==ids2

fun (!=)(scope1 : scope, scope2 : scope ) : bool
!(scope1 == scope2)


// Convenience functions for scope maps
fun remove( xs : list<(scope,a)>, scope : scope ) : list<(scope,a)>
xs.remove( fn(x:(scope,_)) { x.fst == scope })
Expand All @@ -640,7 +640,6 @@ fun lookup( xs : list<(scope,a)>, scope : scope ) : maybe<a>
fun contains( xs : list<(scope,a)>, scope : scope ) : bool
xs.lookup(scope).bool


fun show( s : scope ) : string
match s
Scope(ids) -> ids.map(show).join("-")
Expand All @@ -652,13 +651,13 @@ abstract extend type exception-info
con Finalize(yld:yield-info)

// Was this a cancelation exception?
fun is-cancel( exn : exception ) : bool
fun exn/is-cancel( exn : exception ) : bool
match exn.info
Cancel -> True
_ -> False

// Was this a finalization exception?
fun is-finalize(exn : exception) : bool
fun exn/is-finalize(exn : exception) : bool
match exn.info
Finalize -> True
_ -> False
Expand Down
30 changes: 30 additions & 0 deletions lib/std/async/async-inline.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include "std_time_timer.h"


kk_box_t kk_set_timeout(kk_function_t cb, int64_t time, kk_context_t* _ctx) {
kk_std_time_timer__timer t = kk_std_time_timer_timer_init(_ctx);
kk_std_time_timer_timer_start(t, time, 0, cb, _ctx);
return kk_std_time_timer__timer_box(t, _ctx);
}


static kk_box_t kk_unit_closure(kk_function_t _fself, kk_context_t* _ctx);
static kk_function_t kk_new_unit_closure(kk_context_t* _ctx) {
kk_define_static_function(_fself, kk_unit_closure, _ctx)
return kk_function_dup(_fself,kk_context());
}

static kk_box_t kk_unit_closure(kk_function_t _fself, kk_context_t* _ctx) {
kk_unused(_fself);
return kk_unit_box(kk_Unit);
}


kk_unit_t kk_clear_timeout(kk_box_t t, kk_context_t* _ctx) {
kk_std_time_timer__timer timer = kk_std_time_timer__timer_unbox(t, KK_OWNED, _ctx);
kk_std_os_uv_close(kk_std_os_uv__new_UvHandle(timer.internal, _ctx), kk_new_unit_closure(_ctx), _ctx);
return kk_Unit;
}



3 changes: 0 additions & 3 deletions lib/std/core/types.kk
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ pub type vector<a>
// A raw wrapper around a uint8 character array.
pub type bytes

// A raw wrapper around a uint8 character array.
pub type bytes

// An any type. Used for external calls.
pub type any

Expand Down
Loading

0 comments on commit ffbbfd9

Please sign in to comment.