/favicon.ico

子恒的博客

[C++]C++ 11/14 笔记(四)

本部分介绍了c++11的并发编程。这些笔记是未完成的。 语法参考这里: 现代C++教程 实现参考这里: C++11中的mutex, lock,condition variable实现分析 std::thread C++ 11为我们带来了语言级的线程支持,包括线程的创建和等待: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 //g++ -o t main.cpp -lpthread --std=c++11 #include <iostream> #include <thread> #include <chrono> void foo(int sec) { // sleep_for() 休眠sec秒后唤醒 std::this_thread::sleep_for(std::chrono::seconds(sec)); } int main() { // join() 阻塞等待线程 // joinable() 是否可以join(没设置任务函数不可以join) std::thread t; std::cout << "before starting, joinable: " << t.

[后台]负载均衡 (三)限流篇

限流 限流能力是高并发系统中,对于服务提供方的一种保护手段。通过限流功能,我们可以通过控制QPS的方式,以避免被瞬时的流量高峰冲垮,从而保障系统的高可用性。 考虑的问题 完成一个限流系统, 我们可以结合场景的需要做下面的考虑 多规则匹配:是否会存在有多重规则的限流?比如有的规则限制每天1000次,有的规则限制每分钟1次?是同时生效还是优先生效某个? 资源类型:能限流什么?QPS,连接数,并发数 全局限流/单机限流:多个服务的实例共享一个全局的流量限额,比如所有机器共享1000QPS。或者单个实例的限流,比如被调限定每台机器不超过1000QPS 限流阈值:单位时间内的最大配额数。是按照每秒种一次,还是按照每分钟60次? 限流处理:客户端如何处理超出限额的请求?超额后直接拒绝,还是超额后进行排队? 抽象出一个方案 接口级别限流:每个接口分配一个appid和key,各自计算各自的配额 多维度限流:支持每秒N次、每分钟N次、每天N次等维度 匀速防刷:假设配置了每分钟60次,依然可能出现第一秒访问了60次用光了配额。匀速防刷可以匀速消耗配额,解决这个问题 多级限流:支持不同的限流规则,并有采用的优先级,采用优先级最高的方案进行限流 限流算法 固定窗口 固定窗口是在一段时间内可以限制访问次数的方法。 将时间划分为多个窗口 在每个窗口内每有一次请求就将计数器加一 如果计数器超过了限制数量,则本窗口内所有的请求都被丢弃。当时间到达下一个窗口时,计数器重置 这样有一定的限流效果,但是限制住的流量可能是有毛刺的。比如1000次/分钟,可能00:59的时候有1000流量,01:00的时候也有1000流量,这样这两秒内就有2000流量! 具体实现:用一个变量C标记访问次数,一个事件定时过期,并在过期时把变量C清零: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type FixedWindowCounter struct { TimeSlice time.Duration NowCount int32 AllowCount int32 } func (p *FixedWindowCounter) Take() bool { once.Do(func() { go func() { for { select { case <-time.

[后台]负载均衡(二)能力篇

名字服务 基础设计 名字服务考虑的基本设计 客户端发现: 服务提供者的实例在启动时或者位置信息发生变化时会向服务注册表注册自身,在停止时会向服务注册表注销自身,如果服务提供者的实例发生故障,在一段时间内不发送心跳之后,也会被服务注册表注销。 服务消费者的实例会向服务注册表查询服务提供者的位置信息,然后通过这些位置信息直接向服务提供者发起请求。 服务端发现: 第一步与客户端发现相同。 服务消费者不直接向服务注册表查询,也不直接向服务提供者发起请求,而是将对服务提供者的请求发往一个中央路由器或者负载均衡器,中央路由器或者负载均衡器查询服务注册表获取服务提供者的位置信息,并将请求转发给服务提供者。 这两种模式各有利弊,客户端发现模式的优势是,服务消费者向服务提供者发起请求时比服务端发现模式少了一次网络跳转,劣势是服务消费者需要内置特定的服务发现客户端和服务发现逻辑; 服务端发现模式的优势是服务消费者无需内置特定的服务发现客户端和服务发现逻辑,劣势是多了一次网络跳转,并且需要基础设施环境提供中央路由机制或者负载均衡机制。目前客户端发现模式应用的多一些,因为这种模式的对基础设施环境没有特殊的要求,和基础设施环境也没有过多的耦合性。 主调调用被调时,根据被调的名字从服务注册中心获取服务实例列表,包括节点ip、端口、权重、地理位置等;一般采取分钟级别的定时任务去拉取,本地做缓存,异步更新。 实现方式 DNS,传播速度太慢,没法发现端口。SkyDNS解决了这个问题,在k8s里大量使用 zookeeper或者etcd,如SmartStack,能保证强一致,但是要做很多开发 Eureka。Netflix的java生态里的优秀方案 Consul,提供服务配置、服务的内存和磁盘监测等 服务注册信息 IP和端口 一个服务端要接入名字服务,必须要先提供自己的IP和端口信息。 IP的获取方法: 通过遍历网卡的方式去获取,找到第一个不为本地环回地址的 IP 地址。dubbo就是这种方法 指定网卡名interfaceName,来获取IP 直接与服务注册中心建立 socket 连接,然后通过socket.getLocalAddress() 这种方式来获取本机的 IP 端口的获取方法: 一般的RPC服务或者Web服务监听的端口都在配置中写好,可以直接获取上报。 扩展设计 除了IP和端口,可能还有一些常用的服务信息需要注册上来,提供更高级的功能: 1.支持TLS:想知道某个 HTTP 服务是否开启了 TLS。 2.权重:对相同服务下的不同节点设置不同的权重,进行流量调度。 3.环境分配:将服务分成预发环境和生产环境,方便进行AB Test功能。 4.机房:不同机房的服务注册时加上机房的标签,以实现同机房优先的路由规则。 无损注册/下线 虽然服务注册一般发生在服务的启动阶段,但是细分的话,服务注册应该在服务已经完全启动成功,并准备对外提供服务之后才能进行注册。 1.有些 RPC 框架自身提供了方法来判断服务是否已经启动完成,如 Thrift ,我们可以通过 Server.isServing() 来判断。 2.有一些 RPC 框架本身没有提供服务是否启动完成的方式,这时我们可以通过检测端口是否已经处于监听状态来判断。 3.而对于 HTTP 服务,服务是否启动完毕也可以通过端口是否处于监听状态来判断。 下线也是一样的,可以注册服务下线的回调,或者监听服务下线的信号,或者做健康检查 健康检查 客户端主动心跳上报健康: 客户端每隔一定时间主动发送“心跳”的方式来向服务端表明自己的服务状态正常,心跳可以是 TCP 的形式,也可以是 HTTP 的形式。 也可以通过维持客户端和服务端的一个 socket 长连接自己实现一个客户端心跳的方式。 客户端的健康检查只能表明网络可达,不能代表服务可用。服务端的健康检查可以准确获得服务的健康状态: 服务端调用服务发布者某个 HTTP 接口来完成健康检查。 对于没有提供 HTTP 服务的 RPC 应用,服务端调用服务发布者的接口来完成健康检查。 可以通过执行某个脚本的形式来进行综合检查,覆盖多个场景。

[后台]负载均衡 (一)算法篇

当单机的访问压力很大时,就需要引入集群。集群一个很重要的事情就是把请求均匀地分配在各个机器上,这就是负载均衡的雏形。 有基于MAC地址的二层负载均衡和基于IP地址的三层负载均衡。 二层负载均衡会通过一个虚拟MAC地址接收请求,然后再分配到真实的MAC地址;三层负载均衡会通过一个虚拟IP地址接收请求,然后再分配到真实的IP地址; 四层通过虚拟IP+端口接收请求,然后再分配到真实的服务器(比如LVS,F5);七层通过虚拟的URL或主机名接收请求,然后再分配到真实的服务器(Haproxy和Nginx)。 四层和七层是最常见的负载均衡模型。 **四层:**以常见的TCP为例,负载均衡设备在接收到第一个来自客户端的SYN请求时,通过负载均衡算法选择服务器,并对报文中目标IP地址进行修改(改为后端服务器IP),直接转发给该服务器。TCP的连接建立,即三次握手是客户端和服务器直接建立的,负载均衡设备只是起到一个类似路由器的转发动作。在某些部署情况下,为保证服务器回包可以正确返回给负载均衡设备,在转发报文的同时可能还会对报文原来的源地址进行修改。 **七层:**以常见的TCP为例,负载均衡设备如果要根据真正的应用层内容再选择服务器,只能先代理最终的服务器和客户端建立连接(三次握手)后,才可能接受到客户端发送的真正应用层内容的报文,然后再根据该报文中的特定字段,再加上设置的负载均衡算法,选择内部某台服务器。负载均衡设备在这种情况下,更类似于一个代理服务器。负载均衡和前端的客户端以及后端的服务器会分别建立TCP连接。所以从这个技术原理上来看,七层负载均衡明显的对负载均衡设备的要求更高,处理七层的能力也必然会低于四层模式的部署方式。 参考资料:四层和七层负载均衡的区别 nginx用的负载均衡算法 Nginx可以作为HTTP反向代理,把访问本机的HTTP请求,均分到后端集群的若干台服务器上。负载均衡的核心就是负载均衡所使用的平衡算法,适用于各种场景。 Nginx的负载均衡算法 Nginx目前提供的负载均衡模块: ngx_http_upstream_round_robin,加权轮询,可均分请求,是默认的HTTP负载均衡算法,集成在框架中。 ngx_http_upstream_ip_hash_module,IP哈希,可保持会话。 ngx_http_upstream_least_conn_module,最少连接数,可均分连接。适用于链接数体现资源的服务,比如FTP。 ngx_http_upstream_hash_module,一致性哈希,可减少缓存数据的失效。 随机访问 在介绍nginx的模式前,先介绍下普通的负载均衡方法。假设有7个请求,我们给A、B、C三个节点分别4、2、1的权重。最朴素的负载均衡方式有下面几种: 完全轮询:访问完A去访问B,访问完B去访问C,再去访问A。缺点是没有权重,不能根据负载调节。 列表轮询:构造一个数组[A, A, A, A, B, B, C],每次pop出去一个访问。缺点是pop出去的元素太随机,可能一次集中访问A ,而且占用内存太大,对于几万的权重范围不合适。 随机数:我们按照A、B、C的权重划分好区间,A(0、1、2、3),B(4、5),C(6),然后取一个随机数,模余7,看看最后的结果在哪个区间内,就取哪个节点。缺点是完全随机,无法避免集中访问。 加权轮询 假设有7个请求,我们给A、B、C三个节点分别4、2、1的权重。如果随机按照概率来选,那么很可能出现连续四个请求都在A上面的情况,这样只能保证结果看起来均衡,但是时间段内不均衡。Nginx采用了一种平滑的加权平均算法来选取节点(Weighted Round Robin)。 先引入三个概念,都用来描述服务器节点的权重: $W$ : weight 我们指定的权重,就是上面例子中的4、2、1。 $W_{ew}$: effective_weight 有效权重,初始值为$W$。用来对故障节点降权。 如果通信中有错误产生,就减小effective_weight。(故障降权) 此后有新的请求过来时,再逐步增加effective_weight,最终又恢复到weight。(自动恢复) $W_{cw}$ : current_weight 当前真实权重,每次都会选到最大的真实权重的节点去请求 真实权重$W_{cw}$计算方式: 初始化:$W_{cw}$ 起始值为0 获得实时权重:请求到来后,给每个节点的真实权重加上有效权重,即$每个节点 W_{cw} = W_{cw} + W_{ew}$ 选出最大权重:选择真实权重最大的节点最为本次请求的目标 回避刚选的节点:最选择的节点的实时权重减去所有节点(包括自己)的有效权重和。即$选中节点 W_{cw} = W_{cw} - (W_{ew1} + W_{ew2} + … + W_{ewn})$ 来看一个具体的例子: 假设A、B、C三个节点的权重分别为4、2、1。

[Linux]文件和零拷贝

文件 文件描述符 文件描述符:在Linux中,所有的文件都是通过文件描述符引用。fd是一个非负整数。按照惯例,标准输入的fd是0,标准输出的fd是1,标准错误的fd是2。分别作为STDIN_FILENO、STDOUT_FILENO、STDERR_FILENO定义在unistd中。 文件描述符的上限:fd的范围是 0 ~ OEPN_MAX-1 。OPEN_MAX一般是20或者64。这代表一个进程最多打开19或63个文件。 文件内核API 文件的打开:int open(const char *pathname, int flags)参数填上要打开的文件的名字(甚至可以不存在),会返回打开的fd。下面是一些常用的选项: 1 2 3 4 5 6 O_APPEND 文件将以追加模式打开,每次写操作之前,文件偏移量都会置于文件末尾。 O_CREAT 创建文件。如果文件已经存在,则会直接打开。 O_EXCL 和上面的O_CREAT联用时,表示如果文件已经存在,就会失败。可以保证多进程同时创建文件的原子操作。 O_SYNC 打开文件用于同步I/O。在数据写到磁盘之前写操作不会完成;一般的读操作已是同步的,所以这个标志对读操作没有影响。 O_NONBLOCK 如果可以,文件将在非阻塞模式下打开。任何其它操作都不会使该进程在I/O中阻塞。这种情况可能只用于FIFO。 O_DIRECT 打开文件用于直接I/O。将会绕过缓冲区操作。 文件的关闭:int close(int fd) 关闭一个文件会释放上面所有的记录锁。一个进程终止后,内核会自动关闭它打开的所有文件。 文件定位: off_t lseek(int fd, off_t offset, int whence) 参数whence指定了偏移地址(开始点SEEK_SET 当前点SEEK_CUR 结束点SEET_END),另一个参数offset是从参考点开始的偏移量(可正可负)。返回新的偏移地址。 空洞文件:如果写入一部分之后lseek到后面去写入,中间就会产生一个空洞,实际不占用磁盘大小。 文件读取:ssize_t read(int fd, void *buf, size_t nbytes) 返回读取到的字节数。下面的情况可能使得读取到的字节数少于需要的字节数: 1)再读这么多就到了文件尾 2)读网络缓冲区读完 3)读FIFO管道包含的字节少于需要的长度 4)读终端设备,一次一行 文件写入:ssize_t write(int fd, const void* buf, size_t nbytes) 返回写入的字节数。一般和nbytes相同。 文件属性编辑:int fcntl(int fd, int cmd, .

[Linux]Linux socket API

创建和关闭 int socket(int domain, int type, int protocol) 创建一个socket。 domain 指定了通信的特性。AF_UNIX Unix域,AF_INET IPv4域。 AF_INET6 IPv6域。 type 指定了连接的类型。 SOCK_STREAM 有序、可靠、双向、面向连接的字节流 TCP SOCK_DGRAM 不可靠、无连接、固定长度的报文 UDP SOCK_SEQPACKET 固定长度、有序、可靠、双向、面向连接的字节流 protocol 为0时,选择指定域的默认协议。AF_INET域的SOCK_STREAM默认的协议为TCP,SOCK_DGRAM则为UDP。还有IPPROTO_ICMP、IPPROTO_IP、IPPROTO_RAW等。SOCK_STREAM提供字节流服务,所以程序分不出报文的界限。 int shutdown(int socketfd, int how) 关闭一个socket。 套接字函数是双向的。可以用这个函数关闭套接字的某个方向的IO。 how是SHUT_RD表示关闭读端,无法从套接字读取数据。SHUT_WR是关闭写端,无法向套接字写入数据。SHUT_RDWR无法读也无法写。 close()是一个fd的通用释放函数。他和shutdown有何不同?1) close要等所有的活动引用关闭后才释放套接字。这使得dup之后的套接字必须等待所有的引用释放。但是shutdown和引用fd数量无关。2)shutdown可以方便地关闭读写的任何一端。 网络地址 地址的结构体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 // Linux 套接字 // 这是Linux定义的一个通用结构。所有地址都可以强转为这个结构体,比如IPV4、IPV6等,便于使用。 struct sockaddr { u_short sa_family; /* address family */ char sa_data[14]; /* up to 14 bytes of direct address */ }; // IPv4 地址 // 1.

[后台]RocketMQ的架构和设计

主要整理文献: RocketMQ部署架构和技术架构 - Github RocketMQ关键机制的设计原理 - Github RocketMQ 原理简介 - 淘宝消息中间件项目组 设计理念和部署 消息队列需要解决的问题 发布/订阅 最基础的需求,可以做解耦&聚合,如果用Redis做,不够可靠 支持优先级队列、延时队列 顺序消费,rockmq严格有序 支持消息过滤,Producer和consumer共同过滤 持久化 内存缓存+文件 异常恢复 broker crash,os crash,掉电 —保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步) 磁盘损坏,机器永久损坏 —通过异步复制,可保证99%的消息不丢 实时性 RocketMQ使用长轮询Pull方式,可保证消息非常实时,消息实时性不低于Push。 At least Once 和 Exactly Only Once, 至少消费一次且只消费一次 broker的buffer容量问题。RocketMQ 的内存Buffer持久化在硬盘,抽象成一个无限长度的队列,不管有多少数据进来都能装得下,当然也会定时清理。 回溯消费 一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。 RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。 消息堆积 消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,对性能影响不大,但是不能堆积太多 消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。 消息重试 消息重试有两种原因,一种是消息本身处理失败,如编码有问题等,重试永远不会成功。另一部分是处理消息依赖的下游服务暂时不可用,一段时间重试后可以成功。所以可以消极重试,逐步重试增大等待重试间隔。 RockMQ 模块 Name Server :NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。 (1) 路由管理 Broker管理:NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活; 路由信息管理:每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,找到对应topic的路由信息,从而进行消息的投递和消费。 (2) 无状态:NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。它是一个几乎无状态的结点,他们之间互不通信。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。 (3) 随机选择:客户端连接时,会随机选择。 (4) 长连接:Broker向所有的NameServer结点建立长连接,注册Topic信息。Producer和Consumer也是长连接。

