国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之ReactorThread事件循環(下)

Maxiye / 3175人閱讀

摘要:之后如果仍然有剩余未發送的數據,那么就如果已經沒有剩余數據了,繼續去取下一個數據包。拿到后,要用函數轉化為相應的類型即可得到包長值。

swPort_onRead_check_eof EOF 自動分包

我們前面說過,swPort_onRead_raw 是最簡單的向 worker 進程發送數據包的方法,swoole 會將從客戶端接受到的數據包,立刻發送給 worker 進程,用戶自己把數據包拼接起來

如果啟用了 EOF 自動分包,那么 swoole 會檢測 EOF 符號,拼接完畢數據之后再向 worker 發送數據

swProtocol_recv_check_eof 用于檢測 EOF 符號,如果沒有檢測到數據就存儲到 buffer

static int swPort_onRead_check_eof(swReactor *reactor, swListenPort *port, swEvent *event)
{
    swConnection *conn = event->socket;
    swProtocol *protocol = &port->protocol;
    swServer *serv = reactor->ptr;

    swString *buffer = swServer_get_buffer(serv, event->fd);
    if (!buffer)
    {
        return SW_ERR;
    }

    if (swProtocol_recv_check_eof(protocol, conn, buffer) < 0)
    {
        swReactorThread_onClose(reactor, event);
    }

    return SW_OK;
}

static sw_inline swString *swServer_get_buffer(swServer *serv, int fd)
{
    swString *buffer = serv->connection_list[fd].recv_buffer;
    if (buffer == NULL)
    {
        buffer = swString_new(SW_BUFFER_SIZE_STD);
        //alloc memory failed.
        if (!buffer)
        {
            return NULL;
        }
        serv->connection_list[fd].recv_buffer = buffer;
    }
    return buffer;
}
swProtocol_recv_check_eof 檢測 EOF

首先需要調用 swConnection_recv 函數接受客戶端發來的數據,如果發生錯誤返回 SW_OK,等待 socket 讀就緒重新讀取;如果錯誤是 SW_CLOSE,那么就要返回 SW_ERR,然后讓 swPort_onRead_check_eof 函數調用 swReactorThread_onClose 函數。

EOF 自動分包也有兩種方式,分別是 open_eof_checkopen_eof_splitopen_eof_check 只檢查接收數據的末尾是否為 EOF,因此它的性能最好,幾乎沒有消耗,但是無法解決多個數據包合并的問題,比如同時發送兩條帶有 EOF 的數據,底層可能會一次全部返回;open_eof_split 會從左到右對數據進行逐字節對比,查找數據中的 EOF 進行分包,性能較差。但是每次只會返回一個數據包

如果采用 open_eof_check,那么只需要簡單的 memcmp 對比數據包的最后字符即可,如果符合條件就會調用 protocol->onPackage 函數,也就是 swReactorThread_dispatch

如果采用的是 open_eof_split 就會比較麻煩,需要調用 swProtocol_split_package_by_eof 逐個去找 EOF

如果超過了 protocol->package_max_length 大小,那么說明一直沒有發送成功,就會返回錯誤,結束當前連接

如果緩沖區不足,那么就將緩沖區擴容到 protocol->package_max_length,繼續接受數據

int swProtocol_recv_check_eof(swProtocol *protocol, swConnection *conn, swString *buffer)
{
    int recv_again = SW_FALSE;
    int buf_size;

    recv_data: buf_size = buffer->size - buffer->length;
    char *buf_ptr = buffer->str + buffer->length;

    if (buf_size > SW_BUFFER_SIZE_STD)
    {
        buf_size = SW_BUFFER_SIZE_STD;
    }

    int n = swConnection_recv(conn, buf_ptr, buf_size, 0);
    if (n < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("recv from socket#%d failed.", conn->fd);
            return SW_OK;
        case SW_CLOSE:
            conn->close_errno = errno;
            return SW_ERR;
        default:
            return SW_OK;
        }
    }
    else if (n == 0)
    {
        return SW_ERR;
    }
    else
    {
        buffer->length += n;

        if (buffer->length < protocol->package_eof_len)
        {
            return SW_OK;
        }

        if (protocol->split_by_eof)
        {
            if (swProtocol_split_package_by_eof(protocol, conn, buffer) == 0)
            {
                return SW_OK;
            }
            else
            {
                recv_again = SW_TRUE;
            }
        }
        else if (memcmp(buffer->str + buffer->length - protocol->package_eof_len, protocol->package_eof, protocol->package_eof_len) == 0)
        {
            if (protocol->onPackage(conn, buffer->str, buffer->length) < 0)
            {
                return SW_ERR;
            }
            if (conn->removed)
            {
                return SW_OK;
            }
            swString_clear(buffer);
            return SW_OK;
        }

        //over max length, will discard
        if (buffer->length == protocol->package_max_length)
        {
            swWarn("Package is too big. package_length=%d", (int )buffer->length);
            return SW_ERR;
        }

        //buffer is full, may have not read data
        if (buffer->length == buffer->size)
        {
            recv_again = SW_TRUE;
            if (buffer->size < protocol->package_max_length)
            {
                uint32_t extend_size = swoole_size_align(buffer->size * 2, SwooleG.pagesize);
                if (extend_size > protocol->package_max_length)
                {
                    extend_size = protocol->package_max_length;
                }
                if (swString_extend(buffer, extend_size) < 0)
                {
                    return SW_ERR;
                }
            }
        }
        //no eof
        if (recv_again)
        {
            goto recv_data;
        }
    }
    return SW_OK;
}
swProtocol_split_package_by_eof 尋找 EOF

