React项目docker部署,nginx配置

  1. 项目下面新建Dockerfile文件,内容如下
1
2
3
4
5
6
7
8
9
FROM node:18
WORKDIR /app
COPY package*.json ./
RUN npm install
RUN npm install -g serve
COPY . .
RUN npm run build
EXPOSE 3000
CMD ["serve", "-s", "build", "-l", "3000"]
  1. 编译镜像
1
2
拉取指定架构的基础镜像编译
docker build --platform linux/amd64 -t pt20011203/echo-admin:amd64 .
  1. 上传到远程仓库
1
docker push pt20011203/echo-admin:amd64
  1. 服务器拉取,并运行
1
2
docker pull pt20011203/echo-admin:amd64
docker run -d -p 3000:3000 --name echo-admin pt20011203/echo-admin:amd64
  1. nginx配置,修改配置文件(这里有个小坑,要配置home-page与/echo-admin/一致,不然静态资源无法渲染)
1
2
3
location /echo-admin/ {
proxy_pass http://127.0.0.1:3000/;
}

效果:https://www.techkid.top/echo-admin/login

什么是JSON Web令牌?
JSON Web令牌(JWT)是一个开放标准( RFC 7519 ),它定义了一种紧凑且具有独立的方式,可将各方之间的信息安全地传输为JSON对象。可以验证和信任此信息,因为它是数字签名的。可以使用RSA或ECDSA使用秘密(带有HMAC算法)或公共/私钥对签名JWT。
尽管可以对JWT进行加密以在各方之间提供保密性,但我们将专注于签名的令牌。签名的令牌可以验证其中包含的索赔的完整性,而加密的令牌则将这些索赔隐藏在其他方中。当使用公共/私钥对签名令牌时,签名还证明只有持有私钥的一方才是签名的一方。

您什么时候应该使用JSON Web令牌?
授权:这是使用JWT的最常见情况。登录用户后,每个后续请求将包括JWT,允许用户访问该令牌允许的路由,服务和资源。单个标志是当今广泛使用JWT的功能,因为它的开销很小,并且能够轻松地在不同域中使用。
信息交换:JSON Web令牌是在各方之间安全传输信息的好方法。因为可以签署JWT(例如,使用公共/私钥对),您可以确保发件人是他们说的。此外,由于使用标头和有效载荷计算签名,您还可以验证内容尚未篡改。

什么是JSON Web令牌结构?
JSON Web令牌以紧凑的形式由三个部分组成,这些部分由点( . )隔开,这是:
标题,有效载荷,签名
因此,JWT通常看起来如下
xxxxx.yyyyy.zzzzz
让我们分解不同的部分。

  • 标题:标头通常由两个部分组成:令牌的类型,即JWT和使用的签名算法,例如HMAC SHA256或RSA。
    例如:
1
2
3
4
{
"alg": "HS256",
"typ": "JWT"
}

然后,该JSON是基本64url编码以形成JWT的第一部分。

  • 有效载荷
    有效载荷:其中包含索赔。索赔是关于实体(通常是用户)和其他数据的语句。索赔有三种类型:注册,公共和私人索赔。
    注册索赔:这些是一组预定义的索赔,不是强制性的,但建议提供一组有用的可互操作索赔。其中一些是: ISS (发行人), EXP (到期时间),子(主题), AUD (受众)等。
    请注意,索赔名称只有三个字符,因为JWT应该是紧凑的。
    公开主张:使用JWT的人可以随意定义这些要求。但是为了避免碰撞,应在IANA JSON Web令牌注册表中定义它们,或将其定义为包含抗碰撞命名空间的URI。
    私人索赔:这些是为在使用它们达成共识的当事方之间共享信息的自定义主张,既不是注册或公开索赔。
    一个示例有效载荷可能是:
1
2
3
4
5
{
"sub": "1234567890",
"name": "John Doe",
"admin": true
}

然后对有效负载进行编码,以形成JSON Web令牌的第二部分。
请注意,对于签名令牌,尽管受到保护,但任何人都可以读取此信息。除非加密,否则请勿将秘密信息放在JWT的有效载荷或标头元素中。

  • 签名:要创建签名部分,您必须采用编码的标头,编码的有效载荷,一个秘密,标题中指定的算法,然后签名。
    例如,如果要使用HMAC SHA256算法,则将以以下方式创建签名:
1
2
3
4
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret)

该签名用于验证该消息在此过程中没有更改,并且,对于用私钥签名的令牌,它还可以验证JWT的发件人是否是它说的。将所有人放在一起
输出是三个基本的64-url字符串,这些字符串被点隔开,可以在HTML和HTTP环境中轻松传递,而与基于XML的标准(例如SAML)相比,它更加紧凑。
以下显示了一个JWT,该JWT已编码了先前的标头和有效载荷,并且签名为秘密
截屏2025-02-09 12.07.50.png

JSON网络令牌如何工作?
在身份验证中,当用户成功使用其凭据登录时,将返回JSON Web令牌。由于令牌是凭证,因此必须格外小心以防止安全问题。通常,您不应将令牌保留的时间超过要求。由于缺乏安全性,您也不应将敏感的会话数据存储在浏览器存储中。
每当用户想要访问受保护的路由或资源时,用户代理都应使用载体模式在授权标题中发送JWT。标头的内容看起来如下:
Authorization: Bearer 在某些情况下,这可能是无状态授权机制。服务器的受保护路线将在Authorization标头中检查有效的JWT,如果存在,则将允许用户访问受保护的资源。如果JWT包含必要的数据,则需要减少数据库以查询数据库以减少某些操作,尽管情况并非总是如此。
请注意,如果您通过HTTP标头发送JWT令牌,则应尝试防止它们变得太大。一些服务器在标题中的接受不超过8 kb。如果您试图将过多的信息嵌入JWT令牌中,例如包括所有用户的权限,您可能需要一个替代解决方案,例如Auth0 Fine Graining授权。如果令牌是在Authorization标题中发送的,则交叉元素资源共享(CORS)将不会是问题,因为它不使用cookie。以下图显示了如何获得JWT并用于访问API或资
截屏2025-02-09 12.09.58.png
请注意,在签名令牌中,即使他们无法更改它,代币中包含的所有信息都暴露于用户或其他方。这意味着您不应将秘密信息放在令牌中