[Linux]进程间通信

管道 管道是进程间通信的最古老方式。它通过共享文件来完成进程间的通信。它有两个局限性: 它是半双工的。数据只能从一个进程流向另一个进程。 通信的进程之间必须另一个进程是fork出来的。通常,一个进程会创建一个管道,然后执行fork,这样管道就会在两个进程之间共享。 FIFO解决了第一种局限性,Unix域套接字解决了第二种。我们先来看管道。 int pipe(int pipefd[2]);管道由pipe函数创建。 参数pipefd是一个两个元素的数组。pipefd[0]用来读,pipefd[1]用来写。 成功返回0 。失败返回-1并设置errno。 单进程的管道没有任何用处。在这个函数之后一般会fork,然后一个进程来写pipefd[1],一个进程来读pipefd[0]。他们的另一个fd元素将会被关闭。 下面是一段实例, 父进程通过管道向子进程传递了信息,子进程接收并把他们输出: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include "apue.h" int main(void) { int n; int fd[2]; pid_t pid; char line[MAXLINE]; if (pipe(fd) < 0) err_sys("pipe error"); if ((pid = fork()) < 0) { err_sys("fork error"); } else if (pid > 0) { /* parent */ close(fd[0]); write(fd[1], "hello world\n", 12); } else { /* child */ close(fd[1]); n = read(fd[0], line, MAXLINE); write(STDOUT_FILENO, line, n); } exit(0); } FILE *popen(const char *command, const char *type) 函数popen执行一个命令,然后返回这个命令的文件指针。

