ParallelForEachAsync
I needed a ParallelForEachAsync that works on its own in C#. The inspiration is the way JavaScript handles a batch of async/await calls, like so:
/**
 * Always fails: returns a promise that is already rejected with the
 * string "Failure!" (the same outcome as throwing inside an async function).
 */
function Promise1() {
  return Promise.reject("Failure!");
}
/**
 * Always succeeds: returns a promise that is already fulfilled with the
 * string "Success!" (the same outcome as returning from an async function).
 */
function Promise2() {
  return Promise.resolve("Success!");
}
// Promise.allSettled resolves with one outcome record per input promise,
// whether that promise was fulfilled or rejected.
const settledOutcomes = await Promise.allSettled([Promise1(), Promise2()]);
const Promise1Result = settledOutcomes[0];
const Promise2Result = settledOutcomes[1];
console.log(Promise1Result); // {status: "rejected", reason: "Failure!"}
console.log(Promise2Result); // {status: "fulfilled", value: "Success!"}
I found this blog article about creating a ParallelForEachAsync, and it works well, but it is missing the extra wrapping needed to hold each result as a Task<T> result.
This is what I came up with:
/// <summary>
/// Invokes <paramref name="runFn"/> once for every element of <paramref name="source"/>,
/// spreading the elements over at most <paramref name="maxParallelism"/> concurrent workers,
/// and returns every produced value.
/// </summary>
/// <typeparam name="T">Type of the input elements.</typeparam>
/// <typeparam name="Z">Type of the value produced for each element.</typeparam>
/// <param name="source">Elements to process.</param>
/// <param name="runFn">Asynchronous function applied to each element.</param>
/// <param name="maxParallelism">Number of partitions, and therefore concurrent workers; must be at least 1.</param>
/// <returns>
/// One result per input element. The order is not the source order, because
/// partitions are processed concurrently and their results are concatenated.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="runFn"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxParallelism"/> is less than 1.</exception>
/// <remarks>
/// An exception thrown by <paramref name="runFn"/> stops the worker that hit it (the rest of that
/// partition is not processed) and faults the returned task once all workers have finished.
/// </remarks>
public static async Task<IEnumerable<Z>> ParallelForEachAsync<T, Z>(this IEnumerable<T> source, Func<T, Task<Z>> runFn, int maxParallelism)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (runFn is null)
    {
        throw new ArgumentNullException(nameof(runFn));
    }

    if (maxParallelism <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxParallelism), maxParallelism, "The degree of parallelism must be at least 1.");
    }

    // One thread-pool worker per partition. Task.Run starts each worker off the caller's
    // thread without involving PLINQ just to kick off asynchronous work.
    var workers = Partitioner
        .Create(source)
        .GetPartitions(maxParallelism)
        .Select(partition => Task.Run(() => RunAsSetFn(partition)));

    var allResults = await Task.WhenAll(workers).ConfigureAwait(false);

    // Materialize once so callers can enumerate the result repeatedly without re-flattening.
    return allResults.SelectMany(k => k).ToList();

    // Drains one partition sequentially. Only this worker touches the list and every Add
    // happens after the previous await completed, so a plain List is sufficient.
    async Task<List<Z>> RunAsSetFn(IEnumerator<T> partition)
    {
        var setResults = new List<Z>();
        using (partition)
        {
            while (partition.MoveNext())
            {
                // Yield between items so a runFn that completes synchronously
                // cannot monopolize the thread for the whole partition.
                await Task.Yield();
                var response = await runFn(partition.Current).ConfigureAwait(false);
                setResults.Add(response);
            }
        }
        return setResults;
    }
}
Here the generic return type Task<Z> allows passing back a typed result from the awaited Task, which was missing from the HouseOfCat implementation.
Full implementation, filled in with Result holders:
/// <summary>
/// Extension helpers for running asynchronous work over a sequence with bounded parallelism.
/// </summary>
public static class ParallelExtensions
{
    /// <summary>
    /// Suggested degree of parallelism: 75% of <see cref="Environment.ProcessorCount"/>,
    /// rounded to the nearest integer (midpoints away from zero) and never less than 1.
    /// </summary>
    public static int DefaultParallelism
    {
        get
        {
            var maxParallelism = (int) Math.Round(Environment.ProcessorCount * .75, MidpointRounding.AwayFromZero);
            return maxParallelism <= 0 ? 1 : maxParallelism;
        }
    }

