【Java教程】SpringBoot 流式输出时,正常输出后为何突然报错?

零 Java教程评论95字数 6084阅读20分16秒阅读模式
  1. 一个 SpringBoot 项目同时使用了 Tomcat 的过滤器和 Spring 的拦截器,一些线程变量在过滤器中初始化并在拦截器中使用。
  2. 该项目需要调用大语言模型进行流式输出。
  3. 项目中,笔者使用 SpringBoot 的 ResponseEntity<StreamingResponseBody> 将流式输出返回前端。

问题出现

问题出现在上述第 3 点:正常输出一段内容后,后台突然报错,而报错内容由拦截器产生

笔者仔细查看了报错日志,发现只是拦截器的问题:执行时由于某些线程变量不存在而报错。但是,这些线程变量已经在过滤器中初始化了。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

那么问题来了:为什么这个接口明明可以正常通过过滤器和拦截器,并开始正常输出,却又突然在拦截器中报错呢?文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

场景重现

Filter文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

java

复制代码
@Slf4j
@Component
@Order(1)
public class MyFilter implements Filter {

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        // 要继续处理请求,必须添加 filterChain.doFilter()
        log.info("doFilter method is running..., thread: {}, dispatcherType: {}", Thread.currentThread(), servletRequest.getDispatcherType()); 
        filterChain.doFilter(servletRequest,servletResponse);
    } 
}

Interceptor文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

java

复制代码
@Slf4j
public class MyInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(@NotNull HttpServletRequest request, @NotNull HttpServletResponse response, @NotNull Object handler) throws Exception {
        log.info("preHandle method is running..., thread: {}, dispatcherType: {}", Thread.currentThread(), request.getDispatcherType());
        if (DispatcherType.ASYNC == request.getDispatcherType()) {
            log.info("preHandle dispatcherType={}", request.getDispatcherType());
        }
        return true;
    }
    
    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        log.info("postHandle method is running..., thread: {}", Thread.currentThread());
    }      
    
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        log.info("afterCompletion method is running..., thread: {}", Thread.currentThread());
    } 
}

WebMvcConfigurer文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

java

复制代码
@Configuration
public class WebAppConfigurer implements WebMvcConfigurer {
    
    @Bean
    public MyInterceptor myInterceptor() {
        return new MyInterceptor();
    }
    
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(myInterceptor()).addPathPatterns("/**");
    }
    
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(120_000L);
        configurer.registerCallableInterceptors();
        configurer.registerDeferredResultInterceptors();
    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("web-async-");
        executor.initialize();
        configurer.setTaskExecutor(executor);
    }
}

Controller文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

java

复制代码
@Slf4j
@RestController
@RequestMapping("/test-stream")
public class TestStreamController {

    @ApiOperation("流式输出示例")
    @PostMapping(value = "/example", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<StreamingResponseBody> example() {
        log.info("Stream method is running, thread: {}", Thread.currentThread());
        return  ResponseEntity.status(HttpStatus.OK)
            .contentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8))
            .body(outputStream -> {
                log.info("Internal stream method is running, thread: {}", Thread.currentThread());
                try (outputStream) {
                    String msg = "To be or not to be!";
                    outputStream.write(msg.getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                }
            });
    }
}

根据以下运行日志,我们可以看到拦截器的 preHandle 确实执行了两次,并且此次调用过程共有 3 个线程(io-14000-exec-1web-async-1io-14000-exec-2)参与了工作。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

yaml

复制代码
2024-05-06 07:35:27.362  INFO 209108 --- [io-14000-exec-1] o.a.c.c.C.[.[localhost].[/java-study]    : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-05-06 07:35:27.362  INFO 209108 --- [io-14000-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2024-05-06 07:35:27.365  INFO 209108 --- [io-14000-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 3 ms
2024-05-06 07:35:27.402  INFO 209108 --- [io-14000-exec-1] com.peng.java.study.web.config.MyFilter  : doFilter method is running..., thread: Thread[http-nio-14000-exec-1,5,main], dispatcherType: REQUEST
2024-05-06 07:35:28.107  INFO 209108 --- [io-14000-exec-1] c.p.java.study.web.config.MyInterceptor  : preHandle method is running..., thread: Thread[http-nio-14000-exec-1,5,main], dispatcherType: REQUEST
2024-05-06 07:35:28.121  INFO 209108 --- [io-14000-exec-1] c.p.j.s.w.r.test.TestStreamController    : Stream method is running, thread: Thread[http-nio-14000-exec-1,5,main]
2024-05-06 07:35:28.152  INFO 209108 --- [    web-async-1] c.p.j.s.w.r.test.TestStreamController    : Internal stream method is running, thread: Thread[web-async-1,5,main]
2024-05-06 07:35:28.167  INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor  : preHandle method is running..., thread: Thread[http-nio-14000-exec-2,5,main], dispatcherType: ASYNC
2024-05-06 07:35:28.167  INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor  : preHandle dispatcherType=ASYNC
2024-05-06 07:35:28.174  INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor  : postHandle method is running..., thread: Thread[http-nio-14000-exec-2,5,main]
2024-05-06 07:35:28.183  INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor  : afterCompletion method is running..., thread: Thread[http-nio-14000-exec-2,5,main]

问题分析

1. 方法调用流程的差异文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

众所周知,SpringBoot 的普通输出接口调用流程图如图 1 所示。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

0516-1.png文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/15456.html

(图1-SpringBoot 普通输出调用流程图)

结合日志,我们可以简单画出流式输出接口对应的流程图(图 2)。

0516-2.png

(图2-SpringBoot 流式输出调用流程图)

2. 线程的差异

普通接口的执行时序图如图 3 所示。

0516-3.png

(图3-普通接口的时序图)

而流式接口的时序图如图 4 所示。

0516-4.png

(图4-流式接口的调用时序图)

解决问题

通过分析,对流式输出的情况提出两种解决方案:

  1. 将过滤器中的部分业务逻辑迁移到拦截器中。
  2. 根据条件,跳过第二次的拦截器 preHandle 方法。

笔者选择了第二个方案,实现代码如下。

java

复制代码
@Slf4j
public class MyInterceptor implements HandlerInterceptor {
    
    @Override
    public boolean preHandle(@NotNull HttpServletRequest request, @NotNull HttpServletResponse response, @NotNull Object handler) throws Exception {
        log.info("preHandle method is running..., thread: {}, dispatcherType: {}", Thread.currentThread(), request.getDispatcherType());
        // 如果是异步请求,则跳过
        if (DispatcherType.ASYNC == request.getDispatcherType()) {
            log.info("preHandle dispatcherType={}", request.getDispatcherType());
            return true;
        }
        return true;
    }
    
    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        log.info("postHandle method is running..., thread: {}", Thread.currentThread());     
    }
    
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        log.info("afterCompletion method is running..., thread: {}", Thread.currentThread());
    } 
}

需要注意,请求线程和回调线程都需考虑清理线程变量,不然会导致内存泄漏。

零
  • 转载请务必保留本文链接:https://www.0s52.com/bcjc/javajc/15456.html
    本社区资源仅供用于学习和交流,请勿用于商业用途
    未经允许不得进行转载/复制/分享

发表评论