cgarciae / pypeln
Concurrent data pipelines made easy
Pypeline is a simple yet powerful python library for creating concurrent data pipelines.
Install Pypeline using pip:
pip install pypeln
With Pypeline you can easily create multi-stage data pipelines using three types of workers: processes, threads, and asyncio tasks.
You can create a pipeline based on multiprocessing.Process workers by using the pr module:
from pypeln import pr
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9]

stage = pr.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pr.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
At each stage you can specify the number of workers. The maxsize parameter limits the maximum number of elements that the stage can hold simultaneously.
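To get a feel for what workers buys you, here is a minimal timing sketch (not from the original docs; slow_task and the fixed delay are illustrative). Raising the worker count lets the stage process several elements at once, so wall-clock time drops roughly in proportion:

from pypeln import pr
import time

def slow_task(x):
    time.sleep(0.5) # fixed half-second delay so timings are predictable
    return x

if __name__ == "__main__": # needed for multiprocessing on spawn-based platforms
    for n in (1, 3):
        start = time.time()
        list(pr.map(slow_task, range(6), workers = n))
        # with 1 worker this takes ~3s; with 3 workers roughly ~1s
        print("workers =", n, "->", round(time.time() - start, 2), "s")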
You can create a pipeline based on threading.Thread workers by using the th module:
from pypeln import th
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9]

stage = th.map(slow_add1, data, workers = 3, maxsize = 4)
stage = th.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
Here we have the exact same situation as in the previous case, except that the workers are threads.
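Threads are a good fit when the slow computation is I/O-bound, since the GIL is released while waiting. A hedged sketch (fetch_size and the URL list are illustrative, not part of Pypeline) using thread workers to fetch several pages concurrently:

from pypeln import th
import urllib.request

urls = [
    "https://example.com",
    "https://www.python.org",
    "https://pypi.org",
]

def fetch_size(url):
    # blocking network I/O; the GIL is released while waiting
    with urllib.request.urlopen(url) as response:
        return url, len(response.read())

sizes = list(th.map(fetch_size, urls, workers = 3))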
You can create a pipeline based on asyncio.Task workers by using the io module:
from pypeln import io
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9]

stage = io.map(slow_add1, data, workers = 3, maxsize = 4)
stage = io.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
This is conceptually similar, but here everything runs on a single thread and Task workers are created dynamically.
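Because tasks are far cheaper than processes or threads, you can raise workers much higher for I/O-heavy workloads. A minimal sketch (assuming only the io API shown above; slow_identity is illustrative):

from pypeln import io
import asyncio

async def slow_identity(x):
    await asyncio.sleep(1) # all 100 sleeps overlap on a single thread
    return x

# 100 concurrent tasks finish in roughly 1 second, not 100
results = list(io.map(slow_identity, range(100), workers = 100))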
For more information see the Pypeline Guide.
In the spirit of being a true pipeline library, Pypeline also lets you create your pipelines using the pipe | operator:
data = (
    range(10)
    | pr.map(slow_add1, workers = 3, maxsize = 4)
    | pr.filter(slow_gt3, workers = 2)
    | list
)
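The piped form is equivalent to the nested calls above: omitting the data argument turns each stage into a partial that is then applied via |. As a sketch, the same pipeline with thread workers (assuming the th module overloads | the same way, which the text above does not show explicitly):

from pypeln import th

data = (
    range(10)
    | th.map(slow_add1, workers = 3, maxsize = 4)
    | th.filter(slow_gt3, workers = 2)
    | list
)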