Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
/// Errors
/// - `IndexError::InvalidInput`: Either Key or Value is 0
/// - `IndexError::OutOfMemory`: Append after index file is already full.
pub fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> {
pub fn append(&mut self, key: Key, value: u64) -> Result<usize, IndexError> {
let key = key.into();
let last_key = self.last_key()?;
if last_key >= key {
Expand All @@ -179,7 +179,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
self.inner[start..start + KEY_SIZE].copy_from_slice(&key_bytes);
self.inner[start + KEY_SIZE..start + ENTRY_SIZE].copy_from_slice(&value_bytes);
self.num_entries += 1;
Ok(())
Ok(start)
}

/// Asynchronously flushes any pending changes to the index file
Expand All @@ -190,6 +190,21 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
self.inner.flush_async()
}

/// Asynchronously flushes the index entry starting at `offset` to the index file.
///
/// On linux, the underlying `msync` is a documented no-op since the kernel already
/// tracks dirty pages and flushes them as needed.
///
/// See https://man7.org/linux/man-pages/man2/msync.2.html for details.
///
/// On macOS, it is not a documented no-op, and it explicitly states that `msync`
/// will only examine pages covered by the provided address range. Hence this should
/// be preferred over [`Self::async_flush`] when only flushing a single entry at a
/// time, since it may avoid examining pages across the whole mapping.
pub fn async_flush_entry(&self, offset: usize) -> io::Result<()> {
self.inner.flush_async_range(offset, ENTRY_SIZE)
}

/// Truncates the index file starting from the entry with a key greater than
/// or equal to the given key.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Default for Options {
impl Options {
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = true;
Comment thread
joshua-spacetime marked this conversation as resolved.
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024;

Expand Down
9 changes: 3 additions & 6 deletions crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ impl OffsetIndexWriter {
return Ok(());
}

self.head
let entry_offset = self
.head
.append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
self.head.async_flush()?;
self.head.async_flush_entry(entry_offset)?;
self.reset();

Ok(())
Expand All @@ -371,10 +372,6 @@ impl FileLike for OffsetIndexWriter {
let _ = self.append_internal().map_err(|e| {
warn!("failed to append to offset index: {e:?}");
});
let _ = self
.head
.async_flush()
.map_err(|e| warn!("failed to flush offset index: {e:?}"));
Comment thread
joshua-spacetime marked this conversation as resolved.
Ok(())
}

Expand Down
Loading