如果當前緩存中數據連 package_eof_len 也就是 EOF 的長度都不夠,那么就直接返回,繼續接受數據

根據 package_eof 來查找第一個 EOF 的位置,如果沒有找到 EOF,那么遞增 buffer->offset,返回繼續接受數據

找到了 EOF 之后,就要調用 protocol->onPackage 函數,發送給 worker 進程

接著就要從剩余的數據里面循環不斷尋找 EOF,調用 protocol->onPackage 函數

static sw_inline int swProtocol_split_package_by_eof(swProtocol *protocol, swConnection *conn, swString *buffer)
{
#if SW_LOG_TRACE_OPEN > 0
    static int count;
    count++;
#endif

    int eof_pos;
    if (buffer->length - buffer->offset < protocol->package_eof_len)
    {
        eof_pos = -1;
    }
    else
    {
        eof_pos = swoole_strnpos(buffer->str + buffer->offset, buffer->length - buffer->offset, protocol->package_eof, protocol->package_eof_len);
    }

    swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[0] count=%d, length=%ld, size=%ld, offset=%ld.", count, buffer->length, buffer->size, (long)buffer->offset);

    //waiting for more data
    if (eof_pos < 0)
    {
        buffer->offset = buffer->length - protocol->package_eof_len;
        return buffer->length;
    }

    uint32_t length = buffer->offset + eof_pos + protocol->package_eof_len;
    swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[4] count=%d, length=%d", count, length);
    if (protocol->onPackage(conn, buffer->str, length) < 0)
    {
        return SW_ERR;
    }
    if (conn->removed)
    {
        return SW_OK;
    }

    //there are remaining data
    if (length < buffer->length)
    {
        uint32_t remaining_length = buffer->length - length;
        char *remaining_data = buffer->str + length;
        swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[5] count=%d, remaining_length=%d", count, remaining_length);

        while (1)
        {
            if (remaining_length < protocol->package_eof_len)
            {
                goto wait_more_data;
            }
            eof_pos = swoole_strnpos(remaining_data, remaining_length, protocol->package_eof, protocol->package_eof_len);
            if (eof_pos < 0)
            {
                wait_more_data:
                swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[1] count=%d, remaining_length=%d, length=%d", count, remaining_length, length);
                memmove(buffer->str, remaining_data, remaining_length);
                buffer->length = remaining_length;
                buffer->offset = 0;
                return SW_OK;
            }
            else
            {
                length = eof_pos + protocol->package_eof_len;
                if (protocol->onPackage(conn, remaining_data, length) < 0)
                {
                    return SW_ERR;
                }
                if (conn->removed)
                {
                    return SW_OK;
                }
                swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[2] count=%d, remaining_length=%d, length=%d", count, remaining_length, length);
                remaining_data += length;
                remaining_length -= length;
            }
        }
    }
    swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[3] length=%ld, size=%ld, offset=%ld", buffer->length, buffer->size, (long)buffer->offset);
    swString_clear(buffer);
    return SW_OK;
}
swPort_onRead_check_length 包長檢測

類似地本函數也是調用 swProtocol_recv_check_length 來進行包長檢測

static int swPort_onRead_check_length(swReactor *reactor, swListenPort *port, swEvent *event)
{
    swServer *serv = reactor->ptr;
    swConnection *conn = event->socket;
    swProtocol *protocol = &port->protocol;

    swString *buffer = swServer_get_buffer(serv, event->fd);
    if (!buffer)
    {
        return SW_ERR;
    }

    if (swProtocol_recv_check_length(protocol, conn, buffer) < 0)
    {
        swTrace("Close Event.FD=%d|From=%d", event->fd, event->from_id);
        swReactorThread_onClose(reactor, event);
    }

    return SW_OK;
}
swProtocol_recv_check_length 函數

進行包長檢測的時候,每次讀取數據之前都要先讀取 header,從 header 中獲取到數據包的大小后,再去讀取真正的數據

