Reactive Programming Là Gì

Các vận dụng hiện đại bây giờ đều phải đương đầu với một nỗi lo phệ đó là làm thế nào để ứng dụng có thể vận động tốt với con số lớn người dùng truy cập và thực hiện đồng thời (high concurrencies issues), tuy nhiên khả năng đáp ứng của phần cứng ngày càng cải thiện, nhưng tốc độ và năng lực chịu tải (performance) của phần mềm (software) vẫn luôn là yếu tố chính.

Bạn đang xem: Reactive programming là gì

Và để tăng performance của áp dụng ta có hai cách chủ yếu đó là:

Xử lý tuy vậy song (parallelize): Sử dụng những Thread và các tài nguyên phần cứng hơn.Tận dụng nguồn lực phần cứng tinh giảm một cách kết quả hơn.I. Parallelize

Theo cách lập trình thông thường chúng ta sẽ viết phần nhiều blocking code, và khi gặp những vụ việc performance bọn họ sẽ giải quyết bằng cách cho mọi blocking code kia chạy xong xuôi xong (parallelize) để tăng tốc độ xử lý của chương trình. Điều đó đồng nghĩa tương quan với việc khối hệ thống sẽ đòi hỏi thêm khoáng sản (threads) , với đồng thời ta phải đối mặt với concurrency problems như Race Condition, Critical Sections, Dead lock…

Bất kỳ một bottleneck như thế nào dẫn tới khối hệ thống có độ trễ (latency) như tương quan tới I/O events (request DB tốt network call) , các resources như Threads sẽ không còn được giải phóng cùng bị blocking tính đến khi tác vụ đó được ngừng (trả về data). Dẫn tới hệ thống sẽ mau lẹ bị hết sạch tài nguyên và một thảm hoạ thác sẽ xẩy ra (cascading failures).

Do vậy kỹ thuật xử lý tuy nhiên song (parallelize) trong phần mềm không phải là 1 trong những viên đạn bạc tình (silver bullet), nó đòi hỏi sức mạnh mẽ của phần cứng đi kèm và cũng kèm từ đó là hồ hết vấn đề phức hợp cần phải xử lý đi theo.

II. Tận dụng tối đa phần cứng kết quả hơn

Tận dụng tài nguyên phần cứng hiệu quả hơn tức là hệ thống sẽ yên cầu ít tài nguyên hơn nhưng vẫn đáp ứng được nhân tố về performance của hệ thống. Để thỏa mãn nhu cầu được điều này, một có mang được sinh ra chính là Reactive Programming.


Reactive Programming là gì?

Có thể giới thiệu ngắn ngọn Reactive = Asynchronous + Non-Blocking I/O (NIO), tức là một lịch trình được gọi là Reactive nó sẽ bảo vệ được 2 nhân tố là Asynchronous (xử lý sự không tương đồng bộ) và Non-Blocking I/O.

Bằng biện pháp viết hầu như đoạn mã asynchronous với non-blocking, công tác sẽ chất nhận được switch qua các bóc vụ khác mà đang thực hiện cùng một I/O resource, và hoàn toàn có thể quay lại sử lý tiếp khi tác vụ đó hoàn thành. Do đó với reactive programing chương trình rất có thể sử lý các request rộng trên cùng một tài nguyên hệ thống.

Reactive cùng non-blocking nhìn tổng thể thì không khiến cho ứng dụng chạy nhanh hơn. Công dụng mà nó được mong rằng là vận dụng chịu mua được tốt hơn nhưng mà chỉ yêu cầu ít tài nguyên hơn.


*

*

*

*

Với mỗi các loại emit, Stream sẽ có mang từng function nhằm xử lý, một function để hứng return data , một function khác xử trí error , cùng một function nữa để dấn completed signal. Việc lắng nghe (listen) Stream được điện thoại tư vấn là Subscribe, những function được điện thoại tư vấn là Observers (quan liền kề viên), và chủ đề (subject) quan ngay cạnh (Observable) ở đấy là một Stream. Mẫu kiến thiết (Design pattern) này được gọi là Observer thiết kế Pattern.