    /// <summary>
    /// Invokes <paramref name="runFn"/> once for every element of <paramref name="source"/>,
    /// spreading the elements over at most <paramref name="maxParallelism"/> concurrent workers,
    /// and returns every produced value.
    /// </summary>
    /// <typeparam name="T">Type of the input elements.</typeparam>
    /// <typeparam name="Z">Type of the value produced for each element.</typeparam>
    /// <param name="source">Elements to process.</param>
    /// <param name="runFn">Asynchronous function applied to each element.</param>
    /// <param name="maxParallelism">Number of partitions, and therefore concurrent workers; must be at least 1.</param>
    /// <returns>
    /// One result per input element. The order is not the source order, because
    /// partitions are processed concurrently and their results are concatenated.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="runFn"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxParallelism"/> is less than 1.</exception>
    /// <remarks>
    /// An exception thrown by <paramref name="runFn"/> stops the worker that hit it (the rest of that
    /// partition is not processed) and faults the returned task once all workers have finished.
    /// </remarks>
    public static async Task<IEnumerable<Z>> ParallelForEachAsync<T, Z>(this IEnumerable<T> source, Func<T, Task<Z>> runFn, int maxParallelism)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (runFn is null)
        {
            throw new ArgumentNullException(nameof(runFn));
        }

        if (maxParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxParallelism), maxParallelism, "The degree of parallelism must be at least 1.");
        }

        // One thread-pool worker per partition. Task.Run starts each worker off the caller's
        // thread without involving PLINQ just to kick off asynchronous work.
        var workers = Partitioner
            .Create(source)
            .GetPartitions(maxParallelism)
            .Select(partition => Task.Run(() => RunAsSetFn(partition)));

        var allResults = await Task.WhenAll(workers).ConfigureAwait(false);

        // Materialize once so callers can enumerate the result repeatedly without re-flattening.
        return allResults.SelectMany(k => k).ToList();

        // Drains one partition sequentially. Only this worker touches the list and every Add
        // happens after the previous await completed, so a plain List is sufficient.
        async Task<List<Z>> RunAsSetFn(IEnumerator<T> partition)
        {
            var setResults = new List<Z>();
            using (partition)
            {
                while (partition.MoveNext())
                {
                    // Yield between items so a runFn that completes synchronously
                    // cannot monopolize the thread for the whole partition.
                    await Task.Yield();
                    var response = await runFn(partition.Current).ConfigureAwait(false);
                    setResults.Add(response);
                }
            }
            return setResults;
        }
    }
}
/// <summary>
/// Outcome holder: a success flag plus the list of errors collected along the way.
/// </summary>
public class Result
{
    /// <summary>True when the operation succeeded; false unless explicitly set.</summary>
    public bool Succeeded { get; set; }

    /// <summary>Errors attached to this result; starts out as an empty list.</summary>
    public List<ErrorMessage> ErrorMessages { get; set; } = new List<ErrorMessage>();

    /// <summary>Creates a successful result with no error messages.</summary>
    public static Result OK() => new Result { Succeeded = true };

    /// <summary>
    /// Creates an unsuccessful result carrying a single error whose text is <paramref name="text"/>.
    /// </summary>
    /// <param name="text">Text stored on the single error message.</param>
    public static Result Failed(string text)
    {
        var onlyError = new ErrorMessage { Text = text };
        return new Result
        {
            ErrorMessages = new List<ErrorMessage> { onlyError }
        };
    }
}
/// <summary>
/// A single error entry, as stored in <c>Result.ErrorMessages</c>.
/// </summary>
public class ErrorMessage
{
/// <summary>Error text; <c>Result.Failed</c> fills this from its argument.</summary>
public string Text { get; set; }
/// <summary>
/// Numeric error code; stays at 0 unless assigned. The accompanying test compares it
/// against <c>ErrorCode.NoOperation</c>.
/// </summary>
public int Code { get; set; }
}
/// <summary>
/// Well-known values for <c>ErrorMessage.Code</c>.
/// </summary>
public class ErrorCode
{
/// <summary>
/// Code the accompanying test treats as non-fatal: a result whose errors all carry it is
/// reported as OK. Presumably marks that no operation was performed; confirm with the author.
/// </summary>
public const int NoOperation = -1;
}
Accompanying unit test code
/// <summary>
/// xUnit facts that exercise <c>ParallelForEachAsync</c> with randomly delayed work items
/// and write a trace of the run to the test output.
/// </summary>
public class ParallelForEachFacts
{
    private readonly ITestOutputHelper _output;