當我們不知道包長大小的時候,buffer->offset 為 0,此時需要讀取 length 大小,但是這個數據位于 headerprotocol->package_length_offset 位置,假設 length 位于 header 的第 8 個字節;length 自身數據大小為 protocol->package_length_size,例如 int_32 類型,這個值就是 4,因此我們需要先讀取 12 個字節,這 12 個字節的最后 4 個字節就是 length 的值,也就是包長。

將數據拿到后(此時 recv_wait 為 0),調用 protocol->get_package_length 就可以獲取 length 的值,根據 buffer->offset 的值為包長值,

如果此時 buffer->length 已接收的數據大于這個包長,那么就調用 onPackage 發送給 worker 進程

如果此時已接收的數據不足,那么 recv_size 就是剩余需要接受的數據大小,此時 recv_wait 為 1,繼續接受數據

如果接受到的數據已經大于包長,那么就調用 onPackage 發送。之后如果仍然有剩余未發送的數據,那么就 do_get_length;如果已經沒有剩余數據了,繼續去取下一個數據包。

如果數據還是不夠,那么就返回,等待讀就緒事件

int swProtocol_recv_check_length(swProtocol *protocol, swConnection *conn, swString *buffer)
{
    int package_length;
    uint32_t recv_size;
    char swap[SW_BUFFER_SIZE_STD];

    if (conn->skip_recv)
    {
        conn->skip_recv = 0;
        goto do_get_length;
    }

    do_recv:
    if (conn->active == 0)
    {
        return SW_OK;
    }
    if (buffer->offset > 0)
    {
        recv_size = buffer->offset - buffer->length;
    }
    else
    {
        recv_size = protocol->package_length_offset + protocol->package_length_size;
    }

    int n = swConnection_recv(conn, buffer->str + buffer->length, recv_size, 0);
    if (n < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("recv(%d, %d) failed.", conn->fd, recv_size);
            return SW_OK;
        case SW_CLOSE:
            conn->close_errno = errno;
            return SW_ERR;
        default:
            return SW_OK;
        }
    }
    else if (n == 0)
    {
        return SW_ERR;
    }
    else
    {
        buffer->length += n;

        if (conn->recv_wait)
        {
            if (buffer->length >= buffer->offset)
            {
                do_dispatch:
                if (protocol->onPackage(conn, buffer->str, buffer->offset) < 0)
                {
                    return SW_ERR;
                }
                if (conn->removed)
                {
                    return SW_OK;
                }
                conn->recv_wait = 0;

                int remaining_length = buffer->length - buffer->offset;
                if (remaining_length > 0)
                {
                    assert(remaining_length < sizeof(swap));
                    memcpy(swap, buffer->str + buffer->offset, remaining_length);
                    memcpy(buffer->str, swap, remaining_length);
                    buffer->offset = 0;
                    buffer->length = remaining_length;
                    goto do_get_length;
                }
                else
                {
                    swString_clear(buffer);
                    goto do_recv;
                }
            }
            else
            {
                return SW_OK;
            }
        }
        else
        {
            do_get_length: package_length = protocol->get_package_length(protocol, conn, buffer->str, buffer->length);
            //invalid package, close connection.
            if (package_length < 0)
            {
                return SW_ERR;
            }
            //no length
            else if (package_length == 0)
            {
                return SW_OK;
            }
            else if (package_length > protocol->package_max_length)
            {
                swWarn("package is too big, remote_addr=%s:%d, length=%d.", swConnection_get_ip(conn), swConnection_get_port(conn), package_length);
                return SW_ERR;
            }
            //get length success
            else
            {
                if (buffer->size < package_length)
                {
                    if (swString_extend(buffer, package_length) < 0)
                    {
                        return SW_ERR;
                    }
                }
                conn->recv_wait = 1;
                buffer->offset = package_length;

                if (buffer->length >= package_length)
                {
                    goto do_dispatch;
                }
                else
                {
                    goto do_recv;
                }
            }
        }
    }
    return SW_OK;
}
swProtocol_get_package_length 獲取包長

本函數邏輯很簡單,如果長度連 length 都不夠,那么包長信息并不在 data 中,直接返回繼續接受數據。拿到 length 后,要用 swoole_unpack 函數轉化為相應的類型即可得到包長值。

int swProtocol_get_package_length(swProtocol *protocol, swConnection *conn, char *data, uint32_t size)
{
    uint16_t length_offset = protocol->package_length_offset;
    int32_t body_length;
    /**
     * no have length field, wait more data
     */
    if (size < length_offset + protocol->package_length_size)
    {
        return 0;
    }
    body_length = swoole_unpack(protocol->package_length_type, data + length_offset);
    //Length error
    //Protocol length is not legitimate, out of bounds or exceed the allocated length
    if (body_length < 0)
    {
        swWarn("invalid package, remote_addr=%s:%d, length=%d, size=%d.", swConnection_get_ip(conn), swConnection_get_port(conn), body_length, size);
        return SW_ERR;
    }
    //total package length
    return protocol->package_body_offset + body_length;
}

