Documentation Outline
Overview
ElasticCode is a distributed data flow and computation system that runs on transactional messaging infrastructure. It implements the concept of a Networked Virtual Machine (NVM) by distributing logic over networked hardware CPU/GPU processors.
It offers applications and users the following benefits:
Persistent Task & Workflow Execution - Tasks & Workflows persist within the network
Reliable Task Execution - Task execution survives failure anomalies, restarts, and hardware faults
Simplified Workflow Functions - Parallel, Pipeline, Funnel
Powerful Compute Composition - Build at-scale data and compute flows using CLI, UI or API
Streaming Compute - Real-time streaming compute data flows
Secure & Trusted Code Execution - No client-side code marshalling or serialization. Code is loaded from the network side via git repositories into isolated virtual environments
Micro-Scheduling - Efficient task scheduling that drives hardware utilization toward 100%
Next-Gen User Interface - Quickly build out at-scale HPC data flows with simple and intuitive interfaces.
As a platform, ElasticCode is designed so you can build rich, high-performance applications, services and scripts on top. Doing this provides the transparent benefits listed above and makes building powerful compute applications fast and easy.

ElasticCode Platform Architecture
Managed Compute Environment
The entire ElasticCode stack, as a whole, provides a complete “Managed Compute Platform” (MCP) with specialized tooling to support different layers of concern, such as:
Hardware Compute Resources
Compute Availability & Scheduling
Distributed Flows
Python Processors + Modules & Functions
Data Logging and Streaming + Real-time & Historical Metrics
ElasticCode is designed as a single, extensible platform for building reliable, persistent computational workflows. It relieves developers from having to know where and when tasks get executed, and from having to configure client-side services. In addition, ElasticCode's multiple APIs are designed for users of all kinds to build complex, fully-distributed HPC apps and sharable workflows. The platform nature of ElasticCode sets it apart from other libraries and frameworks that only tackle part of the big picture.
Simple, Parallel Workflows
ElasticCode exposes simple APIs that make writing powerful, distributed workflows fast and easy. A few examples follow.
from pyfi.client.api import parallel, pipeline, funnel
from pyfi.client.example.api import do_something_p as do_something

# Create a pipeline that executes tasks sequentially, passing result to next task
_pipeline = pipeline([
    do_something("One"),
    do_something("Two"),
    # Create a parallel structure that executes tasks in parallel and returns the
    # result list
    parallel([
        do_something("Four"),
        do_something("Five"),
    ]),
    do_something("Three")])

# Create another parallel structure using the above pipeline as one of its tasks
_parallel = parallel([
    _pipeline,
    do_something("Six"),
    do_something("Seven")])

# Create a funnel structure that executes all its tasks passing the result to the
# single, final task
_funnel = funnel([
    do_something("Eight"),
    _parallel,
    do_something("Nine")])

# Gather the result from the _funnel and send it to do_something("Four")
print("FUNNEL: ", _funnel(do_something("Four")).get())
# Build out the infrastructure
pyfi add queue -n pyfi.queue1 -t direct
pyfi add processor -n proc1 -g https://github.com/radiantone/pyfi-processors -m pyfi.processors.sample
# Add sockets (not POSIX sockets!) that receive incoming task requests with -c concurrency factors (i.e. # of CPUs occupied)
pyfi add socket -n pyfi.processors.sample.do_something -q pyfi.queue1 -pn proc1 -t do_something -c 5
pyfi add socket -n pyfi.processors.sample.do_this -q pyfi.queue1 -pn proc1 -t do_this -c 8
# Execute a task (once the network is built, this command alone can be re-run)
pyfi task run --socket pyfi.processors.sample.do_something --data "['some data']"
# Create aliases for the run task commands
alias pyfi.processors.sample.do_something="pyfi task run -s pyfi.processors.sample.do_something"
alias pyfi.processors.sample.do_this="pyfi task run -s pyfi.processors.sample.do_this"
# Pipe some output from stdin to a task
echo "HI THERE!" | pyfi.processors.sample.do_something
# Pipe some text to a task, then append some new text to that output, then send that to final task, do_this
echo "HI THERE!" | pyfi.processors.sample.do_something | echo "$(cat -) add some text" | pyfi.processors.sample.do_this
# Echo a string as input to two different processors; they run in parallel
echo "HI THERE!" | tee >(pyfi.processors.sample.do_something) >(pyfi.processors.sample.do_this) > /dev/null
$ pyfi ls call --id 033cf3d3-a0fa-492d-af0a-f51cf5f58d49 -g
pyfi.processors.sample.do_something
└─────────────────────────────────────────────┐
pyfi.processors.sample.do_something
┌──────────────────────┴───────────────────┬───────────────────────────────────────┐
pyfi.processors.sample.do_something pyfi.processors.sample.do_something pyfi.processors.sample.do_something
Persistent, Reliable Tasks
Task calls in your workflows are completely persistent, meaning they are stored in the ElasticCode network (database) and delivered to the task at the soonest possible time. Delivery depends on when the processor hosting the task is available and free to do the compute, but the task call remains active until it has completed. If the task worker fails for any reason, the task can be retried on another node. These qualities of service are completely invisible to the application or user script.
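A minimal sketch of what this means in practice, using the workflow API shown later in this document (the do_something task comes from the example client API):

from pyfi.client.api import pipeline
from pyfi.client.example.api import do_something_p as do_something

# The task request is persisted by the network when the pipeline is invoked;
# it is not executed client-side
_workflow = pipeline([do_something("durable message")])
result = _workflow()

# .get() returns whenever a free processor completes the task; a worker
# failure in between simply means the call is retried on another node
print(result.get())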
High Level Architecture
ElasticCode's high-level architecture can be seen in the diagram below. Central to the architecture is the ElasticCode Model Database, which stores the relational meta-model for the ElasticCode compute network. This database provides the single source of truth for the runtime operation of the distributed architecture. Equally important is the reliable message broker, which is the heart of ElasticCode's execution model. Workflows execute functions just like any other Python code, but those functions trigger persistent requests for ElasticCode to execute a remote task when compute resources are available. The message broker handles all the runtime orchestration with compute nodes to carry out tasks.

ElasticCode System Architecture
Virtual Processors
ElasticCode introduces the notion of virtual processors that network together to form a reliable and distributed mesh topology for executing compute tasks.
ElasticCode Processors are object abstractions that capture the location, version and definition of Python modules and functions via your own git repository. This trusted code model is important because it establishes the contract between your code, ElasticCode and the virtual processors where the code is to be executed. This relationship must be strong and well-defined.
Via the various ElasticCode interfaces (CLI, API, Python, etc.) you define ElasticCode virtual processors. Agents (a kind of ElasticCode service) running across your network deploy them and receive tasks to execute their code.
This type of service (or task) mesh architecture allows for fine-grained scalability characteristics that benefit the performance and stability of the network.
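A minimal sketch of that contract using the Processor client object covered in the Objects section below; the gitrepo and branch parameters pin exactly which code the virtual processor executes (the commit keyword is an assumption mirroring the flow add processor -c/--commit option):

from pyfi.client.api import Processor
from pyfi.client.user import USER

processor = Processor(
    name="proc1",
    user=USER,
    module="pyfi.processors.sample",
    gitrepo="https://github.com/radiantone/pyfi-processors",
    branch="main",
    # commit="<sha>",  # assumed keyword, mirroring `flow add processor -c/--commit`
    concurrency=5,
)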
At Scale Design
ElasticCode is designed to operate "at scale", which means there is a one-to-one correspondence between logical compute units (processors) and physical compute units (CPU cores). This provides a number of inherent benefits such as hardware redundancy, high availability, fault tolerance, fail-over, performance and ease of maintenance.

ElasticCode At-Scale Task/CPU Architecture
Event Driven
ElasticCode is an event-driven architecture from the bottom (data) to the top (UI). This design approach allows it to scale smoothly and not overconsume resources. Messages and notifications are sent when they are available, which eliminates the need for long polling or similar resource-intensive designs. Because ElasticCode is purely event driven, when there are no events ElasticCode is respectful of system resources and can idle, allowing kernel schedulers and other system tasks to operate as needed.
Message-Oriented Execution Graphs
ElasticCode differs from other dataflow engines in that it is fully distributed and runs “at-scale” across heterogeneous infrastructure and computational resources.
It establishes a logical directed-graph (DG) overlay network across compute nodes and executes your custom processor scripts (Python, Node, Bash).
Using the power of reliable, transactional messaging, compute tasks are never lost, discarded or undone. Fault tolerance and load-balancing are intrinsic qualities of ElasticCode and not tacked on as a separate process, which itself would be a failure point.
Execution Stack
There are various layers within ElasticCode that allow it to scale seamlessly and expose simple APIs that do powerful things behind the scenes. A quick glance at the lifecycle of an ElasticCode Python task is below. Various qualities of service are offered by each layer, most of which are implied during a task invocation.

Micro-Scheduling
ElasticCode uses a scheduling design that allows tasks to fully utilize the available CPUs in the ElasticCode network, provided processors have been created in the ElasticCode database. ElasticCode will never consume more resources than what is requested in its database. Traditional batch scheduling allows blocks of compute resources to be dedicated to one task or flow, but at the expense of resource utilization and wait time for other requests. Micro-scheduling seeks to remedy this situation and provide better compute efficiency, which means higher task throughput and more satisfied users.


A True Elastic Compute Platform
ElasticCode provides a set of interacting compute layers that control the location and execution of managed code assets. With ElasticCode, code modules and functions can be loaded at multiple locations and invoked from clients without knowledge of where those functions reside or how they are executed.
Redundant code (processors) loaded into an ElasticCode network can respond to a higher volume of data and requests, and thus can scale at will, individually.
Functional tasks (processors hosting code) are fronted by durable queues that deliver reliable invocations whenever those functions are present on the network, regardless of their exact location. This makes the system resilient to hardware and network changes, as well as to schedulers that might relocate functions (processors) to re-balance resources across the network.
All of this underlying management, hardware arriving and departing, services starting and stopping, processors moving from one host to another (or failing), is completely invisible to the applications and clients using the system. To them, function calls will always eventually be executed: if not immediately, then as soon as compute resources allow.
System Benefits
The ElasticCode platform provides numerous benefits, some of which are listed below.
A single, purpose-built platform that addresses end-to-end managed compute from the CPU to the end user, in contrast to cobbled-together frameworks.
Data flow and data streaming support
Real-time observable data across your compute resources
DevOps out-of-the-box - ElasticCode integrates directly with Git, allowing your existing code management practices to be used.
Elastic, At-Scale - ElasticCode is an elastic infrastructure, meaning that it scales up and down on-the-fly. Code can be moved across hardware locations at any time without data loss.
Extensible - ElasticCode is designed to be extended and specialized to your needs. Both the UI and the core platform are open and leverage modern framework design patterns that are easy to build on top of.
Ecosystem of Supported Roles
The ElasticCode compute environment is a seamless collaboration across disciplines, with powerful out-of-the-box tooling for everyone to manage their own concerns independently of the whole. To make that concrete: say you are in the middle of running a lengthy workflow, but elsewhere in the grid, hardware administrators need to replace hardware some of your tasks might be running on. With ElasticCode, your workflow simply pauses if it cannot find an active ElasticCode processor hosting the task (Python function) it needs; when the hardware admins bring new hardware online, the ElasticCode agents resume and your workflow continues where it left off, seamlessly. Sounds amazing, but it's true!
Some of the roles that might participate in an ElasticCode network, directly or indirectly:
Hardware Admins
Infrastructure Admins
Compute Admins
Data Admins
Code Repository Owners
End Users
Powerful, Next-Gen UI
ElasticCode's user interface is a powerful, next-gen no-code application that empowers anyone to create fast, parallel workflows across ElasticCode's distributed task mesh.



Goals
ElasticCode is a spiritual offshoot of Apache NiFi, but built on a Python stack for running Python (and other scripting language) processors. However, ElasticCode is designed to be broader in design and scope, as we discuss below.
Some important design goals for this technology are:
Fault-Tolerant - ElasticCode runs as a distributed network of logical compute processors that have redundancy and load-balancing built in.
At-Scale - This phrase is important. It indicates that the logical constructs (e.g. pyfi processors) run at the scale of the hardware (e.g. CPU processors), meaning there is a 1-1 correlation (physical mapping) between hardware processors and pyfi processors.
Secure - All the functional components in ElasticCode (database, broker, storage, cache) have security built in.
Dynamic - The topology and behavior of an ElasticCode network can be adjusted and administered in real-time without taking down the entire network. Because ElasticCode is not a single VM controlling everything, you can add, remove, or update components without negatively impacting the functionality of the system.
Distributed - As was mentioned above, everything in ElasticCode is inherently distributed, down to the processors. There is no physical centralization of any kind.
Performance - ElasticCode is built on a mature technology stack that is capable of high-throughput message traffic.
Reliability - The distributed queue paradigm used by ElasticCode allows for every processor in your dataflow to consume and acknowledge message traffic from its inbound queues and write to outbound queues. These durable queues persist while processors consume messages off them.
Scalability - Processors can scale across CPUs, Machines and networks, consuming message traffic off the same or multiple persistent queues. In fact, ElasticCode can auto-scale processors to accommodate the swell of tasks arriving on a queue. In addition, flow processors will be automatically balanced across physical locations to evenly distribute computational load and reduce local resource contention.
Pluggable Backends - ElasticCode supports various implementations of backend components such as message (e.g. RabbitMQ, SQS) or result storage (SQL, Redis, S3) in addition to allowing you to implement an entire backend (behind the SQL database) yourself.
Real-time Metrics - ElasticCode processors will support real-time broadcasting of data throughput metrics via subscription web-sockets. This will allow for all kinds of custom integrations and front-end visualizations to see what the network is doing.
Data Analysis - One of the big goals for ElasticCode is to save important data metrics about the flows and usages so it can be mined by predictive AI models later. This will give your organization key insights into the movement patterns of data.
GIT Integration - All the code used by processors can be pulled from your own git repositories giving you instant integration into existing devops and CM processes. ElasticCode will let you select which repo and commit version you want a processor to execute code from in your flows.
Use Cases
There are a wide variety of use-cases ElasticCode can address, a few of which are listed below.
Enterprise Workflow Automation - ElasticCode can design and execute dynamic workflows across a heterogeneous enterprise, leveraging a variety of data sources and services.
High Performance Computing - ElasticCode's support for real-time streaming compute and parallel workflow execution lends itself to big-data and compute-intensive tasks.
Enterprise DevOps - DevOps involves automated and repeatable pipelines for building software assets. ElasticCode's flow models and distributed compute are a perfect fit for custom DevOps.
IoT and Factory Automation - Orchestrating across connected devices or machinery in a factory is easy to model with ElasticCode due to its dynamic and ad hoc workflow capability. Custom scripting allows for easy integration with existing device APIs.
AI & Machine Learning Modelling - Generating effective AI models requires obtaining and cleaning data from various sources, feature extraction, merging and training epochs. This is naturally a multi-step process that can be done visually with ElasticCode’s visual modelling tools.
Simulation - Simulation seeks to model real world processes and given a set of inputs, determine or predict certain target variables. These models are typically designed as a network of connected dependencies or entities along with environmental conditions that affect the simulation.
Decision Systems & Analytics - State-transition modelling is a technique used at major companies that must make important stochastic financial decisions using key business metrics. ElasticCode's visual modelling and streaming compute capability allow such models to be easily designed and customized, fully integrating with company databases, spreadsheets, accounting systems and other data sources.
Install
Quickstart
Bringing up the Stack
$ docker-compose up
Configuring Lambda Flow
$ flow --config
Database connection URI [postgresql://postgres:pyfi101@phoenix:5432/pyfi]:
Result backend URI [redis://localhost]:
Message broker URI [pyamqp://localhost]:
Configuration file created at /home/user/pyfi.ini
Initialize the Database
$ flow db init
Enabling security on table action
Enabling security on table event
Enabling security on table flow
Enabling security on table jobs
Enabling security on table log
Enabling security on table privilege
Enabling security on table queue
Enabling security on table queuelog
Enabling security on table role
Enabling security on table scheduler
Enabling security on table settings
Enabling security on table task
Enabling security on table user
Enabling security on table node
Enabling security on table processor
Enabling security on table role_privileges
Enabling security on table user_privileges
Enabling security on table user_roles
Enabling security on table agent
Enabling security on table plug
Enabling security on table socket
Enabling security on table call
Enabling security on table plugs_queues
Enabling security on table plugs_source_sockets
Enabling security on table plugs_target_sockets
Enabling security on table sockets_queues
Enabling security on table worker
Enabling security on table calls_events
Database create all schemas done.
The Flow CLI
$ flow
Usage: flow [OPTIONS] COMMAND [ARGS]...
Flow CLI for managing the pyfi network
Options:
--debug Debug switch
-d, --db TEXT Database URI
--backend TEXT Task queue backend
--broker TEXT Message broker URI
-i, --ini TEXT Flow .ini configuration file
-c, --config Configure pyfi
--help Show this message and exit.
Commands:
add Add an object to the database
agent Run pyfi agent
api API server admin
db Database operations
delete Delete an object from the database
listen Listen to a processor output
ls List database objects and their relations
node Node management operations
proc Run or manage processors
scheduler Scheduler management commands
task Pyfi task management
update Update a database object
web Web server admin
whoami Database login user
worker Run pyfi worker
Creating Your First Flow
Let’s look at the sequence of CLI commands needed to build out our flow infrastructure and execute a task. From scratch!
The first thing we do below is create a queue. This gives the persistent message broker the definition it needs to allocate a message queue of the same name for holding task messages.
Next we create a processor, which refers to our gitrepo and defines the module within that codebase we want to expose. It also defines the host where the processor should run, though that is optional. We specify a concurrency value of 5, which indicates the scale of our processor: it will seek to occupy 5 CPUs, allowing it to run in parallel and respond better to high-volume message traffic.
Then we create sockets and attach them to our processor. The socket tells pyfi which specific Python function we want to receive messages for and which queue it should use. Lastly, it indicates which processor it is attached to.
Finally, we can run our task and get the result.
$ flow add queue -n pyfi.queue1 -t direct
$ flow add processor -n proc1 -g https://github.com/radiantone/pyfi-processors -m pyfi.processors.sample -h localhost -c 5
$ flow add socket -n pyfi.processors.sample.do_something -q pyfi.queue1 -pn proc1 -t do_something
$ flow add socket -n pyfi.processors.sample.do_this -q pyfi.queue1 -pn proc1 -t do_this
$ flow task run --socket pyfi.processors.sample.do_this --data "['some data']"
Do this: ['some data']
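The same task can also be run from Python; a sketch using the socket-as-function pattern described under Defining Socket Functions below:

from pyfi.client.api import Socket as Function

# Refer to the existing socket by name and invoke it like a normal function
do_this = Function(name="pyfi.processors.sample.do_this")
do_this(['some data'])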
Creating Sockets
Sockets represent addressable endpoints for python functions hosted by a processor. Remember, the processor points to a gitrepo and defines a python module within that repo. The socket defines the task (or python function) within the processor python module. Thus, a single processor can have many sockets associated with it. Sockets also declare a queue they will use to pull their requests from. This allows calls to tasks to be durable and reliable.
The following extract from the above flow defines a socket, gives it the name pyfi.processors.sample.do_something, declares the queue pyfi.queue1, associates it with the processor named proc1, and represents the Python function/task do_something.
$ flow add socket -n pyfi.processors.sample.do_something -q pyfi.queue1 -pn proc1 -t do_something
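The same socket can be created from Python; a sketch based on the client objects shown in the Objects section later, assuming processor is a Processor client object for proc1 (see that section for how to create one):

from pyfi.client.api import Socket
from pyfi.client.user import USER

do_something = Socket(
    name="pyfi.processors.sample.do_something",
    user=USER,
    processor=processor,  # the proc1 Processor object, assumed created earlier
    queue={"name": "pyfi.queue1"},
    task="do_something",
)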
Defining Socket Functions
Once you’ve built out your flow and infrastructure to support it, you can create convenient types that represent your python functions via the Socket class.
For the parallel flow above, we import the .p (or partial) signature from this file, which comes from the Socket we created earlier named pyfi.processors.sample.do_something.
Remember, the socket captures the module (from its parent Processor) and function name within that module you want to run. Think of it like an endpoint with a queue in front of it.
We take it one step further in the file below and rename the Socket class to Function, simply as a linguistic preference in this context.
from pyfi.client.api import Socket as Function
do_something = Function(name='pyfi.processors.sample.do_something')
do_something_p = do_something.p
do_this = Function(name='pyfi.processors.sample.do_this')
do_this_p = do_this.p
Once we've created our function definitions above, we can use them like normal Python functions, as in the parallel workflow below!
Executing Socket Functions
Executing socket functions from Python is very easy. Since the socket was created ahead of time, we only need to refer to it by name as above.
from pyfi.client.example.api import do_something_p as do_something
do_something("Some text!")
Then just invoke the function reference as you normally would. If you are using the function within a parallel API structure such as parallel, pipeline, funnel, etc., then you should use the partial (.p, _p) version of the function signature.
This allows ElasticCode to add arguments to the task when it is invoked. The invocation is deferred, so it doesn't happen at the time you declare your workflow; your task will execute on a remote CPU when the workflow reaches that task.
So in that respect, the .p partial is a signature for your task.
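A minimal sketch of the distinction: declaring the workflow with partial signatures runs nothing; invoking the workflow dispatches the persistent task requests.

from pyfi.client.api import pipeline
from pyfi.client.example.api import do_something_p

# Building the pipeline only collects deferred signatures; no tasks execute here
_pipeline = pipeline([
    do_something_p("first"),
    do_something_p("second"),
])

# Invocation dispatches the tasks; .get() retrieves the final result
print(_pipeline().get())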
Running a Parallel Workflow
from pyfi.client.api import parallel, pipeline, funnel
from pyfi.client.example.api import do_something_p as do_something

# Create a pipeline that executes tasks sequentially, passing result to next task
_pipeline = pipeline([
    do_something("One"),
    do_something("Two"),
    # Create a parallel structure that executes tasks in parallel and returns the
    # result list
    parallel([
        do_something("Four"),
        do_something("Five"),
    ]),
    do_something("Three")])

# Create another parallel structure using the above pipeline as one of its tasks
_parallel = parallel([
    _pipeline,
    do_something("Six"),
    do_something("Seven")])

# Create a funnel structure that executes all its tasks passing the result to the
# single, final task
_funnel = funnel([
    do_something("Eight"),
    _parallel,
    do_something("Nine")])

# Gather the result from the _funnel and send it to do_something("Four")
print("FUNNEL: ", _funnel(do_something("Four")).get())
Data Flows
ElasticCode provides a unique and easy way to deploy distributed data flows (sometimes called workflows). These flows are constructed by linking together objects from the ElasticCode object model.
To review, the following taxonomy of objects is used in an ElasticCode network:
- Nodes
- Agents
- Workers
- Processors
- Sockets
- Tasks
- Arguments
For a given processor, multiple sockets can be exposed that route incoming requests to different functions (tasks) within the processor's Python module code. Links between the outputs of one socket and the inputs of another are established using Plugs.
Each Plug has a source socket and a target socket, such that when the function associated with the source socket completes, its output is used as input to the target socket's function. These requests persist on a queue and execute in an orderly fashion so as not to stress resources. Since processors are bound to one or more CPUs, they can service requests in parallel, but will only execute requests when resources are free to do so.
Because functions are loosely coupled into data flows, you are able to change the topology of your data flow at any time. Execution will follow the path of the current data flow.
When connecting a Plug to a target Socket, you can specify a specific argument for the target function that the plug is connected to.
For example, consider this target function:
def add_two(one, two):
    return one + two
It has two named arguments, one and two. You might have a data flow with two separate inputs to add_two, where one plug satisfies the one argument and the other plug satisfies the two argument. In this design, add_two will only trigger once both arguments have arrived at the socket. This means arguments can arrive at different times and in different orders.
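A sketch of that two-input flow using the client objects from the Objects section below. The argument keyword on Plug is an assumption made for illustration, to show each plug binding to one named argument of the target task, and the add_two socket is hypothetical:

from pyfi.client.api import Plug, Socket
from pyfi.client.user import USER

# Sockets assumed to exist already on the network; add_two is hypothetical
source_a = Socket(name="pyfi.processors.sample.do_something", user=USER)
source_b = Socket(name="pyfi.processors.sample.do_this", user=USER)
add_two = Socket(name="pyfi.processors.sample.add_two", user=USER)

# One plug per named argument; add_two fires only once both have arrived
plug_one = Plug(name="plug_one", user=USER, source=source_a, target=add_two,
                queue={"name": "pyfi.queueA"}, argument="one")  # `argument` is assumed
plug_two = Plug(name="plug_two", user=USER, source=source_b, target=add_two,
                queue={"name": "pyfi.queueB"}, argument="two")  # `argument` is assumed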
Architecture
ElasticCode is a scalable, high-performance network architecture that separates concerns across layers. Each layer has best-of-breed components that manage the responsibility of that layer. The slides below show the different layers and their responsibilities, starting with the bottom-most layer.
Managed Compute
ElasticCode takes a different approach to staging and executing Python code on its network. Other frameworks or libraries allow you to define your functions in your execution environment and serialize that code to remote workers for execution. Obviously that has serious security implications in a shared, managed compute environment, so ElasticCode does not allow it. Rather, you request that ElasticCode mount your code through a secure git repository URL. This becomes the contract between you and ElasticCode and allows ElasticCode to securely load your code into its network.
This approach also allows administrators to maintain whitelists and blacklists of trusted code repositories.
Code Isolation
Each ElasticCode worker that mounts a git repository will create a virtual environment for that code and execute the repository's setup.py to install the code in that virtual environment. This is beneficial for a number of reasons, but most importantly it keeps the environment for the mounted code separate from the ElasticCode agent's Python environment.
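Conceptually, the isolation step looks like the sketch below. This is an illustration of the approach, not ElasticCode's actual implementation: one virtual environment per mounted repository, with the repository installed into it via its setup.py.

import subprocess
import venv

def mount_repo(workdir: str, gitrepo: str) -> None:
    # Create an isolated virtual environment for this repository's code
    venv.create(f"{workdir}/venv", with_pip=True)
    # pip runs the repository's setup.py inside the isolated environment
    subprocess.run([f"{workdir}/venv/bin/pip", "install", f"git+{gitrepo}"],
                   check=True)

mount_repo("/tmp/proc1", "https://github.com/radiantone/pyfi-processors")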
Layered Design
ElasticCode is a distributed, scalable architecture; as such, it is a relationship between connected hardware and service layers interacting as a whole.




Database
Data Model

Servers
CLI
Examples
$ flow
Usage: flow [OPTIONS] COMMAND [ARGS]...
CLI for creating & managing flow networks
Options:
--debug Debug switch
-d, --db TEXT Database URI
--backend TEXT Task queue backend
--broker TEXT Message broker URI
-a, --api TEXT Message broker API URI
-u, --user TEXT Message broker API user
-p, --password TEXT Message broker API password
-i, --ini TEXT flow .ini configuration file
-c, --config Configure pyfi
--help Show this message and exit.
Commands:
add Add an object to the database
agent Commands for remote agent management
api API server admin
compose Manage declarative infrastructure files
db Database operations
delete Delete an object from the database
listen Listen to a processor output
login Log into flow CLI
logout Logout current user
ls List database objects and their relations
network Network operations
node Node management operations
proc Run or manage processors
scheduler Scheduler management commands
task Pyfi task management
update Update a database object
user User commands
web Web server admin
whoami Database login user
worker Run pyfi worker
Database
$ flow db
Usage: flow db [OPTIONS] COMMAND [ARGS]...
Database operations
Options:
--help Show this message and exit.
Commands:
drop Drop all database tables
init Initialize database tables
json Dump the database to JSON
migrate Perform database migration/upgrade
rebuild Drop and rebuild database tables
Objects
There are numerous objects within an ElasticCode network. Some are infrastructure related, others are service related. Using the ElasticCode CLI you create, update and manage these objects in the database, which acts as a single source of truth for the entire ElasticCode network. All the deployed ElasticCode services (e.g. agents) react to changes in the ElasticCode database. So you could say that ElasticCode is reactive at a distributed, network-wide scale.
Some of the system objects and CLI commands are shown below.
Queues
$ flow add queue --help
Usage: flow add queue [OPTIONS]
Add queue object to the database
Options:
-n, --name TEXT [required]
-t, --type [topic|direct|fanout]
[default: direct; required]
--help Show this message and exit.
Processors
$ flow add processor --help
Usage: flow add processor [OPTIONS]
Add processor to the database
Options:
-n, --name TEXT Name of this processor [required]
-m, --module TEXT Python module (e.g. some.module.path)
[required]
-h, --hostname TEXT Target server hostname
-w, --workers INTEGER Number of worker tasks
-r, --retries INTEGER Number of retries to invoke this processor
-g, --gitrepo TEXT Git repo URI [required]
-c, --commit TEXT Git commit id for processor code
-rs, --requested_status TEXT The requested status for this processor
-b, --beat Enable the beat scheduler
-br, --branch TEXT Git branch to be used for checkouts
-p, --password TEXT Password to access this processor
-rq, --requirements TEXT requirements.txt file
-e, --endpoint TEXT API endpoint path
-a, --api BOOLEAN Has an API endpoint
-cs, --cpus INTEGER Number of CPUs for default deployment
-d, --deploy Deploy this processor
-mp, --modulepath TEXT Relative repo path to python module file
--help Show this message and exit.
$ flow proc
Usage: flow proc [OPTIONS] COMMAND [ARGS]...
Run or manage processors
Options:
--id TEXT ID of processor
--help Show this message and exit.
Commands:
pause Pause a processor
remove Remove a processor
restart Restart a processor
resume Resume a processor
start Start a processor
stop Stop a processor
Calls
$ flow ls calls --help
Usage: flow ls calls [OPTIONS]
List calls
Options:
-p, --page INTEGER
-r, --rows INTEGER
-u, --unfinished
-a, --ascend
-i, --id
-t, --tracking
-tk, --task
--help Show this message and exit.
$ flow ls calls
+------+-----+-------------------------------------+--------------------------------------+----------+----------------------------+-------------------------------------+----------------------------+----------------------------+----------+
| Page | Row | Name | ID | Owner | Last Updated | Socket | Started | Finished | State |
+------+-----+-------------------------------------+--------------------------------------+----------+----------------------------+-------------------------------------+----------------------------+----------------------------+----------+
| 1 | 1 | pyfi.processors.sample.do_this | e3f73300-f3fd-4230-ba11-258d4f5a17f4 | postgres | 2021-09-13 19:30:19.933346 | pyfi.processors.sample.do_this | 2021-09-13 19:30:19.903573 | 2021-09-13 19:30:19.932491 | finished |
| 1 | 2 | pyfi.processors.sample.do_something | e3bf09c5-ae45-4772-b301-c394acae3c4e | postgres | 2021-09-13 19:30:19.885993 | pyfi.processors.sample.do_something | 2021-09-13 19:30:19.847282 | 2021-09-13 19:30:19.885440 | finished |
| 1 | 3 | pyfi.processors.sample.do_this | a58de16a-1b92-4acb-81c1-92e81cb6ea56 | postgres | 2021-09-13 19:29:49.944219 | pyfi.processors.sample.do_this | 2021-09-13 19:29:49.917225 | 2021-09-13 19:29:49.943415 | finished |
| 1 | 4 | pyfi.processors.sample.do_something | 58df162a-ac2e-40b7-9e27-635c61a4d9a7 | postgres | 2021-09-13 19:29:49.868975 | pyfi.processors.sample.do_something | 2021-09-13 19:29:49.820097 | 2021-09-13 19:29:49.868109 | finished |
| 1 | 5 | pyfi.processors.sample.do_this | 60d8b91d-1b8b-433c-a289-5704856d37d1 | postgres | 2021-09-13 19:29:19.907705 | pyfi.processors.sample.do_this | 2021-09-13 19:29:19.880742 | 2021-09-13 19:29:19.906931 | finished |
| 1 | 6 | pyfi.processors.sample.do_something | 66c78849-9052-48d0-ae62-59942d544096 | postgres | 2021-09-13 19:29:19.861880 | pyfi.processors.sample.do_something | 2021-09-13 19:29:19.824456 | 2021-09-13 19:29:19.861330 | finished |
| 1 | 7 | pyfi.processors.sample.do_this | e5189a71-9805-492e-a8d7-e5eb2b8d68d3 | postgres | 2021-09-13 19:28:49.873301 | pyfi.processors.sample.do_this | 2021-09-13 19:28:49.842724 | 2021-09-13 19:28:49.872176 | finished |
| 1 | 8 | pyfi.processors.sample.do_something | 35fd3635-743a-4015-acfe-c5a8f62ef65d | postgres | 2021-09-13 19:28:49.812921 | pyfi.processors.sample.do_something | 2021-09-13 19:28:49.789503 | 2021-09-13 19:28:49.812406 | finished |
| 1 | 9 | pyfi.processors.sample.do_this | 4136ebe2-ee96-4b74-ba0e-33d8c5974252 | postgres | 2021-09-13 19:28:19.830508 | pyfi.processors.sample.do_this | 2021-09-13 19:28:19.805839 | 2021-09-13 19:28:19.829667 | finished |
| 1 | 10 | pyfi.processors.sample.do_something | 707f18c5-5708-4c70-81fb-ca0afb30e28b | postgres | 2021-09-13 19:28:19.789542 | pyfi.processors.sample.do_something | 2021-09-13 19:28:19.764792 | 2021-09-13 19:28:19.788999 | finished |
+------+-----+-------------------------------------+--------------------------------------+----------+----------------------------+-------------------------------------+----------------------------+----------------------------+----------+
Page 1 of 383 of 3830 total records
$ flow ls call --help
Usage: flow ls call [OPTIONS]
List details about a call record
Options:
--id TEXT ID of call
-n, --name TEXT Name of call
-r, --result Include result of call
-t, --tree Show forward call tree
-g, --graph Show complete call graph
-f, --flow Show all calls in a workflow
--help Show this message and exit.
$ flow ls call --id e3bf09c5-ae45-4772-b301-c394acae3c4e
+-------------------------------------+--------------------------------------+----------+----------------------------+-------------------------------------+----------------------------+----------------------------+----------+
| Name | ID | Owner | Last Updated | Socket | Started | Finished | State |
+-------------------------------------+--------------------------------------+----------+----------------------------+-------------------------------------+----------------------------+----------------------------+----------+
| pyfi.processors.sample.do_something | e3bf09c5-ae45-4772-b301-c394acae3c4e | postgres | 2021-09-13 19:30:19.885993 | pyfi.processors.sample.do_something | 2021-09-13 19:30:19.847282 | 2021-09-13 19:30:19.885440 | finished |
+-------------------------------------+--------------------------------------+----------+----------------------------+-------------------------------------+----------------------------+----------------------------+----------+
Provenance
+--------------------------------------+-------------+-------------+
| Task | Task Parent | Flow Parent |
+--------------------------------------+-------------+-------------+
| a13ba1e7-78f9-4644-9c29-696dfd89e9e4 | None | None |
+--------------------------------------+-------------+-------------+
Events
+----------+--------------------------------------+----------+----------------------------+-----------------------------------------------------+
| Name | ID | Owner | Last Updated | Note |
+----------+--------------------------------------+----------+----------------------------+-----------------------------------------------------+
| received | 8e8845d5-cd32-40d9-93c7-e95f7500926c | postgres | 2021-09-13 19:30:19.844512 | Received task pyfi.processors.sample.do_something |
| prerun | a2507cd1-1d72-4ad1-be74-375aac29f1c4 | postgres | 2021-09-13 19:30:19.874789 | Prerun for task pyfi.processors.sample.do_something |
| postrun | f8b5ff03-e0e3-467d-9257-a682f0865581 | postgres | 2021-09-13 19:30:19.886504 | Postrun for task |
+----------+--------------------------------------+----------+----------------------------+-----------------------------------------------------+
$ flow ls call --id e3bf09c5-ae45-4772-b301-c394acae3c4e --tree
pyfi.processors.sample.do_something
└────────────────────┐
pyfi.processors.sample.do_this
Listening
The listen command allows you to listen to the pubsub channels associated with queues and processors. It is a kind of network sniffer that displays, in real time, the various message traffic, compute results, lifecycle events, etc. You can provide your own custom adaptor class to receive the results, which provides a simple, loosely coupled mechanism for system integrations (see the sketch after the example below).
$ flow listen --help
Usage: flow listen [OPTIONS]
Listen to a processor output
Options:
-n, --name TEXT Name of processor [required]
-c, --channel TEXT Listen channel (e.g. task, log, etc) [required]
-a, --adaptor TEXT Adaptor class function (e.g. my.module.class.function)
--help Show this message and exit.
$ flow listen --name pyfi.queue1.proc1 --channel task
Listening to pyfi.queue1.proc1
{'type': 'psubscribe', 'pattern': None, 'channel': b'pyfi.queue1.proc1.task', 'data': 1}
{'type': 'pmessage', 'pattern': b'pyfi.queue1.proc1.task', 'channel': b'pyfi.queue1.proc1.task', 'data': b'{"channel": "task", "state": "received", "date": "2021-09-13 19:37:20.094443", "room": "pyfi.queue1.proc1"}'}
{'type': 'pmessage', 'pattern': b'pyfi.queue1.proc1.task', 'channel': b'pyfi.queue1.proc1.task', 'data': b'{"channel": "task", "state": "running", "date": "2021-09-13 19:37:20.108668", "room": "pyfi.queue1.proc1"}'}
{'type': 'pmessage', 'pattern': b'pyfi.queue1.proc1.task', 'channel': b'pyfi.queue1.proc1.task', 'data': b'{"module": "pyfi.processors.sample", "date": "2021-09-13 19:37:20.133327", "resultkey": "celery-task-meta-b3feb181-484d-4b98-aba8-daabd07ee3d1", "message": "{\\"module\\": \\"pyfi.processors.sample\\", \\"date\\": \\"2021-09-13 19:37:20.133327\\", \\"resultkey\\": \\"celery-task-meta-b3feb181-484d-4b98-aba8-daabd07ee3d1\\", \\"message\\": \\"\\\\\\"\\\\\\\\\\\\\\"Message:Hello World!\\\\\\\\\\\\\\"\\\\\\"\\", \\"channel\\": \\"task\\", \\"room\\": \\"pyfi.queue1.proc1\\", \\"task\\": \\"do_something\\"}", "channel": "task", "room": "pyfi.queue1.proc1", "task": "do_something", "state": "postrun"}'}
Running an Agent
$ flow agent
Usage: flow agent [OPTIONS] COMMAND [ARGS]...
Run flow agent
Options:
--help Show this message and exit.
Commands:
start Run pyfi agent server
$ flow agent start --help
Usage: flow agent start [OPTIONS]
Start an agent
Options:
-p, --port INTEGER Healthcheck port
--clean Remove work directories before launch
-b, --backend TEXT Message backend URI
-r, --broker TEXT Message broker URI
-n, --name TEXT Hostname for this agent to use
-c, --config TEXT Config module.object import (e.g.
path.to.module.MyConfigClass)
-q, --queues Run the queue monitor only
-u, --user TEXT Run the worker as user
-po, --pool INTEGER Process pool for message dispatches
-cp, --cpus INTEGER Number of CPUs
-s, --size INTEGER Maximum number of messages on worker internal
queue
-h, --host TEXT Remote hostname to start the agent via ssh
-wp, --workerport INTEGER Healthcheck port for worker
--help Show this message and exit.
Roles & Users
$ flow add user --help
Usage: flow add user [OPTIONS]
Add user object to the database
Options:
-n, --name TEXT [required]
-e, --email TEXT [required]
-p, --password TEXT [required]
--help Show this message and exit.
$ flow add role --help
Usage: flow add role [OPTIONS]
Add role object to the database
Options:
-n, --name TEXT [required]
--help Show this message and exit.
$ flow add privilege --help
Usage: flow add privilege [OPTIONS]
Add privilege to the database
Options:
-u, --user TEXT
-n, --name TEXT [required]
-r, --role TEXT
--help Show this message and exit.
$ flow add user
Name: joe
Email: joe@xyz
Password: 12345
CREATE USER joe WITH PASSWORD '12345'
User "joe" added
$ flow add role -n developer
bc15ee9d-a208-43a9-82d2-bf0810dc4380:developer:2021-09-15 21:50:40.714192
$ flow add privilege -u joe -n ADD_PROCESSOR
Privilege added
$ flow ls user -n joe
+------+--------------------------------------+----------+---------+
| Name | ID | Owner | Email |
+------+--------------------------------------+----------+---------+
| joe | a8dcf9bb-c821-4d44-82f5-828dceb4cb23 | postgres | joe@xyz |
+------+--------------------------------------+----------+---------+
Privileges
+------+---------------+----------------------------+----------+
| Name | Right | Last Updated | By |
+------+---------------+----------------------------+----------+
| joe | ADD_PROCESSOR | 2021-09-15 21:46:48.611286 | postgres |
+------+---------------+----------------------------+----------+
$ flow add privilege -r developer -n ADD_PROCESSOR
Privilege added
Privileges & Rights
A right is an atomic string that names a particular privilege; it only becomes a privilege when it is associated with a user. When it is just a name, we call it a right.
rights = ['ALL',
'CREATE',
'READ',
'UPDATE',
'DELETE',
'DB_DROP',
'DB_INIT',
'START_AGENT',
'RUN_TASK',
'CANCEL_TASK',
'START_PROCESSOR',
'STOP_PROCESSOR',
'PAUSE_PROCESSOR',
'RESUME_PROCESSOR',
'LOCK_PROCESSOR',
'UNLOCK_PROCESSOR',
'VIEW_PROCESSOR',
'VIEW_PROCESSOR_CONFIG',
'VIEW_PROCESSOR_CODE',
'EDIT_PROCESSOR_CONFIG',
'EDIT_PROCESSOR_CODE',
'LS_PROCESSORS',
'LS_USERS',
'LS_USER',
'LS_PLUGS',
'LS_SOCKETS',
'LS_QUEUES',
'LS_AGENTS',
'LS_NODES',
'LS_SCHEDULERS',
'LS_WORKERS',
'ADD_PROCESSOR',
'ADD_AGENT',
'ADD_NODE',
'ADD_PLUG',
'ADD_PRIVILEGE',
'ADD_QUEUE',
'ADD_ROLE',
'ADD_SCHEDULER',
'ADD_SOCKET',
'ADD_USER',
'UPDATE_PROCESSOR',
'UPDATE_AGENT',
'UPDATE_NODE',
'UPDATE_PLUG',
'UPDATE_PRIVILEGE',
'UPDATE_QUEUE',
'UPDATE_ROLE',
'UPDATE_SCHEDULER',
'UPDATE_SOCKET',
'UPDATE_USER',
'DELETE_PROCESSOR',
'DELETE_AGENT',
'DELETE_NODE',
'DELETE_PLUG',
'DELETE_PRIVILEGE',
'DELETE_QUEUE',
'DELETE_ROLE',
'DELETE_SCHEDULER',
'DELETE_SOCKET',
'DELETE_USER',
'READ_PROCESSOR',
'READ_AGENT',
'READ_NODE',
'READ_PLUG',
'READ_PRIVILEGE',
'READ_QUEUE',
'READ_ROLE',
'READ_SCHEDULER',
'READ_SOCKET',
'READ_USER'
]
UI











API
CLI
See section on CLI
Python
Decorators
import os

from pyfi.client.api import ProcessorBase
from pyfi.client.decorators import plug, processor, socket


@processor(
    name="proc2",
    gitrepo=os.environ["GIT_REPO"],
    module="pyfi.processors.sample",
    concurrency=1,
)
class ProcessorB(ProcessorBase):
    """Description"""

    @socket(
        name="proc2.do_this",
        processor="proc2",
        arguments=True,
        queue={"name": "sockq2"},
    )
    def do_this(message):
        from random import randrange

        print("Do this!", message)
        message = "Do this String: " + str(message)
        graph = {
            "tag": {"name": "tagname", "value": "tagvalue"},
            "name": "distance",
            "value": randrange(50),
        }
        return {"message": message, "graph": graph}


@processor(
    name="proc1",
    gitrepo=os.environ["GIT_REPO"],
    module="pyfi.processors.sample",
    concurrency=7,
)
class ProcessorA(ProcessorBase):
    """Description"""

    def get_message(self):
        return "Self message!"

    @plug(
        name="plug1",
        target="proc2.do_this",  # Must be defined above already (prevents cycles)
        queue={
            "name": "queue1",
            "message_ttl": 300000,
            "durable": True,
            "expires": 200,
        },
    )
    @socket(
        name="proc1.do_something",
        processor="proc1",
        beat=False,
        interval=15,
        queue={"name": "sockq1"},
    )
    def do_something(message):
        """do_something"""
        from random import randrange

        message = "TEXT:" + str(message)
        graph = {
            "tag": {"name": "tagname", "value": "tagvalue"},
            "name": "temperature",
            "value": randrange(10),
        }
        return {"message": message, "graph": graph}


@processor(
    name="proc3",
    gitrepo=os.environ["GIT_REPO"],
    module="pyfi.processors.sample",
    concurrency=1,
)
class ProcessorC(ProcessorBase):
    """Description"""

    def get_message(self):
        return "Self message!"

    @plug(
        name="plug2",
        target="proc2.do_this",  # Must be defined above already (prevents cycles)
        queue={
            "name": "queue2",
            "message_ttl": 300000,
            "durable": True,
            "expires": 200,
        },
    )
    @socket(
        name="proc3.do_something",
        processor="proc3",
        beat=False,
        interval=5,
        queue={"name": "sockq3"},
    )
    def do_something(message):
        """do_something"""
        from random import randrange

        message = "TEXT2:" + str(message)
        graph = {
            "tag": {"name": "tagname", "value": "tagvalue"},
            "name": "temperature",
            "value": randrange(10),
        }
        return {"message": message, "graph": graph}


if __name__ == "__main__":
    print("Network created.")
Objects
import json

from pyfi.client.api import Plug, Processor, Socket
from pyfi.client.user import USER
from pyfi.db.model import AlchemyEncoder

# Log in a user first
print("USER", USER)

# Create a processor
processor = Processor(
    name="proc1",
    beat=True,
    user=USER,
    module="pyfi.processors.sample",
    branch="main",
    concurrency=6,
    gitrepo="https://user:key@github.com/radiantone/pyfi-processors#egg=pyfi-processor",
)

processor2 = Processor(
    name="proc2",
    user=USER,
    module="pyfi.processors.sample",
    hostname="agent1",
    concurrency=6,
    branch="main",
    gitrepo="https://user:key@github.com/radiantone/pyfi-processors#egg=pyfi-processor",
)

processor3 = Processor(
    name="proc3",
    user=USER,
    module="pyfi.processors.sample",
    hostname="agent2",
    concurrency=6,
    branch="main",
    gitrepo="https://user:pword@github.com/radiantone/pyfi-processors#egg=pyfi-processor",
)

# Create a socket on the processor to receive requests for the do_something python function (task)
do_something = Socket(
    name="pyfi.processors.sample.do_something",
    user=USER,
    interval=5,
    processor=processor,
    queue={"name": "pyfi.queue1"},
    task="do_something",
)
print(json.dumps(do_something.socket, indent=4, cls=AlchemyEncoder))

# Create a socket on the processor to receive requests for the do_this python function (task)
do_this = Socket(
    name="pyfi.processors.sample.do_this",
    user=USER,
    processor=processor2,
    queue={"name": "pyfi.queue2"},
    task="do_this",
)

do_this2 = Socket(
    name="pyfi.processors.sample.do_this",
    user=USER,
    processor=processor3,
    queue={"name": "pyfi.queue3"},
    task="do_this",
)

do_something2 = Socket(
    name="proc2.do_something",
    user=USER,
    processor=processor2,
    queue={"name": "pyfi.queue1"},
    task="do_something",
)

# Create a plug that connects one processor to a socket of another
plug = Plug(
    name="plug1",
    processor=processor,
    user=USER,
    source=do_something,
    queue={"name": "pyfi.queue3"},
    target=do_this,
)
Lambda
from pyfi.client.api import funnel, parallel, pipeline
from pyfi.client.example.api import do_something_p as do_something
from pyfi.client.example.api import do_this_p as do_this

"""
An example app on top of pyfi. References existing infrastructure and then
runs complex workflows and parallel operations on it
"""

_pipeline = pipeline(
    [
        do_something("One"),
        do_something("Two"),
        parallel(
            [
                do_this("Four"),
                do_this("Five"),
            ]
        ),
        do_this("Three"),
    ]
)

print(_pipeline().get())

_parallel = parallel([_pipeline, do_something("Six"), do_something("Seven")])

_funnel = funnel(
    [do_something("Eight"), _parallel, do_something("Nine")], do_something("A")
)

_funnel2 = funnel([_parallel, do_something("Ten")], do_something("B"))

_funnel3 = funnel([_funnel, _funnel2])

result = _funnel3(do_something("Eleven"))

print("FUNNEL: ", result.get())
ORM
import json
from datetime import datetime
from typing import Any

from oso import Oso
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Enum,
    Float,
    ForeignKey,
    Integer,
    LargeBinary,
    String,
    Table,
    Text,
    and_,
    literal_column,
)
from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
from sqlalchemy.orm import declarative_base, foreign, relationship
from sqlalchemy.schema import CreateColumn

Base: Any = declarative_base(name="Base")
oso = Oso()


@compiles(CreateColumn, "postgresql")
def use_identity(element, compiler, **kw):
    text = compiler.visit_create_column(element, **kw)
    text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
    return text
class AlchemyEncoder(json.JSONEncoder):
    def default(self, obj):
        from datetime import datetime

        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [
                x for x in dir(obj) if not x.startswith("_") and x != "metadata"
            ]:
                data = obj.__getattribute__(field)
                try:
                    # this will fail on non-encodable values, like other classes
                    if type(data) is datetime:
                        data = str(data)
                    json.dumps(data)
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields
        return json.JSONEncoder.default(self, obj)
class HasLogins(object):
    @declared_attr
    def logins(cls):
        return relationship(
            "LoginModel",
            order_by="desc(LoginModel.created)",
            primaryjoin=lambda: and_(foreign(LoginModel.user_id) == cls.id),
            lazy="select",
        )


class HasLogs(object):
    @declared_attr
    def logs(cls):
        return relationship(
            "LogModel",
            order_by="desc(LogModel.created)",
            primaryjoin=lambda: and_(
                foreign(LogModel.oid) == cls.id,
                LogModel.discriminator == cls.__name__,
            ),
            lazy="select",
        )
class BaseModel(Base):
    """
    Docstring
    """

    __abstract__ = True

    id = Column(
        String(40),
        autoincrement=False,
        default=literal_column("uuid_generate_v4()"),
        unique=True,
        primary_key=True,
    )
    name = Column(String(80), unique=True, nullable=False, primary_key=True)
    owner = Column(String(40), default=literal_column("current_user"))
    status = Column(String(20), nullable=False, default="ready")
    requested_status = Column(String(40), default="ready")
    enabled = Column(Boolean)
    created = Column(DateTime, default=datetime.now, nullable=False)
    lastupdated = Column(
        DateTime, default=datetime.now, onupdate=datetime.now, nullable=False
    )

    def __repr__(self):
        return json.dumps(self, cls=AlchemyEncoder)


class LogModel(Base):
    """
    Docstring
    """

    __tablename__ = "log"

    id = Column(
        String(40),
        autoincrement=False,
        default=literal_column("uuid_generate_v4()"),
        unique=True,
        primary_key=True,
    )
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    user = relationship("UserModel", lazy=True)
    public = Column(Boolean, default=False)
    created = Column(DateTime, default=datetime.now, nullable=False)
    oid = Column(String(40), primary_key=True)
    discriminator = Column(String(40))
    text = Column(String(80), nullable=False)
    source = Column(String(40), nullable=False)

    def __repr__(self):
        return json.dumps(self, cls=AlchemyEncoder)
rights = [
    "ALL",
    "CREATE",
    "READ",
    "UPDATE",
    "DELETE",
    "DB_DROP",
    "DB_INIT",
    "START_AGENT",
    "RUN_TASK",
    "CANCEL_TASK",
    "START_PROCESSOR",
    "STOP_PROCESSOR",
    "PAUSE_PROCESSOR",
    "RESUME_PROCESSOR",
    "LOCK_PROCESSOR",
    "UNLOCK_PROCESSOR",
    "VIEW_PROCESSOR",
    "VIEW_PROCESSOR_CONFIG",
    "VIEW_PROCESSOR_CODE",
    "EDIT_PROCESSOR_CONFIG",
    "EDIT_PROCESSOR_CODE",
    "LS_PROCESSORS",
    "LS_USERS",
    "LS_USER",
    "LS_PLUGS",
    "LS_SOCKETS",
    "LS_QUEUES",
    "LS_AGENTS",
    "LS_NODES",
    "LS_SCHEDULERS",
    "LS_WORKERS",
    "ADD_PROCESSOR",
    "ADD_AGENT",
    "ADD_NODE",
    "ADD_PLUG",
    "ADD_PRIVILEGE",
    "ADD_QUEUE",
    "ADD_ROLE",
    "ADD_SCHEDULER",
    "ADD_SOCKET",
    "ADD_USER",
    "UPDATE_PROCESSOR",
    "UPDATE_AGENT",
    "UPDATE_NODE",
    "UPDATE_PLUG",
    "UPDATE_ROLE",
    "UPDATE_SCHEDULER",
    "UPDATE_SOCKET",
    "UPDATE_USER",
    "DELETE_PROCESSOR",
    "DELETE_AGENT",
    "DELETE_NODE",
    "DELETE_PLUG",
    "DELETE_PRIVILEGE",
    "DELETE_QUEUE",
    "DELETE_ROLE",
    "DELETE_SCHEDULER",
    "DELETE_SOCKET",
    "DELETE_USER",
    "READ_PROCESSOR",
    "READ_AGENT",
    "READ_NODE",
    "READ_LOG",
    "READ_PLUG",
    "READ_PRIVILEGE",
    "READ_QUEUE",
    "READ_ROLE",
    "READ_SCHEDULER",
    "READ_SOCKET",
    "READ_USER",
]
class PrivilegeModel(BaseModel):
    """
    Docstring
    """

    __tablename__ = "privilege"

    right = Column("right", Enum(*rights, name="right"))


role_privileges = Table(
    "role_privileges",
    Base.metadata,
    Column("role_id", ForeignKey("role.id")),
    Column("privilege_id", ForeignKey("privilege.id")),
)


class RoleModel(BaseModel):
    """
    Docstring
    """

    __tablename__ = "role"

    privileges = relationship(
        "PrivilegeModel", secondary=role_privileges, lazy="subquery"
    )


user_privileges_revoked = Table(
    "user_privileges_revoked",
    Base.metadata,
    Column("user_id", ForeignKey("users.id")),
    Column("privilege_id", ForeignKey("privilege.id")),
)

user_privileges = Table(
    "user_privileges",
    Base.metadata,
    Column("user_id", ForeignKey("users.id")),
    Column("privilege_id", ForeignKey("privilege.id")),
)

user_roles = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", ForeignKey("users.id")),
    Column("role_id", ForeignKey("role.id")),
)
class UserModel(HasLogins, BaseModel):
    """
    Docstring
    """

    __tablename__ = "users"

    email = Column(String(120), unique=True, nullable=False)
    password = Column(String(60), unique=False, nullable=False)
    clear = Column(String(60), unique=False, nullable=False)
    privileges = relationship(
        "PrivilegeModel", secondary=user_privileges, lazy="subquery"
    )
    revoked = relationship(
        "PrivilegeModel", secondary=user_privileges_revoked, lazy="subquery"
    )
    roles = relationship("RoleModel", secondary=user_roles, lazy="subquery")


socket_types = ["RESULT", "ERROR"]
plug_types = ["RESULT", "ERROR"]
schedule_types = ["CRON", "INTERVAL"]
strategies = ["BALANCED", "EFFICIENT"]
class FileModel(BaseModel):
    __tablename__ = "file"

    path = Column(String(120))
    filename = Column(String(80))
    collection = Column(String(80))
    code = Column(Text)
    type = Column(String(40))
    icon = Column(String(40))
    versions = relationship(
        "VersionModel", back_populates="file", cascade="all, delete-orphan"
    )


flows_versions = Table(
    "flows_versions",
    Base.metadata,
    Column("flow_id", ForeignKey("flow.id"), primary_key=True),
    Column("version_id", ForeignKey("versions.id"), primary_key=True),
)


class FlowModel(BaseModel):
    """
    A flow model
    """

    __tablename__ = "flow"

    # Collection of processors within this flow. A processor can reside
    # in multiple flows at once
    processors = relationship("ProcessorModel", lazy=True)

    # File reference for this flow, i.e. its saved state
    file_id = Column(String, ForeignKey("file.id"), nullable=False)
    file = relationship(
        "FileModel", lazy=True, cascade="all, delete-orphan", single_parent=True
    )

    # List of versions associated with this flow
    versions = relationship("VersionModel", secondary=flows_versions, lazy=True)
class AgentModel(BaseModel):
    """
    Docstring
    """

    __tablename__ = "agent"

    hostname = Column(String(60))
    cpus = Column(Integer)
    port = Column(Integer)
    pid = Column(Integer)
    workers = relationship(
        "WorkerModel", backref="agent", lazy=True, cascade="all, delete-orphan"
    )
    node_id = Column(String(40), ForeignKey("node.id"), nullable=False)


class ActionModel(BaseModel):
    """
    Docstring
    """

    __tablename__ = "action"

    params = Column(String(80))
    # host, worker, processor, queue, or all
    target = Column(String(20), nullable=False)


class WorkerModel(BaseModel):
    """
    Docstring
    """

    __tablename__ = "worker"

    backend = Column(String(40), nullable=False)
    broker = Column(String(40), nullable=False)
    concurrency = Column(Integer)
    process = Column(Integer)
    port = Column(Integer)
    hostname = Column(String(60))
    workerdir = Column(String(256))
    processor = relationship("ProcessorModel")
    processor_id = Column(
        String(40), ForeignKey("processor.id", ondelete="CASCADE"), nullable=False
    )
    deployment_id = Column(String(40), ForeignKey("deployment.id"), nullable=True)
    deployment = relationship("DeploymentModel", back_populates="worker")
    agent_id = Column(String(40), ForeignKey("agent.id"), nullable=False)
    # agent = relationship("AgentModel", back_populates="worker")
class ContainerModel(BaseModel):
__tablename__ = "container"
container_id = Column(String(80), unique=True, nullable=False)
class VersionModel(Base):
__tablename__ = "versions"
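# uuid_generate_v4() comes from PostgreSQL's uuid-ossp extension, which must be installed in the database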
id = Column(
String(40),
autoincrement=False,
default=literal_column("uuid_generate_v4()"),
unique=True,
primary_key=True,
)
name = Column(String(80), unique=False, nullable=False)
file_id = Column(String, ForeignKey("file.id"), nullable=False)
file = relationship(
"FileModel", lazy=True, cascade="all, delete-orphan", single_parent=True
)
owner = Column(String(40), default=literal_column("current_user"))
flow = Column(Text, unique=False, nullable=False)
version = Column(
DateTime, default=datetime.now, onupdate=datetime.now, nullable=False
)
class DeploymentModel(BaseModel):
__tablename__ = "deployment"
name = Column(String(80), unique=False, nullable=False)
hostname = Column(String(80), nullable=False)
cpus = Column(Integer, default=1, nullable=False)
processor_id = Column(String(40), ForeignKey("processor.id"), nullable=False)
worker = relationship(
"WorkerModel", lazy=True, uselist=False, back_populates="deployment"
)
class ProcessorModel(HasLogs, BaseModel):
"""
A unit of compute: a Python module pulled from a git repository and run by workers.
"""
__tablename__ = "processor"
module = Column(String(80), nullable=False)
beat = Column(Boolean)
gitrepo = Column(String(180))
branch = Column(String(30), default="main")
commit = Column(String(50), nullable=True)
gittag = Column(String(50), nullable=True)
retries = Column(Integer)
concurrency = Column(Integer)
receipt = Column(String(80), nullable=True)
ratelimit = Column(String(10), default="60")
perworker = Column(Boolean, default=True)
timelimit = Column(Integer)
ignoreresult = Column(Boolean)
serializer = Column(String(10))
backend = Column(String(80))
ackslate = Column(Boolean)
trackstarted = Column(Boolean)
disabled = Column(Boolean)
retrydelay = Column(Integer)
requirements = Column(Text)
endpoint = Column(Text)
modulepath = Column(Text)
icon = Column(Text)
cron = Column(Text)
hasapi = Column(Boolean)
uistate = Column(Text)
description = Column(Text(), nullable=True, default="Some description")
container_image = Column(String(60))
container_command = Column(String(180))
container_version = Column(String(20), default="latest")
use_container = Column(Boolean, default=False)
detached = Column(Boolean, default=False)
user_id = Column(String, ForeignKey("users.id"), nullable=False)
user = relationship("UserModel", backref="processor", lazy=True)
flow_id = Column(String(40), ForeignKey("flow.id"), nullable=True)
password = relationship("PasswordModel", lazy=True, viewonly=True)
password_id = Column(String, ForeignKey("passwords.id"), nullable=True)
plugs = relationship(
"PlugModel", backref="processor", lazy=True, cascade="all, delete-orphan"
)
deployments = relationship(
"DeploymentModel", backref="processor", lazy=True, cascade="all, delete-orphan"
)
sockets = relationship(
"SocketModel", backref="processor", lazy=True, cascade="all, delete-orphan"
)
class JobModel(Base):
__tablename__ = "jobs"
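# Mirrors the (id, next_run_time, job_state) table used by APScheduler's SQLAlchemy job store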
id = Column(String(200), primary_key=True)
next_run_time = Column(DOUBLE_PRECISION)
job_state = Column(LargeBinary)
class PasswordModel(BaseModel):
__tablename__ = "passwords"
id = Column(
String(40),
autoincrement=False,
default=literal_column("uuid_generate_v4()"),
unique=True,
primary_key=True,
)
password = Column(String(60), nullable=False)
processor = relationship("ProcessorModel", lazy=True, uselist=False)
class NetworkModel(BaseModel):
__tablename__ = "network"
schedulers = relationship(
"SchedulerModel", backref="network", lazy=True, cascade="all, delete"
)
queues = relationship(
"QueueModel", backref="network", lazy=True, cascade="all, delete"
)
nodes = relationship(
"NodeModel", backref="network", lazy=True, cascade="all, delete"
)
user_id = Column(String, ForeignKey("users.id"), nullable=False)
user = relationship("UserModel", lazy=True)
class WorkModel(BaseModel):
__tablename__ = "work"
next_run_time = Column(DOUBLE_PRECISION)
job_state = Column(LargeBinary)
task_id = Column(String(40), ForeignKey("task.id"))
task = relationship("TaskModel", single_parent=True)
calls_events = Table(
"calls_events",
Base.metadata,
Column("call_id", ForeignKey("call.id"), primary_key=True),
Column("event_id", ForeignKey("event.id"), primary_key=True),
)
class CallModel(BaseModel):
"""
A single task invocation received on a socket, with tracking and timing state.
"""
__tablename__ = "call"
name = Column(String(80), unique=False, nullable=False)
state = Column(String(10))
parent = Column(String(80), nullable=True)
taskparent = Column(String(80), nullable=True)
resultid = Column(String(80))
celeryid = Column(String(80))
tracking = Column(String(80))
argument = Column(String(40))
task_id = Column(String(40), ForeignKey("task.id"), nullable=False)
started = Column(DateTime, default=datetime.now, nullable=False)
finished = Column(DateTime)
socket_id = Column(String(40), ForeignKey("socket.id"), nullable=False)
socket = relationship(
"SocketModel", back_populates="call", lazy=True, uselist=False
)
events = relationship(
"EventModel", secondary=calls_events, lazy=True, cascade="all, delete"
)
class SchedulerModel(BaseModel):
"""
A scheduler that assigns nodes within a network according to a strategy.
"""
__tablename__ = "scheduler"
nodes = relationship("NodeModel", backref="scheduler", lazy=True)
strategy = Column("strategy", Enum(*strategies, name="strategies"))
network_id = Column(String(40), ForeignKey("network.id"))
class SettingsModel(BaseModel):
"""
A single platform setting value.
"""
__tablename__ = "settings"
value = Column(String(80), nullable=False)
class NodeModel(BaseModel):
"""
A host participating in the network, with its resource capacity and load metrics.
"""
__tablename__ = "node"
hostname = Column(String(60))
scheduler_id = Column(String(40), ForeignKey("scheduler.id"), nullable=True)
memsize = Column(String(60), default="NaN")
freemem = Column(String(60), default="NaN")
memused = Column(Float, default=0)
disksize = Column(String(60), default="NaN")
diskusage = Column(String(60), default="NaN")
cpus = Column(Integer, default=0)
cpuload = Column(Float, default=0)
network_id = Column(String(40), ForeignKey("network.id"))
agent = relationship(
"AgentModel", backref="node", uselist=False, cascade="all, delete-orphan"
)
plugs_arguments = Table(
"plugs_arguments",
Base.metadata,
Column("plug_id", ForeignKey("plug.id"), primary_key=True),
Column("argument_id", ForeignKey("argument.id"), primary_key=True),
)
class ArgumentModel(BaseModel):
__tablename__ = "argument"
name = Column(String(60), nullable=False)
position = Column(Integer, default=0)
kind = Column(Integer)
task_id = Column(String(40), ForeignKey("task.id"))
user_id = Column(String, ForeignKey("users.id"), nullable=False)
user = relationship("UserModel", lazy=True)
plugs = relationship("PlugModel", backref="argument")
class TaskModel(BaseModel):
"""
A function resolved from a module in a git repository, invokable through sockets.
"""
__tablename__ = "task"
module = Column(String(120), nullable=False, primary_key=True)
gitrepo = Column(String(180), nullable=False, primary_key=True)
"""
Tasks can also be mixed in to the module loaded by the processor as new
functions, via the code field below, which must contain a function definition
"""
mixin = Column(Boolean, default=False)
source = Column(Text) # Repo module function code
code = Column(Text) # Source code override for task
sockets = relationship("SocketModel", back_populates="task")
arguments = relationship("ArgumentModel", backref="task")
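The mixin mechanism described above can be pictured as compiling the task's code field into the processor's already-loaded module. A minimal sketch, assuming the module object is in hand (load_task_mixin is hypothetical, not part of the API):

import types

def load_task_mixin(module: types.ModuleType, task: "TaskModel") -> None:
    # Only mixin tasks carry a code override containing a function definition
    if task.mixin and task.code:
        exec(compile(task.code, f"<task:{task.module}>", "exec"), module.__dict__)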
class EventModel(BaseModel):
"""
Events record the lifecycle of a call: received, prerun, postrun
"""
__tablename__ = "event"
note = Column(String(80), nullable=False)
name = Column(String(80), nullable=False)
call_id = Column(String(40), ForeignKey("call.id"))
call = relationship(
"CallModel",
back_populates="events",
single_parent=True,
cascade="all, delete-orphan",
)
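The received/prerun/postrun lifecycle is then a matter of appending EventModel rows to a call as it moves through a worker. A hedged sketch, assuming an open session and a name column on BaseModel (record_event is illustrative):

def record_event(session, call: "CallModel", name: str, note: str) -> None:
    # name is one of: received, prerun, postrun
    event = EventModel(name=name, note=note)
    call.events.append(event)
    session.add(event)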
sockets_queues = Table(
"sockets_queues",
Base.metadata,
Column("socket_id", ForeignKey("socket.id")),
Column("queue_id", ForeignKey("queue.id")),
)
plugs_source_sockets = Table(
"plugs_source_sockets",
Base.metadata,
Column("plug_id", ForeignKey("plug.id"), primary_key=True),
Column("socket_id", ForeignKey("socket.id"), primary_key=True),
)
plugs_target_sockets = Table(
"plugs_target_sockets",
Base.metadata,
Column("plug_id", ForeignKey("plug.id"), primary_key=True),
Column("socket_id", ForeignKey("socket.id"), primary_key=True),
)
class GateModel(BaseModel):
__tablename__ = "gate"
open = Column(Boolean)
task_id = Column(String(40), ForeignKey("task.id"))
class SocketModel(BaseModel):
"""
A named entry point on a processor that receives task invocations, optionally on a schedule.
"""
__tablename__ = "socket"
processor_id = Column(String(40), ForeignKey("processor.id"), nullable=False)
schedule_type = Column("schedule_type", Enum(*schedule_types, name="schedule_type"))
scheduled = Column(Boolean)
cron = Column(String(20))
description = Column(Text(), nullable=True, default="Some description")
interval = Column(Integer)
task_id = Column(String(40), ForeignKey("task.id"))
task = relationship(
"TaskModel",
back_populates="sockets",
single_parent=True,
cascade="delete, delete-orphan",
)
user_id = Column(String, ForeignKey("users.id"), nullable=False)
user = relationship("UserModel", lazy=True)
# Wait for all sourceplugs to deliver their data before invoking the task
wait = Column(Boolean, default=False)
sourceplugs = relationship("PlugModel", secondary=plugs_source_sockets, back_populates="source")
targetplugs = relationship("PlugModel", secondary=plugs_target_sockets, back_populates="target")
queue = relationship("QueueModel", secondary=sockets_queues, uselist=False)
call = relationship(
"CallModel", back_populates="socket", cascade="all, delete-orphan"
)
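The wait flag above implies a simple gating rule: a waiting socket only fires its task once every source plug has delivered an argument. A minimal sketch of that check, where delivered_plug_ids is a hypothetical set of plugs that have already delivered:

def ready_to_fire(socket: "SocketModel", delivered_plug_ids: set) -> bool:
    # Without wait, any single delivery triggers the task
    if not socket.wait:
        return True
    # With wait, every source plug must have delivered
    return all(plug.id in delivered_plug_ids for plug in socket.sourceplugs)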
plugs_queues = Table(
"plugs_queues",
Base.metadata,
Column("plug_id", ForeignKey("plug.id")),
Column("queue_id", ForeignKey("queue.id")),
)
class PlugModel(BaseModel):
"""
A directed connection that routes results from a source socket to a target socket.
"""
__tablename__ = "plug"
type = Column("type", Enum(*plug_types, name="plug_type"), default="RESULT")
processor_id = Column(String(40), ForeignKey("processor.id"), nullable=False)
source = relationship(
"SocketModel",
back_populates="sourceplugs",
secondary=plugs_source_sockets,
uselist=False,
)
target = relationship(
"SocketModel",
back_populates="targetplugs",
secondary=plugs_target_sockets,
uselist=False,
)
argument_id = Column(String, ForeignKey("argument.id"))
user_id = Column(String, ForeignKey("users.id"), nullable=False)
user = relationship("UserModel", lazy=True)
description = Column(Text(), nullable=True, default="Some description")
queue = relationship("QueueModel", secondary=plugs_queues, uselist=False)
class QueueModel(BaseModel):
"""
A message queue with durability, length, and expiry settings.
"""
__tablename__ = "queue"
qtype = Column(String(20), nullable=False, default="direct")
durable = Column(Boolean, default=True)
reliable = Column(Boolean, default=True)
auto_delete = Column(Boolean, default=True)
max_length = Column(Integer, default=-1)
max_length_bytes = Column(Integer, default=-1)
message_ttl = Column(Integer, default=3000)
expires = Column(Integer, default=3000)
network_id = Column(String(40), ForeignKey("network.id"))
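QueueModel's length and expiry fields map naturally onto the x-arguments RabbitMQ understands when a queue is declared. A hedged sketch of that translation; the mapping is an assumption about intent, not a confirmed part of the platform:

def amqp_arguments(queue: "QueueModel") -> dict:
    # Translate model settings into RabbitMQ queue declaration arguments
    args = {
        "x-message-ttl": queue.message_ttl,  # per-message TTL, milliseconds
        "x-expires": queue.expires,          # idle-queue expiry, milliseconds
    }
    if queue.max_length >= 0:  # -1 means unbounded in this schema
        args["x-max-length"] = queue.max_length
    if queue.max_length_bytes >= 0:
        args["x-max-length-bytes"] = queue.max_length_bytes
    return args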
class LoginModel(Base):
__tablename__ = "login"
id = Column(
String(40),
autoincrement=False,
default=literal_column("uuid_generate_v4()"),
unique=True,
primary_key=True,
)
owner = Column(String(40), default=literal_column("current_user"))
created = Column(DateTime, default=datetime.now, nullable=False)
lastupdated = Column(
DateTime, default=datetime.now, onupdate=datetime.now, nullable=False
)
login = Column(DateTime, default=datetime.now, nullable=False)
token = Column(
String(40),
autoincrement=False,
default=literal_column("uuid_generate_v4()"),
unique=True,
primary_key=True,
)
user_id = Column(String, ForeignKey("users.id"), nullable=False)
user = relationship("UserModel", lazy=True, overlaps="logins")
REST
Stack
Containers