AH
/// <summary>
/// Extension helpers for running an asynchronous projection over an
/// <see cref="IAsyncEnumerable{T}"/> with a bounded number of concurrent operations.
/// </summary>
internal static class Velosiped
{
    /// <summary>
    /// Projects each element of <paramref name="source"/> through the asynchronous
    /// <paramref name="func"/>, keeping at most <paramref name="parallelism"/> projections
    /// in flight at any time.
    /// </summary>
    /// <remarks>
    /// Results are yielded in completion order, which is not necessarily the order of the
    /// source sequence. If a projection fails, its exception is rethrown to the consumer
    /// when that task is picked up. Projections that are still in flight when enumeration
    /// ends early (consumer stops, or an exception propagates) are neither awaited nor
    /// cancelled by this method.
    /// </remarks>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    /// <typeparam name="TResult">Result type produced by <paramref name="func"/>.</typeparam>
    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="parallelism">Maximum number of concurrently running projections; must be at least 1.</param>
    /// <param name="func">Asynchronous projection applied to every element.</param>
    /// <returns>An asynchronous sequence with one result per source element.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="parallelism"/> is less than 1.</exception>
    public static IAsyncEnumerable<TResult> SelectAwaitConcurrent<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        int parallelism,
        Func<TSource, Task<TResult>> func)
    {
        // Validation lives in a non-iterator wrapper so that bad arguments fail at the
        // call site instead of on the first MoveNextAsync.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (func == null)
        {
            throw new ArgumentNullException(nameof(func));
        }

        if (parallelism < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(parallelism), parallelism, "Parallelism must be at least 1.");
        }

        return SelectAwaitConcurrentCore(source, parallelism, func);
    }

    /// <summary>Iterator implementation behind <see cref="SelectAwaitConcurrent{TSource, TResult}"/>.</summary>
    private static async IAsyncEnumerable<TResult> SelectAwaitConcurrentCore<TSource, TResult>(
        IAsyncEnumerable<TSource> source,
        int parallelism,
        Func<TSource, Task<TResult>> func)
    {
        // A list (not a set) on purpose: func may hand back the same Task instance for
        // several elements (e.g. a cached completed task), and every element must still
        // produce exactly one result.
        var inFlight = new List<Task<TResult>>();

        await using var enumerator = source.GetAsyncEnumerator();

        while (await enumerator.MoveNextAsync())
        {
            // The window is full: wait for one projection to finish and hand its result
            // to the consumer before starting the next one.
            if (inFlight.Count >= parallelism)
            {
                yield return await TakeCompletedAsync(inFlight);
            }

            inFlight.Add(func(enumerator.Current));
        }

        // Source exhausted: drain whatever is still running.
        while (inFlight.Count > 0)
        {
            yield return await TakeCompletedAsync(inFlight);
        }
    }

    /// <summary>
    /// Waits until any task in <paramref name="inFlight"/> completes, removes it from the
    /// list and returns its result (rethrowing if the task faulted or was cancelled).
    /// </summary>
    private static async Task<TResult> TakeCompletedAsync<TResult>(List<Task<TResult>> inFlight)
    {
        var completed = await Task.WhenAny(inFlight);
        inFlight.Remove(completed);
        return await completed;
    }
}