facebook / wangle
- пятница, 22 апреля 2016 г. в 03:14:49
C++
Wangle provides a full featured, high performance C++ futures implementation.
Going forward, Wangle will also provide a set of common client/server abstractions for building services in a consistent, modular, and composable way.
C++ networking library
Wangle is a library that makes it easy to build protocols, application clients, and application servers.
It's like Netty + Finagle smooshed together, but in C++
First, download and install the folly library from https://github.com/facebook/folly/ You'll also need cmake. Once folly is installed, run the following inside the wangle directory to build, test, and install wangle:
cmake .
make
ctest
sudo make install
There is a tutorial here that explains the basics of Wangle and shows how to build an echo server/client.
See the examples/ directory for some example Wangle servers and clients
Wangle is BSD-licensed. We also provide an additional patent grant.
See the CONTRIBUTING file for how to help out.
Wangle interfaces are asynchronous. Interfaces are currently based on Futures, but we're also exploring how to support fibers
You're probably familiar with Java's Netty, or Python's twisted, or similar libraries.
It is built on top of folly/async/io, so it's one level up the stack from that (or similar abstractions like boost::asio)
ServerBootstrap - easily manage creation of threadpools and pipelines
ClientBootstrap - the same for clients
Pipeline - set up a series of handlers that modify your socket data
This is roughly equivalent to the Finagle library.
Aims to provide easy testing, load balancing, client pooling, retry logic, etc. for any request/response type service - i.e. thrift, http, etc.
Service - a matched interface between client/server. A server will implement this interface, and a client will call in to it. These are protocol-specific
ServiceFilter - a generic filter on a service. Examples: stats, request timeouts, rate limiting
ServiceFactory - A factory that creates client connections. Any protocol specific setup code goes here
ServiceFactoryFilter - Generic filters that control how connections are created. Client examples: load balancing, pooling, idle timeouts, markdowns, etc.
Easily create a new server
ServerBootstrap does the work to set up one or multiple acceptor threads, and one or multiple sets of IO threads. The thread pools can be the same. SO_REUSEPORT is automatically supported for multiple accept threads. tcp is most common, although udp is also supported.
childPipeline(PipelineFactory<Pipeline>)
Sets the pipeline factory for each new connection. One pipeline per connection will be created.
group(IOThreadPoolExecutor accept, IOThreadPoolExecutor io)
Sets the thread pools for accept and io thread pools. If more than one thread is in the accept group, SO_REUSEPORT is used. Defaults to a single accept thread, and one io thread per core.
bind(SocketAddress),bind(port)
Binds to a port. Automatically starts to accept after bind.
stop()
Stops listening on all sockets.
join()
Joins all threadpools - all current reads and writes will be completed before this method returns.
waitForStop()
Waits for stop() to be called from another thread.
channelFactory(ServerSocketFactory)
Sets up the type of server. Defaults to TCP AsyncServerSocket, but AsyncUDPServerSocket is also supported to receive udp messages. In practice, ServerBootstrap is only useful for udp if you need to multiplex the messages across many threads, or have TCP connections going on at the same time, etc. Simple usages of AsyncUDPSocket probably don't need the complexity of ServerBootstrap.
pipeline(PipelineFactory<AcceptPipeline>)
This pipeline method is used to get the accepted socket (or udp message) *before* it has been handed off to an IO thread. This can be used to steer the accept thread to a particular thread, or for logging.
See also AcceptRoutingHandler and RoutingDataHandler for additional help in reading data off of the accepted socket before it gets attached to an IO thread. These can be used to hash incoming sockets to specific threads.
childHandler(AcceptorFactory)
Previously facebook had lots of code that used AcceptorFactories instead of Pipelines, this is a method to support this code and be backwards compatible. The AcceptorFactory is responsible for creating acceptors, setting up pipelines, setting up AsyncSocket read callbacks, etc.
A simple example:
ServerBootstrap<TelnetPipeline> server; server.childPipeline(std::make_shared<TelnetPipelineFactory>()); server.bind(FLAGS_port); server.waitForStop();
Create clients easily
ClientBootstrap is a thin wrapper around AsyncSocket that provides a future interface to the connect callback, and a Pipeline interface to the read callback.
group(IOThreadPoolExecutor)
Sets the thread or group of threads where the IO will take place. Callbacks are also made on this thread.
bind(port)
Optionally bind to a specific port
Future<Pipeline*> connect(SocketAddress)
Connect to the selected address. When the future is complete, the initialized pipeline will be returned.
pipelineFactory(PipelineFactory<Pipeline>)
Set the pipeline factory to use after a connection is successful.
ClientBootstrap<TelnetPipeline> client; client.group(std::make_shared<folly::wangle::IOThreadPoolExecutor>(1)); client.pipelineFactory(std::make_shared<TelnetPipelineFactory>()); // synchronously wait for the connect to finish auto pipeline = client.connect(SocketAddress(FLAGS_host,FLAGS_port)).get(); // close the pipeline when finished pipeline->close();
Send your socket data through a series of tubes
A Pipeline is a series of Handlers that intercept inbound or outbound events, giving full control over how events are handled. Handlers can be added dynamically to the pipeline.
When events are called, a Context* object is passed to the Handler - this means state can be stored in the context object, and a single instantiation of any individual Handler can be used for the entire program.
Netty's documentation: ChannelHandler
Usually, the bottom of the Pipeline is a wangle::AsyncSocketHandler to read/write to a socket, but this isn't a requirement.
A pipeline is templated on the input and output types:
EventBase base_; Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline; pipeline.addBack(AsyncSocketHandler(AsyncSocket::newSocket(eventBase)));
The above creates a pipeline and adds a single AsyncSocket handler, that will push read events through the pipeline when the socket gets bytes. Let's try handling some socket events:
class MyHandler : public InboundHandler<folly::IOBufQueue&> { public: void read(Context* ctx, folly::IOBufQueue& q) override { IOBufQueue data; if (q.chainLength() >= 4) { data.append(q.split(4)); ctx->fireRead(data); } } };
This handler only handles read (inbound) data, so we can inherit from InboundHandler, and ignore the outbound type (so the ordering of inbound/outbound handlers in your pipeline doesn't matter). It checks if there are at least 4 bytes of data available, and if so, passes them on to the next handler. If there aren't yet four bytes of data available, it does nothing, and waits for more data.
We can add this handler to our pipeline like so:
pipeline.addBack(MyHandler());
and remove it just as easily:
pipeline.remove<MyHandler>();
Instantiating all these handlers and pipelines can hit the allocator pretty hard. There are two ways to try to do fewer allocations. StaticPipeline allows *all* the handlers, and the pipeline, to be instantiated all in the same memory block, so we only hit the allocator once.
The other option is to allocate the handlers once at startup, and reuse them in many pipelines. This means all state has to be saved in the HandlerContext object instead of the Handler itself, since each handler can be in multiple pipelines. There is one context per pipeline to get around this limitation.
The stuff that comes with the box
This is almost always the first handler in the pipeline for clients and servers - it connects an AsyncSocket to the pipeline. Having it as a handler is nice, because mocking it out for tests becomes trivial.
Output is buffered and only sent once per event loop. This logic is exactly what is in ThriftServer, and very similar to what exists in proxygen - it can improve throughput for small writes by up to 300%.
Putting this right after an AsyncSocketHandler means that writes can happen from any thread, and eventBase->runInEventBaseThread() will automatically be called to put them in the correct thread. It doesn't intrinsically make the pipeline thread-safe though, writes from different threads may be interleaved, other handler stages must be only used from one thread or be thread safe, etc.
In addition, reads are still always called on the eventBase thread.
A decoder that splits received IOBufs by a fixed number of bytes. Used for fixed-length protocols
Prepends a fixed-length field length. Field length is configurable.
The receiving portion of LengthFieldPrepender - decodes based on a fixed frame length, with optional header/tailer data sections.
Decodes by line (with optional ending detection types), to be used for text-based protocols
Converts from IOBufs to std::strings and back for text-based protocols. Must be used after one of the above frame decoders
How to add a new protocol
Finagle's documentation on Services is highly recommended
A Pipeline was read() and write() methods - it streams bytes in one or both directions. write() returns a future, but the future is set when the bytes are successfully written. Using pipeline there is no easy way to match up requests and responses for RPC.
A Service is an RPC abstraction - Both clients and servers implement the interface. Servers implement it by handling the request. Clients implement it by sending the request to the server to complete.
A Dispatcher is the adapter between the Pipeline and Service that matches up the requests and responses. There are several built in Dispatchers, however if you are doing anything advanced, you may need to write your own.
Because both clients and servers implement the same interface, mocking either clients or servers is trivially easy.
ServiceFilters provide a way to wrap filters around every request and response. Things like logging, timeouts, retrying requests, etc. can be implemented as ServiceFilters.
Existing ServiceFilters include:
For some services, a Factory can help instantiate clients. In Finagle, these are frequently provided for easy use with specific protocols, i.e. http, memcache, etc.
ServiceFactoryFilters provide filters for getting clients. These include most connection-oriented things, like connection pooling, selection, dispatch, load balancing, etc.
Existing ServiceFactoryFilters:
Run your concurrent code in a performant way
Wangle provides two concrete thread pools (IOThreadPoolExecutor, CPUThreadPoolExecutor) as well as building them in as part of a complete async framework. Generally you might want to grab the global executor, and use it with a future, like this:
auto f = someFutureFunction().via(getCPUExecutor()).then(...)
Or maybe you need to construct a thrift/memcache client, and need an event base:
auto f = getClient(getIOExecutor()->getEventBase())->callSomeFunction(args...) .via(getCPUExecutor()) .then([](Result r){ .... do something with result});
The current C++11 std::launch only has two modes: async or deferred. In a production system, neither is what you want: async will launch a new thread for every launch without limit, while deferred will defer the work until it is needed lazily, but then do the work in the current thread synchronously when it is needed.
Wangle's thread pools always launch work as soon as possible, have limits to the maximum number of tasks / threads allowed, so we will never use more threads than absolutely needed. See implementation details below about each type of executor.
Unfortunately none of the existing thread pools had every feature needed - things based on pipes are too slow. Several older ones didn't support std::function.
If you want epoll support, you need an fd - event_fd is the latest notification hotness. Unfortunately, an active fd triggers all the epoll loops it is in, leading to thundering herd - so if you want a fair queue (one queue total vs. one queue per worker thread), you need to use some kind of semaphore. Unfortunately semaphores can't be put in epoll loops, so they are incompatible with IO. Fortunately, you usually want to separate the IO and CPU bound work anyway to give stronger tail latency guarantees on IO.
Base class that contains the thread startup/shutdown/stats logic, since this is pretty disjoint from how tasks are actually run
An observer interface is provided to listen for thread start/stop events. This is useful to create objects that should be one-per-thread, but also have them work correctly if threads are added/removed from the thread pool.
PoolStats are provided to get task count, running time, waiting time, etc.