v1.0.0 Latest

Parallel Processing Documentation

Documentation for thread.ParallelProcessing.

Why Parallel Processing?

Parallel Processing is used to speed up the data processing of large datasets by splitting workflow into multiple threads.

Traditionally, this is achieved with a for loop.

my_dataset = [] # Large dataset
def my_data_processor(Data_In) -> Data_Out:
  ...
 
processed_data = []
for data in my_dataset:
  processed_data.append(my_data_processor(data))
 
print(processed_data) # Processed data

While this is simple and decent enough for a small dataset, this is not ideal for large datasets, especially when runtime matters. By using thread.ParallelProcessing we can split the large dataset into multiple chunks and process each chunk simultaneously.
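The idea can be sketched with only the standard library: split the dataset into contiguous chunks, process each chunk in its own thread, and stitch the per-chunk results back together in order. This is a simplified illustration of the concept, not thread.ParallelProcessing's actual implementation, and my_data_processor here is a stand-in.

```python
import threading

def my_data_processor(item):      # stand-in processor
    return item + 1

dataset = list(range(100))
n_threads = 4

# Contiguous chunks preserve the order of the dataset
chunk_size = -(-len(dataset) // n_threads)  # ceiling division
chunks = [dataset[i:i + chunk_size] for i in range(0, len(dataset), chunk_size)]

results = [None] * len(chunks)

def worker(index, chunk):
    results[index] = [my_data_processor(item) for item in chunk]

threads = [threading.Thread(target=worker, args=(i, c)) for i, c in enumerate(chunks)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Flatten the per-chunk results back into dataset order
processed_data = [item for chunk_result in results for item in chunk_result]
```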

💡

Parallel Processing is not True Parallel. Learn more here.

How It Works

Determine Thread Count

The number of threads used is determined by the following formula:

thread_count = min(max_threads, len(dataset))

This ensures that the number of threads used will always be less than or equal to the length of the dataset, which prevents redundant threads from being initialized for small datasets.

Chunking

The dataset is split as evenly as possible into chunks, preserving the order of data. Chunks follow the structure:

chunks = [[1, 2, 3, ...], [50, 51, 52, ...], ...]

Let $N$ be the length of the dataset and let $M$ be the number of threads.

The individual chunk lengths decrease down the chunk list. The length of each chunk will be either $\lfloor N/M + 0.5 \rfloor + 1$ or $N/M$.
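A minimal sketch of chunking that satisfies these properties, as an illustration of the described behaviour rather than the library's actual code:

```python
def chunk_dataset(dataset, max_threads):
    """Split dataset into ordered, as-even-as-possible chunks."""
    thread_count = min(max_threads, len(dataset))  # thread-count formula from above
    n, m = len(dataset), thread_count
    chunks, start = [], 0
    for i in range(m):
        # The first n % m chunks get one extra item, so lengths decrease down the list
        size = n // m + (1 if i < n % m else 0)
        chunks.append(dataset[start:start + size])
        start += size
    return chunks
```

For a 10-item dataset and 3 threads this yields chunk lengths 4, 3 and 3, preserving the order of the data.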

⚠️

The chunks generated are not generators, meaning they will exist alongside the dataset and take up memory. This will change to more memory-efficient generators in v1.0.1.

Importing the class

import thread
thread.ParallelProcessing
 
from thread import ParallelProcessing

Quick Start

There are 2 main ways of initializing a parallel processing object.

On-Demand

You can create a simple process by initializing thread.ParallelProcessing and passing the function and dataset.

def my_data_processor(Data_In) -> Data_Out: ...
 
# Recommended way
my_processor = ParallelProcessing(
  function = my_data_processor,
  dataset = [i for i in range(0, n)]
)
 
# OR
# Not the recommended way
my_processor = ParallelProcessing(my_data_processor, [i for i in range(0, n)])

It can be run by invoking the start() method.

my_processor.start()

Decorated Function

You can decorate a function with thread.processor, which uses thread.ParallelProcessing. When the decorated function is invoked, it will automatically be run in a new thread each time and will return a thread.ParallelProcessing object.

A decorated function's signature is overwritten, replacing the first argument to require a sequence of the Data_In type.

import thread
 
@thread.processor
def my_target(Data_In, arg1, arg2, *, arg3: bool = False) -> Data_Out: ...
 
dataset: Sequence[Data_In]
worker = my_target(dataset, arg1, arg2, arg3 = True) # thread.ParallelProcessing()

Did you know?

Decorators can take in keyword arguments that change the behavior of the thread.

import thread
 
@thread.processor(name = 'my_thread', suppress_errors = True)
def my_target(): ...

See the full list of arguments here

Initialization

This will cover the required and optional arguments for initializing a parallel process.

Required

function (Data_In, *args, **kwargs) -> Data_Out

This should be a function that takes in an entry from the dataset, with or without overloads, and returns Data_Out.

Arguments and keyword arguments, excluding the first argument passed to the function, can be passed through args and kwargs. Data_Out will be written to the generated thread's Thread._returned_value and can be accessed via ParallelProcessing.results or ParallelProcessing.get_return_values().

function can be passed as the first positional argument to ParallelProcessing.__init__(), although it is recommended to use only keyword arguments.

import thread
 
thread.ParallelProcessing(lambda x: x + 1, [])
thread.ParallelProcessing(function = lambda x: x + 1, dataset = [])

Best Practices

While you can use a lambda function, it is best to use a normal function for your LSP/Linter to infer types.

from thread import ParallelProcessing
 
worker = ParallelProcessing(function = lambda x: x + 1, dataset = [1, 2, 3])
worker.start()
worker.join()
 
worker.results # This will be inferred as Unknown by your LSP/Linter

from thread import ParallelProcessing
 
def my_target(x: int) -> int:
  return x + 1
 
worker = ParallelProcessing(function = my_target, dataset = [1, 2, 3])
worker.start()
worker.join()
 
worker.results # This will be inferred as a list[int]

dataset Sequence[Data_In]

This should be an iterable sequence of data passed as the first argument to function.

import thread
 
def my_function(x: int) -> int:
  ...
 
thread.ParallelProcessing(function = my_function, dataset = [1, 2, 3])
thread.ParallelProcessing(function = my_function, dataset = ('hi')) # This will be highlighted by your LSP/Linter

Optional

*args / **kwargs (default: None) Any / Mapping[str, Any]

These overloads are passed to thread.Thread.__init__(), then threading.Thread.__init__().

If kwargs contains an argument named args, it will automatically be removed from kwargs and joined with ParallelProcessing.__init__()'s args.
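A sketch of how that merge could behave. This is a hypothetical helper illustrating the described rule, not the library's actual internals:

```python
def merge_overloads(args, kwargs):
    """If kwargs carries an 'args' entry, pop it and join it onto args."""
    extra = kwargs.pop('args', ())  # remove 'args' from kwargs, default to empty
    return (*args, *extra), kwargs
```

For example, merge_overloads((1, 2), {'args': (3,), 'name': 'worker'}) yields ((1, 2, 3), {'name': 'worker'}).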

See thread.Thread documentation for more details.
See threading documentation for more details.

Properties

Attributes

These are attributes of the thread.ParallelProcessing class.

results List[Data_Out]

This is a list of the data that was returned by the function in thread.ParallelProcessing.

💡

Exceptions Raised

Raised when the thread is still running and the attribute cannot be accessed. You can wait for the thread to terminate by calling `Thread.join()` or check the status with `Thread.status`.
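Because chunks preserve dataset order, the per-thread return values can be flattened back into a single ordered list. A sketch of that idea, as an assumption about the internals rather than the library's actual code:

```python
# Hypothetical per-thread results: one list per chunk, in chunk order
chunk_results = [[2, 3, 4, 5], [6, 7, 8], [9, 10, 11]]

# Flattening them in chunk order reproduces dataset order
results = [value for chunk in chunk_results for value in chunk]
```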

status thread.ThreadStatus

This is the current status of the thread.

These are the possible values:

This means that the thread is idle and ready to be run.

Methods

These are methods of thread.ParallelProcessing class.

start () -> None

This starts the processing.

Simply invoke ParallelProcessing.start() on a ParallelProcessing object.

import thread
 
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.start()
💡

Exceptions Raised

Raised when the thread is still running and cannot invoke the method. You can wait for the thread to terminate by calling `Thread.join()` or check the status with `Thread.status`.

is_alive () -> bool

This indicates whether the threads are still alive.

Simply invoke ParallelProcessing.is_alive() on a ParallelProcessing object.

import thread
 
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.is_alive()
💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.

get_return_values () -> List[Data_Out]

This halts the current thread execution until the processing completes, then returns the values returned by function.

Simply invoke ParallelProcessing.get_return_values() on a thread object.

import thread
 
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.get_return_values()
💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.

join (timeout: float = None) -> bool

This halts the current thread execution until the ParallelProcessing completes or exceeds the timeout.

A None value for timeout will have the same effect as passing float("inf") as a timeout. The boolean returned by ParallelProcessing.join() is True if the threads complete within the timeout and False if otherwise.
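The timeout semantics can be sketched with standard threading primitives. This is a hypothetical helper illustrating the described behaviour, not the library's implementation:

```python
import threading
import time

def join_all(threads, timeout=None):
    """Return True if every thread finished within the shared timeout."""
    if timeout is None:  # None behaves like an infinite timeout
        for t in threads:
            t.join()
        return True
    deadline = time.monotonic() + timeout
    for t in threads:
        # Each join gets whatever time remains of the shared budget
        t.join(max(0.0, deadline - time.monotonic()))
    return all(not t.is_alive() for t in threads)
```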

Simply invoke ParallelProcessing.join() on a ParallelProcessing object.

import thread
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.join(5)
worker.join()
💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.

kill (yielding: bool = False, timeout: float = 5) -> bool

This schedules the threads to be killed.

If yielding is True, it halts the current thread execution until the threads are killed or the timeout is exceeded. Similar to ParallelProcessing.join(), a None value for timeout will have the same effect as passing float("inf") as a timeout.

Simply invoke ParallelProcessing.kill() on a ParallelProcessing object.

import thread
 
worker = thread.ParallelProcessing(function = my_func, dataset = [1, 2, 3])
worker.kill(True, 10)
worker.kill(False)
worker.kill()
💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.
⚠️

This only schedules the threads to be killed; it does not kill them immediately.

This means that if function is blocked in a long `time.sleep()` call, the thread will only be killed after it moves on to the next line.
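This scheduled-kill behaviour resembles cooperative cancellation, which can be sketched with the standard library. The mechanism below is an assumption for illustration, not the library's actual code:

```python
import threading
import time

stop = threading.Event()  # the "kill scheduled" flag

def worker():
    for _ in range(1000):
        if stop.is_set():   # the flag is only checked between steps...
            return
        time.sleep(0.001)   # ...so a long sleep finishes before the kill takes effect

t = threading.Thread(target=worker)
t.start()
stop.set()                  # schedule the kill
t.join(timeout=5)
```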