Tutorial de Spring Reactor

    Visi贸n general

    En este art铆culo, nos presentaremos a Reactor de resorte proyecto y su importancia. La idea es aprovechar las Especificaci贸n de corrientes reactivas para crear aplicaciones reactivas sin bloqueo en la JVM.

    Con este conocimiento, crearemos una aplicaci贸n reactiva simple y la compararemos con una aplicaci贸n de bloqueo tradicional.

    Las aplicaciones reactivas son la “novedad de moda”, lo que hace que muchas aplicaciones cambien a este modelo. Puedes leer m谩s sobre esto en El Manifiesto Reactivo.

    Motivaci贸n

    Las API convencionales est谩n bloqueando

    Las aplicaciones modernas se ocupan de una gran cantidad de usuarios y datos simult谩neos. Ley de moore ya no se sostiene como sol铆a hacerlo. Las capacidades del hardware, aunque est谩n aumentando, no est谩n a la altura de las aplicaciones modernas donde el rendimiento es muy importante.

    Los desarrolladores de Java escriben c贸digo de bloqueo de forma predeterminada. As铆 es como se configur贸 la API. Otro ejemplo ser铆a el servlet tradicional (Gato) Acercarse. Cada solicitud garantiza un nuevo hilo que espera a que termine todo el proceso en segundo plano para enviar la respuesta.

    Esto significa que nuestra l贸gica de capa de datos est谩 bloqueando la aplicaci贸n de forma predeterminada, ya que los subprocesos esperan inactivos una respuesta. Es un desperdicio no reutilizar estos subprocesos para un prop贸sito diferente, mientras esperamos la respuesta.

    Cr茅dito: http://projectreactor.io/learn

    Nota: Esto puede ser un problema si tenemos recursos limitados o si un proceso tarda demasiado en ejecutarse.

    Bloques inm贸viles as铆ncronos

    En Java, puede escribir c贸digo de forma asincr贸nica usando Devoluciones de llamada y Futuros. Luego puede obtener y unir hilos en alg煤n momento posterior y procesar el resultado. Java 8 nos present贸 una nueva clase: CompletableFuturo, lo que hace que sea mucho m谩s f谩cil coordinar estas cosas.

    Funciona de una manera simple: cuando un solo proceso termina, comienza otro. Una vez finalizado el segundo, los resultados se combinan en un tercer proceso.

    Esto hace que sea mucho m谩s f谩cil coordinar su aplicaci贸n, pero en 煤ltima instancia sigue bloqueando, ya que crea Threads y espera al llamar a un .join() m茅todo.

    Cr茅dito: http://projectreactor.io/learn

    Programaci贸n reactiva

    Lo que queremos es asincr贸nico y sin bloqueo. Un grupo de desarrolladores de empresas como Netflix, Pivotal, RedHat, etc. se reunieron y convergieron en algo llamado La especificaci贸n de corrientes reactivas.

    Project Reactor es la implementaci贸n de Spring de The Reactive Specification y est谩 espec铆ficamente favorecido por la Spring Webflux m贸dulo, aunque puedes usarlo con otros m贸dulos como RxJava.

    La idea es operar de forma asincr贸nica con Backpressure utilizando editores y suscriptores.

    隆Aqu铆, nos presentan varios conceptos nuevos! Expliqu茅moslos uno por uno:

    • Editor – Un editor es un proveedor de un n煤mero de elementos potencialmente ilimitado.
    • Abonado – Un suscriptor escucha a ese publicador, solicitando nuevos datos. A veces, tambi茅n se lo conoce como consumidor.
    • Contrapresi贸n – La capacidad del suscriptor de permitirle al editor cu谩ntas solicitudes puede manejar en ese momento. Entonces, es el suscriptor el responsable del flujo de datos, no el editor, ya que solo proporciona los datos.

    El Proyecto Reactor ofrece 2 tipos de editoriales. Estos se consideran los principales componentes b谩sicos de Spring Webflux:

    • Flujo – es una editorial que produce 0 a N valores. Podr铆a ser ilimitado. Las operaciones que devuelven varios elementos utilizan este tipo.
    • Mononucleosis infecciosa – es una editorial que produce 0 a 1 valor. Las operaciones que devuelven un solo elemento utilizan este tipo.

    Desarrollo de aplicaciones reactivas

    Con todo lo anterior en mente, 隆saltemos a la creaci贸n de una aplicaci贸n web simple y aprovechemos este nuevo paradigma reactivo!

    La forma m谩s sencilla de comenzar con un proyecto esqueleto de Spring Boot, como siempre, es usando Spring Initializr. Seleccione su versi贸n preferida de Spring Boot y agregue la dependencia “Web reactiva”. Despu茅s de esto, generelo como un proyecto de Maven y 隆listo!

    Definamos un POJO simple – Greeting:

    public class Greeting {
        private String msg;
        // Constructors, getters and setters
    }
    

    Definici贸n de un editor

    Junto a 茅l, definamos un controlador REST simple con un mapeo adecuado:

    @RestController
    public class GreetReactiveController {
        @GetMapping("/greetings")
        public Publisher<Greeting> greetingPublisher() {
            Flux<Greeting> greetingFlux = Flux.<Greeting>generate(sink -> sink.next(new Greeting("Hello"))).take(50);
            return greetingFlux;
        }
    }
    

    Vocaci贸n Flux.generate () crear谩 un flujo interminable de Greeting objeto.

    los tomar() El m茅todo, como sugiere el nombre, solo tomar谩 los primeros 50 valores de la secuencia.

    Es importante tener en cuenta que el tipo de retorno del m茅todo es el tipo asincr贸nico Publisher<Greeting>.

    Para probar este punto final, navegue en su navegador hasta http: // localhost: 8080 / saludos o usa el rizo cliente en su l铆nea de comando – curl localhost:8080/greetings

    Se le pedir谩 una respuesta similar a la siguiente:

    Esto no parece tan importante y simplemente podr铆amos haber devuelto un List<Greeting> para lograr el mismo resultado visual.

    Pero nuevamente, observe que estamos devolviendo un Flux<Greeting>, que es un tipo asincr贸nico ya que cambia todo.

    Supongamos que tuvi茅ramos un editor que devolvi贸 m谩s de mil registros, o incluso m谩s. Piense en lo que tiene que hacer el marco. Se le da un objeto de tipo Greeting, que tiene que convertir a JSON para el usuario final.

    Si hubi茅ramos utilizado el enfoque tradicional con Spring MVC, estos objetos seguir铆an acumul谩ndose en su RAM y una vez que recopila todo lo devolver铆a al cliente. Esto podr铆a exceder nuestra capacidad de RAM y tambi茅n bloquea cualquier otra operaci贸n para que no se procese mientras tanto.

    Cuando usamos Spring Webflux, toda la din谩mica interna cambia. El marco comienza a suscribirse a estos registros del editor, serializa cada elemento y lo env铆a de vuelta al cliente en trozos.

    Hacemos las cosas de forma asincr贸nica sin crear demasiados hilos y reutilizando los hilos que est谩n esperando algo. La mejor parte es que no tienes que hacer nada extra por esto. En Spring MVC tradicional, podr铆amos lograr lo mismo volviendo AsyncResult, DefferedResult, etc. para obtener algo de asincron铆a, pero internamente Spring MVC tuvo que crear un nuevo Thread, que se bloquea ya que tiene que esperar.

    Eventos enviados por el servidor

    Otro editor que se ha utilizado desde su llegada es Eventos enviados por el servidor.

    Estos eventos permiten que una p谩gina web obtenga actualizaciones de un servidor en tiempo real.

    Definamos un servidor reactivo simple:

    @GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Publisher<Greeting> sseGreetings() {
        Flux<Greeting> delayElements = Flux
                .<Greeting>generate(sink -> sink.next(new Greeting("Hello @" + Instant.now().toString())))
                .delayElements(Duration.ofSeconds(1));
        return delayElements;
    }
    

    Alternativamente, podr铆amos haber definido esto:

    @GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Greeting> events() {
        Flux<Greeting> greetingFlux = Flux.fromStream(Stream.generate(() -> new Greeting("Hello @" + Instant.now().toString())));
        Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(greetingFlux, durationFlux).map(Tuple2::getT1);
    }
    

    Estos m茅todos producen una TEXT_EVENT_STREAM_VALUE lo que esencialmente significa que los datos se env铆an en forma de eventos enviados por el servidor.

    Tenga en cuenta que en el primer ejemplo, estamos usando un Publisher y en el segundo ejemplo usamos un Flux. Una pregunta v谩lida ser铆a:

    “驴Qu茅 tipo de retorno debo usar entonces?”

    Se aconseja usar Flux y Mono encima Publisher. Ambas clases son implementaciones del Publisher interfaz que se origina en Reactive Streams. Si bien puede usarlos indistintamente, es m谩s expresivo y descriptivo usar las implementaciones.

    Estos dos ejemplos destacan dos formas de crear eventos enviados por el servidor con retraso:

    • .delayElements()– Este m茅todo retrasa cada elemento del Flux por la duraci贸n dada
    • .zip() – Estamos definiendo un Flux para generar eventos y un Flux para generar valores cada segundo. Al comprimirlos juntos, obtenemos un flujo que genera eventos cada segundo.

    Navegar a http: // localhost: 8080 / saludos / sse o usa un rizo cliente en su l铆nea de comando y ver谩 una respuesta que se parece a:

    Definici贸n de consumidor

    Ahora veamos el lado del consumidor. Vale la pena se帽alar que no es necesario tener un editor reactivo para usar la programaci贸n reactiva en el lado consumidor:

    public class Person {
        private int id;
        private String name;
        // Constructor with getters and setters
    }
    

    Y luego tenemos un tradicional RestController con un solo mapeo:

    @RestController
    public class PersonController {
        private static List<Person> personList = new ArrayList<>();
        static {
            personList.add(new Person(1, "John"));
            personList.add(new Person(2, "Jane"));
            personList.add(new Person(3, "Max"));
            personList.add(new Person(4, "Alex"));
            personList.add(new Person(5, "Aloy"));
            personList.add(new Person(6, "Sarah"));
        }
    
        @GetMapping("/person/{id}")
        public Person getPerson(@PathVariable int id, @RequestParam(defaultValue = "2") int delay)
                throws InterruptedException {
            Thread.sleep(delay * 1000);
            return personList.stream().filter((person) -> person.getId() == id).findFirst().get();
        }
    }
    

    Inicializamos una lista de tipo Person y basado en el id pasado a nuestro mapeo, filtramos a esa persona usando un flujo.

    Es posible que se alarme por el uso de Thread.sleep() aqu铆, aunque solo se usa para simular un retraso de red de 2 segundos.

    Si est谩 interesado en leer m谩s sobre Java Streams, 隆lo tenemos cubierto!

    Sigamos adelante y creemos nuestro consumidor. Al igual que el editor, podemos hacer esto f谩cilmente usando Spring Initializr:

    Nuestra aplicaci贸n de productor se est谩 ejecutando en el puerto 8080. Ahora digamos que queremos llamar al /person/{id} punto final 5 veces. Sabemos que, de forma predeterminada, cada respuesta tiene un retraso de 2 segundos debido al “retraso de la red”.

    Primero hagamos esto usando el tradicional RestTemplate Acercarse:

    public class CallPersonUsingRestTemplate {
    
        private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingRestTemplate.class);
        private static RestTemplate restTemplate = new RestTemplate();
    
        static {
            String baseUrl = "http://localhost:8080";
            restTemplate.setUriTemplateHandler(new DefaultUriBuilderFactory(baseUrl));
        }
    
        public static void main(String[] args) {
            Instant start = Instant.now();
    
            for (int i = 1; i <= 5; i++) {
                restTemplate.getForObject("/person/{id}", Person.class, i);
            }
    
            logTime(start);
        }
    
        private static void logTime(Instant start) {
            logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
        }
    }
    

    Vamos a ejecutarlo:

    Como se esperaba, tom贸 un poco m谩s de 10 segundos y as铆 es como Spring MVC funciona de forma predeterminada.

    En la actualidad, esperar un poco m谩s de 10 segundos para obtener un resultado en una p谩gina es inaceptable. 脡sta es la diferencia entre conservar un cliente / cliente y perderlo por esperar demasiado.

    Spring Reactor introdujo un nuevo cliente web para realizar solicitudes web llamado WebClient. En comparaci贸n con RestTemplate, este cliente tiene una sensaci贸n m谩s funcional y es completamente reactivo. Est谩 incluido en el spring-boot-starter-weblux dependencia y est谩 construido para reemplazar RestTemplate de una manera no bloqueante.

    Reescribamos el mismo controlador, esta vez, usando WebClient:

    public class CallPersonUsingWebClient_Step1 {
    
        private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingWebClient_Step1.class);
        private static String baseUrl = "http://localhost:8080";
        private static WebClient client = WebClient.create(baseUrl);
    
        public static void main(String[] args) {
    
            Instant start = Instant.now();
    
            for (int i = 1; i <= 5; i++) {
                client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class);
            }
    
            logTime(start);
        }
    
        private static void logTime(Instant start) {
            logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
        }
    
    }
    

    Aqu铆, creamos un WebClient pasando el baseUrl. Luego, en el m茅todo principal, simplemente llamamos al punto final.

    get() indica que estamos haciendo una solicitud GET. Sabemos que la respuesta ser谩 un solo objeto, por lo que estamos usando un Mono como se explic贸 antes.

    En 煤ltima instancia, le pedimos a Spring que mapeara la respuesta a un Person clase:

    Y no pas贸 nada, como se esperaba.

    Esto es porque nosotros no suscribiremos. Todo se aplaza. Es asincr贸nico, pero tampoco se inicia hasta que llamamos al .subscribe() m茅todo. Este es un problema com煤n con las personas que son nuevas en Spring Reactor, as铆 que est茅 atento a esto.

    Cambiemos nuestro m茅todo principal y agreguemos subscribe:

    for (int i = 1; i <= 5; i++) {
        client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).subscribe();
    }
    

    Agregar el m茅todo nos solicita el resultado deseado:

    La solicitud se env铆a pero el .subscribe() El m茅todo no se sienta y espera la respuesta. Como no se bloquea, termin贸 antes de recibir la respuesta.

    驴Podr铆amos contrarrestar esto encadenando .block() al final de las llamadas al m茅todo?

    for (int i = 1; i <= 5; i++) {
        client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).block();
    }
    

    Resultado:

    Obtuvimos la respuesta esta vez para cada persona, aunque tom贸 m谩s de 10 segundos. Esto frustra el prop贸sito de que la aplicaci贸n sea reactiva.

    La forma de solucionar todos estos problemas es simple: hacemos una lista de tipos Mono y espere a que se completen todos, en lugar de esperar a que cada uno:

    List<Mono<Person>> list = Stream.of(1, 2, 3, 4, 5)
        .map(i -> client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class))
        .collect(Collectors.toList());
    
    Mono.when(list).block();
    

    Resultado:

    Eso es lo que buscamos. Esta vez, tom贸 poco m谩s de dos segundos, incluso con un retraso de red masivo. Esto aumenta dr谩sticamente la eficiencia de nuestra aplicaci贸n y realmente cambia las reglas del juego.

    Si observa detenidamente los hilos, Reactor los est谩 reutilizando en lugar de crear nuevos. Esto es realmente importante si su aplicaci贸n maneja muchas solicitudes en un per铆odo corto de tiempo.

    Conclusi贸n

    En este art铆culo, discutimos la necesidad de programaci贸n reactiva y la implementaci贸n de Spring de la misma: Spring Reactor.

    Luego, discutimos el m贸dulo Spring Webflux, que usa internamente Reactor, as铆 como tambi茅n conceptos cubiertos como Publisher y Subscriber. Tras esto, creamos una aplicaci贸n que publica datos como un flujo reactivo y los consumimos en otra aplicaci贸n.

    El c贸digo fuente de este tutorial se puede encontrar en Github.

    Etiquetas:

    Deja una respuesta

    Tu direcci贸n de correo electr贸nico no ser谩 publicada. Los campos obligatorios est谩n marcados con *