static sw_inline int32_t swoole_unpack(char type, void *data)
{
    switch(type)
    {
    /*-------------------------16bit-----------------------------*/
    case "c":
        return *((int8_t *) data);
    case "C":
        return *((uint8_t *) data);
    /*-------------------------16bit-----------------------------*/
    /**
     * signed short (always 16 bit, machine byte order)
     */
    case "s":
        return *((int16_t *) data);
    /**
     * unsigned short (always 16 bit, machine byte order)
     */
    case "S":
        return *((uint16_t *) data);
    /**
     * unsigned short (always 16 bit, big endian byte order)
     */
    case "n":
        return ntohs(*((uint16_t *) data));
    /**
     * unsigned short (always 32 bit, little endian byte order)
     */
    case "v":
        return swoole_swap_endian16(ntohs(*((uint16_t *) data)));

    /*-------------------------32bit-----------------------------*/
    /**
     * unsigned long (always 32 bit, machine byte order)
     */
    case "L":
        return *((uint32_t *) data);
    /**
     * signed long (always 32 bit, machine byte order)
     */
    case "l":
        return *((int *) data);
    /**
     * unsigned long (always 32 bit, big endian byte order)
     */
    case "N":
        return ntohl(*((uint32_t *) data));
    /**
     * unsigned short (always 32 bit, little endian byte order)
     */
    case "V":
        return swoole_swap_endian32(ntohl(*((uint32_t *) data)));

    default:
        return *((uint32_t *) data);
    }
}
swReactorThread_onPipeWrite 寫事件回調

reactor 線程檢測到相對應的 worker 進程的 pipe_master 寫就緒的時候,就會調用 swReactorThread_onPipeWrite

in_buffer 不是空的話,就會循環拿出單鏈表的數據,調用 swServer_connection_verify 驗證 session_id 是否正確,然后調用 write 發送數據

當返回的錯誤是 EAGAIN 的時候,說明 socket 已經不可用,返回等待下一次寫就緒即可

值得注意的是 write 的返回結果不需要關心到底寫入了多少,因為對于 linux 來說,pipe 可以保證 write 小于 PIPE_BUF 大小數據的原子性,不是全部寫入成功,就是寫入失敗,不會出現寫入部分數據的可能。

當所有的數據都發送成功后,取消寫就緒監控,防止重復浪費調用

static int swReactorThread_onPipeWrite(swReactor *reactor, swEvent *ev)
{
    int ret;

    swBuffer_trunk *trunk = NULL;
    swEventData *send_data;
    swConnection *conn;
    swServer *serv = reactor->ptr;
    swBuffer *buffer = serv->connection_list[ev->fd].in_buffer;
    swLock *lock = serv->connection_list[ev->fd].object;

    //lock thread
    lock->lock(lock);

    while (!swBuffer_empty(buffer))
    {
        trunk = swBuffer_get_trunk(buffer);
        send_data = trunk->store.ptr;

        //server active close, discard data.
        if (swEventData_is_stream(send_data->info.type))
        {
            //send_data->info.fd is session_id
            conn = swServer_connection_verify(serv, send_data->info.fd);
            if (conn == NULL || conn->closed)
            {
#ifdef SW_USE_RINGBUFFER
                swReactorThread *thread = swServer_get_thread(SwooleG.serv, SwooleTG.id);
                swPackage package;
                memcpy(&package, send_data->data, sizeof(package));
                thread->buffer_input->free(thread->buffer_input, package.data);
#endif
                if (conn && conn->closed)
                {
                    swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSED_BY_SERVER, "Session#%d is closed by server.", send_data->info.fd);
                }
                swBuffer_pop_trunk(buffer, trunk);
                continue;
            }
        }

        ret = write(ev->fd, trunk->store.ptr, trunk->length);
        if (ret < 0)
        {
            //release lock
            lock->unlock(lock);
#ifdef HAVE_KQUEUE
            return (errno == EAGAIN || errno == ENOBUFS) ? SW_OK : SW_ERR;
#else
            return errno == EAGAIN ? SW_OK : SW_ERR;
#endif
        }
        else
        {
            swBuffer_pop_trunk(buffer, trunk);
        }
    }

    //remove EPOLLOUT event
    if (swBuffer_empty(buffer))
    {
        if (SwooleG.serv->connection_list[ev->fd].from_id == SwooleTG.id)
        {
            ret = reactor->set(reactor, ev->fd, SW_FD_PIPE | SW_EVENT_READ);
        }
        else
        {
            ret = reactor->del(reactor, ev->fd);
        }
        if (ret < 0)
        {
            swSysError("reactor->set(%d) failed.", ev->fd);
        }
    }

    //release lock
    lock->unlock(lock);

    return SW_OK;
}
swReactorThread_onPipeReceive 讀事件就緒

