Here are 10 Python snippets that demonstrate concurrency using the concurrent.futures module with ThreadPoolExecutor and ProcessPoolExecutor.
1. Using ThreadPoolExecutor for Basic Task Execution
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Return the completion message for task number ``n``."""
    return f"Task {n} completed."


def main():
    """Run ``task`` for 0..4 on a three-thread pool and print the result list."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() yields results in input order, regardless of which thread
        # finishes first.
        results = list(executor.map(task, range(5)))
    print(results)


if __name__ == "__main__":
    main()
2. Using ProcessPoolExecutor for CPU-Intensive Tasks
import math
from concurrent.futures import ProcessPoolExecutor


def compute_factorial(n):
    """Return ``n!`` using ``math.factorial``."""
    return math.factorial(n)


def main():
    """Compute the factorials of 5, 10, 15 and 20 in a three-process pool and print them."""
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(compute_factorial, [5, 10, 15, 20]))
    print(results)


# Worker processes may re-import this module (spawn/forkserver start methods);
# the guard stops each worker from trying to create a pool of its own.
if __name__ == "__main__":
    main()
3. Submitting Tasks Individually
4. Using as_completed to Process Results as They Finish
5. Handling Exceptions in Concurrent Tasks
6. Cancelling Pending Tasks
7. Using ProcessPoolExecutor for Parallel File I/O
8. Using ThreadPoolExecutor for I/O-Bound Tasks
9. Timing Concurrent Tasks
10. Using ProcessPoolExecutor for Large Dataset Processing
These snippets demonstrate the power and versatility of concurrent.futures for handling concurrency in both I/O-bound tasks (using ThreadPoolExecutor) and CPU-bound tasks (using ProcessPoolExecutor).
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Return the completion message for task number ``n``."""
    return f"Task {n} completed."


def main():
    """Submit tasks 0..4 one by one and print each result in submission order."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            # result() blocks until that particular future has finished.
            print(future.result())


if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(n):
    """Sleep for ``n`` seconds, then return a message reporting the delay."""
    time.sleep(n)
    return f"Task {n} completed after {n} seconds."


def main(delays=range(1, 4)):
    """Run one ``task`` per delay and print each result as soon as it is ready.

    Args:
        delays: Iterable of sleep durations in seconds; defaults to 1, 2, 3.
    """
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, delay) for delay in delays]
        # as_completed() yields futures in completion order, not submission
        # order, so shorter tasks are printed first.
        for future in as_completed(futures):
            print(future.result())


if __name__ == "__main__":
    main()
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Return the completion message for ``n``.

    Raises:
        ValueError: When ``n`` is 2, to simulate a failing task.
    """
    if n == 2:
        raise ValueError("Error in task!")
    return f"Task {n} completed."


def main():
    """Run tasks 0..4 and print each result, or the error a task raised."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            try:
                # An exception raised inside the worker is re-raised here,
                # when the result is requested.
                print(future.result())
            except Exception as e:
                print(f"Exception: {e}")


if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Sleep for ``n`` seconds, then return the completion message."""
    time.sleep(n)
    return f"Task {n} completed."


def main(busy_seconds=1):
    """Cancel a task that is still waiting in the executor's queue.

    Args:
        busy_seconds: How long the first task keeps the single worker
            occupied, so that the second task stays pending.
    """
    # cancel() only succeeds on a future that has not started running. With
    # one worker tied up by the first task, the second one is still queued
    # and can therefore be cancelled.
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(task, busy_seconds)
        future = executor.submit(task, 5)
        print("Cancelling task...")
        future.cancel()
        print("Cancelled:", future.cancelled())


if __name__ == "__main__":
    main()
from concurrent.futures import ProcessPoolExecutor


def read_file(file_path):
    """Return the full text content of ``file_path``, decoded as UTF-8."""
    # Explicit encoding keeps the result independent of the platform locale.
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()


file_paths = ["file1.txt", "file2.txt", "file3.txt"]  # Replace with actual files


def main():
    """Read every path in ``file_paths`` in a three-process pool and print the contents."""
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(read_file, file_paths))
    print(results)


# Worker processes may re-import this module (spawn/forkserver start methods);
# the guard stops each worker from trying to create a pool of its own.
if __name__ == "__main__":
    main()
from concurrent.futures import ThreadPoolExecutor

import requests


def fetch_url(url):
    """GET ``url`` and return a one-line summary of the response body size."""
    # Without a timeout, requests can wait indefinitely on an unresponsive
    # server and tie up a pool thread.
    response = requests.get(url, timeout=10)
    return f"{url} returned {len(response.content)} bytes."


urls = ["https://example.com", "https://httpbin.org/get", "https://api.github.com"]


def main():
    """Fetch every entry of ``urls`` on a three-thread pool and print the summaries."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_url, urls))
    print(results)


if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Sleep for ``n`` seconds, then return the completion message."""
    time.sleep(n)
    return f"Task {n} done."


def main(durations=(2, 3, 1)):
    """Run one ``task`` per duration concurrently and report the elapsed time.

    Args:
        durations: Sleep durations in seconds, one per task.
    """
    # perf_counter() is the clock intended for measuring intervals; time.time()
    # follows the wall clock, which can be adjusted while the tasks run.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(task, durations))
    end = time.perf_counter()
    print("Results:", results)
    print(f"Total time: {end - start:.2f} seconds")


if __name__ == "__main__":
    main()
from concurrent.futures import ProcessPoolExecutor


def square(n):
    """Return ``n`` multiplied by itself."""
    return n * n


data = list(range(1, 11))


def main():
    """Square every item of ``data`` in a four-process pool and print the results."""
    workers = 4
    # Send items to the workers in batches rather than one at a time; this
    # only matters once ``data`` grows large, and map() still returns the
    # results in input order.
    chunksize = max(1, len(data) // (workers * 4))
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(square, data, chunksize=chunksize))
    print("Squared values:", results)


# Worker processes may re-import this module (spawn/forkserver start methods);
# the guard stops each worker from trying to create a pool of its own.
if __name__ == "__main__":
    main()