需求解决方案,令牌过期怎么办(自动续期)?

  1. 单token模式
    将 token 过期时间设置为15分钟,前端发起请求,后端验证 token 是否过期;如果过期,前端发起刷新token请求,后端为前端返回一个新的token;前端用新的token发起请求,请求成功;如果要实现每隔72小时,必须重新登录,后端需要记录每次用户的登录时间;用户每次请求时,检查用户最后一次登录日期,如超过72小时,则拒绝刷新token的请求,请求失败,跳转到登录页面。
    另外后端还可以记录刷新token的次数,比如最多刷新50次,如果达到50次,则不再允许刷新,需要用户重新授权。
    上面介绍的单token方案原理比较简单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static Integer verifyToken(String token) {
byte[] bytes = generateKey().getBytes();
Key secretKey = Keys.hmacShaKeyFor(bytes);

try {
Jws<Claims> claimsJws=Jwts.parser()
.setSigningKey(secretKey)
.build()
.parseClaimsJws(token);
// Token 验证通过,可以从 claimsJws 对象中获取相关信息
Claims claims = claimsJws.getBody();
String id=claims.get("id").toString();
return Integer.valueOf(id);
}catch (Exception e){
// 捕获异常,判断是否是因为过期
if (e instanceof io.jsonwebtoken.ExpiredJwtException) {
// 获取过期的 Claims
Claims claims = ((io.jsonwebtoken.ExpiredJwtException) e).getClaims();
// 判断是否在允许的刷新时间内
long now = System.currentTimeMillis();
long exp = claims.getExpiration().getTime();
long diff = now - exp;
// 如果过期时间不超过一定分钟数(例如 30 分钟)
if (diff <= 30 * 60 * 1000) {
// 重新生成新的令牌
Map<String, Object> newClaims = new HashMap<>();
newClaims.put("id", claims.get("id"));
newClaims.put("exp", new Date(now + 60 * 60 * 1000)); // 设置新的过期时间为 1 小时
String newToken = Jwts.builder()
.setClaims(newClaims)
.signWith(secretKey)
.compact();
System.out.println("New token generated: " + newToken);
return Integer.valueOf(claims.get("id").toString());
}
}
return -1;
}
}
  1. 双token模式
    登录成功以后,后端返回 access_token 和 refresh_token,客户端缓存此两种token;
    使用 access_token 请求接口资源,成功则调用成功;如果token超时,客户端携带 refresh_token 调用token刷新接口获取新的 access_token;
    后端接受刷新token的请求后,检查 refresh_token 是否过期。如果过期,拒绝刷新,客户端收到该状态后,跳转到登录页;如果未过期,生成新的 access_token 返回给客户端。
    客户端携带新的 access_token 重新调用上面的资源接口。
    客户端退出登录或修改密码后,注销旧的token,使 access_token 和 refresh_token 失效,同时清空客户端的 access_token 和 refresh_toke

问题:双token模式是不是可以看作Oauth2模式呢?
回顾一下Oauth2,它是用户先同意后,先申请授权码code,然后用code去申请token,然后用户接收到返回的token(这个token持久化时间只需要很短,请求认证通过就可以删除掉)到服务端检验,如果通过,则OAuth认证通过,很显然两者有一定区别

如何优化垃圾回收机制?

在 Java 开发中,开发人员是无需过度关注对象的回收与释放的,JVM 的垃圾回收机制可以减轻不少工作量。但完全交由 JVM 回收对象,也会增加回收性能的不确定性。在一些特殊的业务场景下,不合适的垃圾回收算法以及策略,都有可能导致系统性能下降。面对不同的业务场景,垃圾回收的调优策略也不一样。例如,在对内存要求苛刻的情况下,需要提高对象的回收效率;在 CPU 使用率高的情况下,需要降低高并发时垃圾回收的频率。可以说,垃圾回收的调优是一项必备技能
垃圾回收机制掌握 GC 算法之前,我们需要先弄清楚 3 个问题。第一,回收发生在哪里?第二,对象在什么时候可以被回收?第三,如何回收这些对象?

问题一:回收发生在哪里?
JVM 的内存区域中,程序计数器、虚拟机栈和本地方法栈这 3 个区域是线程私有的,随着线程的创建而创建,销毁而销毁;栈中的栈帧随着方法的进入和退出进行入栈和出栈操作,每个栈帧中分配多少内存基本是在类结构确定下来的时候就已知的,因此这三个区域的内存分配和回收都具有确定性。那么垃圾回收的重点就是关注堆和方法区中的内存了,堆中的回收主要是对象的回收,方法区的回收主要是废弃常量和无用的类的回收

