
Commit 7d82b25

committed
Write logic refactor.

- No longer have to read the whole file into memory in order to update it.
- We're not done yet. This still needs to be tested, and we're still investigating what could be causing the 512-byte error that people are seeing.
1 parent 360f6cb commit 7d82b25
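The core of the refactor, in miniature: rather than staging the whole file through an in-memory buffer, updates are now patched directly into the on-disk cache copy. A minimal standalone sketch of the idea (the path and helper name are illustrative, not the actual gdrivefs API):

    def apply_update(cache_filepath, offset, data):
        # Patch only the affected byte-range of the local cache file, instead
        # of reading the entire file into memory, updating it, and writing it
        # all back out.
        with open(cache_filepath, 'r+b') as f:
            f.seek(offset)
            f.write(data)

    # e.g. apply_update('/tmp/gdrivefs/local/<entry-id>', 1024, b'new-bytes')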

File tree

3 files changed: +82 -158 lines changed

- gdrivefs/gdfs/gdfuse.py
- gdrivefs/gdfs/opened_file.py
- gdrivefs/gdtool/drive.py

gdrivefs/gdfs/gdfuse.py

Lines changed: 5 additions & 2 deletions
@@ -610,8 +610,6 @@ def truncate(self, filepath, length, fh=None):
                                "with ID (%d) (truncate)." % (fh))
             raise FuseOSError(EIO)

-        opened_file.reset_state()
-
         entry_id = opened_file.entry_id
         cache = EntryCache.get_instance().cache

@@ -622,6 +620,8 @@
                                  "ID [%s] for truncate with FH." %
                                  (entry_id))
                 raise
+
+            opened_file.truncate(length)
         else:
             (entry, path, filename) = get_entry_or_raise(filepath)

@@ -631,6 +631,9 @@
             _logger.exception("Could not truncate entry [%s]." % (entry))
             raise FuseOSError(EIO)

+# TODO(dustin): It would be a lot quicker if we truncate our temporary file
+#               here, and make sure its mtime matches.
+
         # We don't need to update our internal representation of the file (just
         # our file-handle and its related buffering).

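Taken together, these hunks make the file-handle path of truncate() delegate to the opened file instead of resetting its state: the OpenedFile cuts its own local cache copy and marks itself dirty, and the actual upload is deferred to flush time. A self-contained sketch of that deferral (stand-in names, not the real gdrivefs objects):

    class OpenedFileSketch(object):
        """Stand-in for the relevant slice of gdrivefs's OpenedFile."""

        def __init__(self, cache_filepath):
            self.__cache_filepath = cache_filepath
            self.__is_dirty = False

        def truncate(self, length):
            # Cut the local cache copy in place; the remote entry is only
            # updated later, when the OS flushes the handle.
            with open(self.__cache_filepath, 'r+b') as f:
                f.truncate(length)

            self.__is_dirty = True
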
gdrivefs/gdfs/opened_file.py

Lines changed: 74 additions & 113 deletions
@@ -1,6 +1,8 @@
 import logging
 import resource
 import re
+import time
+import os

 from errno import *
 from threading import Lock, RLock
@@ -17,7 +19,6 @@
 from gdrivefs.cache.volume import PathRelations, EntryCache, path_resolver, \
                                   CLAUSE_ID, CLAUSE_ENTRY
 from gdrivefs.gdtool.drive import drive_proxy
-from gdrivefs.general.buffer_segments import BufferSegments

 _static_log = logging.getLogger().getChild('(OF)')

@@ -34,19 +35,6 @@ def get_temp_filepath(normalized_entry, mime_type):
     return ("%s/local/%s" % (temp_path, temp_filename))


-
-# TODO(dustin): LCM runs in a greenlet pool. When we open a file that needs the
-#               existing data for a file (read, append), a switch is done to an
-#               LCM worker. If the data is absent or faulted, download the
-#               content. Then, switch back.
-
-class LocalCopyManager(object):
-    """Manages local copies of files."""
-
-    # def
-    pass
-
-
 class OpenedManager(object):
     """Manages all of the currently-open files."""

@@ -296,8 +284,16 @@ def __init__(self, entry_id, path, filename, is_hidden, mime_type):

         self.reset_state()

+        try:
+            entry = self.__get_entry_or_raise()
+        except:
+            self.__log.exception("Could not get entry with ID [%s] for "
+                                 "write-flush." % (self.__entry_id))
+            raise
+
+        self.__temp_filepath = get_temp_filepath(entry, self.mime_type)
+
     def reset_state(self):
-        self.__buffer = None
         self.__is_loaded = False
         self.__is_dirty = False

@@ -348,7 +344,7 @@ def __load_base_from_remote(self):

         self.__log.debug("Ensuring local availability of [%s]." % (entry))

-        temp_file_path = get_temp_filepath(entry, self.mime_type)
+        temp_filepath = get_temp_filepath(entry, self.mime_type)

         self.__log.debug("__load_base_from_remote about to download.")

@@ -358,7 +354,7 @@

         self.__log.info("Attempting local cache update of file [%s] for "
                         "entry [%s] and mime-type [%s]." %
-                        (temp_file_path, entry, self.mime_type))
+                        (temp_filepath, entry, self.mime_type))

         if entry.requires_mimetype:
             length = DisplacedFile.file_size
@@ -367,11 +363,11 @@
                 d = DisplacedFile(entry)
                 stub_data = d.deposit_file(self.mime_type)

-                with file(temp_file_path, 'w') as f:
+                with file(temp_filepath, 'w') as f:
                     f.write(stub_data)
             except:
                 self.__log.exception("Could not deposit to file [%s] from "
-                                     "entry [%s]." % (temp_file_path,
+                                     "entry [%s]." % (temp_filepath,
                                                       entry))
                 raise

@@ -382,9 +378,10 @@
             self.__log.info("Executing the download.")

             try:
-# TODO(dustin): We're not inheriting an existing file (same mtime, same size).
+# TODO(dustin): Confirm that this will inherit an existing file (same mtime,
+#               same size).
                 result = drive_proxy('download_to_local',
-                                     output_file_path=temp_file_path,
+                                     output_file_path=temp_filepath,
                                      normalized_entry=entry,
                                      mime_type=self.mime_type)

@@ -401,45 +398,6 @@
                          "__is_loaded= [%s]" %
                          (cache_fault, self.__is_loaded))

-        # We've either not loaded it, yet, or it has changed.
-        if cache_fault or not self.__is_loaded:
-            with self.__class__.__update_lock:
-                self.__log.info("Checking queued items for fault.")
-
-                if cache_fault:
-                    if self.__is_dirty:
-                        self.__log.error("Entry [%s] has been changed. "
-                                         "Forcing buffer updates, and "
-                                         "clearing uncommitted updates." %
-                                         (entry))
-                    else:
-                        self.__log.debug("Entry [%s] has changed. "
-                                         "Updating buffers." % (entry))
-
-                self.__log.debug("Loading buffers.")
-
-                with open(temp_file_path, 'rb') as f:
-                    # Read the locally cached file in.
-
-                    try:
-# TODO(dustin): Our accounting is broken when it comes to loading and/or update-tracking. If we have a guarantee that writes only appear in sequence and in increasing order, we can dump BufferSegments.
-
-# TODO(dustin): This is the source of:
-#               1) An enormous slowdown where we first have to write the data, and then have to read it back.
-#               2) An enormous resource burden.
-                        data = f.read()
-
-                        read_blocksize = Conf.get('default_buffer_read_blocksize')
-                        self.__buffer = BufferSegments(data, read_blocksize)
-                    except:
-                        self.__log.exception("Could not read current cached "
-                                             "file into buffer.")
-                        raise
-
-                self.__is_dirty = False
-
-                self.__is_loaded = True
-
         self.__log.debug("__load_base_from_remote complete.")
         return cache_fault

@@ -454,15 +412,45 @@ def add_update(self, offset, data):
             self.__load_base_from_remote()
         except:
             self.__log.exception("Could not load entry to local cache [%s]." %
-                                 (self.temp_file_path))
+                                 (self.__temp_filepath))
             raise

         self.__log.debug("Base loaded for add_update.")

         with self.__class__.__update_lock:
-            self.__buffer.apply_update(offset, data)
+            with open(self.__temp_filepath, 'r+') as f:
+                f.seek(offset)
+                f.write(data)
+
             self.__is_dirty = True

+    @dec_hint(['length'], ['length'], 'OF')
+    def truncate(self, length):
+        try:
+            self.__load_base_from_remote()
+        except:
+            self.__log.exception("Could not load entry to local cache [%s]." %
+                                 (self.__temp_filepath))
+            raise
+
+        self.__log.debug("Base loaded for truncate.")
+
+        entry = self.__get_entry_or_raise()
+
+        with self.__class__.__update_lock:
+            with open(self.__temp_filepath, 'r+') as f:
+                f.truncate(length)
+
+            gd_mtime_epoch = time.mktime(
+                                entry.modified_date.timetuple())
+
+# TODO(dustin): Shouldn't we be taking the first time component (atime?) from the entry?
+            os.utime(self.__temp_filepath, (time.time(), gd_mtime_epoch))
+
+            self.__is_dirty = True
+
+# TODO(dustin): We still have to make sure the mtime is set to match.
+
     @dec_hint(prefix='OF')
     def flush(self):
         """The OS wants to effect any changes made to the file."""
@@ -472,81 +460,54 @@ def flush(self):
         entry = self.__get_entry_or_raise()
         cache_fault = self.__load_base_from_remote()

+# TODO(dustin): We need to be able to do updates for separate files in
+#               parallel.
         with self.__class__.__update_lock:
             if self.__is_dirty is False:
                 self.__log.debug("Flush will be skipped because there are no "
                                  "changes.")
-                # TODO: Raise an exception?
                 return

-            # Write back out to the temporary file.
-
-            self.__log.debug("Writing buffer to temporary file.")
-            # TODO: Make sure to uncache the temp data if self.temp_file_path is not None.
-
-            mime_type = self.mime_type
-
-            # If we've already opened a work file, use it. Else, use a
-            # temporary file that we'll close at the end of the method.
-            if self.__is_loaded:
-                is_temp = False
-
-                temp_file_path = get_temp_filepath(entry, mime_type)
-
-                with file(temp_file_path, 'w') as f:
-                    for block in self.__buffer.read():
-                        f.write(block)
-
-                write_filepath = temp_file_path
-            else:
-                is_temp = True
-
-                with NamedTemporaryFile(delete=False) as f:
-                    write_filepath = f.name
-                    for block in self.__buffer.read():
-                        f.write(block)
-
             # Push to GD.

+            # os.stat(self.__temp
+
             self.__log.debug("Pushing (%d) bytes for entry with ID from [%s] "
                              "to GD for file-path [%s]." %
-                             (self.__buffer.length, entry.id, write_filepath))
-
-            # print("Sending updates.")
+                             (self.__buffer.length, entry.id, self.__temp_filepath))

-            # TODO: Update mtime?
+            # TODO: Will this automatically update mtime?
+# TODO(dustin): We need to be able to update individual slices of the file
+#               (maybe only if we've affected less than X% of the file).
             try:
                 entry = drive_proxy('update_entry',
                                     normalized_entry=entry,
                                     filename=entry.title,
-                                    data_filepath=write_filepath,
-                                    mime_type=mime_type,
+                                    data_filepath=self.__temp_filepath,
+                                    mime_type=self.mime_type,
                                     parents=entry.parents,
                                     is_hidden=self.__is_hidden)
             except:
                 self.__log.exception("Could not localize displaced file with "
                                      "entry having ID [%s]." % (entry.id))
                 raise

-            if not is_temp:
-                unlink(write_filepath)
-            else:
-                # Update the write-cache file to the official mtime. We won't
-                # redownload it on the next flush if it wasn't changed,
-                # elsewhere.
+            # Update the write-cache file to the official mtime. We won't
+            # redownload it on the next flush if it wasn't changed,
+            # elsewhere.

-                self.__log.debug("Updating local write-cache file to official "
-                                 "mtime [%s]." % (entry.modified_date_epoch))
+            self.__log.debug("Updating local write-cache file to official "
+                             "mtime [%s]." % (entry.modified_date_epoch))

-                try:
-                    utime(write_filepath, (entry.modified_date_epoch,
-                                           entry.modified_date_epoch))
-                except:
-                    self.__log.exception("Could not update mtime of write-"
-                                         "cache [%s] for entry with ID [%s], "
-                                         "post-flush." %
-                                         (entry.modified_date_epoch, entry.id))
-                    raise
+            try:
+                utime(self.__temp_filepath, (entry.modified_date_epoch,
+                                             entry.modified_date_epoch))
+            except:
+                self.__log.exception("Could not update mtime of write-"
+                                     "cache [%s] for entry with ID [%s], "
+                                     "post-flush." %
+                                     (entry.modified_date_epoch, entry.id))
+                raise

         # Immediately update our current cached entry.

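After the push, flush() pins the write-cache file's atime and mtime to the entry's official modified time, so a later pass can tell whether the local copy still mirrors GD without re-downloading. A minimal sketch of that stamp-and-check pair (illustrative names, not the gdrivefs API):

    import os

    def stamp_official_mtime(cache_filepath, official_mtime_epoch):
        # flush() pins both atime and mtime to the official modified time.
        os.utime(cache_filepath, (official_mtime_epoch, official_mtime_epoch))

    def cache_matches_entry(cache_filepath, official_mtime_epoch):
        # If nothing re-stamped the file, the local copy still mirrors GD.
        return os.stat(cache_filepath).st_mtime == official_mtime_epoch
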
gdrivefs/gdtool/drive.py

Lines changed: 3 additions & 43 deletions
@@ -367,7 +367,8 @@ def download_to_local(self, output_file_path, normalized_entry, mime_type,
             # Use the cache. It's fine.

             self.__log.debug("File retrieved from the previously downloaded, "
-                            "still-current file.")
+                             "still-current file.")
+
             return (stat_info.st_size, False)

         # Go and get the file.
@@ -377,42 +378,6 @@

         url = normalized_entry.download_links[mime_type]

-#        self.__log.debug("Downloading file from [%s]." % (url))
-#
-#        try:
-#            data_tuple = authed_http.request(url)
-#        except:
-#            self.__log.exception("Could not download entry with ID [%s], type "
-#                                 "[%s], and URL [%s]." % (normalized_entry.id,
-#                                                          mime_type, url))
-#            raise
-#
-#        (response_headers, data) = data_tuple
-#
-#        # Throw a log-item if we see any "Range" response-headers. If GD ever
-#        # starts supporting "Range" headers, we'll be able to write smarter
-#        # download mechanics (resume, etc..).
-#
-#        r = re.compile('Range')
-#        range_found = [("%s: %s" % (k, v)) for k, v
-#                       in response_headers.iteritems()
-#                       if r.match(k)]
-#        if range_found:
-#            self.__log.info("GD has returned Range-related headers: %s" %
-#                            (", ".join(found)))
-#
-#        self.__log.info("Downloaded file is (%d) bytes. Writing to [%s]." %
-#                        (len(data), output_file_path))
-#
-#        try:
-#            with open(output_file_path, 'wb') as f:
-#                f.write(data)
-#        except:
-#            self.__log.exception("Could not cached downloaded file. Skipped.")
-#
-#        else:
-#            self.__log.info("File written to cache successfully.")
-
         with open(output_file_path, 'wb') as f:
             downloader = gdrivefs.gdtool.chunked_download.ChunkedDownload(
                 f,
@@ -424,12 +389,7 @@
             if done is True:
                 break

-        try:
-            utime(output_file_path, (time.time(), gd_mtime_epoch))
-        except:
-            self.__log.exception("Could not set time on [%s]." %
-                                 (output_file_path))
-            raise
+        utime(output_file_path, (time.time(), gd_mtime_epoch))

         return (total_size, True)

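On the download side the stamping convention differs slightly from flush(): atime records when we fetched, while mtime is pinned to the GD entry's modified time. A hedged sketch of the overall decide-download-stamp flow in download_to_local(), mirroring its (size, downloaded) return tuple (download_cb is a hypothetical callable standing in for the chunked downloader):

    import os
    import time

    def ensure_local_copy(output_file_path, gd_mtime_epoch, download_cb):
        # Reuse the cache when its pinned mtime still matches the entry.
        try:
            stat_info = os.stat(output_file_path)
        except OSError:
            pass
        else:
            if stat_info.st_mtime == gd_mtime_epoch:
                return (stat_info.st_size, False)    # Cache hit; no download.

        total_size = download_cb(output_file_path)   # Hypothetical callable.

        # atime = now (when we fetched), mtime = the entry's modified time.
        os.utime(output_file_path, (time.time(), gd_mtime_epoch))

        return (total_size, True)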