JDK Spliterator with Parallel Processing

Jackie
7 min read · Jun 11, 2021
I created a fixed-size Spliterator that splits a collection into chunks of a fixed size (https://github.com/openjdk/jdk/pull/2907), and it works as expected. When the collection is larger than the threshold, trySplit creates a new Spliterator for part of the elements. When the stream is parallel, the resulting Spliterators are handed to different threads.
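
For reference, the splitting behaviour can be sketched roughly as follows. This is a minimal, hypothetical array-backed spliterator, not the class from the PR: `ChunkedArraySpliterator` and its `threshold` parameter are illustrative names I am assuming here. `trySplit` hands the first half of the remaining range to a new spliterator while the range is larger than `threshold`, and returns `null` once the range is small enough to be processed sequentially.

```java
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Hypothetical illustration: an array-backed spliterator that halves its range
// while the range is larger than a threshold. Not the code from the PR.
public class ChunkedArraySpliterator<T> implements Spliterator<T> {
    private final T[] array;
    private final int threshold; // ranges with at most this many elements are not split further
    private int origin;          // next index to visit (inclusive)
    private final int fence;     // end of this spliterator's range (exclusive)

    public ChunkedArraySpliterator(T[] array, int threshold) {
        this(array, threshold, 0, array.length);
    }

    private ChunkedArraySpliterator(T[] array, int threshold, int origin, int fence) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        this.array = Objects.requireNonNull(array);
        this.threshold = threshold;
        this.origin = origin;
        this.fence = fence;
    }

    @Override
    public Spliterator<T> trySplit() {
        int size = fence - origin;
        if (size <= threshold) {
            return null; // small enough: the caller processes this range sequentially
        }
        int mid = origin + size / 2;
        // ORDERED spliterators must return the prefix: hand [origin, mid) to the new instance, keep [mid, fence).
        Spliterator<T> prefix = new ChunkedArraySpliterator<>(array, threshold, origin, mid);
        origin = mid;
        return prefix;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        if (origin < fence) {
            action.accept(array[origin++]);
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        while (origin < fence) {
            action.accept(array[origin++]);
        }
    }

    @Override
    public long estimateSize() {
        return fence - origin; // exact, because the range bounds are known
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }

    public static void main(String[] args) {
        Integer[] data = new Integer[20];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        long sum = StreamSupport.stream(new ChunkedArraySpliterator<>(data, 5), true)
                .mapToLong(Integer::longValue)
                .sum();
        System.out.println(sum); // prints 190 (0 + 1 + ... + 19)
    }
}
```

It plugs into a parallel stream the same way `FixedSizeSpliterator` does in the snippets below. Note that the stream framework has its own target leaf size and may stop calling `trySplit` before a range reaches `threshold`, so the chunks actually processed can be larger than the threshold.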

However, when I tried to group the elements by the thread that processed them, the code below did not always behave as expected:

// numbers is the input List<Integer>; log is the class's logger (both not shown)
Map<String, List<Integer>> partition = new ForkJoinPool(10).submit(() ->
        StreamSupport.stream(new FixedSizeSpliterator<>(numbers.toArray(new Integer[0]), 5), true)
                // key each element by the name of the worker thread that accumulates it
                .collect(Collectors.groupingBy(i -> Thread.currentThread().getName(), Collectors.toList())))
        .join();
partition.forEach((thread, values) -> log.warn("check key {} vs value {}", thread, values));

Most of the time, it spread the elements evenly across the pool's 10 threads. Once in a while, however, it put everything into a single thread.

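After a lot of investigation, it turned out that this is most likely because one thread is fast enough to do all of the processing by itself, since the only work here is a group-by. ForkJoinPool distributes the split tasks by work stealing: a worker pushes the subtasks it forks onto its own queue, and idle workers have to steal them from there. When each chunk is this cheap to process, the first worker can finish its whole queue before the other workers get a chance to steal anything.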
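
To see this effect, here is a minimal, self-contained sketch of the same experiment. It assumes `IntStream.range(...)` as a stand-in for `FixedSizeSpliterator` (which is not shown here) and uses `LockSupport.parkNanos` as a stand-in for real per-element work; `GroupByThreadDemo`, `groupByThread` and `slow` are hypothetical names. It runs the group-by-thread collection twice in a dedicated 10-thread pool, once with trivial work and once with a slower map step, and prints how many distinct threads ended up with elements. The counts depend on scheduling, so no particular output is implied.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.LockSupport;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo: IntStream.range stands in for FixedSizeSpliterator,
// and parkNanos stands in for real per-element work.
public class GroupByThreadDemo {

    // Runs a parallel groupingBy(thread name) in a dedicated pool and returns the groups.
    static Map<String, List<Integer>> groupByThread(int n, int parallelism, UnaryOperator<Integer> work) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> IntStream.range(0, n)
                            .boxed()
                            .parallel()
                            .map(work)
                            // key each element by the worker thread that accumulates it
                            .collect(Collectors.groupingBy(i -> Thread.currentThread().getName())))
                    .join();
        } finally {
            pool.shutdown(); // let the worker threads exit once the task is done
        }
    }

    // Simulated per-element cost: park the current thread for about 20 microseconds.
    static Integer slow(Integer i) {
        LockSupport.parkNanos(20_000);
        return i;
    }

    public static void main(String[] args) {
        int n = 12_000;
        Map<String, List<Integer>> trivial = groupByThread(n, 10, i -> i);
        Map<String, List<Integer>> heavier = groupByThread(n, 10, GroupByThreadDemo::slow);
        // The counts vary between runs; with the slower map step the work usually spreads over more threads.
        System.out.println("trivial work: " + trivial.size() + " thread(s)");
        System.out.println("heavier work: " + heavier.size() + " thread(s)");
    }
}
```

Shutting the pool down in `finally` also avoids leaking a new `ForkJoinPool` on every call; the snippets in this post create a pool per query and never shut it down.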

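So the workaround is to make each element slightly more expensive to process, here with a Thread.sleep(0) call in a map step, so that the other idle workers get a chance to steal the remaining chunks: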

Map<String, List<Integer>> partition2 = new ForkJoinPool(10).submit(() ->
        StreamSupport.stream(new FixedSizeSpliterator<>(numbers.toArray(new Integer[0]), 5), true)
                .parallel() // redundant: stream(..., true) already returns a parallel stream
                .map(i -> {
                    try {
                        Thread.sleep(0); // make each element slightly slower so idle workers can steal chunks
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                    }
                    return i;
                })
                .collect(Collectors.groupingBy(i -> Thread.currentThread().getName(), Collectors.toList())))
        .join();
partition2.forEach((thread, values) -> log.warn("check key2 {} vs value {}", thread, values));

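Output (the first run, in ForkJoinPool-2, is the original code; the second run, in ForkJoinPool-3, adds the Thread.sleep(0) step; the value lists are truncated):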

17:56:03.074 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 6000 till 12000
17:56:03.079 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 9000 till 12000
17:56:03.080 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 7501 till 9001
17:56:03.080 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 7502 to 8251 till 9001
17:56:03.083 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 6751 till 7502
17:56:03.084 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 10500 till 12000
17:56:03.084 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 3000 till 6001
17:56:03.084 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 10501 to 11250 till 12000
17:56:03.085 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 4501 till 6001
17:56:03.085 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 3751 till 4502
17:56:03.085 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 9751 till 10501
17:56:03.087 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 1500 till 3001
17:56:03.087 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 1501 to 2251 till 3001
17:56:03.088 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 750 till 1501
17:56:03.087 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 4502 to 5251 till 6001
17:56:03.089 [main] WARN com.zg.d.TestSplitter - check key ForkJoinPool-2-worker-2 vs value [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ...
17:56:03.091 [main] WARN com.zg.d.TestSplitter - check key ForkJoinPool-2-worker-9 vs value [751, 752, 753, 754, 755, 756, 757, 758, 759, 760, 761, 762, 763, 764, 765, 766, 767, 768, 769, 770, 771, 772, 773, 774, 775, 776, 777, 778, 779, 780, 781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 800, 801, 802, 803, 804, 805, 806, 807, 808, 809, 810, 811, 812, 813, 814, 815, 816, 817, 818, 819, 820, 821, 822, 823, 824, 825, 826, 827, 828, 829, ...

17:56:03.117 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 6000 till 12000
17:56:03.118 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 9000 till 12000
17:56:03.118 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 7501 till 9001
17:56:03.118 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 7502 to 8251 till 9001
17:56:03.120 [ForkJoinPool-3-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 3000 till 6001
17:56:03.120 [ForkJoinPool-3-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 4501 till 6001
17:56:03.120 [ForkJoinPool-3-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 3751 till 4502
17:56:03.120 [ForkJoinPool-3-worker-11] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 10500 till 12000
17:56:03.121 [ForkJoinPool-3-worker-11] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 10501 to 11250 till 12000
17:56:03.121 [ForkJoinPool-3-worker-4] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 1500 till 3001
17:56:03.122 [ForkJoinPool-3-worker-4] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 1501 to 2251 till 3001
17:56:03.123 [ForkJoinPool-3-worker-6] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 9751 till 10501
17:56:03.123 [ForkJoinPool-3-worker-15] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 6751 till 7502
17:56:03.124 [ForkJoinPool-3-worker-13] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 4502 to 5251 till 6001
17:56:03.124 [ForkJoinPool-3-worker-8] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 750 till 1501
17:56:03.160 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-15 vs value [6752, 6753, 6754, 6755, 6756, 6757, 6758, 6759, 6760, 6761, 6762, 6763, 6764, 6765, 6766, 6767, 6768, 6769, 6770, 6771, 6772, 6773, 6774, 6775, 6776, 6777, 6778, 6779, 6780, 6781, 6782, 6783, 6784, 6785, 6786, 6787, 6788, 6789, 6790, 6791, 6792, 6793, 6794, 6795, 6796, 6797, 6798, 6799, 6800, 6801, 6802, 6803, 6804, 6805, 6806, 6807, 6808, 6809, 6810, 6811, 6812, 6813, 6814, 6815, 6816, ...
17:56:03.161 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-9 vs value [7502, 7503, 7504, 7505, 7506, 7507, 7508, 7509, 7510, 7511, 7512, 7513, 7514, 7515, 7516, 7517, 7518, 7519, 7520, 7521, 7522, 7523, 7524, 7525, 7526, 7527, 7528, 7529, 7530, 7531, 7532, 7533, 7534, 7535, 7536, 7537, 7538, 7539, 7540, 7541, 7542, 7543, 7544, 7545, 7546, 7547, 7548, 7549, 7550, 7551, 7552, 7553, 7554, 7555, 7556, 7557, 7558, 7559, 7560, 7561, 7562, 7563, 7564, 7565, 7566, ...
17:56:03.162 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-11 vs value [10501, 10502, 10503, 10504, 10505, 10506, 10507, 10508, 10509, 10510, 10511, 10512, 10513, 10514, 10515, 10516, 10517, 10518, 10519, 10520, 10521, 10522, 10523, 10524, 10525, 10526, 10527, 10528, 10529, 10530, 10531, 10532, 10533, 10534, 10535, 10536, 10537, 10538, 10539, 10540, 10541, 10542, 10543, 10544, 10545, 10546, 10547, 10548, 10549, 10550, 10551, 10552, 10553, 10554, 10555, 10556, ...
17:56:03.163 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-8 vs value [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ...
17:56:03.163 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-13 vs value [5252, 5253, 5254, 5255, 5256, 5257, 5258, 5259, 5260, 5261, 5262, 5263, 5264, 5265, 5266, 5267, 5268, 5269, 5270, 5271, 5272, 5273, 5274, 5275, 5276, 5277, 5278, 5279, 5280, 5281, 5282, 5283, 5284, 5285, 5286, 5287, 5288, 5289, 5290, 5291, 5292, 5293, 5294, 5295, 5296, 5297, 5298, 5299, 5300, 5301, 5302, 5303, 5304, 5305, 5306, 5307, 5308, 5309, 5310, 5311, 5312, 5313, 5314, 5315, 5316, 5317 ...
17:56:03.164 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-4 vs value [1501, 1502, 1503, 1504, 1505, 1506, 1507, 1508, 1509, 1510, 1511, 1512, 1513, 1514, 1515, 1516, 1517, 1518, 1519, 1520, 1521, 1522, 1523, 1524, 1525, 1526, 1527, 1528, 1529, 1530, 1531, 1532, 1533, 1534, 1535, 1536, 1537, 1538, 1539, 1540, 1541, 1542, 1543, 1544, 1545, 1546, 1547, 1548, 1549, 1550, 1551, 1552, 1553, 1554, 1555, 1556, 1557, 1558, 1559, 1560, 1561, 1562, 1563, 1564, 1565, 1566, ...
17:56:03.164 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-6 vs value [9752, 9753, 9754, 9755, 9756, 9757, 9758, 9759, 9760, 9761, 9762, 9763, 9764, 9765, 9766, 9767, 9768, 9769, 9770, 9771, 9772, 9773, 9774, 9775, 9776, 9777, 9778, 9779, 9780, 9781, 9782, 9783, 9784, 9785, 9786, 9787, 9788, 9789, 9790, 9791, 9792, 9793, 9794, 9795, 9796, 9797, 9798, 9799, 9800, 9801, 9802, 9803, 9804, 9805, 9806, 9807, 9808, 9809, 9810, 9811, 9812, 9813, 9814, 9815, 9816, 9817, ...
17:56:03.164 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-10 vs value [4502, 4503, 4504, 4505, 4506, 4507, 4508, 4509, 4510, 4511, 4512, 4513, 4514, 4515, 4516, 4517, 4518, 4519, 4520, 4521, 4522, 4523, 4524, 4525, 4526, 4527, 4528, 4529, 4530, 4531, 4532, 4533, 4534, 4535, 4536, 4537, 4538, 4539, 4540, 4541, 4542, 4543, 4544, 4545, 4546, 4547, 4548, 4549, 4550, 4551, 4552, 4553, 4554, 4555, 4556, 4557, 4558, 4559, 4560, 4561, 4562, 4563, 4564, 4565, 4566, 4567 ...
17:56:03.165 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-1 vs value [3001, 3002, 3003, 3004, 3005, 3006, 3007, 3008, 3009, 3010, 3011, 3012, 3013, 3014, 3015, 3016, 3017, 3018, 3019, 3020, 3021, 3022, 3023, 3024, 3025, 3026, 3027, 3028, 3029, 3030, 3031, 3032, 3033, 3034, 3035, 3036, 3037, 3038, 3039, 3040, 3041, 3042, 3043, 3044, 3045, 3046, 3047, 3048, 3049, 3050, 3051, 3052, 3053, 3054, 3055, 3056, 3057, 3058, 3059, 3060, 3061, 3062, 3063, 3064, 3065, 3066, ...
17:56:03.165 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-2 vs value [2252, 2253, 2254, 2255, 2256, 2257, 2258, 2259, 2260, 2261, 2262, 2263, 2264, 2265, 2266, 2267, 2268, 2269, 2270, 2271, 2272, 2273, 2274, 2275, 2276, 2277, 2278, 2279, 2280, 2281, 2282, 2283, 2284, 2285, 2286, 2287, 2288, 2289, 2290, 2291, 2292, 2293, 2294, 2295, 2296, 2297, 2298, 2299, 2300, 2301, 2302, 2303, 2304, 2305, 2306, 2307, 2308, 2309, 2310, 2311, 2312, 2313, 2314, 2315, 2316, 2317, ...

Originally published at https://lwpro2.dev on June 11, 2021.
