I have a table of entities. For each entity I need to listen to its corresponding Kafka topic to get data, and if no data flows for a minute I stop the listening process.
I can only spin up n processes. Each process handles one entity at a time and stops once it hits the idle timeout.
A master process keeps n processes running: every 30 seconds it checks how many are alive and fills any empty slots.
The entities are fetched in a fixed order, sorted by listening end time, so the entities that got their turn furthest back are picked up first.
The catch is that the master process waits 30 seconds between checks, so if all processes hit their idle timeout right after a check, that whole interval is wasted.
How can we implement this better? Are there any existing implementations of this pattern?
Let me suggest an improved approach to this problem that addresses the idle time issue and provides better resource utilization.
import asyncio
import logging
from asyncio import Queue, Task
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

from confluent_kafka import Consumer, KafkaError


@dataclass
class Entity:
    id: str
    topic: str
    last_processed: datetime
class KafkaConsumerWorker:
    def __init__(self, consumer_config: dict):
        self.consumer_config = consumer_config
        self.running = False
        self.current_entity: Optional[Entity] = None
        self.last_message_time: Optional[datetime] = None

    async def process_messages(self, entity: Entity, message_queue: Queue) -> None:
        self.running = True
        self.current_entity = entity
        # Start the idle clock immediately so a topic that never produces
        # a single message still times out after 60 seconds
        self.last_message_time = datetime.now()
        consumer = Consumer(self.consumer_config)
        consumer.subscribe([entity.topic])
        loop = asyncio.get_running_loop()
        try:
            while self.running:
                # consumer.poll() blocks, so run it in a thread to avoid
                # stalling the event loop for up to a second
                msg = await loop.run_in_executor(None, consumer.poll, 1.0)
                if msg is None:
                    # Check idle timeout
                    if (datetime.now() - self.last_message_time).total_seconds() > 60:
                        break
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    logging.error(f"Kafka error: {msg.error()}")
                    break
                # Process message
                self.last_message_time = datetime.now()
                await message_queue.put((entity.id, msg.value()))
        finally:
            consumer.close()
            self.running = False
            self.current_entity = None
            self.last_message_time = None
class ConsumerPool:
    def __init__(self, max_workers: int, consumer_config: dict):
        self.max_workers = max_workers
        self.consumer_config = consumer_config
        self.workers: List[KafkaConsumerWorker] = []
        self.active_tasks: Dict[str, Task] = {}
        self.message_queue: Queue = Queue()
        self.entity_queue: Queue = Queue()
        self.last_processed: Dict[str, datetime] = defaultdict(lambda: datetime.min)

    async def start(self):
        # Run the pool manager and message processor concurrently
        await asyncio.gather(
            self.manage_pool(),
            self.process_messages()
        )

    async def manage_pool(self):
        while True:
            # Clean up completed tasks first so freed slots are counted
            completed_tasks = [
                entity_id for entity_id, task in self.active_tasks.items()
                if task.done()
            ]
            for entity_id in completed_tasks:
                self.last_processed[entity_id] = datetime.now()
                del self.active_tasks[entity_id]
            current_workers = len(self.active_tasks)
            # Start new workers while capacity is available
            while current_workers < self.max_workers:
                try:
                    # Grab the next waiting entity, if any
                    entity = await asyncio.wait_for(self.entity_queue.get(), timeout=0.1)
                except asyncio.TimeoutError:
                    # No entities waiting; go back to monitoring
                    break
                worker = KafkaConsumerWorker(self.consumer_config)
                task = asyncio.create_task(
                    worker.process_messages(entity, self.message_queue)
                )
                self.active_tasks[entity.id] = task
                current_workers += 1
            # Short sleep to prevent busy waiting
            await asyncio.sleep(0.1)

    async def process_messages(self):
        while True:
            entity_id, message = await self.message_queue.get()
            # This is where you'd implement your message handling logic
            logging.info(f"Processed message from entity {entity_id}")

    def add_entity(self, entity: Entity):
        self.entity_queue.put_nowait(entity)
# Example usage
async def main():
    consumer_config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest'
    }
    pool = ConsumerPool(max_workers=3, consumer_config=consumer_config)

    # Add some example entities
    entities = [
        Entity('entity1', 'topic1', datetime.now()),
        Entity('entity2', 'topic2', datetime.now()),
        Entity('entity3', 'topic3', datetime.now()),
    ]
    for entity in entities:
        pool.add_entity(entity)

    await pool.start()


if __name__ == "__main__":
    asyncio.run(main())
I've designed an improved solution that addresses the idle-time issue using an event-driven architecture. Here are the key improvements:

- No fixed 30-second wait: the pool manager loops every 100 ms and backfills a slot as soon as a worker finishes, so a worker that hits its idle timeout is replaced almost immediately.
- Lightweight asyncio tasks replace OS processes, so starting and stopping a worker is cheap.
- A shared entity queue decouples "which entity is next" from "which worker is free"; you feed it in your listening-end-time order.
- Each worker owns its own consumer, enforces the 60-second idle timeout itself, and cleans up its own resources.
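If even the 100 ms polling loop bothers you, a fully event-driven variant is possible. Below is a minimal sketch, assuming the same ConsumerPool attributes as above, of a manage_pool() replacement that blocks on asyncio.wait until a worker actually finishes, so a freed slot is refilled the instant it opens:

    async def manage_pool_event_driven(self):
        # Sketch: drop-in alternative to manage_pool() that waits on task
        # completion instead of polling on a timer
        while True:
            # Fill every free slot with the next waiting entity
            while len(self.active_tasks) < self.max_workers and not self.entity_queue.empty():
                entity = self.entity_queue.get_nowait()
                worker = KafkaConsumerWorker(self.consumer_config)
                self.active_tasks[entity.id] = asyncio.create_task(
                    worker.process_messages(entity, self.message_queue)
                )
            if not self.active_tasks:
                # Nothing running and nothing queued: block until an entity
                # arrives, then put it back so the loop above spawns it
                entity = await self.entity_queue.get()
                self.entity_queue.put_nowait(entity)
                continue
            # Wake up the moment any worker exits (e.g. on idle timeout)
            await asyncio.wait(
                self.active_tasks.values(),
                return_when=asyncio.FIRST_COMPLETED,
            )
            for entity_id in [eid for eid, t in self.active_tasks.items() if t.done()]:
                self.last_processed[entity_id] = datetime.now()
                del self.active_tasks[entity_id]

The put-back in the empty case is only there to keep the sketch short; in real code you would spawn the fetched entity directly.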
To further improve this implementation, you could:

- Re-enqueue entities once their worker exits, ordered by last_processed so the oldest listening end time goes first, matching the order you described (see the sketch below).
- Add graceful shutdown: cancel the worker tasks and close their consumers on exit.
- Retry with backoff on Kafka errors instead of letting the worker die.
- Look at existing tools: aiokafka gives you an asyncio-native Kafka consumer, Faust builds stream processing on top of it, and if you can map entities to partitions of a single topic, Kafka's own consumer-group rebalancing can replace the hand-rolled pool entirely.
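As a starting point for the first suggestion, here is a minimal sketch. The fetch_due_entities() helper is hypothetical: it stands in for whatever query reads your entity table ordered by listening end time and returns Entity objects due for another listening round:

    async def refill_entities(pool: ConsumerPool, interval: float = 5.0):
        # Sketch: periodically pull due entities from your table and feed
        # the pool. fetch_due_entities() is a placeholder for your own DB
        # query, ordered by listening end time (oldest first).
        while True:
            for entity in fetch_due_entities():
                if entity.id not in pool.active_tasks:
                    pool.add_entity(entity)
            await asyncio.sleep(interval)

This only guards against entities that are currently running; keeping already-queued entities out of the result set is left to the query itself.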