ParallelForEachAsync

I needed a standalone ParallelForEachAsync for C#. The inspiration is the way JavaScript handles several async/await calls at once, like so:

// Always rejects: a throw inside an async function rejects the promise it returns.
async function Promise1() {
    throw "Failure!";
}

// Always fulfills with the string "Success!".
async function Promise2() {
    return "Success!";
}

// Promise.allSettled waits for every promise and reports each outcome as an object,
// so the rejection from Promise1 does not prevent reading Promise2's value.
const [Promise1Result, Promise2Result] = await Promise.allSettled([Promise1(), Promise2()]);

console.log(Promise1Result); // {status: "rejected", reason: "Failure!"}
console.log(Promise2Result); // {status: "fulfilled", value: "Success!"}

I found a blog article about creating ParallelForEachAsync. It works well, but it is missing the extra wrapping needed to hand the results back as a Task<T> result.

This is what I came up with

/// <summary>
/// Invokes <paramref name="runFn"/> for every element of <paramref name="source"/>, using
/// <paramref name="maxParallelism"/> partitions that are drained concurrently, and gathers
/// every returned value.
/// </summary>
/// <typeparam name="T">Element type of the input sequence.</typeparam>
/// <typeparam name="Z">Result type produced for each element.</typeparam>
/// <param name="source">Items to process.</param>
/// <param name="runFn">Asynchronous function applied to each item.</param>
/// <param name="maxParallelism">Number of partitions (concurrent workers); must be greater than zero.</param>
/// <returns>
/// All results, grouped by partition. The order is NOT the order of <paramref name="source"/>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="runFn"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxParallelism"/> is zero or negative.</exception>
/// <remarks>
/// Because the method is async, argument exceptions surface when the returned task is awaited.
/// An exception thrown by <paramref name="runFn"/> stops only the partition that hit it; the
/// other partitions run to completion and the exception is rethrown by the final await.
/// </remarks>
public static async Task<IEnumerable<Z>> ParallelForEachAsync<T, Z>(this IEnumerable<T> source, Func<T, Task<Z>> runFn, int maxParallelism)
{
	if (source == null)
	{
		throw new ArgumentNullException(nameof(source));
	}
	if (runFn == null)
	{
		throw new ArgumentNullException(nameof(runFn));
	}
	if (maxParallelism <= 0)
	{
		throw new ArgumentOutOfRangeException(nameof(maxParallelism), maxParallelism, "Must be greater than zero.");
	}

	// Task.Run hands each partition to the thread pool, so the caller is never blocked and no
	// PLINQ query (AsParallel) or per-item Task.Yield is needed to get off the calling thread.
	var workers = Partitioner
		.Create(source)
		.GetPartitions(maxParallelism)
		.Select(partition => Task.Run(() => DrainPartitionAsync(partition)));

	var perPartition = await Task.WhenAll(workers);
	return perPartition.SelectMany(k => k).ToList();

	// Processes one partition sequentially; only this worker touches the list, so a plain
	// List<Z> is sufficient.
	async Task<List<Z>> DrainPartitionAsync(IEnumerator<T> partition)
	{
		var partitionResults = new List<Z>();
		using (partition)
		{
			while (partition.MoveNext())
			{
				partitionResults.Add(await runFn(partition.Current));
			}
		}
		return partitionResults;
	}
}

Here the generic return type Task<Z> allows a typed result to be passed back from the awaited Task, which was missing from the HouseOfCat implementation.

Full implementation filled in with Result holders

/// <summary>
/// Extension helpers for running an asynchronous function over a sequence with bounded parallelism.
/// </summary>
public static class ParallelExtensions
{
	/// <summary>
	/// Suggested worker count: 75% of <see cref="Environment.ProcessorCount"/>, rounded with
	/// midpoints away from zero, and never less than 1.
	/// </summary>
	public static int DefaultParallelism
	{
		get
		{
			var threeQuarters = (int) Math.Round(Environment.ProcessorCount * .75, MidpointRounding.AwayFromZero);
			return Math.Max(1, threeQuarters);
		}
	}

