摘要:前言對于多進程多線程的應用程序來說,保證數據正確的同步與更新離不開鎖和信號,中的鎖與信號基本采用系列函數實現。中的鎖類型有很多種互斥鎖自旋鎖文件鎖讀寫鎖原子鎖,本節就會講解中各種鎖的定義與使用。
前言
對于多進程多線程的應用程序來說,保證數據正確的同步與更新離不開鎖和信號,swoole 中的鎖與信號基本采用 pthread 系列函數實現。UNIX 中的鎖類型有很多種:互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,本節就會講解 swoole 中各種鎖的定義與使用。
APUE 學習筆記——線程與鎖
APUE 學習筆記——高級 IO與文件鎖
數據結構swoole 中無論哪種鎖,其數據結構都是 swLock,這個數據結構內部有一個聯合體 object,這個聯合體可以是 互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,type 可以指代這個鎖的類型,具體可選項是 SW_LOCKS 這個枚舉類型
該結構體還定義了幾個函數指針,這幾個函數類似于各個鎖需要實現的接口,值得注意的是 lock_rd 和 trylock_rd兩個函數是專門為了 swFileLock 和 swRWLock 設計的,其他鎖沒有這兩個函數。
</>復制代碼
typedef struct _swLock
{
int type;
union
{
swMutex mutex;
#ifdef HAVE_RWLOCK
swRWLock rwlock;
#endif
#ifdef HAVE_SPINLOCK
swSpinLock spinlock;
#endif
swFileLock filelock;
swSem sem;
swAtomicLock atomlock;
} object;
int (*lock_rd)(struct _swLock *);
int (*lock)(struct _swLock *);
int (*unlock)(struct _swLock *);
int (*trylock_rd)(struct _swLock *);
int (*trylock)(struct _swLock *);
int (*free)(struct _swLock *);
} swLock;
enum SW_LOCKS
{
SW_RWLOCK = 1,
#define SW_RWLOCK SW_RWLOCK
SW_FILELOCK = 2,
#define SW_FILELOCK SW_FILELOCK
SW_MUTEX = 3,
#define SW_MUTEX SW_MUTEX
SW_SEM = 4,
#define SW_SEM SW_SEM
SW_SPINLOCK = 5,
#define SW_SPINLOCK SW_SPINLOCK
SW_ATOMLOCK = 6,
#define SW_ATOMLOCK SW_ATOMLOCK
};
互斥鎖
互斥鎖是最常用的進程/線程鎖,swMutex 的基礎是 pthread_mutex 系列函數, 因此該數據結構只有兩個成員變量:_lock、attr:
</>復制代碼
typedef struct _swMutex
{
pthread_mutex_t _lock;
pthread_mutexattr_t attr;
} swMutex;
互斥鎖的創建
互斥鎖的創建就是 pthread_mutex 互斥鎖的初始化,首先初始化互斥鎖的屬性 pthread_mutexattr_t attr,設定互斥鎖是否要進程共享,之后設置各個關于鎖的函數:
</>復制代碼
int swMutex_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_MUTEX;
pthread_mutexattr_init(&lock->object.mutex.attr);
if (use_in_process == 1)
{
pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
}
if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0)
{
return SW_ERR;
}
lock->lock = swMutex_lock;
lock->unlock = swMutex_unlock;
lock->trylock = swMutex_trylock;
lock->free = swMutex_free;
return SW_OK;
}
互斥鎖函數
互斥鎖的函數就是調用相應的 pthread_mutex 系列函數:
</>復制代碼
static int swMutex_lock(swLock *lock)
{
return pthread_mutex_lock(&lock->object.mutex._lock);
}
static int swMutex_unlock(swLock *lock)
{
return pthread_mutex_unlock(&lock->object.mutex._lock);
}
static int swMutex_trylock(swLock *lock)
{
return pthread_mutex_trylock(&lock->object.mutex._lock);
}
static int swMutex_free(swLock *lock)
{
pthread_mutexattr_destroy(&lock->object.mutex.attr);
return pthread_mutex_destroy(&lock->object.mutex._lock);
}
int swMutex_lockwait(swLock *lock, int timeout_msec)
{
struct timespec timeo;
timeo.tv_sec = timeout_msec / 1000;
timeo.tv_nsec = (timeout_msec - timeo.tv_sec * 1000) * 1000 * 1000;
return pthread_mutex_timedlock(&lock->object.mutex._lock, &timeo);
}
讀寫鎖
對于讀多寫少的情況,讀寫鎖可以顯著的提高程序效率,swRWLock 的基礎是 pthread_rwlock 系列函數:
</>復制代碼
typedef struct _swRWLock
{
pthread_rwlock_t _lock;
pthread_rwlockattr_t attr;
} swRWLock;
讀寫鎖的創建
讀寫鎖的創建過程和互斥鎖類似:
</>復制代碼
int swRWLock_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_RWLOCK;
pthread_rwlockattr_init(&lock->object.rwlock.attr);
if (use_in_process == 1)
{
pthread_rwlockattr_setpshared(&lock->object.rwlock.attr, PTHREAD_PROCESS_SHARED);
}
if ((ret = pthread_rwlock_init(&lock->object.rwlock._lock, &lock->object.rwlock.attr)) < 0)
{
return SW_ERR;
}
lock->lock_rd = swRWLock_lock_rd;
lock->lock = swRWLock_lock_rw;
lock->unlock = swRWLock_unlock;
lock->trylock = swRWLock_trylock_rw;
lock->trylock_rd = swRWLock_trylock_rd;
lock->free = swRWLock_free;
return SW_OK;
}
讀寫鎖函數
</>復制代碼
static int swRWLock_lock_rd(swLock *lock)
{
return pthread_rwlock_rdlock(&lock->object.rwlock._lock);
}
static int swRWLock_lock_rw(swLock *lock)
{
return pthread_rwlock_wrlock(&lock->object.rwlock._lock);
}
static int swRWLock_unlock(swLock *lock)
{
return pthread_rwlock_unlock(&lock->object.rwlock._lock);
}
static int swRWLock_trylock_rd(swLock *lock)
{
return pthread_rwlock_tryrdlock(&lock->object.rwlock._lock);
}
static int swRWLock_trylock_rw(swLock *lock)
{
return pthread_rwlock_trywrlock(&lock->object.rwlock._lock);
}
static int swRWLock_free(swLock *lock)
{
return pthread_rwlock_destroy(&lock->object.rwlock._lock);
}
文件鎖
文件鎖是對多進程、多線程同一時間寫相同文件這一場景設定的鎖,底層函數是 fcntl:
</>復制代碼
typedef struct _swFileLock
{
struct flock lock_t;
int fd;
} swFileLock;
文件鎖的創建
</>復制代碼
int swFileLock_create(swLock *lock, int fd)
{
bzero(lock, sizeof(swLock));
lock->type = SW_FILELOCK;
lock->object.filelock.fd = fd;
lock->lock_rd = swFileLock_lock_rd;
lock->lock = swFileLock_lock_rw;
lock->trylock_rd = swFileLock_trylock_rd;
lock->trylock = swFileLock_trylock_rw;
lock->unlock = swFileLock_unlock;
lock->free = swFileLock_free;
return 0;
}
文件鎖函數
</>復制代碼
static int swFileLock_lock_rd(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_RDLCK;
return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}
static int swFileLock_lock_rw(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_WRLCK;
return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}
static int swFileLock_unlock(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_UNLCK;
return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}
static int swFileLock_trylock_rw(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_WRLCK;
return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock);
}
static int swFileLock_trylock_rd(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_RDLCK;
return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock);
}
static int swFileLock_free(swLock *lock)
{
return close(lock->object.filelock.fd);
}
自旋鎖
自旋鎖類似于互斥鎖,不同的是自旋鎖在加鎖失敗的時候,并不會沉入內核,而是空轉,這樣的鎖效率更高,但是會空耗 CPU
資源:
</>復制代碼
typedef struct _swSpinLock
{
pthread_spinlock_t lock_t;
} swSpinLock;
自旋鎖的創建
</>復制代碼
int swSpinLock_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_SPINLOCK;
if ((ret = pthread_spin_init(&lock->object.spinlock.lock_t, use_in_process)) < 0)
{
return -1;
}
lock->lock = swSpinLock_lock;
lock->unlock = swSpinLock_unlock;
lock->trylock = swSpinLock_trylock;
lock->free = swSpinLock_free;
return 0;
}
自旋鎖函數
</>復制代碼
static int swSpinLock_lock(swLock *lock)
{
return pthread_spin_lock(&lock->object.spinlock.lock_t);
}
static int swSpinLock_unlock(swLock *lock)
{
return pthread_spin_unlock(&lock->object.spinlock.lock_t);
}
static int swSpinLock_trylock(swLock *lock)
{
return pthread_spin_trylock(&lock->object.spinlock.lock_t);
}
static int swSpinLock_free(swLock *lock)
{
return pthread_spin_destroy(&lock->object.spinlock.lock_t);
}
原子鎖
不同于以上幾種鎖,swoole 的原子鎖并不是 pthread 系列的鎖,而是自定義實現的。
</>復制代碼
typedef volatile uint32_t sw_atomic_uint32_t;
typedef sw_atomic_uint32_t sw_atomic_t;
typedef struct _swAtomicLock
{
sw_atomic_t lock_t;
uint32_t spin;
} swAtomicLock;
原子鎖的創建
</>復制代碼
int swAtomicLock_create(swLock *lock, int spin)
{
bzero(lock, sizeof(swLock));
lock->type = SW_ATOMLOCK;
lock->object.atomlock.spin = spin;
lock->lock = swAtomicLock_lock;
lock->unlock = swAtomicLock_unlock;
lock->trylock = swAtomicLock_trylock;
return SW_OK;
}
原子鎖的加鎖
</>復制代碼
static int swAtomicLock_lock(swLock *lock)
{
sw_spinlock(&lock->object.atomlock.lock_t);
return SW_OK;
}
原子鎖的加鎖邏輯函數 sw_spinlock 非常復雜,具體步驟如下:
如果原子鎖沒有被鎖,那么調用原子函數 sw_atomic_cmp_set(__sync_bool_compare_and_swap ) 進行加鎖
若原子鎖已經被加鎖,如果是單核,那么就調用 sched_yield 函數讓出執行權,因為這說明自旋鎖已經被其他進程加鎖,但是卻被強占睡眠,我們需要讓出控制權讓那個唯一的 cpu 把那個進程跑下去,注意這時絕對不能進行自選,否則就是死鎖。
如果是多核,就要不斷空轉的嘗試加鎖,防止睡眠,加鎖的嘗試間隔時間會指數增加,例如第一次 1 個時鐘周期,第二次 2 時鐘周期,第三次 4 時鐘周期...
間隔時間內執行的函數 sw_atomic_cpu_pause 使用的是內嵌的匯編代碼,目的在讓 cpu 空轉,禁止線程或進程被其他線程強占導致睡眠,恢復上下文浪費時間。
如果超過了 SW_SPINLOCK_LOOP_N 次數,還沒有能夠獲取的到鎖,那么也要讓出控制權,這時很有可能被鎖保護的代碼有阻塞行為
</>復制代碼
#define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set)
#define sw_atomic_cpu_pause() __asm__ __volatile__ ("pause")
#define swYield() sched_yield() //or usleep(1)
static sw_inline void sw_spinlock(sw_atomic_t *lock)
{
uint32_t i, n;
while (1)
{
if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
{
return;
}
if (SW_CPU_NUM > 1)
{
for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)
{
for (i = 0; i < n; i++)
{
sw_atomic_cpu_pause();
}
if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
{
return;
}
}
}
swYield();
}
}
原子鎖的函數
</>復制代碼
static int swAtomicLock_unlock(swLock *lock)
{
return lock->object.atomlock.lock_t = 0;
}
static int swAtomicLock_trylock(swLock *lock)
{
sw_atomic_t *atomic = &lock->object.atomlock.lock_t;
return (*(atomic) == 0 && sw_atomic_cmp_set(atomic, 0, 1));
}
信號量
信號量也是數據同步的一種重要方式,其數據結構為:
</>復制代碼
typedef struct _swSem
{
key_t key;
int semid;
} swSem;
信號量的創建
信號量的初始化首先需要調用 semget 創建一個新的信號量
semctl 會將信號量初始化為 0
</>復制代碼
int swSem_create(swLock *lock, key_t key)
{
int ret;
lock->type = SW_SEM;
if ((ret = semget(key, 1, IPC_CREAT | 0666)) < 0)
{
return SW_ERR;
}
if (semctl(ret, 0, SETVAL, 1) == -1)
{
swWarn("semctl(SETVAL) failed");
return SW_ERR;
}
lock->object.sem.semid = ret;
lock->lock = swSem_lock;
lock->unlock = swSem_unlock;
lock->free = swSem_free;
return SW_OK;
}
信號量的 V 操作
</>復制代碼
static int swSem_unlock(swLock *lock)
{
struct sembuf sem;
sem.sem_flg = SEM_UNDO;
sem.sem_num = 0;
sem.sem_op = 1;
return semop(lock->object.sem.semid, &sem, 1);
}
信號量的 P 操作
</>復制代碼
static int swSem_lock(swLock *lock)
{
struct sembuf sem;
sem.sem_flg = SEM_UNDO;
sem.sem_num = 0;
sem.sem_op = -1;
return semop(lock->object.sem.semid, &sem, 1);
}
信號量的銷毀
IPC_RMID 用于銷毀信號量
</>復制代碼
static int swSem_free(swLock *lock)
{
return semctl(lock->object.sem.semid, 0, IPC_RMID);
}
條件變量
條件變量并沒有作為 swLock 的一員,而是自成一體
條件變量不僅需要 pthread_cond_t,還需要互斥量 swLock
</>復制代碼
typedef struct _swCond
{
swLock _lock;
pthread_cond_t _cond;
int (*wait)(struct _swCond *object);
int (*timewait)(struct _swCond *object, long, long);
int (*notify)(struct _swCond *object);
int (*broadcast)(struct _swCond *object);
void (*free)(struct _swCond *object);
int (*lock)(struct _swCond *object);
int (*unlock)(struct _swCond *object);
} swCond;
條件變量的創建
</>復制代碼
int swCond_create(swCond *cond)
{
if (pthread_cond_init(&cond->_cond, NULL) < 0)
{
swWarn("pthread_cond_init fail. Error: %s [%d]", strerror(errno), errno);
return SW_ERR;
}
if (swMutex_create(&cond->_lock, 0) < 0)
{
return SW_ERR;
}
cond->notify = swCond_notify;
cond->broadcast = swCond_broadcast;
cond->timewait = swCond_timewait;
cond->wait = swCond_wait;
cond->lock = swCond_lock;
cond->unlock = swCond_unlock;
cond->free = swCond_free;
return SW_OK;
}
條件變量的函數
值得注意的是,條件變量的函數使用一定要結合 swCond_lock、swCond_unlock 等函數
</>復制代碼
static int swCond_notify(swCond *cond)
{
return pthread_cond_signal(&cond->_cond);
}
static int swCond_broadcast(swCond *cond)
{
return pthread_cond_broadcast(&cond->_cond);
}
static int swCond_timewait(swCond *cond, long sec, long nsec)
{
struct timespec timeo;
timeo.tv_sec = sec;
timeo.tv_nsec = nsec;
return pthread_cond_timedwait(&cond->_cond, &cond->_lock.object.mutex._lock, &timeo);
}
static int swCond_wait(swCond *cond)
{
return pthread_cond_wait(&cond->_cond, &cond->_lock.object.mutex._lock);
}
static int swCond_lock(swCond *cond)
{
return cond->_lock.lock(&cond->_lock);
}
static int swCond_unlock(swCond *cond)
{
return cond->_lock.unlock(&cond->_lock);
}
static void swCond_free(swCond *cond)
{
pthread_cond_destroy(&cond->_cond);
cond->_lock.free(&cond->_lock);
}
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/29220.html
摘要:在創建進程和線程之間,主線程開始進行信號處理函數的設置。事件循環結束前會調用函數,該函數會檢查并執行相應的信號處理函數。 前言 信號處理是網絡庫不可或缺的一部分,不論是 ALARM、SIGTERM、SIGUSR1、SIGUSR2、SIGPIPE 等信號對程序的控制,還是 reactor、read、write 等操作被信號中斷的處理,都關系著整個框架程序的正常運行。 Signal 數據...
摘要:修復添加超過萬個以上定時器時發生崩潰的問題增加模塊,下高性能序列化庫修復監聽端口設置無效的問題等。線程來處理網絡事件輪詢,讀取數據。當的三次握手成功了以后,由這個線程將連接成功的消息告訴進程,再由進程轉交給進程。此時進程觸發事件。 本文示例代碼詳見:https://github.com/52fhy/swoo...。 簡介 Swoole是一個PHP擴展,提供了PHP語言的異步多線程服務器...
摘要:清空主進程殘留的定時器與信號。設定為執行回調函數如果在回調函數中調用了異步系統,啟動函數進行事件循環。因此為了區分兩者,規定并不允許兩者同時存在。 前言 swoole-1.7.2 增加了一個進程管理模塊,用來替代 PHP 的 pcntl 擴展。 PHP自帶的pcntl,存在很多不足,如 pcntl 沒有提供進程間通信的功能 pcntl 不支持重定向標準輸入和輸出 pcntl 只...
摘要:當其就緒時,會調用執行定時函數。進程超時停止進程將要停止時,并不會立刻停止,而是會等待事件循環結束后停止,這時為了防止進程不退出,還設置了的延遲,超過就會停止該進程。當允許空閑時間小于時,統一每隔檢測空閑連接。 前言 swoole 的 timer 模塊功能有三個:用戶定時任務、剔除空閑連接、更新 server 時間。timer 模塊的底層有兩種,一種是基于 alarm 信號,一種是基于...
前言 作為一個網絡框架,最為核心的就是消息的接受與發送。高效的 reactor 模式一直是眾多網絡框架的首要選擇,本節主要講解 swoole 中的 reactor 模塊。 UNP 學習筆記——IO 復用 Reactor 的數據結構 Reactor 的數據結構比較復雜,首先 object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針, event_nu...
閱讀 3268·2021-09-23 11:55
閱讀 2602·2021-09-13 10:33
閱讀 1666·2019-08-30 15:54
閱讀 3096·2019-08-30 15:54
閱讀 2363·2019-08-30 10:59
閱讀 2370·2019-08-29 17:08
閱讀 1804·2019-08-29 13:16
閱讀 3589·2019-08-26 12:25