SRS Streaming Media Server Clusters: Edge Mode (2)

This article walks through debugging and source-code analysis of publishing and playing streams in Edge mode.

1. Classes involved in Edge publish and play

(1) As the source below shows, both pulling and publishing in Edge mode are managed by the SrsSource class. The code and call chains later in this article confirm this.

SrsSource::SrsSource()
{
    req = NULL;
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
    mix_correct = false;
    mix_queue = new SrsMixQueue();
    
    _can_publish = true;
    _pre_source_id = _source_id = -1;
    die_at = 0;
    // Controls pulling on the edge: origin to edge.
    play_edge = new SrsPlayEdge();
    // Controls publishing on the edge: edge to origin.
    publish_edge = new SrsPublishEdge();
    // Controls the GOP cache.
    gop_cache = new SrsGopCache();
    // Controls routing on the origin.
    hub = new SrsOriginHub();
    // Controls the metadata cache.
    meta = new SrsMetaCache();
    
    is_monotonically_increase = false;
    last_packet_time = 0;
    
    _srs_config->subscribe(this);
    atc = false;
}

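(2) SrsEdgeUpstream pulls the stream from the origin. The origin list it connects to is read from the vhost's cluster.origin directive by the following function: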

SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* conf = get_vhost(vhost);
    if (!conf) {
        return NULL;
    }
    
    conf = conf->get("cluster");
    if (!conf) {
        return NULL;
    }
    
    return conf->get("origin");
}

(3) SrsEdgeRtmpUpstream is the RTMP implementation of the upstream class above. It reads the origin list through the same SrsConfig::get_vhost_edge_origin function.

(4) Classes related to pulling (play):

// The edge used to ingest stream from origin.
class SrsEdgeIngester : public ISrsCoroutineHandler
{
private:
    SrsSource* source;
    SrsPlayEdge* edge;
    SrsRequest* req;
    SrsCoroutine* trd;
    SrsLbRoundRobin* lb;
    SrsEdgeUpstream* upstream;
public:
    SrsEdgeIngester();
    virtual ~SrsEdgeIngester();
public:
    virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
    virtual srs_error_t start();
    virtual void stop();
    virtual std::string get_curr_origin();
// Interface ISrsReusableThread2Handler
public:
    virtual srs_error_t cycle();
private:
    virtual srs_error_t do_cycle();
private:
    virtual srs_error_t ingest(std::string& redirect);
    virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);
};

(5) SrsEdgeForwarder is the class that pushes the stream from the edge to the origin. Its start(), do_cycle(), and proxy() functions are analyzed in section 2.

(6) Classes related to publishing:

class SrsPublishEdge
{
private:
    SrsEdgeState state;
    SrsEdgeForwarder* forwarder;
public:
    SrsPublishEdge();
    virtual ~SrsPublishEdge();
public:
    virtual void set_queue_size(srs_utime_t queue_size);
public:
    virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
    virtual bool can_publish();
    // When client publish stream on edge.
    virtual srs_error_t on_client_publish();
    // Proxy publish stream to edge
    virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg);
    // Proxy unpublish stream to edge.
    virtual void on_proxy_unpublish();
};

2. Source analysis of publishing in Edge mode

(1) The publish path starts by reading the origin list from the configuration file.

SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* conf = get_vhost(vhost);
    if (!conf) {
        return NULL;
    }
    
    conf = conf->get("cluster");
    if (!conf) {
        return NULL;
    }
    
    return conf->get("origin");
}

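(2) As earlier articles explained, in Edge mode a stream published to the edge is first forwarded to the origin. On play, the edge serves the stream directly if it already has it cached; otherwise it has to pull it from the origin. When several origins are configured, lb->select(conf->args) picks one of them, as SrsEdgeForwarder::start() shows: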


srs_error_t SrsEdgeForwarder::start()
{
    srs_error_t err = srs_success;
    
    // reset the error code.
    send_error_code = ERROR_SUCCESS;
    
    std::string url;
    if (true) {
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
        srs_assert(conf);
        
        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
        srs_parse_hostport(server, server, port);
        
        // support vhost transform for edge,
        // @see https://github.com/ossrs/srs/issues/372
        std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
        vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
        
        url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
    }
    
    // open socket.
    srs_freep(sdk);
    srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT;
    srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
    sdk = new SrsSimpleRtmpClient(url, cto, sto);
    
    if ((err = sdk->connect()) != srs_success) {
        return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
    }
    
    if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
        return srs_error_wrap(err, "sdk publish");
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());
    
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }
    srs_trace("edge-fwr publish url %s", url.c_str());
    
    return err;
}

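(3) If an origin goes down at this point, the next time publishing starts the stream is pushed to another origin. With several origins configured, they are tried one after another in the order they appear in the configuration file, from left to right.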
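The left-to-right order described here is what a plain round-robin counter would produce. The following is a minimal sketch under that assumption: the RoundRobin class is a hypothetical stand-in written for this article, and only the shape of the select() call is taken from lb->select(conf->args). It does not check whether an origin is alive; it just advances on every call.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the load balancer; only the select() call shape
// is taken from the article (lb->select(conf->args)).
class RoundRobin {
public:
    // Returns the next server in configuration order, wrapping at the end.
    std::string select(const std::vector<std::string>& servers) {
        assert(!servers.empty());
        std::string chosen = servers[count_ % servers.size()];
        ++count_;
        return chosen;
    }

private:
    std::size_t count_ = 0;  // number of selections made so far
};
```

With two configured origins, successive calls return the first, the second, then the first again, so a forwarder that restarts after a failure moves on to the next origin in the list.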

(4) Start SRS under gdb:

gdb ./objs/srs

(5) Pass the configuration file as the program arguments, break on main, and run. Execution stops at main, so the remaining breakpoints can be set before continuing:

set args -c conf/edge1.conf

b main

r

Set a breakpoint on the function that reads the origin list:

b SrsConfig::get_vhost_edge_origin

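Set a breakpoint on the function that checks whether the current node is an edge:

b SrsConfig::get_vhost_is_edge

(6) The source of the function that decides whether the current node is an edge: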

bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)
{
    static bool DEFAULT = false;
    
    SrsConfDirective* conf = vhost;
    if (!conf) {
        return DEFAULT;
    }
    
    conf = conf->get("cluster");
    if (!conf) {
        return DEFAULT;
    }
    
    conf = conf->get("mode");
    if (!conf || conf->arg0().empty()) {
        return DEFAULT;
    }
    
    return "remote" == conf->arg0();
}
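The lookup chain in this function (vhost, then cluster, then mode, with a default of false at every missing level) can be tried out in isolation. The sketch below is an assumption-laden miniature: Directive and is_edge are hypothetical names, and the real SrsConfDirective carries more state than this struct.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified directive tree; SrsConfDirective in SRS is richer.
struct Directive {
    std::string name;
    std::vector<std::string> args;
    std::vector<Directive> children;

    // Returns the first child with the given name, or nullptr when absent.
    const Directive* get(const std::string& child) const {
        assert(!child.empty());
        for (const Directive& d : children) {
            if (d.name == child) {
                return &d;
            }
        }
        return nullptr;
    }
};

// Mirrors the chain vhost -> cluster -> mode and defaults to false.
bool is_edge(const Directive* vhost) {
    if (!vhost) {
        return false;
    }
    const Directive* cluster = vhost->get("cluster");
    if (!cluster) {
        return false;
    }
    const Directive* mode = cluster->get("mode");
    if (!mode || mode->args.empty() || mode->args[0].empty()) {
        return false;
    }
    return mode->args[0] == "remote";
}
```

A vhost whose cluster block sets mode to remote is reported as an edge; a vhost with mode local, or with no cluster block at all, is not.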

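Continue execution:

c

Now start publishing. Note that the stream is published to the edge node (port 19350 in this example). If you are unsure how to publish, see the previous article.

(7) Execution stops at the breakpoint. Print the call stack:

bt

The call stack is: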

0 SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost) at src/app/srs_app_config.cpp:5063
1 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:472
2 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
3 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
4 SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
5 SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
6 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
7 _st_thread_main at sched.c:337
8 st_thread_create at sched.c:616

Continue again:

c

The breakpoint that matters most is get_vhost_edge_origin, which is hit next.

(8) Print the call stack:

bt

The call stack is:

0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
1 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
2 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
3 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
4 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
5 SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:822
6 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
7 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
8 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
9 SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
10 SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
12 _st_thread_main at sched.c:337
13 st_thread_create at sched.c:616

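(9) Comparing the two call stacks, the key difference lies in SrsRtmpConn::acquire_publish, which checks whether the node is an edge. The source is: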

srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    
    if (!source->can_publish(info->edge)) {
        return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
    }
    
    // when edge, ignore the publish event, directly proxy it.
    if (info->edge) {
        // this node is an edge
        if ((err = source->on_edge_start_publish()) != srs_success) {
            return srs_error_wrap(err, "rtmp: edge start publish");
        }
    } else {
        // this node is not an edge
        if ((err = source->on_publish()) != srs_success) {
            return srs_error_wrap(err, "rtmp: source publish");
        }
    }
    
    return err;
}

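(10) The edge branch looks like this:

srs_error_t SrsSource::on_edge_start_publish()
{
    // forward the published stream to the origin
    return publish_edge->on_client_publish();
}

This calls SrsPublishEdge::on_client_publish(), whose job is to push the stream from the edge to the origin. As the source shows, it does so through the forwarder: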

srs_error_t SrsPublishEdge::on_client_publish()
{
    srs_error_t err = srs_success;
    
    // error when not init state.
    if (state != SrsEdgeStateInit) {
        return srs_error_new(ERROR_RTMP_EDGE_PUBLISH_STATE, "invalid state");
    }
    
    // @see https://github.com/ossrs/srs/issues/180
    // to avoid multiple publish the same stream on the same edge,
    // directly enter the publish stage.
    if (true) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStatePublish;
        srs_trace("edge change from %d to state %d (push).", pstate, state);
    }
    
    // start to forward stream to origin.
    err = forwarder->start();
    
    // @see https://github.com/ossrs/srs/issues/180
    // when failed, revert to init
    if (err != srs_success) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge revert from %d to state %d (push), error %s", pstate, state, srs_error_desc(err).c_str());
    }
    
    return err;
}
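The ordering in on_client_publish() matters: the state is switched to publish before the forwarder starts, which blocks a second publisher of the same stream, and it is rolled back if the start fails so that a later attempt can succeed. A minimal sketch of that pattern follows. PublishEdge, EdgeState, and the start callback are hypothetical simplifications, with a bool standing in for srs_error_t.

```cpp
#include <cassert>
#include <functional>
#include <utility>

enum class EdgeState { Init, Publish };

// Hypothetical reduction of the publish edge: claim the state first, then
// start the forwarder, and revert to Init when the start fails.
class PublishEdge {
public:
    explicit PublishEdge(std::function<bool()> start_forwarder)
        : start_forwarder_(std::move(start_forwarder)) {
        assert(static_cast<bool>(start_forwarder_));  // a callback is required
    }

    // Returns true when publishing was accepted and the forwarder started.
    bool on_client_publish() {
        if (state_ != EdgeState::Init) {
            return false;  // someone is already publishing this stream
        }
        state_ = EdgeState::Publish;  // claim the stream before connecting
        if (!start_forwarder_()) {
            state_ = EdgeState::Init;  // revert so a retry is possible
            return false;
        }
        return true;
    }

    EdgeState state() const { return state_; }

private:
    std::function<bool()> start_forwarder_;
    EdgeState state_ = EdgeState::Init;
};
```

In this sketch a second call while the state is Publish is rejected, and a failed start leaves the object in Init, which matches the two comments referring to issue 180 in the listing.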

(11) Every stream that an edge pushes to the origin goes through a forwarder. SrsEdgeForwarder::start(), listed in step (2) above, connects to the origin, sends publish, and then creates and starts a coroutine named "edge-fwr" (trd = new SrsSTCoroutine("edge-fwr", ...) followed by trd->start()) that pushes the data.

(12) Each coroutine does its work in a do_cycle() function. In this one, sdk is the RTMP client that connects the edge to the origin. The source is:

srs_error_t SrsEdgeForwarder::do_cycle()
{
    srs_error_t err = srs_success;
    
    sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);
    
    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);
    
    SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "edge forward pull");
        }
        
        if (send_error_code != ERROR_SUCCESS) {
            srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT);
            continue;
        }
        
        // read from client.
        if (true) {
            SrsCommonMessage* msg = NULL;
            // sdk is the edge-to-origin RTMP client
            err = sdk->recv_message(&msg);
            
            srs_verbose("edge loop recv message. ret=%d", ret);
            if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) {
                srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str());
                send_error_code = srs_error_code(err);
                srs_error_reset(err);
                continue;
            }
            
            srs_error_reset(err);
            srs_freep(msg);
        }
        
        // forward all messages.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        int count = 0;
        // read the queued messages
        if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) {
            return srs_error_wrap(err, "queue dumps packets");
        }
        
        pprint->elapse();
        
        // pithy print
        if (pprint->can_print()) {
            sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count);
        }
        
        // ignore when no messages.
        if (count <= 0) {
            continue;
        }
        
        if ((err = sdk->send_and_free_messages(msgs.msgs, count)) != srs_success) {
            return srs_error_wrap(err, "send messages");
        }
    }
    
    return err;
}

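(13) proxy() receives the data that the publishing client sends to the edge and puts it into the queue. The do_cycle() above then reads the queue and sends the data to the origin. The source is: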

srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;
    
    if (send_error_code != ERROR_SUCCESS) {
        return srs_error_new(send_error_code, "edge forwarder");
    }
    
    // the msg is auto free by source,
    // so we just ignore, or copy then send it.
    if (msg->size <= 0
        || msg->header.is_set_chunk_size()
        || msg->header.is_window_ackledgement_size()
        || msg->header.is_ackledgement()) {
        return err;
    }
    
    SrsSharedPtrMessage copy;
    if ((err = copy.create(msg)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }
    
    copy.stream_id = sdk->sid();
    // copy the received message into the queue
    if ((err = queue->enqueue(copy.copy())) != srs_success) {
        return srs_error_wrap(err, "enqueue message");
    }
    
    return err;
}
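proxy() and do_cycle() form a producer and consumer pair around one queue: proxy() enqueues a copy of each message, and do_cycle() drains up to a fixed number of messages per loop iteration. The sketch below models only that hand-off. ForwardQueue is a hypothetical class, messages are plain strings, and no locking is shown on the assumption that both sides run as cooperative coroutines on one thread.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for the forwarder's message queue.
class ForwardQueue {
public:
    // Producer side: what proxy() does with each copied message.
    void enqueue(std::string msg) { pending_.push_back(std::move(msg)); }

    // Consumer side: what do_cycle() does on each loop iteration. Appends at
    // most max messages to out, oldest first, and returns how many were moved.
    std::size_t dump_packets(std::size_t max, std::vector<std::string>& out) {
        assert(max > 0);
        std::size_t count = 0;
        while (count < max && !pending_.empty()) {
            out.push_back(std::move(pending_.front()));
            pending_.pop_front();
            ++count;
        }
        return count;
    }

private:
    std::deque<std::string> pending_;
};
```

A return value of zero corresponds to the "ignore when no messages" branch in do_cycle(): nothing is sent and the loop simply goes around again.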

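(14) Set a breakpoint where messages are enqueued, then continue:

b SrsEdgeForwarder::proxy

c

(15) Print the call stack:

bt

The flow is: data read from the publishing client is handed to SrsEdgeForwarder::proxy(), which enqueues it, and the forwarder's coroutine then sends it from do_cycle().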

0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
1 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
2 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
3 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
4 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
5 SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:822
6 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
7 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
8 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
9 SrsConnection::cycle() at src/app/srs_app_recv_thread.cpp:198
10 SrsRecvThread::cycle() at src/app/srs_app_st.cpp:198
11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
12 _st_thread_main at sched.c:337
13 st_thread_create at sched.c:616

As readers of the previous articles will know, RTMP publishing takes different paths on an edge node and on a non-edge node. The source is:

srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;
    
    // for edge, directly proxy message to origin.
    // edge node
    if (info->edge) {
        if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: proxy publish");
        }
        return err;
    }
    
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume audio");
        }
        return err;
    }
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume video");
        }
        return err;
    }
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume aggregate");
        }
        return err;
    }
    
    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "rtmp: decode message");
        }
        SrsAutoFree(SrsPacket, pkt);
        
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume metadata");
            }
            return err;
        }
        return err;
    }
    
    return err;
}

3. Source analysis of pulling (play) in Edge mode

(1) Set breakpoints on the following functions, then continue:

b SrsEdgeRtmpUpstream::connect

b SrsEdgeIngester::SrsEdgeIngester

b SrsEdgeIngester::do_cycle

b SrsPlayEdge::on_client_play

c

(2) The pull coroutine is started here:

srs_error_t SrsPlayEdge::on_client_play()
{
    srs_error_t err = srs_success;
    
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        // start the pull (ingest) coroutine
        err = ingester->start();
    }
    
    return err;
}

(3) At this point, make sure the publisher is still running, then start a player.

Print the call stack of on_client_play:

bt

The call stack is:


0 SrsPlayEdge::on_client_play() at src/app/srs_app_edge.cpp:677
1 SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) at src/app/srs_app_source.cpp:2558
2 SrsRtmpConn::playing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:649
3 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
4 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
5 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
6 SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
7 SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
8 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
9 _st_thread_main at sched.c:337
10 st_thread_create at sched.c:616

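The call stack shows that SrsSource::create_consumer checks whether the vhost runs in edge mode. If it does, the edge has to pull the stream from the origin, so it calls on_client_play. The source is: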

srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
    srs_error_t err = srs_success;
    
    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);
    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
    consumer->set_queue_size(queue_size);
    // if atc, update the sequence header to gop cache time.
    if (atc && !gop_cache->empty()) {
        if (meta->data()) {
            meta->data()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->vsh()) {
            meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->ash()) {
            meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
        }
    }
    // If stream is publishing, dumps the sequence header and gop cache.
    if (hub->active()) {
        // Copy metadata and sequence header to consumer.
        if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
            return srs_error_wrap(err, "meta dumps");
        }
        // copy gop cache to client.
        if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "gop cache dumps");
        }
    }
    // print status.
    if (dg) {
        srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm);
    } else {
        srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
    }
    
    // for edge, when play edge stream, check the state
    // on an edge node, notify the play edge so that it pulls from the origin
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        // notice edge to start for the first client.
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }
    
    return err;
}

(4) As the listing of on_client_play() in step (2) shows, the ingester is started only while the state is SrsEdgeStateInit. The origin-to-edge pull coroutine is therefore started once per source and is not started again for later players while it is running.

(5) When all players have disconnected, the state has to change back. The source is:

void SrsPlayEdge::on_all_client_stop()
{
    // when all client disconnected,
    // and edge is ingesting origin stream, abort it.
    if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateIngestStopping;
        ingester->stop();
        state = SrsEdgeStateInit;
        srs_trace("edge change from %d to %d then %d (init).", pstate, SrsEdgeStateIngestStopping, state);
        
        return;
    }
}
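Taken together, on_client_play() and on_all_client_stop() make the ingester's lifetime follow the audience: the first player starts it, further players reuse it, and the departure of the last player stops it. The sketch below shows that behavior with counters in place of a real coroutine. PlayEdge and PlayState are hypothetical names, and the intermediate connected and stopping states of the original are left out.

```cpp
#include <cassert>

enum class PlayState { Init, Play };

// Hypothetical reduction of the play edge: counts how often the ingester
// would be started and stopped instead of running a real coroutine.
class PlayEdge {
public:
    // Called for every new player; only the first one starts the ingester.
    void on_client_play() {
        if (state_ == PlayState::Init) {
            state_ = PlayState::Play;
            ++starts_;
        }
    }

    // Called when the last player has left; stops the ingester and resets.
    void on_all_client_stop() {
        if (state_ == PlayState::Play) {
            ++stops_;
            state_ = PlayState::Init;
        }
        assert(state_ == PlayState::Init);
    }

    int starts() const { return starts_; }
    int stops() const { return stops_; }

private:
    PlayState state_ = PlayState::Init;
    int starts_ = 0;
    int stops_ = 0;
};
```

Two players arriving back to back trigger one start; after everyone leaves, the next player triggers a fresh start, which on a real edge means a new connection to the origin.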

(6) The call stack of SrsEdgeRtmpUpstream::connect:

0 SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) at src/app/srs_app_edge.cpp:76
1 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:271
2 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
3 SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
4 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
5 _st_thread_main at sched.c:337
6 st_thread_create at sched.c:616

A new upstream is created in do_cycle(). The source is:


srs_error_t SrsEdgeIngester::do_cycle()
{
    srs_error_t err = srs_success;
    std::string redirect;
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "do cycle pull");
        }
        
        srs_freep(upstream);
        upstream = new SrsEdgeRtmpUpstream(redirect);
        
        if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
            return srs_error_wrap(err, "on source id changed");
        }
        
        if ((err = upstream->connect(req, lb)) != srs_success) {
            return srs_error_wrap(err, "connect upstream");
        }
        
        if ((err = edge->on_ingest_play()) != srs_success) {
            return srs_error_wrap(err, "notify edge play");
        }
        // set to larger timeout to read av data from origin.
        upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
        
        err = ingest(redirect);
        
        // retry for rtmp 302 immediately.
        if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
            int port;
            string server;
            upstream->selected(server, port);
            string url = req->get_stream_url();
            srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str());
            srs_error_reset(err);
            continue;
        }
        
        if (srs_is_client_gracefully_close(err)) {
            srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
            srs_error_reset(err);
        }
        break;
    }
    
    return err;
}
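The loop in do_cycle() reconnects immediately when ingest() reports an RTMP 302, and the redirect target is used for exactly one attempt before the configured origin applies again. The sketch below isolates that control flow. run_ingest, IngestFn, IngestResult, and the max_connects bound are hypothetical, introduced only so the loop can be exercised without a network.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

enum class IngestResult { Closed, Redirect };

// Hypothetical callback: pulls from url until the connection ends and, on a
// redirect, writes the new target into redirect.
using IngestFn =
    std::function<IngestResult(const std::string& url, std::string& redirect)>;

// Returns every URL that was connected to, in order.
std::vector<std::string> run_ingest(const std::string& origin_url,
                                    const IngestFn& ingest,
                                    int max_connects) {
    assert(max_connects > 0);
    std::vector<std::string> connected;
    std::string redirect;
    while (static_cast<int>(connected.size()) < max_connects) {
        // A pending redirect overrides the configured origin for one attempt.
        const std::string url = redirect.empty() ? origin_url : redirect;
        redirect.clear();
        connected.push_back(url);
        if (ingest(url, redirect) == IngestResult::Redirect) {
            continue;  // reconnect immediately to the redirect target
        }
        break;  // any other outcome ends this cycle
    }
    return connected;
}
```

If the first connection answers with a redirect and the second one ends normally, the returned list holds the configured origin followed by the redirect target.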

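(7) As the earlier analysis showed, upstream acts as a pull client towards the origin.

In the function above, the call to focus on is SrsEdgeIngester::ingest. Set a breakpoint on it, continue, and print the call stack:

b SrsEdgeIngester::ingest

c

bt

(8) SrsEdgeIngester::ingest is the main pull loop: it keeps receiving messages from the origin and passes each one to process_publish_message.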

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
    srs_error_t err = srs_success;
    
    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);
    // we only use the redirect once.
    // reset the redirect to empty, for maybe the origin changed.
    redirect = "";
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "thread quit");
        }
        
        pprint->elapse();
        
        // pithy print
        if (pprint->can_print()) {
            upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
        }
        
        // read from client.
        SrsCommonMessage* msg = NULL;
        // receive a message from the origin
        if ((err = upstream->recv_message(&msg)) != srs_success) {
            return srs_error_wrap(err, "recv message");
        }
        
        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);
        // the key call: feed the received message into the source
        if ((err = process_publish_message(msg, redirect)) != srs_success) {
            return srs_error_wrap(err, "process message");
        }
    }
    
    return err;
}

(9) Video, audio, and metadata are all handled here and end up in the corresponding SrsSource handlers, so this function deserves a close look.

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
    srs_error_t err = srs_success;
    
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume audio");
        }
    }
    
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume video");
        }
    }
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume aggregate");
        }
        return err;
    }
    
    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "decode message");
        }
        SrsAutoFree(SrsPacket, pkt);
        
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "source consume metadata");
            }
            return err;
        }
        
        return err;
    }
    
    // call messages, for example, reject, redirect.
    if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
        SrsPacket* pkt = NULL;
        if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "decode message");
        }
        SrsAutoFree(SrsPacket, pkt);
        
        // RTMP 302 redirect
        if (dynamic_cast<SrsCallPacket*>(pkt)) {
            SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
            if (!call->arguments->is_object()) {
                return err;
            }
            
            SrsAmf0Any* prop = NULL;
            SrsAmf0Object* evt = call->arguments->to_object();
            
            if ((prop = evt->ensure_property_string("level")) == NULL) {
                return err;
            } else if (prop->to_str() != StatusLevelError) {
                return err;
            }
            
            if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {
                return err;
            }
            SrsAmf0Object* ex = prop->to_object();
            // The redirect is tcUrl while redirect2 is RTMP URL.
            // https://github.com/ossrs/srs/issues/1575#issuecomment-574999798
            if ((prop = ex->ensure_property_string("redirect2")) == NULL) {
                // TODO: FIXME: Remove it when SRS3 released, it's temporarily support for SRS3 alpha versions(a0 to a8).
                if ((prop = ex->ensure_property_string("redirect")) == NULL) {
                    return err;
                }
            }
            redirect = prop->to_str();
            
            return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());
        }
    }
    
    return err;
}
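The branches of process_publish_message correspond to RTMP message type ids. The sketch below spells out that mapping using the ids from the RTMP specification (8 audio, 9 video, 15 and 18 data, 17 and 20 command, 22 aggregate). The classify function and the Handler enum are hypothetical names, and the sketch only names the branch instead of calling into a source.

```cpp
#include <cassert>

enum class Handler { Audio, Video, Aggregate, Data, Command, Ignored };

// Maps an RTMP message type id to the branch that would handle it.
Handler classify(int message_type) {
    assert(message_type >= 0 && message_type <= 255);  // the id is one byte
    switch (message_type) {
        case 8:
            return Handler::Audio;
        case 9:
            return Handler::Video;
        case 22:
            return Handler::Aggregate;
        case 15:  // AMF3 data
        case 18:  // AMF0 data, carries onMetaData
            return Handler::Data;
        case 17:  // AMF3 command
        case 20:  // AMF0 command, carries the 302 redirect call
            return Handler::Command;
        default:
            return Handler::Ignored;
    }
}
```

Messages that fall into none of these groups, such as protocol control messages, are ignored by the ingester, which matches the final return err in the listing.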

(10) Taking SrsSource::on_video as an example, set a breakpoint, continue, and print the call stack:

b SrsSource::on_video

c

bt

The call stack is:

0 SrsEdgeIngester::ingest(string& redirect) at src/app/srs_app_edge.cpp:339
1 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:282
2 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
3 SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
4 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
5 _st_thread_main at sched.c:337
6 st_thread_create at sched.c:616

This call chain shows that data pulled back from the origin is still distributed through SrsSource in the end.

srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
{
    srs_error_t err = srs_success;
    
    // monotonically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
        }
    }
    last_packet_time = shared_video->header.timestamp;
    
    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
        char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = shared_video->payload[0];
        }
        
        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return err;
    }
    
    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((err = msg.create(shared_video)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }
    
    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }
    
    // insert msg to the queue.
    mix_queue->push(msg.copy());
    
    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return err;
    }
    
    // consume the monotonically increase message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);
    
    return err;
}

(11) As mentioned earlier, when several origins are configured, one of them is chosen by calling lb->select(conf->args):

srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = r;
    
    std::string url;
    if (true) {
        // read the origin list from the configuration file
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
        
        // @see https://github.com/ossrs/srs/issues/79
        // when origin is error, for instance, server is shutdown,
        // then user remove the vhost then reload, the conf is empty.
        if (!conf) {
            return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
        }
        
        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
        srs_parse_hostport(server, server, port);
        
        // override the origin info by redirect.
        if (!redirect.empty()) {
            int _port;
            string _schema, _vhost, _app, _stream, _param, _host;
            srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param);
            
            server = _host;
            port = _port;
        }
        // Remember the current selected server.
        selected_ip = server;
        selected_port = port;
        
        // support vhost transform for edge,
        // @see https://github.com/ossrs/srs/issues/372
        std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
        vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
        
        url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
    }
    
    srs_freep(sdk);
    srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT;
    srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
    sdk = new SrsSimpleRtmpClient(url, cto, sto);
    
    if ((err = sdk->connect()) != srs_success) {
        return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
    }
    
    if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
        return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
    }
    
    return err;
}
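connect() sets the port to a default before parsing the selected server, so an origin entry may be written as either host or host:port. The sketch below shows one way such a split could work. parse_hostport and Endpoint are hypothetical names, not the SRS helper, and the sketch assumes a hostname or IPv4 address with a numeric port (no IPv6 literals, no validation of malformed ports).

```cpp
#include <cassert>
#include <cstddef>
#include <string>

struct Endpoint {
    std::string host;
    int port;
};

// Splits "host" or "host:port"; a missing port falls back to default_port.
Endpoint parse_hostport(const std::string& hostport, int default_port) {
    assert(!hostport.empty());
    const std::size_t pos = hostport.rfind(':');
    if (pos == std::string::npos) {
        return Endpoint{hostport, default_port};
    }
    // std::stoi throws if the text after the colon is not a number.
    return Endpoint{hostport.substr(0, pos),
                    std::stoi(hostport.substr(pos + 1))};
}
```

Passing 1935, the standard RTMP port, as the default means an entry such as 192.168.1.10 resolves to port 1935, while 192.168.1.10:19350 keeps its explicit port. Using 1935 for the default here is an assumption about the value behind SRS_CONSTS_RTMP_DEFAULT_PORT.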

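4. Summary

This article analyzed the source code behind publishing and playing in Edge mode and walked through debugging it, which should give a clearer picture of how Edge mode works. I hope it helps. Comments and discussion are welcome.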