问题二:对象在什么时候可以被回收?
那 JVM 又是怎样判断一个对象是可以被回收的呢?一般一个对象不再被引用,就代表该对象可以被回收。目前有以下两种算法可以判断该对象是否可以被回收。引用计数算法:这种算法是通过一个对象的引用计数器来判断该对象是否被引用了。每当对象被引用,引用计数器就会加 1;每当引用失效,计数器就会减 1。当对象的引用计数器的值为 0 时,就说明该对象不再被引用,可以被回收了。这里强调一点,虽然引用计数算法的实现简单,判断效率也很高,但它存在着对象之间相互循环引用的问题,可达性分析算法:GC Roots 是该算法的基础,GC Roots 是所有对象的根对象,在 JVM 加载时,会创建一些普通对象引用正常对象。这些对象作为正常对象的起始点,在垃圾回收时,会从这些 GC Roots 开始向下搜索,当一个对象到 GC Roots 没有任何引用链相连时,就证明此对象是不可用的。目前 HotSpot 虚拟机采用的就是这种算法。以上两种算法都是通过引用来判断对象是否可以被回收。在 JDK 1.2 之后,Java 对引用的概念进行了扩充,将引用分为了以下四种
截屏2025-02-07 17.38.49.png

问题三:如何回收这些对象?
了解完 Java 程序中对象的回收条件,那么垃圾回收线程又是如何回收这些对象的呢?JVM 垃圾回收遵循以下两个特性。

  • 自动性:Java 提供了一个系统级的线程来跟踪每一块分配出去的内存空间,当 JVM 处于空闲循环时,垃圾收集器线程会自动检查每一块分配出去的内存空间,然后自动回收每一块空闲的内存块。
  • 不可预期性:一旦一个对象没有被引用了,该对象是否立刻被回收呢?答案是不可预期的。我们很难确定一个没有被引用的对象是不是会被立刻回收掉,因为有可能当程序结束后,这个对象仍在内存中。垃圾回收线程在 JVM 中是自动执行的,Java 程序无法强制执行。我们唯一能做的就是通过调用 System.gc 方法来”建议”执行垃圾收集器,但是否可执行,什么时候执行?仍然不可预期”

GC 算法
截屏2025-02-07 17.42.10.png
DK1.7 update14 之后 Hotspot 虚拟机所有的回收器整理如下(以下为服务端垃圾收集器):
截屏2025-02-07 17.43.06.png
其实在 JVM 规范中并没有明确 GC 的运作方式,各个厂商可以采用不同的方式实现垃圾收集器。我们可以通过 JVM 工具查询当前 JVM 使用的垃圾收集器类型,首先通过 ps 命令查询出进程 ID,再通过 jmap -heap ID 查询出 JVM 的配置信息,其中就包括垃圾收集器的设置类型
截屏2025-02-07 17.44.17.png

GC 性能衡量指标

  • 吞吐量:这里的吞吐量是指应用程序所花费的时间和系统总运行时间的比值。我们可以按照这个公式来计算 GC 的吞吐量:系统总运行时间 = 应用程序耗时 +GC 耗时。如果系统运行了 100 分钟,GC 耗时 1 分钟,则系统吞吐量为 99%。GC 的吞吐量一般不能低于 95%。
  • 停顿时间:指垃圾收集器正在运行时,应用程序的暂停时间。对于串行回收器而言,停顿时间可能会比较长;而使用并发回收器,由于垃圾收集器和应用程序交替运行,程序的停顿时间就会变短,但其效率很可能不如独占垃圾收集器,系统的吞吐量也很可能会降低。
  • 垃圾回收频率:多久发生一次指垃圾回收呢?通常垃圾回收的频率越低越好,增大堆内存空间可以有效降低垃圾回收发生的频率,但同时也意味着堆积的回收对象越多,最终也会增加回收时的停顿时间。所以我们只要适当地增大堆内存空间

查看 & 分析 GC 日志
已知了性能衡量指标,现在我们需要通过工具查询 GC 相关日志,统计各项指标的信息。首先,我们需要通过 JVM 参数预先设置 GC 日志,通常有以下几种 JVM 参数设置

1
2
3
-XX:+PrintGC:打印GC的基本信息。
-XX:+PrintGCDetails:打印更详细的GC信息,包括年轻代和老年代的使用情况。
-Xloggc:<file>:将GC日志输出到指定文件

GCeasy是一款非常直观的 GC 日志分析工具,我们可以将日志文件压缩之后,上传到 GCeasy 官网即可看到非常清楚的 GC 日志分析结果
截屏2025-02-07 17.48.35.png

GC 调优策略

  1. 降低 Minor GC 频率通常情况下,由于新生代空间较小,Eden 区很快被填满,就会导致频繁 Minor GC,因此我们可以通过增大新生代空间来降低 Minor GC 的频率。可能你会有这样的疑问,扩容 Eden 区虽然可以减少 Minor GC 的次数,但不会增加单次 Minor GC 的时间吗?如果单次 Minor GC 的时间增加,那也很难达到我们期待的优化效果呀
  2. 降低 Full GC 的频率通常情况下,由于堆内存空间不足或老年代对象太多,会触发 Full GC,频繁的 Full GC 会带来上下文切换,增加系统的性能开销。我们可以使用哪些方法来降低 Full GC 的频率呢?减少创建大对象:在平常的业务场景中,我们习惯一次性从数据库中查询出一个大对象用于 web 端显示。
  • 减少创建大对象:在平常的业务场景中,我们习惯一次性从数据库中查询出一个大对象用于 web 端显示。例如,我之前碰到过一个一次性查询出 60 个字段的业务操作,这种大对象如果超过年轻代最大对象阈值,会被直接创建在老年代;即使被创建在了年轻代,由于年轻代的内存空间有限,通过 Minor GC 之后也会进入到老年代。这种大对象很容易产生较多的 Full GC。我们可以将这种大对象拆解出来,首次只查询一些比较重要的字段,如果还需要其它字段辅助查看,再通过第二次查询显示剩余的字段。
  • 增大堆内存空间:在堆内存不足的情况下,增大堆内存空间,且设置初始化堆内存为最大堆内存,也可以降低 Full GC 的频率。
  • 选择合适的 GC 回收器:假设我们有这样一个需求,要求每次操作的响应时间必须在 500ms 以内,这个时候我们一般会选择响应速度较快的 GC 回收器,CMS(Concurrent Mark Sweep)回收器和 G1 回收器都是不错的选择,而当我们的需求对系统吞吐量有要求时,就可以选择 Parallel Scavenge 回收器来提高系统的吞吐量

