Home Tags Posts tagged with "Redis"

Redis

0 49

今天学习的时候看到了Redis的pipeline,因为对这方面不甚了解,所以特地找了相关资料,并进行个人总结

 

什么是Pipeline?

可以将其解释为管道,流水线。如果站在软件架构的角度来说,Pipeline是一种通信架构。

 

为什么要有Pipeline?

这里以Redis的Pipeline来说明。

(一)简介
  Redis 使用的是客户端-服务器(CS)模型和请求/响应协议的 TCP 服务器。这意味着通常情况下一个请求会遵循以下步骤:

  • 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。
      Redis 客户端与 Redis 服务器之间使用 TCP 协议进行连接,一个客户端可以通过一个 socket 连接发起多个请求命令。每个请求命令发出后 client 通常会阻塞并等待 redis 服务器处理,redis 处理完请求命令后会将结果通过响应报文返回给 client,因此当执行多条命令的时候都需要等待上一条命令执行完毕才能执行。比如:

这里写图片描述

其执行过程如下图所示:这里写图片描述

由于通信会有网络延迟假如 client 和 server 之间的包传输时间需要0.125秒。那么上面的三个命令6个报文至少需要0.75秒才能完成。这样即使 redis 每秒能处理100个命令,而我们的 client 也只能一秒钟发出四个命令。这显然没有充分利用 redis 的处理能力。

管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。 Pipeline 的默认的同步的个数为53个,也就是说 arges 中累加到53条数据时会把数据提交。其过程如下图所示:client 可以将三个命令放到一个 tcp 报文一起发送,server 则可以将三条命令的处理结果放到一个 tcp 报文返回。
这里写图片描述

需要注意到是用 pipeline 方式打包命令发送,redis 必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。具体多少合适需要根据具体情况测试。

 

(二)比较普通模式与 PipeLine 模式

测试代码可参考原文链接:

https://blog.csdn.net/u011489043/article/details/78769428

 

(三)适用场景(可靠性,实时性)

有些系统可能对可靠性要求很高每次操作都需要立马知道这次操作是否成功,是否数据已经写进 redis 了,那这种场景就不适合。

还有的系统,可能是批量的将数据写入 redis,允许一定比例的写入失败,那么这种场景就可以使用了,比如10000条一下进入 redis,可能失败了2条无所谓,后期有补偿机制就行了,比如短信群发这种场景,如果一下群发10000条,按照第一种模式去实现,那这个请求过来,要很久才能给客户端响应,这个延迟就太长了,如果客户端请求设置了超时时间5秒,那肯定就抛出异常了,而且本身群发短信要求实时性也没那么高,这时候用 pipeline 最好了。

 

(四)管道(Pipelining) VS 脚本(Scripting)

管道和事务是不同的,pipeline只是表达“交互”中操作的传递的方向性,pipeline也可以在事务中运行,也可以不在。无论如何,pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的相应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行。简单来说就是管道中的命令是没有关系的,它们只是像管道一样流水发给server,而不是串行执行,仅此而已;但是如果pipeline的操作被封装在事务中,那么将有事务来确保操作的成功与失败。

使用管道可能在效率上比使用script要好,但是有的情况下只能使用script。因为在执行后面的命令时,无法得到前面命令的结果,就像事务一样,所以如果需要在后面命令中使用前面命令的value等结果,则只能使用script或者事务+watch。

 

参考链接:

1.https://blog.csdn.net/w1lgy/article/details/84455579?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link

2.https://blog.csdn.net/fangjian1204/article/details/50585080

3.https://blog.csdn.net/u011489043/article/details/78769428

0 38

http://yangbili.co/%e4%bb%8eredis%e9%9b%86%e7%be%a4%e7%9a%84%e6%90%ad%e5%bb%ba%e6%9d%a5%e5%89%96%e6%9e%90/ (剖析Redis集群之 主从复制)这篇博文中,我提到Redis支持三种集群方案

  • 主从复制模式
  • Sentinel(哨兵)模式
  • Cluster模式

哨兵模式应该被称作哨兵机制,它与主从复制模式并非对立的,相反,它是基于主从复制模式实现的。

我们先复习一下主从复制模式的优缺点:

优点:

  • 支持主从复制,主机会自动将数据同步到从机,可以进行读写分离
  • 为了分载Master的读操作压力,Slave服务器可以为客户端提供只读操作的服务,写服务仍然必须由Master来完成
  • Slave同样可以接受其它Slaves的连接和同步请求,这样可以有效的分载Master的同步压力。
  • Master Server是以非阻塞的方式为Slaves提供服务。所以在Master-Slave同步期间,客户端仍然可以提交查询或修改请求。
  • Slave Server同样是以非阻塞的方式完成数据同步。在同步期间,如果有客户端提交查询请求,Redis则返回同步之前的数据

缺点:

  • Redis不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。
  • 主机宕机,宕机前有部分数据未能及时同步到从机,切换IP后还会引入数据不一致的问题,降低了系统的可用性。
  • Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。

 

一、为什么需要哨兵机制

注意这一条:Redis不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。

那么在这个模式下,如果从库发生故障了,客户端可以继续向主库或其他从库发送请求,进行相关的操作,但是如果主库发生故障了,那就直接会影响到从库的同步,因为从库没有相应的主库可以进行数据复制操作了。

而且,如果客户端发送的都是读操作请求,那还可以由从库继续提供服务,这在纯读的业务场景下还能被接受。但是,一旦有写操作请求了,按照主从库模式下的读写分离要求,需要由主库来完成写操作。此时,也没有实例可以来服务客户端的写操作请求了,

也就是说,主从复制模式下,手动的将从库升级为主库,是需要时间的,在这段时间内有两种风险

  1. 写服务中断
  2. 从库无法进行数据同步

从故障发生到运维或者维护人员发现的这段时间里,可能已经造成了巨大的损失,因此,我们必须考虑让Redis实现主从库自动切换,而哨兵机制就是实现的关键。

 

二、主从库自动切换涉及到的三个问题

如果主库挂了,我们就需要运行一个新主库,比如说把一个从库切换为主库,把它当成主库。这就涉及到三个问题:

  1. 主库真的挂了吗?
  2. 该选择哪个从库作为主库?
  3. 怎么把新主库的相关信息通知给从库和客户端呢?

 

三、哨兵机制的工作流程

1.哨兵三大任务

哨兵其实就是一个运行在特殊模式下的 Redis 进程,主从库实例运行的同时,它也在运行。哨兵主要负责的就是三个任务:监控、选主(选择主库)和通知。

监控:(哨兵需要判断主库是否处于下线状态)

监控是指哨兵进程在运行时,周期性地给所有的主从库发送 PING 命令,检测它们是否仍然在线运行。如果从库没有在规定时间内响应哨兵的 PING 命令,哨兵就会把它标记为“下线状态”;同样,如果主库也没有在规定时间内响应哨兵的 PING 命令,哨兵就会判定主库下线,然后开始自动切换主库的流程。

选主:(哨兵需要决定选择哪个从库实例作为主库)

这个流程首先是执行哨兵的第二个任务,选主。主库挂了以后,哨兵就需要从很多个从库里,按照一定的规则选择一个从库实例,把它作为新的主库。这一步完成后,现在的集群里就有了新主库。然后,哨兵会执行最后一个任务:通知。在执行通知任务时,哨兵会把新主库的连接信息发给其他从库,让它们执行 replicaof 命令,和新主库建立连接,并进行数据复制。同时,哨兵会把新主库的连接信息通知给客户端,让它们把请求操作发到新主库上。

通知:

然后,哨兵会执行最后一个任务:通知。在执行通知任务时,哨兵会把新主库的连接信息发给其他从库,让它们执行 replicaof 命令,和新主库建立连接,并进行数据复制。同时,哨兵会把新主库的连接信息通知给客户端,让它们把请求操作发到新主库上。

哨兵机制的三项任务与目标

 

在这三个任务中,通知任务相对来说比较简单,哨兵只需要把新主库信息发给从库和客户端,让它们和新主库建立连接就行,并不涉及决策的逻辑。但是,在监控和选主这两个任务中,哨兵需要做出两个决策:

  • 在监控任务中,哨兵需要判断主库是否处于下线状态;
  • 在选主任务中,哨兵也要决定选择哪个从库实例作为主库。

接下来,我们就先说说如何判断主库的下线状态。你首先要知道的是,哨兵对主库的下线判断有“主观下线”和“客观下线”两种。那么,为什么会存在两种判断呢?它们的区别和联系是什么呢?

2.主观下线和客观下线

哨兵进程会使用 PING 命令检测它自己和主、从库的网络连接情况,用来判断实例的状态。如果哨兵发现主库或从库对 PING 命令的响应超时了,那么,哨兵就会先把它标记为“主观下线”。

如果检测的是从库,那么,哨兵简单地把它标记为“主观下线”就行了,因为从库的下线影响一般不太大,集群的对外服务不会间断。

但是,如果检测的是主库,那么,哨兵还不能简单地把它标记为“主观下线”,开启主从切换。因为很有可能存在这么一个情况:那就是哨兵误判了,其实主库并没有故障。可是,一旦启动了主从切换,后续的选主和通知操作都会带来额外的计算和通信开销。

为了避免这些不必要的开销,我们要特别注意误判的情况。

首先,我们要知道啥叫误判。很简单,就是主库实际并没有下线,但是哨兵误以为它下线了。误判一般会发生在集群网络压力较大、网络拥塞,或者是主库本身压力较大的情况下。

一旦哨兵判断主库下线了,就会开始选择新主库,并让从库和新主库进行数据同步,这个过程本身就会有开销,例如,哨兵要花时间选出新主库,从库也需要花时间和新主库同步。而在误判的情况下,主库本身根本就不需要进行切换的,所以这个过程的开销是没有价值的。正因为这样,我们需要判断是否有误判,以及减少误判。

