AH
Size: a a a
AH
AH
AH
AH
AH
IC
AH
IC
IC
AH
I
I
AH
AH
AH
/// Link options under which a source block's completion is propagated to the
/// target block it is linked to.
let private linkOptions =
    let options = DataflowLinkOptions()
    options.PropagateCompletion <- true
    options
/// Builds the fetch stage for one cluster: a `TransformBlock` that turns a
/// (start, finish) range into a (cluster, start, lines) batch.
///
/// Each range is resolved with `logsBetween kustoClient start finish`; every
/// returned entry is rendered with `ToString()`. The block handles one range
/// at a time (`MaxDegreeOfParallelism = 1`).
let private getLogsBlock(cluster, kustoClient) =
    let blockOptions = ExecutionDataflowBlockOptions()
    blockOptions.MaxDegreeOfParallelism <- 1
    TransformBlock<DateTime * DateTime, string * DateTime * string[]>(
        (fun (rangeStart, rangeEnd) -> task {
            let! entries = logsBetween kustoClient rangeStart rangeEnd
            let lines = [| for entry in entries do yield entry.ToString() |]
            // Only the range start travels downstream together with the lines.
            return (cluster, rangeStart, lines)
        }),
        blockOptions)
/// Sink stage: an `ActionBlock` that hands every (cluster, start, lines) batch
/// to `afterTask` and awaits it, one batch at a time
/// (`MaxDegreeOfParallelism = 1`).
let writeToFileBlock =
    let sinkOptions = ExecutionDataflowBlockOptions()
    sinkOptions.MaxDegreeOfParallelism <- 1
    ActionBlock<string * DateTime * string[]>(
        (fun (clusterName, rangeStart, lines) -> unitTask {
            do! afterTask(clusterName, rangeStart, lines)
        }),
        sinkOptions)
/// Runs the export for every (cluster, kustoClient) pair in `kustoClients` and
/// finishes only after the whole pipeline has drained.
///
/// For each cluster a dedicated fetch block is linked to the shared
/// `writeToFileBlock`, every (start, finish) range from `dateQuants` is queued
/// on it, and the block is then completed and awaited. Once all fetch blocks
/// are done, `writeToFileBlock` itself is completed and awaited, so a fault
/// raised inside any block surfaces through the returned task instead of
/// going unobserved.
///
/// NOTE: this completes the shared `writeToFileBlock`; it accepts no further
/// batches afterwards.
let impl(): Task<unit> = task {
    // Several sources feed one sink, so completion must not propagate through
    // the links: the first cluster to finish would otherwise complete the sink
    // while the remaining clusters still have batches to deliver.
    let fanInLink = DataflowLinkOptions()
    do! kustoClients
        |> Seq.map(fun (cluster, kustoClient) -> unitTask {
            let logs = getLogsBlock (cluster, kustoClient)
            logs.LinkTo(writeToFileBlock, fanInLink) |> ignore
            for _, start, finish in dateQuants do
                let! _ = logs.SendAsync((start, finish))
                ()
            // Every range is queued: close the source and wait until it has
            // processed them and handed all of its output to the sink.
            logs.Complete()
            do! logs.Completion
        })
        |> Task.WhenAll
    // All sources are finished; close the sink and wait for its pending batches.
    writeToFileBlock.Complete()
    do! writeToFileBlock.Completion
}
AH
AH
I
IC
S
/// Link options under which a source block's completion is propagated to the
/// target block it is linked to.
let private linkOptions =
    let options = DataflowLinkOptions()
    options.PropagateCompletion <- true
    options
/// Builds the fetch stage for one cluster: a `TransformBlock` that turns a
/// (start, finish) range into a (cluster, start, lines) batch.
///
/// Each range is resolved with `logsBetween kustoClient start finish`; every
/// returned entry is rendered with `ToString()`. The block handles one range
/// at a time (`MaxDegreeOfParallelism = 1`).
let private getLogsBlock(cluster, kustoClient) =
    let blockOptions = ExecutionDataflowBlockOptions()
    blockOptions.MaxDegreeOfParallelism <- 1
    TransformBlock<DateTime * DateTime, string * DateTime * string[]>(
        (fun (rangeStart, rangeEnd) -> task {
            let! entries = logsBetween kustoClient rangeStart rangeEnd
            let lines = [| for entry in entries do yield entry.ToString() |]
            // Only the range start travels downstream together with the lines.
            return (cluster, rangeStart, lines)
        }),
        blockOptions)
/// Sink stage: an `ActionBlock` that hands every (cluster, start, lines) batch
/// to `afterTask` and awaits it, one batch at a time
/// (`MaxDegreeOfParallelism = 1`).
let writeToFileBlock =
    let sinkOptions = ExecutionDataflowBlockOptions()
    sinkOptions.MaxDegreeOfParallelism <- 1
    ActionBlock<string * DateTime * string[]>(
        (fun (clusterName, rangeStart, lines) -> unitTask {
            do! afterTask(clusterName, rangeStart, lines)
        }),
        sinkOptions)
/// Runs the export for every (cluster, kustoClient) pair in `kustoClients` and
/// finishes only after the whole pipeline has drained.
///
/// For each cluster a dedicated fetch block is linked to the shared
/// `writeToFileBlock`, every (start, finish) range from `dateQuants` is queued
/// on it, and the block is then completed and awaited. Once all fetch blocks
/// are done, `writeToFileBlock` itself is completed and awaited, so a fault
/// raised inside any block surfaces through the returned task instead of
/// going unobserved.
///
/// NOTE: this completes the shared `writeToFileBlock`; it accepts no further
/// batches afterwards.
let impl(): Task<unit> = task {
    // Several sources feed one sink, so completion must not propagate through
    // the links: the first cluster to finish would otherwise complete the sink
    // while the remaining clusters still have batches to deliver.
    let fanInLink = DataflowLinkOptions()
    do! kustoClients
        |> Seq.map(fun (cluster, kustoClient) -> unitTask {
            let logs = getLogsBlock (cluster, kustoClient)
            logs.LinkTo(writeToFileBlock, fanInLink) |> ignore
            for _, start, finish in dateQuants do
                let! _ = logs.SendAsync((start, finish))
                ()
            // Every range is queued: close the source and wait until it has
            // processed them and handed all of its output to the sink.
            logs.Complete()
            do! logs.Completion
        })
        |> Task.WhenAll
    // All sources are finished; close the sink and wait for its pending batches.
    writeToFileBlock.Complete()
    do! writeToFileBlock.Completion
}