worker 進程返回的數據有三種:SW_RESPONSE_SMALL(少量數據)、SW_RESPONSE_SHM(大數據包存儲在共享內存中)、SW_RESPONSE_TMPFILE(臨時文件)

需要將從 worker 接受到的 swEventData 對象轉化為 swSendData

對于大數據包,worker 并不會將數據通過 socket 來傳遞,而是將 work_id 發送過來,數據存放在 worker->send_shm

如果是臨時文件,worker 發送過來的數據是臨時文件的名字,需要調用 swTaskWorker_large_unpack 將文件內容讀取到 SwooleTG.buffer_stack 中去

swReactorThread_send 函數用于向客戶端發送數據

typedef struct _swSendData
{
    swDataHead info;
    /**
     * for big package
     */
    uint32_t length;
    char *data;
} swSendData;

typedef struct
{
    int length;
    int worker_id;
} swPackage_response;

static int swReactorThread_onPipeReceive(swReactor *reactor, swEvent *ev)
{
    int n;
    swEventData resp;
    swSendData _send;

    swPackage_response pkg_resp;
    swWorker *worker;

#ifdef SW_REACTOR_RECV_AGAIN
    while (1)
#endif
    {
        n = read(ev->fd, &resp, sizeof(resp));
        if (n > 0)
        {
            memcpy(&_send.info, &resp.info, sizeof(resp.info));
            //pipe data
            if (_send.info.from_fd == SW_RESPONSE_SMALL)
            {
                _send.data = resp.data;
                _send.length = resp.info.len;
                swReactorThread_send(&_send);
            }
            //use send shm
            else if (_send.info.from_fd == SW_RESPONSE_SHM)
            {
                memcpy(&pkg_resp, resp.data, sizeof(pkg_resp));
                worker = swServer_get_worker(SwooleG.serv, pkg_resp.worker_id);

                _send.data = worker->send_shm;
                _send.length = pkg_resp.length;

                swReactorThread_send(&_send);
                worker->lock.unlock(&worker->lock);
            }
            //use tmp file
            else if (_send.info.from_fd == SW_RESPONSE_TMPFILE)
            {
                swString *data = swTaskWorker_large_unpack(&resp);
                if (data == NULL)
                {
                    return SW_ERR;
                }
                _send.data = data->str;
                _send.length = data->length;
                swReactorThread_send(&_send);
            }
            //reactor thread exit
            else if (_send.info.from_fd == SW_RESPONSE_EXIT)
            {
                reactor->running = 0;
                return SW_OK;
            }
            //will never be here
            else
            {
                abort();
            }
        }
        else if (errno == EAGAIN)
        {
            return SW_OK;
        }
        else
        {
            swWarn("read(worker_pipe) failed. Error: %s[%d]", strerror(errno), errno);
            return SW_ERR;
        }
    }

    return SW_OK;
}

static sw_inline swString* swTaskWorker_large_unpack(swEventData *task_result)
{
    swPackage_task _pkg;
    memcpy(&_pkg, task_result->data, sizeof(_pkg));

    int tmp_file_fd = open(_pkg.tmpfile, O_RDONLY);
    if (tmp_file_fd < 0)
    {
        swSysError("open(%s) failed.", _pkg.tmpfile);
        return NULL;
    }
    if (SwooleTG.buffer_stack->size < _pkg.length && swString_extend_align(SwooleTG.buffer_stack, _pkg.length) < 0)
    {
        close(tmp_file_fd);
        return NULL;
    }
    if (swoole_sync_readfile(tmp_file_fd, SwooleTG.buffer_stack->str, _pkg.length) < 0)
    {
        close(tmp_file_fd);
        return NULL;
    }
    close(tmp_file_fd);
    if (!(swTask_type(task_result) & SW_TASK_PEEK))
    {
        unlink(_pkg.tmpfile);
    }
    SwooleTG.buffer_stack->length = _pkg.length;
    return SwooleTG.buffer_stack;
}
swReactorThread_send 函數

首先要獲取連接的 session_id,利用 session_id 獲取 swConnection 對象,進而拿到負責該連接的 reactor 對象

SW_EVENT_CONFIRM 代表 worker 確認接收該連接(當服務端使用 enable_delay_receive 選項時)

當調用 swoole_server->pause 函數時,BASE 模式會調用本函數,將不會讀取客戶端數據,去除 reactor 對讀就緒事件的監聽

類似地 swoole_server->resume 函數用于恢復當前連接,重新將讀就緒放入 reactor 的監聽事件中