那怎么减少误判呢?在日常生活中,当我们要对一些重要的事情做判断的时候,经常会和家人或朋友一起商量一下,然后再做决定。哨兵机制也是类似的,它通常会采用多实例组成的集群模式进行部署,这也被称为哨兵集群。引入多个哨兵实例一起来判断,就可以避免单个哨兵因为自身网络状况不好,而误判主库下线的情况。同时,多个哨兵的网络同时不稳定的概率较小,由它们一起做决策,误判率也能降低。

这节课,你只需要先理解哨兵集群在减少误判方面的作用,就行了。至于具体的运行机制,下节课我们再重点学习。在判断主库是否下线时,不能由一个哨兵说了算,只有大多数的哨兵实例,都判断主库已经“主观下线”了,主库才会被标记为“客观下线”,这个叫法也是表明主库下线成为一个客观事实了。这个判断原则就是:少数服从多数。同时,这会进一步触发哨兵开始主从切换流程。

为了方便你理解,我再画一张图展示一下这里的逻辑。如下图所示,Redis 主从集群有一个主库、三个从库,还有三个哨兵实例。在图片的左边,哨兵 2 判断主库为“主观下线”,但哨兵 1 和 3 却判定主库是上线状态,此时,主库仍然被判断为处于上线状态。

客观下线”的标准就是,当有 N 个哨兵实例时,最好要有 N/2 + 1 个实例判断主库为“主观下线”,才能最终判定主库为“客观下线”。这样一来,就可以减少误判的概率,也能避免误判带来的无谓的主从库切换。(当然,有多少个实例做出“主观下线”的判断才可以,可以由 Redis 管理员自行设定)。

好了,到这里,你可以看到,借助于多个哨兵实例的共同判断机制,我们就可以更准确地判断出主库是否处于下线状态。如果主库的确下线了,哨兵就要开始下一个决策过程了,即从许多从库中,选出一个从库来做新主库。

 

梳理思路:首先我们需要明确一点,在主从库实例运行的同时,它也在运行。因为 它身负三大任务:监控,选主和通知。

哨兵进程周期性的给所有的主从库发送PING命令,如果是从库没有在规定时间内回应PING命令,则将它标记为下线;如果主库没有在规定时间响应,有两种情况,第一种是主库真的已经挂掉了,第二种情况是主库没有下线,只是因为集群网络压力大,网络拥堵,因此没有回应;

一旦哨兵判断主库下线,就会启动主从切换,可是假如是第二种情况呢?主库没挂却启动了主从切换,那么这个过程的开销就是没有价值的,而这一切都是由于哨兵误判引起的。

因此,我们应当减小哨兵误判的概率,减少误判概率的方法就是引入多个哨兵实例一起来判断,由多个哨兵一起来决策,减小误判率,我们称这种部署方式为哨兵集群。

当大多数的哨兵实例都判断主库“主观下线”后,主库才会被标记为“客观下线”,此时才会启动主从切换。

如何选定新主库?(筛选 + 打分)

一般来说,我把哨兵选择新主库的过程称为“筛选 + 打分”。简单来说,我们在多个从库中,先按照一定的筛选条件,把不符合条件的从库去掉。然后,我们再按照一定的规则,给剩下的从库逐个打分,将得分最高的从库选为新主库,如下图所示:

在刚刚的这段话里,需要注意的是两个“一定”,现在,我们要考虑这里的“一定”具体是指什么。

首先来看筛选的条件。

一般情况下,我们肯定要先保证所选的从库仍然在线运行。不过,在选主时从库正常在线,这只能表示从库的现状良好,并不代表它就是最适合做主库的。

设想一下,如果在选主时,一个从库正常运行,我们把它选为新主库开始使用了。可是,很快它的网络出了故障,此时,我们就得重新选主了。这显然不是我们期望的结果。

所以,在选主时,除了要检查从库的当前在线状态,还要判断它之前的网络连接状态。如果从库总是和主库断连,而且断连次数超出了一定的阈值,我们就有理由相信,这个从库的网络状况并不是太好,就可以把这个从库筛掉了。

具体怎么判断呢?你使用配置项 down-after-milliseconds * 10。其中,down-after-milliseconds 是我们认定主从库断连的最大连接超时时间。如果在 down-after-milliseconds 毫秒内,主从节点都没有通过网络联系上,我们就可以认为主从节点断连了。如果发生断连的次数超过了 10 次,就说明这个从库的网络状况不好,不适合作为新主库。

好了,这样我们就过滤掉了不适合做主库的从库,完成了筛选工作。

接下来就要给剩余的从库打分了。我们可以分别按照三个规则依次进行三轮打分,这三个规则分别是从库优先级、从库复制进度以及从库 ID 号。只要在某一轮中,有从库得分最高,那么它就是主库了,选主过程到此结束。如果没有出现得分最高的从库,那么就继续进行下一轮。

第一轮:优先级最高的从库得分高。

用户可以通过 slave-priority 配置项,给不同的从库设置不同优先级。比如,你有两个从库,它们的内存大小不一样,你可以手动给内存大的实例设置一个高优先级。在选主时,哨兵会给优先级高的从库打高分,如果有一个从库优先级最高,那么它就是新主库了。如果从库的优先级都一样,那么哨兵开始第二轮打分。

第二轮:和旧主库同步程度最接近的从库得分高。

这个规则的依据是,如果选择和旧主库同步最接近的那个从库作为主库,那么,这个新主库上就有最新的数据。

如何判断从库和旧主库间的同步进度呢?上节课我向你介绍过,主从库同步时有个命令传播的过程。在这个过程中,主库会用 master_repl_offset 记录当前的最新写操作在 repl_backlog_buffer 中的位置,而从库会用 slave_repl_offset 这个值记录当前的复制进度。

此时,我们想要找的从库,它的 slave_repl_offset 需要最接近 master_repl_offset。如果在所有从库中,有从库的 slave_repl_offset 最接近 master_repl_offset,那么它的得分就最高,可以作为新主库。

就像下图所示,旧主库的 master_repl_offset 是 1000,从库 1、2 和 3 的 slave_repl_offset 分别是 950、990 和 900,那么,从库 2 就应该被选为新主库当然,如果有两个从库的 slave_repl_offset 值大小是一样的(例如,从库 1 和从库 2 的 slave_repl_offset 值都是 990),我们就需要给它们进行第三轮打分了。

第三轮:ID 号小的从库得分高。

每个实例都会有一个 ID,这个 ID 就类似于这里的从库的编号。目前,Redis 在选主库时,有一个默认的规定:在优先级和复制进度都相同的情况下,ID 号最小的从库得分最高,会被选为新主库。到这里,新主库就被选出来了,“选主”这个过程就完成了。

我们再回顾下这个流程。首先,哨兵会按照在线状态、网络状态,筛选过滤掉一部分不符合要求的从库,然后,依次按照优先级、复制进度、ID 号大小再对剩余的从库进行打分,只要有得分最高的从库出现,就把它选为新主库。

小结

这节课,我们一起学习了哨兵机制,它是实现 Redis 不间断服务的重要保证。具体来说,主从集群的数据同步,是数据可靠的基础保证;而在主库发生故障时,自动的主从切换是服务不间断的关键支撑。

Redis 的哨兵机制自动完成了以下三大功能,从而实现了主从库的自动切换,可以降低 Redis 集群的运维开销:

  1. 监控主库运行状态,并判断主库是否客观下线;
  2. 在主库客观下线后,选取新主库;
  3. 选出新主库后,通知从库和客户端。

为了降低误判率,在实际应用时,哨兵机制通常采用多实例的方式进行部署,多个哨兵实例通过“少数服从多数”的原则,来判断主库是否客观下线。一般来说,我们可以部署三个哨兵,如果有两个哨兵认定主库“主观下线”,就可以开始切换过程。当然,如果你希望进一步提升判断准确率,也可以再适当增加哨兵个数,比如说使用五个哨兵。

但是,使用多个哨兵实例来降低误判率,其实相当于组成了一个哨兵集群,我们会因此面临着一些新的挑战,例如:

  • 哨兵集群中有实例挂了,怎么办,会影响主库状态判断和选主吗?
  • 哨兵集群多数实例达成共识,判断出主库“客观下线”后,由哪个实例来执行主从切换呢?

要搞懂这些问题,就不得不提哨兵集群了,下节课,我们来具体聊聊哨兵集群的机制和问题。

每课一问

按照惯例,我给你提个小问题。这节课,我提到,通过哨兵机制,可以实现主从库的自动切换,这是实现服务不间断的关键支撑,同时,我也提到了主从库切换是需要一定时间的。所以,请你考虑下,在这个切换过程中,客户端能否正常地进行请求操作呢?如果想要应用程序不感知服务的中断,还需要哨兵或需要客户端再做些什么吗?欢迎你在留言区跟我交流讨论,也欢迎你能帮我把今天的内容分享给更多人,帮助他们一起解决问题。我们下节课见。

回答:

哨兵在操作主从切换的过程中,客户端能否正常地进行请求操作?

如果客户端使用了读写分离,那么读请求可以在从库上正常执行,不会受到影响。但是由于此时主库已经挂了,而且哨兵还没有选出新的主库,所以在这期间写请求会失败,失败持续的时间 = 哨兵切换主从的时间 + 客户端感知到新主库 的时间。

如果不想让业务感知到异常,客户端只能把写失败的请求先缓存起来或写入消息队列中间件中,等哨兵切换完主从后,再把这些写请求发给新的主库,但这种场景只适合对写入请求返回值不敏感的业务,而且还需要业务层做适配,另外主从切换时间过长,也会导致客户端或消息队列中间件缓存写请求过多,切换完成之后重放这些请求的时间变长。

