Skip to content

Commit

Permalink
Organized source codes related to RTMP
Browse files Browse the repository at this point in the history
  • Loading branch information
dimiden committed Oct 15, 2024
1 parent 0eddb3e commit 17d8fe6
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 64 deletions.
114 changes: 61 additions & 53 deletions src/projects/providers/rtmp/rtmp_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,14 @@ namespace pvd

if (process_size < 0)
{
logtd("Could not parse RTMP packet: [%s/%s] (%u/%u), size: %zu bytes, returns: %d",
logtd("Could not process RTMP packet: [%s/%s] (%u/%u), size: %zu bytes, returns: %d",
_vhost_app_name.CStr(), _stream_name.CStr(),
_app_id, GetId(),
_remained_data->GetLength(),
process_size);

return process_size;
Stop();
return false;
}
else if (process_size == 0)
{
Expand All @@ -246,15 +247,36 @@ namespace pvd
{
if (_tc_url.IsEmpty() == false)
{
_full_url = _tc_url;
auto remain = document.GetProperty(3)->GetString();
if (remain.IsEmpty() == false)
auto url = ov::Url::Parse(_tc_url);

if (url == nullptr)
{
logtw("Could not parse the URL: %s", _tc_url.CStr());
return false;
}

auto stream_name = document.GetProperty(3)->GetString();

if (stream_name.IsEmpty() == false)
{
_full_url += ov::String::FormatString("/%s", remain.CStr());
url->SetStream(stream_name);
}

if (SetFullUrl(_full_url) &&
CheckAccessControl() &&
// PORT can be omitted (1935), but SignedPolicy requires this information.
if (url->Port() == 0)
{
url->SetPort(_remote->GetLocalAddress()->Port());
}

_url = url;
_publish_url = _url;
_stream_name = _url->Stream();
_import_chunk->SetStreamName(_stream_name);

SetRequestedUrl(_url);
SetFinalUrl(_url);

if (CheckAccessControl() &&
ValidatePublishUrl())
{
return true;
Expand All @@ -265,7 +287,7 @@ namespace pvd
return false;
}

void RtmpStream::OnAmfConnect(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
bool RtmpStream::OnAmfConnect(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
{
double object_encoding = 0.0;

Expand Down Expand Up @@ -331,35 +353,39 @@ namespace pvd
if (SendWindowAcknowledgementSize(RTMP_DEFAULT_ACKNOWNLEDGEMENT_SIZE) == false)
{
logte("SendWindowAcknowledgementSize Fail");
return;
return false;
}

if (SendSetPeerBandwidth(_peer_bandwidth) == false)
{
logte("SendSetPeerBandwidth Fail");
return;
return false;
}

if (SendStreamBegin(0) == false)
{
logte("SendStreamBegin Fail");
return;
return false;
}

if (SendAmfConnectResult(header->basic_header.chunk_stream_id, transaction_id, object_encoding) == false)
{
logte("SendAmfConnectResult Fail");
return;
return false;
}

return true;
}

void RtmpStream::OnAmfCreateStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
bool RtmpStream::OnAmfCreateStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
{
if (SendAmfCreateStreamResult(header->basic_header.chunk_stream_id, transaction_id) == false)
{
logte("SendAmfCreateStreamResult Fail");
return;
return false;
}

return true;
}

bool RtmpStream::CheckAccessControl()
Expand Down Expand Up @@ -464,7 +490,7 @@ namespace pvd
return false;
}

void RtmpStream::OnAmfFCPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
bool RtmpStream::OnAmfFCPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
{
if (_stream_name.IsEmpty())
{
Expand All @@ -476,15 +502,17 @@ namespace pvd
if (SendAmfOnFCPublish(header->basic_header.chunk_stream_id, _rtmp_stream_id, _client_id) == false)
{
logte("SendAmfOnFCPublish Fail");
return;
return false;
}

PostPublish(document);
return PostPublish(document);
}
}

return false;
}

void RtmpStream::OnAmfPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
bool RtmpStream::OnAmfPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
{
if (_stream_name.IsEmpty())
{
Expand All @@ -493,7 +521,7 @@ namespace pvd
{
if (PostPublish(document) == false)
{
return;
return false;
}
}
else
Expand All @@ -508,7 +536,7 @@ namespace pvd
"Authentication Failed.",
_client_id);

return;
return false;
}
}

Expand All @@ -517,7 +545,7 @@ namespace pvd
if (SendStreamBegin(_rtmp_stream_id) == false)
{
logte("SendStreamBegin Fail");
return;
return false;
}

if (SendAmfOnStatus(static_cast<uint32_t>(_chunk_stream_id),
Expand All @@ -528,33 +556,9 @@ namespace pvd
_client_id) == false)
{
logte("SendAmfOnStatus Fail");
return;
}
}

bool RtmpStream::SetFullUrl(ov::String url)
{
_url = ov::Url::Parse(url);

if (_url == nullptr)
{
logtw("Could not parse the URL: %s", url.CStr());
return false;
}

// PORT can be omitted (1935), but SignedPolicy requires this information.
if (_url->Port() == 0)
{
_url->SetPort(_remote->GetLocalAddress()->Port());
}

_publish_url = _url;
_stream_name = _url->Stream();
_import_chunk->SetStreamName(_stream_name);

SetRequestedUrl(_url);
SetFinalUrl(_url);

return true;
}

Expand Down Expand Up @@ -869,7 +873,7 @@ namespace pvd
return true;
}

void RtmpStream::OnAmfDeleteStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
bool RtmpStream::OnAmfDeleteStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id)
{
logtd("Delete Stream - stream(%s/%s) id(%u/%u)", _vhost_app_name.CStr(), _stream_name.CStr(), _app_id, GetId());

Expand All @@ -878,6 +882,8 @@ namespace pvd

// it will call PhysicalPort::OnDisconnected
_remote->Close();

return true;
}

off_t RtmpStream::ReceiveHandshakePacket(const std::shared_ptr<const ov::Data> &data)
Expand Down Expand Up @@ -1055,7 +1061,7 @@ namespace pvd
ReceiveAmfDataMessage(message);
break;
case RtmpMessageTypeID::AMF0_COMMAND:
ReceiveAmfCommandMessage(message);
result = ReceiveAmfCommandMessage(message);
break;
case RtmpMessageTypeID::USER_CONTROL:
result = ReceiveUserControlMessage(message);
Expand Down Expand Up @@ -1158,7 +1164,7 @@ namespace pvd
}
}

