230. Multiprocessing Pipelines

1. Basic Queue Communication Example

A simple example of how to pass data between processes using a queue.

import multiprocessing

def producer(q):
    for i in range(5):
        print(f"Producer: {i}")
        q.put(i)

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Exit condition
            break
        print(f"Consumer: {item}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()

    p1.join()
    q.put(None)  # Signal consumer to exit
    p2.join()

Explanation:

  • The producer puts data into the queue, and the consumer retrieves it.

  • The queue is shared between processes for communication.


2. Using Pipe for Simple Communication

A simple producer-consumer pipeline using a pipe for two-way communication.

Explanation:

  • The Pipe provides a unidirectional channel for communication between two processes.

  • The producer sends data via the pipe, and the consumer receives it.


3. Multiprocessing with Queue and Multiple Consumers

Using a queue with multiple consumers to handle messages concurrently.

Explanation:

  • Multiple consumers can process items concurrently from a shared queue.

  • A "STOP" signal is used to terminate the consumers.


4. Multiprocessing Pipeline with Function Chaining

Building a pipeline where the output of one process becomes the input of the next.

Explanation:

  • This demonstrates chaining multiple processes using pipes, where each task sends its output to the next.


5. Pipe and Queue for Error Handling

Using a pipe and a queue to send errors and data between processes.

Explanation:

  • If an error occurs in one process, it’s communicated to another process via the pipe.


6. Using Queue for Worker Pool

Managing a pool of worker processes that process tasks using a queue.

Explanation:

  • A pool of worker processes consumes tasks from the queue and processes them concurrently.

  • Workers exit when receiving a None value.


7. Multiprocessing with Shared Memory

Using shared memory for communication between processes.

Explanation:

  • Processes share memory using multiprocessing.Array, allowing them to modify the shared data.


8. Using Queue for Task Distribution

Distribute tasks to worker processes using a queue.

Explanation:

  • A set of workers retrieves tasks from the queue and processes them.

  • The STOP signal terminates the workers.


9. Pipe and Queue for Synchronization

Using both a pipe and a queue to synchronize tasks and communicate status.

Explanation:

  • The pipe is used to communicate the status of one task to another, demonstrating synchronization.


10. Using Queue to Implement a Producer-Consumer Pipeline

Implementing a classic producer-consumer pipeline using queues.

Explanation:

  • A producer produces tasks and puts them into a queue.

  • A consumer retrieves tasks from the queue and processes them. The consumer stops when receiving a "STOP" signal.


Last updated