Quickstart
Bringing up the Stack
$ docker-compose up
Configuring Lambda Flow
$ flow --config
Database connection URI [postgresql://postgres:pyfi101@phoenix:5432/pyfi]:
Result backend URI [redis://localhost]:
Message broker URI [pyamqp://localhost]:
Configuration file created at /home/user/pyfi.ini
Initialize the Database
$ flow db init
Enabling security on table action
Enabling security on table event
Enabling security on table flow
Enabling security on table jobs
Enabling security on table log
Enabling security on table privilege
Enabling security on table queue
Enabling security on table queuelog
Enabling security on table role
Enabling security on table scheduler
Enabling security on table settings
Enabling security on table task
Enabling security on table user
Enabling security on table node
Enabling security on table processor
Enabling security on table role_privileges
Enabling security on table user_privileges
Enabling security on table user_roles
Enabling security on table agent
Enabling security on table plug
Enabling security on table socket
Enabling security on table call
Enabling security on table plugs_queues
Enabling security on table plugs_source_sockets
Enabling security on table plugs_target_sockets
Enabling security on table sockets_queues
Enabling security on table worker
Enabling security on table calls_events
Database create all schemas done.
The Flow CLI
$ flow
Usage: flow [OPTIONS] COMMAND [ARGS]...
Flow CLI for managing the pyfi network
Options:
--debug Debug switch
-d, --db TEXT Database URI
--backend TEXT Task queue backend
--broker TEXT Message broker URI
-i, --ini TEXT Flow .ini configuration file
-c, --config Configure pyfi
--help Show this message and exit.
Commands:
add Add an object to the database
agent Run pyfi agent
api API server admin
db Database operations
delete Delete an object from the database
listen Listen to a processor output
ls List database objects and their relations
node Node management operations
proc Run or manage processors
scheduler Scheduler management commands
task Pyfi task management
update Update a database object
web Web server admin
whoami Database login user
worker Run pyfi worker
Creating Your First Flow
Let’s look at the sequence of CLI commands needed to build out our flow infrastructure and execute a task. From scratch!
First thing we do below is create a queue. This provides the persistent message broker the definition it needs to allocate a message queue
by the same name for holding task messages.
Next we create a processor, which refers to our gitrepo and defines the module within that codebase we want to expose. It also defines the host where the processor should be run, but that is optional. We specific a concurrency value of 5 that indicates the scale for our processor. This means it will seek to occupy 5 CPUs, allowing it to run in parallel and respond to high-volume message traffic better.
Then we create sockets and attach them to our processor. The socket tells pyfi what specific python function we want to receive messages for and what queue it should use. Lastly, it indicates what processor to be attached to.
Finally, we can run our task and get the result.
$ flow add queue -n pyfi.queue1 -t direct
$ flow add processor -n proc1 -g https://github.com/radiantone/pyfi-processors -m pyfi.processors.sample -h localhost -c 5
$ flow add socket -n pyfi.processors.sample.do_something -q pyfi.queue1 -pn proc1 -t do_something
$ flow add socket -n pyfi.processors.sample.do_this -q pyfi.queue1 -pn proc1 -t do_this
$ flow task run --socket pyfi.processors.sample.do_this --data "['some data']"
Do this: ['some data']
Creating Sockets
Sockets represent addressable endpoints for python functions hosted by a processor. Remember, the processor points to a gitrepo and defines a python module within that repo. The socket defines the task (or python function) within the processor python module. Thus, a single processor can have many sockets associated with it. Sockets also declare a queue they will use to pull their requests from. This allows calls to tasks to be durable and reliable.
The following extract from the above flow defines a socket, gives it a name pyfi.processors.sample.do_something
, declares the queue pyfi.queue1
, associates it with processor named proc1
and represents the python function/task do_something
.
$ flow add socket -n pyfi.processors.sample.do_something -q pyfi.queue1 -pn proc1 -t do_something
Defining Socket Functions
Once you’ve built out your flow and infrastructure to support it, you can create convenient types that represent your python functions via the Socket class.
For the parallel flow above, we import the .p (or partial) signature from this file, which comes from our Socket we created earlier named pyfi.processors.sample.do_something
.
Remember, the socket captures the module (from its parent Processor) and function name within that module you want to run. Think of it like an endpoint with a queue in front of it.
We take one step further in the file below and rename Socket class to Function simply as a linguistic preference in this context.
from pyfi.client.api import Socket as Function
do_something = Function(name='pyfi.processors.sample.do_something')
do_something_p = do_something.p
do_this = Function(name='pyfi.processors.sample.do_this')
do_this_p = do_this.p
Once we’ve created our function definitions above, we can use them like normal python functions as in the parallel workflow below!
Executing Socket Functions
Executing socket functions from python is very easy. Since we can create the socket ahead of time, we only need to refer to it by name as above.
from pyfi.client.examples.api import do_something_p as do_something
do_something("Some text!")
The just invoke the function reference as you normally would. If you are using the function within a parallel API structure such as parallel
, pipeline
, funnel
etc then you should use the partial
(.p, _p) version of the function signature.
This allows ElasticCode to add arguments to the task when it is invoked. The invocation is deferred so it doesn’t happen at the time you declare your workflow. The reason is because your task will execute on thos remote CPU at a time when the workflow reaches that task.
So the .p partial is a signature
for your task in that respect.
Running a Parallel Workflow
from pyfi.client.api import parallel, pipeline, funnel
from pyfi.client.example.api import do_something_p as do_something
# Create a pipeline that executes tasks sequentially, passing result to next task
_pipeline = pipeline([
do_something("One"),
do_something("Two"),
# Create a parallel structure that executes tasks in parallel and returns the
# result list
parallel([
do_something("Four"),
do_something("Five"),
]),
do_something("Three")])
# Create another parallel structure using the above pipeline as one of its tasks
_parallel = parallel([
_pipeline,
do_something("Six"),
do_something("Seven")])
# Create a funnel structure that executes all its tasks passing the result to the
# single, final task
_funnel = funnel([
do_something("Eight"),
_parallel,
do_something("Nine")])
# Gather the result from the _funnel and send it to do_something("Four")
print("FUNNEL: ", _funnel(do_something("Four")).get())