内存快照(RDB):宕机后,Redis如何实现快速恢复

什么是内存快照RDB?
用 AOF 方法进行故障恢复的时候,需要逐一把操作日志都执行一遍。如果操作日志非常多,Redis 就会恢复得很缓慢,影响到正常使用。这当然不是理想的结果。那么,还有没有既可以保证可靠性,还能在宕机时实现快速恢复的其他方法呢?当然有了,这就是我们今天要一起学习的另一种持久化方法:内存快照。所谓内存快照,就是指内存中的数据在某一个时刻的状态记录。这就类似于照片,当你给朋友拍照时,一张照片就能把朋友一瞬间的形象完全记下来。对 Redis 来说,它实现类似照片记录效果的方式,就是把某一时刻的状态以文件的形式写到磁盘上,也就是快照。这样一来,即使宕机,快照文件也不会丢失,数据的可靠性也就得到了保证。这个快照文件就称为 RDB 文件,其中,RDB 就是 Redis DataBase 的缩写

和 AOF 相比,RDB 记录的是某一时刻的数据,并不是操作,所以,在做数据恢复时,我们可以直接把 RDB 文件读入内存,很快地完成恢复。听起来好像很不错,但内存快照也并不是最优选项。为什么这么说呢?我们还要考虑两个关键问题:对哪些数据做快照?这关系到快照的执行效率问题;做快照时,数据还能被增删改吗?这关系到 Redis 是否被阻塞,能否同时正常处理请求。

给哪些内存数据做快照?
Redis 的数据都在内存中,为了提供所有数据的可靠性保证,它执行的是全量快照,也就是说,把内存中的所有数据都记录到磁盘中,这就类似于给 100 个人拍合影,把每一个人都拍进照片里,给内存的全量数据做快照,把它们全部写入磁盘也会花费很多时间。而且,全量数据越多,RDB 文件就越大,往磁盘上写数据的时间开销就越大

会阻塞主线程?
对于 Redis 而言,它的单线程模型就决定了,我们要尽量避免所有会阻塞主线程的操作,所以,针对任何操作,我们都会提一个灵魂之问:“它会阻塞主线程吗?”RDB 文件的生成是否会阻塞主线程,这就关系到是否会降低 Redis 的性能。Redis 提供了两个命令来生成 RDB 文件,分别是 save 和 bgsave。save:在主线程中执行,会导致阻塞;bgsave:创建一个子进程,专门用于写入 RDB 文件,避免了主线程的阻塞,这也是 Redis RDB 文件生成的默认配置。好了,这个时候,我们就可以通过 bgsave 命令来执行全量快照,这既提供了数据的可靠性保证,也避免了对 Redis 的性能影响

快照时数据能修改吗?
在给别人拍照时,一旦对方动了,那么这张照片就拍糊了,我们就需要重拍,所以我们当然希望对方保持不动。对于内存快照而言,我们也不希望数据动,举个例子。我们在时刻 t 给内存做快照,假设内存数据量是 4GB,磁盘的写入带宽是 0.2GB/s,简单来说,至少需要 20s(4/0.2 = 20)才能做完。如果在时刻 t+5s 时,一个还没有被写入磁盘的内存数据 A,被修改成了 A’,那么就会破坏快照的完整性,因为 A’不是时刻 t 时的状态。因此,和拍照类似,我们在做快照时也不希望数据“动”,也就是不能被修改。但是,如果快照执行期间数据不能被修改,是会有潜在问题的。对于刚刚的例子来说,在做快照的 20s 时间里,如果这 4GB 的数据都不能被修改,Redis 就不能处理对这些数据的写操作,那无疑就会给业务服务造成巨大的影响。你可能会想到,可以用 bgsave 避免阻塞啊。避免阻塞和正常处理写操作并不是一回事。此时,主线程的确没有阻塞,可以正常接收请求,但是,为了保证快照完整性,它只能处理读操作,因为不能修改正在执行快照的数据
为了快照而暂停写操作,肯定是不能接受的。所以这个时候,Redis 就会借助操作系统提供的写时复制技术(Copy-On-Write, COW),在执行快照的同时,正常处理写操作,简单来说,bgsave 子进程是由主线程 fork 生成的,可以共享主线程的所有内存数据。bgsave 子进程运行后,开始读取主线“的内存数据,并把它们写入 RDB 文件。此时,如果主线程对这些数据也都是读操作(例如图中的键值对 A),那么,主线程和 bgsave 子进程相互不影响。但是,如果主线程要修改一块数据(例如图中的键值对 C),那么,这块数据就会被复制一份,生成该数据的副本。然后,bgsave 子进程会把这个副本数据写入 RDB 文件,而在这个过程中,主线程仍然可以直接修改原来的数据
截屏2025-02-07 11.34.43.png

这既保证了快照的完整性,也允许主线程同时对数据进行修改,避免了对正常业务的影响,现在,我们再来看另一个问题:多久做一次快照?我们在拍照的时候,还有项技术叫“连拍”,可以记录人或物连续多个瞬间的状态。那么,快照也适合“连拍”吗?