void RtmpStream::ReceiveAmfCommandMessage(const std::shared_ptr<const RtmpMessage> &message)
bool RtmpStream::ReceiveAmfCommandMessage(const std::shared_ptr<const RtmpMessage> &message)
{
OV_ASSERT2(message->header != nullptr);
OV_ASSERT2(message->payload != nullptr);
Expand All @@ -1170,7 +1176,7 @@ namespace pvd
if (document.Decode(byte_stream) == false)
{
logte("Could not decode AMFDocument");
return;
return false;
}

// Message Name
Expand All @@ -1181,7 +1187,7 @@ namespace pvd
if (property == nullptr)
{
logte("Message name is not available");
return;
return false;
}

message_name = property->GetString();
Expand All @@ -1200,7 +1206,7 @@ namespace pvd

if (message_name == RTMP_CMD_NAME_CONNECT)
{
OnAmfConnect(message->header, document, transaction_id);
return OnAmfConnect(message->header, document, transaction_id);
}
else if (message_name == RTMP_CMD_NAME_CREATESTREAM)
{
Expand Down Expand Up @@ -1228,8 +1234,10 @@ namespace pvd
else
{
logtd("Unknown Amf0CommandMessage - Message(%s:%.1f)", message_name.CStr(), transaction_id);
return;
return false;
}

return true;
}

void RtmpStream::ReceiveAmfDataMessage(const std::shared_ptr<const RtmpMessage> &message)
Expand Down
17 changes: 6 additions & 11 deletions src/projects/providers/rtmp/rtmp_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,16 @@ namespace pvd
protected:
bool Start() override;

bool ConvertToVideoData(const std::shared_ptr<ov::Data> &data, int64_t &cts);
bool ConvertToAudioData(const std::shared_ptr<ov::Data> &data);

private:
// Called when received AmfFCPublish & AmfPublish event
bool PostPublish(const AmfDocument &document);

// AMF Event
void OnAmfConnect(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
void OnAmfCreateStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
void OnAmfFCPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
void OnAmfPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
void OnAmfDeleteStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfConnect(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfCreateStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfFCPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfDeleteStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfMetaData(const std::shared_ptr<const RtmpChunkHeader> &header, const AmfProperty *property);


Expand Down Expand Up @@ -105,7 +102,7 @@ namespace pvd
bool ReceiveSetChunkSize(const std::shared_ptr<const RtmpMessage> &message);
bool ReceiveUserControlMessage(const std::shared_ptr<const RtmpMessage> &message);
void ReceiveWindowAcknowledgementSize(const std::shared_ptr<const RtmpMessage> &message);
void ReceiveAmfCommandMessage(const std::shared_ptr<const RtmpMessage> &message);
bool ReceiveAmfCommandMessage(const std::shared_ptr<const RtmpMessage> &message);
void ReceiveAmfDataMessage(const std::shared_ptr<const RtmpMessage> &message);

bool CheckEventMessage(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document);
Expand All @@ -121,8 +118,6 @@ namespace pvd
bool PublishStream();
bool SetTrackInfo(const std::shared_ptr<RtmpMediaInfo> &media_info);

bool SetFullUrl(ov::String url);

bool CheckAccessControl();
bool CheckStreamExpired();
bool ValidatePublishUrl();
Expand Down

0 comments on commit 17d8fe6

Please sign in to comment.