Parallel Processing Documentation
Documentation for thread.ParallelProcessing
.
Why Parallel Processing?
Parallel Processing is used to speed up the data processing of large datasets by splitting workflow into multiple threads.
Traditionally, this is achieved with a for loop.
my_dataset = [] # Large dataset
def my_data_processor(Data_In) -> Data_Out:
...
processed_data = []
for data in my_dataset:
processed_data = my_data_processor(data)
print(processed_data) # Processed data
While this is simple and decent enough for a small dataset, this is not ideal for large datasets, especially when runtime matters.
By using thread.ParallelProcessing
we can split the large dataset into multiple chunks and process each chunk simultaneously.
Parallel Processing is not True Parallel. Learn more here.
How It Works
Determine Thread Count
The number of threads used is determined by the following formula:
thread_count = min(max_threads, len(dataset))
This ensures that the number of threads used will always be less than or equal to the length of the dataset, which prevents redundant threads to be initialized for small datasets.
Chunking
The dataset is split as evenly as possible into chunks, preserving the order of data. Chunks follow the structure:
chunks = [[1, 2, 3, ...], [50, 51, 52, ...], ...]
Let be the length of the dataset and let be the number of threads.
The individual chunk lengths decrease down the chunk list. The length of each chunk will can be either or .
The chunks generated are not generators, meaning they will exist alongside dataset
and take up memory.
This will change to more memory-efficient generators in v1.0.1
.
Importing the class
import thread
thread.ParallelProcessing
from thread import ParallelProcessing
Quick Start
There are main 2 ways of initializing a parallel processing object.
On-Demand
You can create a simple process by initializing thread.ParallelProcessing
and passing the function
and dataset
.
def my_data_processor(Data_In) -> Data_Out: ...
# Recommended way
my_processor = ParallelProcessing(
function = my_data_processor,
dataset = [i in range(0, n)]
)
# OR
# Not the recommended way
my_processor = ParallelProcessing(my_data_processor, [i in range(0, n)])
It can be ran by invoking the start()
method
my_processor.start()
Decorated Function
You can decorate a function with thread.processor
which uses thread.ParallelProcessing
.
When the decorated function is invoked, it will automatically be ran in a new thread each time and return a thread.ParallelProcessing
object.
A decorated function's signature is overwritten, replacing the first argument to require a sequence of the Data_In
type.
import thread
@thread.processor
def my_target(Data_In, arg1, arg2, *, arg3: bool = False) -> Data_Out: ...
dataset: Sequence[type[Data_In]]
worker = my_target(dataset, arg1, arg2, arg3 = True) # thread.ParallelProcessing()
Did you know?
Decorators can take in keyword arguments that change the behavior of the thread.
import thread
@thread.processor(name = 'my_thread', suppress_errors = True)
def my_target(): ...
See the full list of arguments here
Initialization
This will cover the required and optional arguments initializing a parallel process.
Required
function
(Data_In, *args, **kwargs) -> Data_Out
This should be a function that takes in a data from the dataset
with/without overloads and returns Data_Out.
Arguments and keyword arguments excluding the first argument parsed to the function
can be parsed through args
and kwargs
.
Data_Out
will be written to the generated thread's Thread._returned_value
and can be accessed via ParallelProcessing.results
or ParallelProcessing.get_return_values()
.
function
can be parsed as the first argument to ParallelProcessing.__init__()
, although it is recommended to use only keyword arguments.
import thread
thread.ParallelProcessing(lambda x: x + 1, [])
thread.ParallelProcessing(function = lambda x: x + 1, dataset = [])
Best Practices
While you can use a lambda function, it is best to use a normal function for your LSP/Linter to infer types.
from thread import ParallelProcessing
worker = ParallelProcessing(function = lambda x: x + 1, dataset = [1, 2, 3])
worker.start()
worker.join()
worker.results # This will be inferred as Unknown by your LSP/Linter
from thread import ParallelProcessing
def my_target(x: int) -> int:
return x + 1
worker = ParallelProcessing(function = my_target, dataset = [1, 2, 3])
worker.start()
worker.join()
worker.results # This will be inferred as a list[int]
dataset
Sequence[Data_In]
This should be an interable sequence of data parsed as the first argument to function
.
import thread
def my_function(x: int) -> int:
...
thread.ParallelProcessing(function = my_function, dataset = [1, 2, 3])
thread.ParallelProcessing(function = my_function, dataset = ('hi')) # This will be highlighted by your LSP/Linter
Optional
*args / **kwargs
(default: None)
Any / Mapping[str, Any]
These overloads are parsed to thread.Thread.__init__()
, then threading.Thread.__init__()
.
If kwargs contain an argument named args
, then it will automatically be removed from kwargs and joined with ParallelProcessing.__init__().args
.
See thread.Thread
documentation for more details.
See threading
documentation (opens in a new tab) for more details.
Properties
Attributes
These are attributes of thread.ParallelProcessing
class.
results
List[Data_Out]
This is a list of the data that was returned by the function
in thread.ParallelProcessing
.
status
thread.ThreadStatus
This is the current status of the thread.
These Are The Possible Values
Methods
These are methods of thread.ParallelProcessing
class.
start
() -> None
This starts the processing.
Simply invoke ParallelProcessing.start()
on a ParallelProcessing object.
import thread
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.start()
Exceptions Raised
is_alive
() -> bool
This indicates whether the threads are still alive.
Simply invoke ParallelProcessing.is_alive()
on a ParallelProcessing object.
import thread
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.is_alive()
Exceptions Raised
get_return_values
() -> List[Data_Out]
This halts the current thread execution until the processing completes and returns the value returned by function
.
Simply invoke ParallelProcessing.get_return_values()
on a thread object.
import thread
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.get_return_values()
Exceptions Raised
join
(timeout: float = None) -> bool
This halts the current thread execution until the ParallelProcessing
completes or exceeds the timeout.
A None value for timeout will have the same effect as passing float("inf")
as a timeout.
The boolean returned by ParallelProcessing.join()
is True
if the threads complete within the timeout and False
if otherwise.
Simply invoke ParallelProcessing.join()
on a ParallelProcessing object.
import thread
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.join(5)
worker.join()
Exceptions Raised
kill
(yielding: bool = False, timeout: float = 5) -> bool
This schedules the threads to be killed.
If yielding is True, it halts the current thread execution until the threads are killed or the timeout is exceeded.
Similar to ParallelProcessing.join()
, a None value for timeout will have the same effect as passing float("inf")
as a timeout.
Simply invoke ParallelProcessing.kill()
on a ParallelProcessing object.
import thread
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.kill(True, 10)
worker.kill(False)
worker.kill()
Exceptions Raised
This only schedules the threads to be killed, and does not immediately kill the threads.
Meaning that if function
has a long time.wait()
call, it will only be killed after it moves onto the next line.