R
R
Rocket-API
Search…
通过Redis实现集群环境下的mapping刷新(2.4.0已移除)
2.3.7.RELEASE新增功能

业务描述

在分布式场景中,应用多实例部署是常态。Rocket-API通过动态注册Spring Boot的RequestMapping来实现API的动态管理和注册。那么在多实例的情况下,如何在一个实例中编辑某个接口后把变更同步到其他实例,就是我们迫切需要解决的问题(虽然保底方案是重启所有实例,但那样也就丧失了动态管理API的能力)。为此,Rocket-API提供了接口变更的回调接口:当发生接口的新增、修改、删除时,将会触发`IApiInfoCache.refreshNotify`方法,开发者可以实现此接口来完成集群内的广播。下面的例子使用了Redis的消息订阅方式,开发者也可以自行扩展,使用其他方式实现。

1. 实现`com.github.alenfive.rocketapi.extend.IApiInfoCache`接口

1
import com.fasterxml.jackson.core.JsonProcessingException;
2
import com.fasterxml.jackson.databind.ObjectMapper;
3
import com.github.alenfive.rocketapi.config.QLRequestMappingFactory;
4
import com.github.alenfive.rocketapi.entity.ApiInfo;
5
import com.github.alenfive.rocketapi.entity.vo.RefreshMapping;
6
import com.github.alenfive.rocketapi.extend.IApiInfoCache;
7
import com.github.alenfive.rocketapi.utils.GenerateId;
8
import org.springframework.beans.factory.annotation.Autowired;
9
import org.springframework.beans.factory.annotation.Value;
10
import org.springframework.context.annotation.Lazy;
11
import org.springframework.data.redis.connection.Message;
12
import org.springframework.data.redis.connection.MessageListener;
13
import org.springframework.data.redis.core.ConvertingCursor;
14
import org.springframework.data.redis.core.Cursor;
15
import org.springframework.data.redis.core.ScanOptions;
16
import org.springframework.data.redis.core.StringRedisTemplate;
17
import org.springframework.data.redis.serializer.RedisSerializer;
18
import org.springframework.stereotype.Component;
19
20
import java.io.IOException;
21
import java.util.ArrayList;
22
import java.util.Collection;
23
import java.util.List;
24
import java.util.stream.Collectors;
25
26
@Component
27
public class CustomApiInfoCache implements IApiInfoCache ,MessageListener{
28
29
@Value("${spring.application.name}")
30
private String service;
31
32
@Autowired
33
@Lazy
34
private QLRequestMappingFactory mappingFactory;
35
36
private String instanceId = GenerateId.get().toHexString();
37
38
@Autowired
39
private StringRedisTemplate redisTemplate;
40
41
@Autowired
42
private ObjectMapper objectMapper;
43
44
private String buildPrefix(){
45
return "rocket-api:"+service;
46
}
47
48
public String buildChannelName(){
49
return "rocket-api:"+service+":channel";
50
}
51
52
private String buildApiInfoKey(ApiInfo apiInfo) {
53
return buildPrefix()+":"+apiInfo.getMethod() +"-"+ apiInfo.getFullPath();
54
}
55
56
@Override
57
public ApiInfo get(ApiInfo apiInfo) {
58
String strValue = redisTemplate.opsForValue().get(buildApiInfoKey(apiInfo));
59
try {
60
return objectMapper.readValue(strValue,ApiInfo.class);
61
} catch (JsonProcessingException e) {
62
e.printStackTrace();
63
}
64
return null;
65
}
66
67
@Override
68
public void put(ApiInfo apiInfo) {
69
try {
70
String strValue = objectMapper.writeValueAsString(apiInfo);
71
redisTemplate.opsForValue().set(buildApiInfoKey(apiInfo),strValue);
72
} catch (JsonProcessingException e) {
73
e.printStackTrace();
74
}
75
}
76
77
@Override
78
public void remove(ApiInfo apiInfo) {
79
redisTemplate.delete(buildApiInfoKey(apiInfo));
80
}
81
82
@Override
83
public void removeAll() {
84
redisTemplate.delete(getKeys());
85
}
86
87
private List<String> getKeys(){
88
String patternKey = buildPrefix()+":*";
89
ScanOptions options = ScanOptions.scanOptions()
90
.count(10000)
91
.match(patternKey).build();
92
RedisSerializer<String> redisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
93
Cursor cursor = (Cursor) redisTemplate.executeWithStickyConnection(redisConnection -> new ConvertingCursor<>(redisConnection.scan(options), redisSerializer::deserialize));
94
List<String> keys = new ArrayList<>();
95
while(cursor.hasNext()){
96
keys.add(cursor.next().toString());
97
}
98
try {
99
cursor.close();
100
} catch (IOException e) {
101
e.printStackTrace();
102
}
103
return keys;
104
}
105
106
@Override
107
public Collection<ApiInfo> getAll() {
108
return redisTemplate.opsForValue().multiGet(getKeys()).stream().map(item->{
109
try {
110
return objectMapper.readValue(item,ApiInfo.class);
111
} catch (JsonProcessingException e) {
112
e.printStackTrace();
113
}
114
return null;
115
}).collect(Collectors.toList());
116
}
117
118
@Override
119
public void refreshNotify(RefreshMapping refreshMapping) {
120
NotifyEntity notifyEntity = new NotifyEntity();
121
notifyEntity.setRefreshMapping(refreshMapping);
122
notifyEntity.setIdentity(this.instanceId);
123
String messageStr = null;
124
try {
125
messageStr = objectMapper.writeValueAsString(notifyEntity);
126
} catch (JsonProcessingException e) {
127
e.printStackTrace();
128
}
129
130
redisTemplate.convertAndSend(buildChannelName(), messageStr);
131
}
132
133
@Override
134
public void receiveNotify(String instanceId, RefreshMapping refreshMapping) {
135
//避免本实例重复初始化
136
if (this.instanceId.equals(instanceId)){
137
return;
138
}
139
140
//刷新单个接口
141
if (refreshMapping != null){
142
try {
143
mappingFactory.refreshMapping(refreshMapping);
144
}catch (Exception e){
145
e.printStackTrace();
146
}
147
return;
148
}
149
150
//全局刷新
151
try {
152
mappingFactory.buildInit();
153
}catch (Exception e){
154
e.printStackTrace();
155
}
156
}
157
158
@Override
159
public void onMessage(Message message, byte[] bytes) {
160
String messageStr = new String(message.getBody());
161
NotifyEntity notifyEntity = null;
162
try {
163
notifyEntity = objectMapper.readValue(messageStr, NotifyEntity.class);
164
} catch (JsonProcessingException e) {
165
e.printStackTrace();
166
}
167
this.receiveNotify(notifyEntity.getIdentity(),notifyEntity.getRefreshMapping());
168
}
169
170
}
Copied!