如果 conn->out_buffer 為空,那么就嘗試向 socket 寫數據,如果沒有全部寫入成功,那么就將數據放入 conn->out_buffer 中去,并開啟事件監聽

如果 conn->out_buffe 數據量過大,需要設置 conn->high_watermark 為 1,調用 onBufferFull 回調

int swReactorThread_send(swSendData *_send)
{
    swServer *serv = SwooleG.serv;
    uint32_t session_id = _send->info.fd;
    void *_send_data = _send->data;
    uint32_t _send_length = _send->length;

    swConnection *conn;
    if (_send->info.type != SW_EVENT_CLOSE)
    {
        conn = swServer_connection_verify(serv, session_id);
    }
    else
    {
        conn = swServer_connection_verify_no_ssl(serv, session_id);
    }

    int fd = conn->fd;
    swReactor *reactor;

    {
        reactor = &(serv->reactor_threads[conn->from_id].reactor);
        assert(fd % serv->reactor_num == reactor->id);
        assert(fd % serv->reactor_num == SwooleTG.id);
    }

    /**
     * Reset send buffer, Immediately close the connection.
     */
    if (_send->info.type == SW_EVENT_CLOSE && (conn->close_reset || conn->removed))
    {
        goto close_fd;
    }
    else if (_send->info.type == SW_EVENT_CONFIRM)
    {
        reactor->add(reactor, conn->fd, conn->fdtype | SW_EVENT_READ);
        conn->listen_wait = 0;
        return SW_OK;
    }
    /**
     * pause recv data
     */
    else if (_send->info.type == SW_EVENT_PAUSE_RECV)
    {
        if (conn->events & SW_EVENT_WRITE)
        {
            return reactor->set(reactor, conn->fd, conn->fdtype | SW_EVENT_WRITE);
        }
        else
        {
            return reactor->del(reactor, conn->fd);
        }
    }
    /**
     * resume recv data
     */
    else if (_send->info.type == SW_EVENT_RESUME_RECV)
    {
        if (conn->events & SW_EVENT_WRITE)
        {
            return reactor->set(reactor, conn->fd, conn->fdtype | SW_EVENT_READ | SW_EVENT_WRITE);
        }
        else
        {
            return reactor->add(reactor, conn->fd, conn->fdtype | SW_EVENT_READ);
        }
    }

    if (swBuffer_empty(conn->out_buffer))
    {
        /**
         * close connection.
         */
        if (_send->info.type == SW_EVENT_CLOSE)
        {
            close_fd:
            reactor->close(reactor, fd);
            return SW_OK;
        }
#ifdef SW_REACTOR_SYNC_SEND
        //Direct send
        if (_send->info.type != SW_EVENT_SENDFILE)
        {
            if (!conn->direct_send)
            {
                goto buffer_send;
            }

            int n;

            direct_send:
            n = swConnection_send(conn, _send_data, _send_length, 0);
            if (n == _send_length)
            {
                return SW_OK;
            }
            else if (n > 0)
            {
                _send_data += n;
                _send_length -= n;
                goto buffer_send;
            }
            else if (errno == EINTR)
            {
                goto direct_send;
            }
            else
            {
                goto buffer_send;
            }
        }
#endif
        //buffer send
        else
        {
#ifdef SW_REACTOR_SYNC_SEND
            buffer_send:
#endif
            if (!conn->out_buffer)
            {
                conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
                if (conn->out_buffer == NULL)
                {
                    return SW_ERR;
                }
            }
        }
    }

    swBuffer_trunk *trunk;
    //close connection
    if (_send->info.type == SW_EVENT_CLOSE)
    {
        trunk = swBuffer_new_trunk(conn->out_buffer, SW_CHUNK_CLOSE, 0);
        trunk->store.data.val1 = _send->info.type;
    }
    //sendfile to client
    else if (_send->info.type == SW_EVENT_SENDFILE)
    {
        swSendFile_request *req = (swSendFile_request *) _send_data;
        swConnection_sendfile(conn, req->filename, req->offset, req->length);
    }
    //send data
    else
    {
        //connection is closed
        if (conn->removed)
        {
            swWarn("connection#%d is closed by client.", fd);
            return SW_ERR;
        }
        //connection output buffer overflow
        if (conn->out_buffer->length >= conn->buffer_size)
        {
            if (serv->send_yield)
            {
                SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
            }
            else
            {
                swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "connection#%d output buffer overflow.", fd);
            }
            conn->overflow = 1;
            if (serv->onBufferEmpty && serv->onBufferFull == NULL)
            {
                conn->high_watermark = 1;
            }
        }

        int _length = _send_length;
        void* _pos = _send_data;
        int _n;

        //buffer enQueue
        while (_length > 0)
        {
            _n = _length >= SW_BUFFER_SIZE_BIG ? SW_BUFFER_SIZE_BIG : _length;
            swBuffer_append(conn->out_buffer, _pos, _n);
            _pos += _n;
            _length -= _n;
        }

        swListenPort *port = swServer_get_port(serv, fd);
        if (serv->onBufferFull && conn->high_watermark == 0 && conn->out_buffer->length >= port->buffer_high_watermark)
        {
            swServer_tcp_notify(serv, conn, SW_EVENT_BUFFER_FULL);
            conn->high_watermark = 1;
        }
    }

    //listen EPOLLOUT event
    if (reactor->set(reactor, fd, SW_EVENT_TCP | SW_EVENT_WRITE | SW_EVENT_READ) < 0
            && (errno == EBADF || errno == ENOENT))
    {
        goto close_fd;
    }

    return SW_OK;
}