--a---b-c---d---X---|-> a, b, c, d are emitted valuesX is an error| is the 'completed' signal---> is the timelineĐặc điểm của từng Stream chính là Immutability (bất biến), hy vọng xử lý hoặc thay đổi dữ liệu vào Steam ta luôn luôn phải tạo ra một Stream new từ Stream gốc bằng các function như filter, map, reduce .

origin Stream: ---c----c--c----c------c--> vvvvv map(c becomes 1) vvvvmap Stream: ---1----1--1----1------1--> vvvvvvvvv reduce(+) vvvvvvvvvrespone Stream: ---1----2--3----4------5-->Ngoài ra cùng với Reactive họ cũng hoàn toàn có thể gộp những Stream thành một bằng những function như merge, concat hay zip

Merge giỏi Concat nói chung là khá kiểu như nhau, các gộp 2 hay nhiều Stream cùng với cùng tài liệu trả về về thành một Stream mới, nhưng khác biệt là Merge thì không đảm bảo an toàn thứ tự (sequence) của dữ liệu của những Steam nhưng Concat thì trái lại dữ liệu sau thời điểm gộp sẽ bảo đảm an toàn thứ tự theo dữ liệu của các Steam vừa gộp.

Xem thêm: Hướng Chọn Mua Blackberry Passport Chính Hãng Ở Đâu Rẻ Nhất Tháng 05/2021


*

Project Reactor

Trong hệ sinh thái của JVM để dành được reactive programming, một dự án (project) đã có được ra đời, đó đó là Project Reactor cùng hạt nhân (core) của project đó là reactor-core. Nó hỗ trợ cho chúng ta những bộ thư viện để giúp lập trình viên tiện lợi thao tác và xử trí Data Stream vào Reactive.

Đặc điểm chủ yếu của Reactor đó là cung cấp hai lại kiểu dữ liệu (data type) của luồng dữ liệu (Publisher) đó là Flux với Mono

1. Flux: là một Stream hoàn toàn có thể phát ra 0..n phần tử, rất có thể hình dung nó là 1 trong List dữ liệu. Ví dụ như tạo đối chọi giản:

Flux just = Flux.just(1,2,3,4);Và cũng tương tự khái niệm về Reactive, tất cả 3 tín hiệu mà Flux emit ra nhằm Subscribe rất có thể nhận được sẽ là onNext() để hứng return data, onComplete() nhằm nhận bộc lộ Stream xong và onError() để nhận quý giá lỗi trả về.


Sơ đồ hoạt động của Flux

2. Mono: là một Stream hoàn toàn có thể phát ra 0..1 phần tử. Nó chuyển động gần y hệt như Flux, chỉ nên bị giới hạn không quá một phần tử hoặc ko có phần tử nào (rỗng) . Ví dụ:

Mono just = Mono.just("ABC"); // Mono với một phần tửMono just = Mono.empty(); // Mono cùng với 0 phần tử (rỗng)Cũng hệt như Flux Mono hỗ trợ 3 function onNext(), onComplet() với onError() nhằm Subscribe thao tác làm việc với dữ liệu được trả về.

Mono cũng có thể truyển biến thành một Flux, ví dụ 2 hoặc các Mono hoàn toàn có thể gộp thành (combine) một Flux bằng phương pháp sử dụng function concatWith(), ví dụ Mono#concatWith(Publisher) vẫn trả về một Flux. Hay được dùng Mono#then(Mono) để trả về một Mono không giống với mục đích hoàn thành một Stream mà không niềm nở tới dữ liệu của Mono gốc. Điểm khác biệt giữa Mono#then với Mono#map sẽ là then vận động dựa bên trên tính hiệu onComplete mặc dù Mono gốc rất có thể empty, vào khi maps hay flatmap chỉ hoạt động dựa trên dấu hiệu onNext , tức là chỉ vận động khi Mono gốc có dữ liệu trả về (not empty).


Sơ đồ buổi giao lưu của Mono

3. Subscribe: Như sẽ nói bên trên rằng “không có gì xảy ra cho đến khi subscribe”, các Stream như Mono xuất xắc Flux sẽ không hành động gì cả tính đến khi nó được Observer hay Subcriber (lắng nghe). Do thế trong Reactor có cung cấp một function subscribe() để thực hiện lắng nghe Stream.