哨兵检测主库多久没有响应就提升从库为新的主库,这个时间是可以配置的(down-after-milliseconds参数)。配置的时间越短,哨兵越敏感,哨兵集群认为主库在短时间内连不上就会发起主从切换,这种配置很可能因为网络拥塞但主库正常而发生不必要的切换,当然,当主库真正故障时,因为切换得及时,对业务的影响最小。如果配置的时间比较长,哨兵越保守,这种情况可以减少哨兵误判的概率,但是主库故障发生时,业务写失败的时间也会比较久,缓存写请求数据量越多。

应用程序不感知服务的中断,还需要哨兵和客户端做些什么?

当哨兵完成主从切换后,客户端需要及时感知到主库发生了变更,然后把缓存的写请求写入到新库中,保证后续写请求不会再受到影响,具体做法如下:

哨兵提升一个从库为新主库后,哨兵会把新主库的地址写入自己实例的pubsub(switch-master)中。客户端需要订阅这个pubsub,当这个pubsub有数据时,客户端就能感知到主库发生变更,同时可以拿到最新的主库地址,然后把写请求写到这个新主库即可,这种机制属于哨兵主动通知客户端。

如果客户端因为某些原因错过了哨兵的通知,或者哨兵通知后客户端处理失败了,安全起见,客户端也需要支持主动去获取最新主从的地址进行访问。

所以,客户端需要访问主从库时,不能直接写死主从库的地址了,而是需要从哨兵集群中获取最新的地址(sentinel get-master-addr-by-name命令),这样当实例异常时,哨兵切换后或者客户端断开重连,都可以从哨兵集群中拿到最新的实例地址。

一般Redis的SDK都提供了通过哨兵拿到实例地址,再访问实例的方式,我们直接使用即可,不需要自己实现这些逻辑。当然,对于只有主从实例的情况,客户端需要和哨兵配合使用,而在分片集群模式下,这些逻辑都可以做在proxy层,这样客户端也不需要关心这些逻辑了,Codis就是这么做的。

另外再简单回答下哨兵相关的问题:

1、哨兵集群中有实例挂了,怎么办,会影响主库状态判断和选主吗?(拜占庭将军问题)

这个属于分布式系统领域的问题了,指的是在分布式系统中,如果存在故障节点,整个集群是否还可以提供服务?而且提供的服务是正确的?

这是一个分布式系统容错问题,这方面最著名的就是分布式领域中的“拜占庭将军”问题了,“拜占庭将军问题”不仅解决了容错问题,还可以解决错误节点的问题,虽然比较复杂,但还是值得研究的,有兴趣的同学可以去了解下。

简单说结论:存在故障节点时,只要集群中大多数节点状态正常,集群依旧可以对外提供服务。具体推导过程细节很多,大家去查前面的资料了解就好。

2、哨兵集群多数实例达成共识,判断出主库“客观下线”后,由哪个实例来执行主从切换呢?(共识算法)

哨兵集群判断出主库“主观下线”后,会选出一个“哨兵领导者”,之后整个过程由它来完成主从切换。

但是如何选出“哨兵领导者”?这个问题也是一个分布式系统中的问题,就是我们经常听说的共识算法,指的是集群中多个节点如何就一个问题达成共识。共识算法有很多种,例如Paxos、Raft,这里哨兵集群采用的类似于Raft的共识算法。

简单来说就是每个哨兵设置一个随机超时时间,超时后每个哨兵会请求其他哨兵为自己投票,其他哨兵节点对收到的第一个请求进行投票确认,一轮投票下来后,首先达到多数选票的哨兵节点成为“哨兵领导者”,如果没有达到多数选票的哨兵节点,那么会重新选举,直到能够成功选出“哨兵领导者”。

 

以上内容整理来自极客时间 《Redis核心技术与实战》

0 50

在项目中用到了 Redis,为了水平扩展Redis 的处理能力(单机上的Redis能力受限于内存),保证Redis的高可用,所以需要搭建Redis集群

虽然之前已经系统的学习过Redis 的相关知识(极客时间的 Redis 课程),但对Redis的实际使用还有所欠缺,正好通过本次搭建集群整合Redis知识点,图中理论和图片来自极客时间。

 

 

关于Redis的介绍可以参考另一篇博文http://yangbili.co/wp-admin/post.php?post=1575&action=edit

 

使用Redis的什么情况下需要集群?

这个标题看上去有点好笑,Redis集群当然是在需要使用Redis集群的情况下使用;话虽如此说,但其实这句话包含很多东西。

第一,Redis不是只有一种模式的,也就是说Redis可以集群,也可以不集群

第二,你必须要使用Redis集群才去考虑它的集群(这句话用于劝告跟我一样的初学者,初学者有可能学习某项知识点,却不知道为什么要学习,或者说不知道学习了应该如何在实际中应用它)

一、实现Redis的水平扩展

好了,那么到底什么情况下需要使用Redis的集群呢?得从Redis的特点说起:redis是基于内存的,高性能,key-value,Nosql数据库;因为是纯内存操作,Redis的性能非常出色,每秒可以处理超过 10万次读写操作,是已知性能最快的Key-Value DB。基于内存却也有一个很大的缺点:数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此Redis适合的场景主要局限在较小数据量的高性能操作和运算上。

因此,我的理解是,当单个Redis实例(或服务)无法满足系统的读写要求,需要进行水平扩展的时候,采用Redis集群的策略。更形象一点就是,我们需要使用Redis存储1000万的数据,而单个的Redis实例因为内存限制无法存入这么多,所以我们使用Redis集群模式,启用多个RedisServer实例以满足需求。

二、强化Redis的读写能力,实现高可靠,保证某一个Redis服务实例挂掉不会影响系统运行

Redis的高可靠有两层含义,一是数据尽量少丢失,二是服务尽量少中断;AOF 和 RDB 保证了前者,而对于后者,Redis 的做法就是增加副本冗余量,将一份数据同时保存在多个实例上。即使有一个实例出现了故障,需要过一段时间才能恢复,其他实例也可以对外提供服务,不会影响业务使用。

结合实际来说,在我的项目中使用到了Redis进行数据的读写,刚开始我采用的是单个Redis服务实例,这种情况下,如果该Redis服务实例挂掉了,整个系统就会因此出现问题,也就是整个系统都不可用了。这样的情况在实际开发中是绝不允许的,在设计系统架构的时候就要充分考虑到尽力的保证系统的高可用性,也就是系统运行不中断。为了达到这个目的,我开始考虑使用Redis集群的策略。

Redis集群中有两种类型的节点:主节点(Master)、从节点(Slave)。

可以把集群理解为策略,而集群方案则是这个策略的具体实现。

 

Redis的集群方案(主从复制模式,哨兵模式,Cluster模式)

Redis支持三种集群方案

  • 主从复制模式
  • Sentinel(哨兵)模式
  • Cluster模式

一、主从复制模式

1.主从复制模式介绍

我们已知Redis通过增加副本冗余量将一份数据同时保存在多个实例上,这样即使一个实例出故障,其他实例也可以对外提供服务,不会影响业务使用。

多实例保存同一份数据,听起来很不错,但是我们必须要考虑一个问题就是:这么多副本,它们之间的数据如何保持一致呢?数据读写操作可以发给所有的实例呢?

实际上,Redis提供了主从库模式,以保证数据副本的一致,主从库之间采用的是读写分离的方式。

读操作:主库、从库都可以接收;

写操作:首先到主库执行,然后,主库将写操作同步给从库。

2.那么,为什么要采用读写分离的方式呢?

你可以设想一下,如果在上图中,不管是主库还是从库,都能接收客户端的写操作,那么,一个直接的问题就是:如果客户端对同一个数据(例如 k1)前后修改了三次,每一次的修改请求都发送到不同的实例上,在不同的实例上执行,那么,这个数据在这三个实例上的副本就不一致了(分别是 v1、v2 和 v3)。在读取这个数据的时候,就可能读取到旧的值。

如果我们非要保持这个数据在三个实例上一致,就要涉及到加锁、实例间协商是否完成修改等一系列操作,但这会带来巨额的开销,当然是不太能接受的。

而主从库模式一旦采用了读写分离,所有数据的修改只会在主库上进行,不用协调三个实例。主库有了最新的数据后,会同步给从库,这样,主从库的数据就是一致的。

 

