Lesson 29: Multiprocessing

"Turbo booster of Python, and nothing to say else"

Topics

  1. Multiprocessing vs Multithreading
  2. multiprocessing
  3. Inter-process Communication (IPC)
  4. Synchronisation
  5. Process Pooling
  6. Production Approaches
  7. Practice
  8. Quiz
  9. Homework

Multiprocessing in Python is a powerful paradigm that allows developers to achieve true parallelism in their applications, particularly in CPU-bound tasks.

1. Multiprocessing vs Multithreading

Let's finally compare the two mechanisms in detail.

|  | Multiprocessing | Multithreading |
| --- | --- | --- |
| Concurrency Model | Parallel execution of processes | Concurrent execution of threads within a single process |
| Memory Space | Separate memory space for each process | Shared memory space among threads |
| Use Cases | CPU-bound tasks requiring full CPU utilization | I/O-bound tasks, or when tasks need to share memory and resources |
| GIL Impact | Bypasses the GIL, allowing full use of multiple CPU cores | Limited by the GIL, not suitable for CPU-bound tasks |
| Resource Usage | Higher, due to separate memory space and process management overhead | Lower, threads are lighter than processes |
| Data Safety | Processes do not share memory, reducing data corruption risks | Shared memory can lead to data corruption if not properly managed |

1.1 Use Cases

Multiprocessing shines in scenarios where tasks are CPU-bound and can be performed independently, such as data processing and computing, though it can sometimes be a good fit for web workloads as well.

Let's dive into practice, but don't forget to refer to the comparison table from the first section while designing your application.
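
To make the GIL row of the comparison table concrete, here is a minimal timing sketch (the count_down function and the workload size are illustrative assumptions): the same two CPU-bound calls run back to back and then in two separate processes. On a machine with at least two free cores, the multiprocessing variant should finish in roughly half the time.

from multiprocessing import Process
import time

def count_down(n):
    """Pure CPU work: busy-loop until n reaches zero."""
    while n > 0:
        n -= 1

if __name__ == '__main__':
    n = 25_000_000  # illustrative workload size

    # Sequential: two calls back to back
    start = time.perf_counter()
    count_down(n)
    count_down(n)
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # Multiprocessing: the same two calls in separate processes
    p1 = Process(target=count_down, args=(n,))
    p2 = Process(target=count_down, args=(n,))
    start = time.perf_counter()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Multiprocessing: {time.perf_counter() - start:.2f}s")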

2. multiprocessing

2.1 Creating Processes

To create a new process, you instantiate a Process object and call its start() method. Each Process requires a target function to execute and can also accept arguments for the target function.

Example:

from multiprocessing import Process

def greet(name):
    print(f"Hello, {name}!")

if __name__ == '__main__':
    p = Process(target=greet, args=('World',))
    p.start()
    p.join()  # Wait for the process to finish

Output

Hello, World!

2.2 Process Management

The Process class represents an activity that is run in a separate process. The class has methods like start(), join(), and terminate() to manage the process lifecycle. You can also check if the process is still running using is_alive().

Example

from multiprocessing import Process
import time

def worker():
    """Function to be executed by the process"""
    print("Worker process is running...")
    time.sleep(2)
    print("Worker process is finishing...")

if __name__ == "__main__":
    # Create a Process
    process = Process(target=worker)
    
    # Start the process
    process.start()
    print("Process started.")
    
    # Check if the process is alive
    print(f"Is process alive? {process.is_alive()}")
    
    # Wait for the process to complete
    process.join()
    print("Process joined.")
    
    # Check again if the process is alive
    print(f"Is process alive after join? {process.is_alive()}")

    # Try to terminate the process (has no effect if already finished)
    process.terminate()
    print("Process terminated.")

Output

Process started.
Is process alive? True
Worker process is running...
Worker process is finishing...
Process joined.
Is process alive after join? False
Process terminated.

3. Inter-process Communication (IPC)

In multiprocessing, since each process operates in its own memory space, direct data sharing like in multithreading (with shared memory) is not possible. Python's multiprocessing module provides several ways to enable IPC. Pipes and Queues are the most common.

3.1 Pipes

A Pipe can be used for two-way communication between processes. Data in a pipe is buffered, which means it's held in a temporary storage area until the recipient retrieves it.

Example

from multiprocessing import Process, Pipe

def sender(pipe):
    """Function to send data through the pipe."""
    pipe.send(["Hello from the sender!"])
    pipe.close()

def receiver(pipe):
    """Function to receive data through the pipe."""
    print(f"Receiver got message: {pipe.recv()}")

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    
    # Process for sending data
    p1 = Process(target=sender, args=(parent_conn,))
    
    # Process for receiving data
    p2 = Process(target=receiver, args=(child_conn,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

Output

Receiver got message: ['Hello from the sender!']

3.2 Queues

Queues are thread and process-safe, making them ideal for IPC. They can be used to exchange data between processes.

Example

from multiprocessing import Process, Queue

def writer(queue):
    """Function to write data to the queue."""
    queue.put("Hello from the writer!")

def reader(queue):
    """Function to read data from the queue."""
    print(f"Reader received: {queue.get()}")

if __name__ == "__main__":
    queue = Queue()
    
    # Process for writing to the queue
    p1 = Process(target=writer, args=(queue,))
    
    # Process for reading from the queue
    p2 = Process(target=reader, args=(queue,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

Output

Reader received: Hello from the writer!

Explanation

  • Pipes are best for simple, point-to-point communication between two processes (a Pipe is two-way by default; pass duplex=False for a one-way pipe).
  • Queues are more flexible and suitable for complex data exchange between multiple producers and consumers.

3.3 Real-world App

Objective: Create a data processing application that needs to perform several tasks: fetching data from a database, processing this data (e.g., filtering, aggregation), and finally saving the results to a new location. This scenario is common in data analytics and ETL (Extract, Transform, Load) operations.

graph LR
    A[Data Fetcher Process] -->|Pipe| B[Data Processor Process]
    B -->|Queue| C[Data Saver Process]
    A -->|Fetches data from source| D[Database/Data Source]
    C -->|Saves processed data| E[Database/File System]

Using multiprocessing we can create a streamlined pipeline that efficiently processes large datasets by dividing the work across multiple processes.

3.3.1 Application Requirements

  1. Data Fetcher Process: Connects to a database or data source, fetches data, and sends it through a Pipe to the next stage.
  2. Data Processor Process: Receives raw data through a Pipe, performs processing (filtering, aggregating), and places processed data into a Queue.
  3. Data Saver Process: Takes processed data from the Queue and saves it to a database, file system, or another storage system.

3.3.2 Implementation

from multiprocessing import Process, Pipe, Queue
import time

def fetch_data(sender_conn):
    """Simulates fetching data and sends it to the processor."""
    for i in range(3):  # Simulate fetching 3 data batches
        data = f"Data batch {i}"
        print(f"Fetching: {data}")
        sender_conn.send(data)
        time.sleep(1)  # Simulate delay
    sender_conn.close()

def process_data(receiver_conn, data_queue):
    """Receives data, processes it, and puts it into a queue."""
    while True:
        data = receiver_conn.recv()
        processed_data = f"{data} - Processed"
        print(f"Processing: {processed_data}")
        data_queue.put(processed_data)

def save_data(data_queue):
    """Takes data from the queue and simulates saving it."""
    while True:
        data = data_queue.get()
        print(f"Saving: {data}")
        time.sleep(1)  # Simulate save delay

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    data_queue = Queue()

    # Creating processes
    fetcher = Process(target=fetch_data, args=(parent_conn,))
    processor = Process(target=process_data, args=(child_conn, data_queue))
    saver = Process(target=save_data, args=(data_queue,))

    # Starting processes
    fetcher.start()
    processor.start()
    saver.start()

    # Joining fetcher and processor (saver would ideally run indefinitely or have a stop condition)
    fetcher.join()
    processor.join()

Output

Fetching: Data batch 0
Processing: Data batch 0 - Processed
Saving: Data batch 0 - Processed

Fetching: Data batch 1
Processing: Data batch 1 - Processed
Saving: Data batch 1 - Processed

Fetching: Data batch 2
Processing: Data batch 2 - Processed
Saving: Data batch 2 - Processed

Explanation

Here we created 3 separate processes, each responsible for a different operation. We can check them in Task Manager on Windows or with the following command on Linux/macOS.

ps -u $(whoami) | grep python

Output

134616 pts/0    00:00:00 python3
134618 pts/0    00:00:00 python3
134619 pts/0    00:00:00 python3

NOTE: In a real-world scenario, you would have mechanisms to gracefully stop the processor and saver processes (e.g., using an Event or a sentinel value in the Queue), since their while True loops never exit on their own.
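
Here is a minimal sketch of the sentinel-value idea from the note above (using None as the sentinel is an assumption; any agreed-upon marker works): the producer puts the sentinel into the Queue when it is done, and the consumer exits its loop as soon as it sees it, so join() returns cleanly.

from multiprocessing import Process, Queue

SENTINEL = None  # assumption: None never appears as real data

def saver(data_queue):
    """Consume items until the sentinel arrives."""
    while True:
        item = data_queue.get()
        if item is SENTINEL:  # producer signalled "no more data"
            break
        print(f"Saving: {item}")

if __name__ == "__main__":
    queue = Queue()
    saver_process = Process(target=saver, args=(queue,))
    saver_process.start()

    for i in range(3):
        queue.put(f"Data batch {i} - Processed")
    queue.put(SENTINEL)  # tell the saver to stop

    saver_process.join()  # returns once the saver has drained the queue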

Processing in parallel is much faster than the synchronous approach we used before. You can try this out yourself with the same code and compare the execution times.

4. Synchronisation

Just as with multithreading, the multiprocessing module provides several synchronization primitives, such as Lock, Semaphore, Event, and Condition, which help prevent data corruption and ensure data consistency.

4.1 Lock

A Lock is a synchronization primitive that can be locked or unlocked. It is used to prevent simultaneous access to a shared resource by multiple processes, ensuring that only one process can access the resource at a time.

Example

Synchronize file access across multiple processes:

from multiprocessing import Process, Lock

def write_to_file(lock, file_path, data):
    with lock:
        with open(file_path, 'a') as f:
            f.write(data + '\n')

if __name__ == '__main__':
    lock = Lock()
    file_path = 'shared_file.txt'
    processes = [Process(target=write_to_file, args=(lock, file_path, f'Process {i}')) for i in range(5)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

Output

Each process appends its own line to shared_file.txt (Process 0 through Process 4; the order may vary), with no interleaved writes thanks to the lock.

4.2 Semaphore

A Semaphore is a more general version of a Lock. While a Lock allows only one process to access a certain section of code at a time, a Semaphore allows a fixed number of processes to do so.

Example

from multiprocessing import Process, Semaphore
import time

def access_database(semaphore, process_id):
    with semaphore:
        print(f"Process {process_id} is accessing the database")
        # Simulate database access
        time.sleep(1)
        print(f"Process {process_id} is done accessing the database")

if __name__ == '__main__':
    semaphore = Semaphore(2)  # Allows up to 2 processes to access the critical section
    processes = [Process(target=access_database, args=(semaphore, i)) for i in range(3)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

Output

Process 0 is accessing the database
Process 1 is accessing the database
Process 0 is done accessing the database
Process 2 is accessing the database
Process 1 is done accessing the database
Process 2 is done accessing the database

4.3 Event

An Event is a synchronization primitive that can be used to notify one or more processes that something has happened. Processes can wait for an event to be set before proceeding.

Example

from multiprocessing import Process, Event
import time

def wait_for_event(event):
    print("Waiting for the event to be set...")
    event.wait()
    print("Event has been set, continuing with the task")

if __name__ == '__main__':
    event = Event()
    process = Process(target=wait_for_event, args=(event,))
    process.start()

    # Simulate doing something
    time.sleep(2)
    event.set()

    process.join()

Output

Waiting for the event to be set...
Event has been set, continuing with the task

4.4 Condition

A Condition is a synchronization primitive that allows one process to wait for a condition to be met, while allowing other processes to notify it that the condition has been met.

Example

We can model the Producer-Consumer pattern, the same one described in the threading lesson, this time using a Condition.

from multiprocessing import Process, Condition, Array

def producer(condition, shared_array):
    with condition:
        print("Producing items")
        shared_array[0] = 99  # Simulating item production
        condition.notify()

def consumer(condition, shared_array):
    with condition:
        # wait_for guards against the race where the producer notifies
        # before the consumer has started waiting
        condition.wait_for(lambda: shared_array[0] != 0)
        print(f"Consumed item: {shared_array[0]}")

if __name__ == '__main__':
    condition = Condition()
    shared_array = Array('i', [0])  # Shared array between processes

    p = Process(target=producer, args=(condition, shared_array))
    c = Process(target=consumer, args=(condition, shared_array))

    c.start()
    p.start()

    c.join()
    p.join()

These synchronization primitives help avoid race conditions and ensure data consistency across concurrent processes.

Don't hesitate to use them, and again, be extremely careful with shared resources!

5. Process Pooling

5.1 Pool

The Pool class allows you to create a pool of worker processes that can execute tasks in parallel. It manages the available processes and assigns tasks to them as they become available.

Example:

from multiprocessing import Pool

def square(number):
    return number * number

if __name__ == '__main__':
    # Create a pool of 4 worker processes
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
        print(results)

Output

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

5.2 Applying Pool

The Pool class provides several methods to parallelize tasks. The map and apply methods are particularly useful for distributing tasks among the pool's worker processes.

5.3 map

The map function applies a given function to each item of an iterable (e.g., list) and collects the results. It blocks until the result is ready.

results = pool.map(square, range(10))
print(results)

5.4 apply

The apply function applies a given function with arguments to a worker process. Unlike map, apply is used for a single invocation and blocks until the result is ready.

result = pool.apply(square, (10,))
print(result)
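
Note that map passes exactly one argument per call. If the worker function takes several arguments, starmap can be used instead; the sketch below (reusing a hypothetical power function) unpacks each tuple in the iterable into positional arguments.

from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Each tuple is unpacked into power(base, exponent)
        results = pool.starmap(power, [(2, 3), (3, 2), (5, 2)])
        print(results)  # [8, 9, 25]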

5.5 Worker Processes

The Pool class also provides ways to manage the worker processes, allowing for asynchronous task execution and callback handling.

Example

def cube(number):
    return number * number * number

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        async_result = pool.apply_async(cube, (3,))
        # Do something else
        print(async_result.get())  # Blocks until result is ready

Output

27

This approach enables non-blocking task submission to the pool, allowing the main program to continue executing while the task is processed in the background.
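
The section above also mentions callback handling; here is a brief sketch of that (the on_result name is an assumption). The callback runs in the main process once the worker's result is ready, so the main program does not have to block on get().

from multiprocessing import Pool

def cube(number):
    return number * number * number

def on_result(result):
    """Runs in the main process when the worker finishes."""
    print(f"Callback received: {result}")

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        pool.apply_async(cube, (3,), callback=on_result)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker and the callback to complete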

6. Production Approaches

So far we have been working with processes at a fairly low level. In real-world code, programmers generally prefer the Manager and ProcessPoolExecutor classes, letting Python handle the synchronisation and reducing the potential for bugs.

6.1 Manager

The Manager class in the multiprocessing module allows creating shared objects that can be accessed and modified by different processes.

This is especially useful for creating complex data structures shared across processes.

Example

from multiprocessing import Manager, Process

def modify_shared_list(shared_list):
    shared_list.append("Hello")
    shared_list.append("World")

if __name__ == '__main__':
    with Manager() as manager:
        # Create a shared list
        shared_list = manager.list()
        
        # Create a process that modifies the shared list
        p = Process(target=modify_shared_list, args=(shared_list,))
        p.start()
        p.join()
        
        # Access the modified list
        print(shared_list)

Output

['Hello', 'World']
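
To illustrate the point about more complex shared structures, here is a short sketch (the number of workers and the key names are assumptions) in which several processes write into a shared manager.dict() at the same time.

from multiprocessing import Manager, Process

def record_result(shared_dict, worker_id):
    # Each worker writes its own key, so no extra locking is needed here
    shared_dict[f"worker-{worker_id}"] = worker_id * worker_id

if __name__ == '__main__':
    with Manager() as manager:
        results = manager.dict()
        workers = [Process(target=record_result, args=(results, i)) for i in range(4)]

        for w in workers:
            w.start()
        for w in workers:
            w.join()

        print(dict(results))  # e.g. {'worker-0': 0, 'worker-1': 1, 'worker-2': 4, 'worker-3': 9}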

6.2 ProcessPoolExecutor

The concurrent.futures module provides a high-level interface for asynchronously executing callables with ProcessPoolExecutor.

It simplifies the management of process pools, offering an alternative to the Pool class for executing tasks in parallel.

Example

from concurrent.futures import ProcessPoolExecutor

def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(power, 2, exp) for exp in range(10)]
        for future in futures:
            print(future.result())

Output

1
2
4
8
16
32
64
128
256
512
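
Iterating the futures in submission order, as above, means one slow early task delays the printing of all later results. A common alternative, sketched below, is as_completed, which yields each future as soon as it finishes (so the output order may differ from the submission order).

from concurrent.futures import ProcessPoolExecutor, as_completed

def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(power, 2, exp): exp for exp in range(10)}
        for future in as_completed(futures):  # results arrive as tasks finish
            exp = futures[future]
            print(f"2**{exp} = {future.result()}")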

7. Practice

7.1 Images Converter

  • Goal: Convert a set of color images to grayscale.
  • Approach: Use multiprocessing to parallelize the processing, speeding up the operation on systems with multiple CPU cores.
  • Tools: The Pillow library for image processing and Python's multiprocessing module for parallel execution.

Step 1: Install Pillow

First, ensure you have the Pillow library installed, as it's used for image processing:

pip install Pillow

Step 2: Image Processing Function

This function reads an image, converts it to grayscale, and saves it.

from PIL import Image
import os

def process_image(image_path):
    # Open the image file
    with Image.open(image_path) as img:
        # Convert the image to grayscale
        grayscale_img = img.convert("L")
        
        # Save the processed image
        base, ext = os.path.splitext(image_path)
        new_image_path = f"{base}_grayscale{ext}"
        grayscale_img.save(new_image_path)
        
        print(f"Processed {image_path} -> {new_image_path}")

Step 3: Parallelize Image Processing

Use the multiprocessing.Pool class to process multiple images in parallel.

from multiprocessing import Pool
import glob

def main():
    # List of image files to process
    image_files = glob.glob('path/to/images/*.jpg')  # Update path as needed
    
    # Create a pool of worker processes
    with Pool(processes=4) as pool:
        pool.map(process_image, image_files)

if __name__ == "__main__":
    main()

Step 4: Put it all together

from PIL import Image
import os
from multiprocessing import Pool
import glob

def process_image(image_path):
    # Open the image file
    with Image.open(image_path) as img:
        # Convert the image to grayscale
        grayscale_img = img.convert("L")
        
        # Save the processed image
        base, ext = os.path.splitext(image_path)
        new_image_path = f"{base}_grayscale{ext}"
        grayscale_img.save(new_image_path)
        
        print(f"Processed {image_path} -> {new_image_path}")

def main():
    # List of image files to process
    image_files = glob.glob('images/*.jpg') 
    
    # Create a pool of worker processes
    with Pool(processes=4) as pool:
        pool.map(process_image, image_files)

if __name__ == "__main__":
    main()

Output

Processed images/image1_grayscale.jpg -> images/image1_grayscale_grayscale.jpg
Processed images/image3.jpg -> images/image3_grayscale.jpg
Processed images/image2.jpg -> images/image2_grayscale.jpg
Processed images/image1.jpg -> images/image1_grayscale.jpg

Explanation:

  • The program searches for JPEG images in the specified directory.
  • It then creates a pool of worker processes (adjust the processes parameter based on your CPU; see the sketch after this list).
  • Each image is processed in parallel by converting it to grayscale and saving the result.
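
If you prefer not to hard-code the pool size, one common option (a sketch, not a requirement of the example above) is to derive it from os.cpu_count():

import os
from multiprocessing import Pool

if __name__ == "__main__":
    # Fall back to 1 because cpu_count() can return None on some platforms
    workers = os.cpu_count() or 1
    with Pool(processes=workers) as pool:
        ...  # e.g. pool.map(process_image, image_files) as in the example above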

7.2 Data Processing Pipeline

  • Goal: Filter and aggregate data from a large dataset in parallel.
  • Approach: Use multiprocessing to distribute data chunks to different processes for filtering based on a condition, then aggregate the results.
  • Dataset: For simplicity, we'll simulate a dataset using random numbers or a CSV file if you have one handy.

Step 1: Prepare the Dataset

For demonstration purposes, let's create a function that generates a large dataset of random integers.

import random

def generate_dataset(size):
    return [random.randint(1, 100) for _ in range(size)]

Step 2: Define the Filter and Aggregate Functions

Define a function to filter the dataset based on a condition (e.g., finding even numbers) and a function to aggregate the filtered results (e.g., calculating the sum).

def filter_data(data_chunk):
    return [num for num in data_chunk if num % 2 == 0]

def aggregate_data(filtered_data_list):
    return sum(filtered_data_list)

Step 3: Distribute Tasks

Use the multiprocessing module to distribute the dataset chunks across multiple processes for filtering and then aggregate the results.

from multiprocessing import Pool

def process_data_chunk(data_chunk):
    """Process a chunk of data: filter and return the aggregation."""
    filtered_data = filter_data(data_chunk)
    return aggregate_data(filtered_data)

if __name__ == "__main__":
    dataset = generate_dataset(1000000)  # Generate a dataset with 1,000,000 items
    chunks = [dataset[i:i + 10000] for i in range(0, len(dataset), 10000)]  # Divide into chunks of 10,000 items
    
    with Pool(processes=4) as pool:
        results = pool.map(process_data_chunk, chunks)
    
    total_aggregate = sum(results)
    print(f"Total aggregate of filtered data: {total_aggregate}")

Step 4: Put it all together

from multiprocessing import Pool
import random

def generate_dataset(size):
    return [random.randint(1, 100) for _ in range(size)]

def filter_data(data_chunk):
    """Filter data based on a condition. Here, we find even numbers."""
    return [num for num in data_chunk if num % 2 == 0]

def aggregate_data(filtered_data_list):
    """Aggregate the filtered data. Here, we calculate the sum."""
    return sum(filtered_data_list)

def process_data_chunk(data_chunk):
    """Process a chunk of data: filter and return the aggregation."""
    filtered_data = filter_data(data_chunk)
    return aggregate_data(filtered_data)

if __name__ == "__main__":
    dataset = generate_dataset(1000000)
    chunks = [dataset[i:i + 10000] for i in range(0, len(dataset), 10000)]
    
    with Pool(processes=4) as pool:
        results = pool.map(process_data_chunk, chunks)
    
    total_aggregate = sum(results)
    print(f"Total: {total_aggregate}")

Output

Total: 25564084

Happy multiprocessing and multithreading! Pick what suits you best based on the comparison table from the first section, and next we step into asyncio!

8. Quiz

Question 1:

What is the primary advantage of using multiprocessing over multithreading in Python?

A) Multiprocessing can bypass the Global Interpreter Lock (GIL) and fully utilize multiple CPU cores.
B) Multiprocessing uses less memory than multithreading.
C) Multiprocessing is simpler to implement for CPU-bound tasks.
D) Multiprocessing allows for easier data sharing between threads.

Question 2:

Which Python module is most suitable for CPU-bound tasks?

A) threading
B) multiprocessing
C) asyncio
D) subprocess

Question 3:

What is the purpose of using a Pool in the multiprocessing module?

A) To manage a group of threads for performing I/O-bound tasks.
B) To manage a group of processes for performing CPU-bound tasks in parallel.
C) To lock resources shared among various threads.
D) To provide a way to execute asynchronous tasks.

Question 4:

When using multiprocessing, how is data typically passed between processes?

A) Through shared memory locations.
B) Via global variables.
C) Using inter-process communication mechanisms like Pipes and Queues.
D) Data cannot be passed between processes; each process must access data independently.

Question 5:

What is true about Python's Global Interpreter Lock (GIL) with regard to multiprocessing?

A) The GIL prevents the effective parallel execution of threads and is not a concern in multiprocessing.
B) The GIL enhances the performance of multiprocessing by optimizing memory management.
C) Multiprocessing requires the GIL to manage memory between processes efficiently.
D) The GIL synchronizes the execution of multiple processes on multi-core systems.

Question 6:

Which of the following is NOT a valid way to manage process synchronization in Python's multiprocessing environment?

A) Using a Lock to ensure that only one process accesses a critical section of code at a time.
B) Employing a Semaphore to limit the number of processes that can access a particular resource.
C) Utilizing a Condition variable to control workflow between processes.
D) Implementing a Goto statement to control process flow.

Question 7:

In the context of multiprocessing, what is a Manager used for?

A) To terminate processes that are not responding.
B) To control access to a shared database.
C) To create shared data objects that can be modified by multiple processes.
D) To monitor the CPU usage of each process.

9. Homework

Objective: Develop a Python script that uses multiprocessing to search for a keyword in a collection of text files simultaneously.

Requirements

  • Before developing, prepare a directory with several large text files.
  • Scan the directory for text files.
  • Each process should search one text file for a given keyword and record the lines where the keyword appears.
  • Collect and aggregate results from all processes to show which files contain the keyword and the line numbers.
  • Use Pool to distribute the search tasks across multiple processes.
  • Ensure synchronization when aggregating results.

Additionally, you can write it in a synchronous manner and compare the execution times; you will be impressed!