	/// <summary>
	/// Invokes <paramref name="runFn"/> for every element of <paramref name="source"/>, using
	/// <paramref name="maxParallelism"/> partitions that are drained concurrently, and gathers
	/// every returned value.
	/// </summary>
	/// <typeparam name="T">Element type of the input sequence.</typeparam>
	/// <typeparam name="Z">Result type produced for each element.</typeparam>
	/// <param name="source">Items to process.</param>
	/// <param name="runFn">Asynchronous function applied to each item.</param>
	/// <param name="maxParallelism">Number of partitions (concurrent workers); must be greater than zero.</param>
	/// <returns>
	/// All results, grouped by partition. The order is NOT the order of <paramref name="source"/>.
	/// </returns>
	/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="runFn"/> is null.</exception>
	/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxParallelism"/> is zero or negative.</exception>
	/// <remarks>
	/// Because the method is async, argument exceptions surface when the returned task is awaited.
	/// An exception thrown by <paramref name="runFn"/> stops only the partition that hit it; the
	/// other partitions run to completion and the exception is rethrown by the final await.
	/// </remarks>
	public static async Task<IEnumerable<Z>> ParallelForEachAsync<T, Z>(this IEnumerable<T> source, Func<T, Task<Z>> runFn, int maxParallelism)
	{
		if (source == null)
		{
			throw new ArgumentNullException(nameof(source));
		}
		if (runFn == null)
		{
			throw new ArgumentNullException(nameof(runFn));
		}
		if (maxParallelism <= 0)
		{
			throw new ArgumentOutOfRangeException(nameof(maxParallelism), maxParallelism, "Must be greater than zero.");
		}

		// Task.Run hands each partition to the thread pool, so the caller is never blocked and no
		// PLINQ query (AsParallel) or per-item Task.Yield is needed to get off the calling thread.
		var workers = Partitioner
			.Create(source)
			.GetPartitions(maxParallelism)
			.Select(partition => Task.Run(() => DrainPartitionAsync(partition)));

		var perPartition = await Task.WhenAll(workers);
		return perPartition.SelectMany(k => k).ToList();

		// Processes one partition sequentially; only this worker touches the list, so a plain
		// List<Z> is sufficient.
		async Task<List<Z>> DrainPartitionAsync(IEnumerator<T> partition)
		{
			var partitionResults = new List<Z>();
			using (partition)
			{
				while (partition.MoveNext())
				{
					partitionResults.Add(await runFn(partition.Current));
				}
			}
			return partitionResults;
		}
	}
}

/// <summary>
/// Outcome holder: a success flag together with the error messages collected for a failure.
/// </summary>
public class Result
{
	/// <summary>True for a successful outcome; defaults to false.</summary>
	public bool Succeeded { get; set; }

	/// <summary>Errors attached to this outcome; starts as an empty list.</summary>
	public List<ErrorMessage> ErrorMessages { get; set; } = new List<ErrorMessage>();

	/// <summary>Creates a successful result with no error messages.</summary>
	public static Result OK() => new Result { Succeeded = true };

	/// <summary>
	/// Creates a result carrying a single error message with the given text;
	/// <see cref="Succeeded"/> keeps its default of false.
	/// </summary>
	/// <param name="text">Text stored on the single <see cref="ErrorMessage"/>.</param>
	public static Result Failed(string text)
	{
		var errors = new List<ErrorMessage>
		{
			new ErrorMessage { Text = text }
		};
		return new Result { ErrorMessages = errors };
	}
}

/// <summary>A single error entry: a text plus an integer code.</summary>
public class ErrorMessage
{
	/// <summary>Error text.</summary>
	public string Text { get; set; }
	/// <summary>Integer error code; remains 0 unless a caller assigns it.</summary>
	public int Code { get; set; }
}

/// <summary>Named integer constants intended for comparison against <c>ErrorMessage.Code</c>.</summary>
public class ErrorCode
{
	/// <summary>
	/// Code value -1. Presumably marks an error raised when nothing had to be done — TODO confirm;
	/// the meaning is not defined here beyond the name.
	/// </summary>
	public const int NoOperation = -1;
}

Accompanying unit test code

/// <summary>
/// xUnit facts that drive <c>ParallelExtensions.ParallelForEachAsync</c> with randomly delayed work items
/// and log the start and completion of each item to the test output.
/// </summary>
public class ParallelForEachFacts
{
	readonly ITestOutputHelper output;

	public ParallelForEachFacts(ITestOutputHelper testConsole)
	{
		output = testConsole;
	}