可以每秒做一次快照吗?
对于快照来说,所谓“连拍”就是指连续地做快照。这样一来,快照的间隔时间变得很短,即使某一时刻发生宕机了,因为上一时刻快照刚执行,丢失的数据也不会太多。但是,这其中的快照间隔时间就很关键了,如果频繁地执行全量快照,也会带来两方面的开销,“一方面,频繁将全量数据写入磁盘,会给磁盘带来很大压力,多个快照竞争有限的磁盘带宽,前一个快照还没有做完,后一个又开始做了,容易造成恶性循环。另一方面,bgsave 子进程需要通过 fork 操作从主线程创建出来。虽然,子进程在创建后不会再阻塞主线程,但是,fork 这个创建过程本身会阻塞主线程,而且主线程的内存越大,阻塞时间越长。如果频繁 fork 出 bgsave 子进程,这就会频繁阻塞主线程了。那么,有什么其他好方法吗?此时,我们可以做增量快照,所谓增量快照,就是指,做了一次全量快照后,后续的快照只对修改的数据进行快照记录,这样可以避免每次全量快照的开销。在第一次做完全量快照后,T1 和 T2 时刻如果再做快照,我们只需要将被修改的数据写入快照
截屏2025-02-07 11.40.37.png

混合使用 AOF 日志和内存快照
Redis 4.0 中提出了一个混合使用 AOF 日志和内存快照的方法。简单来说,内存快照以一定的频率执行,在两次快照之间,使用 AOF 日志记录这期间的所有命令操作。这样一来,快照不用很频繁地执行,这就避免了频繁 fork 对主线程的影响。而且,AOF 日志也只用记录两次快照间的操作,也就是说,不需要记录所有操作了,因此,就不会出现文件过大的情况了,也可以避免重写开销。如下图所示,T1 和 T2 时刻的修改,用 AOF 日志记录,等到第二次做全量快照时,就可以清空 AOF 日志,因为此时的修改都已经记录到快照中了,恢复时就不再用日志了
截屏2025-02-07 11.41.41.png
内存快照和AOF混合使用这个方法既能享受到 RDB 文件快速恢复的好处,又能享受到 AOF 只记录操作命令的简单优势,颇有点“鱼和熊掌可以兼得”的感觉,建议你在实践中用起来

AOF日志:宕机了,Redis如何避免数据丢失?

如果有人问你:“你会把 Redis 用在什么业务场景下?
我想你大概率会说:“我会把它当作缓存使用,因为它把后端数据库中的数据存储在内存中,然后直接从内存中读取数据,响应速度会非常快。”没错,这确实是 Redis 的一个普遍使用场景,但是,这里也有一个绝对不能忽略的问题:一旦服务器宕机,内存中的数据将全部丢失。我们很容易想到的一个解决方案是,从后端数据库恢复这些数据,但这种方式存在两个问题:一是,需要频繁访问数据库,会给数据库带来巨大的压力;二是,这些数据是从慢速数据库中读取出来的,性能肯定比不上从 Redis 中读取,导致使用这些数据的应用程序响应变慢。所以,对 Redis 来说,实现数据的持久化,避免从后端数据库中进行恢复,是至关重要的。目前,Redis 的持久化主要有两大机制,即 AOF 和 RDB 快照。

AOF 日志是如何实现的?
说到日志,我们比较熟悉的是数据库的写前日志(Write Ahead Log, WAL),也就是说,在实际写数据前,先把修改的数据记到日志文件中,以便故障时进行恢复。不过,AOF 日志正好相反,它是写后日志,“写后”的意思是 Redis 是先执行命令,把数据写入内存,然后才记录日志,如下图所示:Redis AOF操作过程那 AOF 为什么要先执行命令再记日志呢?要回答这个问题,我们要先知道 AOF 里记录了什么内容。传统数据库的日志,例如 redo log(重做日志),记录的是修改后的数据,而 AOF 里记录的是 Redis 收到的每一条命令,这些命令是以文本形式保存的。我们以 Redis 收到“set testkey testvalue”命令后记录的日志为例,看看 AOF 日志的内容。其中,“*3”表示当前命令有三个部分“每部分都是由“$+数字”开头,后面紧跟着具体的命令、键或值。这里,“数字”表示这部分中的命令、键或值一共有多少字节。例如,“$3 set”表示这部分有 3 个字节,也就是“set”命令”

为什么先执行命令在记录日志?(或者说这样做的好处)

  1. 如果先记日志再执行命令的话,日志中就有可能记录了错误的命令,Redis 在使用日志恢复数据时,就可能会出错。而写后日志这种方式,就是先让系统执行命令,只有命令能执行成功,才会被记录到日志中,否则,系统就会直接向客户端报错。所以,Redis 使用写后日志这一方式的一大好处是,可以避免出现记录错误命令的情况
  2. 它是在命令执行后才记录日志,所以不会阻塞当前的写操作

这样做的潜在风险

  1. 如果刚执行完一个命令,还没有来得及记日志就宕机了,那么这个命令和相应的数据就有丢失的风险。如果此时 Redis 是用作缓存,还可以从后端数据库重新读入数据进行恢复,但是,如果 Redis 是直接用作数据库的话,此时,因为命令没有记入日志,所以就无法用日志进行恢复了
  2. AOF 虽然避免了对当前命令的阻塞,但可能会给下一个操作带来阻塞风险。这是因为,AOF 日志也是在主线程中执行的,如果在把日志文件写入磁盘时,磁盘写压力大,就会导致写盘很慢,进而导致后续的操作也无法执行了

仔细分析的话,你就会发现,这两个风险都是和 AOF 写回磁盘的时机相关的。这也就意味着,如果我们能够控制一个写命令执行完后 AOF 日志写回磁盘的时机,这两个风险就解除了

