diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index 8a39c72f..d6cbf3ef 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -99,6 +99,7 @@ async fn connect_socket( } let Some(conn) = parts.extensions.remove::() else { + warn!("Failed to upgrade connection: {:#?}", parts.headers); return Err(StatusCode::UPGRADE_REQUIRED); }; diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 92a95b20..1b310751 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -312,17 +312,14 @@ impl DecryptorCodec { } impl EncryptorCodec { - #[inline] - fn tag_overhead() -> usize { - ::TagSize::to_usize() - } + const TAG_SIZE: usize = ::TagSize::USIZE; } impl Encoder<&[u8]> for EncryptorCodec { type Error = io::Error; fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> Result<(), Self::Error> { - let mut enc_buf = EncBuffer::new(dst, item.len() + Self::tag_overhead()); + let mut enc_buf = EncBuffer::new(dst, (item.len() + Self::TAG_SIZE).try_into().expect("item to large")); enc_buf.extend_from_slice(item).expect("Infallible"); self.encryptor .encrypt_next_in_place(b"", &mut enc_buf) @@ -349,7 +346,7 @@ impl Decoder for DecryptorCodec { let plain = self .decryptor - .decrypt_next(&src[Self::SIZE_OVERHEAD..]) + .decrypt_next(&src[Self::SIZE_OVERHEAD..(Self::SIZE_OVERHEAD + size as usize)]) .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Decryption failed"))?; src.advance(total_frame_size); Ok(Some(plain)) @@ -358,17 +355,20 @@ impl Decoder for DecryptorCodec { struct EncBuffer<'a> { buf: &'a mut BytesMut, + /// The index from which the inplace encryption takes place + enc_idx: usize, } impl<'a> EncBuffer<'a> { - fn new(buffer: &'a mut BytesMut, content_len: usize) -> Self { - buffer.reserve(content_len + Self::SIZE_OVERHEAD); - buffer.extend_from_slice(&u32::to_le_bytes(content_len as u32)); - Self { buf: buffer } + fn new(buffer: &'a mut BytesMut, content_len: u32) -> Self { + let enc_idx = buffer.len() + Self::SIZE_OVERHEAD; + buffer.reserve(content_len as usize + Self::SIZE_OVERHEAD); + buffer.extend_from_slice(&u32::to_le_bytes(content_len)); + Self { buf: buffer, enc_idx } } /// Reserved for size of msg - const SIZE_OVERHEAD: usize = 4; + const SIZE_OVERHEAD: usize = (u32::BITS / 8) as usize; } impl<'a> Buffer for EncBuffer<'a> { @@ -386,13 +386,13 @@ impl<'a> Buffer for EncBuffer<'a> { impl<'a> AsRef<[u8]> for EncBuffer<'a> { fn as_ref(&self) -> &[u8] { - &self.buf[Self::SIZE_OVERHEAD..] + &self.buf[self.enc_idx..] } } impl<'a> AsMut<[u8]> for EncBuffer<'a> { fn as_mut(&mut self) -> &mut [u8] { - &mut self.buf[Self::SIZE_OVERHEAD..] + &mut self.buf[self.enc_idx..] } }