	/// <summary>
	/// Waits a random time between 200 ms and 2000 ms, then succeeds unless <paramref name="data"/> is null.
	/// </summary>
	/// <param name="key">Label included in the log line.</param>
	/// <param name="data">Payload; only checked for null.</param>
	/// <param name="logFn">Optional sink for the "Waiting for ..." line.</param>
	private static async Task<Result> RandomWaitTimeAsync(string key, object data, Action<string> logFn = null)
	{
		const int minMilliseconds = 200;
		var waitTime = TimeSpan.FromMilliseconds(Faker.RandomNumber.Next(minMilliseconds, minMilliseconds * 10));
		var message = $"Waiting for {waitTime}, input args {key}";
		logFn?.Invoke(message);
		await Task.Delay(waitTime);

		return data == null
			? Result.Failed("Input is null")
			: Result.OK();
	}

	/// <summary>Builds a unique, readable key for <paramref name="data"/> and pairs it with the data.</summary>
	private static KeyValuePair<string, object> RandomArg(object data)
	{
		return new KeyValuePair<string, object>($"{data}: {Faker.Company.Name()}_{Guid.NewGuid().ToString()}", data);
	}

	/// <summary>Returns a delegate that writes a line to the xUnit output (no-op when output is null).</summary>
	private Action<string> WriteToLogFn()
	{
		return s => { output?.WriteLine(s); };
	}

	/// <summary>
	/// Renders the error list as readable text. Interpolating the list itself would only print
	/// its type name.
	/// </summary>
	private static string DescribeErrors(Result result)
	{
		return string.Join("; ", result.ErrorMessages.Select(e => $"[{e.Code}] {e.Text}"));
	}

	[Fact]
	public async Task BatchedParallelFunctionsTest()
	{
		var start = Faker.RandomNumber.Next(10, 15);
		var end = Faker.RandomNumber.Next(25, 35);
		// Never use more workers than the minimum number of items.
		var maxParallelism = Math.Min(ParallelExtensions.DefaultParallelism, start);

		// The stored delegates are never invoked below; each one only serves as a non-null
		// payload for RandomWaitTimeAsync, while the key carries the batch/position label.
		var randomFns = new Dictionary<string, Func<string, object, Action<string>, Task<Result>>>();
		foreach (var index in Enumerable.Range(0, Faker.RandomNumber.Next(start, end)))
		{
			var batch = Math.DivRem(index, maxParallelism, out var remainder);
			var currentArgs = RandomArg($"In batch: {batch + 1}, position {remainder}");
			randomFns.Add(currentArgs.Key, RandomWaitTimeAsync);
		}
		Assert.True(randomFns.Count >= start);
		output.WriteLine("**************");
		output.WriteLine($"Will run {randomFns.Count} tasks");
		// Ceiling division: an exact multiple must not be reported as one extra batch.
		var partitions = (randomFns.Count + maxParallelism - 1) / maxParallelism;
		output.WriteLine($"Using partitions of size {maxParallelism} will create {partitions} partitions");
		output.WriteLine("**************");

		var innerLog = WriteToLogFn();

		var testFn = new Func<string, object, Task<Result>>(async (z, testObj) =>
		{
			var timing = Stopwatch.StartNew();
			var result = await RandomWaitTimeAsync(z, testObj, innerLog);
			if (result.Succeeded)
			{
				innerLog($"Complete {z} {timing.Elapsed}");
				return Result.OK();
			}

			var errorDetails = DescribeErrors(result);
			innerLog($"Failed to test {z} check logs {errorDetails}");
			// A failure made up solely of NoOperation errors is treated as a success.
			if (result.ErrorMessages.All(x => x.Code == ErrorCode.NoOperation))
			{
				innerLog($"Complete {z} with all errors of type {ErrorCode.NoOperation} {timing.Elapsed}");
				return Result.OK();
			}

			return Result.Failed($"{z} failed, details {errorDetails} {timing.Elapsed}");
		});

		var setResults = (await randomFns
			.ParallelForEachAsync(z => testFn(z.Key, z.Value), maxParallelism))
			.ToList();

		Assert.NotEmpty(setResults);

		Assert.All(setResults, z =>
		{
			Assert.True(z.Succeeded);
		});
	}
}

Run results

