本篇文章主要讲解Edge模式推拉流的调试和源码分析。
1.Edge推拉流相关类介绍
(1) 从下面的源码可以看出,Edge模式的拉流和推流都是由SrsSource这个类来管理的(分别对应成员play_edge和publish_edge),后面的源码和函数调用也会体现出来。
// Construct a source: reset its state fields to their defaults and allocate its helper objects.
SrsSource::SrsSource()
{
req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
_can_publish = true;
_pre_source_id = _source_id = -1;
die_at = 0;
// Controls edge playing: pulling the stream from the origin to this edge.
play_edge = new SrsPlayEdge();
// Controls edge publishing: pushing the stream from this edge to the origin.
publish_edge = new SrsPublishEdge();
// Controls the GOP cache.
gop_cache = new SrsGopCache();
// Controls the routing on the origin side (the origin hub).
hub = new SrsOriginHub();
// Controls the metadata cache.
meta = new SrsMetaCache();
is_monotonically_increase = false;
last_packet_time = 0;
// Subscribe this source to the global config object.
_srs_config->subscribe(this);
atc = false;
}
(2) SrsEdgeUpstream就是从源站里面去拉流。
// Look up the `origin` directive inside the `cluster` section of a vhost.
// Returns NULL when the vhost or its `cluster` section does not exist;
// otherwise returns whatever the `cluster` directive yields for "origin".
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* vhost_conf = get_vhost(vhost);
    if (vhost_conf == NULL) {
        return NULL;
    }

    SrsConfDirective* cluster = vhost_conf->get("cluster");
    return cluster ? cluster->get("origin") : NULL;
}
(3)Rtmp源站拉流类SrsEdgeRtmpUpstream,继承自上面的类。源码如下:
// Look up the `origin` directive inside the `cluster` section of a vhost.
// Returns NULL when the vhost or its `cluster` section does not exist;
// otherwise returns whatever the `cluster` directive yields for "origin".
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* vhost_conf = get_vhost(vhost);
    if (vhost_conf == NULL) {
        return NULL;
    }

    SrsConfDirective* cluster = vhost_conf->get("cluster");
    return cluster ? cluster->get("origin") : NULL;
}
(4)拉流,播放相关:
// Publish-side edge controller: holds an edge state and a SrsEdgeForwarder,
// and declares the hooks invoked when a client publishes a stream to an edge.
class SrsPublishEdge
{
private:
// Current state of the publish edge.
SrsEdgeState state;
// Forwarder used by this edge.
// NOTE(review): presumably pushes the client's stream to the origin - confirm against the implementation.
SrsEdgeForwarder* forwarder;
public:
SrsPublishEdge();
virtual ~SrsPublishEdge();
public:
// Set the queue size; the value is a srs_utime_t duration.
virtual void set_queue_size(srs_utime_t queue_size);
public:
// Bind this edge to its source and request.
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
// Whether a publish is currently allowed on this edge.
virtual bool can_publish();
// When client publish stream on edge.
virtual srs_error_t on_client_publish();
// Proxy publish stream to edge
virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg);
// Proxy unpublish stream to edge.
virtual void on_proxy_unpublish();
};
// The edge used to ingest stream from origin.
// Runs as a coroutine handler (ISrsCoroutineHandler) and works through a SrsEdgeUpstream.
class SrsEdgeIngester : public ISrsCoroutineHandler
{
private:
// The source, play edge and request handed in through initialize().
SrsSource* source;
SrsPlayEdge* edge;
SrsRequest* req;
// The coroutine object.
SrsCoroutine* trd;
// Round-robin load balancer.
// NOTE(review): presumably used to pick one of the configured origins - confirm.
SrsLbRoundRobin* lb;
// The upstream connection used to read the stream.
SrsEdgeUpstream* upstream;
public:
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
public:
virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
virtual std::string get_curr_origin();
// Interface ISrsCoroutineHandler
// NOTE(review): cycle() is presumably the coroutine entry point declared by that interface - confirm.
public:
virtual srs_error_t cycle();
private:
virtual srs_error_t do_cycle();
private:
// Read messages from the upstream; `redirect` is an in/out RTMP redirect URL.
virtual srs_error_t ingest(std::string& redirect);
// Handle one message read from the upstream; may fill `redirect`.
virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);
};
(5)源站推流类SrsEdgeForwarder,源码如下:
// Look up the `origin` directive inside the `cluster` section of a vhost.
// Returns NULL when the vhost or its `cluster` section does not exist;
// otherwise returns whatever the `cluster` directive yields for "origin".
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* vhost_conf = get_vhost(vhost);
    if (vhost_conf == NULL) {
        return NULL;
    }

    SrsConfDirective* cluster = vhost_conf->get("cluster");
    return cluster ? cluster->get("origin") : NULL;
}
(6)推流相关类:
// Publish-side edge controller: holds an edge state and a SrsEdgeForwarder,
// and declares the hooks invoked when a client publishes a stream to an edge.
class SrsPublishEdge
{
private:
// Current state of the publish edge.
SrsEdgeState state;
// Forwarder used by this edge.
// NOTE(review): presumably pushes the client's stream to the origin - confirm against the implementation.
SrsEdgeForwarder* forwarder;
public:
SrsPublishEdge();
virtual ~SrsPublishEdge();
public:
// Set the queue size; the value is a srs_utime_t duration.
virtual void set_queue_size(srs_utime_t queue_size);
public:
// Bind this edge to its source and request.
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
// Whether a publish is currently allowed on this edge.
virtual bool can_publish();
// When client publish stream on edge.
virtual srs_error_t on_client_publish();
// Proxy publish stream to edge
virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg);
// Proxy unpublish stream to edge.
virtual void on_proxy_unpublish();
};
2.Edge模式推流源码分析
(1)推流源码分析,从配置文件,开始读取。
// Look up the `origin` directive inside the `cluster` section of a vhost.
// Returns NULL when the vhost or its `cluster` section does not exist;
// otherwise returns whatever the `cluster` directive yields for "origin".
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* vhost_conf = get_vhost(vhost);
    if (vhost_conf == NULL) {
        return NULL;
    }

    SrsConfDirective* cluster = vhost_conf->get("cluster");
    return cluster ? cluster->get("origin") : NULL;
}
(2)前面的文章已经讲过,在Edge模式下,当推流端推流到边缘节点时,边缘节点会把流转推到源站。拉流时,如果边缘节点有缓存,就直接从边缘节点拉取,否则还是需要从源站去拉取。当有多个源站origin时,会通过lb->select(conf->args)选出其中一个源站,推流首先推到这个源站。
// Start forwarding: pick an origin, connect and publish to it with a fresh
// RTMP client, then start the "edge-fwr" coroutine.
// Returns srs_success, or the wrapped error of the step that failed.
srs_error_t SrsEdgeForwarder::start()
{
    // Clear the error code left over from a previous run.
    send_error_code = ERROR_SUCCESS;

    // The origin list must exist for this vhost.
    SrsConfDirective* origins = _srs_config->get_vhost_edge_origin(req->vhost);
    srs_assert(origins);

    // Let the load balancer choose one origin, then split it into host and port.
    std::string host = lb->select(origins->args);
    int rtmp_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    srs_parse_hostport(host, host, rtmp_port);

    // Support vhost transform for edge,
    // @see https://github.com/ossrs/srs/issues/372
    std::string target_vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
    target_vhost = srs_string_replace(target_vhost, "[vhost]", req->vhost);

    std::string url = srs_generate_rtmp_url(host, rtmp_port, req->host, target_vhost, req->app, req->stream, req->param);

    // Drop any previous client and open a new connection to the origin.
    srs_freep(sdk);
    srs_utime_t connect_timeout = SRS_EDGE_FORWARDER_TIMEOUT;
    srs_utime_t stream_timeout = SRS_CONSTS_RTMP_TIMEOUT;
    sdk = new SrsSimpleRtmpClient(url, connect_timeout, stream_timeout);

    srs_error_t err = sdk->connect();
    if (err != srs_success) {
        return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(connect_timeout), srsu2msi(stream_timeout));
    }

    err = sdk->publish(_srs_config->get_chunk_size(req->vhost));
    if (err != srs_success) {
        return srs_error_wrap(err, "sdk publish");
    }

    // Replace the coroutine and start it.
    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());
    err = trd->start();
    if (err != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }

    srs_trace("edge-fwr publish url %s", url.c_str());
    return err;
}
(3)如果这个时候有一个源站断开,当再开启推流时,就会推送到另外一个源站。如果有多个源站,就是按照配置文件的配置,从左到右,这样一个顺序,去一个接一个去推。
(4)开启调试SRS,输入命令:
gdb ./objs/srs
界面如下:
(5)调试配置文件,输入命令:
set args -c conf/edge1.conf
b main
c
r
界面如下:
设置断点,输入命令:
b SrsConfig::get_vhost_edge_origin
界面如下:
// Look up the `origin` directive inside the `cluster` section of a vhost.
// Returns NULL when the vhost or its `cluster` section does not exist;
// otherwise returns whatever the `cluster` directive yields for "origin".
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* vhost_conf = get_vhost(vhost);
    if (vhost_conf == NULL) {
        return NULL;
    }

    SrsConfDirective* cluster = vhost_conf->get("cluster");
    return cluster ? cluster->get("origin") : NULL;
}
判断当前节点,输入命令:
b SrsConfig::get_vhost_is_edge
界面如下:
(6)判断当前节点是否为边缘节点的源码。
// Tell whether a vhost directive is configured as an edge.
// True only when `cluster` -> `mode` exists with a non-empty first argument equal to "remote";
// a NULL vhost, a missing section or an empty argument yields the default (false).
bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)
{
    static bool DEFAULT = false;

    SrsConfDirective* cluster = vhost ? vhost->get("cluster") : NULL;
    SrsConfDirective* mode = cluster ? cluster->get("mode") : NULL;
    if (mode == NULL || mode->arg0().empty()) {
        return DEFAULT;
    }

    return mode->arg0() == "remote";
}
运行起来,执行命令:
c
跑起来,如下界面:
到这里,需要开启推流。注意这里是推流到边缘节点(这里以端口19350为例),如果不知道怎么推流,可以参考前面一篇文章。
(7)这个时候,就会运行到断点这里,然后输入命令:
bt
查看调用栈,如下界面:
0 SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost) at src/app/srs_app_config.cpp:5063
1 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:472
2 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
3 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
4 SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
5 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
6 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
7 _st_thread_main at sched.c:337
8 st_thread_create at sched.c:616
继续输入调试命令:
c
最主要是看get_vhost_edge_origin,如下界面:
(8)查看调用栈,输入命令:
bt
如下界面:
0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
1 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
2 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
3 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
4 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
5 SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:822
6 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
7 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
8 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
9 SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
10 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
12 _st_thread_main at sched.c:337
13 st_thread_create at sched.c:616
(9)从上面调用栈来看,最大的区别就是该函数下判断是否是边缘节点。源码如下:
// Acquire the right to publish on `source`.
// Fails with ERROR_SYSTEM_STREAM_BUSY when the source refuses the publish.
// On an edge the publish is handed to the edge path; otherwise the source's
// regular publish path is used.
srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
{
    SrsRequest* req = info->req;

    if (!source->can_publish(info->edge)) {
        return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
    }

    srs_error_t err = srs_success;

    // Not an edge node: publish on the source itself.
    if (!info->edge) {
        err = source->on_publish();
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: source publish");
        }
        return err;
    }

    // Edge node: ignore the publish event and directly proxy it.
    err = source->on_edge_start_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "rtmp: edge start publish");
    }
    return err;
}
(10)看看边缘节点的分支,源码如下:
// Edge publish entry point: delegate to the publish edge and return its result unchanged.
srs_error_t SrsSource::on_edge_start_publish()
{
    srs_error_t err = publish_edge->on_client_publish();
    return err;
}
调用SrsPublishEdge::on_client_publish()。该函数的功能是从边缘节点推送到源站。从源码可以看出是调用forwarder去推送。
// Called when a client starts publishing on this edge.
// Only valid in the init state; otherwise returns ERROR_RTMP_EDGE_PUBLISH_STATE.
// Switches to the publish state, starts the forwarder, and reverts to the init
// state if the forwarder fails to start. Returns the forwarder's result.
srs_error_t SrsPublishEdge::on_client_publish()
{
    if (state != SrsEdgeStateInit) {
        return srs_error_new(ERROR_RTMP_EDGE_PUBLISH_STATE, "invalid state");
    }

    // @see https://github.com/ossrs/srs/issues/180
    // Enter the publish stage before starting, so the same stream cannot be
    // published twice on the same edge.
    SrsEdgeState previous = state;
    state = SrsEdgeStatePublish;
    srs_trace("edge change from %d to state %d (push).", previous, state);

    // Start to forward the stream to origin.
    srs_error_t err = forwarder->start();
    if (err == srs_success) {
        return err;
    }

    // @see https://github.com/ossrs/srs/issues/180
    // The forwarder failed, go back to the init state.
    previous = state;
    state = SrsEdgeStateInit;
    srs_trace("edge revert from %d to state %d (push), error %s", previous, state, srs_error_desc(err).c_str());

    return err;
}
(11)每一路边缘节点推流到源站,都是用forwarder,开启forwarder去推送数据。这时候会开启一个协程。源码如下:
// Start forwarding: pick an origin, connect and publish to it with a fresh
// RTMP client, then start the "edge-fwr" coroutine.
// Returns srs_success, or the wrapped error of the step that failed.
srs_error_t SrsEdgeForwarder::start()
{
    // Clear the error code left over from a previous run.
    send_error_code = ERROR_SUCCESS;

    // The origin list must exist for this vhost.
    SrsConfDirective* origins = _srs_config->get_vhost_edge_origin(req->vhost);
    srs_assert(origins);

    // Let the load balancer choose one origin, then split it into host and port.
    std::string host = lb->select(origins->args);
    int rtmp_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    srs_parse_hostport(host, host, rtmp_port);

    // Support vhost transform for edge,
    // @see https://github.com/ossrs/srs/issues/372
    std::string target_vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
    target_vhost = srs_string_replace(target_vhost, "[vhost]", req->vhost);

    std::string url = srs_generate_rtmp_url(host, rtmp_port, req->host, target_vhost, req->app, req->stream, req->param);

    // Drop any previous client and open a new connection to the origin.
    srs_freep(sdk);
    srs_utime_t connect_timeout = SRS_EDGE_FORWARDER_TIMEOUT;
    srs_utime_t stream_timeout = SRS_CONSTS_RTMP_TIMEOUT;
    sdk = new SrsSimpleRtmpClient(url, connect_timeout, stream_timeout);

    srs_error_t err = sdk->connect();
    if (err != srs_success) {
        return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(connect_timeout), srsu2msi(stream_timeout));
    }

    err = sdk->publish(_srs_config->get_chunk_size(req->vhost));
    if (err != srs_success) {
        return srs_error_wrap(err, "sdk publish");
    }

    // Replace the coroutine and start it.
    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());
    err = trd->start();
    if (err != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }

    srs_trace("edge-fwr publish url %s", url.c_str());
    return err;
}
(12)每一个协程,都必定有一个do_cycle(),这里的sdk代表的是客户端(指的是sdk edge到origin的rtmp客户端),源码如下:
// Main loop of the forwarder coroutine.
// Each round: check the coroutine for a pending error, read (and discard) one
// message from the `sdk` client, then dump the queued messages and send them
// through `sdk`. Once a receive error is recorded in send_error_code, the loop
// only sleeps and retries the check, without touching the connection.
// Returns a wrapped error when the coroutine pull, the queue dump or the send fails.
srs_error_t SrsEdgeForwarder::do_cycle()
{
    srs_error_t err = srs_success;

    sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);

    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);

    SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "edge forward pull");
        }

        // A previous receive failure is pending: idle for a while and check again.
        if (send_error_code != ERROR_SUCCESS) {
            srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT);
            continue;
        }

        // read from client.
        if (true) {
            SrsCommonMessage* msg = NULL;
            // `sdk` is the RTMP client of this forwarder.
            err = sdk->recv_message(&msg);

            // Log the code of the current error object (there is no `ret` variable here).
            srs_verbose("edge loop recv message. ret=%d", srs_error_code(err));
            // A socket timeout only means there was nothing to read; any other error is recorded.
            if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) {
                srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str());
                send_error_code = srs_error_code(err);
                srs_error_reset(err);
                continue;
            }

            srs_error_reset(err);
            srs_freep(msg);
        }

        // forward all messages.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        int count = 0;
        // Read the pending messages from the queue.
        if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) {
            return srs_error_wrap(err, "queue dumps packets");
        }

        pprint->elapse();

        // pithy print
        if (pprint->can_print()) {
            sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count);
        }

        // ignore when no messages.
        if (count <= 0) {
            continue;
        }

        if ((err = sdk->send_and_free_messages(msgs.msgs, count)) != srs_success) {
            return srs_error_wrap(err, "send messages");
        }
    }

    return err;
}
(13)这个函数是接收来自推流客户端到edge边缘节点的数据。也就是把数据放到队列里面去。然后把这些数据通过上面的do_cycle去读取和发送到origin。源码如下:
// Put a copy of one client message into the forwarder queue.
// Returns an error carrying send_error_code when a previous failure is recorded.
// Empty messages and set-chunk-size / window-ack-size / acknowledgement messages
// are skipped and reported as success.
srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;

    if (send_error_code != ERROR_SUCCESS) {
        return srs_error_new(send_error_code, "edge forwarder");
    }

    // the msg is auto free by source,
    // so we just ignore, or copy then send it.
    if (msg->size <= 0
        || msg->header.is_set_chunk_size()
        || msg->header.is_window_ackledgement_size()
        || msg->header.is_ackledgement()) {
        return err;
    }

    SrsSharedPtrMessage copy;
    if ((err = copy.create(msg)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }

    // Stamp the copy with the stream id of the forwarder's client.
    copy.stream_id = sdk->sid();

    // Enqueue a copy of the message.
    if ((err = queue->enqueue(copy.copy())) != srs_success) {
        return srs_error_wrap(err, "enqueue message");
    }

    return err;
}
(14)在入队列的函数这里设置断点,输入命令:
b SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
c
如下界面:
(15)查看调用栈,输入命令:
bt
如下界面:
这个调用栈的流程,就是从推流队列里去读取数据,给SrsEdgeForwarder,最后再由对应的协程执行do_cycle去发送。
0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
1 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
2 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
3 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
4 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
5 SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:822
6 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
7 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
8 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
9 SrsConnection::cycle() at src/app/srs_app_recv_thread.cpp:198
10SrsRecvThread::cycle() at src/app/srs_app_st.cpp:198
11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
12 _st_thread_main at sched.c:337
13 st_thread_create at sched.c:616
看过前面文章的朋友应该知道,在RTMP推流时,边缘节点和非边缘节点走的流程是不一样的。源码如下:
// Dispatch one message received from a publishing client.
// On an edge the message is handed to the source's edge proxy path unchanged.
// Otherwise audio, video, aggregate and onMetaData messages are delivered to the
// source; any other message is ignored and reported as success.
srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;

    // for edge, directly proxy message to origin.
    if (info->edge) {
        if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: proxy publish");
        }
        return err;
    }

    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume audio");
        }
        return err;
    }

    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume video");
        }
        return err;
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume aggregate");
        }
        return err;
    }

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "rtmp: decode message");
        }
        SrsAutoFree(SrsPacket, pkt);

        // Only a decoded onMetaData packet is delivered to the source;
        // other data packets are dropped.
        SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
        if (metadata) {
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume metadata");
            }
            return err;
        }
        return err;
    }

    return err;
}
3.接下来分析下Edge模式url实现单点登录,拉流的源码。
(1)在下面这些函数处设置断点,输入命令:
b SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
b SrsEdgeIngester::SrsEdgeIngester()
b srs_error_t SrsEdgeIngester::do_cycle()
b srs_error_t SrsPlayEdge::on_client_play()
c
界面如下:
(2)启动拉流协程,源码如下:
// Called when a client plays on this edge.
// Only in the init state does it switch to the play state and start the
// ingester; in any other state it does nothing and returns success.
srs_error_t SrsPlayEdge::on_client_play()
{
    if (state != SrsEdgeStateInit) {
        return srs_success;
    }

    state = SrsEdgeStatePlay;
    // Start the ingester (the pulling coroutine).
    return ingester->start();
}
(3)运行到这里,要保证推流依然运行正常。然后再去拉流。
查看on_client_play的调用栈,输入命令:
bt
界面如下:
0 SrsPlayEdge::on_client_play() at src/app/srs_app_edge.cpp:677
1SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) at src/app/srs_app_source.cpp:2558
2 SrsRtmpConn::playing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:649
3 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
4 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
5 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
6 SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
7 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
8 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
9_st_thread_main at sched.c:337
10 st_thread_create at sched.c:616
从调用栈的关系可以看出,SrsSource::create_consumer里需要判断当前vhost是否是边缘节点(get_vhost_is_edge),如果是边缘节点,就调用play_edge->on_client_play(),启动从源站拉流,源码如下:
// Create a consumer for `conn`, register it on this source and prime it.
//   consumer: output, the newly created consumer.
//   ds, dm:   forwarded to the metadata dump (passed as `dm, ds`).
//   dg:       whether to dump the gop cache to the consumer.
// When the hub is active the metadata/sequence headers and, if `dg`, the gop
// cache are dumped to the consumer. When the vhost is an edge, the play edge is
// notified so it can start for the first client.
// Returns a wrapped error when a dump or the edge notification fails.
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
    srs_error_t err = srs_success;

    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);

    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
    consumer->set_queue_size(queue_size);

    // if atc, update the sequence header to gop cache time.
    if (atc && !gop_cache->empty()) {
        if (meta->data()) {
            meta->data()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->vsh()) {
            meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->ash()) {
            meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
        }
    }

    // If stream is publishing, dumps the sequence header and gop cache.
    if (hub->active()) {
        // Copy metadata and sequence header to consumer.
        if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
            return srs_error_wrap(err, "meta dumps");
        }

        // copy gop cache to client.
        if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "gop cache dumps");
        }
    }

    // print status.
    if (dg) {
        // queue_size is a srs_utime_t, not a floating point value, so it must not be
        // passed to "%.2f"; log it as integer milliseconds instead.
        srs_trace("create consumer, active=%d, queue_size=%dms, jitter=%d", hub->active(), static_cast<int>(srsu2ms(queue_size)), jitter_algorithm);
    } else {
        srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
    }

    // for edge, when play edge stream, check the state
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        // notice edge to start for the first client.
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }

    return err;
}
(4)启动on_client_play(),从这里可以看出,这里只可以启动一次,源码如下:
// Called when a client plays on this edge.
// Only in the init state does it switch to the play state and start the
// ingester; in any other state it does nothing and returns success, so the
// ingester is not started again while it is already running.
srs_error_t SrsPlayEdge::on_client_play()
{
    if (state != SrsEdgeStateInit) {
        return srs_success;
    }

    state = SrsEdgeStatePlay;
    // Start the ingester (the origin-to-edge pulling coroutine).
    return ingester->start();
}
(5)如果所有的拉流端都断开,那么需要有一个状态变更。源码如下:
void SrsPlayEdge::on_all_client_stop()
{
// when all client disconnected,
// and edge is ingesting origin stream, abort it.
if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
SrsEdgeState pstate = state;
state = SrsEdgeStateIngestStopping;
ingester->stop();
state = SrsEdgeStateInit;
srs_trace("edge change from %d to %d then %d (init).", pstate, SrsEdgeStateIngestStopping, state);
return;
}
}
(6)查看SrsEdgeRtmpUpstream的调用栈
0 SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) at src/app/srs_app_edge.cpp:76
1 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:271
2 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
3 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
4 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
5_st_thread_main at sched.c:337
6 st_thread_create at sched.c:616
这里会创建一个upstream。源码如下:
// One ingest session: create an upstream, connect it, notify the edge and read
// from it until ingest() returns.
// An RTMP 302 redirect (ERROR_CONTROL_REDIRECT) restarts the loop immediately
// with the redirect target; a graceful close by the peer is logged and reported
// as success; any other error is returned to the caller.
srs_error_t SrsEdgeIngester::do_cycle()
{
    srs_error_t err = srs_success;
    std::string redirect;

    for (;;) {
        err = trd->pull();
        if (err != srs_success) {
            return srs_error_wrap(err, "do cycle pull");
        }

        // Every round uses a brand new upstream, built with the current redirect.
        srs_freep(upstream);
        upstream = new SrsEdgeRtmpUpstream(redirect);

        err = source->on_source_id_changed(_srs_context->get_id());
        if (err != srs_success) {
            return srs_error_wrap(err, "on source id changed");
        }

        err = upstream->connect(req, lb);
        if (err != srs_success) {
            return srs_error_wrap(err, "connect upstream");
        }

        err = edge->on_ingest_play();
        if (err != srs_success) {
            return srs_error_wrap(err, "notify edge play");
        }

        // set to larger timeout to read av data from origin.
        upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);

        err = ingest(redirect);

        // Anything but a redirect ends this session.
        if (srs_error_code(err) != ERROR_CONTROL_REDIRECT) {
            if (srs_is_client_gracefully_close(err)) {
                srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
                srs_error_reset(err);
            }
            return err;
        }

        // retry for rtmp 302 immediately.
        int origin_port;
        string origin_host;
        upstream->selected(origin_host, origin_port);

        string stream_url = req->get_stream_url();
        srs_warn("RTMP redirect %s from %s:%d to %s", stream_url.c_str(), origin_host.c_str(), origin_port, redirect.c_str());

        srs_error_reset(err);
    }
}
(7) 在前面的源码分析中,upstream对应的就是一个拉流客户端。如下界面:
上面的函数源码,重点关注SrsEdgeIngester::ingest,在这里设置断点进行调试,输入命令:
b SrsEdgeIngester::ingest(string& redirect)
bt
界面如下:
查看调用栈,如下界面:
(8) SrsEdgeIngester::ingest是拉流主循环,主要工作是不断从源站(upstream)接收消息,再交给process_publish_message处理。
// Read loop of the ingester: receive messages from the upstream one by one and
// hand each to process_publish_message().
// `redirect` is cleared on entry and may be filled by process_publish_message().
// The loop only ends with an error: from the coroutine pull, the receive, or
// the message processing.
srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
    srs_error_t err = srs_success;

    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);

    // we only use the redirect once.
    // reset the redirect to empty, for maybe the origin changed.
    redirect = "";

    for (;;) {
        err = trd->pull();
        if (err != srs_success) {
            return srs_error_wrap(err, "thread quit");
        }

        // pithy print
        pprint->elapse();
        if (pprint->can_print()) {
            upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
        }

        // Receive the next message from the upstream.
        SrsCommonMessage* msg = NULL;
        err = upstream->recv_message(&msg);
        if (err != srs_success) {
            return srs_error_wrap(err, "recv message");
        }

        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);

        // Hand the message over for processing.
        err = process_publish_message(msg, redirect);
        if (err != srs_success) {
            return srs_error_wrap(err, "process message");
        }
    }

    return err;
}
(9)视频、音频、Metadata都是在这里处理,最终都是使用SrsSource里面对应的数据,所以这是一个重点分析的函数。
// Handle one message read from the upstream.
// Audio, video, aggregate and onMetaData messages are delivered to the source.
// An AMF command that carries an error-level status object with an `ex` object
// holding `redirect2` (or the older `redirect`) is treated as an RTMP 302:
// `redirect` is set to that URL and ERROR_CONTROL_REDIRECT is returned.
// Any other message is ignored and reported as success.
srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
    srs_error_t err = srs_success;

    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume audio");
        }
    }

    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume video");
        }
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume aggregate");
        }
        return err;
    }

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "decode message");
        }
        SrsAutoFree(SrsPacket, pkt);

        // Only a decoded onMetaData packet is delivered to the source.
        SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
        if (metadata) {
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "source consume metadata");
            }
            return err;
        }

        return err;
    }

    // call messages, for example, reject, redirect.
    if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
        SrsPacket* pkt = NULL;
        if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "decode message");
        }
        SrsAutoFree(SrsPacket, pkt);

        // RTMP 302 redirect
        SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
        if (call) {
            if (!call->arguments->is_object()) {
                return err;
            }

            // Only an error-level status can carry a redirect.
            SrsAmf0Any* prop = NULL;
            SrsAmf0Object* evt = call->arguments->to_object();

            if ((prop = evt->ensure_property_string("level")) == NULL) {
                return err;
            } else if (prop->to_str() != StatusLevelError) {
                return err;
            }

            if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {
                return err;
            }
            SrsAmf0Object* ex = prop->to_object();

            // The redirect is tcUrl while redirect2 is RTMP URL.
            // https://github.com/ossrs/srs/issues/1575#issuecomment-574999798
            if ((prop = ex->ensure_property_string("redirect2")) == NULL) {
                // TODO: FIXME: Remove it when SRS3 released, it's temporarily support for SRS3 alpha versions(a0 to a8).
                if ((prop = ex->ensure_property_string("redirect")) == NULL) {
                    return err;
                }
            }
            redirect = prop->to_str();

            return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());
        }
    }

    return err;
}
(10) 这里以SrsSource::on_video举例,设置断点,并查看调用栈,输入命令:
b SrsSource::on_video(SrsCommonMessage* shared_video)
bt
调用栈界面如下:
srs_error_t SrsEdgeIngester::ingest(string& redirect) at src/app/srs_app_edge.cpp:339
1 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:282
2 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
3 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
4 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
5_st_thread_main at sched.c:337
6 st_thread_create at sched.c:616
通过这里的函数调用关系,可以知道,从源站拉回来后,最终还是通过SrsSource去分发。
// Entry point for a video message arriving at this source.
// Tracks whether timestamps are still monotonically increasing, drops video
// whose header is not acceptable, then either processes the message directly
// (mix_correct off) or pushes it through the mix queue and processes whatever
// message the queue hands back.
// After this call the caller should not use `shared_video` again.
srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
{
    srs_error_t err = srs_success;

    // monotically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_video->header.timestamp;

    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
        char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = shared_video->payload[0];
        }

        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return err;
    }

    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((err = msg.create(shared_video)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }

    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return err;
    }

    // consume the monotonically increase message.
    // The queue may hand back either an audio or a video message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);

    return err;
}
(11)前面讲过url实现单点登录,当有多个源站时,会选择一个正在运行的源站,通过函数调用lb->select(conf->args),体现如下:
// Connect to an origin and start playing the requested stream from it.
// The origin is chosen by the load balancer from the vhost's origin list,
// unless a non-empty `redirect` overrides the host and port.
// Returns ERROR_EDGE_VHOST_REMOVED when the vhost has no origin config, or the
// wrapped error of a failed connect/play.
srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
{
    SrsRequest* req = r;

    // Read the configured origins.
    // @see https://github.com/ossrs/srs/issues/79
    // when origin is error, for instance, server is shutdown,
    // then user remove the vhost then reload, the conf is empty.
    SrsConfDirective* origins = _srs_config->get_vhost_edge_origin(req->vhost);
    if (origins == NULL) {
        return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
    }

    // Select one origin and split it into host and port.
    std::string host = lb->select(origins->args);
    int rtmp_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    srs_parse_hostport(host, host, rtmp_port);

    // override the origin info by redirect.
    if (!redirect.empty()) {
        int r_port;
        string r_schema, r_vhost, r_app, r_stream, r_param, r_host;
        srs_discovery_tc_url(redirect, r_schema, r_host, r_vhost, r_app, r_stream, r_port, r_param);

        host = r_host;
        rtmp_port = r_port;
    }

    // Remember the current selected server.
    selected_ip = host;
    selected_port = rtmp_port;

    // support vhost transform for edge,
    // @see https://github.com/ossrs/srs/issues/372
    std::string target_vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
    target_vhost = srs_string_replace(target_vhost, "[vhost]", req->vhost);

    std::string url = srs_generate_rtmp_url(host, rtmp_port, req->host, target_vhost, req->app, req->stream, req->param);

    // Replace any previous client with a new one for this URL.
    srs_freep(sdk);
    srs_utime_t connect_timeout = SRS_EDGE_INGESTER_TIMEOUT;
    srs_utime_t stream_timeout = SRS_CONSTS_RTMP_PULSE;
    sdk = new SrsSimpleRtmpClient(url, connect_timeout, stream_timeout);

    srs_error_t err = sdk->connect();
    if (err != srs_success) {
        return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(connect_timeout), srsu2msi(stream_timeout));
    }

    err = sdk->play(_srs_config->get_chunk_size(req->vhost));
    if (err != srs_success) {
        return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
    }

    return err;
}
4.总结
本篇文章重点分析了Edge模式的推拉流源码及调试过程,可以更加清楚地认识Edge模式。希望能够帮助到大家。欢迎关注,转发,点赞,收藏,分享,评论区讨论。
本文到此结束,希望对大家有所帮助!