3.主丛库间如何进行同步?(详细可看 https://time.geekbang.org/column/article/272852

具体工作机制为:

  • slave启动后,向master发送SYNC命令,master接收到SYNC命令后通过bgsave保存快照(RDB持久化),并使用缓冲区记录保存快照这段时间内执行的写命令
  • master将保存的快照文件发送给slave,并继续记录执行的写命令
  • slave接收到快照文件后,加载快照文件,载入数据(此三步为全量复制)
  • master快照发送完后开始向slave发送缓冲区的写命令,slave接收命令并执行,完成复制初始化
  • 此后master每次执行一个写命令都会同步发送给slave,保持master与slave之间数据的一致(后两步为增量复制)

总结来说,从库和主库建立连接,第一次复制采用全量复制,主库生成RDB文件(BGsave),同时记录在生成RDB期间执行的写命令,然后发送给从库,从库收到RDB文件后,首先清空数据库,在本地加载快照文件完成数据加载,因为RDB文件中不存在生成RDB期间执行的写命令,所以主库在发送完RDB文件后还需要发送这些写命令,从库接收这些写命令并重新执行这些操作,如此主从库实现同步。

三个阶段:建立连接,同步RDB文件,同步写命令

部署示例:https://www.zhihu.com/search?

type=content&q=Redis%E9%9B%86%E7%BE%A4

4.主从级联模式分担全量复制时的主库压力(主-从-从)

通过分析主从库间第一次数据同步的过程,你可以看到,一次全量复制中,对于主库来说,需要完成两个耗时的操作:生成 RDB 文件和传输 RDB 文件。

如果从库数量很多,而且都要和主库进行全量复制的话,就会导致主库忙于 fork 子进程生成 RDB 文件,进行数据全量同步。(fork完后的子线程不会阻塞,但是fork出子线程的这个动作会阻塞)

fork 这个操作会阻塞主线程处理正常请求,从而导致主库响应应用程序的请求速度变慢。此外,传输 RDB 文件也会占用主库的网络带宽,同样会给主库的资源使用带来压力。那么,有没有好的解决方法可以分担主库压力呢?

其实是有的,这就是“主 – 从 – 从”模式

在刚才介绍的主从库模式中,所有的从库都是和主库连接,所有的全量复制也都是和主库进行的。现在,我们可以通过“主 – 从 – 从”模式将主库生成 RDB 和传输 RDB 的压力,以级联的方式分散到从库上。

简单来说,我们在部署主从集群的时候,可以手动选择一个从库(比如选择内存资源配置较高的从库),用于级联其他的从库。然后,我们可以再选择一些从库(例如三分之一的从库),在这些从库上执行如下命令,让它们和刚才所选的从库,建立起主从关系。

replicaof 所选从库的IP 6379

这样一来,这些从库就会知道,在进行同步时,不用再和主库进行交互了,只要和级联的从库进行写操作同步就行了,这就可以减轻主库上的压力,如下图所示:

级联的“主-从-从”模式

好了,到这里,我们了解了主从库间通过全量复制实现数据同步的过程,以及通过“主 – 从 – 从”模式分担主库压力的方式。那么,一旦主从库完成了全量复制,它们之间就会一直维护一个网络连接,主库会通过这个连接将后续陆续收到的命令操作再同步给从库,这个过程也称为基于长连接的命令传播,可以避免频繁建立连接的开销。

听上去好像很简单,但不可忽视的是,这个过程中存在着风险点,最常见的就是网络断连或阻塞。如果网络断连,主从库之间就无法进行命令传播了,从库的数据自然也就没办法和主库保持一致了,客户端就可能从从库读到旧数据。

接下来,我们就来聊聊网络断连后的解决办法。

 

5.主从库间网络断了怎么办?

 

 

6. 主从复制的优缺点

优点:

  • master能自动将数据同步到slave,可以进行读写分离,分担master的读压力
  • master、slave之间的同步是以非阻塞的方式进行的,同步期间,客户端仍然可以提交查询或更新请求

缺点:

  • 不具备自动容错与恢复功能,master或slave的宕机都可能导致客户端请求失败,需要等待机器重启或手动切换客户端IP才能恢复(哨兵可解决)
  • master宕机,如果宕机前数据没有同步完,则切换IP后会存在数据不一致的问题
  • 难以支持在线扩容,Redis的容量受限于单机配置

 

个人总结:

Redis集群是为了水平扩展Redis的服务能力,而主从复制可以说是Redis集群实现的方案,我现在更加觉得Cluster集群和哨兵以及主从库复制不是相互对立的,而是一个整体的不同部分,这些不同的部分实现了Redis的高可靠性,高可用性;当然,你可以只实现其中某一部分。

本篇着重讲解的就是其中一个部分—主从复制,后续会讲到的哨兵模式是基于主从复制模式的。

主从复制,是通过增加副本冗余量来实现Redis的服务尽量少终端,以达到高可靠性;实现了副本冗余,就需要考虑各个副本分别负责什么,为了避免客户端对同一个数据进行修改,而不同的修改请求都发送到不同的实例上,在不同的实例上执行而导致的在不同实例上的副本不一样的情况,Redis的主从复制采用读写分离的分工;

读操作,主从都可以接收

写操作,由主库执行,再将写操作同步给从库

那怎么进行同步呢?

先发送RDB,再发送记录的写操作(第一次全量之后是增量,增量复制的时候采用长连接),同时为了减轻主库生成RDB和传输RDB的压力,可以采用 主-从-从的级联模式

那么如果在发送过程中网络中断了怎么办呢?

repl_backlog_buffer环形缓冲区,主库记录写的位置,从库记录自己读到的位置,相减再同步

 

补充:

 

提问:

主从库间的数据复制同步使用的是 RDB 文件,前面我们学习过,AOF 记录的操作命令更全,相比于 RDB 丢失的数据更少。那么,为什么主从库间的复制不使用 AOF 呢?

回答:

1、RDB文件内容是经过压缩的二进制数据(不同数据类型数据做了针对性优化),文件很小。而AOF文件记录的是每一次写操作的命令,写操作越多文件会变得很大,其中还包括很多对同一个key的多次冗余操作。在主从全量数据同步时,传输RDB文件可以尽量降低对主库机器网络带宽的消耗,从库在加载RDB文件时,一是文件小,读取整个文件的速度会很快,二是因为RDB文件存储的都是二进制数据,从库直接按照RDB协议解析还原数据即可,速度会非常快,而AOF需要依次重放每个写命令,这个过程会经历冗长的处理逻辑,恢复速度相比RDB会慢得多,所以使用RDB进行主从全量同步的成本最低。

2、假设要使用AOF做全量同步,意味着必须打开AOF功能,打开AOF就要选择文件刷盘的策略,选择不当会严重影响Redis性能。而RDB只有在需要定时备份和主从全量同步数据时才会触发生成一次快照。而在很多丢失数据不敏感的业务场景,其实是不需要开启AOF的。

 

参考资料:

https://www.zhihu.com/search?type=content&q=Redis%E9%9B%86%E7%BE%A4

Redis哨兵、复制、集群的设计原理与区别 – 知乎 (zhihu.com)

https://blog.csdn.net/shenjianxz/article/details/59775212

https://time.geekbang.org/column/article/272852

基于内存保存消息

最近开发基于Bio的Socket项目的时候,想把简单的聊天室往消息队列的方向靠拢,因此在考虑怎么记录每个客户端所发送的消息(即实现聊天记录的保存功能)

客户端发送消息— 服务器端接收 转发 存储—-目标客户端

服务器接收转发我使用的是Read线程转发即可,那么存储呢?一开始的想法是,要不存储在内存中,即创建HashMap<senderName,msgList>来保存

具体如下:

<橙汁,List<“消息一”,“消息二”…>>

<小阮,List<“消息一”,”消息二”…>>

代码如下:

这样的实现好处在于:简单,明了。客户端将消息发送给服务器,服务器做转发,同时存入msgMemoryMap(全局变量);由于我的网络通信模型是使用的Bio,服务器循环监听,每来一个客户端就创建一个对应的读线程,而这些步骤也是在读线程里面完成的

也就是说 客户端A的消息存入 和客户端B的消息存入不是同一个线程实现的(当然已经使用了线程池进行优化,此处是有隐患的,如果客户端太多那么服务器就要创建很多线程),因为效率上不用太过担心

更高效的序列化

在存入msgMemoryMap之前,我将原来的Serialize序列化方式改为了Protostuff序列化,因为存入消息实在太占用内存了…

如果使用Serialize序列化,平均每条消息要占用300个字节(300B)这还得期望聊天用户发送的消息都是短消息,如果长消息那更恐怖,我计算了一下,如果同时有1000个用户在聊天,每人发送1条,也就是1*1000*300 = 300Kb!这个内存占用实在是太可怕了,因此我不得不考虑更高效的序列化方式,即Protostuff,关于Protostuff的文章会在之后写出,目前只需要知道它的序列化更高效且生成的byte数组大小更小,差不多是Serialize生成的十分之一,那么接下来直接看测试代码:

 

这样仿佛解决了写入的消息占用内存过大的问题?其实还可以进一步优化,比如再优化MessageRedis类的字段,或者先压缩再存入msgMemoryMap,等需要拿出来使用的时候再解压,也就是“时间换空间”的想法,听上去好像使用msgMemoryMap只要解决了内存占用问题就好了,其实并不然,因为内存具有掉电即失的特性。

任何实际开发的项目都不可能将数据简单的写在内存,必须要进行持久化,不然你的用户使用你的聊天室向其他人发送重要的资料和文件,等到后面他需要取查看这些消息的时候,却发现居然全丢了。持久化,欸,会做啊,我写入到数据库去不就不会丢失了嘛。

写入数据库有两种策略:

1.每来一条消息,服务器做转发后,将其写入到数据库;

2.每来一条消息,服务器将其写入到数据库后,再做转发;

我们来分析一下这两种策略,假设是 用户 橙汁 发送给 小阮的一条消息 ;

          策略1 :

           服务器先转发,那挺好,小阮可以立刻收到橙汁所发送的消息,然后橙汁的这条消息写入数据库,完美保存;橙汁第二条消息来的时候重复这个逻辑,因为橙汁发送消息中间是有间隔的,也就是不可能一直不停的发(假设消息有意义),那么这个间隔时间足够橙汁将第一条消息写入数据库了,到此,消息既转发了又保存了,服务器完成一次操作的时间为(T转发 + T存入)。

           似乎万事大吉?其实不然,现在都追求高可用,高并发,高可靠,我们的系统也不能落下。很显然目前的策略没有满足高并发和高可用,因为如果在服务器收到消息并转发后,断电了怎么办?消息并没有被写入数据库,如果小阮收到了消息,比如是 “明天一起去吃饭吧” 然后橙汁把这事忘了,第二天小阮来算账,说 “你昨天说的今天一起吃饭啊”;橙汁说:“噢 是吗? 我看看聊天记录”,结果消息记录居然真的没有!橙汁确实说了这句话,也就是说我们的系统会出现很多很多比这更复杂的问题,那么怎么解决呢?

                 策略2:

                  服务器先写入,再转发,如果写入失败则重试(或者其他策略),直到成功后再转发;这样的话,如果小阮收到了这条消息,一定和数据库中 的是一样的;同样的,如果写入数据库之后断电了,消息没被转发怎么办?只需要标记消息是否转发成功,如果没有的话就重新转发,可以从数据库去获取,不怕数据丢了,当然这里设计还可以详细展开。

分析了两种策略,如果是更追求消息一致性的话,优先选择策略2,当然策略2只是一个简版(悄咪咪的告诉你,这两个策略借鉴了Redis和Mysql的设计思路)

在上述策略的基础上继续思考,写入数据库是可以实现,但是不可忽略数据库的压力

数据库OS:对啊,你想的倒是好,每个线程都写数据库,如果采用策略2还需要从我身上拉取信息,线程压力小了,我数据库的压力大了喂,快找点人替我分担

 

是的,为了缓解数据库的压力,同时又可以完成我们的持久化功能,同时操作起来还快

缓存中间件:你他妈直接报我身份证得了

 

因此,我们可以采用Redis 来实现我们的需求—在转发消息的同时持久化保存消息

一样的:

1.每来一条消息,服务器做转发后,将其写入到Redis;

2.每来一条消息,服务器将其写入到Redis后,再做转发;

            策略1:服务器转发后,写入到Redis,Redis操作起来更快,对其上述写入数据库的策略1来说,就是T保存(保存需要的时间)更短了。如果转发完成后,断电了,没有写入到Redis,这里也会有问题,先按下不表

            策略2:先写入Redis,再转发,因为T保存的时间更短,写入Redis后转发失败的可能性就更小了,同样的,可以在接收消息的客户端设置一个接收反馈,接收到了反馈,没接收到的话这样服务器也知道,可以重发;

Redis的持久化功能保证了即使Redis所在的服务器机器掉电了,也不会丢失太多数据,丢失的数量取决了我们所采用的Redis持久化策略,比如是使用AOF还是使用RDB,其参数设置,以及是否做了Redis集群等等。

好啦,先介绍到这里啦~

什么是分布式锁?为什么要分布式锁?

在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。

那具体什么是分布式锁,分布式锁应用在哪些业务场景、如何来实现分布式锁呢?

 

一 为什么要使用分布式锁

我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,并且可以完美的运行,毫无Bug!

注意这是单机应用,后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:

 

 

上图可以看到,变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!

如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!

为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

二、分布式锁应该具备哪些条件

在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;

2、高可用的获取锁与释放锁;

3、高性能的获取锁与释放锁;

4、具备可重入特性;

5、具备锁失效机制,防止死锁;

6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

三、分布式锁的三种实现方式

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。

  • 基于数据库实现分布式锁;
  • 基于缓存(Redis等)实现分布式锁;
  • 基于Zookeeper实现分布式锁;

 

四、基于数据库的实现方式

基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。

(1)创建一个表:

DROP TABLE IF EXISTS `method_lock`;

CREATE TABLE `method_lock` (

`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT ‘主键’,

`method_name` varchar(64) NOT NULL COMMENT ‘锁定的方法名’,

`desc` varchar(255) NOT NULL COMMENT ‘备注信息’,

`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

PRIMARY KEY (`id`),

UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE

) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT=’锁定中的方法’;

2)想要执行某个方法,就使用这个方法名向表中插入数据:

