module Parallel

Overview

Core module for parallel processing functionality.

Defined in:

parallel/context.cr
parallel/core.cr
parallel/version.cr

Constant Summary

PARALLEL_CONTEXT = Fiber::ExecutionContext::MultiThreaded.new("parallel-workers", Fiber::ExecutionContext.default_workers_count)

Global ExecutionContext for parallel processing. Reusing a single context is recommended for performance.
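
A minimal usage sketch, assuming the spawn method from Crystal's experimental Fiber::ExecutionContext API and a require path of "parallel" (taken from the shard name): work is scheduled onto the shared context instead of constructing a new MultiThreaded context per call.

  require "parallel"

  done = Channel(Nil).new

  # Schedule a fiber on the shared multi-threaded context rather than
  # building a new Fiber::ExecutionContext::MultiThreaded each time.
  Parallel::PARALLEL_CONTEXT.spawn do
    # ... CPU-bound or blocking work runs on a worker thread ...
    done.send(nil)
  end

  done.receive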

VERSION = {{ (`shards version /home/runner/work/parallel/parallel/src/parallel`).chomp.stringify }}

Class Method Summary

.adaptive_chunk_size(collection_size : Int32) : Int32
.check_empty_and_size(collection) : Tuple(Bool, Int32)
.handle_each_errors_safe(errors_channel, completed_channel, expected_count : Int32) : Array(Exception)
.log_fiber_exception(ex : Exception, task_info : String | Nil = nil)
.parallel_each(collection_size : Int32, context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : Int32 -> _)
.parallel_each_enumerable(enumerable : Enumerable(T), context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : T -> _) forall T
.parallel_map_enumerable(enumerable : Enumerable(T), context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : T -> U) forall T, U
.parallel_map_indexable(collection_size : Int32, context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : Int32 -> U) forall U
.validate_chunk_size(chunk : Int32 | Nil, collection_size : Int32) : Int32

Class Method Detail

def self.adaptive_chunk_size(collection_size : Int32) : Int32 #

Determines the optimal chunk size for adaptive chunking.
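
A hedged usage sketch (the heuristic itself is internal to the shard; the require path "parallel" is assumed from the shard name):

  require "parallel"

  collection_size = 10_000
  chunk = Parallel.adaptive_chunk_size(collection_size)
  puts "chunk size chosen for #{collection_size} elements: #{chunk}"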


[View source]
def self.check_empty_and_size(collection) : Tuple(Bool, Int32) #

Unified empty check for collections. Returns {is_empty, estimated_size}.
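
A small sketch of the call shape; the collection parameter is untyped, so an Array, whose size is known exactly, is used here:

  require "parallel"

  is_empty, estimated_size = Parallel.check_empty_and_size([1, 2, 3])
  puts "empty: #{is_empty}, estimated size: #{estimated_size}"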


[View source]
def self.handle_each_errors_safe(errors_channel, completed_channel, expected_count : Int32) : Array(Exception) #

Handles error collection for a parallel each operation (new behavior: all errors are collected).
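
A hypothetical sketch of the expected interaction: each task reports either an error or a completion, and the helper gathers every error. The channel element types used here (Channel(Exception), Channel(Nil)) are assumptions for illustration, not the shard's documented types.

  require "parallel"

  errors_channel = Channel(Exception).new
  completed_channel = Channel(Nil).new
  expected = 4

  expected.times do |i|
    Parallel::PARALLEL_CONTEXT.spawn do
      begin
        raise "task #{i} failed" if i.odd?
        completed_channel.send(nil)
      rescue ex
        errors_channel.send(ex)
      end
    end
  end

  errors = Parallel.handle_each_errors_safe(errors_channel, completed_channel, expected)
  puts "collected #{errors.size} error(s)"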


[View source]
def self.log_fiber_exception(ex : Exception, task_info : String | Nil = nil) #

Logs fiber exceptions with context information.
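
A minimal sketch of the call shape; task_info is free-form context for the log entry:

  require "parallel"

  begin
    raise ArgumentError.new("invalid input")
  rescue ex
    Parallel.log_fiber_exception(ex, "while processing item 42")
  end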


[View source]
def self.parallel_each(collection_size : Int32, context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : Int32 -> _) #

Common parallel each implementation. This method handles the core logic for both the Enumerable and Indexable versions.
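
A sketch under the assumption that the Int32 yielded to the block is an element index; an Atomic counter keeps the block thread-safe:

  require "parallel"

  data = (1..100).to_a
  sum = Atomic(Int32).new(0)

  # chunk_size of 16 is an arbitrary choice for this example.
  Parallel.parallel_each(data.size, Parallel::PARALLEL_CONTEXT, 16) do |i|
    sum.add(data[i])
  end

  puts sum.get # => 5050 once all chunks have been processed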


[View source]
def self.parallel_each_enumerable(enumerable : Enumerable(T), context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : T -> _) forall T #

Lazy parallel each implementation for Enumerable collections. Processes elements without materializing the entire collection and uses a lock-free, work-stealing approach for better performance.
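
A sketch using a Range as the Enumerable, so the caller never builds an intermediate Array; the chunk_size of 8 is arbitrary:

  require "parallel"

  even_count = Atomic(Int32).new(0)

  Parallel.parallel_each_enumerable(1..50, Parallel::PARALLEL_CONTEXT, 8) do |n|
    even_count.add(1) if n.even?
  end

  puts even_count.get # => 25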


[View source]
def self.parallel_map_enumerable(enumerable : Enumerable(T), context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : T -> U) forall T, U #

Common parallel map implementation for Enumerable collections using lazy evaluation. Handles the core logic for the Enumerable versions without creating intermediate arrays and uses a lock-free, work-stealing approach for better performance.
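
A sketch assuming the return value is the mapped collection (Array(Int32) here); no ordering guarantee is stated above, so none is assumed:

  require "parallel"

  words = %w[alpha beta gamma delta]

  lengths = Parallel.parallel_map_enumerable(words, Parallel::PARALLEL_CONTEXT, 2) do |word|
    word.size
  end

  puts lengths # mapped lengths, e.g. [5, 4, 5, 5]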


[View source]
def self.parallel_map_indexable(collection_size : Int32, context : Fiber::ExecutionContext::MultiThreaded, chunk_size : Int32, &block : Int32 -> U) forall U #

Common parallel map implementation for Indexable collections. Handles the core logic for the Indexable versions using direct index access.
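
A sketch assuming the block receives each index in 0...collection_size and the return value is the array of mapped results:

  require "parallel"

  data = [10, 20, 30, 40]

  doubled = Parallel.parallel_map_indexable(data.size, Parallel::PARALLEL_CONTEXT, 2) do |i|
    data[i] * 2
  end

  puts doubled # e.g. [20, 40, 60, 80]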


[View source]
def self.validate_chunk_size(chunk : Int32 | Nil, collection_size : Int32) : Int32 #

Validates and normalizes the chunk size parameter. Returns the validated chunk size, or raises an exception for invalid values.
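
A sketch of the two call shapes; that nil falls back to an automatically chosen chunk size is an assumption based on the description above:

  require "parallel"

  puts Parallel.validate_chunk_size(64, 1_000)  # explicit chunk size, returned if valid
  puts Parallel.validate_chunk_size(nil, 1_000) # nil: assumed to fall back to an automatic choice

  # Invalid values (e.g. 0 or negative) are expected to raise.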


[View source]