Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions demos/supabase-todolist/lib/attachments/queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ class PhotoAttachmentQueue extends AbstractAttachmentQueue {
return results.map((row) => row['photo_id'] as String).toList();
}).listen((ids) async {
List<String> idsInQueue = await attachmentsService.getAttachmentIds();
for (String id in ids) {
await syncingService.reconcileId(id, idsInQueue, fileExtension);
}
List<String> relevantIds =
ids.where((element) => !idsInQueue.contains(element)).toList();
syncingService.processIds(relevantIds, fileExtension);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-todolist/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ packages:
path: "../../packages/powersync_attachments_helper"
relative: true
source: path
version: "0.2.1"
version: "0.3.0"
realtime_client:
dependency: transitive
description:
Expand Down
5 changes: 5 additions & 0 deletions packages/powersync_attachments_helper/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.3.0

- BREAKING CHANGE: `reconcileId` has been removed in favour of `reconcileIds`. This will require a change to `watchIds` implementation which is shown in `example/getting_started.dart`
- Improved queue so that uploads, downloads and deletes do not happen multiple times

## 0.2.1

- Added `onUploadError` as an optional function that can be set when setting up the queue to handle upload errors
Expand Down
8 changes: 4 additions & 4 deletions packages/powersync_attachments_helper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ class PhotoAttachmentQueue extends AbstractAttachmentQueue {
// This watcher will handle adding items to the queue based on
// a users table element receiving a photoId
@override
StreamSubscription<void> watchIds() {
StreamSubscription<void> watchIds({String fileExtension = 'jpg'}) {
return db.watch('''
SELECT photo_id FROM users
WHERE photo_id IS NOT NULL
''').map((results) {
return results.map((row) => row['photo_id'] as String).toList();
}).listen((ids) async {
List<String> idsInQueue = await attachmentsService.getAttachmentIds();
for (String id in ids) {
await syncingService.reconcileId(id, idsInQueue);
}
List<String> relevantIds =
ids.where((element) => !idsInQueue.contains(element)).toList();
syncingService.processIds(relevantIds, fileExtension);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class PhotoAttachmentQueue extends AbstractAttachmentQueue {
return results.map((row) => row['photo_id'] as String).toList();
}).listen((ids) async {
List<String> idsInQueue = await attachmentsService.getAttachmentIds();
for (String id in ids) {
await syncingService.reconcileId(id, idsInQueue, fileExtension);
}
List<String> relevantIds =
ids.where((element) => !idsInQueue.contains(element)).toList();
syncingService.processIds(relevantIds, fileExtension);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ abstract class AbstractAttachmentQueue {
await localStorage.makeDir(await getStorageDirectory());

watchIds();
syncingService.watchUploads();
syncingService.watchDownloads();
syncingService.watchDeletes();
syncingService.watchAttachments();

db.statusStream.listen((status) {
if (db.currentStatus.connected) {
Expand All @@ -79,9 +77,7 @@ abstract class AbstractAttachmentQueue {
}

_trigger() async {
await syncingService.runDownloads();
await syncingService.runDeletes();
await syncingService.runUploads();
await syncingService.runSync();
}

/// Returns the local file path for the given filename, used to store in the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,47 @@ class AttachmentsService {
return updatedRecord;
}

/// Save the attachments to the attachment queue.
Future<void> saveAttachments(List<Attachment> attachments) async {
if (attachments.isEmpty) {
return;
}
List<List<String>> ids = List.empty(growable: true);

RegExp extractObjectValueRegEx = RegExp(r': (.*?)(?:,|$)');

// This adds a timestamp to the attachments and
// extracts the values from the attachment object
// e.g "foo: bar, baz: qux" => ["bar", "qux"]
// TODO: Extract value without needing to use regex
List<List<String?>> updatedRecords = attachments
.map((attachment) {
ids.add([attachment.id]);
return attachment.copyWith(
timestamp: DateTime.now().millisecondsSinceEpoch,
);
})
.toList()
.map((attachment) {
return extractObjectValueRegEx
.allMatches(attachment.toString().replaceAll('}', ''))
.map((match) => match.group(1))
.toList();
})
.toList();

await db.executeBatch('''
INSERT OR REPLACE INTO $table
(id, filename, local_uri, media_type, size, timestamp, state) VALUES (?, ?, ?, ?, ?, ?, ?)
''', updatedRecords);

await db.executeBatch('''
DELETE FROM $table WHERE id = ?
''', ids);

return;
}

/// Get all the ID's of attachments in the attachment queue.
Future<List<String>> getAttachmentIds() async {
ResultSet results =
Expand Down
143 changes: 42 additions & 101 deletions packages/powersync_attachments_helper/lib/src/syncing_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SyncingService {
onDownloadError;
final Future<bool> Function(Attachment attachment, Object exception)?
onUploadError;
bool isProcessing = false;

SyncingService(this.db, this.remoteStorage, this.localStorage,
this.attachmentsService, this.getLocalUri,
Expand Down Expand Up @@ -103,139 +104,79 @@ class SyncingService {
}
}

/// Function to manually run downloads for attachments marked for download
/// in the attachment queue.
/// Once a an attachment marked for download is found it will initiate a
/// download of the file to local storage.
StreamSubscription<void> watchDownloads() {
log.info('Watching downloads...');
return db.watch('''
SELECT * FROM ${attachmentsService.table}
WHERE state = ${AttachmentState.queuedDownload.index}
''').map((results) {
return results.map((row) => Attachment.fromRow(row));
}).listen((attachments) async {
for (Attachment attachment in attachments) {
log.info('Downloading ${attachment.filename}');
await downloadAttachment(attachment);
}
});
}
/// Handle downloading, uploading or deleting of attachments
Future<void> handleSync(Iterable<Attachment> attachments) async {
if (isProcessing == true) {
return;
}

/// Watcher for attachments marked for download in the attachment queue.
/// Once a an attachment marked for download is found it will initiate a
/// download of the file to local storage.
Future<void> runDownloads() async {
List<Attachment> attachments = await db.execute('''
SELECT * FROM ${attachmentsService.table}
WHERE state = ${AttachmentState.queuedDownload.index}
''').then((results) {
return results.map((row) => Attachment.fromRow(row)).toList();
});
isProcessing = true;

for (Attachment attachment in attachments) {
log.info('Downloading ${attachment.filename}');
await downloadAttachment(attachment);
}
}

/// Watcher for attachments marked for upload in the attachment queue.
/// Once a an attachment marked for upload is found it will initiate an
/// upload of the file to remote storage.
StreamSubscription<void> watchUploads() {
log.info('Watching uploads...');
return db.watch('''
SELECT * FROM ${attachmentsService.table}
WHERE local_uri IS NOT NULL
AND state = ${AttachmentState.queuedUpload.index}
''').map((results) {
return results.map((row) => Attachment.fromRow(row));
}).listen((attachments) async {
for (Attachment attachment in attachments) {
if (AttachmentState.queuedDownload.index == attachment.state) {
log.info('Downloading ${attachment.filename}');
await downloadAttachment(attachment);
}
if (AttachmentState.queuedUpload.index == attachment.state) {
log.info('Uploading ${attachment.filename}');
await uploadAttachment(attachment);
}
});
}

/// Function to manually run uploads for attachments marked for upload
/// in the attachment queue.
/// Once a an attachment marked for deletion is found it will initiate an
/// upload of the file to remote storage
Future<void> runUploads() async {
List<Attachment> attachments = await db.execute('''
SELECT * FROM ${attachmentsService.table}
WHERE local_uri IS NOT NULL
AND state = ${AttachmentState.queuedUpload.index}
''').then((results) {
return results.map((row) => Attachment.fromRow(row)).toList();
});

for (Attachment attachment in attachments) {
log.info('Uploading ${attachment.filename}');
await uploadAttachment(attachment);
if (AttachmentState.queuedDelete.index == attachment.state) {
log.info('Deleting ${attachment.filename}');
await deleteAttachment(attachment);
}
}

isProcessing = false;
}

/// Watcher for attachments marked for deletion in the attachment queue.
/// Once a an attachment marked for deletion is found it will initiate remote
/// and local deletions of the file.
StreamSubscription<void> watchDeletes() {
log.info('Watching deletes...');
/// Watcher for changes to attachments table
/// Once a change is detected it will initiate a sync of the attachments
StreamSubscription<void> watchAttachments() {
log.info('Watching attachments...');
return db.watch('''
SELECT * FROM ${attachmentsService.table}
WHERE state = ${AttachmentState.queuedDelete.index}
WHERE state != ${AttachmentState.archived.index}
''').map((results) {
return results.map((row) => Attachment.fromRow(row));
}).listen((attachments) async {
for (Attachment attachment in attachments) {
log.info('Deleting ${attachment.filename}');
await deleteAttachment(attachment);
}
await handleSync(attachments);
});
}

/// Function to manually run deletes for attachments marked for deletion
/// in the attachment queue.
/// Once a an attachment marked for deletion is found it will initiate remote
/// and local deletions of the file.
Future<void> runDeletes() async {
/// Run the sync process on all attachments
Future<void> runSync() async {
List<Attachment> attachments = await db.execute('''
SELECT * FROM ${attachmentsService.table}
WHERE state = ${AttachmentState.queuedDelete.index}
WHERE state != ${AttachmentState.archived.index}
''').then((results) {
return results.map((row) => Attachment.fromRow(row)).toList();
});

for (Attachment attachment in attachments) {
log.info('Deleting ${attachment.filename}');
await deleteAttachment(attachment);
}
await handleSync(attachments);
}

/// Reconcile an ID with ID's in the attachment queue.
/// If the ID is not in the queue, but the file exists locally then it is
/// in local and remote storage.
/// If the ID is in the queue, but the file does not exist locally then it is
/// marked for download.
reconcileId(String id, List<String> idsInQueue, String fileExtension) async {
bool idIsInQueue = idsInQueue.contains(id);
/// Process ID's to be included in the attachment queue.
processIds(List<String> ids, String fileExtension) async {
List<Attachment> attachments = List.empty(growable: true);

String path = await getLocalUri('$id.$fileExtension');
File file = File(path);
bool fileExists = await file.exists();
for (String id in ids) {
String path = await getLocalUri('$id.$fileExtension');
File file = File(path);
bool fileExists = await file.exists();

if (!idIsInQueue) {
if (fileExists) {
log.info('ignore file $id.$fileExtension as it already exists');
return;
}

log.info('Adding $id to queue');
return await attachmentsService.saveAttachment(Attachment(
id: id,
filename: '$id.$fileExtension',
state: AttachmentState.queuedDownload.index,
));
attachments.add(Attachment(
id: id,
filename: '$id.$fileExtension',
state: AttachmentState.queuedDownload.index));
}

await attachmentsService.saveAttachments(attachments);
}
}
8 changes: 4 additions & 4 deletions packages/powersync_attachments_helper/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: powersync_attachments_helper
description: A helper library for handling attachments when using PowerSync.
version: 0.2.1
version: 0.3.0
repository: https://github.com/powersync-ja/powersync.dart
homepage: https://www.powersync.com/
environment:
Expand All @@ -10,14 +10,14 @@ dependencies:
flutter:
sdk: flutter

powersync: ^1.1.0
powersync: ^1.2.2
logging: ^1.2.0
sqlite_async: ^0.6.0
path_provider: ^2.1.1
path_provider: ^2.1.2

dev_dependencies:
lints: ^3.0.0
test: ^1.25.0
test: ^1.25.2

platforms:
android:
Expand Down