New dotnet libraries and mechanisms #24

Open
yoav-melamed opened this issue Apr 29, 2020 · 27 comments

Comments

@yoav-melamed

yoav-melamed commented Apr 29, 2020

Hello Ami,
I'd like to take this chance to thank you for your great work!
I use STP a lot and it is definitely my favorite library!

Would you consider adopting some of the newer .NET libraries and mechanisms in future releases?

  • async/await?
  • ValueTasks?
  • System.Threading.Channels as a Base queue?

Thanks!

@amibar
Owner

amibar commented Apr 30, 2020

Hi Yoav,

Can you please give examples of how you wish to use these features with STP?

Regards,
Ami

@yoav-melamed
Author

Hi Ami,
I just thought it would be a nice improvement to replace some of the blocking code with async methods (such as 'await enqueue' or 'await WaitForIdle').

The idea of combining it with ValueTask is only to reduce allocations compared with a regular Task.

Regarding 'System.Threading.Channels', one thing I like is the 'bounded queue' backpressure mechanism: when the queue is full, it automatically waits until there is room for additional items (instead of throwing a 'queue is full' exception).
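
As a rough illustration of the backpressure behavior described above, here is a minimal sketch that uses only System.Threading.Channels from the .NET base library. It does not involve STP, and the capacity of 2 and all variable names are arbitrary choices for the example. With FullMode set to Wait, TryWrite reports that the channel is full and WriteAsync stays pending until a reader frees a slot, rather than throwing.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel with room for 2 items; writers wait when it is full.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

bool firstAccepted = channel.Writer.TryWrite(1);   // fits
bool secondAccepted = channel.Writer.TryWrite(2);  // fits, channel is now full
bool thirdAccepted = channel.Writer.TryWrite(3);   // rejected without throwing

// WriteAsync does not throw either: it stays pending until there is room.
Task pendingWrite = channel.Writer.WriteAsync(3).AsTask();
bool waitedForRoom = !pendingWrite.IsCompleted;

// Reading one item frees a slot, which lets the pending write finish.
int firstRead = await channel.Reader.ReadAsync();
await pendingWrite;
channel.Writer.Complete();

// Drain what is left (items 2 and 3).
int remainingSum = 0;
await foreach (int item in channel.Reader.ReadAllAsync())
{
    remainingSum += item;
}

Console.WriteLine($"TryWrite results: {firstAccepted}, {secondAccepted}, {thirdAccepted}");
Console.WriteLine($"WriteAsync waited: {waitedForRoom}, first read: {firstRead}, remaining sum: {remainingSum}");
```

Whether a channel would suit STP's internal queue (priorities, work item groups) is a separate question that this sketch does not address.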

Yoav

@amibar
Owner

amibar commented May 8, 2020

Hi @yoav-melamed,

By async/await I thought you meant that when you enqueue an async method it will continue to work on the STP.

For example let's take this async method:

public async Task DoIt()
{ 
   // Do first thing
   await Task.Delay(10);
   // Do second thing
}

Today, if you enqueue it into the STP, the work item completes after "first thing" is done, and "second thing" runs on the .NET thread pool (outside the STP threads).

With an async/await feature, the STP could run the second thing on an STP thread, and only then would the work item complete.

Is this a required feature? Or do you just need Enqueue and WaitForIdle to be awaitable?
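
The behavior described above can be observed without STP at all. The following is a minimal sketch in which a plain dedicated Thread stands in for an STP worker thread (an assumption made for illustration; the variable names are also illustrative). The part before the await runs on the dedicated thread, the method returns to its caller at the first await, and the continuation typically resumes on a .NET thread pool thread because no SynchronizationContext is installed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

bool firstPartOnPoolThread = true;
bool secondPartOnPoolThread = false;
Task pending = Task.CompletedTask;

// A dedicated (non-pool) thread standing in for an STP worker thread.
var worker = new Thread(() =>
{
    pending = DoIt();   // returns as soon as DoIt reaches its first await
});
worker.Start();
worker.Join();          // the worker thread is already done at this point

await pending;          // the async method itself finishes later

Console.WriteLine($"First part on pool thread:  {firstPartOnPoolThread}");
Console.WriteLine($"Second part on pool thread: {secondPartOnPoolThread}");

async Task DoIt()
{
    // "First thing": still on the dedicated thread.
    firstPartOnPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    await Task.Delay(50);
    // "Second thing": resumed by the timer on a thread pool thread.
    secondPartOnPoolThread = Thread.CurrentThread.IsThreadPoolThread;
}
```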

Ami

@yoav-melamed
Author

Hi @amibar
Sorry for the super-late response - I missed your comment :/
Yes, having an awaitable task in the queue would definitely be a big step for STP.
It would let each thread be more efficient and would definitely help with writing async code.

In addition, since WaitForIdle is an expensive blocking method, having an async version of it would be great.

@MichelZ

MichelZ commented Oct 4, 2022

Oh yeah, async/await feature so that you can do QueueWorkItem(async () => await xxx())

@amibar
Owner

amibar commented Oct 9, 2022

Hi @yoav-melamed & @MichelZ,

I started working on async/await support a while ago and I am still figuring out how to handle some scenarios.
However, why do you want the async/await feature in STP instead of just using async/await directly?
I just want to know that my work will be useful to you.

Regards,
Ami

@yoav-melamed
Author

Hi @amibar
The reason from my end is to make every STP thread as efficient as it can be:

  • Having an awaitable task, so each STP thread could handle several async operations
  • Having an awaitable 'WaitForIdle', so that while the main thread waits for the STP to complete, we can render the UI or do other things (this operation could take some time, and in the meantime the thread could do other work)
  • Having the STP stop or cancel asynchronously (again, this could take some time, and in the meantime the thread could do other work)

That is what I had in mind
Thank you for your hard work!

@amibar
Owner

amibar commented Oct 11, 2022

Hi @yoav-melamed,

Thanks.

The threads of the STP are supposed to be worker threads, hence they should use the CPU.
When your work item on the STP awaits, it just delegates the work to another arbitrary thread over which the STP has no control.

One of the ideas behind STP is to limit concurrency.
If, for example, you limit the concurrency to 1 (max threads) and send 10 work items that each await a file download, you'll end up with 10 concurrent downloads executing on the threads of the .NET ThreadPool while the STP thread is idle.

As for WaitForIdleAsync() and GetResultAsync(), they are quite reasonable to implement.
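
For comparison, outside of STP a common way to cap the number of in-flight async operations is a SemaphoreSlim gate. The sketch below is not STP code and is not something proposed in this thread; DownloadAsync is a hypothetical stand-in that simulates a download with Task.Delay, and the counters exist only to show the observed peak.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int MaxConcurrency = 1;   // same limit as in the scenario above
var gate = new SemaphoreSlim(MaxConcurrency);
var sync = new object();
int running = 0;
int peak = 0;

// Start 10 "downloads" at once; the gate lets only MaxConcurrency proceed.
await Task.WhenAll(Enumerable.Range(1, 10).Select(id => DownloadAsync(id)));
Console.WriteLine($"Peak concurrent downloads: {peak}");

// Hypothetical stand-in for a real download.
async Task DownloadAsync(int id)
{
    await gate.WaitAsync();          // waits asynchronously, no thread is blocked
    try
    {
        lock (sync)
        {
            running++;
            peak = Math.Max(peak, running);
        }
        await Task.Delay(20);        // simulated I/O for item `id`
    }
    finally
    {
        lock (sync) { running--; }
        gate.Release();
    }
}
```

This limits how many operations are in flight, not which threads run them, so it addresses a different concern than STP's thread-based concurrency.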

What do you think?

Regards,
Ami

@MichelZ

MichelZ commented Oct 11, 2022

I often use it to limit concurrency, yes.
But I also need to use other APIs in the worker threads sometimes, and more and more of them happen to be async. So it's more a matter of convenience (.GetAwaiter(false).GetResult() anyone?)

@amibar
Owner

amibar commented Oct 11, 2022

Did you mean .ConfigureAwait(false) ?

@MichelZ

MichelZ commented Oct 11, 2022

Yeah, that one :)

@amibar
Owner

amibar commented Oct 11, 2022

Do you use the WorkItemGroups to limit concurrency or use the STP directly ?

@MichelZ

MichelZ commented Oct 11, 2022

Do you use the WorkItemGroups to limit concurrency or use the STP directly ?

Both. We use STP in a few projects

@amibar
Owner

amibar commented Oct 11, 2022

I have dilemmas regarding async methods and the WorkItemGroup.

  1. Let's say you enqueue 2 async work items into a WorkItemGroup with a concurrency of 1. The 1st starts executing and then awaits, so the thread is released. Should the 2nd work item start executing, or should it wait until the 1st work item completes?
    (by "complete" I mean the async method returns rather than awaits)

  2. Another case is when several work items are enqueued to a WorkItemGroup and after a while they are all in an awaiting state. Does that mean the WorkItemGroup is idle? Or is it idle only when all these work items complete?

@yoav-melamed
Author

Hi Ami,
Regarding your question:

Do you use the WorkItemGroups to limit concurrency or use the STP directly ?

Same as @MichelZ, I'm using STP in several projects, and it depends on the project I'm working on. Mainly I use the STP directly, but in one project I need 4 workers that each collect data from a different source and add it to a centralized queue to be processed (each worker has a different priority), so I'm using the WIG there.

When your work item on the STP awaits, it just delegate the work to another arbitrary thread which the STP has no control on

I thought a lot about what you said, and you are right: executing an async task makes the .NET ThreadPool control the thread life cycle, which is the opposite of what we want.
For calling async APIs I think it's better to use the old-fashioned way (and NOT use ConfigureAwait(false), because we do want the results to be returned to the calling thread [which is the STP work item thread]).

Here is a sample I wrote to demonstrate what I mean:

using System;
using System.Threading.Tasks;
using Amib.Threading;

const int NUM_OF_ITEM = 10;

SmartThreadPool stp = new();
stp.Concurrency = 1;

for (int i = 1; i <= NUM_OF_ITEM; i++)
    stp.QueueWorkItem(Executor, i);

stp.WaitForIdle(); // would like to have `await stp.WaitForIdleAsync()` here

Console.WriteLine("All Done.");

void Executor(int ItemNumber)
{
    Console.WriteLine($"Executing item number {ItemNumber}...");

    // Call an async API and block this STP thread until it completes
    var task = Task.Run(async () =>
    {
        await Task.Delay(1000);
        return new Random().Next(1, 1000);
    });
    var results = task.Result;

    Console.WriteLine($"Item {ItemNumber} has done with results of {results}.");
}

@amibar
Owner

amibar commented Oct 15, 2022

Hi @yoav-melamed,

I looked at your sample and I realized that using the QueueWorkItem for a Task is not the best solution.

When this project started (.net 1.1) the Task type didn’t exist yet, so I had to wrap delegates with objects (WorkItem) and return promises (IWorkItemResult). A Task is a promise, so I can harness its methods, instead of treating it like a delegate.

So for a Task I prefer using RunTask, which is similar to Task.Run(). Rewritten that way, your sample should look like this:

using System;
using System.Threading.Tasks;
using Amib.Threading;

const int NUM_OF_ITEM = 10;

SmartThreadPool stp = new SmartThreadPool();
stp.Concurrency = 1;

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    _ = stp.RunTask(() => Executor(i));
}

// The tasks are converted to work items internally, so you can wait for idle on the stp as usual
await stp.WaitForIdleAsync(); 

Console.WriteLine("All Done.");

// Executor is an async method
async Task<int> Executor(int ItemNumber)
{
    Console.WriteLine($"Executing item number {ItemNumber}...");

    await Task.Delay(1000);

    var results = new Random().Next(1, 1000);

    Console.WriteLine($"Item {ItemNumber} has done with results of {results}.");

    return results;
}

RunTask also supports a cancellation token and a priority.

I can still implement QueueWorkItem for tasks, but I don't think it's necessary.

What’s your opinion?

I pushed a branch named async with what I did so far.

Ami

@yoav-melamed
Author

Hi @amibar
That looks like a great idea! I think that's exactly what @MichelZ and myself are looking for.
I'm going to pull the async branch and take it for a spin. This will definitely help us with everything related to async operations.

Does the RunTask() method execute the task immediately, or does it queue it on the STP thread pool (like the .NET Task.Run() does behind the scenes)?

If so, would you also be able to queue tasks on WorkItemGroups? Or, for now, only on the STP itself?

@yoav-melamed
Author

Hi @amibar
After pulling the async branch and trying it myself, there is an issue I found with how the code is running:
The stp.Concurrency = 1; has no effect on the current code. All the items fired simultaneously and not 1 after the other.
Changing it to stp.Concurrency = 5; (for example) has no effect (compared to the original code) and if I'm getting it right - the reason is we are not actually queueing the items, but just firing tasks. Am I right?
Did you notice this behavior on your side as well?

@yoav-melamed
Author

if I'm getting it right - the reason is we are not actually queueing the items, but just firing tasks. Am I right?
Did you notice this behavior on your side as well?

Ignore this comment... I saw the code:

if (cancellationToken?.IsCancellationRequested ?? false)
    return Task.FromCanceled(cancellationToken.Value);

PreQueueWorkItem();
WorkItem workItem = WorkItemFactory.CreateWorkItem(
    this,
    WIGStartInfo,
    _ =>
    {
         action();
         return null;
    },
    priority);
Enqueue(workItem);

var wir = workItem.GetWorkItemResult();
cancellationToken?.Register(() => wir.Cancel());

return wir.GetResultAsync();

You queue the task, convert it to a WorkItem, and return a promise (as you explained in your previous comment).
I am still trying to find out why it's not working as expected: all the tasks run in parallel, ignoring the concurrency parameter.

@amibar
Owner

amibar commented Oct 18, 2022

Hi @yoav-melamed,

The concurrency limit applies to threads, not tasks.
When a work item awaits, it releases its thread to another work item.

Ami

@yoav-melamed
Author

Hi @amibar
Sorry, maybe I got confused. What use cases do you think RunTask will solve? Until now I thought it would be executed on the queue as a work item, but in an awaitable way. That was probably my misunderstanding.

@amibar
Owner

amibar commented Oct 22, 2022

Hi @yoav-melamed,

I pushed another commit to the async branch.

This time a WorkItemsGroup cannot have more than Concurrency work items executing at once.

Please check if it works for you.

Thanks,
Ami

@yoav-melamed
Author

Hi @amibar
Thanks for your feedback!
I tried to re-run the sample above with the updated async branch, but the results were the same:
When the stp.Concurrency = 1 all the tasks were executed simultaneously and returned the results after 1 second (the delay time in the Executor).

@amibar
Owner

amibar commented Oct 23, 2022

@yoav-melamed, check the WorkItemsGroup, I didn't update the STP itself.

@yoav-melamed
Author

Hi @amibar
I made the following changes in my code:

//stp.Concurrency = 1;
var wig = stp.CreateWorkItemsGroup(concurrency: 5);

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    //_ = stp.RunTask(() => Executor(i));

    _ = wig.RunTask(() => Executor(i));
}

Now the execution runs as expected: if concurrency is 1, the items are executed one after the other, and if it's 5 (for example), only 5 are invoked in parallel.

The issue I notice now is that, since RunTask returns a Task that we are not awaiting, all the items passed to the Executor() function have the value 11, so the Executor runs with the wrong values. We cannot await RunTask because it executes the task rather than adding it to the queue. Is there any way you can think of to work around this and execute with the correct values?

@amibar
Owner

amibar commented Oct 25, 2022

@yoav-melamed,

Check https://stackoverflow.com/questions/271440/captured-variable-in-a-loop-in-c-sharp to solve this.

You should copy i to another local variable before passing it to the Executor, because the lambda captures the loop variable itself rather than its value at that moment.
Something like:

//stp.Concurrency = 1;
var wig = stp.CreateWorkItemsGroup(concurrency: 5);

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    //_ = stp.RunTask(() => Executor(i));
    var i2 = i; // per-iteration copy, so each lambda captures its own value
    _ = wig.RunTask(() => Executor(i2));
}


@yoav-melamed
Author

Hi @amibar
That's working great now - thank you!
Is there a more elegant way to handle the 'copy to another variable' step? Or is that just something we have to do for it to work?
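
One option that may read more cleanly, sketched here without STP: in C# 5 and later a foreach loop gives each iteration its own variable, so iterating over Enumerable.Range avoids the manual copy, whereas a for loop shares a single variable across all iterations. The lists of Func<int> below are illustrative stand-ins for the lambdas passed to RunTask.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var fromFor = new List<Func<int>>();
for (int i = 1; i <= 3; i++)
    fromFor.Add(() => i);          // every lambda shares the one loop variable i

var fromForeach = new List<Func<int>>();
foreach (int n in Enumerable.Range(1, 3))
    fromForeach.Add(() => n);      // each iteration has its own n

// The lambdas run only after both loops have finished.
var forValues = fromFor.Select(f => f()).ToList();
var foreachValues = fromForeach.Select(f => f()).ToList();

Console.WriteLine(string.Join(", ", forValues));      // 4, 4, 4
Console.WriteLine(string.Join(", ", foreachValues));  // 1, 2, 3
```

Applied to the sample above, this would mean replacing the for loop with a foreach over Enumerable.Range(1, NUM_OF_ITEM) and passing the foreach variable to Executor directly.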