解决方案-三种写回策略
AOF 机制给我们提供了三个选择,也就是 AOF 配置项 appendfsync 的三个可选值。

  • Always:同步写回:每个写命令执行完,立马同步地将日志写回磁盘;
  • Everysec:每秒写回:每个写命令执行完,只是先把日志写到 AOF 文件的内存缓冲区,每隔一秒把缓冲区中的内容写入磁盘;
  • No:操作系统控制的写回:每个写命令执行完,只是先把日志写到 AOF 文件的内存缓冲区,由操作系统决定何时将缓冲区内容写回磁盘

针对避免主线程阻塞和减少数据丢失问题,这三种写回策略都无法做到两全其美。我们来分析下其中的原因。

  • “同步写回”可以做到基本不丢数据,但是它在每一个写命令后都有一个慢速的落盘操作,不可避免地会影响主线程性能;
  • 虽然“操作系统控制的写回”在写完缓冲区后,就可以继续执行后续的命令,但是落盘的时机已经不在 Redis 手中了,只要 AOF 记录没有写回磁盘,一旦宕机对应的数据就丢失了;
  • “每秒写回”采用一秒写回一次的频率,避免了“同步写回”的性能开销,虽然减少了对系统性能的影响,但是如果发生宕机,上一秒内未落盘的命令操作仍然会丢失。所以,这只能算是,在避免影响主线程性能和避免数据丢失两者间取了个折中

截屏2025-02-07 10.53.01.png

但是,按照系统的性能需求选定了写回策略,并不是“高枕无忧”了。毕竟,AOF 是以文件的形式在记录接收到的所有写命令。随着接收的写命令越来越多,AOF 文件会越来越大。这也就意味着,我们一定要小心 AOF 文件过大带来的性能问题。这里的“性能问题”,主要在于以下三个方面:一是,文件系统本身对文件大小有限制,无法保存过大的文件;二是,如果文件太大,之后再往里面追加命令记录的话,效率也会变低;三是,如果发生宕机,AOF 中记录的命令要一个个被重新执行,用于故障恢复,如果日志文件太大,整个恢复过程就会非常缓慢,这就会影响到 Redis 的正常使用。所以,我们就要采取一定的控制手段,这个时候,AOF 重写机制就登场了

AOF 重写机制
简单来说,AOF 重写机制就是在重写时,Redis 根据数据库的现状创建一个新的 AOF 文件,也就是说,读取数据库中的所有键值对,然后对每一个键值对用一条命令记录它的写入。比如说,当读取了键值对“testkey”: “testvalue”之后,重写机制会记录 set testkey testvalue 这条命令。这样,当需要恢复时,可以重新执行该命令,实现“testkey”: “testvalue”的写入
为什么重写机制可以把日志文件变小呢? 实际上,重写机制具有“多变一”功能。所谓的“多变一”,也就是说,旧日志文件中的多条命令,在重写后的新日志中变成了一条命令,AOF 文件是以追加的方式,逐一记录接收到的写命令的。当一个键值对被多条写命令反复修改时,AOF 文件会记录相应的多条命令。但是,在重写的时候,是根据这个键值对当前的最新状态,为它生成对应的写入命令。这样一来,一个键值对在重写日志中只用一条命令就行了,而且,在日志恢复时,只用执行这条命令,就可以直接完成这个键值对的写入了,不过,虽然 AOF 重写后,日志文件会缩小,但是,要把整个数据库的最新数据的操作日志都写回磁盘,仍然是一个非常耗时的过程。这时,我们就要继续关注另一个问题了:重写会不会阻塞主线程?

AOF 重写会阻塞吗?
AOF 日志由主线程写回不同,重写过程是由后台线程 bgrewriteaof 来完成的,这也是为了避免阻塞主线程,导致数据库性能下降。我把重写的过程总结为“一个拷贝,两处日志“一个拷贝”就是指,每次执行重写时,主线程 fork 出后台的 bgrewriteaof 子进程。此时,fork 会把主线程的内存拷贝一份给 bgrewriteaof 子进程,这里面就包含了数据库的最新数据。然后,bgrewriteaof 子进程就可以在不影响主线程的情况下,逐一把拷贝的数据写成操作,记入重写日志。“两处日志”又是什么呢?因为主线程未阻塞,仍然可以处理新来的操作。此时,如果有写操作,第一处日志就是指正在使用的 AOF 日志,Redis 会把这个操作写到它的缓冲区。这样一来,即使宕机了,这个 AOF 日志的操作仍然是齐全的,可以用于恢复。而第二处日志,就是指新的 AOF 重写日志。这个操作也会被写到重写日志的缓冲区。这样,重写日志也不会丢失最新的操作。等到拷贝数据的所有操作,重写日志记录的这些最新操作也会写入新的 AOF 文件,以保证数据库最新状态的记录。此时,我们就可以用新的 AOF 文件替代旧文件了”

kafka生产经验

  1. 生产经验—–>生产者提高吞吐量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Properties properties = new Properties();
// 2. 给kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// batch.size:批次大小,默认16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、
lz4和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 3. 创建 kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","atguigu " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
}
  1. 生产经验—–>数据可靠性
  • 发送流程

  • ACK应答级别


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1. 创建 kafka生产者的配置对象
Properties properties = new Properties();
// 2. 给kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 设置acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数retries,默认是int最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 3. 创建 kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","atguigu " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
}
  1. 生产经验—–>数据去重
    - 数据传输语义
    至少一次(At Least Once)= ACK级别设置为-1 +分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
    最多一次(At Most Once)= ACK级别设置为0
    总结:
    At Least Once可以保证数据不丢失,但是不能保证数据不重复;
    At Most Once可以保证数据不重复,但是不能保证数据不丢失。
    精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
    Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
    - 幂等性

    - 生产者事务
  2. 生产经验—–>数据有序

