Decorators
ElasticCode Python decorator API
“””
Decorator API for Flow. Defines network from plain old classes and methods.
“””
import os
from pyfi.client.api import ProcessorBase
from pyfi.client.decorators import plug, processor, socket
@processor(
name="proc2",
gitrepo=os.environ["GIT_REPO"],
module="pyfi.processors.sample",
concurrency=1,
)
class ProcessorB(ProcessorBase):
"""Description"""
@socket(
name="proc2.do_this",
processor="proc2",
arguments=True,
queue={"name": "sockq2"},
)
def do_this(message):
from random import randrange
print("Do this!", message)
message = "Do this String: " + str(message)
graph = {
"tag": {"name": "tagname", "value": "tagvalue"},
"name": "distance",
"value": randrange(50),
}
return {"message": message, "graph": graph}
@processor(
name="proc1",
gitrepo=os.environ["GIT_REPO"],
module="pyfi.processors.sample",
concurrency=7,
)
class ProcessorA(ProcessorBase):
"""Description"""
def get_message(self):
return "Self message!"
@plug(
name="plug1",
target="proc2.do_this", # Must be defined above already (prevents cycles)
queue={
"name": "queue1",
"message_ttl": 300000,
"durable": True,
"expires": 200,
},
)
@socket(
name="proc1.do_something",
processor="proc1",
beat=False,
interval=15,
queue={"name": "sockq1"},
)
def do_something(message):
"""do_something"""
from random import randrange
message = "TEXT:" + str(message)
graph = {
"tag": {"name": "tagname", "value": "tagvalue"},
"name": "temperature",
"value": randrange(10),
}
return {"message": message, "graph": graph}
@processor(
name="proc3",
gitrepo=os.environ["GIT_REPO"],
module="pyfi.processors.sample",
concurrency=1,
)
class ProcessorC(ProcessorBase):
"""Description"""
def get_message(self):
return "Self message!"
@plug(
name="plug2",
target="proc2.do_this", # Must be defined above already (prevents cycles)
queue={
"name": "queue2",
"message_ttl": 300000,
"durable": True,
"expires": 200,
},
)
@socket(
name="proc3.do_something",
processor="proc3",
beat=False,
interval=5,
queue={"name": "sockq3"},
)
def do_something(message):
"""do_something"""
from random import randrange
message = "TEXT2:" + str(message)
graph = {
"tag": {"name": "tagname", "value": "tagvalue"},
"name": "temperature",
"value": randrange(10),
}
return {"message": message, "graph": graph}
if __name__ == "__main__":
print("Network created.")