**************
Will run 26 tasks
Using partitions of size 14 will create 2 partitions
**************
Waiting for 00:00:01.1680000, input args In batch: 1, position 9: Zulauf-Krajcik_d164a827-e451-467c-a779-07c9d8b695a8
Waiting for 00:00:00.2070000, input args In batch: 1, position 11: Green, Heaney and Hilll_6bb2a551-a05a-474b-89dc-cfd16fd7b760
Waiting for 00:00:01.6260000, input args In batch: 1, position 6: Cummerata-Kling_65f03fbc-5bdb-4a06-a997-3f59e455bc03
Waiting for 00:00:00.7660000, input args In batch: 1, position 2: Price-Barrows_03751e00-3213-488c-92e6-41effb76d6de
Waiting for 00:00:00.3100000, input args In batch: 1, position 7: Nienow Inc and Sons_5f560ea2-35e0-49ed-92fa-0f643c156eda
Waiting for 00:00:00.8700000, input args In batch: 1, position 5: Stehr, Quigley and Casper_2e7a413e-7b53-4ea3-9cef-34848a967ec4
Waiting for 00:00:00.2080000, input args In batch: 1, position 0: Kihn, Rogahn and Daugherty_721b1c22-751e-4732-b049-a366cb836977
Waiting for 00:00:00.5730000, input args In batch: 1, position 13: Shanahan, Shields and Johnson_61ea1a08-e9af-4808-be16-8c640ddc1509
Waiting for 00:00:01.6980000, input args In batch: 1, position 4: Runolfsson-Aufderhar_7649501a-88dd-40b7-a91c-eaff104d6c3b
Waiting for 00:00:01.4930000, input args In batch: 1, position 3: Bayer, Willms and Ledner_ca102289-af82-4864-8030-4911396a0105
Waiting for 00:00:01.8130000, input args In batch: 1, position 10: Mueller, Gibson and Homenick_5680a903-ec29-45e4-bd75-67aa733c2e7e
Waiting for 00:00:01.2450000, input args In batch: 1, position 1: Sauer-Kovacek_897d7cdb-6744-49ed-a2b3-cfe9d800135e
Waiting for 00:00:00.4590000, input args In batch: 1, position 12: Block-Schamberger_1595deff-c32f-40da-b630-28d54afce222
Waiting for 00:00:00.7110000, input args In batch: 1, position 8: Pacocha-Torphy_0bb4b8b4-67eb-4749-a596-17a6c5b60b03
Complete In batch: 1, position 11: Green, Heaney and Hilll_6bb2a551-a05a-474b-89dc-cfd16fd7b760 00:00:00.2171558
Complete In batch: 1, position 0: Kihn, Rogahn and Daugherty_721b1c22-751e-4732-b049-a366cb836977 00:00:00.2170142
Waiting for 00:00:01.7110000, input args In batch: 2, position 1: Kreiger, Dickens and Kozey_1996203b-2673-4c0d-86b9-f6429faa0000
Waiting for 00:00:01.7440000, input args In batch: 2, position 0: Osinski-Kozey_8262fa25-dd19-491a-bffa-145aaf0e4ff2
Complete In batch: 1, position 7: Nienow Inc and Sons_5f560ea2-35e0-49ed-92fa-0f643c156eda 00:00:00.3252179
Waiting for 00:00:01.2000000, input args In batch: 2, position 2: Bashirian, Bergnaum and Rodriguez_1b683759-4406-4d9e-b54f-52ae5f8218ea
Complete In batch: 1, position 12: Block-Schamberger_1595deff-c32f-40da-b630-28d54afce222 00:00:00.4634332
Waiting for 00:00:01.7400000, input args In batch: 2, position 3: Macejkovic Group_9f62f13b-a15e-475c-96c3-173d4ec6b2cd
Complete In batch: 1, position 13: Shanahan, Shields and Johnson_61ea1a08-e9af-4808-be16-8c640ddc1509 00:00:00.5881410
Waiting for 00:00:01.7170000, input args In batch: 2, position 4: Jacobi Inc and Sons_ecdfe8c5-057c-4127-86e7-f91fe7155f1b
Complete In batch: 1, position 8: Pacocha-Torphy_0bb4b8b4-67eb-4749-a596-17a6c5b60b03 00:00:00.7253468
Waiting for 00:00:00.4680000, input args In batch: 2, position 5: Crist-Beer_fe1f6ec8-f4b2-40f8-8059-884247892791
Complete In batch: 1, position 2: Price-Barrows_03751e00-3213-488c-92e6-41effb76d6de 00:00:00.7733121
Waiting for 00:00:00.9380000, input args In batch: 2, position 6: Zemlak Group_73eee9c6-0865-4030-868a-1d4817f549d8
Complete In batch: 1, position 5: Stehr, Quigley and Casper_2e7a413e-7b53-4ea3-9cef-34848a967ec4 00:00:00.8826974
Waiting for 00:00:01.3030000, input args In batch: 2, position 7: Keeling Group_c8b09205-a8fc-44c0-8ca2-7753257148c9
Complete In batch: 1, position 9: Zulauf-Krajcik_d164a827-e451-467c-a779-07c9d8b695a8 00:00:01.1771468
Waiting for 00:00:00.6950000, input args In batch: 2, position 8: Larkin Inc and Sons_b81f3056-3655-493e-a31c-06dcb63851b9
Complete In batch: 2, position 5: Crist-Beer_fe1f6ec8-f4b2-40f8-8059-884247892791 00:00:00.4822062
Waiting for 00:00:01.2280000, input args In batch: 2, position 9: Stoltenberg LLC_ab7aa564-da33-4a12-af22-e07d4d4756d8
Complete In batch: 1, position 1: Sauer-Kovacek_897d7cdb-6744-49ed-a2b3-cfe9d800135e 00:00:01.2555281
Waiting for 00:00:01.7070000, input args In batch: 2, position 10: D'Amore-Wisozk_b38fc9eb-8800-412c-a507-25f0383d25b6
Complete In batch: 1, position 3: Bayer, Willms and Ledner_ca102289-af82-4864-8030-4911396a0105 00:00:01.5045898
Waiting for 00:00:01.4310000, input args In batch: 2, position 11: Rodriguez, Stracke and Kshlerin_559d95d7-1a2b-4595-962e-06e82cd4fe74
Complete In batch: 2, position 2: Bashirian, Bergnaum and Rodriguez_1b683759-4406-4d9e-b54f-52ae5f8218ea 00:00:01.2106117
Complete In batch: 1, position 6: Cummerata-Kling_65f03fbc-5bdb-4a06-a997-3f59e455bc03 00:00:01.6298709
Complete In batch: 1, position 4: Runolfsson-Aufderhar_7649501a-88dd-40b7-a91c-eaff104d6c3b 00:00:01.7084094
Complete In batch: 2, position 6: Zemlak Group_73eee9c6-0865-4030-868a-1d4817f549d8 00:00:00.9521255
Complete In batch: 1, position 10: Mueller, Gibson and Homenick_5680a903-ec29-45e4-bd75-67aa733c2e7e 00:00:01.8172318
Complete In batch: 2, position 8: Larkin Inc and Sons_b81f3056-3655-493e-a31c-06dcb63851b9 00:00:00.7017471
Complete In batch: 2, position 1: Kreiger, Dickens and Kozey_1996203b-2673-4c0d-86b9-f6429faa0000 00:00:01.7238026
Complete In batch: 2, position 0: Osinski-Kozey_8262fa25-dd19-491a-bffa-145aaf0e4ff2 00:00:01.7564155
Complete In batch: 2, position 7: Keeling Group_c8b09205-a8fc-44c0-8ca2-7753257148c9 00:00:01.3107621
Complete In batch: 2, position 3: Macejkovic Group_9f62f13b-a15e-475c-96c3-173d4ec6b2cd 00:00:01.7447568
Complete In batch: 2, position 4: Jacobi Inc and Sons_ecdfe8c5-057c-4127-86e7-f91fe7155f1b 00:00:01.7301797
Complete In batch: 2, position 9: Stoltenberg LLC_ab7aa564-da33-4a12-af22-e07d4d4756d8 00:00:01.2336234
Complete In batch: 2, position 11: Rodriguez, Stracke and Kshlerin_559d95d7-1a2b-4595-962e-06e82cd4fe74 00:00:01.4371870
Complete In batch: 2, position 10: D'Amore-Wisozk_b38fc9eb-8800-412c-a507-25f0383d25b6 00:00:01.7164414