-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathviews.py
389 lines (294 loc) · 12.5 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
from __future__ import annotations
from django.db import IntegrityError
from django.db.models import ManyToManyField, QuerySet
from django.http import Http404, JsonResponse
from django.utils.translation import gettext_lazy as _
from django.views.generic.edit import FormView
from .app_settings import app_settings
from .constants import ActionChoices
from .forms import ChunkedUploadFileForm
from .models import FileManager
from .typed import (
ArchiveFile,
AudioFile,
BinaryFile,
DocumentFile,
File,
FontFile,
HyperTextFile,
ImageFile,
JSONFile,
MicrosoftExcelFile,
MicrosoftPowerPointFile,
MicrosoftWordFile,
SeparatedFile,
XMLFile,
)
from .utils import get_logger, get_md5_checksum
# Module-level logger used by the upload views defined in this module.
LOGGER = get_logger(__name__)
class ChunkedUploadView(FormView):
    """Form view that receives a file chunk by chunk.

    GET renders ``template_name``. POST uploads a chunk, either creating a
    record or, when the posted ``action`` equals ``ActionChoices.UPDATE``,
    updating an existing one. DELETE removes a stored file and its record.
    POST and DELETE answer with a ``JsonResponse`` built from the file
    metadata object produced by ``file_class.from_request``.
    """

    http_method_names = ["get", "post", "delete"]
    # Exposed to the template through ``get_context_data``.
    chunk_size = app_settings.chunk_size
    # Class whose ``from_request`` builds the per-request file metadata object.
    file_class = File
    # Value written to ``instance.status`` on every save.
    file_status = app_settings.status
    form_class = ChunkedUploadFileForm
    # When truthy, ``file_obj.optimize(instance)`` runs after a complete upload.
    optimize = app_settings.optimize
    # Permission classes or instances; see ``check_object_permissions``.
    permission_classes = app_settings.permission_classes
    # When truthy, an update request deletes the stored file first.
    remove_file_on_update = app_settings.remove_file_on_update
    template_name = "django_chunk_file_upload/chunked_upload.html"
    upload_to = app_settings.upload_to

    def check_object_permissions(self, request) -> bool:
        """Return True as soon as one configured permission grants access.

        Entries of ``permission_classes`` may be classes (instantiated here)
        or ready-made instances. With no entries the result is False.
        """
        for permission in self.permission_classes:
            checker = permission() if isinstance(permission, type) else permission
            if checker.has_permission(request, self):
                return True
        return False

    def has_add_permission(self, request, obj=None) -> bool:
        """Return whether the request may create a file."""
        return self.check_object_permissions(request)

    def has_view_permission(self, request, obj=None) -> bool:
        """Return whether the request may view the upload page."""
        return self.check_object_permissions(request)

    def has_change_permission(self, request, obj=None) -> bool:
        """Return whether the request may update a file."""
        return self.check_object_permissions(request)

    def has_delete_permission(self, request, obj=None) -> bool:
        """Return whether the request may delete a file."""
        return self.check_object_permissions(request)

    def is_valid(self, form, file_obj) -> bool:
        """Return True only when both the form and the file object validate."""
        return bool(form.is_valid() and file_obj.is_valid())

    def get_model(self):
        """Return the model class declared on the form's ``Meta``."""
        return self.form_class.Meta.model

    def get_instance(self):
        """Return the stored record addressed by the request, or None.

        The checksum is read from the ``x-file-id`` header, falling back to
        ``x-file-checksum``. Superusers match on checksum alone; everybody
        else must also own the record (anonymous requests match records
        whose user is NULL).
        """
        headers = self.request.headers
        checksum = headers.get("x-file-id") or headers.get("x-file-checksum")
        if not checksum:
            # Without a checksum the filter would match an arbitrary record
            # (any record at all for superusers), which a DELETE would then
            # remove. Treat the request as addressing nothing instead.
            return None
        user = self.request.user
        opts = {"checksum": checksum}
        if not user.is_superuser:
            opts["user"] = user if user.is_authenticated else None
        return self.get_model().objects.filter(**opts).first()

    def get_context_data(self, **kwargs):
        """Add ``chunk_size`` to the template context."""
        context = super().get_context_data(**kwargs)
        context["chunk_size"] = self.chunk_size
        return context

    def get(self, request, *args, **kwargs):
        """Handle GET by delegating to ``_get``."""
        return self._get(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Override POST method from View.

        Dispatches to ``_update`` when the posted ``action`` equals
        ``ActionChoices.UPDATE`` and to ``_post`` otherwise.
        """
        action = request.POST.get("action")
        if action == ActionChoices.UPDATE:
            return self._update(request, *args, **kwargs)
        return self._post(request, *args, **kwargs)

    def delete(self, request, *args, **kwargs):
        """Override DELETE method from View."""
        return self._delete(request, *args, **kwargs)

    def _get(self, request, *args, **kwargs):
        """Render the form page.

        Raises:
            Http404: when the request lacks view permission.
        """
        if self.has_view_permission(request):
            return super().get(request, *args, **kwargs)
        raise Http404

    def _post(self, request, *args, **kwargs):
        """Store one chunk of a file, creating the record when needed.

        Responds with status 400 when permission is missing or validation
        fails; both cases carry the same "permission denied" message.
        """
        form, file_obj = self._get_form_file(request, *args, **kwargs)
        if self.has_add_permission(self.request) and self.is_valid(form, file_obj):
            instance = self.get_instance()
            return self.chunked_upload(instance, form, file_obj)
        file_obj.message = _("Cannot create file, reason: permission denied.")
        return self.ajax_response(None, file_obj, status=400, save=False)

    def _update(self, request, *args, **kwargs):
        """Store one chunk of a replacement for an existing file.

        Responds with status 400 and "Not found." when no record matches, and
        with status 400 and a "permission denied" message when permission is
        missing or validation fails.
        """
        form, file_obj = self._get_form_file(request, *args, **kwargs)
        if self.has_change_permission(self.request) and self.is_valid(form, file_obj):
            instance = self.get_instance()
            if instance:
                # The metadata flag marks that the stored file was already
                # removed, so the removal is skipped while the flag is set.
                if self.remove_file_on_update and not instance.metadata.get(
                    "_remove_file_on_update"
                ):
                    LOGGER.info(
                        "File update request received. File: %s", instance.file.name
                    )
                    LOGGER.info("Delete original file: %s", instance.file.name)
                    instance.metadata["_remove_file_on_update"] = True
                    instance.file.delete()
                    # NOTE(review): indentation was lost in the reviewed copy;
                    # confirm the eof reset belongs inside this branch.
                    instance.eof = False
                return self.chunked_upload(instance, form, file_obj)
            file_obj.message = _("Not found.")
            return self.ajax_response(None, file_obj, status=400, save=False)
        file_obj.message = _("Cannot update file, reason: permission denied.")
        return self.ajax_response(None, file_obj, status=400, save=False)

    def _delete(self, request, *args, **kwargs):
        """Delete the addressed file and its record.

        Responds with status 200 on success. Responds with status 400 and a
        "permission denied" message when permission is missing, no record
        matches, or the requester is neither a superuser nor the owner.
        """
        LOGGER.info("File deletion request received.")
        _form, file_obj = self._get_form_file(request, *args, **kwargs)
        if self.has_delete_permission(request):
            instance = self.get_instance()
            if instance and (
                self.request.user.is_superuser or self.request.user == instance.user
            ):
                # Translate the template first and interpolate afterwards so
                # the catalogue lookup uses the untouched message id. The name
                # is read before the file is deleted.
                file_obj.message = (
                    _("Deleted successfully. File: %s") % instance.file.name
                )
                instance.file.delete()
                instance.delete()
                return self.ajax_response(None, file_obj, status=200, save=False)
        file_obj.message = _("Cannot delete file, reason: permission denied.")
        return self.ajax_response(None, file_obj, status=400, save=False)

    def _get_form_file(
        self, request, *args, **kwargs
    ) -> tuple[ChunkedUploadFileForm, File]:
        """Build the bound form and the file metadata object for the request.

        Both are derived from ``self.request``; the arguments are accepted
        for signature symmetry with the handlers.
        """
        form = self.get_form(self.form_class)
        file_obj = self.file_class.from_request(self.request, self.upload_to)
        return form, file_obj

    def chunked_upload(self, instance, form, file_obj):
        """Chunked upload file.

        Handle requests from jQuery AJAX, save files to server.
        Check MD5 checksum of file when upload is complete, if not correct
        delete file from server.

        Args:
            instance (FileManager | None): Existing record, or None to use
                ``form.instance``.
            form (ChunkedUploadFileForm): Chunked upload form.
            file_obj (File): File metadata instance.

        Returns:
            JsonResponse: file metadata. Status is 201 for a stored chunk or
            completed upload, 403 when the record is already complete, and
            400 on a save/write error or checksum mismatch.
        """
        if not instance:
            # Only the create path passes no instance; ``_update`` answers
            # "Not found." itself when nothing matches.
            LOGGER.info("File creation request received. File: %s", file_obj.name)
            instance = form.instance
        LOGGER.info("Proceed to chunk upload. File: %s", file_obj.name)

        if instance.eof:
            file_obj.message = _("The file already exists.")
            return self.ajax_response(instance, file_obj, 403, save=False)

        field_kwargs, m2m_kwargs = self.get_kwargs(form)
        for name, value in field_kwargs.items():
            setattr(instance, name, value)

        try:
            # The record is saved before many-to-many values are applied.
            self.save(instance, file_obj)
            self.save_m2m(instance, **m2m_kwargs)
            file_obj.write("ab+" if instance.file else "wb+")
        except Exception as e:
            # ``raise_exception`` tells IntegrityError apart from the rest.
            return self.raise_exception(e, instance, file_obj)

        if file_obj.eof is False:
            # More chunks to come: acknowledge this one.
            return self.ajax_response(instance, file_obj)

        checksum = get_md5_checksum(file_obj.save_path)
        # Upload finished: drop transient flags such as the update marker.
        instance.metadata = {}
        if checksum != file_obj.checksum:
            instance.file.delete()
            instance.eof = False
            instance.file = None
            instance.save()
            file_obj.message = _("MD5 checksum does not match, please try again.")
            return self.ajax_response(instance, file_obj, 400, save=False)

        self.background_task(instance)
        if self.optimize:
            file_obj.optimize(instance)
        return self.ajax_response(instance, file_obj)

    def raise_exception(
        self, exception: Exception, instance: FileManager, file_obj: File
    ):
        """Turn an exception into a status-400 JSON response.

        Despite the name nothing is raised. For ``IntegrityError`` the
        instance is not saved again; for any other exception
        ``ajax_response`` saves the instance before responding.
        """
        error = str(exception)
        file_obj.message = error
        if isinstance(exception, IntegrityError):
            file_obj.message = _("DB error: %s.") % error
            if "UNIQUE" in error:
                file_obj.message = _("The file was created by another user.")
            return self.ajax_response(instance, file_obj, 400, False)
        return self.ajax_response(instance, file_obj, 400)

    def ajax_response(
        self,
        instance: FileManager | None,
        file_obj: File,
        status: int = 201,
        save: bool = True,
    ):
        """Build the JSON response for the current request.

        Args:
            instance: Record to report on; may be None when ``save`` is False.
            file_obj: File metadata object supplying the payload and message.
            status: HTTP status code of the response.
            save: When True, persist ``instance`` via ``save`` first.

        Returns:
            JsonResponse: ``file_obj.to_response()`` plus ``url`` when the
            instance is complete (``eof`` truthy).
        """
        if save:
            self.save(instance, file_obj)
        data = file_obj.to_response()
        if instance and instance.eof:
            data["url"] = instance.file.url
        LOGGER.info(str(file_obj.message))
        return JsonResponse(data=data, status=status)

    def get_kwargs(self, form, **kwargs) -> tuple[dict, dict]:
        """Split the form's cleaned data into plain and many-to-many values.

        Args:
            form: Form whose ``cleaned_data`` (if present) is split.
            **kwargs: Accepted for compatibility; not used.

        Returns:
            tuple[dict, dict]: ``(field values, many-to-many values)``; both
            are empty when the form has no ``cleaned_data``.
        """
        m2m_names = {
            field.attname
            for field in form.instance._meta.get_fields()
            if isinstance(field, ManyToManyField)
        }
        field_kwargs, m2m_kwargs = {}, {}
        for name, value in getattr(form, "cleaned_data", {}).items():
            target = m2m_kwargs if name in m2m_names else field_kwargs
            target[name] = value
        return field_kwargs, m2m_kwargs

    def save_m2m(self, instance, **kwargs):
        """Synchronise many-to-many relations of ``instance``.

        For each ``field=values`` pair: a ``QuerySet`` becomes the new
        content of the relation (missing objects are added, surplus ones
        removed); any other value clears the relation.
        """
        for field, values in kwargs.items():
            m2m_field = getattr(instance, field, None)
            if not m2m_field:
                continue
            if not isinstance(values, QuerySet):
                m2m_field.clear()
                continue
            # Start from everything requested, then drop what is already
            # related; what remains has to be added.
            to_add = {obj.pk: obj for obj in values}
            to_remove = []
            for obj in m2m_field.all():
                if obj.pk in to_add:
                    del to_add[obj.pk]
                else:
                    to_remove.append(obj)
            m2m_field.add(*to_add.values())
            m2m_field.remove(*to_remove)

    def save(self, instance: FileManager, file_obj: File):
        """Copy request-derived state onto ``instance`` and persist it.

        The checksum is only written when the instance has none yet; the
        metadata is only refreshed when ``app_settings.is_metadata_storage``
        is truthy.
        """
        instance.eof = file_obj.eof
        instance.file = file_obj.path
        instance.type = file_obj.type
        instance.user = file_obj.user
        instance.status = self.file_status
        if not instance.checksum:
            instance.checksum = file_obj.checksum
        if app_settings.is_metadata_storage:
            instance.metadata = file_obj.to_metadata()
        instance.save()

    def background_task(self, instance):
        """Hook called once an upload is complete and its checksum matched.

        Does nothing by default; subclasses may override it.
        """
        pass
class ChunkArchiveUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``ArchiveFile`` as its ``file_class``."""

    file_class = ArchiveFile
class ChunkAudioUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``AudioFile`` as its ``file_class``."""

    file_class = AudioFile
class ChunkBinaryUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``BinaryFile`` as its ``file_class``."""

    file_class = BinaryFile
class ChunkDocumentUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``DocumentFile`` as its ``file_class``."""

    file_class = DocumentFile
class ChunkFontUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``FontFile`` as its ``file_class``."""

    file_class = FontFile
class ChunkHyperTextUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``HyperTextFile`` as its ``file_class``."""

    file_class = HyperTextFile
class ChunkImageUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``ImageFile`` as its ``file_class``."""

    file_class = ImageFile
class ChunkJSONUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``JSONFile`` as its ``file_class``."""

    file_class = JSONFile
class ChunkMicrosoftWordUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``MicrosoftWordFile`` as its ``file_class``."""

    file_class = MicrosoftWordFile
class ChunkMicrosoftPowerPointUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``MicrosoftPowerPointFile`` as its ``file_class``."""

    file_class = MicrosoftPowerPointFile
class ChunkMicrosoftExcelUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``MicrosoftExcelFile`` as its ``file_class``."""

    file_class = MicrosoftExcelFile
class ChunkSeparatedUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``SeparatedFile`` as its ``file_class``."""

    file_class = SeparatedFile
class ChunkXMLUploadView(ChunkedUploadView):
    """Chunked upload view that uses ``XMLFile`` as its ``file_class``."""

    file_class = XMLFile