Ví dụ nhằm subcribe một Flux với basic method không tồn tại đối số (arguments)

Flux ints = Flux.range(1, 3); //Tạo một Flux với 3 bộ phận từ 1->3ints.subscribe(); // triển khai lắng nghe bên trên Flux vừa tạoVới ví dụ trên thì sẽ không tồn tại out-put nào sinh sản ra, để rất có thể bắt (catch) được các out-put thì ta đang truyền một đối số làConsumer vào subscribe() ví dụ:

Flux ints = Flux.range(1, 3); ints.subscribe(i -> System.out.println(i)); // subcribe Flux cùng in ra tài liệu trả về của nóOutput:

123Error Event: Một lỗi rất có thể được sử lý ngay trong subcribe ((error handler) như lấy một ví dụ sau:

Flux ints = Flux.range(1, 6) //(1) .map(i -> // (2) if (i return i; throw new RuntimeException("Got to 4"); });ints.subscribe(i -> System.out.println(i), //(3) error -> System.err.println("Error: " + error)); //(4)(1) tạo ra một Stream Flux tất cả 4 thành phần từ 1-> 6(2) map lại Stream hiện tại ra một Steam mới mà chỉ được phép bao gồm 3 bộ phận từ 1->3 nếu to hơn sẽ throw ra một Exception(3) Print ra tài liệu output của Stream bắt đầu được tạo thành (4) áp dụng consumer là error để chấm dứt Stream với out-put ra lỗi nếu như có

Output:

123Error: java.lang.RuntimeException: Got lớn 4Completed Event: Nếu tất cả một error được throw ra thì Stream sẽ dừng lại (completed) ngay lập tức. Nếu không tồn tại lỗi xẩy ra thì ta rất có thể tạo một event completed khi Stream ngừng như ví dụ:

Flux ints = Flux.range(1, 4); //(1)ints.subscribe(i -> System.out.println(i), //(2) error -> System.err.println("Error " + error), (3) () -> System.out.println("Done")); (4)(1) sinh sản một Stream Flux gồm 4 phần tử từ 1-> 4(2) Print ra tài liệu output của Stream(3) thực hiện consumer là error để completed Stream với out-put ra lỗi giả dụ có(4) sử dụng consumer là () để completed Stream và out-put ra event complete

Output:

1234DoneSubscribe hoàn toàn có thể yêu cầu một hành vi nào đó, ví như yêu cầu số lượng dữ liệu được emit ra trước khi Steam được complete bằng cách sử dụng sub , ví dụ

Flux ints = Flux.range(1, 4);ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> System.out.println("Done"), sub -> sub.request(10));Stream trên sẽ bị treo (hangs) lâu dài (không khi nào completed) trừ khi Stream được cancel, cũng chính vì Subscribe đã yêu cầu chỉ completed cho tới khi nhận thấy đủ 10 phần tử.

Output: sự kiện “Done” sẽ không lúc nào được output ra.

1234Cancel Event: function Subscribe trả về một kiểu dữ liệu là Disposable và Disposable Interface có cung ứng một method là dispose() sẽ giúp một Stream có thể bị hủy bỏ (cancel) ví dụ:

Flux.just(1,2,3).subscribe().dispose();Stream đã ngay chớp nhoáng bị cancel ngay sau khoản thời gian nó được Subscribe


Ta có rất nhiều các project Reactive không giống được base bên trên Project Reactor ví dụ như Spring Webflux, một phiên phiên bản Spring Web cung cấp non-blocking reactive streams, với nó sử dụng Server Netty để run khối hệ thống reactive.Hay để đã có được reactive sinh sống đầu Database thì đa số các NoSQL phần nhiều đã hỗ trợ reactive driver (ví dụ Reactive MongoDB), hay project R2DBC hỗ trợ reactive driver cho những RDBMS DB.

Trên đây new chỉ là đầy đủ khái hiệm cơ phiên bản về Reactive Programing và Project Reactor, còn nhiều điều thú vui khác đợi bạn tìm hiểu và phân tách sẻ.