Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rs/moq-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley <kixelated@gmail.com>", "Brian Medley <bpm@bmedley.org>"
repository = "https://github.com/moq-dev/moq"
license = "MIT OR Apache-2.0"

version = "0.1.6"
version = "0.2.0"
edition = "2024"

keywords = ["quic", "http3", "webtransport", "media", "live"]
Expand Down
22 changes: 15 additions & 7 deletions rs/moq-ffi/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ pub struct MoqMediaConsumer {
}

struct Media {
inner: hang::container::OrderedConsumer,
inner: moq_mux::ordered::Consumer<hang::catalog::Container>,
}

impl Media {
async fn next(&mut self) -> Result<Option<MoqFrame>, MoqError> {
let Some(frame) = self.inner.read().await? else {
let frame = self.inner.read().await?;

let Some(frame) = frame else {
return Ok(None);
};

let keyframe = frame.is_keyframe();
let timestamp_us: u64 = frame
.timestamp
.as_micros()
Expand All @@ -64,7 +65,7 @@ impl Media {
Ok(Some(MoqFrame {
payload,
timestamp_us,
keyframe,
keyframe: frame.keyframe,
}))
}
}
Expand All @@ -83,14 +84,21 @@ impl MoqBroadcastConsumer {
}))
}

/// Subscribe to a media track by name, delivering frames in decode order.
/// Subscribe to a track by name, delivering frames in decode order.
///
/// `container` is the track container from the catalog.
/// `max_latency_ms` controls the maximum buffering before skipping a GoP.
pub fn subscribe_media(&self, name: String, max_latency_ms: u64) -> Result<Arc<MoqMediaConsumer>, MoqError> {
pub fn subscribe_media(
&self,
name: String,
container: Container,
max_latency_ms: u64,
) -> Result<Arc<MoqMediaConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let track = self.inner.subscribe_track(&moq_lite::Track { name, priority: 0 })?;
let container: hang::catalog::Container = container.into();
let latency = std::time::Duration::from_millis(max_latency_ms);
let consumer = hang::container::OrderedConsumer::new(track, latency);
let consumer = moq_mux::ordered::Consumer::new(track, container).with_latency(latency);
Ok(Arc::new(MoqMediaConsumer {
task: Task::new(Media { inner: consumer }),
}))
Expand Down
3 changes: 3 additions & 0 deletions rs/moq-ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub enum MoqError {
#[error(transparent)]
Media(#[from] hang::Error),

#[error(transparent)]
Mux(#[from] moq_mux::Error),

#[error(transparent)]
Url(#[from] url::ParseError),

Expand Down
17 changes: 13 additions & 4 deletions rs/moq-ffi/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::HashMap;

#[derive(uniffi::Record)]
#[derive(Clone, uniffi::Record)]
pub struct MoqDimensions {
pub width: u32,
pub height: u32,
}

#[derive(uniffi::Enum)]
#[derive(Clone, uniffi::Enum)]
pub enum Container {
Legacy,
Cmaf { timescale: u64, track_id: u32 },
Expand All @@ -21,6 +21,15 @@ impl From<hang::catalog::Container> for Container {
}
}

impl From<Container> for hang::catalog::Container {
fn from(container: Container) -> Self {
match container {
Container::Legacy => Self::Legacy,
Container::Cmaf { timescale, track_id } => Self::Cmaf { timescale, track_id },
}
}
}

#[derive(uniffi::Record)]
pub struct MoqCatalog {
pub video: HashMap<String, MoqVideo>,
Expand All @@ -30,7 +39,7 @@ pub struct MoqCatalog {
pub flip: Option<bool>,
}

#[derive(uniffi::Record)]
#[derive(Clone, uniffi::Record)]
pub struct MoqVideo {
pub codec: String,
pub description: Option<Vec<u8>>,
Expand All @@ -41,7 +50,7 @@ pub struct MoqVideo {
pub container: Container,
}

#[derive(uniffi::Record)]
#[derive(Clone, uniffi::Record)]
pub struct MoqAudio {
pub codec: String,
pub description: Option<Vec<u8>>,
Expand Down
14 changes: 10 additions & 4 deletions rs/moq-ffi/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ async fn local_publish_consume_audio() {
assert_eq!(audio.channel_count, 2);
assert!(catalog.video.is_empty());

let media_consumer = broadcast_consumer.subscribe_media(track_name.clone(), 10_000).unwrap();
let media_consumer = broadcast_consumer
.subscribe_media(track_name.clone(), audio.container.clone(), 10_000)
.unwrap();

let payload = b"opus audio payload data".to_vec();
media.write_frame(payload.clone(), 1_000_000).unwrap();
Expand Down Expand Up @@ -151,7 +153,9 @@ async fn video_publish_consume() {
assert_eq!(coded.height, 720);
assert!(catalog.audio.is_empty());

let media_consumer = broadcast_consumer.subscribe_media(track_name.clone(), 10_000).unwrap();
let media_consumer = broadcast_consumer
.subscribe_media(track_name.clone(), video.container.clone(), 10_000)
.unwrap();

let keyframe = vec![0x00, 0x00, 0x00, 0x01, 0x65, 0xAA, 0xBB, 0xCC];
media.write_frame(keyframe, 0).unwrap();
Expand Down Expand Up @@ -190,8 +194,10 @@ async fn multiple_frames_ordering() {
.unwrap()
.unwrap();

let track_name = catalog.audio.keys().next().unwrap().clone();
let media_consumer = broadcast_consumer.subscribe_media(track_name, 10_000).unwrap();
let (track_name, audio) = catalog.audio.iter().next().unwrap();
let media_consumer = broadcast_consumer
.subscribe_media(track_name.clone(), audio.container.clone(), 10_000)
.unwrap();

let timestamps: [u64; 5] = [0, 20_000, 40_000, 60_000, 80_000];
for (i, &ts) in timestamps.iter().enumerate() {
Expand Down
3 changes: 2 additions & 1 deletion rs/moq-mux/src/cmaf/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub(crate) fn decode(data: Bytes, timescale: u64) -> Result<Vec<Frame>, Error> {
return Ok(frames);
}

let timestamp = Timestamp::from_scale(dts, timescale)?;
let pts = dts as i64 + entry.cts.unwrap_or(0) as i64;
let timestamp = Timestamp::from_scale(pts.max(0) as u64, timescale)?;
let payload = Bytes::copy_from_slice(&mdat_data[offset..end]);
let flags = entry.flags.unwrap_or(0);
// depends_on_no_other (bits 24-25 == 0x2) means keyframe
Expand Down
39 changes: 3 additions & 36 deletions rs/moq-mux/src/hang/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ impl Container for Legacy {
}

#[cfg(feature = "mp4")]
impl Container for hang::catalog::VideoConfig {
Comment thread
Qizot marked this conversation as resolved.
impl Container for hang::catalog::Container {
type Error = crate::Error;

fn write(&self, group: &mut moq_lite::GroupProducer, frames: &[Frame]) -> Result<(), Self::Error> {
match &self.container {
match self {
hang::catalog::Container::Legacy => Legacy.write(group, frames).map_err(Into::into),
hang::catalog::Container::Cmaf { timescale, track_id } => {
crate::cmaf::encode(group, frames, *timescale, *track_id).map_err(Into::into)
Expand All @@ -63,40 +63,7 @@ impl Container for hang::catalog::VideoConfig {
group: &mut moq_lite::GroupConsumer,
waiter: &conducer::Waiter,
) -> Poll<Result<Option<Vec<Frame>>, Self::Error>> {
match &self.container {
hang::catalog::Container::Legacy => Legacy.poll_read(group, waiter).map(|r| r.map_err(Into::into)),
hang::catalog::Container::Cmaf { timescale, .. } => {
use std::task::ready;

let Some(data) = ready!(group.poll_read_frame(waiter)?) else {
return Poll::Ready(Ok(None));
};

Poll::Ready(crate::cmaf::decode(data, *timescale).map(Some).map_err(Into::into))
}
}
}
}

#[cfg(feature = "mp4")]
impl Container for hang::catalog::AudioConfig {
type Error = crate::Error;

fn write(&self, group: &mut moq_lite::GroupProducer, frames: &[Frame]) -> Result<(), Self::Error> {
match &self.container {
hang::catalog::Container::Legacy => Legacy.write(group, frames).map_err(Into::into),
hang::catalog::Container::Cmaf { timescale, track_id } => {
crate::cmaf::encode(group, frames, *timescale, *track_id).map_err(Into::into)
}
}
}

fn poll_read(
&self,
group: &mut moq_lite::GroupConsumer,
waiter: &conducer::Waiter,
) -> Poll<Result<Option<Vec<Frame>>, Self::Error>> {
match &self.container {
match self {
hang::catalog::Container::Legacy => Legacy.poll_read(group, waiter).map(|r| r.map_err(Into::into)),
hang::catalog::Container::Cmaf { timescale, .. } => {
use std::task::ready;
Expand Down
19 changes: 3 additions & 16 deletions rs/moq-mux/src/ordered/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1226,26 +1226,13 @@ mod tests {

#[cfg(feature = "mp4")]
#[tokio::test]
async fn video_config_legacy() {
async fn video_container_legacy() {
tokio::time::pause();

let config = hang::catalog::VideoConfig {
codec: "avc1.64001f".parse().unwrap(),
container: hang::catalog::Container::Legacy,
description: None,
coded_width: None,
coded_height: None,
display_ratio_width: None,
display_ratio_height: None,
bitrate: None,
framerate: None,
optimize_for_latency: None,
jitter: None,
};

let mut track = moq_lite::Track::new("video").produce();
let consumer_track = track.consume();
let mut consumer = Consumer::new(consumer_track, config).with_latency(Duration::from_millis(500));
let mut consumer =
Consumer::new(consumer_track, hang::catalog::Container::Legacy).with_latency(Duration::from_millis(500));

// Write frames using Legacy encoding
let mut group = track.create_group(moq_lite::Group { sequence: 0 }).unwrap();
Expand Down
Loading