    /// <summary>Captures the xUnit output sink used for the trace lines.</summary>
    public ParallelForEachFacts(ITestOutputHelper testConsole)
    {
        _output = testConsole;
    }

    /// <summary>
    /// Logs the chosen delay, waits for it, then reports failure when <paramref name="data"/>
    /// is null and success otherwise.
    /// </summary>
    /// <param name="key">Label included in the log line.</param>
    /// <param name="data">Payload; only checked for null.</param>
    /// <param name="logFn">Optional sink for the "Waiting for ..." line.</param>
    private static async Task<Result> RandomWaitTimeAsync(string key, object data, Action<string> logFn = null)
    {
        const int minMilliseconds = 200;
        // Delay is drawn from Faker.RandomNumber.Next(200, 2000), in milliseconds.
        var waitTime = TimeSpan.FromMilliseconds(Faker.RandomNumber.Next(minMilliseconds, minMilliseconds * 10));
        var message = $"Waiting for {waitTime}, input args {key}";
        logFn?.Invoke(message);
        await Task.Delay(waitTime);
        return data is null
            ? Result.Failed("Input is null")
            : Result.OK();
    }

    /// <summary>
    /// Builds a unique key made of the description, a fake company name and a GUID,
    /// paired with <paramref name="data"/> as the value.
    /// </summary>
    private static KeyValuePair<string, object> RandomArg(object data)
    {
        return new KeyValuePair<string, object>($"{data}: {Faker.Company.Name()}_{Guid.NewGuid()}", data);
    }

    /// <summary>Returns a delegate that writes a line to the test output, if one is attached.</summary>
    private Action<string> WriteToLogFn()
    {
        return s => { _output?.WriteLine(s); };
    }

    /// <summary>
    /// Renders the errors of a result as readable text. Interpolating the list itself
    /// would only print its type name.
    /// </summary>
    private static string DescribeErrors(Result result)
    {
        return string.Join("; ", result.ErrorMessages.Select(e => $"[{e.Code}] {e.Text}"));
    }