kafka基本操作

1. 安装Kafka ,可以使用本地脚本和下载的文件或 docker 镜像以 KRaft 模式运行
下载文件安装不多说,以 KRaft 模式运行如下

1
2
docker pull apache/kafka:3.9.0
docker run -p 9092:9092 apache/kafka:3.9.0

2. 启动zookper
运行以下命令以便以正确的顺序启动所有服务:

1
2
# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

1
2
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

所有服务成功启动后,您将拥有一个正在运行并可供使用的基本 Kafka 环境。
但我习惯直接进入docker容器进行kafka操作
截屏2025-01-21 11.24.28.png
config是kafka配置文件,bin目录下是操作命令
截屏2025-01-21 11.26.49.png
包含一些命令,消费者命令,生产者命令,主题操作命令,zookeeper命令等
3.常用命令
文档链接 https://kafka.apache.org/documentation/#operations
4.java操作

  1. 生产者异步发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CustomizeProducer {
public static void main(String[] args) {
// 1. 创建 kafka生产者的配置对象
Properties properties = new Properties();
// 2. 给kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send方法,发送消息
for (int i = 200; i < 210; i++) {
kafkaProducer.send(new
ProducerRecord<>("my_topic",2,"","分区二上传数据:----atguigu_pt " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
}
  1. 生产者同步发送,调用get
1
2
// 同步发送
kafkaProducer.send(new ProducerRecord<>("three", "同步发送----kafka" + i)).get();
  1. 事务
1
2
3
4
5
6
7
8
9
10
11
12
13
Kafka 的事务一共有如下 5 个API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
  1. 消费者
  • 同步提交offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 1. 创建 kafka消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
//3. 创建kafka消费者
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("__consumer_offsets"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交offset
consumer.commitSync();
}
}
}
  • 异步提交offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CustomConsumerByHandAsync {
public static void main(String[] args) {
// 1. 创建 kafka消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false");
//3. 创建Kafka消费者
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("my_topic"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {
System.out.println(consumerRecord.value());
}
// 异步提交offset
consumer.commitAsync();
}
}
}
  • 指定消费时间消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class CustomConsumerForTime {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new
KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("my_topic");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
Map<TopicPartition, Long> timestampToSearch = new
HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition,
System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从1天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets =
kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp =
offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition,
offsetAndTimestamp.offset());
}
}// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords =
kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
  • 指定offset消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  public class CustomConsumerSeek {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new
KafkaConsumer<>(properties);
// 2 订阅一个主题 可订阅多个主题
// 指定要消费的主题和分区
<!-- String topic = "my_topic";
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition(topic, 0)); // 消费分区 0
partitions.add(new TopicPartition(topic, 1)); // 消费分区 1
// 手动分配分区
kafkaConsumer.assign(partitions);
-->
ArrayList<String> topics = new ArrayList<>();
topics.add("my_topic");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset从1700 的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 1700);
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords =
kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

json转化问题

在开发过程中,我们时常遇到更改json的格式,返回特定的类型,比如日期类型等,之前写到过一片文章处理这类问题;
这几天就遇到一个问题,之前项目中遇到了大量枚举,平常枚举一般是这种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public enum Channel {
app(1),
H5(1);
Channel(int code) {
this.code = code;
}
private final int code;

public int getCode() {
return code;
}
// 通过整数值获取枚举常量
public static Channel getValue(int value) {
for (Channel channel : Channel.values()) {
if (channel.getCode() == value) {
return channel;
}
}
throw new IllegalArgumentException("Invalid channel value: " + value);
}
}

