Commit b4c0396

perf: optimize DataMap::get_batched
1 parent ec2ce0f · commit b4c0396

2 files changed: +51 −96 lines
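What changed: the pending-batch state moves off the shared RocksDB handle and onto each DataMap, and it is now held as typed operations in an IndexMap<K, Option<V>> (Some(value) marks a queued insert, None a queued delete) instead of as raw bytes in a shared rocksdb::WriteBatch. Previously, get_batched had to implement rocksdb::WriteBatchIterator, scan every operation in the pending WriteBatch, compare prefixed raw keys, and deserialize on a match; now it is a single lookup in the typed buffer, and key prefixing plus bincode serialization happen once, in finish_atomic, when the buffer is drained into one WriteBatch. The following is a minimal, std-only sketch of that pattern, not the snarkOS code: BufferedMap and its BTreeMap-backed store are illustrative stand-ins (the real DataMap holds Arc<AtomicBool> and Arc<Mutex<IndexMap<K, Option<V>>>> and writes to RocksDB).

// A minimal, std-only model of the buffering scheme this commit introduces.
// BufferedMap and `store` are hypothetical stand-ins; the real DataMap uses
// indexmap::IndexMap, bincode serialization, and rocksdb::WriteBatch.
use std::collections::BTreeMap;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Mutex,
};

struct BufferedMap<K: Ord + Clone, V: Clone> {
    /// Committed state (stand-in for the RocksDB column of prefixed keys).
    store: Mutex<BTreeMap<K, V>>,
    /// Whether an atomic batch is in progress.
    batch_in_progress: AtomicBool,
    /// Pending operations: Some(v) = queued insert, None = queued delete.
    atomic_batch: Mutex<BTreeMap<K, Option<V>>>,
}

impl<K: Ord + Clone, V: Clone> BufferedMap<K, V> {
    fn new() -> Self {
        Self {
            store: Mutex::new(BTreeMap::new()),
            batch_in_progress: AtomicBool::new(false),
            atomic_batch: Mutex::new(BTreeMap::new()),
        }
    }

    fn start_atomic(&self) {
        self.batch_in_progress.store(true, Ordering::SeqCst);
        assert!(self.atomic_batch.lock().unwrap().is_empty());
    }

    fn insert(&self, key: K, value: V) {
        if self.batch_in_progress.load(Ordering::SeqCst) {
            // Queue the typed pair; no serialization or key prefixing yet.
            self.atomic_batch.lock().unwrap().insert(key, Some(value));
        } else {
            self.store.lock().unwrap().insert(key, value);
        }
    }

    fn remove(&self, key: &K) {
        if self.batch_in_progress.load(Ordering::SeqCst) {
            self.atomic_batch.lock().unwrap().insert(key.clone(), None);
        } else {
            self.store.lock().unwrap().remove(key);
        }
    }

    /// The call this commit optimizes: a batched read is now a single typed
    /// lookup instead of a scan over a serialized write batch.
    fn get_batched(&self, key: &K) -> Option<Option<V>> {
        if self.batch_in_progress.load(Ordering::SeqCst) {
            self.atomic_batch.lock().unwrap().get(key).cloned()
        } else {
            None
        }
    }

    fn finish_atomic(&self) {
        // Drain the buffer and apply everything in one shot
        // (the real code builds a single rocksdb::WriteBatch here).
        let operations = std::mem::take(&mut *self.atomic_batch.lock().unwrap());
        let mut store = self.store.lock().unwrap();
        for (key, operation) in operations {
            match operation {
                Some(value) => {
                    store.insert(key, value);
                }
                None => {
                    store.remove(&key);
                }
            }
        }
        self.batch_in_progress.store(false, Ordering::SeqCst);
    }
}

fn main() {
    let map = BufferedMap::new();
    map.start_atomic();
    map.insert(1u32, "a".to_string());
    map.remove(&2u32);
    // Some(Some(_)) = pending insert, Some(None) = pending delete, None = not in the batch.
    assert_eq!(map.get_batched(&1), Some(Some("a".to_string())));
    assert_eq!(map.get_batched(&2), Some(None));
    assert_eq!(map.get_batched(&3), None);
    map.finish_atomic();
    assert_eq!(map.get_batched(&1), None);
}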

node/store/src/rocksdb/map.rs

+47 −76
@@ -16,6 +16,8 @@

 use super::*;

+use indexmap::IndexMap;
+use rocksdb::WriteBatch;
 use snarkvm::synthesizer::store::helpers::{Map, MapRead};

 use core::{fmt, fmt::Debug, hash::Hash};
@@ -25,7 +27,10 @@ use std::{borrow::Cow, sync::atomic::Ordering};
 pub struct DataMap<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> {
     pub(super) database: RocksDB,
     pub(super) context: Vec<u8>,
-    pub(super) _phantom: PhantomData<(K, V)>,
+    /// The tracker for whether a database transaction is in progress.
+    pub(super) batch_in_progress: Arc<AtomicBool>,
+    /// The database transaction.
+    pub(super) atomic_batch: Arc<Mutex<IndexMap<K, Option<V>>>>,
 }

 impl<
@@ -38,18 +43,19 @@ impl<
     /// Inserts the given key-value pair into the map.
     ///
     fn insert(&self, key: K, value: V) -> Result<()> {
-        // Prepare the prefixed key and serialized value.
-        let raw_key = self.create_prefixed_key(&key)?;
-        let raw_value = bincode::serialize(&value)?;
-
         // Determine if an atomic batch is in progress.
-        let is_batch = self.database.batch_in_progress.load(Ordering::SeqCst);
+        let is_batch = self.batch_in_progress.load(Ordering::SeqCst);

         match is_batch {
             // If a batch is in progress, add the key-value pair to the batch.
-            true => self.database.atomic_batch.lock().put(raw_key, raw_value),
+            true => {
+                self.atomic_batch.lock().insert(key, Some(value));
+            }
             // Otherwise, insert the key-value pair directly into the map.
             false => {
+                // Prepare the prefixed key and serialized value.
+                let raw_key = self.create_prefixed_key(&key)?;
+                let raw_value = bincode::serialize(&value)?;
                 self.database.put(&raw_key, &raw_value)?;
             }
         }
@@ -61,17 +67,18 @@ impl<
     /// Removes the key-value pair for the given key from the map.
     ///
     fn remove(&self, key: &K) -> Result<()> {
-        // Prepare the prefixed key.
-        let raw_key = self.create_prefixed_key(key)?;
-
         // Determine if an atomic batch is in progress.
-        let is_batch = self.database.batch_in_progress.load(Ordering::SeqCst);
+        let is_batch = self.batch_in_progress.load(Ordering::SeqCst);

         match is_batch {
             // If a batch is in progress, add the key to the batch.
-            true => self.database.atomic_batch.lock().delete(raw_key),
+            true => {
+                self.atomic_batch.lock().insert(*key, None);
+            }
             // Otherwise, remove the key-value pair directly from the map.
             false => {
+                // Prepare the prefixed key.
+                let raw_key = self.create_prefixed_key(key)?;
                 self.database.delete(&raw_key)?;
             }
         }
@@ -85,9 +92,9 @@ impl<
     ///
     fn start_atomic(&self) {
         // Set the atomic batch flag to `true`.
-        self.database.batch_in_progress.store(true, Ordering::SeqCst);
+        self.batch_in_progress.store(true, Ordering::SeqCst);
         // Ensure that the atomic batch is empty.
-        assert!(self.database.atomic_batch.lock().is_empty());
+        assert!(self.atomic_batch.lock().is_empty());
     }

     ///
@@ -96,33 +103,50 @@
     /// if they are already part of a larger one.
     ///
     fn is_atomic_in_progress(&self) -> bool {
-        self.database.batch_in_progress.load(Ordering::SeqCst)
+        self.batch_in_progress.load(Ordering::SeqCst)
     }

     ///
     /// Aborts the current atomic operation.
     ///
     fn abort_atomic(&self) {
         // Clear the atomic batch.
-        self.database.atomic_batch.lock().clear();
+        *self.atomic_batch.lock() = Default::default();
         // Set the atomic batch flag to `false`.
-        self.database.batch_in_progress.store(false, Ordering::SeqCst);
+        self.batch_in_progress.store(false, Ordering::SeqCst);
     }

     ///
     /// Finishes an atomic operation, performing all the queued writes.
     ///
     fn finish_atomic(&self) -> Result<()> {
         // Retrieve the atomic batch.
-        let batch = core::mem::take(&mut *self.database.atomic_batch.lock());
-
-        if !batch.is_empty() {
-            // Execute the batch of operations atomically.
+        let operations = core::mem::take(&mut *self.atomic_batch.lock());
+
+        if !operations.is_empty() {
+            // Prepare operations batch for underlying database.
+            let mut batch = WriteBatch::default();
+            for operation in operations {
+                match operation {
+                    (key, Some(value)) => {
+                        // Prepare the prefixed key and serialized value for insertion.
+                        let raw_key = self.create_prefixed_key(&key)?;
+                        let raw_value = bincode::serialize(&value)?;
+                        batch.put(raw_key, raw_value);
+                    }
+                    (key, None) => {
+                        // Prepare the prefixed key for deletion.
+                        let raw_key = self.create_prefixed_key(&key)?;
+                        batch.delete(raw_key);
+                    }
+                };
+            }
+            // Execute all the operations atomically.
             self.database.rocksdb.write(batch)?;
         }

         // Set the atomic batch flag to `false`.
-        self.database.batch_in_progress.store(false, Ordering::SeqCst);
+        self.batch_in_progress.store(false, Ordering::SeqCst);

         Ok(())
     }
@@ -177,60 +201,7 @@ impl<
         K: Borrow<Q>,
         Q: PartialEq + Eq + Hash + Serialize + ?Sized,
     {
-        // Return early if there is no atomic batch in progress.
-        if self.database.batch_in_progress.load(Ordering::SeqCst) {
-            struct OperationFinder {
-                key: Vec<u8>,
-                value: Option<Option<Box<[u8]>>>,
-            }
-
-            impl rocksdb::WriteBatchIterator for OperationFinder {
-                fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) {
-                    if *key == self.key {
-                        self.value = Some(Some(value));
-                    }
-                }
-
-                fn delete(&mut self, key: Box<[u8]>) {
-                    if *key == self.key {
-                        self.value = Some(None);
-                    }
-                }
-            }
-
-            // Prepare the prefixed key and serialized value.
-            let raw_key = match self.create_prefixed_key(key) {
-                Ok(key) => key,
-                Err(error) => {
-                    error!("Failed to create prefixed key in 'get_batched': {:?}", error);
-                    return None;
-                }
-            };
-
-            // Retrieve the atomic batch.
-            let batch = self.database.atomic_batch.lock();
-
-            // Initialize the operation finder.
-            let mut finder = OperationFinder { key: raw_key, value: None };
-
-            // Iterate over the batch.
-            batch.iterate(&mut finder);
-
-            // Return the value.
-            match finder.value {
-                Some(Some(value)) => match bincode::deserialize(&value) {
-                    Ok(value) => Some(Some(value)),
-                    Err(error) => {
-                        error!("Failed to deserialize value in 'get_batched': {:?}", error);
-                        None
-                    }
-                },
-                Some(None) => Some(None),
-                None => None,
-            }
-        } else {
-            None
-        }
+        if self.batch_in_progress.load(Ordering::SeqCst) { self.atomic_batch.lock().get(key).cloned() } else { None }
     }

     ///
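With the buffer keyed by K, the new get_batched distinguishes three cases with one lookup: Some(Some(value)) for a key with a queued insert, Some(None) for a key with a queued delete, and None when the key is not part of the current batch (or no batch is in progress), the cases where a caller would presumably consult the on-disk value instead. Prefixing and bincode serialization now run only in finish_atomic, once per queued operation rather than on every batched read, and a later insert or remove of the same key simply overwrites its earlier entry in the IndexMap.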

node/store/src/rocksdb/mod.rs

+4 −20
@@ -62,10 +62,6 @@ pub struct RocksDB {
     network_id: u16,
     /// The optional development ID.
     dev: Option<u16>,
-    /// The tracker for whether a database transaction is in progress.
-    batch_in_progress: Arc<AtomicBool>,
-    /// The database transaction.
-    atomic_batch: Arc<Mutex<rocksdb::WriteBatch>>,
 }

 impl Deref for RocksDB {
@@ -102,13 +98,7 @@ impl Database for RocksDB {
                 Arc::new(rocksdb::DB::open(&options, primary)?)
             };

-            Ok::<_, anyhow::Error>(RocksDB {
-                rocksdb,
-                network_id,
-                dev,
-                batch_in_progress: Default::default(),
-                atomic_batch: Default::default(),
-            })
+            Ok::<_, anyhow::Error>(RocksDB { rocksdb, network_id, dev })
         })?
         .clone();

@@ -133,7 +123,7 @@ impl Database for RocksDB {
         context.extend_from_slice(&(data_id as u16).to_le_bytes());

         // Return the DataMap.
-        Ok(DataMap { database, context, _phantom: PhantomData })
+        Ok(DataMap { database, context, batch_in_progress: Default::default(), atomic_batch: Default::default() })
     }
 }

@@ -162,13 +152,7 @@ impl RocksDB {
             Arc::new(rocksdb::DB::open(&options, primary)?)
         };

-        Ok::<_, anyhow::Error>(RocksDB {
-            rocksdb,
-            network_id: u16::MAX,
-            dev,
-            batch_in_progress: Default::default(),
-            atomic_batch: Default::default(),
-        })
+        Ok::<_, anyhow::Error>(RocksDB { rocksdb, network_id: u16::MAX, dev })
         }?;

         // Ensure the database development ID match.
@@ -193,7 +177,7 @@ impl RocksDB {
         context.extend_from_slice(&(data_id as u16).to_le_bytes());

         // Return the DataMap.
-        Ok(DataMap { database, context, _phantom: PhantomData })
+        Ok(DataMap { database, context, batch_in_progress: Default::default(), atomic_batch: Default::default() })
     }
 }

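A brief design note on the mod.rs side: the batch flag and buffer were previously shared, per-database fields on RocksDB (a single rocksdb::WriteBatch behind a Mutex); after this change they are per-DataMap fields initialized to Default::default() wherever a DataMap is constructed (both constructors shown in this diff), so each map carries its own pending-operation buffer and the shared RocksDB handle keeps only rocksdb, network_id, and dev.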