    /// <summary>
    /// Runs a random number of delayed work items through <c>ParallelForEachAsync</c> and
    /// checks that every item produced a successful result.
    /// </summary>
    [Fact]
    public async Task BatchedParallelFunctionsTest()
    {
        var start = Faker.RandomNumber.Next(10, 15);
        var end = Faker.RandomNumber.Next(25, 35);

        // Never use more workers than the minimum number of items.
        var maxParallelism = ParallelExtensions.DefaultParallelism;
        if (maxParallelism > start)
        {
            maxParallelism = start;
        }

        var randomFns = new Dictionary<string, Func<string, object, Action<string>, Task<Result>>>();
        foreach (var index in Enumerable.Range(0, Faker.RandomNumber.Next(start, end)))
        {
            var batch = Math.DivRem(index, maxParallelism, out var remainder);
            var currentArgs = RandomArg($"In batch: {batch + 1}, position {remainder}");
            // The stored delegate is never invoked; it only serves as a non-null payload
            // that is later handed to RandomWaitTimeAsync as its data argument.
            randomFns.Add(currentArgs.Key, async (s, o, arg3) => await RandomWaitTimeAsync(s, o, arg3));
        }

        Assert.True(randomFns.Count >= start);
        _output.WriteLine("**************");
        _output.WriteLine($"Will run {randomFns.Count} tasks");
        // Ceiling division: an exact multiple must not be reported as one extra batch.
        var partitions = (randomFns.Count + maxParallelism - 1) / maxParallelism;
        _output.WriteLine($"Using partitions of size {maxParallelism} will create {partitions} partitions");
        _output.WriteLine("**************");

        var innerLog = WriteToLogFn();
        var testFn = new Func<string, object, Task<Result>>(async (z, testObj) =>
        {
            var timing = Stopwatch.StartNew();
            var result = await RandomWaitTimeAsync(z, testObj, innerLog);
            if (result.Succeeded)
            {
                innerLog($"Complete {z} {timing.Elapsed}");
                return Result.OK();
            }

            innerLog($"Failed to test {z} check logs {DescribeErrors(result)}");
            if (result.ErrorMessages.All(x => x.Code == ErrorCode.NoOperation))
            {
                innerLog($"Complete {z} with all errors of type {ErrorCode.NoOperation} {timing.Elapsed}");
                return Result.OK();
            }

            return Result.Failed($"{z} failed, details {DescribeErrors(result)} {timing.Elapsed}");
        });

        var setResults = (await randomFns
                .ParallelForEachAsync(async z => await testFn(z.Key, z.Value), maxParallelism))
            .ToList();

        Assert.NotEmpty(setResults);
        // Every queued item must have produced exactly one result.
        Assert.Equal(randomFns.Count, setResults.Count);
        Assert.All(setResults, z =>
        {
            Assert.True(z.Succeeded);
        });
    }
}
Run results
**************
Will run 26 tasks
Using partitions of size 14 will create 2 partitions
**************
Waiting for 00:00:01.1680000, input args In batch: 1, position 9: Zulauf-Krajcik_d164a827-e451-467c-a779-07c9d8b695a8
Waiting for 00:00:00.2070000, input args In batch: 1, position 11: Green, Heaney and Hilll_6bb2a551-a05a-474b-89dc-cfd16fd7b760
Waiting for 00:00:01.6260000, input args In batch: 1, position 6: Cummerata-Kling_65f03fbc-5bdb-4a06-a997-3f59e455bc03
Waiting for 00:00:00.7660000, input args In batch: 1, position 2: Price-Barrows_03751e00-3213-488c-92e6-41effb76d6de
Waiting for 00:00:00.3100000, input args In batch: 1, position 7: Nienow Inc and Sons_5f560ea2-35e0-49ed-92fa-0f643c156eda
Waiting for 00:00:00.8700000, input args In batch: 1, position 5: Stehr, Quigley and Casper_2e7a413e-7b53-4ea3-9cef-34848a967ec4
Waiting for 00:00:00.2080000, input args In batch: 1, position 0: Kihn, Rogahn and Daugherty_721b1c22-751e-4732-b049-a366cb836977
Waiting for 00:00:00.5730000, input args In batch: 1, position 13: Shanahan, Shields and Johnson_61ea1a08-e9af-4808-be16-8c640ddc1509
Waiting for 00:00:01.6980000, input args In batch: 1, position 4: Runolfsson-Aufderhar_7649501a-88dd-40b7-a91c-eaff104d6c3b
Waiting for 00:00:01.4930000, input args In batch: 1, position 3: Bayer, Willms and Ledner_ca102289-af82-4864-8030-4911396a0105
Waiting for 00:00:01.8130000, input args In batch: 1, position 10: Mueller, Gibson and Homenick_5680a903-ec29-45e4-bd75-67aa733c2e7e
Waiting for 00:00:01.2450000, input args In batch: 1, position 1: Sauer-Kovacek_897d7cdb-6744-49ed-a2b3-cfe9d800135e
Waiting for 00:00:00.4590000, input args In batch: 1, position 12: Block-Schamberger_1595deff-c32f-40da-b630-28d54afce222
Waiting for 00:00:00.7110000, input args In batch: 1, position 8: Pacocha-Torphy_0bb4b8b4-67eb-4749-a596-17a6c5b60b03
Complete In batch: 1, position 11: Green, Heaney and Hilll_6bb2a551-a05a-474b-89dc-cfd16fd7b760 00:00:00.2171558
Complete In batch: 1, position 0: Kihn, Rogahn and Daugherty_721b1c22-751e-4732-b049-a366cb836977 00:00:00.2170142
Waiting for 00:00:01.7110000, input args In batch: 2, position 1: Kreiger, Dickens and Kozey_1996203b-2673-4c0d-86b9-f6429faa0000
Waiting for 00:00:01.7440000, input args In batch: 2, position 0: Osinski-Kozey_8262fa25-dd19-491a-bffa-145aaf0e4ff2
Complete In batch: 1, position 7: Nienow Inc and Sons_5f560ea2-35e0-49ed-92fa-0f643c156eda 00:00:00.3252179
Waiting for 00:00:01.2000000, input args In batch: 2, position 2: Bashirian, Bergnaum and Rodriguez_1b683759-4406-4d9e-b54f-52ae5f8218ea
Complete In batch: 1, position 12: Block-Schamberger_1595deff-c32f-40da-b630-28d54afce222 00:00:00.4634332
Waiting for 00:00:01.7400000, input args In batch: 2, position 3: Macejkovic Group_9f62f13b-a15e-475c-96c3-173d4ec6b2cd
Complete In batch: 1, position 13: Shanahan, Shields and Johnson_61ea1a08-e9af-4808-be16-8c640ddc1509 00:00:00.5881410
Waiting for 00:00:01.7170000, input args In batch: 2, position 4: Jacobi Inc and Sons_ecdfe8c5-057c-4127-86e7-f91fe7155f1b
Complete In batch: 1, position 8: Pacocha-Torphy_0bb4b8b4-67eb-4749-a596-17a6c5b60b03 00:00:00.7253468
Waiting for 00:00:00.4680000, input args In batch: 2, position 5: Crist-Beer_fe1f6ec8-f4b2-40f8-8059-884247892791
Complete In batch: 1, position 2: Price-Barrows_03751e00-3213-488c-92e6-41effb76d6de 00:00:00.7733121
Waiting for 00:00:00.9380000, input args In batch: 2, position 6: Zemlak Group_73eee9c6-0865-4030-868a-1d4817f549d8
Complete In batch: 1, position 5: Stehr, Quigley and Casper_2e7a413e-7b53-4ea3-9cef-34848a967ec4 00:00:00.8826974
Waiting for 00:00:01.3030000, input args In batch: 2, position 7: Keeling Group_c8b09205-a8fc-44c0-8ca2-7753257148c9
Complete In batch: 1, position 9: Zulauf-Krajcik_d164a827-e451-467c-a779-07c9d8b695a8 00:00:01.1771468
Waiting for 00:00:00.6950000, input args In batch: 2, position 8: Larkin Inc and Sons_b81f3056-3655-493e-a31c-06dcb63851b9
Complete In batch: 2, position 5: Crist-Beer_fe1f6ec8-f4b2-40f8-8059-884247892791 00:00:00.4822062
Waiting for 00:00:01.2280000, input args In batch: 2, position 9: Stoltenberg LLC_ab7aa564-da33-4a12-af22-e07d4d4756d8
Complete In batch: 1, position 1: Sauer-Kovacek_897d7cdb-6744-49ed-a2b3-cfe9d800135e 00:00:01.2555281
Waiting for 00:00:01.7070000, input args In batch: 2, position 10: D'Amore-Wisozk_b38fc9eb-8800-412c-a507-25f0383d25b6
Complete In batch: 1, position 3: Bayer, Willms and Ledner_ca102289-af82-4864-8030-4911396a0105 00:00:01.5045898
Waiting for 00:00:01.4310000, input args In batch: 2, position 11: Rodriguez, Stracke and Kshlerin_559d95d7-1a2b-4595-962e-06e82cd4fe74
Complete In batch: 2, position 2: Bashirian, Bergnaum and Rodriguez_1b683759-4406-4d9e-b54f-52ae5f8218ea 00:00:01.2106117
Complete In batch: 1, position 6: Cummerata-Kling_65f03fbc-5bdb-4a06-a997-3f59e455bc03 00:00:01.6298709
Complete In batch: 1, position 4: Runolfsson-Aufderhar_7649501a-88dd-40b7-a91c-eaff104d6c3b 00:00:01.7084094
Complete In batch: 2, position 6: Zemlak Group_73eee9c6-0865-4030-868a-1d4817f549d8 00:00:00.9521255
Complete In batch: 1, position 10: Mueller, Gibson and Homenick_5680a903-ec29-45e4-bd75-67aa733c2e7e 00:00:01.8172318
Complete In batch: 2, position 8: Larkin Inc and Sons_b81f3056-3655-493e-a31c-06dcb63851b9 00:00:00.7017471
Complete In batch: 2, position 1: Kreiger, Dickens and Kozey_1996203b-2673-4c0d-86b9-f6429faa0000 00:00:01.7238026
Complete In batch: 2, position 0: Osinski-Kozey_8262fa25-dd19-491a-bffa-145aaf0e4ff2 00:00:01.7564155
Complete In batch: 2, position 7: Keeling Group_c8b09205-a8fc-44c0-8ca2-7753257148c9 00:00:01.3107621
Complete In batch: 2, position 3: Macejkovic Group_9f62f13b-a15e-475c-96c3-173d4ec6b2cd 00:00:01.7447568
Complete In batch: 2, position 4: Jacobi Inc and Sons_ecdfe8c5-057c-4127-86e7-f91fe7155f1b 00:00:01.7301797
Complete In batch: 2, position 9: Stoltenberg LLC_ab7aa564-da33-4a12-af22-e07d4d4756d8 00:00:01.2336234
Complete In batch: 2, position 11: Rodriguez, Stracke and Kshlerin_559d95d7-1a2b-4595-962e-06e82cd4fe74 00:00:01.4371870
Complete In batch: 2, position 10: D'Amore-Wisozk_b38fc9eb-8800-412c-a507-25f0383d25b6 00:00:01.7164414