python / expert
Snippet
Django File Upload with Streaming and Memory-Efficient Processing
This snippet demonstrates memory-efficient file upload handling in Django using streaming and chunked processing. The ChunkedFileUpload class computes checksums without loading entire files into memory, while StreamingCSVProcessor processes CSV data row-by-row. Key features include configurable chunk sizes, duplicate detection via cache, and progress tracking for long-running operations.
snippet.py
import codecs
import csv
import hashlib

from django.core.cache import cache
from django.core.files.uploadedfile import UploadedFile
from django.db import connection

from myapp.validators import FileValidator


class ChunkedFileUpload:
    CHUNK_SIZE = 8192                    # bytes read per iteration
    MAX_MEMORY_SIZE = 10 * 1024 * 1024   # 10 MB streaming threshold

    def __init__(self, uploaded_file: UploadedFile):
        self.file = uploaded_file
        self.checksum = None
        self.processed_chunks = 0

    def compute_checksum(self):
        # Hash chunk by chunk so arbitrarily large uploads never have to
        # be held in memory at once.
        hasher = hashlib.sha256()
        for chunk in self.file.chunks(self.CHUNK_SIZE):
            hasher.update(chunk)
        self.checksum = hasher.hexdigest()
        return self.checksum

    def process_in_chunks(self, processor_func):
        results = []
        for chunk in self.file.chunks(self.CHUNK_SIZE):
            results.append(processor_func(chunk))
            self.processed_chunks += 1
        return results

    def is_memory_efficient(self):
        # Files above the threshold should be streamed rather than buffered.
        return self.file.size > self.MAX_MEMORY_SIZE


class StreamingCSVProcessor:
    def __init__(self, file_upload: ChunkedFileUpload):
        self.uploader = file_upload
        self.validator = FileValidator()
        self.processed_rows = 0

    def process_csv_stream(self, destination):
        decoder = codecs.getincrementaldecoder('utf-8')()
        buffer = ''
        with connection.cursor() as cursor:
            for chunk in self.uploader.file.chunks(ChunkedFileUpload.CHUNK_SIZE):
                # Incremental decoding copes with multi-byte characters that
                # straddle a chunk boundary.
                buffer += decoder.decode(chunk)
                lines = buffer.split('\n')
                # The last element may be an incomplete row; keep it until the
                # next chunk (or the final flush) completes it.
                buffer = lines.pop()
                for line in lines:
                    self._process_row(cursor, line)
            # Flush whatever is left over after the last chunk.
            buffer += decoder.decode(b'', final=True)
            if buffer.strip():
                self._process_row(cursor, buffer)

    def _process_row(self, cursor, line):
        line = line.strip()
        if not line:
            return
        row = next(csv.reader([line]))
        if self.validator.validate_row(row):
            self.insert_row_bulk_style(cursor, row)
            self.processed_rows += 1
            if self.processed_rows % 1000 == 0:
                # Publish progress with a short TTL so other requests can poll it.
                cache.set('csv_progress', self.processed_rows, 30)

    def insert_row_bulk_style(self, cursor, row):
        values = [self.validator.sanitize(v) for v in row]
        cursor.execute(
            'INSERT INTO imported_data (data) VALUES (%s)',
            [','.join(values)],
        )


def handle_large_upload(request):
    uploaded_file = request.FILES['file']
    uploader = ChunkedFileUpload(uploaded_file)
    if uploader.is_memory_efficient():
        # Deduplicate by checksum before doing any expensive processing.
        checksum = uploader.compute_checksum()
        if cache.get(f'upload_checksum_{checksum}'):
            return {'status': 'duplicate', 'checksum': checksum}
        processor = StreamingCSVProcessor(uploader)
        processor.process_csv_stream(None)
        cache.set(f'upload_checksum_{checksum}', True, 3600)
        return {'status': 'processed', 'rows': processor.processed_rows}
    # Files under MAX_MEMORY_SIZE fall outside the streaming path; a buffered
    # fallback is sketched after the breakdown below.
    return {'status': 'skipped', 'reason': 'below streaming threshold'}
django
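For context, here is a minimal sketch of how handle_large_upload could be wired into a view. JsonResponse and the upload_csv view name are assumptions layered on top of the snippet, not part of it.

# views.py (illustrative)
from django.http import JsonResponse
from django.views.decorators.http import require_POST

@require_POST
def upload_csv(request):
    # Hypothetical view wrapping the snippet's entry point.
    if 'file' not in request.FILES:
        return JsonResponse({'error': 'no file provided'}, status=400)
    result = handle_large_upload(request)
    status = 409 if result.get('status') == 'duplicate' else 200
    return JsonResponse(result, status=status)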
Breakdown
1
for chunk in self.file.chunks(self.CHUNK_SIZE)
Iterates over the file in fixed-size chunks so a large upload is never loaded into memory in one piece
2
hasher.update(chunk)
Updates the hash accumulator chunk by chunk, so the SHA-256 of an arbitrarily large file is computed without buffering it; see the equivalence sketch after this breakdown
3
is_memory_efficient()
Checks whether the file size exceeds MAX_MEMORY_SIZE to choose streaming over buffered processing; a buffered fallback is sketched after this breakdown
4
cache.set('csv_progress', self.processed_rows, 30)
Stores progress in the cache with a short TTL so other requests can monitor a long-running import; see the polling sketch after this breakdown
5
cursor.execute('INSERT INTO imported_data...', [','.join(values)])
Executes raw parameterized SQL for fast inserts during a bulk streaming import; a batched executemany variant is sketched below
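To illustrate breakdown items 1 and 2, a small sketch showing that chunked hashing yields the same digest as hashing the whole payload at once. SimpleUploadedFile is Django's standard in-memory test double; the payload here is made up.

import hashlib
from django.core.files.uploadedfile import SimpleUploadedFile

payload = b'a,b,c\n' * 50_000                  # ~300 KB of fake CSV bytes
upload = SimpleUploadedFile('demo.csv', payload)

# Chunked hashing, exactly as ChunkedFileUpload.compute_checksum does it.
hasher = hashlib.sha256()
for chunk in upload.chunks(8192):
    hasher.update(chunk)

# Hashing the whole payload in one call produces the identical digest.
assert hasher.hexdigest() == hashlib.sha256(payload).hexdigest()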
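Breakdown item 3 only gates the streaming path; the snippet never shows what happens to smaller files. A hedged sketch of a buffered counterpart, assuming such uploads are safe to read in one call (process_small_upload is a hypothetical name; ChunkedFileUpload and handle_large_upload come from the snippet above):

import csv
import io

def process_small_upload(request):
    # Hypothetical buffered path for files under MAX_MEMORY_SIZE; the size
    # check bounds how much is read into memory at once.
    uploaded_file = request.FILES['file']
    uploader = ChunkedFileUpload(uploaded_file)
    if uploader.is_memory_efficient():
        # Too large to buffer: defer to the streaming entry point instead.
        return handle_large_upload(request)
    text = uploaded_file.read().decode('utf-8')
    rows = [row for row in csv.reader(io.StringIO(text)) if row]
    return {'status': 'processed', 'rows': len(rows)}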
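For breakdown item 4, a sketch of how a separate request could read the cached counter; the csv_progress view name and JSON shape are assumptions.

from django.core.cache import cache
from django.http import JsonResponse

def csv_progress(request):
    # Reads the counter written by process_csv_stream; because the entry has
    # a 30-second TTL, a finished or stale import simply reports zero again.
    return JsonResponse({'processed_rows': cache.get('csv_progress', 0)})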
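For breakdown item 5, execute() per row keeps memory flat but pays one database round trip per row. A sketch of a batched variant with cursor.executemany, assuming the same imported_data table; the batch size is illustrative.

from django.db import connection

def insert_rows_batched(rows, batch_size=500):
    # Accumulate parameter lists and flush them in batches; executemany
    # amortizes per-statement overhead compared with one execute per row.
    sql = 'INSERT INTO imported_data (data) VALUES (%s)'
    batch = []
    with connection.cursor() as cursor:
        for row in rows:
            batch.append([','.join(row)])
            if len(batch) >= batch_size:
                cursor.executemany(sql, batch)
                batch = []
        if batch:
            cursor.executemany(sql, batch)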