INSERT INTO method_lock (method_name, desc) VALUES (‘methodName’, ‘测试的methodName’);

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;

2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;

3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;

4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。

5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

五、基于Redis的实现方式

1、选用Redis实现分布式锁原因:

(1)Redis有很高的性能;

(2)Redis命令对此支持较好,实现起来比较方便

2、使用命令介绍:

(1)SETNX

SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。

(2)expire

expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。

(3)delete

delete key:删除key

在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

3、实现思想:

(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。(锁失效机制)

(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。(优化 可阻塞等待)

(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。(释放锁机制)

4、 分布式锁的简单实现代码:

#连接redis

redis_client = redis.Redis(host=”localhost”,

port=6379,

password=password,

db=10)

#获取一个锁

lock_name:锁定名称

acquire_time: 客户端等待获取锁的时间

time_out: 锁的超时时间

def acquire_lock(lock_name, acquire_time=10, time_out=10):

“””获取一个分布式锁”””

identifier = str(uuid.uuid4())

end = time.time() + acquire_time

lock = “string:lock:” + lock_name

while time.time() < end:

if redis_client.setnx(lock, identifier):

# 给锁设置超时时间, 防止进程崩溃导致其他进程无法获取锁

redis_client.expire(lock, time_out)

return identifier

elif not redis_client.ttl(lock):

redis_client.expire(lock, time_out)

time.sleep(0.001)

return False

#释放一个锁

def release_lock(lock_name, identifier):

“””通用的锁释放函数”””

lock = “string:lock:” + lock_name

pip = redis_client.pipeline(True)

while True:

try:

pip.watch(lock)

lock_value = redis_client.get(lock)

if not lock_value:

return True

if lock_value.decode() == identifier:

pip.multi()

pip.delete(lock)

pip.execute()

return True

pip.unwatch()

break

except redis.excetions.WacthcError:

pass

return False

5、测试刚才实现的分布式锁

例子中使用50个线程模拟秒杀一个商品,使用–运算符来实现商品减少,从结果有序性就可以看出是否为加锁状态。

def seckill():

identifier=acquire_lock(‘resource’)

print(Thread.getName(),”获得了锁”)

release_lock(‘resource’,identifier)

for i in range(50):

t = Thread(target=seckill)

t.start()

 

六、基于ZooKeeper的实现方式

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

(1)创建一个目录mylock;

(2)线程A想获取锁就在mylock目录下创建临时顺序节点;

(3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;

(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;

(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

这里推荐一个Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。

缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。

 

七、总结

上面的三种实现方式,没有在所有场合都是完美的,所以,应根据不同的应用场景选择最适合的实现方式。

在分布式环境中,对资源进行上锁有时候是很重要的,比如抢购某一资源,这时候使用分布式锁就可以很好地控制资源。

当然,在具体使用中,还需要考虑很多因素,比如超时时间的选取,获取锁时间的选取对并发量都有很大的影响,上述实现的分布式锁也只是一种简单的实现,主要是一种思想

前面的学习中,我们完成了防止超卖商品和抢购接口的限流,已经能够防止大流量把我们的服务器直接搞炸,这篇文章中,我们要开始关心一些细节问题,我们现在设计的系统中还有一些问题

1.我们应该在一定时间内执行秒杀处理,不能在任意时间都接收秒杀请求,如何加入时间验证?

2.对于现有的接口,暴露了我们的接口地址,然后通过脚本抢购怎么办?

3.秒杀开始之后如何限制单个用户的请求频率,即单位时间内的访问次数?

 

此节内容解决:

  • 限时抢购
  • 抢购接口隐藏
  • 单用户限制频率(单位时间内限制访问次数)

 

限时抢购的实现

使用Redis来记录秒杀商品的时间,对秒杀过期的请求进行拒绝处理

 

秒杀请求被拦截:

数据库无改变,即未卖出

Redis 抢购时间可以自己设置,通过传参

0 77

今天这个问题卡在这里很久,想取出opsForZSet.reverseRangeWithScores()里面的值

本来是想转为数组,却发现通过JSON字符串也好,JSON对象也好,都转出失败,要么里面的数据为null,要么报错

思路卡住了

 

通过看到https://blog.csdn.net/qq_41712271/article/details/103700647  这篇文章对Zset的操作,发现可以直接用迭代器.getValue即可

 

Set rangeWithScores = redisTemplate.opsForZSet().reverseRangeWithScores(VIEW_RANK, 0, 10);
System.out.println("获取到的排行和分数列表:" + JSON.toJSONString(rangeWithScores));
Iterator<ZSetOperations.TypedTuple<String>> iterator = rangeWithScores.iterator();
while (iterator.hasNext()) {
    ZSetOperations.TypedTuple<String> typedTuple = iterator.next();
    Object value = typedTuple.getValue();
    double score = typedTuple.getScore();
    System.out.println("获取RedisZSetCommands.Tuples的区间值:" + value + "---->" + score);
}

特此记录,取出来之后加入到自己想加入的List 即可!!!

0 67

1.什么是redis?

基于内存的,高性能,key-value,Nosql数据库

 

2.Reids的特点

Redis本质上是一个Key-Value类型内存数据库,很像memcached,整个数据库统统加载在内存当中进行操作,定期通过异步操作把数据库数据flush到硬盘上进行保存。因为是纯内存操作,Redis的性能非常出色,每秒可以处理超过 10万次读写操作,是已知性能最快的Key-Value DB。

Redis的出色之处不仅仅是性能,Redis最大的魅力是支持保存多种数据结构,此外单个value的最大限制是1GB,不像 memcached只能保存1MB的数据,因此Redis可以用来实现很多有用的功能,比方说用他的List来做FIFO双向链表,实现一个轻量级的高性 能消息队列服务,用他的Set可以做高性能的tag系统等等。另外Redis也可以对存入的Key-Value设置expire时间,因此也可以被当作一 个功能加强版的memcached来用。

Redis的主要缺点是数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此Redis适合的场景主要局限在较小数据量的高性能操作和运算上。

 

3.使用redis有哪些好处?

1.速度快,因为数据存在内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1)

2.支持丰富数据类型,支持string,list,set,sorted set,hash

1)String

常用命令:set/get/decr/incr/mget等;

应用场景:String是最常用的一种数据类型,普通的key/value存储都可以归为此类;

实现方式:String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr、decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

2)Hash

常用命令:hget/hset/hgetall等

应用场景:我们要存储一个用户信息对象数据,其中包括用户ID、用户姓名、年龄和生日,通过用户ID我们希望获取该用户的姓名或者年龄或者生日;

实现方式:Redis的Hash实际是内部存储的Value为一个HashMap,并提供了直接存取这个Map成员的接口。Key是用户ID, value是一个Map。这个Map的key是成员的属性名,value是属性值。这样对数据的修改和存取都可以直接通过其内部Map的Key(Redis里称内部Map的key为field), 也就是通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据

当前HashMap的实现有两种方式:当HashMap的成员比较少时Redis为了节省内存会采用类似一维数组的方式来紧凑存储,而不会采用真正的HashMap结构,这时对应的value的redisObject的encoding为zipmap当成员数量增大时会自动转成真正的HashMap,此时encoding为ht。

3)List

常用命令:lpush/rpush/lpop/rpop/lrange等;

应用场景:Redis list的应用场景非常多,也是Redis最重要的数据结构之一,比如twitter的关注列表,粉丝列表等都可以用Redis的list结构来实现;

实现方式:Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用的这个数据结构。

4)Set

常用命令:sadd/spop/smembers/sunion等;

应用场景:Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的;

实现方式:set 的内部实现是一个 value永远为null的HashMap,实际就是通过计算hash的方式来快速排重的,这也是set能提供判断一个成员是否在集合内的原因。

5)Sorted Set

