IC
Size: a a a
IC
AH
AH
IC
IC
AH
AH
AH
IC
AH
AH
AH
IC
IC
IC
AH
AH
AH
static async Task ChannelImpl()
{
var outCh = Channel.CreateUnbounded<(string, DateTime, string[])>();
Channel.CreateUnbounded<(string, ICslQueryProvider)>()
.Source(kustoClients)
.ReadAllConcurrentlyAsync(maxConcurrency: parallelism, async x =>
{
var (cluster, client) = x;
await Channel
.CreateUnbounded<(DateTime, DateTime)>()
.Source(DateQuants())
.TaskPipeAsync(maxConcurrency: parallelism, transform: async tuple =>
{
var (start, finish) = tuple;
var logEntries = await LogsBetween(client, start, finish);
var logs = logEntries.Select(x => x.ToString()).ToArray();
return (cluster, start, logs);
})
.ReadAllAsync(async x =>
{
await outCh.Writer.WriteAsync(x);
});
});
await outCh.TaskReadAllAsync(async x =>
{
var (cluster, start, logs) = x;
await afterTask(cluster, start, logs);
});
}
AH
AH