fast by design

Parallel Streams and the Fork/Join Overhead

Chapter 39 of 90

stream().parallel() is the easiest way to parallelize computation in Java. Add .parallel() and the stream splits work across the common ForkJoinPool. On a 4-core machine, a CPU-bound operation should run ~4x faster. In practice, the speedup depends on data size, operation cost, data source splittability, and contention with other parallel work.

The content platform uses parallel streams in three places:

  1. Batch article embedding: 500,000 articles, 5ms per embedding computation. Heavy compute, large dataset.
  2. Request-time filtering: 10,000 candidates, 0.1us per filter check. Light compute, small dataset.
  3. Analytics aggregation: 1,000,000 view records, 0.5us per aggregation step. Medium compute, large dataset.

Parallel streams help for case 1, hurt for case 2, and depend on contention for case 3.

The NQ Model: When Parallelism Pays

The NQ model (proposed by Doug Lea) says: parallelism pays when N * Q > 10,000, where:

  • N = number of elements
  • Q = cost per element in “units of work” (roughly: number of CPU instructions)

For trivial operations (Q = 1, like summing integers), you need N > 10,000 elements. For expensive operations (Q = 10,000, like computing ML embeddings), you need N > 1 element. The threshold accounts for the overhead of task splitting, thread dispatch, and result merging.
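The rule of thumb above can be written down as a tiny helper. This is a minimal sketch: `NqHeuristic`, `worthParallelizing`, and `THRESHOLD` are illustrative names rather than a JDK API, and Q is whatever rough cost estimate you supply.

```java
// Sketch of the N * Q rule of thumb. All names here are illustrative.
final class NqHeuristic {

    // Threshold taken from the rule of thumb in the text: N * Q > 10,000.
    static final long THRESHOLD = 10_000L;

    // n = number of elements, q = estimated cost per element in "units of work".
    static boolean worthParallelizing(long n, long q) {
        // multiplyExact throws on overflow instead of silently wrapping negative.
        return Math.multiplyExact(n, q) > THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(worthParallelizing(10_000, 1));  // N * Q = 10,000: not above the threshold
        System.out.println(worthParallelizing(10_001, 1));  // trivial op, just enough elements
        System.out.println(worthParallelizing(2, 10_000));  // expensive op, only two elements
    }
}
```

Treat a `true` result as "worth benchmarking", not as proof that parallel will be faster.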

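import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.AverageTime)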
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(2)
@State(Scope.Benchmark)
public class ParallelStreamCrossoverBenchmark {

    @Param({"100", "1000", "10000", "100000", "1000000"})
    int size;

    int[] data;

    @Setup(Level.Trial)
    public void setup() {
        data = ThreadLocalRandom.current().ints(size).toArray();
    }

    // Q ~= 1: trivial operation
    @Benchmark
    public long sequentialSum() {
        return IntStream.of(data).asLongStream().sum();
    }

    @Benchmark
    public long parallelSum() {
        return IntStream.of(data).parallel().asLongStream().sum();
    }

    // Q ~= 100: moderate operation (simulate with math)
    @Benchmark
    public double sequentialModerate() {
        return IntStream.of(data)
            .mapToDouble(v -> Math.sin(v) * Math.cos(v) + Math.sqrt(Math.abs(v)))
            .sum();
    }

    @Benchmark
    public double parallelModerate() {
        return IntStream.of(data).parallel()
            .mapToDouble(v -> Math.sin(v) * Math.cos(v) + Math.sqrt(Math.abs(v)))
            .sum();
    }
}

Results on a 4-core machine:

Size       Sequential Sum (us)  Parallel Sum (us)  Sum Speedup  Seq Moderate (us)  Par Moderate (us)  Mod Speedup
100        0.05                 12                 0.004x       1.2                14                 0.086x
1,000      0.4                  14                 0.029x       12                 16                 0.75x
10,000     4                    15                 0.27x        120                48                 2.5x
100,000    38                   22                 1.7x         1,200              380                3.2x
1,000,000  380                  115                3.3x         12,000             3,400              3.5x

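For the trivial sum (Q~1): parallel first beats sequential at 100,000 elements; at 10,000 elements it is still almost 4x slower (0.27x). The ForkJoin overhead (~12us for task splitting and thread dispatch) dominates at small sizes.

For the moderate operation (Q~100): parallel beats sequential at 10,000 elements (2.5x) but is still slower at 1,000 elements (0.75x), even though N * Q is already about 100,000 there. The higher per-element cost amortizes the fork-join overhead sooner, but the N * Q > 10,000 rule is a lower bound, not a guarantee: in both cases the measured crossover sits well above it.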
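A back-of-the-envelope model helps explain where the crossover lands. The sketch below assumes a fixed parallel overhead (the ~12us quoted above) and ideal linear scaling across cores, so parallel time is roughly overhead + sequential time / cores. That model is a simplifying assumption, not something the benchmark measures, and `BreakEven` and `breakEvenElements` are hypothetical names. The per-element costs are read off the 1,000-element row of the table (0.4us / 1,000 and 12us / 1,000).

```java
// Rough break-even estimate under an assumed model:
//   sequential = n * perElement
//   parallel   = overhead + n * perElement / cores
// Setting the two equal gives n = overhead / (perElement * (1 - 1/cores)).
final class BreakEven {

    static double breakEvenElements(double overheadUs, double perElementUs, int cores) {
        if (cores < 2) {
            return Double.POSITIVE_INFINITY;  // no second core, parallel can never win in this model
        }
        return overheadUs / (perElementUs * (1.0 - 1.0 / cores));
    }

    public static void main(String[] args) {
        // Trivial sum: about 0.0004us per element
        System.out.printf("trivial sum: ~%.0f elements%n", breakEvenElements(12.0, 0.0004, 4));
        // Moderate operation: about 0.012us per element
        System.out.printf("moderate op: ~%.0f elements%n", breakEvenElements(12.0, 0.012, 4));
    }
}
```

Under these assumptions the break-even points come out at roughly 40,000 elements for the trivial sum and roughly 1,300 for the moderate operation, which falls inside the ranges where the measured speedup crosses 1x. Real scaling is below 4x on four cores, so the true crossover is somewhat higher.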

Content platform mapping:

  • Batch embedding (Q~50,000, N=500,000): NQ = 25 billion. Parallel wins massively.
  • Request filtering (Q~1, N=10,000): NQ = 10,000. Borderline. Sequential is safer.
  • Analytics aggregation (Q~50, N=1,000,000): NQ = 50 million. Parallel wins if pool is not saturated.

Splitting Cost: Data Source Matters

Parallel streams split the data source into chunks using Spliterator.trySplit(). Different data sources split differently:

Source                     Split Complexity   Split Quality
int[] / long[] / double[]  O(1)               Perfect (halves the array)
ArrayList<T>               O(1)               Perfect (index-based split)
HashSet<T>                 O(n)               Good (bucket-based split)
TreeSet<T>                 O(log n)           Good (subtree split)
LinkedList<T>              O(n)               Terrible (must traverse to find midpoint)
Stream.iterate(...)        Cannot split       None (sequential only)
BufferedReader.lines()     Cannot split well  Poor (must read ahead)

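To see split quality directly, you can call `trySplit()` once and compare the two halves. This is a small sketch; `SplitDemo` and `splitOnce` are illustrative names. The exact sizes a `LinkedList` hands out are an implementation detail of the JDK in use, so the code prints them rather than assuming them.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

final class SplitDemo {

    // Returns {prefixSize, remainingSize} after a single trySplit() call.
    static long[] splitOnce(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();  // null if the source declines to split
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        for (int i = 0; i < 10_000; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }
        long[] a = splitOnce(arrayList);
        long[] l = splitOnce(linkedList);
        System.out.println("ArrayList : " + a[0] + " / " + a[1]);
        System.out.println("LinkedList: " + l[0] + " / " + l[1]);
    }
}
```

An `ArrayList` splits its index range in half, so both parts have 5,000 elements. A `LinkedList` cannot jump to the middle; OpenJDK typically hands off a comparatively small batch copied out of the list, which leaves the two parts unbalanced.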
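import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.AverageTime)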
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(2)
@State(Scope.Benchmark)
public class SplittingCostBenchmark {

    @Param({"100000"})
    int size;

    int[] primitiveArray;
    ArrayList<Integer> arrayList;
    LinkedList<Integer> linkedList;
    HashSet<Integer> hashSet;

    @Setup(Level.Trial)
    public void setup() {
        primitiveArray = ThreadLocalRandom.current().ints(size).toArray();
        arrayList = new ArrayList<>(size);
        linkedList = new LinkedList<>();
        hashSet = new HashSet<>(size * 2);
        for (int v : primitiveArray) {
            arrayList.add(v);
            linkedList.add(v);
            hashSet.add(v);
        }
    }

    @Benchmark
    public long parallelPrimitiveArray() {
        return IntStream.of(primitiveArray).parallel()
            .mapToLong(v -> (long) v * v)            // widen before multiplying to avoid int overflow
            .sum();                                  // FAST: O(1) split
    }

    @Benchmark
    public long parallelArrayList() {
        return arrayList.parallelStream()
            .mapToLong(v -> (long) v * v)
            .sum();                                  // FAST: O(1) split
    }

    @Benchmark
    public long parallelLinkedList() {
        return linkedList.parallelStream()
            .mapToLong(v -> (long) v * v)
            .sum();                                  // SLOW: O(n) split
    }

    @Benchmark
    public long parallelHashSet() {
        return hashSet.parallelStream()
            .mapToLong(v -> (long) v * v)
            .sum();                                  // Medium: O(n) split
    }
}

Source      Parallel (us)  Sequential (us)  Speedup
int[]       22             68               3.1x
ArrayList   38             82               2.2x
HashSet     85             120              1.4x
LinkedList  210            92               0.44x (slower!)

LinkedList parallel stream is 2.3x slower than sequential because a linked list has no random access: trySplit() must walk the nodes one by one to carve off each chunk, doing O(n) work before any actual computation begins. This traversal also pollutes the cache with linked-list node accesses, slowing subsequent element processing.

Rule: Never use .parallelStream() on a LinkedList. Convert to an array or ArrayList first if parallelism is needed.
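One way to apply this rule is to copy non-random-access lists into an `ArrayList` before going parallel. The sketch below uses the standard `java.util.RandomAccess` marker interface to decide whether a copy is needed; `LinkedListParallel` and `sumOfSquares` are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.RandomAccess;

final class LinkedListParallel {

    // Copies the input into an ArrayList (one sequential O(n) pass) unless it
    // already supports fast index access, then runs the parallel stream on
    // the evenly splittable copy.
    static long sumOfSquares(List<Integer> values) {
        List<Integer> splittable =
            (values instanceof RandomAccess) ? values : new ArrayList<>(values);
        return splittable.parallelStream()
            .mapToLong(v -> (long) v * v)
            .sum();
    }

    public static void main(String[] args) {
        List<Integer> linked = new LinkedList<>();
        for (int i = 1; i <= 1_000; i++) {
            linked.add(i);
        }
        System.out.println(sumOfSquares(linked));
    }
}
```

The copy is itself O(n), so this only pays off when the per-element work is expensive enough to justify parallelism in the first place.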

ForkJoinPool Saturation

The common ForkJoinPool has Runtime.getRuntime().availableProcessors() - 1 worker threads (plus the calling thread). On a 4-core machine, that is 3 workers + 1 caller = 4 threads. All parallel streams, CompletableFuture default async operations, and parallel sorts share this pool.
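You can check the pool size on your own machine. In this sketch, `CommonPoolInfo` and `defaultCommonPoolWorkers` are illustrative names; `ForkJoinPool.getCommonPoolParallelism()` is the real JDK call. The reported value can differ from the default if the `java.util.concurrent.ForkJoinPool.common.parallelism` system property is set.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {

    // Default worker count as described above: cores - 1, but never below 1.
    static int defaultCommonPoolWorkers(int cores) {
        return Math.max(1, cores - 1);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores                = " + cores);
        System.out.println("default workers      = " + defaultCommonPoolWorkers(cores));
        System.out.println("reported parallelism = " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

In containers, `availableProcessors()` reflects the CPU limit the JVM detects, which may be lower than the host's core count.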

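import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.AverageTime)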
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 3, time = 2)
@Fork(1)
@State(Scope.Benchmark)
@Threads(1)
public class PoolSaturationBenchmark {

    @Param({"1000000"})
    int size;

    @Param({"0", "3", "6"})
    int concurrentStreams;

    int[] data;
    ExecutorService submitter;

    @Setup(Level.Trial)
    public void setup() {
        data = ThreadLocalRandom.current().ints(size).toArray();
        // newFixedThreadPool(0) throws IllegalArgumentException, so keep at least one thread
        submitter = Executors.newFixedThreadPool(Math.max(1, concurrentStreams));
    }

    @Setup(Level.Invocation)
    public void launchConcurrent() {
        // Launch background parallel streams that compete for pool threads
        for (int i = 0; i < concurrentStreams; i++) {
            int[] backgroundData = data.clone();
            submitter.submit(() -> {
                IntStream.of(backgroundData).parallel()
                    .mapToDouble(Math::sqrt)
                    .sum();
            });
        }
    }

    @Benchmark
    public double parallelUnderContention() {
        return IntStream.of(data).parallel()
            .mapToDouble(Math::sqrt)
            .sum();
    }

    @TearDown(Level.Trial)
    public void tearDown() {
        submitter.shutdownNow();
    }
}

Concurrent Streams  Time (ms)  vs Uncontended
0 (alone)           1.8        baseline
3                   4.2        2.3x slower
6                   7.5        4.2x slower

With 6 concurrent parallel streams on a 4-core machine, each stream gets less than one thread on average. The parallelism benefit disappears, and the splitting/merging overhead remains.
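One possible reaction to saturation is to fall back to sequential execution when the common pool already looks busy. The sketch below is a heuristic only: `PoolLoad`, `looksSaturated`, and the decision rule are assumptions made for illustration, and the `ForkJoinPool` monitoring methods it reads return estimates, not exact counts.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

final class PoolLoad {

    // Pure decision rule (an assumption, tune for your workload): treat the pool
    // as saturated if work is already queued or every worker appears busy.
    static boolean looksSaturated(int activeThreads, int parallelism, boolean hasQueuedSubmissions) {
        return hasQueuedSubmissions || activeThreads >= Math.max(1, parallelism);
    }

    // Reads the (approximate) monitoring counters of the common pool.
    static boolean commonPoolLooksSaturated() {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        return looksSaturated(pool.getActiveThreadCount(),
                              pool.getParallelism(),
                              pool.hasQueuedSubmissions());
    }

    // Goes parallel only when the common pool does not look busy.
    static double sumOfSquareRoots(int[] data) {
        IntStream stream = IntStream.of(data);
        if (!commonPoolLooksSaturated()) {
            stream = stream.parallel();
        }
        return stream.mapToDouble(Math::sqrt).sum();
    }
}
```

The check is racy by nature, since the pool can fill up right after it is read. For hard latency targets, isolating the work on its own pool is the more predictable option.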

Custom ForkJoinPool for Isolation

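The solution: run latency-sensitive parallel work on a dedicated pool, isolating it from background batch processing. The code below relies on the fact that a parallel stream started from inside a ForkJoinPool task executes on that pool's workers; this is long-standing behavior of the JDK implementation rather than a documented guarantee of the Stream API: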

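import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class IsolatedParallelExecution {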

    // Dedicated pool for request-serving parallel operations
    private static final ForkJoinPool REQUEST_POOL = new ForkJoinPool(
        2,  // Two threads for request work
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,
        false
    );

    // Dedicated pool for batch processing
    private static final ForkJoinPool BATCH_POOL = new ForkJoinPool(
        Math.max(1, Runtime.getRuntime().availableProcessors() - 2), // Remaining cores, at least 1
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,
        false
    );

    public double[] computeScoresParallel(double[] features) {
        try {
            return REQUEST_POOL.submit(() ->
                IntStream.range(0, features.length).parallel()
                    .mapToDouble(i -> computeScore(features[i]))
                    .toArray()
            ).join();                                // Runs on REQUEST_POOL
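        } catch (java.util.concurrent.RejectedExecutionException e) {
            // Fallback to sequential only on pool rejection (e.g. the pool was shut down);
            // failures inside computeScore still propagate to the caller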
            return IntStream.range(0, features.length)
                .mapToDouble(i -> computeScore(features[i]))
                .toArray();
        }
    }

    public void batchReindex(Article[] articles) {
        BATCH_POOL.submit(() ->
            Arrays.stream(articles).parallel()
                .forEach(this::reindexArticle)
        ).join();                                    // Runs on BATCH_POOL
    }

    private double computeScore(double feature) { return Math.sqrt(feature); }
    private void reindexArticle(Article a) { /* ... */ }
    record Article() {}
}

Trade-off: Custom pools reduce maximum throughput for each task type (fewer threads per pool) but provide predictable latency. The request pool always has 2 threads available, regardless of batch processing load.
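Because the isolation depends on where the stream's tasks actually run, it is worth verifying on your JDK. The following sketch records, for every element, whether the processing thread is a worker of the pool the stream was submitted to. `PoolIsolationCheck` and `runsOnlyOn` are hypothetical names introduced for this check.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

final class PoolIsolationCheck {

    // Runs a parallel stream inside `pool` and returns true if every element
    // was processed by a worker thread that belongs to that pool.
    static boolean runsOnlyOn(ForkJoinPool pool) {
        Set<Boolean> observations = ConcurrentHashMap.newKeySet();
        pool.submit(() ->
            IntStream.range(0, 100_000).parallel().forEach(i -> {
                Thread t = Thread.currentThread();
                boolean inPool = t instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) t).getPool() == pool;
                observations.add(inPool);
            })
        ).join();
        return !observations.contains(false);
    }

    public static void main(String[] args) {
        ForkJoinPool requestPool = new ForkJoinPool(2);
        try {
            System.out.println("isolated: " + runsOnlyOn(requestPool));
        } finally {
            requestPool.shutdown();
        }
    }
}
```

Running a check like this in a test suite guards against a JDK upgrade silently changing where the work executes.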

The Parallel Stream Decision Flowchart

  1. Is N * Q > 10,000? No: use sequential. The fork-join overhead will not amortize.
  2. Is the data source splittable in O(1)? No (LinkedList, Stream.iterate): use sequential or convert to array first.
  3. Is the operation stateless and non-interfering? No: parallel execution will produce incorrect results.
  4. Is the common ForkJoinPool under contention? Yes: use a custom pool or use sequential.
  5. Is the result order-dependent? Yes: use forEachOrdered (loses some parallelism) or collect to a list.
  6. All checks passed? Use parallel.
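The checklist can be encoded as a small decision function. This is a sketch with illustrative names (`ParallelDecision`, `Choice`, `decide`); the inputs are estimates you supply, and step 5 (ordering) is left out because it affects which terminal operation you pick rather than whether to go parallel.

```java
final class ParallelDecision {

    enum Choice { SEQUENTIAL, PARALLEL_COMMON_POOL, PARALLEL_CUSTOM_POOL }

    static Choice decide(long n, long q,
                         boolean splitsInConstantTime,
                         boolean statelessAndNonInterfering,
                         boolean commonPoolContended,
                         boolean customPoolAvailable) {
        // Step 1: overhead will not amortize below the N * Q threshold.
        if (Math.multiplyExact(n, q) <= 10_000L) return Choice.SEQUENTIAL;
        // Step 2: poorly splittable sources (LinkedList, Stream.iterate) stay sequential.
        if (!splitsInConstantTime) return Choice.SEQUENTIAL;
        // Step 3: stateful or interfering operations are not safe to parallelize.
        if (!statelessAndNonInterfering) return Choice.SEQUENTIAL;
        // Step 4: under contention, use a dedicated pool if there is one.
        if (commonPoolContended) {
            return customPoolAvailable ? Choice.PARALLEL_CUSTOM_POOL : Choice.SEQUENTIAL;
        }
        return Choice.PARALLEL_COMMON_POOL;
    }

    public static void main(String[] args) {
        // Request-time filter: N = 10,000, Q ~ 1
        System.out.println(decide(10_000, 1, true, true, true, false));
        // Batch embedding: N = 500,000, Q ~ 50,000, dedicated batch pool available
        System.out.println(decide(500_000, 50_000, true, true, true, true));
    }
}
```

A PARALLEL result still only means "benchmark this"; the function captures the order of the checks, not measured performance.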

For the content platform:

Operation        N     Q       NQ    Splittable   Contention Risk  Decision
Batch embedding  500K  50,000  25B   Yes (array)  Low (isolated)   Parallel (batch pool)
Request filter   10K   1       10K   Yes (array)  High             Sequential
Analytics agg    1M    50      50M   Yes (array)  Medium           Parallel (request pool)
Article sort     10K   10      100K  Yes (array)  High             Sequential (TimSort)

The safest default is sequential. Add .parallel() only after benchmarking proves it helps for the specific data size, operation cost, and contention profile of your deployment.

Parallel Collector Overhead

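Parallel streams with complex collectors (groupingBy, partitioningBy) add merge overhead: each worker thread accumulates into its own partial result, and the partial results must then be combined into one: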

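import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.AverageTime)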
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(2)
@State(Scope.Benchmark)
public class ParallelCollectorBenchmark {

    @Param({"100000"})
    int size;

    record ViewEvent(String articleId, String category, int duration) {}

    List<ViewEvent> events;
    String[] categories = {"java", "python", "rust", "go", "typescript"};

    @Setup(Level.Trial)
    public void setup() {
        ThreadLocalRandom rng = ThreadLocalRandom.current();
        events = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            events.add(new ViewEvent(
                "article-" + rng.nextInt(10_000),
                categories[rng.nextInt(categories.length)],
                rng.nextInt(300)
            ));
        }
    }

    @Benchmark
    public Map<String, Long> sequentialGroupCount() {
        return events.stream()
            .collect(Collectors.groupingBy(ViewEvent::category,
                Collectors.counting()));
    }

    @Benchmark
    public Map<String, Long> parallelGroupCount() {
        return events.parallelStream()
            .collect(Collectors.groupingBy(ViewEvent::category,
                Collectors.counting()));
    }

    @Benchmark
    public Map<String, Long> manualGroupCount() {
        // FAST: Pre-sized map, no stream overhead (HashMap.newHashMap requires Java 19+)
        Map<String, Long> counts = HashMap.newHashMap(categories.length);
        for (ViewEvent e : events) {
            counts.merge(e.category(), 1L, Long::sum);
        }
        return counts;
    }
}

Approach                      100K events (us)
Sequential stream groupingBy  4,200
Parallel stream groupingBy    3,800
Manual loop + merge           1,400

Parallel groupingBy provides only a 1.1x speedup because each worker thread builds its own partial map and groupingBy must then merge those maps into one result. The manual loop is roughly 3x faster than either stream approach because it avoids per-element lambda dispatch in the stream pipeline, collector framework overhead, and map merging.
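For comparison, the JDK also offers `Collectors.groupingByConcurrent`, which accumulates into a single shared `ConcurrentMap` instead of merging per-thread maps. The sketch below only shows that both collectors produce the same counts; `GroupingVariants` and its method names are illustrative, and it makes no claim about which variant is faster for a given workload.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class GroupingVariants {

    // Merge-based: each worker fills its own map, and the maps are combined at the end.
    static Map<String, Long> mergeBased(List<String> categories) {
        return categories.parallelStream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // Shared-map: all workers update one ConcurrentMap, so no merge step is needed,
    // at the price of contention on hot keys.
    static ConcurrentMap<String, Long> sharedMap(List<String> categories) {
        return categories.parallelStream()
            .collect(Collectors.groupingByConcurrent(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> categories = List.of("java", "go", "java", "rust", "java");
        System.out.println(mergeBased(categories).equals(sharedMap(categories)));
    }
}
```

With only a handful of distinct keys, as in the five-category benchmark above, the shared map concentrates all updates on a few entries, so measure before assuming it helps.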

Lesson: Parallel streams shine for map/filter/reduce on large datasets with expensive per-element operations. For grouping, counting, and aggregation, a manual loop with a pre-sized map is almost always faster than any stream variant. The parallel stream’s merging overhead often exceeds the parallelism benefit for these operations.