python / expert
Snippet
Django Channels WebSocket Consumer mit Raum-Multiplexing
Dieser WebSocket-Consumer implementiert Echtzeit-Zusammenarbeit mit raumbasierter Multiplexing. Er verarbeitet mehrere Nachrichtentypen durch ein Handler-Wörterbuch-Muster, verwendet Channel-Layer für Gruppen-Broadcasting und nutzt Cache für Teilnehmer-Verfolgung. Der Consumer verwaltet asynchrone Datenbankoperationen korrekt mit database_sync_to_async und pflegt den WebSocket-Verbindungszustand.
snippet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import jsonfrom channels.generic.websocket import AsyncWebsocketConsumerfrom channels.db import database_sync_to_asyncfrom django.core.cache import cacheclass RealtimeCollaborationConsumer(AsyncWebsocketConsumer):async def connect(self):self.room_name = self.scope['url_route']['kwargs']['room_id']self.room_group_name = f"collab_{self.room_name}"self.user = self.scope['user']await self.channel_layer.group_add(self.room_group_name, self.channel_name)await self.accept()participants = await self.get_participants()await self.send(json.dumps({'type': 'connection_established','participants': participants,'user_count': len(participants)}))async def disconnect(self, close_code):await self.channel_layer.group_discard(self.room_group_name, self.channel_name)await self.update_participants_cache('remove')await self.broadcast_user_count()async def receive(self, text_data):data = json.loads(text_data)message_type = data.get('type')handlers = {'cursor_update': self.handle_cursor_update,'document_edit': self.handle_document_edit,'presence_heartbeat': self.handle_heartbeat}handler = handlers.get(message_type)if handler:await handler(data)@database_sync_to_asyncdef get_participants(self):cache_key = f"room_{self.room_name}_participants"return cache.get(cache_key, [])async def handle_cursor_update(self, data):cursor_data = {'user_id': self.user.id,'username': self.user.username,'position': data.get('position'),'color': data.get('color', '#3B82F6')}await self.channel_layer.group_send(self.room_group_name,{'type': 'cursor_broadcast', 'cursor': cursor_data})async def document_edit(self, event):await self.send(json.dumps({'type': 'document_update','content': event['content'],'editor': event['editor']}))
django
Erklärung
1
class RealtimeCollaborationConsumer(AsyncWebsocketConsumer)
Erbt von AsyncWebsocketConsumer für vollständige asynchrone WebSocket-Behandlung
2
self.room_group_name = f"collab_{self.room_name}"
Erstellt einen Channel-Layer-Gruppennamen zum Broadcasten von Nachrichten an alle Raumteilnehmer
3
handlers = {'cursor_update': self.handle_cursor_update, ...}
Wörterbuch verknüpft Nachrichtentypen mit Handler-Methoden für sauberes erweiterbares Routing
4
@database_sync_to_async def get_participants(self)
Decorator konvertiert synchrone ORM-Operationen zu async und verhindert Blockierung der Event-Loop
5
await self.channel_layer.group_send(self.room_group_name, {...})
Broadcastet Nachricht an alle Gruppenmitglieder über das Channel-Layer Pub/Sub-System