[Linux]高级IO

如果有多个IO需要处理 当一个描述符读,然后又写到另一个描述符时,可以用循环的方式访问阻塞io: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #include <stdio.h> #include <unistd.h> #define BUF_SIZE 128 int main(){ char buf[BUF_SIZE]; int n; while ( n = read(STDIN_FILENO, buf, BUF_SIZE) > 0) { if (write(STDOUT_FILENO, buf, n) != n){ fprintf(stderr, "%s\n", "write error!"); break; } } return 0; } 但是如果要从两个fd读的时候,就不能用阻塞io去读了。因为进程阻塞在某个fd的时候,另一个fd即使输入了数据也无法处理。 比如一个telnet程序的输出有两个来源,用户输入回显和远端回包。如果阻塞在等待远端回包,用户输入就不会有回显了。解决这个问题就几种思路: 再fork一个进程,每个进程处理一个fd。这看起来很好,但是处理EOF却成了问题。如果子进程先读到EOF,那么子进程终止,返回SIGCHLD给父进程,然后父进程终止。如果父进程读到EOF,父进程就需要通知子进程结束,此时需要额外的信号(如SIGUSER1). 用两个线程来处理,每个线程一个fd。同样的,处理线程之间的同步也会变的比较复杂。 用非阻塞的io来轮询。先read一个fd,如果没数据立即返回,然后等待若干时间,然后再read下一个fd,直到某个fd有数据读为止。这个方法有两个缺点,一是频繁的read调用浪费了cpu时间,但是大部分时间是没数据读的。二是每次read返回后等待的时间不好确定,太久会读取不及时,太短会使得cpu更加繁忙。 使用异步IO。当fd归属的设备准备好的时候,用信号通知处理进程。这个方法有两个缺点,一是信号在不同的系统上实现不同,移植性较差。二是进程收到的信号只有一种(SIGPOLL或者SIGIO),进程无法分辨是哪个fd。 有没有比较完善的方案?io多路复用来了。这种方案会记录一个我们需要的fd的列表,然后我们去查询,当这个列表中有fd有数据时,查询就会返回这个fd。 IO多路复用:select int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); select 函数提供了查询fd状态的能力。我们传入希望监听的fd,内核告诉我们哪些fd已经有事件发生并返回。

[Linux]谈一谈并行Counting

简单的并行计数 在一个简单的多线程计数程序中,我们假设要每个线程去把sum的值多加100m,同时进行。代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <pthread.h> #include <stdio.h> #if 0 #define ADD_P(x) __sync_fetch_and_add((x), 1) #else #define ADD_P(x) (++(*x)) #endif #define TC 8 void *thgo(void *arg){ long i = 1000*1000*100; while(i-- > 0){ADD_P((long *)arg);}; pthread_t me = pthread_self(); printf("thread sum: %ld tid: %lu \n", *(long *)arg, (unsigned long)me); } int main (){ long sum = 0; pthread_t ths[TC]; // threads for (int i = 0; i < TC; ++i){ pthread_create(&ths[i], NULL, thgo, &sum); } // main thread thgo(&sum); // join for (int i = 0; i < TC; ++i){ pthread_join(ths[i], NULL); } printf("all final sum : %ld\n", sum); return 0; } 如果使用一般的计数,会出现严重的数据踩踏问题,导致结果只能取得一定近似的值: