Increasing Streaming and Request Handling Performance with gRPC and Coroutines: Part 4


Overview

As highlighted in our previous articles, the gateway plays a crucial role in the Query API Gateway: it receives queries, interacts with multiple workers to gather data, and performs operations on the results when needed (such as returning the top 100 records, or reducing the data to a single value such as the total income for a company over a given period). The gateway also plays a key role in supporting SQL syntax, allowing you to connect using any MySQL connector and send queries.

Some queries, like SELECT, can limit the amount of returned data (e.g., SELECT * FROM Customers LIMIT 1000;), so we want to fetch only a subset of the data. An effective approach is to communicate the desired number of records to the workers. However, for complex queries, determining the precise data requirements upfront may not be possible. To address this uncertainty, we implement streaming. By using streaming, we can extract data in manageable chunks, ensuring we retrieve the essential information without needing to know the full size beforehand.

Streaming proves useful in managing complex analytic queries, where the combined response from all workers could be several gigabytes. Rather than risking memory overflow by trying to fetch all the data at once, we can retrieve the data in smaller chunks, perform reductions on each chunk, and then fetch the next chunk when needed.

In this article, I'll show how to implement a server and client for streaming data. We'll start with a straightforward solution before diving into a more advanced coroutine mechanism featuring a generator.

Image1. Communication between services

Protobuf structure

First, we need to define a protobuf structure for streaming. We can reuse the existing Request and Response objects from the Handle method. To enable streaming, we simply add the stream keyword before the Response in the method definition. Once this structure is defined, we can initiate streaming. Initially, we send a Request, and then, for each read operation, we receive a Response. In this case, each Response represents a chunk of data.

message Request {
  string message = 1;
  int32 id = 2;
}

message Response {
  string response = 1;
}

service Worker {
  rpc Stream (Request) returns (stream Response) {}
}

Listing 1. Example protobuf structure for a streaming request named Stream.

After defining the protobuf, we can set up a basic streaming mechanism. First, let's examine how we can implement this synchronously. While this approach works, it's inefficient because each Read call blocks the thread. If we need to read data from 2,160 workers, this results in frequent thread context switching, which can significantly impact performance.

auto client = Worker::NewStub(
    grpc::CreateChannel(url, grpc::InsecureChannelCredentials()));

grpc::ClientContext context;
Request request;
Response response;

// Set the request message
request.set_message(message);

// Create a reader object that lets us read chunks
auto reader = client->Stream(&context, request);

// Read data until the server closes the connection
while (reader->Read(&response)) {
    std::cout << response.response() << std::endl;
}

Listing 2. Simple streaming method

Client-Side Streaming with Coroutines

To optimize the communication for streaming, we will switch to an asynchronous model using coroutines. By adopting this approach, we can avoid blocking entire threads and instead suspend the coroutine during data retrieval, resuming it when the desired results are ready. Here's how we can implement this:

  1. Create an object to hold request information: First, we need an object that encapsulates all the necessary details about the request. This object will be used to manage the request lifecycle.
  2. Create an awaitable object: We need to define an Awaitable object that allows the coroutine to suspend until the response is available. This object will handle the completion of the request once the response is received from the worker (a minimal sketch of such an awaitable follows this list).
  3. Add a streaming method to Worker and workerManager: We need to extend the Worker class with a new method that supports streaming, as well as update the workerManager class to allow simultaneous querying of all workers.
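For reference, below is a minimal sketch of the Awaitable type assumed throughout this article (presumably introduced in the earlier parts of this series). It is an illustration only: synchronization between the completion-queue thread and the awaiting coroutine, error handling, and other production details are omitted.

#include <coroutine>
#include <optional>

template <typename T>
class Awaitable {
public:
    // The coroutine only suspends if no value has been delivered yet.
    bool await_ready() const noexcept { return _value.has_value(); }
    void await_suspend(std::coroutine_handle<> handle) { _handle = handle; }
    T await_resume() { return std::move(*_value); }

    // Called when the corresponding gRPC tag is delivered; stores the value
    // and resumes the coroutine waiting on this awaitable.
    void setValue(T value) {
        _value = std::move(value);
        if (_handle) {
            _handle.resume();
        }
    }

private:
    std::optional<T> _value;
    std::coroutine_handle<> _handle;
};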

Listing 3 illustrates the modification of the RequestContext class to handle streaming data asynchronously. By adopting this method, the client can manage multiple data chunks as they arrive, all without blocking threads. The co_await mechanism is used to suspend the coroutine while waiting for data, and once the data arrives, the coroutine resumes.

class RequestContext {
public:
    RequestContext(/* all necessary data */);

    // Return the current awaitable. We need to remember
    // that this awaitable will be destroyed when we call
    // the `readNext` method, so we should use this object
    // carefully. Later I will show you a better solution.
    Awaitable<Response>& awaitable() { return *_awaitable; }

    // Return the current response; each time we call `readNext`
    // it will be replaced with new data.
    const Response& currentChunk() const { return _response; }

    // Initialize the streaming.
    void asyncStream() {
        // Create the first awaitable to wait until the streaming starts.
        _awaitable = std::make_shared<Awaitable<Response>>();
        _context = std::make_unique<grpc::ClientContext>();
        _reader = _client.AsyncStream(_context.get(), _request, _cq,
                                      reinterpret_cast<void*>(this));
    }

    // Ask for another chunk and return the corresponding
    // awaitable object, on which we can call `co_await`
    // to suspend the coroutine.
    std::shared_ptr<Awaitable<Response>> readNext() {
        // Override the previous awaitable.
        _awaitable = std::make_shared<Awaitable<Response>>();
        // Ask for the next chunk.
        _reader->Read(&_response, reinterpret_cast<void*>(this));
        return _awaitable;
    }

    std::shared_ptr<Awaitable<Response>> finish() {
        // Override the previous awaitable.
        _awaitable = std::make_shared<Awaitable<Response>>();
        // Tell the server that we want to stop the streaming.
        _reader->Finish(&_status, reinterpret_cast<void*>(this));
        return _awaitable;
    }

private:
    std::shared_ptr<Awaitable<Response>> _awaitable;
    std::unique_ptr<grpc::ClientContext> _context;
    grpc::Status _status;
    Response _response;
    std::unique_ptr<grpc::ClientAsyncReader<Response>> _reader;
    // Provided via the constructor (see the Worker class in Listing 4):
    Worker::Stub& _client;
    grpc::CompletionQueue* _cq;
    Request _request;
};

Listing 3. RequestContext class

After refactoring the RequestContext class, we can now initiate a stream and read as many data chunks as needed, or until the server has sent everything. Since we will be overriding the awaitable object with each request (asyncStream, readNext, and finish), it is crucial to use a std::shared_ptr to guarantee safety. Failing to do so could result in a dangling pointer when the object is overwritten. However, we have a more efficient solution for this challenge, which we will discuss later.

Key Methods for Streaming

  • readNext: This method asks the server for the next chunk of data. When the data becomes available, the coroutine associated with the awaitable object is resumed, allowing uninterrupted data reception without blocking the thread (the completion-queue loop that resumes these awaitables is sketched after this list).
  • finish: The finish method is used to end the stream early when we have received all the data we need. It sends a signal to the server, telling it to stop streaming further data. This prevents unnecessary data transfer and helps control memory usage more effectively.
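One piece not shown in the listings is the loop that drives these awaitables. Below is a hedged sketch, not the production code: every tag registered as reinterpret_cast<void*>(this) comes back from the completion queue here, and we publish the freshly read chunk into the current awaitable (using the setValue method of the Awaitable sketch above), which resumes the suspended coroutine.

void pollCompletionQueue(grpc::CompletionQueue& cq) {
    void* tag = nullptr;
    bool ok = false;
    while (cq.Next(&tag, &ok)) {
        auto* context = reinterpret_cast<RequestContext*>(tag);
        if (ok) {
            // Start-of-stream event or a new chunk: hand the current response
            // to the awaitable, resuming the coroutine waiting on it.
            context->awaitable().setValue(context->currentChunk());
        } else {
            // The read failed, which usually means the stream is over; with the
            // std::optional-based variant discussed later, this is where
            // std::nullopt would be published instead of an empty Response.
            context->awaitable().setValue(Response{});
        }
    }
}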

Rewriting the Worker Class to Support Streaming

Now, let's take a look at how we should update the Worker class to support the streaming functionality. The key changes revolve around managing multiple read operations and effectively handling the lifecycle of the request.

class Worker {
public:
    void init(grpc::CompletionQueue* cq);

    // Returns a *RequestContext* object that allows access to the current
    // *awaitable* object and reading of the streaming data. This is a
    // temporary solution and rather a bad one, because this object will be
    // destroyed by the CompletionQueue and we can end up with a dangling
    // pointer. Later I will show how we can hide this object
    // and make streaming better.
    RequestContext* stream(const Request& request);

private:
    // Worker identifier
    WorkerID id;
    // Channel for sending requests
    std::unique_ptr<Worker::Stub> _client;
    // CompletionQueue on which we push requests.
    grpc::CompletionQueue* _cq;
};

Listing 4. Worker class

The main drawback of returning RequestContext directly is that it exposes implementation details to the user, which we previously kept hidden. When dealing with a single request, the user remains unaware of this object's presence. Additionally, the issue of dangling pointers arises when the CompletionQueue disposes of the object at the end of streaming, requiring caution in its usage. Once the streaming concludes and the stop signal is received, the pointer becomes invalid. In the upcoming server implementation, I will show how we can use std::shared_ptr to eliminate manual memory management. But before that, let's first explore how we can implement the stream method for the worker.

RequestContext* stream(const Request& request) {
    // We need to manually manage the lifetime of the request.
    // That's why we pass this object to the *asyncHandle* request as a *tag*.
    // When the request finishes, we will get it back from the
    // completion queue and we can delete it.
    RequestContext* requestContext = new RequestContext(/* all necessary data */);
    requestContext->asyncStream();
    return requestContext;
}

Listing 5. Initiating a streaming request.

After initiating a streaming request, we create a RequestContext, which generates the first awaitable. Access to this awaitable is granted by calling the awaitable() method, allowing us to wait until the streaming starts. Although not ideal, this method suffices for now. When we explore generators, we'll replace it with a more efficient approach. Next, when data needs to be read, the readNext method is called. Each invocation of readNext returns an Awaitable for the Response object. When starting the streaming, the response will be empty because no data has been read yet.

Task<void> streamSomething(const std::string& messageRequest) {
    Request request;
    request.set_message(messageRequest);

    RequestContext* context = workerManager.requestStream(WorkerId(42), request);

    // The first awaitable notifies us that the streaming has started.
    auto& startStreamingAwaitable = context->awaitable();
    co_await startStreamingAwaitable;

    while (true) {
        auto awaitable = context->readNext();
        // Suspend the coroutine until a new chunk is available.
        const auto chunk = co_await *awaitable;
        if (chunk.response() == "FINISH") {
            // The server has finished, so we should finish as well.
            // Of course, depending on such a string is bad :)
            // I will fix it later.
            co_return;
        }
        // Do something with the chunk.
        doSth(chunk);
    }
}

Listing 6. Using the RequestContext class to read data

The streamSomething function demonstrates how we can use RequestContext to read data. However, there's an issue with comparing the response to the string "FINISH", as it poses a risk if "FINISH" is a valid message in the stream. To address this, we need to make some adjustments to the Response object. It should be a std::optional or std::unique_ptr, and when the streaming is finished, we should return std::nullopt or nullptr to signal the end of the stream. I'll leave this modification up to you. For now, let's move forward and explore a more elegant solution for handling the coroutine part.
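As a hedged sketch rather than the article's final code, the adjustment could look like this: readNext produces std::optional<Response>, and the completion-queue loop publishes std::nullopt when the stream ends, so no magic "FINISH" string is needed. Member names follow Listing 3; the _awaitable member would change type accordingly.

std::shared_ptr<Awaitable<std::optional<Response>>> readNext() {
    _awaitable = std::make_shared<Awaitable<std::optional<Response>>>();
    _reader->Read(&_response, reinterpret_cast<void*>(this));
    return _awaitable;
}

// In the consuming coroutine:
//   auto chunk = co_await *context->readNext();
//   if (!chunk) { co_return; }   // stream finished
//   doSth(*chunk);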

Coroutine Generator in C++20

C++20 introduced support for coroutines, which allow writing asynchronous code in a more natural, sequential style. In C++23, the standard library added std::generator, a coroutine-based generator type. However, since we are assuming the use of only C++20 (due to Ubuntu requirements), we need to implement our own generator mechanism.

A coroutine generator is fundamentally a coroutine that can produce a series of values over time, each time it is resumed. Instead of returning a single value, a generator yields multiple values across suspensions and resumptions, making it suitable for streaming data or lazy evaluation. Let's look at how we can implement our own generator based on C++20 coroutine functionality.

Basic Structure of a Generator

In C++20, coroutines are based on the co_await, co_return, and co_yield keywords. To implement a generator, we'll need to define a coroutine that yields values one at a time and allows the consumer to request the next value when ready.

#include <coroutine>
#include <exception>

template <typename T>
struct generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        T currentValue;
        std::exception_ptr exception;

        generator get_return_object() {
            return generator(handle_type::from_promise(*this));
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        std::suspend_always yield_value(const T& val) {
            currentValue = val;
            return {};
        }
        void unhandled_exception() {
            // Store the exception for later use.
            exception = std::current_exception();
        }
    };

    handle_type h_;

    generator(handle_type h) : h_(h) {}
    ~generator() {
        if (h_) {
            h_.destroy();
        }
    }

    bool getNext() {
        h_.resume();
        if (h_.promise().exception) {
            // Propagate the coroutine exception in the calling context.
            std::rethrow_exception(h_.promise().exception);
        }
        return !h_.done();
    }

    T value() { return h_.promise().currentValue; }
};

Listing 7. Simple coroutine implementation

Now that we have our generator class, we can use it in the following way (Listing 8).

#include <iostream>

generator<int> getValues(int n) {
    for (int i = 0; i <= n; ++i) {
        co_yield i;  // Yield values one at a time
    }
}

int main() {
    auto gen = getValues(5);
    while (gen.getNext()) {
        std::cout << gen.value() << std::endl;
    }
    return 0;
}

Listing 8. Simple usage of the generic coroutine generator.

Implementing a Simple Streaming Function with a Generator

Now that we've set up a basic coroutine generator capable of yielding values and managing exceptions, let's implement a simple streaming function. This function will use our generator to produce n values.

The key idea is that any function that uses the co_yield operator is a generator function. This allows us to produce multiple objects or values as part of a single function call. When we want to stop producing values, we use the co_return operator to end the coroutine. The operator bool will indicate whether there are more values to produce, and it will return false when the generator has finished.

We'll also introduce a full_ member, which acts as a safeguard to ensure that the coroutine is not resumed a second time before the previously produced value has been consumed.

#include <coroutine>
#include <exception>
#include <iostream>

template <typename T>
struct generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        T current_value;
        std::exception_ptr exception;

        generator get_return_object() {
            return generator(handle_type::from_promise(*this));
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        std::suspend_always yield_value(const T& val) {
            current_value = val;
            return {};
        }
        void unhandled_exception() { exception = std::current_exception(); }
    };

    generator(handle_type h) : _h(h) {}
    ~generator() {
        if (_h) {
            _h.destroy();
        }
    }

    operator bool() {
        // The only way to check whether the generator is done
        // is to resume it and see if there is something more.
        fill();
        return !_h.done();
    }

    T operator()() {
        fill();
        if (_h.promise().exception) {
            std::rethrow_exception(_h.promise().exception);
        }
        full_ = false;
        return _h.promise().current_value;
    }

private:
    void fill() {
        // Resume only if the previously produced value has been consumed,
        // so the coroutine is never resumed twice for the same value.
        if (!full_ && _h) {
            _h.resume();
            full_ = true;
        }
    }

    handle_type _h;
    // Tracks whether the coroutine has already been resumed for the
    // value that has not been consumed yet.
    bool full_ = false;
};

// Generator function that simulates streaming of `n` values
generator<int> streamSomething(int n) {
    for (int i = 0; i < n; ++i) {
        co_yield i;  // Yield values one at a time
    }
    co_return;  // End of the generator once all values are produced
}

int main() {
    // Request 5 values from the stream
    auto gen = streamSomething(5);

    // Stream the values, checking for completion
    while (gen) {
        // Produce and print the value
        std::cout << "Received: " << gen() << std::endl;
    }
    std::cout << "Stream finished!" << std::endl;
    return 0;
}

Listing 9. Example generator function.

Image2. Generator sequence

Based on Listing 9, we can see that:

The generator class handles the lifecycle of the coroutine. It includes:

  • A promise_type that defines the behaviour of the generator (how it yields values, handles exceptions, and ends).
  • The operator bool() checks whether there are more values to be yielded (i.e., whether the coroutine has completed or not).
  • The operator() method is used to resume the coroutine and get the next value. It also manages the full_ state to prevent multiple resumptions of the coroutine.

streamSomething Function:

  • The streamSomething function takes an integer n and yields values from 0 to n - 1.
  • It uses the co_yield operator to return each value, and when the loop ends, co_return ensures the generator completes.
  • As long as operator bool() returns true, the generator will continue to yield values when operator() is called.

Main Loop:

  • The main function creates a generator that will yield 5 values.
  • It uses a while (gen) loop to keep calling gen() and printing the returned values until the generator finishes.
  • Once the generator is done (when operator bool() returns false), the program exits the loop and prints "Stream finished!".

Now that we have everything in place, we can implement the streaming functionality. The stream function should stay unaware of the details of coroutines. Instead, we invoke operator() on the generator object to produce a new value. When the streaming stops, operator bool() will return false to indicate that there is no more data to yield. If we decide to terminate the streaming early, we simply return from the function, and the generator will be destroyed, thereby halting the streaming process. We need to implement similar logic in the coroutine to support stopping the streaming, just like we did in the finish method of the RequestContext class (refer to Listing 3).
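A small illustration of such early termination, using the toy generator from Listing 9: leaving the scope destroys the generator object, whose destructor destroys the coroutine frame, which is exactly the early-stop behaviour described above.

void consumeAtMost(int limit) {
    auto gen = streamSomething(1000);  // could produce up to 1000 values
    int consumed = 0;
    while (gen) {
        std::cout << gen() << std::endl;
        if (++consumed == limit) {
            return;  // gen goes out of scope here and the coroutine is destroyed
        }
    }
}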

At this point, we are almost ready to use the generator for streaming data. However, there is one key missing piece. The stream function from Listing 9 does not wait for data; it simply produces values. To address this, we need to introduce an awaitable object, which will allow us to co_await until the next chunk of data is ready. In the improved implementation shown in Listing 10, we wait for the data to become available before calling co_yield to return it.

Generator<Response> Worker::stream(const std::string& messageRequest) {
    // Create the first awaitable to wait until the streaming starts.
    _awaitable = std::make_shared<Awaitable<std::optional<Response>>>();

    // First we need to initialize the streaming before
    // we return the first value.
    Request request;
    request.set_message(messageRequest);
    _context = std::make_unique<grpc::ClientContext>();
    _reader = _client->AsyncStream(_context.get(), request, _cq,
                                   reinterpret_cast<void*>(this));

    // Here we hide the fact that we need to wait
    // for the start of the streaming.
    // The end user gets a generator that is ready to read data.
    co_await *_awaitable;

    // Ask for a new chunk until the server finishes the streaming.
    while (true) {
        // Create an awaitable that will wait for the result.
        // Here we also hide the coroutine details from the user.
        _awaitable = std::make_shared<Awaitable<std::optional<Response>>>();

        // Ask for the next chunk.
        _reader->Read(&_response, reinterpret_cast<void*>(this));

        // Suspend until we get a new chunk.
        auto chunk = co_await *_awaitable;

        // If we get *std::nullopt*, it means the streaming has finished.
        if (!chunk) {
            co_return;
        }

        // If we got a value, yield a new chunk to the user.
        co_yield *chunk;
    }
}

Listing 10. Generator which suspends until new data appears.

With the current setup, we have devised an elegant mechanism for streaming data, effectively concealing all implementation complexities from the end user. Users are not required to have any knowledge of RequestContext, of initializing the streaming, or of waiting for the completion of the first awaitable object. Their only task is to ask the generator for values. Additionally, users don't have to manage or even be aware of the awaitables themselves, as the generator handles waiting for the data and co_awaiting it when it becomes available.

From the user's perspective, the process is simplified: they just call the generator's operator() and co_await the result, and the value is produced without the need for thread switching. The coroutine is suspended and resumed as needed, facilitating efficient data streaming.

Server side (without coroutines)

In this implementation, I will show how to set up an asynchronous mechanism without manually allocating and freeing memory. We'll use shared_ptr to manage memory automatically. Additionally, I'll show how to implement this without relying on coroutines, because thread switching only becomes a bottleneck when there are many simultaneous requests. For example, when the gateway opens 10 streams per worker, that leads to 10 * 2160 = 21,600 parallel requests, and frequent thread switching can slow things down significantly. However, if a single worker only serves 10 streams, thread switching becomes negligible and performance is barely affected.

Image3. Server communication

We will extend the previously implemented Handler class by adding a new method, InitializeStreamRequest. This method will create a dedicated StreamContext object responsible for managing the entire streaming process. Since streaming involves more complexity than a single request, we need to handle the following states: Pending, Sending, Finish, and AsyncDone.

  • PendingTag: This state is triggered when the streaming begins. It will be set when the server initializes a stream.
  • SendingTag: This state is triggered every time new data is sent. It will be signaled when a chunk is received by the client, indicating that another chunk can be sent.
  • Finish: This state is triggered when the stream is closed.
  • AsyncDone: This state is triggered when the user cancels the streaming midway or when the last chunk has been sent.
Image4. Tag handling

Since we don't want to manually manage the lifetime of objects, we've decided to use a reference-counting mechanism. C++ provides std::shared_ptr, which automatically tracks references to an object and destroys it when the last reference is released. This mechanism is thread-safe, making it perfect for our use case. Each state will be represented by a HandlerTag object, and each tag will hold its own copy of the shared_ptr to the StreamContext. When the last HandlerTag is processed, the request will no longer be in use and will be automatically freed, thanks to shared_ptr's reference counting and automatic memory management.

class RequestContext : public std::enable_shared_from_this<RequestContext> {
public:
    using Callback = std::function<void(std::shared_ptr<RequestContext>)>;

    RequestContext(grpc::ServerCompletionQueue* cq,
                   Callback callback,
                   Worker::AsyncService* service);
    // Virtual destructor and the rest of the rule of five

    virtual void run();
    virtual void initRequest(const std::string& requestName);
    virtual void finish(const QByteArray& result) = 0;

private:
    HandlerTag _pendingTag;
    HandlerTag _finishTag;
    HandlerTag _doneTag;
    // other members
};

Listing 11. Request context class
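The HandlerTag type used above is not shown in the article; here is a minimal sketch under the assumptions described earlier: each tag holds its own shared_ptr to the request context (obtained, for example, via shared_from_this()), releases it once the tag has been processed, and the server's completion-queue loop simply dispatches whatever tag it receives. The exact ownership protocol and member layout are assumptions, not the production code.

#include <functional>
#include <memory>

class HandlerTag {
public:
    using Handler = std::function<void(bool /*ok*/)>;

    HandlerTag(std::shared_ptr<RequestContext> context, Handler handler)
        : _context(std::move(context)), _handler(std::move(handler)) {}

    // Invoked by the server completion-queue loop when this tag is delivered.
    void process(bool ok) {
        _handler(ok);
        // Drop the reference once processed; when the last outstanding tag
        // has fired, the context is freed automatically.
        _context.reset();
    }

private:
    std::shared_ptr<RequestContext> _context;  // keeps the context alive
    Handler _handler;
};

// Sketch of the server loop dispatching the Pending/Sending/Finish/AsyncDone tags.
void serveLoop(grpc::ServerCompletionQueue& cq) {
    void* tag = nullptr;
    bool ok = false;
    while (cq.Next(&tag, &ok)) {
        static_cast<HandlerTag*>(tag)->process(ok);
    }
}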

gRPC allows for the initialization of only one request at a time. While it can handle multiple requests simultaneously, the initialization process needs to be performed sequentially. Therefore, the developer must ensure thread safety and guarantee that RequestStream is called only once at a time. When the server receives a request, it will be notified through the completion queue (CQ) by receiving the corresponding tag that was provided during initialization. Only when this process is complete can RequestStream be called again for the next request.
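As a hedged sketch of that initialization step (assuming StreamContext derives from the RequestContext base in Listing 11 and that the members below exist; the surrounding flow is an assumption): RequestStream is the method gRPC generates on the asynchronous service for the server-streaming RPC named Stream.

void StreamContext::run() {
    // Register this context to receive the next incoming Stream call. Only one
    // such registration may be outstanding at a time; the next one is issued
    // only after _pendingTag comes back from the completion queue.
    _service->RequestStream(&_serverContext,    // grpc::ServerContext
                            &_request,          // incoming Request message
                            &_responder,        // grpc::ServerAsyncWriter<Response>
                            _cq, _cq,           // new-call and notification queues
                            (void*)&_pendingTag);
}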

Sending chunks can happen simultaneously, as multiple threads can send their own data without needing to synchronize with each other. Each time we want to send a new chunk, we create a SendingTag, which will be triggered when the receiver gets the message. To maximize throughput and minimize waiting time, we always prepare the next chunk to send while waiting for the acknowledgment of the previous one. To facilitate this, we store two tags at a time: the current tag and the previous tag. This allows us to manage the sending process efficiently and continue streaming without unnecessary delays.

The server has two options when ending a stream: the Finish method and the WriteAndFinish method. The first approach simply closes the stream with a status code (either OK or an error). The second approach sends the last chunk of data and then closes the stream with the appropriate status code. If streaming finishes normally, after all data has been sent, we call Finish. However, if an error occurs during streaming, we use WriteAndFinish to send the final message, ensuring that the last message sent is an error message, signaling the issue to the client before closing the stream.

With our server now equipped with a fully asynchronous streaming process, complete with error handling and automatic memory management, we encounter a pivotal question: what happens if the client decides to interrupt the streaming? The client can decide at any moment that it has received all the data and wants to stop the stream. This can be done by calling TryCancel on the grpc::ClientContext. When the client interrupts the streaming, the server will receive an AsyncDone tag. At this point, the server should stop sending new chunks and close the stream. This is why we need both AsyncDone and FinishTag:

  • The AsyncDone tag indicates that the client has canceled the streaming and that the server should stop sending data (a registration sketch follows this list).
  • The FinishTag is used to properly close the stream when the process is complete, whether due to normal completion or a client cancellation.
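A small sketch of how the AsyncDone notification can be wired up (member names follow Listing 11; doing it exactly this way is an assumption based on the standard gRPC API): grpc::ServerContext::AsyncNotifyWhenDone delivers the given tag when the RPC terminates, including when the client calls TryCancel, and it should be registered before the request is handed to the service.

void StreamContext::registerDoneTag() {
    // Deliver _doneTag on the completion queue when the RPC ends or is canceled.
    _serverContext.AsyncNotifyWhenDone((void*)&_doneTag);
}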
Image5. Error/Cancellation handling

void StreamContext::sendRow(std::unique_ptr<HandlerTag>&& sendTag) {
    try {
        if (const auto row = _stream->read()) {
            // Send the next chunk.
            Response response;
            response.set_response(*row);
            _prevTag = std::move(_sendTag);
            _sendTag = std::move(sendTag);
            _responder.Write(response, (void*)_sendTag.get());
            return;
        }
        // Finish the streaming.
        _responder.Finish(grpc::Status::OK, (void*)&_finishTag);
    } catch (const Exception& ec) {
        // Send the error as the last message, then close the stream.
        terrarium::Response response;
        response.set_response(ec.toJson());
        _responder.WriteAndFinish(response, {}, grpc::Status::OK,
                                  (void*)&_finishTag);
    }
}

Listing 12. Send row method

Conclusion

In this article, we explored how to enhance the efficiency of streaming and request handling in a distributed system using gRPC and coroutines. By leveraging the power of asynchronous programming and streaming, we demonstrated how to optimize the handling of large datasets and complex queries. Through coroutines, we avoided the pitfalls of thread blocking, ensuring that the system remains responsive even under heavy load. Additionally, our introduction of coroutine-based generators offers a notable advantage, simplifying the management of streaming data and facilitating request handling without the need for intricate thread management strategies.

On the server side, we implemented a robust memory management strategy using std::shared_ptr, which ensures that resources are properly allocated and deallocated without manual intervention. This allowed us to handle multiple streaming requests simultaneously without performance degradation, while also enabling the system to respond quickly to client interruptions and errors.

Through the integration of these techniques, we've engineered a scalable and efficient solution tailored for managing high-volume, real-time data streams. Whether tasked with handling millions of records or intricate analytic queries, this approach reliably delivers results without overwhelming the system or compromising performance.

As we continue to build upon these foundations, there are many opportunities for further optimization, including fine-tuning the coroutine mechanisms and exploring alternative data streaming paradigms. However, the key takeaway is that by using gRPC with coroutines and streaming, we can significantly improve both the efficiency and flexibility of distributed systems, making them more capable of handling modern, data-intensive workloads.

Mateusz Adamski, Senior C++ Developer at Synerise