在springmvc中,前端我们经常接收code 1或2,返回的则是app或h5,但是我们现在后端也需要返回的是数字1和2,这怎么办,我们就只能定义json的序列化和发序列化,类似与下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@JsonComponent
public class Config {

public static class Serializer extends JsonSerializer<Channel> {


@Override
public void serialize(Channel channel, JsonGenerator json, SerializerProvider serializerProvider) throws IOException {
json.writeNumber(channel.getCode());
}
}

public static class Deserializer extends JsonDeserializer<Channel> {
@Override
public Channel deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JacksonException {
int i = jsonParser.getIntValue();
return Channel.getValue(i);
}
}

public static class DateSerializer extends JsonSerializer<Date> {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yy-MM-dd 00:00:00");

@Override
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
String formattedDate = dateFormat.format(date);
jsonGenerator.writeString(formattedDate);
}
}
public static class DateDeserializer extends JsonDeserializer<Date> {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yy-MM-dd 00:00:00");

@Override
public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JacksonException {
String dateStr = jsonParser.getText().trim();
try {
return dateFormat.parse(dateStr);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}
}

注意,不同springboot版本有不同处理方案,建议阅读官方文档;

Kafka如何实现高吞吐量

1.顺序读写

kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能
顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写
生产者负责写入数据,Kafka会将消息持久化到磁盘,保证不会丢失数据,Kafka采用了俩个技术提高写入的速度。
1、 顺序写入:在大学的计算机组成(划重点)里我们学过,硬盘是机械结构,需要指针寻址找到存储数据的位置,所以,如果是随机IO,磁盘会进行频繁的寻址,导致写入速度下降Kafka使用了顺序IO提高了磁盘的写入速度,Kafka会将数据顺序插入到文件末尾,消费者端通过控制偏移量来读取消息,这样做会导致数据无法删除,时间一长,磁盘空间会满,kafka提供了2种策略来删除数据:基于时间删除和基于partition文件的大小删除;
2、 MemoryMappedFiles:这个和JavaNIO中的内存映射基本相同,在大学的计算机原理里我们学过(划重点),mmf直接利用操作系统的Page来实现文件到物理内存的映射,完成之后对物理内存的操作会直接同步到硬盘mmf通过内存映射的方式大大提高了IO速率,省去了用户空间到内核空间的复制它的缺点显而易见–不可靠,当发生宕机而数据未同步到硬盘时,数据会丢失,Kafka提供了produce.type参数来控制是否主动的进行刷新,如果kafka写入到mmp后立即flush再返回给生产者则为同步模式,反之为异步模式;

2.零拷贝

在这之前先来了解一下零拷贝(直接让操作系统的 Cache 中的数据发送到网卡后传输给下游的消费者):平时从服务器读取静态文件时,服务器先将文件从复制到内核空间,再复制到用户空间,最后再复制到内核空间并通过网卡发送出去,而零拷贝则是直接从内核到内核再到网卡,省去了用户空间的复制。
Kafka把所有的消息存放到一个文件中,当消费者需要数据的时候直接将文件发送给消费者,比如10W的消息共10M,全部发送给消费者,10M的消息在内网中传输是非常快的,假如需要1s,那么kafka的tps就是10w。Zero copy对应的是Linux中sendfile函数,这个函数会接受一个offsize来确定从哪里开始读取。现实中,不可能将整个文件全部发给消费者,他通过消费者传递过来的偏移量来使用零拷贝读取指定内容的数据返回给消费者。
在Linux kernel2.2 之后出现了一种叫做”零拷贝(zero-copy)”系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”,系统上下文切换减少为2次,可以提升一倍的性能。

3.分区

kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力

4.批量发送

kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka
1、 等消息条数到固定条数;
2、 一段时间发送一次;

5.数据压缩

Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。
压缩的好处就是减少传输的数据量,减轻对网络传输的压力。
Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得
批量发送和数据压缩一起使用,单条做数据压缩的话,效果不明显
Kafka的设计目标是高吞吐量,它比其它消息系统快的原因体现在以下几方面:
1、 Kafka操作的是序列文件I/O(序列文件的特征是按顺序写,按顺序读),为保证顺序,Kafka强制点对点的按顺序传递消息,这意味着,一个consumer在消息流(或分区)中只有一个位置;
2、 Kafka不保存消息的状态,即消息是否被“消费”一般的消息系统需要保存消息的状态,并且还需要以随机访问的形式更新消息的状态而Kafka的做法是保存Consumer在Topic分区中的位置offset,在offset之前的消息是已被“消费”的,在offset之后则为未“消费”的,并且offset是可以任意移动的,这样就消除了大部分的随机IO;
3、 Kafka支持点对点的批量消息传递;
4、 Kafka的消息存储在OSpagecache(页缓存,pagecache的大小为一页,通常为4K,在Linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问)

springboot自动装配原理

具体实现代码

1
this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);

具体执行如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void prepareContext(DefaultBootstrapContext bootstrapContext, ConfigurableApplicationContext context, ConfigurableEnvironment environment, SpringApplicationRunListeners listeners, ApplicationArguments applicationArguments, Banner printedBanner) {
context.setEnvironment(environment);
this.postProcessApplicationContext(context);
this.addAotGeneratedInitializerIfNecessary(this.initializers);
this.applyInitializers(context);
listeners.contextPrepared(context);
bootstrapContext.close(context);
if (this.logStartupInfo) {
this.logStartupInfo(context.getParent() == null);
this.logStartupProfileInfo(context);
}

ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
beanFactory.registerSingleton("springApplicationArguments", applicationArguments);
if (printedBanner != null) {
beanFactory.registerSingleton("springBootBanner", printedBanner);
}

if (beanFactory instanceof AbstractAutowireCapableBeanFactory autowireCapableBeanFactory) {
autowireCapableBeanFactory.setAllowCircularReferences(this.allowCircularReferences);
if (beanFactory instanceof DefaultListableBeanFactory listableBeanFactory) {
listableBeanFactory.setAllowBeanDefinitionOverriding(this.allowBeanDefinitionOverriding);
}
}

if (this.lazyInitialization) {
context.addBeanFactoryPostProcessor(new LazyInitializationBeanFactoryPostProcessor());
}

if (this.keepAlive) {
context.addApplicationListener(new KeepAlive());
}

context.addBeanFactoryPostProcessor(new PropertySourceOrderingBeanFactoryPostProcessor(context));
if (!AotDetector.useGeneratedArtifacts()) {
Set<Object> sources = this.getAllSources();
Assert.notEmpty(sources, "Sources must not be empty");
this.load(context, sources.toArray(new Object[0]));
}

listeners.contextLoaded(context);
}

自动配置的执行过程
this.applyInitializers(context);:
这个方法会遍历所有注册的ApplicationContextInitializer,并调用它们的initialize()方法.
ApplicationContextInitializer接口的实现类可以对应用程序上下文进行进一步的配置和准备,包括加载自动配置类、注册Bean定义等.
在Spring Boot中,自动配置类通常通过@AutoConfigureAfter、@AutoConfigureBefore等注解来指定加载顺序,确保自动配置的正确性和一致性.
其他相关步骤
listeners.contextPrepared(context);:
发布ContextPreparedEvent事件,通知其他监听器上下文已经准备就绪。这个事件可以触发一些基于上下文准备好的操作,例如加载配置文件、初始化资源等.
this.load(context, sources.toArray(new Object[0]));:
加载应用程序的配置类和组件扫描路径,注册Bean定义到Bean工厂中。这个步骤会根据配置类上的注解(如@ComponentScan、@Import等)来扫描和注册Bean.

0%