最近在看SRS的源码。SRS是基于协程开发的,底层使用了StateThreads。所以为了充分的理解SRS源码,需要先学习一下StateThreads。这里对StateThreads的学习做了一些总结和记录。
StateThreads是什么
StateThreads是一个用户级线程库,用于多线程编程。它提供了一种轻量级的线程模型,允许开发人员以更简单的方式编写并发程序。
StateThreads有什么用
StateThreads 的主要目标是提供一种高效的用户级线程模型,以减少线程切换和上下文切换的开销。它采用协作式调度策略,即线程在主动释放执行权之前不会被抢占。这种方式可以减少线程切换的开销,但也需要开发人员在适当的时机主动释放执行权,以避免长时间的阻塞导致程序响应性下降。
StateThreads 提供了一组简单的函数和宏,用于创建和管理线程、同步和通信等操作。它支持线程的创建、销毁、休眠、唤醒等基本操作,以及互斥锁、条件变量、信号量等同步机制。开发人员可以使用这些函数和宏来编写并发程序,而不需要直接操作操作系统提供的线程和同步原语。
总的来说,StateThreads是一个高性能、高并发、高扩展性和可读性的网络服务器架构。
StateThreads怎么用
下载
git clone -b srs https://github.com/ossrs/state-threads.git
编译
make linux-debug
编译完成后,将头文件导入需要使用到StateThreads的项目。并在编译项目时链接st库即可。
使用示例
示例一
下面是用StateThreads实现的一个简单的服务,可以监听客户端的连接。
#include <iostream>#include <stdio.h>#include <arpa/inet.h>#include <errno.h>#include <stdlib.h>#include <string.h>#include <sys/socket.h>#include <sys/types.h>#include <st.h>#define LISTEN_PORT 8000#define ERR_EXIT(m) \do \{ \perror(m); \exit(-1); \} while (0)void *client_thread(void *arg){st_netfd_t client_st_fd = (st_netfd_t)arg;// 用于获取与 st_netfd_t 对象关联的文件描述符(File Descriptor)。它返回一个整数值,表示文件描述符的值。// 将 st_netfd_t 对象转换为普通的文件描述符int client_fd = st_netfd_fileno(client_st_fd);sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);// 获取与套接字连接的对端的地址信息int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);if (ret == -1){printf("[WARN] Failed to get client ip: %s\n", strerror(ret));}char ip_buf[INET_ADDRSTRLEN];// 内存区域清零memset(ip_buf, 0, sizeof(ip_buf));inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,sizeof(ip_buf));while (1){char buf[1024] = {0};// 从给定的套接字中读取指定字节数的数据,并将其存储在提供的缓冲区 buf 中ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);if (ret == -1){printf("client st_read error\n");break;}else if (ret == 0){printf("client quit, ip = %s\n", ip_buf);break;}printf("recv from %s, data = %s", ip_buf, buf);ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);if (ret == -1){printf("client st_write error\n");}}}void *listen_thread(void *arg)// 监听{while (1){st_netfd_t client_st_fd =st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);if (client_st_fd == NULL){continue;}printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));st_thread_t client_tid =st_thread_create(client_thread, (void *)client_st_fd, 0, 0);if (client_tid == NULL){printf("Failed to st create client thread\n");}}}int main(){// 用于设置 ST 库的事件系统。int ret = st_set_eventsys(ST_EVENTSYS_ALT);if (ret == -1){printf("st_set_eventsys use linux epoll failed\n");}// st初始化ret = st_init();if (ret != 0){printf("st_init failed. ret = %d\n", ret);return -1;}// 创建套接字int listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (listen_fd == -1){ERR_EXIT("socket");}int reuse_socket = 1;// 设置套接字选项ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,sizeof(int));if (ret == -1){ERR_EXIT("setsockopt");}struct sockaddr_in server_addr; // 用于表示 IPv4 地址的结构体server_addr.sin_family = AF_INET; // 地址族,一般为 AF_INETserver_addr.sin_port = htons(LISTEN_PORT); // 端口server_addr.sin_addr.s_addr = INADDR_ANY; // ipv4地址结构// 将套接字与特定的 IP 地址和端口号进行绑定ret =bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));if (ret == -1){ERR_EXIT("bind");}ret = listen(listen_fd, 128);if (ret == -1){ERR_EXIT("listen");}// st_netfd_open_socket() 是 State Threads (ST) 库中的一个函数,用于创建一个 st_netfd_t 类型的文件描述符对象,以便进行异步 I/O 操作。st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);if (!st_listen_fd){printf("st_netfd_open_socket open socket failed.\n");return -1;}// 创建线程监听来一个建立连接的请求st_thread_t listen_tid =st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);if (listen_tid == NULL){printf("Failed to st create listen thread\n");}while (1){st_sleep(1);}return 0;}#include <iostream> #include <stdio.h> #include <arpa/inet.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <st.h> #define LISTEN_PORT 8000 #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(-1); \ } while (0) void *client_thread(void *arg) { st_netfd_t client_st_fd = (st_netfd_t)arg; // 用于获取与 st_netfd_t 对象关联的文件描述符(File Descriptor)。它返回一个整数值,表示文件描述符的值。 // 将 st_netfd_t 对象转换为普通的文件描述符 int client_fd = st_netfd_fileno(client_st_fd); sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); // 获取与套接字连接的对端的地址信息 int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len); if (ret == -1) { printf("[WARN] Failed to get client ip: %s\n", strerror(ret)); } char ip_buf[INET_ADDRSTRLEN]; // 内存区域清零 memset(ip_buf, 0, sizeof(ip_buf)); inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf, sizeof(ip_buf)); while (1) { char buf[1024] = {0}; // 从给定的套接字中读取指定字节数的数据,并将其存储在提供的缓冲区 buf 中 ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_read error\n"); break; } else if (ret == 0) { printf("client quit, ip = %s\n", ip_buf); break; } printf("recv from %s, data = %s", ip_buf, buf); ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_write error\n"); } } } void *listen_thread(void *arg) // 监听 { while (1) { st_netfd_t client_st_fd = st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT); if (client_st_fd == NULL) { continue; } printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd)); st_thread_t client_tid = st_thread_create(client_thread, (void *)client_st_fd, 0, 0); if (client_tid == NULL) { printf("Failed to st create client thread\n"); } } } int main() { // 用于设置 ST 库的事件系统。 int ret = st_set_eventsys(ST_EVENTSYS_ALT); if (ret == -1) { printf("st_set_eventsys use linux epoll failed\n"); } // st初始化 ret = st_init(); if (ret != 0) { printf("st_init failed. ret = %d\n", ret); return -1; } // 创建套接字 int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { ERR_EXIT("socket"); } int reuse_socket = 1; // 设置套接字选项 ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)); if (ret == -1) { ERR_EXIT("setsockopt"); } struct sockaddr_in server_addr; // 用于表示 IPv4 地址的结构体 server_addr.sin_family = AF_INET; // 地址族,一般为 AF_INET server_addr.sin_port = htons(LISTEN_PORT); // 端口 server_addr.sin_addr.s_addr = INADDR_ANY; // ipv4地址结构 // 将套接字与特定的 IP 地址和端口号进行绑定 ret = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)); if (ret == -1) { ERR_EXIT("bind"); } ret = listen(listen_fd, 128); if (ret == -1) { ERR_EXIT("listen"); } // st_netfd_open_socket() 是 State Threads (ST) 库中的一个函数,用于创建一个 st_netfd_t 类型的文件描述符对象,以便进行异步 I/O 操作。 st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd); if (!st_listen_fd) { printf("st_netfd_open_socket open socket failed.\n"); return -1; } // 创建线程监听来一个建立连接的请求 st_thread_t listen_tid = st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0); if (listen_tid == NULL) { printf("Failed to st create listen thread\n"); } while (1) { st_sleep(1); } return 0; }#include <iostream> #include <stdio.h> #include <arpa/inet.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <st.h> #define LISTEN_PORT 8000 #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(-1); \ } while (0) void *client_thread(void *arg) { st_netfd_t client_st_fd = (st_netfd_t)arg; // 用于获取与 st_netfd_t 对象关联的文件描述符(File Descriptor)。它返回一个整数值,表示文件描述符的值。 // 将 st_netfd_t 对象转换为普通的文件描述符 int client_fd = st_netfd_fileno(client_st_fd); sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); // 获取与套接字连接的对端的地址信息 int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len); if (ret == -1) { printf("[WARN] Failed to get client ip: %s\n", strerror(ret)); } char ip_buf[INET_ADDRSTRLEN]; // 内存区域清零 memset(ip_buf, 0, sizeof(ip_buf)); inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf, sizeof(ip_buf)); while (1) { char buf[1024] = {0}; // 从给定的套接字中读取指定字节数的数据,并将其存储在提供的缓冲区 buf 中 ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_read error\n"); break; } else if (ret == 0) { printf("client quit, ip = %s\n", ip_buf); break; } printf("recv from %s, data = %s", ip_buf, buf); ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_write error\n"); } } } void *listen_thread(void *arg) // 监听 { while (1) { st_netfd_t client_st_fd = st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT); if (client_st_fd == NULL) { continue; } printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd)); st_thread_t client_tid = st_thread_create(client_thread, (void *)client_st_fd, 0, 0); if (client_tid == NULL) { printf("Failed to st create client thread\n"); } } } int main() { // 用于设置 ST 库的事件系统。 int ret = st_set_eventsys(ST_EVENTSYS_ALT); if (ret == -1) { printf("st_set_eventsys use linux epoll failed\n"); } // st初始化 ret = st_init(); if (ret != 0) { printf("st_init failed. ret = %d\n", ret); return -1; } // 创建套接字 int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { ERR_EXIT("socket"); } int reuse_socket = 1; // 设置套接字选项 ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)); if (ret == -1) { ERR_EXIT("setsockopt"); } struct sockaddr_in server_addr; // 用于表示 IPv4 地址的结构体 server_addr.sin_family = AF_INET; // 地址族,一般为 AF_INET server_addr.sin_port = htons(LISTEN_PORT); // 端口 server_addr.sin_addr.s_addr = INADDR_ANY; // ipv4地址结构 // 将套接字与特定的 IP 地址和端口号进行绑定 ret = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)); if (ret == -1) { ERR_EXIT("bind"); } ret = listen(listen_fd, 128); if (ret == -1) { ERR_EXIT("listen"); } // st_netfd_open_socket() 是 State Threads (ST) 库中的一个函数,用于创建一个 st_netfd_t 类型的文件描述符对象,以便进行异步 I/O 操作。 st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd); if (!st_listen_fd) { printf("st_netfd_open_socket open socket failed.\n"); return -1; } // 创建线程监听来一个建立连接的请求 st_thread_t listen_tid = st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0); if (listen_tid == NULL) { printf("Failed to st create listen thread\n"); } while (1) { st_sleep(1); } return 0; }
示例二
StateThreads创建多线程
#include <stdio.h>#include <st.h>#include <string>void *do_calc(void *arg){int sleep_ms = (int)(long int)(char *)arg * 10;for (;;){printf("in sthread #%dms\n", sleep_ms);st_usleep(sleep_ms * 1000);}return NULL;}int main(int argc, char **argv){if (argc <= 1){printf("Test the concurrence of state-threads!\n""Usage: %s <sthread_count>\n""eg. %s 10000\n",argv[0], argv[0]);return -1;}if (st_init() < 0){printf("error!");return -1;}int i;int count = std::stoi(argv[1]);for (i = 1; i <= count; i++){if (st_thread_create(do_calc, (void *)i, 0, 0) == NULL){printf("error!");return -1;}}st_thread_exit(NULL);return 0;}#include <stdio.h> #include <st.h> #include <string> void *do_calc(void *arg) { int sleep_ms = (int)(long int)(char *)arg * 10; for (;;) { printf("in sthread #%dms\n", sleep_ms); st_usleep(sleep_ms * 1000); } return NULL; } int main(int argc, char **argv) { if (argc <= 1) { printf("Test the concurrence of state-threads!\n" "Usage: %s <sthread_count>\n" "eg. %s 10000\n", argv[0], argv[0]); return -1; } if (st_init() < 0) { printf("error!"); return -1; } int i; int count = std::stoi(argv[1]); for (i = 1; i <= count; i++) { if (st_thread_create(do_calc, (void *)i, 0, 0) == NULL) { printf("error!"); return -1; } } st_thread_exit(NULL); return 0; }#include <stdio.h> #include <st.h> #include <string> void *do_calc(void *arg) { int sleep_ms = (int)(long int)(char *)arg * 10; for (;;) { printf("in sthread #%dms\n", sleep_ms); st_usleep(sleep_ms * 1000); } return NULL; } int main(int argc, char **argv) { if (argc <= 1) { printf("Test the concurrence of state-threads!\n" "Usage: %s <sthread_count>\n" "eg. %s 10000\n", argv[0], argv[0]); return -1; } if (st_init() < 0) { printf("error!"); return -1; } int i; int count = std::stoi(argv[1]); for (i = 1; i <= count; i++) { if (st_thread_create(do_calc, (void *)i, 0, 0) == NULL) { printf("error!"); return -1; } } st_thread_exit(NULL); return 0; }
关于StateThreads的运行原理,可以看文章《SRS开源直播服务 – StateThreads微线程框架学习》
SRS中的StateThreads
使用的源码为SRS4.0
系统架构图:
在SRS的源码中,StateThreads在srs_st_init()
函数中完成初始化。具体的调用流程如下。
SRS的main函数在文件srs_main_server.cpp中。
srs_main_server.cpp
......int main(int argc, char** argv){srs_error_t err = do_main(argc, argv);......}srs_error_t do_main(int argc, char** argv){srs_error_t err = srs_success;// Initialize global or thread-local variables.if ((err = srs_thread_initialize()) != srs_success) {return srs_error_wrap(err, "thread init");}......}...... int main(int argc, char** argv) { srs_error_t err = do_main(argc, argv); ...... } srs_error_t do_main(int argc, char** argv) { srs_error_t err = srs_success; // Initialize global or thread-local variables. if ((err = srs_thread_initialize()) != srs_success) { return srs_error_wrap(err, "thread init"); } ...... }...... int main(int argc, char** argv) { srs_error_t err = do_main(argc, argv); ...... } srs_error_t do_main(int argc, char** argv) { srs_error_t err = srs_success; // Initialize global or thread-local variables. if ((err = srs_thread_initialize()) != srs_success) { return srs_error_wrap(err, "thread init"); } ...... }
srs_app_threads.cpp
......srs_error_t srs_thread_initialize(){srs_error_t err = srs_success;......// Initialize ST, which depends on pps cids.if ((err = srs_st_init()) != srs_success) {return srs_error_wrap(err, "initialize st failed");}......}............ srs_error_t srs_thread_initialize() { srs_error_t err = srs_success; ...... // Initialize ST, which depends on pps cids. if ((err = srs_st_init()) != srs_success) { return srs_error_wrap(err, "initialize st failed"); } ...... } ............ srs_error_t srs_thread_initialize() { srs_error_t err = srs_success; ...... // Initialize ST, which depends on pps cids. if ((err = srs_st_init()) != srs_success) { return srs_error_wrap(err, "initialize st failed"); } ...... } ......
srs_service_st.cpp
......srs_error_t srs_st_init(){......int r0 = 0;if((r0 = st_init()) != 0){return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);}............ srs_error_t srs_st_init() { ...... int r0 = 0; if((r0 = st_init()) != 0){ return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0); } ............ srs_error_t srs_st_init() { ...... int r0 = 0; if((r0 = st_init()) != 0){ return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0); } ......
在srs_service_st.cpp中调用StateThreads库的初始化函数,完成StateThreads的初始化。