swConnection_sendfile 發送文件

對于文件的發送,swoole 將文件的信息存儲在 swTask_sendfile 對象中,然后將其放入 conn->out_buffer 中。

typedef struct {
    char *filename;
    uint16_t name_len;
    int fd;
    size_t length;
    off_t offset;
} swTask_sendfile;

int swConnection_sendfile(swConnection *conn, char *filename, off_t offset, size_t length)
{
    if (conn->out_buffer == NULL)
    {
        conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
        if (conn->out_buffer == NULL)
        {
            return SW_ERR;
        }
    }

    swBuffer_trunk error_chunk;
    swTask_sendfile *task = sw_malloc(sizeof(swTask_sendfile));
    if (task == NULL)
    {
        swWarn("malloc for swTask_sendfile failed.");
        return SW_ERR;
    }
    bzero(task, sizeof(swTask_sendfile));

    task->filename = sw_strdup(filename);
    int file_fd = open(filename, O_RDONLY);
    if (file_fd < 0)
    {
        sw_free(task->filename);
        sw_free(task);
        swSysError("open(%s) failed.", filename);
        return SW_OK;
    }
    task->fd = file_fd;
    task->offset = offset;

    struct stat file_stat;
    if (fstat(file_fd, &file_stat) < 0)
    {
        swSysError("fstat(%s) failed.", filename);
        error_chunk.store.ptr = task;
        swConnection_sendfile_destructor(&error_chunk);
        return SW_ERR;
    }
    if (offset < 0 || (length + offset > file_stat.st_size))
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_INVALID_PARAMS, "length or offset is invalid.");
        error_chunk.store.ptr = task;
        swConnection_sendfile_destructor(&error_chunk);
        return SW_OK;
    }
    if (length == 0)
    {
        task->length = file_stat.st_size;
    }
    else
    {
        task->length = length + offset;
    }

    swBuffer_trunk *chunk = swBuffer_new_trunk(conn->out_buffer, SW_CHUNK_SENDFILE, 0);
    if (chunk == NULL)
    {
        swWarn("get out_buffer trunk failed.");
        error_chunk.store.ptr = task;
        swConnection_sendfile_destructor(&error_chunk);
        return SW_ERR;
    }

    chunk->store.ptr = (void *) task;
    chunk->destroy = swConnection_sendfile_destructor;

    return SW_OK;
}
swConnection_onSendfile 向客戶端發送文件

HAVE_TCP_NOPUSH 是避免 TCP 延遲接受的一種方法,為了避免 Nagle 算法造成的延遲,我們需要設置 TCP_NODELAY 選項和 TCP_CORK 選項來避免延遲接受和合并數據包(詳情可以看 Nagle 算法與 TCP socket 選項 TCP_CORK)

獲取到 sendn 后,就要調用 swoole_sendfile 讀取文件內容,發送數據

發送數據結束后,再將 TCP_CORK 設置為 0

static sw_inline int swSocket_tcp_nopush(int sock, int nopush)
{
    return setsockopt(sock, IPPROTO_TCP, TCP_CORK, (const void *) &nopush, sizeof(int));
}