2. 监听Redis消息

1
import org.springframework.beans.factory.annotation.Autowired;
2
import org.springframework.context.annotation.Bean;
3
import org.springframework.context.annotation.Configuration;
4
import org.springframework.data.redis.connection.RedisConnectionFactory;
5
import org.springframework.data.redis.listener.PatternTopic;
6
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
7
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
8
9
@Configuration
10
public class RedisListenerConfig {
11
12
@Autowired
13
private CustomApiInfoCache apiInfoCache;
14
15
@Bean
16
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
17
MessageListenerAdapter listenerAdapter) {
18
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
19
container.setConnectionFactory(connectionFactory);
20
String channelName = apiInfoCache.buildChannelName();
21
container.addMessageListener(listenerAdapter, new PatternTopic(channelName));
22
return container;
23
}
24
25
@Bean
26
public MessageListenerAdapter listenerAdapter(CustomApiInfoCache receiver) {
27
return new MessageListenerAdapter(receiver, "onMessage");
28
}
29
}
Copied!

3. 自定义消息通知实体

1
import com.github.alenfive.rocketapi.entity.vo.RefreshMapping;
2
import lombok.AllArgsConstructor;
3
import lombok.Builder;
4
import lombok.Data;
5
import lombok.NoArgsConstructor;
6
7
@Data
8
@Builder
9
@AllArgsConstructor
10
@NoArgsConstructor
11
public class NotifyEntity {
12
private RefreshMapping refreshMapping;
13
private String identity;
14
}
15
Copied!

4. 结果如下

  1. 1.
    对某一接口进行新增,修改,删除时,会触发集群环境下的mapping更新
  2. 2.
    在执行全局"Rebuild API List" 或 "Remote Release" 时会触发集群环境下所有资源的重新加载
Last modified 8mo ago
Copy link
Contents