常用命令:zadd/zrange/zrem/zcard等;

应用场景:Redis sorted set的使用场景与set类似,区别是set不是自动有序的,而sorted set可以通过用户额外提供一个优先级(score)的参数来为成员排序,并且是插入有序的,即自动排序。当你需要一个有序的并且不重复的集合列表,那么可以选择sorted set数据结构,比如twitter 的public timeline可以以发表时间作为score来存储,这样获取时就是自动按时间排好序的。

实现方式:Redis sorted set的内部使用HashMap跳跃表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

3.支持事务,操作都是原子性,所谓的原子性就是对数据的更改要么全部执行,要么全部不执行

4.丰富的特性:可用于缓存,消息,按key设置过期时间,过期后将会自动删除

 

4.redis相比memcached有哪些优势?

  • memcached所有的值均是简单的字符串redis作为其替代者,支持更为丰富的数据类型(五种)
  • redis的速度比memcached快很多 (3) redis可以持久化其数据

 

5.Memcache与Redis的区别都有哪些?

  • 存储方式 Memecache把数据全部存在内存之中,断电后会挂掉,数据不能超过内存大小。Redis有部份存在硬盘上,这样能保证数据的持久性。
  • 数据支持类型 Memcache对数据类型支持相对简单。Redis有复杂的数据类型。
  • 使用底层模型不同 它们之间底层实现方式 以及与客户端之间通信的应用协议不一样。Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求。

 

6.redis适用于的场景?

Redis最适合所有数据in-momory的场景,如:

1.会话缓存(Session Cache)

最常用的一种使用Redis的情景是会话缓存(session cache)。用Redis缓存会话比其他存储(如Memcached)的优势在于:Redis提供持久化。

2.全页缓存(FPC)

除基本的会话token之外,Redis还提供很简便的FPC平台。回到一致性问题,即使重启了Redis实例,因为有磁盘的持久化,用户也不会看到页面加载速度的下降,这是一个极大改进,类似PHP本地FPC。

3.队列

Reids在内存存储引擎领域的一大优点是提供 list 和 set 操作,这使得Redis能作为一个很好的消息队列平台来使用。Redis作为队列使用的操作,就类似于本地程序语言(如Python)对 list 的 push/pop 操作。

如果你快速的在Google中搜索“Redis queues”,你马上就能找到大量的开源项目,这些项目的目的就是利用Redis创建非常好的后端工具,以满足各种队列需求。例如,Celery有一个后台就是使用Redis作为broker,你可以从这里去查看。

4.排行榜/计数器

Redis在内存中对数字进行递增或递减的操作实现的非常好。集合(Set)和有序集合(Sorted Set)也使得我们在执行这些操作的时候变的非常简单,Redis只是正好提供了这两种数据结构。所以,我们要从排序集合中获取到排名最靠前的10个用户–我们称之为“user_scores”,我们只需要像下面一样执行即可:

当然,这是假定你是根据你用户的分数做递增的排序。如果你想返回用户及用户的分数,你需要这样执行:

ZRANGE user_scores 0 10 WITHSCORES  

Agora Games就是一个很好的例子,用Ruby实现的,它的排行榜就是使用Redis来存储数据的,你可以在这里看到。

5.发布/订阅

最后(但肯定不是最不重要的)是Redis的发布/订阅功能。发布/订阅的使用场景确实非常多。推荐阅读:Redis 的 8 大应用场景

 

7、redis的缓存失效策略主键失效机制

作为缓存系统都要定期清理无效数据,就需要一个主键失效和淘汰策略.

在Redis当中,有生存期的key被称为volatile。在创建缓存时,要为给定的key设置生存期,当key过期的时候(生存期为0),它可能会被删除。

1、影响生存时间的一些操作

生存时间可以通过使用 DEL 命令来删除整个 key 来移除,或者被 SET 和 GETSET 命令覆盖原来的数据,也就是说,修改key对应的value和使用另外相同的key和value来覆盖以后,当前数据的生存时间不同。

比如说,对一个 key 执行INCR命令,对一个列表进行LPUSH命令,或者对一个哈希表执行HSET命令,这类操作都不会修改 key 本身的生存时间。另一方面,如果使用RENAME对一个 key 进行改名,那么改名后的 key的生存时间和改名前一样。

RENAME命令的另一种可能是,尝试将一个带生存时间的 key 改名成另一个带生存时间的 another_key ,这时旧的 another_key (以及它的生存时间)会被删除,然后旧的 key 会改名为 another_key ,因此,新的 another_key 的生存时间也和原本的 key 一样。使用PERSIST命令可以在不删除 key 的情况下,移除 key 的生存时间,让 key 重新成为一个persistent key 。

2、如何更新生存时间

可以对一个已经带有生存时间的 key 执行EXPIRE命令,新指定的生存时间会取代旧的生存时间。过期时间的精度已经被控制在1ms之内,主键失效的时间复杂度是O(1),EXPIRE和TTL命令搭配使用,TTL可以查看key的当前生存时间。设置成功返回 1;当 key 不存在或者不能为 key 设置生存时间时,返回 0 。

最大缓存配置,在 redis 中,允许用户设置最大使用内存大小

server.maxmemory默认为0,没有指定最大缓存,如果有新的数据添加,超过最大内存,则会使redis崩溃,所以一定要设置。redis 内存数据集大小上升到一定大小的时候,就会实行数据淘汰策略。

 

redis 提供 6种数据淘汰策略

  • volatile-lru:已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰
  • volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰
  • volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰
  • allkeys-lru:数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰
  • allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰
  • no-enviction(驱逐):禁止驱逐数据

注意这里的6种机制,volatile和allkeys规定了是对已设置过期时间的数据集淘汰数据还是从全部数据集淘汰数据,后面的lru、ttl以及random是三种不同的淘汰策略,再加上一种no-enviction永不回收的策略。

使用策略规则:

  • 如果数据呈现幂律分布,也就是一部分数据访问频率高,一部分数据访问频率低,则使用allkeys-lru
  • 如果数据呈现平等分布,也就是所有的数据访问频率都相同,则使用allkeys-random

三种数据淘汰策略:

ttl和random比较容易理解,实现也会比较简单。主要是Lru最近最少使用淘汰策略,设计上会对key 按失效时间排序,然后取最少使用的key进行淘汰

 

8.为什么redis需要把所有数据放到内存中?

Redis为了达到最快的读写速度将数据都读到内存中,并通过异步的方式将数据写入磁盘。所以redis具有快速和数据持久化的特征。如果不将数据放在内存中,磁盘I/O速度为严重影响redis的性能。在内存越来越便宜的今天,redis将会越来越受欢迎。

如果设置了最大使用的内存,则数据已有记录数达到内存限值后不能继续插入新值。

 

9.Redis是单进程单线程

redis利用队列技术将并发访问变为串行访问消除了传统数据库串行控制的开销

 

10.redis的并发竞争问题如何解决?(redis本身无问题,而是使用redis的客户端可能出现并发竞争)

Redis为单进程单线程模式,采用队列模式将并发访问变为串行访问。Redis本身没有锁的概念,Redis对于多个客户端连接并不存在竞争,但是在Jedis客户端对Redis进行并发访问时会发生连接超时、数据转换错误、阻塞、客户端关闭连接等问题,这些问题均是由于客户端连接混乱造成。对此有2种解决方法:

  1. 客户端角度,为保证每个客户端间正常有序与Redis进行通信,对连接进行池化,同时对客户端读写Redis操作采用内部锁synchronized。
  2. 服务器角度,利用setnx实现锁。

注:对于第一种,需要应用程序自己处理资源的同步,可以使用的方法比较通俗,可以使用synchronized也可以使用lock;第二种需要用到Redis的setnx命令,但是需要注意一些问题。

11、redis常见性能问题和解决方案:

1.Master写内存快照,save命令调度rdbSave函数,会阻塞主线程的工作,当快照比较大时对性能影响是非常大的,会间断性暂停服务,所以Master最好不要写内存快照。

2.Master AOF持久化,如果不重写AOF文件,这个持久化方式对性能的影响是最小的,但是AOF文件会不断增大,AOF文件过大会影响Master重启的恢复速度。Master最好不要做任何持久化工作,包括内存快照和AOF日志文件特别是不要启用内存快照做持久化,如果数据比较关键,某个Slave开启AOF备份数据,策略为每秒同步一次。

3.Master调用BGREWRITEAOF重写AOF文件,AOF在重写的时候会占大量的CPU和内存资源,导致服务load过高,出现短暂服务暂停现象。

4.Redis主从复制的性能问题,为了主从复制的速度和连接的稳定性,Slave和Master最好在同一个局域网内。

