Tutorial de Spring Reactor

    Visión general

    En este artículo, nos presentaremos a Reactor de resorte proyecto y su importancia. La idea es aprovechar las Especificación de corrientes reactivas para crear aplicaciones reactivas sin bloqueo en la JVM.

    Con este conocimiento, crearemos una aplicación reactiva simple y la compararemos con una aplicación de bloqueo tradicional.

    Las aplicaciones reactivas son la «novedad de moda», lo que hace que muchas aplicaciones cambien a este modelo. Puedes leer más sobre esto en El Manifiesto Reactivo.

    Motivación

    Las API convencionales están bloqueando

    Las aplicaciones modernas se ocupan de una gran cantidad de usuarios y datos simultáneos. Ley de moore ya no se sostiene como solía hacerlo. Las capacidades del hardware, aunque están aumentando, no están a la altura de las aplicaciones modernas donde el rendimiento es muy importante.

    Los desarrolladores de Java escriben código de bloqueo de forma predeterminada. Así es como se configuró la API. Otro ejemplo sería el servlet tradicional (Gato) Acercarse. Cada solicitud garantiza un nuevo hilo que espera a que termine todo el proceso en segundo plano para enviar la respuesta.

    Esto significa que nuestra lógica de capa de datos está bloqueando la aplicación de forma predeterminada, ya que los subprocesos esperan inactivos una respuesta. Es un desperdicio no reutilizar estos subprocesos para un propósito diferente, mientras esperamos la respuesta.

    Crédito: http://projectreactor.io/learn

    Nota: Esto puede ser un problema si tenemos recursos limitados o si un proceso tarda demasiado en ejecutarse.

    Bloques inmóviles asíncronos

    En Java, puede escribir código de forma asincrónica usando Devoluciones de llamada y Futuros. Luego puede obtener y unir hilos en algún momento posterior y procesar el resultado. Java 8 nos presentó una nueva clase: CompletableFuturo, lo que hace que sea mucho más fácil coordinar estas cosas.

    Funciona de una manera simple: cuando un solo proceso termina, comienza otro. Una vez finalizado el segundo, los resultados se combinan en un tercer proceso.

    Esto hace que sea mucho más fácil coordinar su aplicación, pero en última instancia sigue bloqueando, ya que crea Threads y espera al llamar a un .join() método.

    Crédito: http://projectreactor.io/learn

    Te puede interesar:Cómo hacer excepciones personalizadas en Java

    Programación reactiva

    Lo que queremos es asincrónico y sin bloqueo. Un grupo de desarrolladores de empresas como Netflix, Pivotal, RedHat, etc. se reunieron y convergieron en algo llamado La especificación de corrientes reactivas.

    Project Reactor es la implementación de Spring de The Reactive Specification y está específicamente favorecido por la Spring Webflux módulo, aunque puedes usarlo con otros módulos como RxJava.

    La idea es operar de forma asincrónica con Backpressure utilizando editores y suscriptores.

    ¡Aquí, nos presentan varios conceptos nuevos! Expliquémoslos uno por uno:

    • Editor – Un editor es un proveedor de un número de elementos potencialmente ilimitado.
    • Abonado – Un suscriptor escucha a ese publicador, solicitando nuevos datos. A veces, también se lo conoce como consumidor.
    • Contrapresión – La capacidad del suscriptor de permitirle al editor cuántas solicitudes puede manejar en ese momento. Entonces, es el suscriptor el responsable del flujo de datos, no el editor, ya que solo proporciona los datos.

    El Proyecto Reactor ofrece 2 tipos de editoriales. Estos se consideran los principales componentes básicos de Spring Webflux:

    • Flujo – es una editorial que produce 0 a N valores. Podría ser ilimitado. Las operaciones que devuelven varios elementos utilizan este tipo.
    • Mononucleosis infecciosa – es una editorial que produce 0 a 1 valor. Las operaciones que devuelven un solo elemento utilizan este tipo.

    Desarrollo de aplicaciones reactivas

    Con todo lo anterior en mente, ¡saltemos a la creación de una aplicación web simple y aprovechemos este nuevo paradigma reactivo!

    La forma más sencilla de comenzar con un proyecto esqueleto de Spring Boot, como siempre, es usando Spring Initializr. Seleccione su versión preferida de Spring Boot y agregue la dependencia «Web reactiva». Después de esto, generelo como un proyecto de Maven y ¡listo!

    Definamos un POJO simple – Greeting:

    public class Greeting {
        private String msg;
        // Constructors, getters and setters
    }
    

    Definición de un editor

    Junto a él, definamos un controlador REST simple con un mapeo adecuado:

    @RestController
    public class GreetReactiveController {
        @GetMapping("/greetings")
        public Publisher<Greeting> greetingPublisher() {
            Flux<Greeting> greetingFlux = Flux.<Greeting>generate(sink -> sink.next(new Greeting("Hello"))).take(50);
            return greetingFlux;
        }
    }
    

    Vocación Flux.generate () creará un flujo interminable de Greeting objeto.

    los tomar() El método, como sugiere el nombre, solo tomará los primeros 50 valores de la secuencia.

    Es importante tener en cuenta que el tipo de retorno del método es el tipo asincrónico Publisher<Greeting>.

    Te puede interesar:Manejo de excepciones en Spring

    Para probar este punto final, navegue en su navegador hasta http: // localhost: 8080 / saludos o usa el rizo cliente en su línea de comando – curl localhost:8080/greetings

    Se le pedirá una respuesta similar a la siguiente:

    Esto no parece tan importante y simplemente podríamos haber devuelto un List<Greeting> para lograr el mismo resultado visual.

    Pero nuevamente, observe que estamos devolviendo un Flux<Greeting>, que es un tipo asincrónico ya que cambia todo.

    Supongamos que tuviéramos un editor que devolvió más de mil registros, o incluso más. Piense en lo que tiene que hacer el marco. Se le da un objeto de tipo Greeting, que tiene que convertir a JSON para el usuario final.

    Si hubiéramos utilizado el enfoque tradicional con Spring MVC, estos objetos seguirían acumulándose en su RAM y una vez que recopila todo lo devolvería al cliente. Esto podría exceder nuestra capacidad de RAM y también bloquea cualquier otra operación para que no se procese mientras tanto.

    Cuando usamos Spring Webflux, toda la dinámica interna cambia. El marco comienza a suscribirse a estos registros del editor, serializa cada elemento y lo envía de vuelta al cliente en trozos.

    Hacemos las cosas de forma asincrónica sin crear demasiados hilos y reutilizando los hilos que están esperando algo. La mejor parte es que no tienes que hacer nada extra por esto. En Spring MVC tradicional, podríamos lograr lo mismo volviendo AsyncResult, DefferedResult, etc. para obtener algo de asincronía, pero internamente Spring MVC tuvo que crear un nuevo Thread, que se bloquea ya que tiene que esperar.

    Eventos enviados por el servidor

    Otro editor que se ha utilizado desde su llegada es Eventos enviados por el servidor.

    Estos eventos permiten que una página web obtenga actualizaciones de un servidor en tiempo real.

    Definamos un servidor reactivo simple:

    @GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Publisher<Greeting> sseGreetings() {
        Flux<Greeting> delayElements = Flux
                .<Greeting>generate(sink -> sink.next(new Greeting("Hello @" + Instant.now().toString())))
                .delayElements(Duration.ofSeconds(1));
        return delayElements;
    }
    

    Alternativamente, podríamos haber definido esto:

    Te puede interesar:Proyecto Lombok: Reducción del código repetitivo de Java
    @GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Greeting> events() {
        Flux<Greeting> greetingFlux = Flux.fromStream(Stream.generate(() -> new Greeting("Hello @" + Instant.now().toString())));
        Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(greetingFlux, durationFlux).map(Tuple2::getT1);
    }
    

    Estos métodos producen una TEXT_EVENT_STREAM_VALUE lo que esencialmente significa que los datos se envían en forma de eventos enviados por el servidor.

    Tenga en cuenta que en el primer ejemplo, estamos usando un Publisher y en el segundo ejemplo usamos un Flux. Una pregunta válida sería:

    «¿Qué tipo de retorno debo usar entonces?»

    Se aconseja usar Flux y Mono encima Publisher. Ambas clases son implementaciones del Publisher interfaz que se origina en Reactive Streams. Si bien puede usarlos indistintamente, es más expresivo y descriptivo usar las implementaciones.

    Estos dos ejemplos destacan dos formas de crear eventos enviados por el servidor con retraso:

    • .delayElements()– Este método retrasa cada elemento del Flux por la duración dada
    • .zip() – Estamos definiendo un Flux para generar eventos y un Flux para generar valores cada segundo. Al comprimirlos juntos, obtenemos un flujo que genera eventos cada segundo.

    Navegar a http: // localhost: 8080 / saludos / sse o usa un rizo cliente en su línea de comando y verá una respuesta que se parece a:

    Definición de consumidor

    Ahora veamos el lado del consumidor. Vale la pena señalar que no es necesario tener un editor reactivo para usar la programación reactiva en el lado consumidor:

    public class Person {
        private int id;
        private String name;
        // Constructor with getters and setters
    }
    

    Y luego tenemos un tradicional RestController con un solo mapeo:

    @RestController
    public class PersonController {
        private static List<Person> personList = new ArrayList<>();
        static {
            personList.add(new Person(1, "John"));
            personList.add(new Person(2, "Jane"));
            personList.add(new Person(3, "Max"));
            personList.add(new Person(4, "Alex"));
            personList.add(new Person(5, "Aloy"));
            personList.add(new Person(6, "Sarah"));
        }
    
        @GetMapping("/person/{id}")
        public Person getPerson(@PathVariable int id, @RequestParam(defaultValue = "2") int delay)
                throws InterruptedException {
            Thread.sleep(delay * 1000);
            return personList.stream().filter((person) -> person.getId() == id).findFirst().get();
        }
    }
    

    Inicializamos una lista de tipo Person y basado en el id pasado a nuestro mapeo, filtramos a esa persona usando un flujo.

    Es posible que se alarme por el uso de Thread.sleep() aquí, aunque solo se usa para simular un retraso de red de 2 segundos.

    Si está interesado en leer más sobre Java Streams, ¡lo tenemos cubierto!

    Sigamos adelante y creemos nuestro consumidor. Al igual que el editor, podemos hacer esto fácilmente usando Spring Initializr:

    Te puede interesar:Validación de contraseña personalizada de Spring

    Nuestra aplicación de productor se está ejecutando en el puerto 8080. Ahora digamos que queremos llamar al /person/{id} punto final 5 veces. Sabemos que, de forma predeterminada, cada respuesta tiene un retraso de 2 segundos debido al «retraso de la red».

    Primero hagamos esto usando el tradicional RestTemplate Acercarse:

    public class CallPersonUsingRestTemplate {
    
        private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingRestTemplate.class);
        private static RestTemplate restTemplate = new RestTemplate();
    
        static {
            String baseUrl = "http://localhost:8080";
            restTemplate.setUriTemplateHandler(new DefaultUriBuilderFactory(baseUrl));
        }
    
        public static void main(String[] args) {
            Instant start = Instant.now();
    
            for (int i = 1; i <= 5; i++) {
                restTemplate.getForObject("/person/{id}", Person.class, i);
            }
    
            logTime(start);
        }
    
        private static void logTime(Instant start) {
            logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
        }
    }
    

    Vamos a ejecutarlo:

    Como se esperaba, tomó un poco más de 10 segundos y así es como Spring MVC funciona de forma predeterminada.

    En la actualidad, esperar un poco más de 10 segundos para obtener un resultado en una página es inaceptable. Ésta es la diferencia entre conservar un cliente / cliente y perderlo por esperar demasiado.

    Spring Reactor introdujo un nuevo cliente web para realizar solicitudes web llamado WebClient. En comparación con RestTemplate, este cliente tiene una sensación más funcional y es completamente reactivo. Está incluido en el spring-boot-starter-weblux dependencia y está construido para reemplazar RestTemplate de una manera no bloqueante.

    Reescribamos el mismo controlador, esta vez, usando WebClient:

    public class CallPersonUsingWebClient_Step1 {
    
        private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingWebClient_Step1.class);
        private static String baseUrl = "http://localhost:8080";
        private static WebClient client = WebClient.create(baseUrl);
    
        public static void main(String[] args) {
    
            Instant start = Instant.now();
    
            for (int i = 1; i <= 5; i++) {
                client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class);
            }
    
            logTime(start);
        }
    
        private static void logTime(Instant start) {
            logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
        }
    
    }
    

    Aquí, creamos un WebClient pasando el baseUrl. Luego, en el método principal, simplemente llamamos al punto final.

    get() indica que estamos haciendo una solicitud GET. Sabemos que la respuesta será un solo objeto, por lo que estamos usando un Mono como se explicó antes.

    En última instancia, le pedimos a Spring que mapeara la respuesta a un Person clase:

    Y no pasó nada, como se esperaba.

    Esto es porque nosotros no suscribiremos. Todo se aplaza. Es asincrónico, pero tampoco se inicia hasta que llamamos al .subscribe() método. Este es un problema común con las personas que son nuevas en Spring Reactor, así que esté atento a esto.

    Te puede interesar:Spring Data: Tutorial de MongoDB

    Cambiemos nuestro método principal y agreguemos subscribe:

    for (int i = 1; i <= 5; i++) {
        client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).subscribe();
    }
    

    Agregar el método nos solicita el resultado deseado:

    La solicitud se envía pero el .subscribe() El método no se sienta y espera la respuesta. Como no se bloquea, terminó antes de recibir la respuesta.

    ¿Podríamos contrarrestar esto encadenando .block() al final de las llamadas al método?

    for (int i = 1; i <= 5; i++) {
        client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).block();
    }
    

    Resultado:

    Obtuvimos la respuesta esta vez para cada persona, aunque tomó más de 10 segundos. Esto frustra el propósito de que la aplicación sea reactiva.

    La forma de solucionar todos estos problemas es simple: hacemos una lista de tipos Mono y espere a que se completen todos, en lugar de esperar a que cada uno:

    List<Mono<Person>> list = Stream.of(1, 2, 3, 4, 5)
        .map(i -> client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class))
        .collect(Collectors.toList());
    
    Mono.when(list).block();
    

    Resultado:

    Eso es lo que buscamos. Esta vez, tomó poco más de dos segundos, incluso con un retraso de red masivo. Esto aumenta drásticamente la eficiencia de nuestra aplicación y realmente cambia las reglas del juego.

    Si observa detenidamente los hilos, Reactor los está reutilizando en lugar de crear nuevos. Esto es realmente importante si su aplicación maneja muchas solicitudes en un período corto de tiempo.

    Conclusión

    En este artículo, discutimos la necesidad de programación reactiva y la implementación de Spring de la misma: Spring Reactor.

    Luego, discutimos el módulo Spring Webflux, que usa internamente Reactor, así como también conceptos cubiertos como Publisher y Subscriber. Tras esto, creamos una aplicación que publica datos como un flujo reactivo y los consumimos en otra aplicación.

    Te puede interesar:Spring Cloud: descubrimiento de servicios con Eureka

    El código fuente de este tutorial se puede encontrar en Github.

    Rate this post

    Etiquetas: