首页 > 基础资料 博客日记
springboot~ApplicationContextAware和Interceptor产生了真感情
2023-08-18 10:34:48基础资料围观254次
本篇文章分享springboot~ApplicationContextAware和Interceptor产生了真感情,对你有帮助的话记得收藏一下,看Java资料网收获更多编程知识
看着题目,有点一头污水吧,事实上,没有经历过,很难去说ApplicationContextAware
在什么时候会用到,直接在一个bean对象里,你可以直接使用构造方法注入或者Autowired属性注入的方式来使用其它的bean对象
,这在springboot里是非常自然的,也是天然支持的;但如果你的这个bean不是由spring ioc自动注入的,而是通过拦截器动态配置的,这时你使用@Autowired时,是无法获取到其它bean对象的;这时你需要使用ApplicationContextAware接口,再定义一个静态的ApplicationContext实例,在你的拦截器执行方法里使用它就可以了。【应该和拦截器里的动态代理有关】
一个kafka的ConsumerInterceptor实例
在这个例子中,我们通过ConsumerInterceptor实现了一个TTL的延时队列,当topic过期时,再通过KafkaTemplate将消息转发到其它队列里
- DelayPublisher.publish发送延时topic的方法
/**
* 发送延时消息
* @param message 消息体
* @param delaySecondTime 多个秒后过期
* @param delayTopic 过期后发送到的话题
*/
public void publish(String message, long delaySecondTime, String delayTopic) {
ProducerRecord producerRecord = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), delayTopic, message,
new RecordHeaders().add(new RecordHeader("ttl", toBytes(delaySecondTime))));
kafkaTemplate.send(producerRecord);
}
- ConsumerInterceptorTTL
/**
* @author lind
* @date 2023/8/18 8:33
* @since 1.0.0
*/
@Component
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String>, ApplicationContextAware {
// 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取bean
private static ApplicationContext applicationContext;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
Headers headers = record.headers();
long ttl = -1;
for (Header header : headers) {
if (header.key().equals("ttl")) {
ttl = toLong(header.value());
}
}
// 消息超时判定
if (ttl > 0 && now - record.timestamp() < ttl * 1000) {
// 可以放在死信队列中
System.out.println("消息超时了,需要发到topic:" + record.key());
KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
kafkaTemplate.send(record.key(), record.value());
}
else { // 没有设置TTL,不需要超时判定
newTpRecords.add(record);
}
}
if (!newRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
// 它的时机是在KafkaListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法中,applicationContext应该定时成static,否则在实例对象中,它的值可能是空
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
- 配置文件中注入拦截器
spring:
kafka:
consumer:
properties:
interceptor.classes: com.example.ConsumerInterceptorTTL
文章来源:https://www.cnblogs.com/lori/p/17639745.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
上一篇:SpringBoot
下一篇:Java面试指导-JavaEE基础知识