12.redis事务的了解CAS(check-and-set 操作实现乐观锁 )?

和众多其它数据库一样,Redis作为NoSQL数据库也同样提供了事务机制。在Redis中,MULTI/EXEC/DISCARD/WATCH这四个命令是我们实现事务的基石。相信对有关系型数据库开发经验的开发者而言这一概念并不陌生,即便如此,我们还是会简要的列出Redis中事务的实现特征:

1). 在事务中的所有命令都将会被串行化的顺序执行,事务执行期间,Redis不会再为其它客户端的请求提供任何服务,从而保证了事物中的所有命令被原子的执行。

2). 和关系型数据库中的事务相比,在Redis事务中如果有某一条命令执行失败,其后的命令仍然会被继续执行。

3). 我们可以通过MULTI命令开启一个事务,有关系型数据库开发经验的人可以将其理解为”BEGIN TRANSACTION”语句。在该语句之后执行的命令都将被视为事务之内的操作,最后我们可以通过执行EXEC/DISCARD命令来提交/回滚该事务内的所有操作。这两个Redis命令可被视为等同于关系型数据库中的COMMIT/ROLLBACK语句。

4). 在事务开启之前,如果客户端与服务器之间出现通讯故障并导致网络断开,其后所有待执行的语句都将不会被服务器执行。然而如果网络中断事件发生在客户端执行EXEC命令之后,那么该事务中的所有命令都会被服务器执行。

5). 当使用Append-Only模式时,Redis会通过调用系统函数write将该事务内的所有写操作在本次调用中全部写入磁盘。然而如果在写入的过程中出现系统崩溃,如电源故障导致的宕机,那么此时也许只有部分数据被写入到磁盘,而另外一部分数据却已经丢失。Redis服务器会在重新启动时执行一系列必要的一致性检测,一旦发现类似问题,就会立即退出并给出相应的错误提示。此时,我们就要充分利用Redis工具包中提供的redis-check-aof工具,该工具可以帮助我们定位到数据不一致的错误,并将已经写入的部分数据进行回滚。修复之后我们就可以再次重新启动Redis服务器

13.WATCH命令基于CAS乐观锁?

在Redis的事务中,WATCH命令可用于提供CAS(check-and-set)功能。假设我们通过WATCH命令在事务执行之前监控了多个Keys,倘若在WATCH之后有任何Key的值发生了变化,EXEC命令执行的事务都将被放弃,同时返回Null multi-bulk应答以通知调用者事务执行失败。例如,我们再次假设Redis中并未提供incr命令来完成键值的原子性递增,如果要实现该功能,我们只能自行编写相应的代码。

其伪码如下:

val = GET mykey  
val = val + 1  
SET mykey $val  


以上代码只有在单连接的情况下才可以保证执行结果是正确的,因为如果在同一时刻有多个客户端在同时执行该段代码,那么就会出现多线程程序中经常出现的一种错误场景–竞态争用(race condition)。

比如,客户端A和B都在同一时刻读取了mykey的原有值,假设该值为10,此后两个客户端又均将该值加一后set回Redis服务器,这样就会导致mykey的结果为11,而不是我们认为的12。为了解决类似的问题,我们需要借助WATCH命令的帮助,见如下代码:

WATCH mykey  
val = GET mykey  
val = val + 1  
MULTI  
SET mykey $val  
EXEC  


和此前代码不同的是,新代码在获取mykey的值之前先通过WATCH命令监控了该键,此后又将set命令包围在事务中,这样就可以有效的保证每个连接在执行EXEC之前,如果当前连接获取的mykey的值被其它连接的客户端修改,那么当前连接的EXEC命令将执行失败。这样调用者在判断返回值后就可以获悉val是否被重新设置成功。

 

14.使用过Redis分布式锁么,它是什么回事?

先拿setnx来争抢锁,抢到之后,再用expire给锁加一个过期时间防止锁忘记了释放

这时候对方会告诉你说你回答得不错,然后接着问如果在setnx之后执行expire之前进程意外crash或者要重启维护了,那会怎么样?

这时候你要给予惊讶的反馈:唉,是喔,这个锁就永远得不到释放了。紧接着你需要抓一抓自己得脑袋,故作思考片刻,好像接下来的结果是你主动思考出来的,然后回答:我记得set指令有非常复杂的参数,这个应该是可以同时把setnx和expire合成一条指令来用的!对方这时会显露笑容,心里开始默念:摁,这小子还不错。

 

15.假如Redis里面有1亿个key,其中有10w个key是以某个固定的已知的前缀开头的,如果将它们全部找出来?

使用keys指令可以扫出指定模式的key列表

对方接着追问:如果这个redis正在给线上的业务提供服务,那使用keys指令会有什么问题?

这个时候你要回答redis关键的一个特性:redis的单线程的。keys指令会导致线程阻塞一段时间,线上服务会停顿,直到指令执行完毕,服务才能恢复。这个时候可以使用scan指令scan指令可以无阻塞的提取出指定模式的key列表,但是会有一定的重复概率在客户端做一次去重就可以了,但是整体所花费的时间会比直接用keys指令长。

 

16.使用过Redis做异步队列么,你是怎么用的?

一般使用list结构作为队列rpush生产消息,lpop消费消息。当lpop没有消息的时候,要适当sleep一会再重试

如果对方追问可不可以不用sleep呢list还有个指令叫blpop,在没有消息的时候,它会阻塞住直到消息到来。

如果对方追问能不能生产一次消费多次呢?使用pub/sub主题订阅者模式,可以实现1:N的消息队列。

如果对方追问pub/sub有什么缺点?在消费者下线的情况下,生产的消息会丢失,得使用专业的消息队列如rabbitmq等。

如果对方追问redis如何实现延时队列?我估计现在你很想把面试官一棒打死如果你手上有一根棒球棍的话,怎么问的这么详细。但是你很克制,然后神态自若的回答道:使用sortedset,拿时间戳作为score,消息内容作为key调用zadd来生产消息,消费者用zrangebyscore指令获取N秒之前的数据轮询进行处理。

到这里,面试官暗地里已经对你竖起了大拇指。但是他不知道的是此刻你却竖起了中指,在椅子背后。

 

17.如果有大量的key需要设置同一时间过期,一般需要注意什么?

