Throttling API requests is an essential technique for staying within the rate limits an API imposes and for avoiding overwhelming the server. Python's asyncio and threading modules can both be used to throttle, or limit, the rate of API requests.
1. Using asyncio for Throttling API Requests
Here's an example of using asyncio.Semaphore to limit the number of concurrent API requests:
```python
import asyncio
import aiohttp
import time

# Define the maximum number of concurrent requests
CONCURRENT_REQUESTS = 5

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def throttled_fetch(sem, session, url):
    async with sem:  # Acquire a semaphore
        print(f"Fetching {url}")
        return await fetch(session, url)

async def main():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 21)]
    sem = asyncio.Semaphore(CONCURRENT_REQUESTS)  # Throttle to 5 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [throttled_fetch(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} responses")

start = time.time()
asyncio.run(main())
print(f"Completed in {time.time() - start:.2f} seconds")
```
Explanation:
- `asyncio.Semaphore` limits the number of concurrent requests.
- `aiohttp.ClientSession` is used to make asynchronous HTTP requests.
- Only `CONCURRENT_REQUESTS` requests run at a time, preventing overload.
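Note that a semaphore caps how many requests are in flight at once, not how many start per second. One way to turn it into a rough rate limit is to hold each semaphore slot for a minimum interval. Here is a minimal, network-free sketch of that idea; the API call is simulated with `asyncio.sleep`, and `MIN_INTERVAL` is an illustrative value, not part of the example above:

```python
import asyncio
import time

CONCURRENT_REQUESTS = 5   # Maximum requests in flight at once
MIN_INTERVAL = 0.2        # Minimum time each semaphore slot stays occupied (illustrative)

async def fake_api_call(i):
    await asyncio.sleep(0.05)  # Stand-in for a real HTTP request
    return f"response {i}"

async def throttled_call(sem, i, timestamps):
    async with sem:
        timestamps.append(time.monotonic())
        # Run the call and the minimum hold time together, so the slot
        # is not released before MIN_INTERVAL has elapsed.
        result, _ = await asyncio.gather(fake_api_call(i), asyncio.sleep(MIN_INTERVAL))
        return result

async def run():
    sem = asyncio.Semaphore(CONCURRENT_REQUESTS)
    timestamps = []
    results = await asyncio.gather(*(throttled_call(sem, i, timestamps) for i in range(10)))
    return results, timestamps

results, timestamps = asyncio.run(run())
print(f"Fetched {len(results)} responses")
```

With 10 tasks, 5 slots, and a 0.2-second hold, the calls start in two waves roughly 0.2 seconds apart, so the effective rate is capped at about `CONCURRENT_REQUESTS / MIN_INTERVAL` requests per second.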
2. Using asyncio.sleep for Time-Based Throttling
You can use `asyncio.sleep` to introduce delays between API calls to comply with rate limits:

```python
import asyncio
import aiohttp

async def fetch_with_delay(session, url, delay):
    print(f"Fetching {url}")
    async with session.get(url) as response:
        await asyncio.sleep(delay)  # Introduce a delay between requests
        return await response.text()

async def main():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
    async with aiohttp.ClientSession() as session:
        for url in urls:
            result = await fetch_with_delay(session, url, delay=1)  # 1-second delay
            print(f"Fetched: {url}")

asyncio.run(main())
```

Explanation:
- `asyncio.sleep` ensures there is at least a 1-second delay between consecutive requests; because each call is awaited in a loop, the requests also run strictly one at a time.

3. Using threading for Throttling API Requests
Here's an example using a `threading.Semaphore` to limit the number of threads making API requests:

```python
import threading
import requests
import time

CONCURRENT_REQUESTS = 3
semaphore = threading.Semaphore(CONCURRENT_REQUESTS)

def fetch(url):
    with semaphore:  # Limit the number of threads making requests
        print(f"Fetching {url}")
        response = requests.get(url)
        print(f"Fetched {url} with status {response.status_code}")

def main():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 16)]
    threads = [threading.Thread(target=fetch, args=(url,)) for url in urls]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

start = time.time()
main()
print(f"Completed in {time.time() - start:.2f} seconds")
```

Explanation:
- `threading.Semaphore` ensures that only `CONCURRENT_REQUESTS` threads run simultaneously.
- This approach is useful when working with blocking I/O libraries such as `requests`.

4. Using a Token Bucket Algorithm with asyncio
A token bucket algorithm can control the rate of API requests precisely:

```python
import asyncio
import aiohttp
import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate          # Tokens added per second
        self.capacity = capacity  # Maximum burst size
        self.tokens = capacity
        self.last_refill = time.time()

    async def acquire(self):
        while self.tokens < 1:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now
            await asyncio.sleep(0.1)
        self.tokens -= 1

async def fetch(bucket, session, url):
    await bucket.acquire()  # Wait until a token is available
    print(f"Fetching {url}")
    async with session.get(url) as response:
        return await response.text()

async def main():
    bucket = TokenBucket(rate=1, capacity=5)  # 1 request per second, burst of 5
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(bucket, session, url) for url in urls]
        await asyncio.gather(*tasks)

start = time.time()
asyncio.run(main())
print(f"Completed in {time.time() - start:.2f} seconds")
```

Explanation:
- The `TokenBucket` class refills tokens at a constant rate (`rate` tokens per second).
- Requests are throttled to match the refill rate, while `capacity` allows short bursts.

5. Rate-Limiting with aiolimiter
The `aiolimiter` library (installable with `pip install aiolimiter`) simplifies rate-limiting:

```python
import asyncio
import aiohttp
from aiolimiter import AsyncLimiter

async def fetch(limiter, session, url):
    async with limiter:  # Acquire a slot in the limiter
        print(f"Fetching {url}")
        async with session.get(url) as response:
            return await response.text()

async def main():
    limiter = AsyncLimiter(5, 1)  # 5 requests per 1-second window
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 21)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(limiter, session, url) for url in urls]
        await asyncio.gather(*tasks)

asyncio.run(main())
```

Explanation:
- `AsyncLimiter` limits the number of requests per time window.
- It handles the rate-limiting bookkeeping for you.

Summary of Techniques:
- `asyncio.Semaphore`: limits the number of concurrent requests.
- `asyncio.sleep`: adds delays between requests.
- `threading.Semaphore`: caps the number of concurrent threads.
- Token bucket algorithm: precisely controls request rates and bursts.
- `aiolimiter`: a pre-built rate-limiting solution.

Choose the technique that best suits your use case and the API's constraints!
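These techniques also compose: a token bucket caps the request rate while a semaphore caps concurrency, and an API may require both. Below is a stdlib-only sketch of that combination; the class name and all the numbers are illustrative, and the request itself is simulated with `asyncio.sleep`:

```python
import asyncio
import time

class RateAndConcurrencyLimiter:
    """Combines a token bucket (rate cap) with a semaphore (concurrency cap)."""

    def __init__(self, rate, capacity, max_concurrent):
        self.rate = rate                     # Tokens added per second
        self.capacity = capacity             # Maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self.sem = asyncio.Semaphore(max_concurrent)

    async def _take_token(self):
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            await asyncio.sleep(0.05)  # Wait for the bucket to refill

    async def __aenter__(self):
        await self._take_token()  # Respect the rate limit first...
        await self.sem.acquire()  # ...then the concurrency limit
        return self

    async def __aexit__(self, *exc):
        self.sem.release()

async def fake_request(i):
    await asyncio.sleep(0.01)  # Stand-in for a real HTTP call
    return i * 2

async def run():
    limiter = RateAndConcurrencyLimiter(rate=50, capacity=10, max_concurrent=3)

    async def call(i):
        async with limiter:
            return await fake_request(i)

    return await asyncio.gather(*(call(i) for i in range(20)))

results = asyncio.run(run())
print(f"Completed {len(results)} calls")
```

Because `asyncio.gather` preserves task order, `results` comes back in the same order the calls were submitted, even though the limiter reorders when each one actually runs.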