python / expert
Snippet
Django Channels WebSocket Consumer with Room Multiplexing
This WebSocket consumer implements real-time collaboration with room-based multiplexing. It handles multiple message types through a handler dictionary pattern, uses channel layers for group broadcasting, and leverages cache for participant tracking. The consumer properly manages async database operations with database_sync_to_async and maintains WebSocket connection state.
snippet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import json
import logging

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.cache import cache

logger = logging.getLogger(__name__)


class RealtimeCollaborationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for a single real-time collaboration room.

    Each connection joins the channel-layer group ``collab_<room_id>`` and
    exchanges JSON text frames. Inbound frames are routed by their ``type``
    field through ``_MESSAGE_HANDLERS``; frames with an unknown type are
    ignored.

    NOTE(review): this class references ``handle_document_edit``,
    ``handle_heartbeat``, ``update_participants_cache``,
    ``broadcast_user_count`` and (as a group-message type)
    ``cursor_broadcast``, none of which are defined in the visible snippet.
    They must be supplied before the corresponding code paths can work.
    """

    # Inbound message ``type`` -> name of the method that handles it.
    # Names are resolved lazily with ``getattr`` so that one missing handler
    # cannot break the dispatch of every other message type.
    _MESSAGE_HANDLERS = {
        'cursor_update': 'handle_cursor_update',
        'document_edit': 'handle_document_edit',
        'presence_heartbeat': 'handle_heartbeat',
    }

    async def connect(self):
        """Join the room group, accept the socket and send the initial state.

        The room id is read from the URL route kwargs and the user from the
        connection scope. After accepting, the client receives a
        ``connection_established`` frame carrying the cached participant
        list and its length.
        """
        self.room_name = self.scope['url_route']['kwargs']['room_id']
        self.room_group_name = f"collab_{self.room_name}"
        self.user = self.scope['user']

        await self.channel_layer.group_add(
            self.room_group_name, self.channel_name
        )
        await self.accept()

        participants = await self.get_participants()
        await self.send(json.dumps({
            'type': 'connection_established',
            'participants': participants,
            'user_count': len(participants),
        }))

    async def disconnect(self, close_code):
        """Leave the room group and refresh the participant bookkeeping.

        Args:
            close_code: WebSocket close code passed in by Channels; unused.
        """
        await self.channel_layer.group_discard(
            self.room_group_name, self.channel_name
        )
        # NOTE(review): neither helper below is defined in this snippet.
        await self.update_participants_cache('remove')
        await self.broadcast_user_count()

    async def receive(self, text_data=None, bytes_data=None):
        """Decode one inbound frame and dispatch it to its handler.

        Channels passes text frames as ``text_data`` and binary frames as
        ``bytes_data`` (both by keyword), so both parameters must be
        accepted. Only JSON-object text frames are processed; binary frames,
        malformed JSON, non-object payloads and unknown message types are
        dropped without closing the connection.

        Args:
            text_data: Text frame payload, expected to be a JSON object
                with a string ``type`` field.
            bytes_data: Binary frame payload; ignored.
        """
        if text_data is None:
            # Binary frame: this protocol is text/JSON only.
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            logger.warning("Dropping frame with malformed JSON")
            return

        if not isinstance(data, dict):
            logger.warning("Dropping frame whose JSON payload is not an object")
            return

        message_type = data.get('type')
        if not isinstance(message_type, str):
            # Also guards against unhashable values in the dict lookup below.
            return

        handler_name = self._MESSAGE_HANDLERS.get(message_type)
        if handler_name is None:
            return

        handler = getattr(self, handler_name, None)
        if handler is None:
            logger.warning(
                "No handler %s defined for message type %s",
                handler_name,
                message_type,
            )
            return

        await handler(data)

    @database_sync_to_async
    def get_participants(self):
        """Return the cached participant list for this room.

        Returns:
            The value stored under ``room_<room_id>_participants``, or an
            empty list when nothing is cached.
        """
        cache_key = f"room_{self.room_name}_participants"
        return cache.get(cache_key, [])

    async def handle_cursor_update(self, data):
        """Broadcast this user's cursor position to the room group.

        Args:
            data: Decoded inbound message; ``position`` and ``color`` are
                read from it, with ``color`` defaulting to ``#3B82F6``.
        """
        cursor_data = {
            'user_id': self.user.id,
            'username': self.user.username,
            'position': data.get('position'),
            'color': data.get('color', '#3B82F6'),
        }
        # NOTE(review): group messages of type ``cursor_broadcast`` need a
        # ``cursor_broadcast`` method on the consumer; none is defined in
        # this snippet.
        await self.channel_layer.group_send(
            self.room_group_name,
            {'type': 'cursor_broadcast', 'cursor': cursor_data},
        )

    async def document_edit(self, event):
        """Forward a ``document_edit`` group message to this client.

        Args:
            event: Channel-layer message; its ``content`` and ``editor``
                keys are relayed in a ``document_update`` frame.
        """
        await self.send(json.dumps({
            'type': 'document_update',
            'content': event['content'],
            'editor': event['editor'],
        }))
django
Breakdown
1
class RealtimeCollaborationConsumer(AsyncWebsocketConsumer)
Inherits from AsyncWebsocketConsumer for full async WebSocket handling support
2
self.room_group_name = f"collab_{self.room_name}"
Creates a channel layer group name for broadcasting messages to all room participants
3
handlers = {'cursor_update': self.handle_cursor_update, ...}
Dictionary maps message types to handler methods, enabling clean extensible message routing
4
@database_sync_to_async def get_participants(self)
Decorator runs the synchronous function in a worker thread so it does not block the event loop; here the wrapped call is a cache lookup rather than an ORM query
5
await self.channel_layer.group_send(self.room_group_name, {...})
Broadcasts message to all group members using the channel layer pub/sub system