Direct writing to/reading from of underlying buffer without Bincode #9

Open · andrew-manifold wants to merge 6 commits into master

Conversation

@andrew-manifold (Author) commented Jun 4, 2024

This crate has a fantastically designed API, but the hard dependency on bincode sometimes introduces excessive overhead for very low-latency/high-throughput applications. I've included a parallel set of methods on Sender and Receiver (the _direct_ versions) that allow for the following (a rough usage sketch follows the list):

  • A simple ShmWriter trait that lets implementing structs customize their encoding logic and write directly to the mmap-backed buffer, skipping bincode's extra layer and the allocations it could otherwise incur (i.e. no intermediary Vec<u8> is set up)
  • Retrieving a slice of the encoded message directly, so decoding happens one layer up. This allows true zero-copy casts from the backing memory; a large &[u8] contained within a message can be reached with a simple offset and no real overhead
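
A rough sketch (not from the PR) of how the proposed API is meant to be used, pieced together from the signatures quoted later in this review. The Tick type, its 16-byte little-endian layout, and the roundtrip helper are hypothetical; the crate's Sender, Receiver, Result, and the new ShmWriter trait are assumed to be in scope; and the receive half is later moved to ZeroCopyContext per the review below.

    struct Tick {
        price: f64,
        qty: u64,
    }

    impl ShmWriter for Tick {
        fn msg_len(&self) -> u32 {
            16 // fixed-size encoding: price (8 bytes LE) then qty (8 bytes LE)
        }

        fn write_to_shm(&self, shm_ptr: *mut u8, _msg_len: u32) -> Result<()> {
            // The first revision of the trait hands out a raw pointer, so the
            // implementer is responsible for staying within msg_len bytes.
            unsafe {
                std::ptr::copy_nonoverlapping(self.price.to_le_bytes().as_ptr(), shm_ptr, 8);
                std::ptr::copy_nonoverlapping(self.qty.to_le_bytes().as_ptr(), shm_ptr.add(8), 8);
            }
            Ok(())
        }
    }

    fn roundtrip(tx: &Sender, rx: &Receiver) -> Result<()> {
        // Sender side: the value encodes itself straight into the mmap'd ring
        // buffer -- no intermediary Vec<u8>.
        tx.send_direct(&Tick { price: 101.25, qty: 3 })?;

        // Receiver side: borrow the encoded bytes directly from the ring
        // buffer and decode them one layer up.
        if let Some(bytes) = rx.try_recv_direct()? {
            let price = f64::from_le_bytes(bytes[..8].try_into().unwrap());
            let qty = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
            assert_eq!((price, qty), (101.25, 3));
        }
        Ok(())
    }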

@andrew-manifold changed the title from "Direct writing to/reading from of underlying buffer without Bitcode" to "Direct writing to/reading from of underlying buffer without Bincode" on Jun 4, 2024
@dicej (Owner) commented Jun 5, 2024

@andrew-manifold Thanks so much for this, and sorry for the delayed response. I'm planning to review this as soon as I have the chance -- probably this Friday.

@dicej (Owner) left a comment

Thanks again for this. I think the send_direct* parts look good overall (see some comments inline), but I have concerns about the {try_}recv_direct parts, and I think ZeroCopyContext::{try_}recv::<_, &[u8]> can do the job just as efficiently and more correctly.

If you can demonstrate that bincode is adding significant overhead even for the &[u8] case, then perhaps we could add {try_}recv_direct functions to ZeroCopyContext, but I don't believe they can be added soundly to Receiver.

src/lib.rs Outdated
/// Attempt to read a message without blocking. Returns a slice directly from the ring buffer
///
/// This will return `Ok(None)` if there are no messages immediately available.
pub fn try_recv_direct<'a>(&'a self) -> Result<Option<&'a [u8]>> {
@dicej (Owner):

What advantage does this have over using ZeroCopyContext::try_recv<_, &[u8]>? I would expect it to give identical performance. Have you run any benchmarks?
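
A rough sketch (not from the thread) of the ZeroCopyContext alternative referred to here. The zero_copy_context() accessor and the exact try_recv signature are assumptions about the crate's existing API -- check the docs for the real names; the key property is that the borrowed &[u8] stays tied to the context, so the ring-buffer space is not released while the caller still holds the slice.

    fn peek_once(rx: &mut Receiver) -> Result<()> {
        let mut context = rx.zero_copy_context();
        // bincode deserializes `&[u8]` by borrowing straight from the ring
        // buffer, so this path is already zero-copy.
        let bytes: Option<&[u8]> = context.try_recv()?;
        if let Some(bytes) = bytes {
            println!("received {} bytes without copying", bytes.len());
        }
        Ok(())
        // The message's space is handed back to the writer(s) only once
        // `context` goes out of scope, which is what keeps the borrow sound.
    }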

@andrew-manifold (Author):

As you've mentioned below, it's more about giving the developer an assumption-free way to get at the underlying bytes. ShmWriter puts all the ownership of proper encoding on the user, and it was helpful to have a matching means of doing the decoding properly too. Ideally there would be a companion ShmReader trait, but the lifetime pollution was getting pretty horrific, so I opted to just return the backing buffer.

src/lib.rs Outdated
@@ -267,6 +287,45 @@ impl Receiver {
})
}

fn try_recv_direct_0<'a>(&'a self) -> Result<Option<(&'a [u8], u32)>> {
@dicej (Owner):

There's significant code duplication across this and try_recv_0 (and likewise across recv_direct_timeout_0 and recv_timeout_0). Assuming these provide a significant performance improvement over ZeroCopyContext, and that they can be used safely and soundly (per my comment above), perhaps we could re-implement try_recv_0 in terms of try_recv_direct_0 (and likewise recv_timeout_0 in terms of recv_direct_timeout_0)?

@andrew-manifold (Author):

I fully agree, the code duplication isn't great. I'll try to come up with a more unified approach that would generalize across the safe + unsafe implementations.

/// This will return `Ok(None)` if there are no messages immediately available.
pub fn try_recv_direct<'a>(&'a self) -> Result<Option<&'a [u8]>> {
Ok(if let Some((value, position)) = self.try_recv_direct_0()? {
self.seek(position)?;
@dicej (Owner):

This is both unsafe and unsound. The call to seek tells the writer(s) that we're done reading the buffer, which is not true, since we're about to return a shared reference to the caller. That violates Rust's aliasing rules because any writer(s) will believe they have unique, mutable access to something the caller has shared, immutable access to.

ZeroCopyContext exists precisely to avoid this problem.

@andrew-manifold (Author):

That's a valid point; I'll drop the direct method from Receiver. It should indeed only ever be called through ZeroCopyContext.

/// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the message size is
/// greater than the ring buffer capacity, this method will return
/// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`.
pub fn send_direct(&self, value: &impl ShmWriter) -> Result<()> {
@dicej (Owner):

I think all the send_direct* methods should be marked unsafe given the inherent unsafety of ShmWriter.

@andrew-manifold (Author):

I agree; I can easily update those.

src/lib.rs Outdated
self.send_direct_timeout_0(value, true, None).map(drop)
}

fn send_direct_timeout_0(
@dicej (Owner):

Perhaps we could reduce code duplication here by re-implementing send_timeout_0 in terms of send_direct_timeout_0, using a bincode-based ShmWriter implementation?

@andrew-manifold (Author):

Yeah, I like that idea a lot. It will keep things much cleaner and easier to maintain.
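
A sketch of what that unification might look like (not code from the branch): an adapter that implements ShmWriter by serializing with bincode directly into the slice the ring buffer hands out, so send_timeout_0 could delegate to send_direct_timeout_0. It assumes the updated write_to_shm(&mut [u8]) signature discussed just below, bincode 1.x, and that the crate's Error converts from bincode::Error; the BincodeWriter name and the delegation shown in the trailing comment are guesses.

    struct BincodeWriter<'a, T: serde::Serialize>(&'a T);

    impl<'a, T: serde::Serialize> ShmWriter for BincodeWriter<'a, T> {
        fn msg_len(&self) -> u32 {
            // Assumes the encoded size fits in u32, as the ring buffer's
            // message header requires.
            bincode::serialized_size(self.0).expect("serializable value") as u32
        }

        fn write_to_shm(&self, shm: &mut [u8]) -> Result<()> {
            // `&mut [u8]` implements std::io::Write, so bincode can fill the
            // ring-buffer slice in place with no intermediate allocation.
            bincode::serialize_into(&mut *shm, self.0).map_err(Into::into)
        }
    }

    // send_timeout_0 could then be expressed roughly as:
    //     self.send_direct_timeout_0(&BincodeWriter(value), timeout)
    // (argument list guessed from the names visible in this diff).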

src/lib.rs Outdated
/// to account for padding, alignment, proper offsetting etc. or you will cause mayhem.
pub trait ShmWriter {
fn msg_len(&self) -> u32;
fn write_to_shm(&self, shm_ptr: *mut u8, msg_len: u32) -> Result<()>;
@dicej (Owner):

What's the advantage of using a raw pointer here rather than a &mut [u8] slice? That would make the API safe, but still allow the implementer to convert the slice to a raw pointer if they want to do something unusual (in which case soundness is their responsibility).

@andrew-manifold (Author):

I don't necessarily think there is an advantage to using a raw pointer -- it's probably more a result of some bad habits picked up over time, haha. I'll update it to use &mut [u8], since it does give more flexibility.
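
Continuing the hypothetical Tick example from the earlier sketch: with a &mut [u8] parameter the same encoder needs no unsafe at all, while an implementer who really wants a raw pointer can still call shm.as_mut_ptr() and take on the soundness obligations themselves.

    impl ShmWriter for Tick {
        fn msg_len(&self) -> u32 {
            16
        }

        fn write_to_shm(&self, shm: &mut [u8]) -> Result<()> {
            // Safe version of the raw-pointer sketch above: bounds are checked
            // by the slice and the borrow checker enforces exclusive access.
            shm[..8].copy_from_slice(&self.price.to_le_bytes());
            shm[8..16].copy_from_slice(&self.qty.to_le_bytes());
            Ok(())
        }
    }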

@dicej (Owner) commented Jun 7, 2024

Ah, I just realized that using bincode to receive a &[u8] on the receiver end would not be correct if send_direct* was used on the sender end, since bincode will presumably expect a length prefix at the beginning of the message. So we probably will need to add {try_}recv_direct methods to ZeroCopyContext.

@andrew-manifold (Author) commented:

> Thanks again for this. I think the send_direct* parts look good overall (see some comments inline), but I have concerns about the {try_}recv_direct parts, and I think ZeroCopyContext::{try_}recv::<_, &[u8]> can do the job just as efficiently and more correctly.
>
> If you can demonstrate that bincode is adding significant overhead even for the &[u8] case, then perhaps we could add {try_}recv_direct functions to ZeroCopyContext, but I don't believe they can be added soundly to Receiver.

The context for my interest in adding these methods is that I've run into some corner cases with bincode where the combination of writing to shared memory and zero-copy deserialization leads to memory alignment issues -- for example, when using crates like bytemuck to cast a large &[u8] contained within a struct. Ideally the safe API of this crate would have been enough, but the alternative was to memcpy any such slices onto the heap to guarantee alignment, which defeats the purpose of zero-copy deserialization.

All the feedback is very much appreciated, I'll try to get updates into this PR soon that address all the issues you've pointed out.

@andrew-manifold (Author) commented:

@dicej I believe all the major issues have been fixed; let me know if you think other tweaks would be helpful. I managed to unify both the sender and receiver APIs so that each uses one common implementation underneath.

@dicej (Owner) left a comment

This is looking good; thanks for the updates.

Three requests (plus another inline, below):

  • Would you please run cargo fmt on the code?
  • Would you mind adding one or more tests for the _direct cases so we have some coverage there? That will help ensure this feature doesn't regress in the future.
  • I just noticed that there seems to be a subtle regression such that cargo test arbitrary_case hangs indefinitely on your branch but works on the master branch. It's not consuming any CPU, so it might be a deadlock. Would you mind taking a look at that?

src/lib.rs Outdated
/// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the message size is
/// greater than the ring buffer capacity, this method will return
/// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`.
pub unsafe fn send_direct(&self, value: &impl ShmWriter) -> Result<()> {
@dicej (Owner):

I know I recommended adding unsafe to these, but now that ShmWriter::write_to_shm takes a &mut [u8] instead of a *mut u8, I think we can remove the unsafe keywords from the send_direct* methods. AFAICT, there's no way for the caller to use them unsoundly unless the caller (or ShmWriter implementer) itself uses unsafe. Sorry for the confusion.

@andrew-manifold (Author) commented Jun 9, 2024

I have everything figured out besides the arbitrary_case issue. I can confirm the hang is happening on my end too; the failure (via cargo test arbitrary_case -- --nocapture) is:

thread '<unnamed>' panicked at src/lib.rs:770:25:
assertion `left == right` failed

I'm currently looking into what is set up incorrectly to cause this; it's this section of the tests:

let receiver_thread = if self.sender_count == 1 {
    // Only one sender means we can expect to receive in a predictable order:
    let expected = self.data.clone();
    thread::spawn(move || -> Result<()> {
        for item in &expected {
            let received = rx.recv::<Vec<u8>>()?;
            assert_eq!(item, &received);
        }

        Ok(())
    })

@andrew-manifold (Author) commented Jun 10, 2024

@dicej So after doing some more digging, I've found a few takeaways:

  • The regression in arbitrary_case always hits the first test case, but how far into the test it fails is not deterministic. This implies the environment (machine) the test runs on -- and hence how quickly messages can be processed -- is a contributing factor.
  • Increasing the channel size to something larger (3x+) than the test's default of ~1024 bytes allows the test to pass.
  • The simple tests pass because the message size is substantially smaller than the channel size, so the ring buffer recv keeps a similar pace with the send.
  • I believe the issue is related to corner cases around wrap-arounds. An example of a failed unit test:
arbitrary_case test 1:
channel size: 761 bytes
total size: 897 bytes (includes header + 8 bytes)

msg 0: encoded msg len = 288, start index = 132, end index = 420 
msg 1: encoded msg len = 410, start index = 424, end index = 834
write wraps around ring buffer
msg 2: encoded msg len = 618, start index = 132, end index = 750 
       -> recv fails as the data for msg 2 being received is inconsistent with the actual data

Why this happens on my PR and not on master is still unknown to me. My hypothesis is that the compiler may be doing some optimizations in one of the two versions that allow for faster processing. I'll need to look at the assembly to see where things differ.

Running this test on my branch on one of my dev servers (specs below) rather than on a MacBook Pro (my laptop) has it passing:

Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         48 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  24
  On-line CPU(s) list:   0-23
Vendor ID:               AuthenticAMD
  Model name:            AMD Ryzen 9 7900 12-Core Processor
    CPU family:          25
    Model:               97
    Thread(s) per core:  2
    Core(s) per socket:  12
    Socket(s):           1
    Stepping:            2
    Frequency boost:     enabled
    CPU max MHz:         5481.3472
    CPU min MHz:         3000.0000
    BogoMIPS:            7385.91
    Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mc
                         a cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall n
                         x mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_go
                         od nopl nonstop_tsc cpuid extd_apicid aperfmperf rapl p
                         ni pclmulqdq monitor ssse3 fma cx16 sse4_1 sse4_2 x2api
                         c movbe popcnt aes xsave avx f16c rdrand lahf_lm cmp_le
                         gacy svm extapic cr8_legacy abm sse4a misalignsse 3dnow
                         prefetch osvw ibs skinit wdt tce topoext perfctr_core p
                         erfctr_nb bpext perfctr_llc mwaitx cpb cat_l3 cdp_l3 hw
                         _pstate ssbd mba ibrs ibpb stibp vmmcall fsgsbase bmi1 
                         avx2 smep bmi2 erms invpcid cqm rdt_a avx512f avx512dq 
                         rdseed adx smap avx512ifma clflushopt clwb avx512cd sha
                         _ni avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves cq
                         m_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local avx512_
                         bf16 clzero irperf xsaveerptr rdpru wbnoinvd cppc arat 
                         npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushb
                         yasid decodeassists pausefilter pfthreshold avic v_vmsa
                         ve_vmload vgif v_spec_ctrl avx512vbmi umip pku ospke av
                         x512_vbmi2 gfni vaes vpclmulqdq avx512_vnni avx512_bita
                         lg avx512_vpopcntdq rdpid overflow_recov succor smca fs
                         rm flush_l1d
Virtualization features: 
  Virtualization:        AMD-V
Caches (sum of all):     
  L1d:                   384 KiB (12 instances)
  L1i:                   384 KiB (12 instances)
  L2:                    12 MiB (12 instances)
  L3:                    64 MiB (2 instances)

@andrew-manifold (Author) commented:

Additional unit tests are now included that cover _direct_ methods too.