int swConnection_onSendfile(swConnection *conn, swBuffer_trunk *chunk)
{
    int ret;
    swTask_sendfile *task = chunk->store.ptr;

#ifdef HAVE_TCP_NOPUSH
    if (task->offset == 0 && conn->tcp_nopush == 0)
    {
        /**
         * disable tcp_nodelay
         */
        if (conn->tcp_nodelay)
        {
            int tcp_nodelay = 0;
            if (setsockopt(conn->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                swWarn("setsockopt(TCP_NODELAY) failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
        /**
         * enable tcp_nopush
         */
        if (swSocket_tcp_nopush(conn->fd, 1) == -1)
        {
            swWarn("swSocket_tcp_nopush() failed. Error: %s[%d]", strerror(errno), errno);
        }
        conn->tcp_nopush = 1;
    }
#endif

    int sendn = (task->length - task->offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : task->length - task->offset;

    {
        ret = swoole_sendfile(conn->fd, task->fd, &task->offset, sendn);
    }

    swTrace("ret=%d|task->offset=%ld|sendn=%d|filesize=%ld", ret, (long)task->offset, sendn, task->length);

    if (ret <= 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("sendfile(%s, %ld, %d) failed.", task->filename, (long)task->offset, sendn);
            swBuffer_pop_trunk(conn->out_buffer, chunk);
            return SW_OK;
        case SW_CLOSE:
            conn->close_wait = 1;
            return SW_ERR;
        case SW_WAIT:
            conn->send_wait = 1;
            return SW_ERR;
        default:
            break;
        }
    }

    //sendfile finish
    if (task->offset >= task->length)
    {
        swBuffer_pop_trunk(conn->out_buffer, chunk);

#ifdef HAVE_TCP_NOPUSH
        /**
         * disable tcp_nopush
         */
        if (swSocket_tcp_nopush(conn->fd, 0) == -1)
        {
            swWarn("swSocket_tcp_nopush() failed. Error: %s[%d]", strerror(errno), errno);
        }
        conn->tcp_nopush = 0;

        /**
         * enable tcp_nodelay
         */
        if (conn->tcp_nodelay)
        {
            int value = 1;
            if (setsockopt(conn->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &value, sizeof(int)) == -1)
            {
                swWarn("setsockopt(TCP_NODELAY) failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
#endif
    }
    return SW_OK;
}

int swoole_sendfile(int out_fd, int in_fd, off_t *offset, size_t size)
{
    char buf[SW_BUFFER_SIZE_BIG];
    int readn = size > sizeof(buf) ? sizeof(buf) : size;

    int ret;
    int n = pread(in_fd, buf, readn, *offset);

    if (n > 0)
    {
        ret = write(out_fd, buf, n);
        if (ret < 0)
        {
            swSysError("write() failed.");
        }
        else
        {
            *offset += ret;
        }
        return ret;
    }
    else
    {
        swSysError("pread() failed.");
        return SW_ERR;
    }
}

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29241.html

相關文章

  • Swoole 源碼分析——Server模塊ReactorThread事件循環(上)

    摘要:線程在建立之時,就會調用函數開啟事件循環。如果為空,那么重新設置文件描述符的監聽事件,刪除寫就緒,只設置讀就緒。這個是水平觸發模式的必要步驟,避免無數據寫入時,頻繁地調用寫就緒回調函數。 前言 經過 php_swoole_server_before_start 調用 swReactorThread_create 創建了 serv->reactor_threads 對象后,swServe...

    gplane 評論0 收藏0
  • Swoole 源碼分析——Server模塊Start

    摘要:是緩存區高水位線,達到了說明緩沖區即將滿了創建線程函數用于將監控的存放于中向中添加監聽的文件描述符等待所有的線程開啟事件循環利用創建線程,線程啟動函數是保存監聽本函數將用于監聽的存放到當中,并設置相應的屬性 Server 的啟動 在 server 啟動之前,swoole 首先要調用 php_swoole_register_callback 將 PHP 的回調函數注冊到 server...

    3fuyu 評論0 收藏0
  • Swoole 源碼分析——Reactor模塊ReactorBase

    前言 作為一個網絡框架,最為核心的就是消息的接受與發送。高效的 reactor 模式一直是眾多網絡框架的首要選擇,本節主要講解 swoole 中的 reactor 模塊。 UNP 學習筆記——IO 復用 Reactor 的數據結構 Reactor 的數據結構比較復雜,首先 object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針, event_nu...

    baukh789 評論0 收藏0
  • 記JVM堆外內存泄漏Bug查找

    摘要:服務本身是一個,開起的線程數為,再加上一些其他線程,總的線程數不會超過服務內自己沒有顯示創建線程或者使用線程池。問題解決找到所在后,結局方案很簡單,只需將的通過單例的方式注入到服務中,即可解決堆外內存泄漏的問題。 內存泄漏Bug現場 一個做BI數據展示的服務在一個晚上重啟了5次,由于是通過k8s容器編排,服務掛了以后會自動重啟,所以服務還能繼續提供服務。 第一時間先上日志系統查看錯誤日...

    hiYoHoo 評論0 收藏0
  • Swoole 源碼分析——Server模塊Stream 模式

    摘要:新建可以看到,自動采用包長檢測的方法該函數主要功能是設置各種回調函數值得注意的是第三個參數代表是否異步。發送數據函數并不是直接發送數據,而是將數據存儲在,等著寫事件就緒之后調用發送數據。 swReactorThread_dispatch 發送數據 reactor 線程會通過 swReactorThread_dispatch 發送數據,當采用 stream 發送數據的時候,會調用 sw...

    wums 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<