如果大量的key过期时间设置的过于集中,到过期的那个时间点,redis可能会出现短暂的卡顿现象。一般需要在时间上加一个随机值,使得过期时间分散一些。(同时也是为了避免缓存雪崩

 

18.Redis如何做持久化的?

bgsave镜像全量持久化aof增量持久化因为bgsave会耗费较长时间,不够实时,在停机的时候会导致大量丢失数据,所以需要aof来配合使用。在redis实例重启时,会使用bgsave持久化文件重新构建内存,再使用aof重放近期的操作指令来实现完整恢复重启之前的状态。

对方追问那如果突然机器掉电会怎样取决于aof日志sync属性的配置,如果不要求性能,在每条写指令时都sync一下磁盘,就不会丢失数据。但是在高性能的要求下每次都sync是不现实的,一般都使用定时sync比如1s1次,这个时候最多就会丢失1s的数据

对方追问bgsave的原理是什么?你给出两个词汇就可以了,fork和cowfork是指redis通过创建子进程来进行bgsave操作cow指的是copy on write,子进程创建后,父子进程共享数据段,父进程继续提供读写服务,写脏的页面数据会逐渐和子进程分离开来。

 

19.Pipeline有什么好处,为什么要用pipeline?

可以将多次IO往返的时间缩减为一次前提是pipeline执行的指令之间没有因果相关性。使用redis-benchmark进行压测的时候可以发现影响redis的QPS峰值的一个重要因素是pipeline批次指令的数目。

 

20.Redis的同步机制了解么?

Redis可以使用主从同步,从从同步。第一次同步时,主节点做一次bgsave,并同时将后续修改操作记录到内存buffer,待完成后将rdb文件全量同步到复制节点,复制节点接受完成后将rdb镜像加载到内存。加载完成后,再通知主节点将期间修改的操作记录同步到复制节点进行重放就完成了同步过程。

 

21.是否使用过Redis集群,集群的原理是什么?

Redis Sentinal着眼于高可用,在master宕机时会自动将slave提升为master,继续提供服务。

Redis Cluster着眼于扩展性,在单个redis内存不足时,使用Cluster进行分片存储。

使用Springboot实现项目的注册和验证码功能,差不多花了三个小时,特此记录!

首先,思路:我们要实现注册功能,无非是用户点击注册按钮,跳转到注册页面,然后填写信息,接着发送验证码,用户填写验证码,成功写入数据库这几步

因此验证码这个东西,具有时效性,比如三分钟有效,五分钟有效,然后它不应该留在数据库里我们去手动删除,因此选用Redis缓存来实现再合适不过,我们可以设置其失效时间redisTemplate.opsForValue().set(redisKey,code,300, TimeUnit.SECONDS),过期之后会自动消失;

那么我们细化一下实现流程:

1.用户点击注册按钮跳转到注册页面

2.填写信息,点击发送验证码

3.发送验证码的同时,将验证码写入Redis缓存并设置过期时间

4.用户填写验证码,后台接收并进行验证

5.如果验证码正确,写入数据库并放行,否则提示重新验证

 

了解了实现流程,我们开始编写代码:

首先实体类,User

/**
* 实体类
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Repository
public class User {
private int id;
private String name;
private String pwd;
}

定义InsertController类
定义InsertController类模拟接收前端传参,然后调用验证码发送模块

/**
* 模拟前端传参,调用验证码发送
*/
@Controller
public class InsertController {

@Autowired
RegsentController regsentController;

@RequestMapping(“/insert”)
@ResponseBody
public String insert(User user,RedisTemplate redisTemplate) {
//接收参数后,发送短信,返回的是该用户验证码在Redis里的key
String userKey = regsentController.sendmessage(user,redisTemplate);
return userKey;
}
}

定义RegsentController类
调用生成随机码的方法(getCode)和发送给前端用户的方法(send)

/**
 *接收前端传来的参数并发送验证码
 * 保存到Redis里
 * 返回该用户存进Redis里验证码的key
 */
@Controller
public class RegsentController {

    @ResponseBody
    public String sendmessage(User user,RedisTemplate redisTemplate){
        //这里的userId在真实业务中通过个人身份的令牌获取
        //生成六位数随机验证码
        String code=getCode();
        //设置redis的key,这里设置为用户Id
        String redisKey=""+user.getId();
        //将这个验证码存入redis中,并设置失效时间为5分钟
        redisTemplate.opsForValue().set(redisKey,code,300, TimeUnit.SECONDS);
        //发送短信
        boolean isSuccess=send(code);
        if (isSuccess){
            System.out.println("成功发送");
            return redisKey;
        }
        return null;
    }
    private boolean send(String code) {
        String msg="验证码为:"+code+",验证码有效期5分钟,请及时验证";
        System.out.println(msg);
        return true;
    }
    //生成六位随机验证码
    private static String getCode() {
        Random random = new Random();
        String result="";
        for (int i=0;i<6;i++)
        {
            result+=random.nextInt(10);
        }
        return result;
    }
}

定义RegVerification类

RegVerfication用于完成用户输入的验证码和Redis里的正确验证码的对比,并调用Mapper写入数据库(这里应该拆分成几个service,由于是测试就不细化了)

/**
 *接收前端传来的验证码
 * 与Redis里保存的value进行对比
 * 相同则写入数据库
 * 否则提示出错
 * @param Key 用户验证码在Redis里保存的键名
 * @param scan_code 是用户输入的验证码
 * @param redisTemplate 处理Redis
 * @param user 需要写入数据库的对象
 */

@Controller
public class RegVerification {

    @Autowired
    UserMapper userMapper;
    @ResponseBody
    public void Verfication(String key, String scan_code, RedisTemplate redisTemplate, User user){
        //进行对比
        String o = (String) redisTemplate.opsForValue().get(key);
        System.out.println("Redis中存储的正确的验证码为:"+o);
        System.out.println("用户输入的验证码为:"+scan_code);
        if(o.equals(scan_code)) {
            System.out.println("验证码正确");
            //调用Mapper写入数据库
            userMapper.insert(user);
            System.out.println("插入数据库成功!恭喜您成功注册!");
        }
        else System.out.println("很抱歉,您的验证码有误,请重试");
    }
}

定义Mapper接口
//@Mapper这个注解表示了这是一个mybatis的mapper类
//@Repository表示其为Dao层(@Component万能)的Bean

/**
 * UserMapper接口
 */
@Mapper
@Repository
public interface UserMapper {

    //注册功能,添加用户
    void insert(User user);
}

定义Mapper接口的实现文件UserMapper.xml
<?xml version="1.0" encoding="UTF8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!--绑定命名空间-->
<mapper namespace="com.yang.Mapper.UserMapper">

    <select id="insert" parameterType="com.yang.Pojo.User">
          insert into redisname (id,name,pwd) values (#{id},#{name},#{pwd})
    </select>

</mapper>

定义测试类ApplicationTests
/**
 * 测试类
 */
@SpringBootTest
class ApplicationTests {

    @Autowired
    InsertController insertController;
    @Autowired
    RegVerification regVerification;
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;
    @Autowired
    User user;
    @Test
    void contextLoads() {
        //自定义user,模拟前端传参
        user.setId(10);
        user.setName("十号玫瑰");
        user.setPwd("9973");
        //发送验证码
        String userKey = insertController.insert(user,redisTemplate);
        //获取键
        System.out.println("用户验证码在Redis中的键为:"+10);
        //模拟前端:用户输入验证码
        String scan_code ="386247";
        //将usrKey传给ResVerification,供其对比userKey对应的正确的code和用户自己输入的scan_code
        regVerification.Verfication("10", scan_code, redisTemplate,user);
    }
}

开始测试:
因为前端传参我们是模拟的,每次执行测试类我们都会发送验证码,因此为了验证我们是否插入成功,要把发送验证码语句注释掉,如图
可以看到,当我们没有将其注释掉的时候,生成验证码386247

注释之后

成功注册!!!

0 36

运行环境和另一篇博客(Zset)的一样,读者需要可自行查找:http://yangbili.co/redis%e6%88%90%e7%bb%a9%e6%8e%92%e8%a1%8c%e6%a6%9c%e5%ae%9e%e7%8e%b0%ef%bc%88set%ef%bc%89/

废话不多说,直接上代码:

 



import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.List;
import java.util.Set;


/**
 * 2021.4.2
 * 学习了Redis的基本数据类型,决定在Springboot环境下进行使用
 *
 * 目的:实现对其他人的查询,添加,删除及共同好友的查看
 * 采用Set数据结构,
 */

@SpringBootTest
public class YangSetTest {


    //注入rediTemplate
    @Autowired
    private RedisTemplate<String,String> redisTemplate;


    //创建用户A
    private final String ANAME = "咩咩咩";

    //创建用户B
    private final String BNAME = "大黑狗";

    /*1.批量添加关注列表,模拟从数据库中获得*/
    @Test
    void batchAdd() {
        Long start = System.currentTimeMillis();
        for(int i = 0; i < 10; i++){
            redisTemplate.opsForSet().add(ANAME,i+"号明星");
        }
        System.out.println("添加成功");
        System.out.println("批量新增关注所消耗的时间:"+(System.currentTimeMillis()-start));
    }

    /*2.关注某人*/
    @Test
    void add(){
        redisTemplate.opsForSet().add(BNAME,"电气鼠");
        System.out.println("关注成功");
    }

    /*3.获取当前的关注总数(基数)*/
    @Test
    void getCardNum(){
        Long size = redisTemplate.opsForSet().size(ANAME);
        System.out.println(ANAME+"的关注总数为:"+size);
    }

    /*4.取消单个关注*/
    @Test
    void remmove(){
        redisTemplate.opsForSet().remove(ANAME,"3号关注");
        System.out.println("取消成功");
    }

    /*5.查找单个关注*/
    @Test
    void selete(){
        Boolean member = redisTemplate.opsForSet().isMember(ANAME, BNAME);
        if(member == true){
            System.out.println("名为"+BNAME+"的人是"+ANAME+"的关注!");
        }
        else {
            System.out.println("对方不是+"+ANAME+"的关注!");
        }
    }

    /** 6.事务,A添加B,B同意
     *  那么A的列表和B的列表都要变化
     *  如果只有一个有变化,那么添加失败
     */
    @Test
    void addMulti() {
        //开启乐观锁(模拟多线程下)
        //这里假设B是明星,多个人关注B
        redisTemplate.watch(BNAME);
        //开启事务
        redisTemplate.multi();
        try {
            //A关注B
            redisTemplate.opsForSet().add(ANAME, BNAME);
            //B的粉丝列表也要变化;
            redisTemplate.opsForSet().add(BNAME, ANAME);
            System.out.println("关注成功!");
        } catch (Exception e) {
            System.out.println("出现错误!关注失败,事务取消");
            //取消事务,事务队列中的所有命令都不会被执行
            redisTemplate.discard();
            //解锁
            redisTemplate.unwatch();
        }
    }

    /*7.看破红尘,取消所有关注*/
//    @Test
//    void removerAll(){
//        重复调用remove()方法
//        redisTemplate.opsForSet().remove(ANAME,"","");
//    }

    /*8.一览天下,获取用户关注列表*/
    @Test
    void lookMem(){
        //调用psForSet().members
        Set<String> members = redisTemplate.opsForSet().members(BNAME);
        //使用JSON.toJSONString(Object)将对象转换为JSON字符串
        System.out.println(JSON.toJSONString(members));
    }

    /*9.宠幸后宫,翻牌子随机获取一个关注对象*/
    @Test
    void randomLookMem(){
        //调用psForSet().members
        String s = redisTemplate.opsForSet().randomMember(ANAME);
        System.out.println(s);
    }

    /*10.多人运动,随机选出关注列表中的元素,获取多个关注对象*/
    @Test
    void randomLookMems(){
        //个数为:3
        List<String> strings = redisTemplate.opsForSet().randomMembers(ANAME,3);
        System.out.println(strings);
    }

    /*11.推荐关注,将用户A关注列表中的对象推送给B*/
    @Test
    void move() {
        //调用opsForSet().move()方法
        redisTemplate.opsForSet().move(ANAME,"8号明星",BNAME);
        System.out.println("推荐关注成功!");
    }

    /**
     * 12.并,差,交及的使用
     *
     * 交集,可做共同关注
     * 实验步骤:
     * 1.先往用户A,B中添加共同关注对象"电气鼠"
     * 2.查看二者的共同关注(也可以获得多个,略)
     */
    @Test
    void comment(){
        Set<String> intersect = redisTemplate.opsForSet().intersect(ANAME, BNAME);
        System.out.println(ANAME+"和"+BNAME+"的共同关注对象为:"+intersect);
        System.out.println(ANAME+"和"+BNAME+"都是LSP...");
    }

    /*并集,统计两个用户的关注对象之合*/
    @Test
    void union(){
        Set<String> union = redisTemplate.opsForSet().union(ANAME, BNAME);
        System.out.println(ANAME+"和"+BNAME+"的关注合列表为:"+union);
    }

    /*差集,可以判断用户之间的关注差异*/
    @Test
    void differ(){
        Set<String> difference = redisTemplate.opsForSet().difference(ANAME, BNAME);
        System.out.println(ANAME+"和"+BNAME+"的关注列表差集为:"+difference);
        System.out.println("根据此功能也可以推送让B用户关注其他");
    }

}