Custom Workflow Orchestration In Python
This post originally appeared on my employer VISO Trust’s blog. It is lightly edited and reproduced here.
On the Data & Machine Learning team at VISO Trust, one of our core goals is to provide Document Intelligence to the audit team. Every Document that passes through the system is subject to collection, parsing, reformatting, analysis, reporting and more. Every day, we work to expand this feature set, increase its accuracy and deliver faster results.
Why we needed workflow orchestration
Many individual tasks are executed to produce what Document Intelligence provides, including but not limited to:
- Security Control Language Detections
- Audit Framework Control ID Detections
- Named Entity Extraction like organizations, dates and more
- Decryption of encrypted PDFs
- Translation of foreign language PDFs
- Document Classification
- Document Section Detection
Until our workflow orchestration implementation, the features listed above and more were all represented in code inside a single function. Over time, this function became unwieldy and difficult to read, with snippets of ceremony, controls, logging, function calls and more sprinkled throughout. Moreover, this is one of the most important areas of our app, where new features will be implemented regularly, so the need to clean this code up and make it easier to reason about became clear. Furthermore, execution inside this function occurred sequentially even though some of its function calls could run in parallel. Parallel execution wasn’t required at the time, but we knew that features on the roadmap would soon necessitate it. That left us with two requirements:
- task execution that is easier to reason about and
- the ability to execute in parallel
We knew we needed to either use an existing workflow orchestration tool or write a custom one. We began with some rough analysis of what was going on in our main automation function: we formalized each ‘step’ into a concept called a Task and theorized about which Tasks could execute in parallel. At the time of the analysis, we had 11 Tasks, each of which required certain inputs and produced certain outputs; based on these inputs and outputs, we determined that a number of them could run in parallel. With this context, we reviewed some of the major open source Python toolkits for workflow orchestration:
- Luigi
- Apache Airflow
Both of these toolkits are designed for managing workflows that have tens, hundreds or even thousands of tasks to complete and that can take days or weeks to finish. They have complex schedulers, user interfaces, failure modes, options for a variety of input and output modes and more. Our pipeline will reach this level of complexity someday, but with an 11-Task pipeline, we decided that these toolkits added too much complexity for our use. We resolved to build a custom workflow orchestration toolkit guided by the deep knowledge in these more advanced tools.
Our custom workflow orchestration
The first goal was to generalize all of the steps in our automation service into the concept of a Task. A few examples of a Task would be:
- detecting a document’s language,
- translating a foreign language document,
- processing OCR results into raw text,
- detecting keywords inside text,
- running machine learning inference on text.
Just reading this list gives one a feel for how each Task depends on a previous Task’s output to run. Being explicit about dependencies is core to workflow orchestration, so the first step in our Task concept was defining what inputs a given Task requires and what outputs it will produce. To demonstrate Tasks, we will develop a fake example Task called DocClassifyInference, the goal of which is to run ML inference to classify a given document. Imagine that our model uses both images of the raw PDF file and the text inside it to make predictions. Our Task, then, will require the decrypted PDF and the paginated text of the PDF in order to execute. Further, when it’s complete it will write a file to S3 containing its results. Thus, the start of our example Task might look like:
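The original listing is not reproduced here, so the following is only a minimal sketch of how such a Task could begin. The key names ("decrypted_pdf", "paginated_text", "doc_classification") and the method name write_to_s3 are assumptions made for illustration; the base classes are compact stand-ins for the Task and S3Task classes this post describes.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class Task(ABC):
    """Base class: subclasses must declare their keys and implement execute()."""

    @property
    @abstractmethod
    def input_keys(self) -> List[str]: ...

    @property
    @abstractmethod
    def output_keys(self) -> List[str]: ...

    @abstractmethod
    def execute(self, pipeline: Any) -> Any: ...


class S3Task(Task):
    """A Task that must also know how to persist its results to S3."""

    @abstractmethod
    def write_to_s3(self, pipeline: Any) -> None:  # hypothetical method name
        ...


class DocClassifyInference(S3Task):
    # Assumed key names: what this Task reads from, and writes to, the shared state.
    input_keys = ["decrypted_pdf", "paginated_text"]
    output_keys = ["doc_classification"]
```

At this point the class only declares its dependencies. Because execute() and write_to_s3() are still abstract, Python refuses to instantiate it, which is how the base classes enforce the contract.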
DocClassifyInference subclasses S3Task, an abstract class that enforces defining a method to write to S3. S3Task itself is a subclass of the Task class, which enforces that subclasses define input keys, output keys and an execute method. The keys are enforced in a Pipeline class:
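One way such enforcement could work is sketched below. This is an illustration under assumptions, not the original class: the method names get_inputs and set_outputs are hypothetical, and the rule that outputs must match the declared keys exactly is a design choice made for the example.

```python
from types import SimpleNamespace
from typing import Any, Dict


class Pipeline:
    """In-memory state manager that checks a Task's declared keys."""

    def __init__(self, **initial_state: Any) -> None:
        self._state: Dict[str, Any] = dict(initial_state)

    def get_inputs(self, task: Any) -> Dict[str, Any]:
        """Return only the values the Task declared in input_keys."""
        missing = [key for key in task.input_keys if key not in self._state]
        if missing:
            raise KeyError(f"missing inputs: {missing}")
        return {key: self._state[key] for key in task.input_keys}

    def set_outputs(self, task: Any, outputs: Dict[str, Any]) -> None:
        """Store outputs, rejecting anything the Task did not declare."""
        if set(outputs) != set(task.output_keys):
            raise ValueError(
                f"expected outputs {sorted(task.output_keys)}, got {sorted(outputs)}"
            )
        self._state.update(outputs)


# Any object with input_keys and output_keys works for this demonstration.
fake_task = SimpleNamespace(
    input_keys=["decrypted_pdf"], output_keys=["paginated_text"]
)
pipeline = Pipeline(decrypted_pdf=b"%PDF-1.7")
inputs = pipeline.get_inputs(fake_task)
pipeline.set_outputs(fake_task, {"paginated_text": ["page one"]})
```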
This Pipeline will become the object that manages state as our Tasks execute. In our case, we were not approaching memory limits, so we decided to keep much of the Task state in memory, though this could easily be changed to always write to and read from storage. As a state manager, the Pipeline can also capture any ceremony that downstream Tasks may require before any Task executes.
Continuing on with DocClassifyInference: as a subclass of the abstract class Task, DocClassifyInference will have to implement an execute method as well (enforced by Task). This method will take a Pipeline and return a Pipeline. In essence, it receives the state manager, modifies the state and returns it so the next Task can operate on it. In our example case, execute will extract the decrypted PDF and paginated text so they can be used as inputs for an ML model to perform document classification. Let’s look at the entire stubbed-out Task:
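A stubbed-out version along these lines might look like the sketch below. The Pipeline here is reduced to a plain state dictionary, the classifier is a placeholder that always returns "unknown", and write_to_s3 is a no-op; the key names and helper names are assumptions for illustration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Pipeline:
    """Compact stand-in for the state manager: just a dictionary of state."""
    state: Dict[str, Any] = field(default_factory=dict)


class Task(ABC):
    input_keys: List[str] = []
    output_keys: List[str] = []

    @abstractmethod
    def execute(self, pipeline: Pipeline) -> Pipeline: ...


class S3Task(Task):
    @abstractmethod
    def write_to_s3(self, pipeline: Pipeline) -> None: ...


class DocClassifyInference(S3Task):
    input_keys = ["decrypted_pdf", "paginated_text"]
    output_keys = ["doc_classification"]

    def execute(self, pipeline: Pipeline) -> Pipeline:
        # 1. Extract exactly what this Task declared it needs.
        pdf = pipeline.state["decrypted_pdf"]
        pages = pipeline.state["paginated_text"]
        # 2. Operate on the data (placeholder for real ML inference).
        label = self._classify(pdf, pages)
        # 3. Set what this Task declared it would set, persist, and return.
        pipeline.state["doc_classification"] = label
        self.write_to_s3(pipeline)
        return pipeline

    def _classify(self, pdf: bytes, pages: List[str]) -> str:
        # Hypothetical helper: a real Task would call a model here.
        return "unknown"

    def write_to_s3(self, pipeline: Pipeline) -> None:
        # Stub: a real implementation would upload the result to S3.
        pass


pipeline = Pipeline({"decrypted_pdf": b"%PDF-1.7", "paginated_text": ["page one"]})
result = DocClassifyInference().execute(pipeline)
```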
It’s easy to see how DocClassifyInference gets the Pipeline state, extracts what it needs, operates on that data, sets what it has declared it’s going to set and returns the Pipeline. This allows for an API like this:
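As a rough illustration of that calling style, the sketch below chains three toy Tasks, each taking the Pipeline and handing it back. The Task bodies are placeholders and the state keys are assumed names; only the shape of the calls matters.

```python
from typing import Any, Dict


class Pipeline:
    """Compact stand-in for the state manager."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}


class DecryptPDF:
    def execute(self, pipeline: Pipeline) -> Pipeline:
        pipeline.state["decrypted_pdf"] = b"%PDF-1.7 placeholder"
        return pipeline


class PaginatedText:
    def execute(self, pipeline: Pipeline) -> Pipeline:
        # Placeholder for real text extraction from the decrypted PDF.
        pipeline.state["paginated_text"] = ["page one", "page two"]
        return pipeline


class DocClassifyInference:
    def execute(self, pipeline: Pipeline) -> Pipeline:
        pages = pipeline.state["paginated_text"]
        pipeline.state["doc_classification"] = f"classified {len(pages)} pages"
        return pipeline


# Each step receives the state manager and returns it for the next step.
pipeline = Pipeline()
pipeline = DecryptPDF().execute(pipeline)
pipeline = PaginatedText().execute(pipeline)
pipeline = DocClassifyInference().execute(pipeline)
```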
This was, of course, much cleaner than what we had previously. It also lends itself to writing easy, understandable unit tests per Task and adheres more closely to functional programming principles. So this solves our first goal of making the code cleaner and easier to reason about. What about parallel processing?
Similar to Luigi and Apache Airflow, the goal of our workflow orchestration is to generate a topologically sorted Directed Acyclic Graph of Tasks. In short, having each Task explicitly define its required inputs and intended outputs allows the Tasks to be sorted for optimal execution. We no longer need to write the Tasks down in sequential order as in the API described above; instead, we can pass a Task Planner a list of Tasks and let it decide how to optimally execute them. What we want, then, is a Task Planner that is passed a list of Tasks, sorts the Tasks topologically and returns a list where each member is itself a list of Tasks. Let’s take a look at what this might look like using some of our examples from above:
Here I have retained our examples while adding two new Tasks: KeywordDetection and CreateCSVOutput. You can imagine these like matching keywords in the paginated text and modifying the results of RunDocInference & KeywordDetection to create a formatted CSVOutput. When the Task Planner receives this list, we’ll want it to topologically sort the tasks and output a data structure that looks like this:
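A sketch of both the input list and the desired plan is given below. To keep it short, each Task is represented by a hypothetical TaskSpec record holding only its name and keys (the key names are assumptions), and a small helper checks the property that makes a plan valid: every input must have been produced by an earlier stage.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple


@dataclass(frozen=True)
class TaskSpec:
    """Illustrative stand-in for a Task: just a name and its declared keys."""
    name: str
    input_keys: Tuple[str, ...]
    output_keys: Tuple[str, ...]


DecryptPDF = TaskSpec("DecryptPDF", (), ("decrypted_pdf",))
PaginatedText = TaskSpec("PaginatedText", ("decrypted_pdf",), ("paginated_text",))
RunDocInference = TaskSpec(
    "RunDocInference", ("decrypted_pdf", "paginated_text"), ("doc_classification",)
)
KeywordDetection = TaskSpec("KeywordDetection", ("paginated_text",), ("keyword_matches",))
CreateCSVOutput = TaskSpec(
    "CreateCSVOutput", ("doc_classification", "keyword_matches"), ("csv_output",)
)

# The planner's input: order does not matter.
tasks = [CreateCSVOutput, KeywordDetection, RunDocInference, PaginatedText, DecryptPDF]

# The planner's desired output: one inner list per stage of execution.
expected_task_plan = [
    [DecryptPDF],
    [PaginatedText],
    [RunDocInference, KeywordDetection],
    [CreateCSVOutput],
]


def plan_is_valid(plan: List[List[TaskSpec]]) -> bool:
    """True if every Task's inputs were produced by a strictly earlier stage."""
    available: Set[str] = set()
    for stage in plan:
        if any(key not in available for task in stage for key in task.input_keys):
            return False
        for task in stage:
            available.update(task.output_keys)
    return True
```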
In the above list, you can imagine each of its members as a ‘stage’ of execution. Each stage has one or more Tasks; with one Task, execution occurs sequentially, and with several, execution occurs in parallel. In English, the expected_task_plan can be described like so:
- DecryptPDF depends on nothing and creates a consumable PDF,
- PaginatedText depends on a consumable PDF and creates a list of strings,
- RunDocInference depends on both and classifies the document,
- KeywordDetection depends on paginated text and produces matches,
- CreateCSVOutput depends on doc classification and keyword detection and produces a formatted CSV of their outputs.
An example of the function that creates the expected_task_plan above might look like:
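The sketch below shows one way such a planner could be written with graphlib.TopologicalSorter (Python 3.9+). It is not the original function: TaskSpec and the key names are illustrative stand-ins, and treating inputs with no producing Task as supplied up front by the Pipeline is an assumption.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Dict, List, Sequence, Tuple


@dataclass(frozen=True)
class TaskSpec:
    """Illustrative stand-in for a Task: just a name and its declared keys."""
    name: str
    input_keys: Tuple[str, ...]
    output_keys: Tuple[str, ...]


def get_task_plan(tasks: Sequence[TaskSpec]) -> List[List[TaskSpec]]:
    # 1. Map every output key to its producer, rejecting duplicate keys.
    producer_of: Dict[str, TaskSpec] = {}
    for task in tasks:
        for key in task.output_keys:
            if key in producer_of:
                raise ValueError(f"output key {key!r} is produced by more than one Task")
            producer_of[key] = task

    # 2. Add each Task as a node whose predecessors produce its inputs.
    #    Assumption: inputs without a producer are supplied by the Pipeline.
    sorter: TopologicalSorter = TopologicalSorter()
    for task in tasks:
        sorter.add(task, *(producer_of[k] for k in task.input_keys if k in producer_of))
    sorter.prepare()  # raises graphlib.CycleError if the Tasks form a cycle

    # 3. Everything that is ready at the same time forms one parallel stage.
    plan: List[List[TaskSpec]] = []
    while sorter.is_active():
        ready = sorter.get_ready()
        plan.append(list(ready))
        sorter.done(*ready)
    return plan


tasks = [
    TaskSpec("CreateCSVOutput", ("doc_classification", "keyword_matches"), ("csv_output",)),
    TaskSpec("KeywordDetection", ("paginated_text",), ("keyword_matches",)),
    TaskSpec("RunDocInference", ("decrypted_pdf", "paginated_text"), ("doc_classification",)),
    TaskSpec("PaginatedText", ("decrypted_pdf",), ("paginated_text",)),
    TaskSpec("DecryptPDF", (), ("decrypted_pdf",)),
]
task_plan = get_task_plan(tasks)
```

The order of Tasks inside a stage is not significant, since the members of a stage do not depend on one another.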
This function gets the list of Tasks, ensures that no two Task outputs have identical keys, adds the nodes to a sorter by interrogating each Task’s input_keys and output_keys, and sorts them topologically. In our case the sorter is graphlib’s TopologicalSorter, which is described in the Python standard library documentation. Getting into what each of these functions is doing would take us too far afield, so we will move on to executing a task plan.
Given the expected_task_plan shown above, an execute_task_plan() function is straightforward:
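A minimal sketch of such a function follows. It assumes that Tasks in the same stage share one Pipeline and write to distinct keys, which the unique-output-key check in the planner makes plausible. The TaskThread shown is a compact version of the Thread subclass explained right after this; SetKey is a toy Task invented for the demonstration.

```python
import threading
from typing import Any, Callable, Dict, List, Optional


class Pipeline:
    """Compact stand-in for the state manager."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}


class TaskThread(threading.Thread):
    """Thread that re-raises a child exception when the caller joins it."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.exc: Optional[BaseException] = None

    def run(self) -> None:
        try:
            super().run()
        except BaseException as exc:
            self.exc = exc

    def join(self, timeout: Optional[float] = None) -> None:
        super().join(timeout)
        if self.exc is not None:
            raise self.exc


def execute_task_plan(task_plan: List[List[Any]], pipeline: Pipeline) -> Pipeline:
    for stage in task_plan:
        if len(stage) == 1:
            # A single Task in the stage: run it in the calling thread.
            pipeline = stage[0].execute(pipeline)
        else:
            # Several Tasks in the stage: one thread per Task, then wait for all.
            threads = [TaskThread(target=task.execute, args=(pipeline,)) for task in stage]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
    return pipeline


class SetKey:
    """Toy Task: computes one value from the current state and stores it."""

    def __init__(self, key: str, compute: Callable[[Dict[str, Any]], Any]) -> None:
        self.key, self.compute = key, compute

    def execute(self, pipeline: Pipeline) -> Pipeline:
        pipeline.state[self.key] = self.compute(pipeline.state)
        return pipeline


plan = [
    [SetKey("text", lambda s: "hello world")],
    [SetKey("length", lambda s: len(s["text"])), SetKey("upper", lambda s: s["text"].upper())],
    [SetKey("report", lambda s: f'{s["upper"]} ({s["length"]})')],
]
result = execute_task_plan(plan, Pipeline())
```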
Here we iterate over the task plan, deciding between sequential and parallel execution for each stage. In the latter case, we use Python’s threading.Thread class to create a thread per Task and use the idiomatic methods for starting and joining threads. Wait, then what is TaskThread?
In our case, we wanted to ensure that an exception in a child thread will always be raised to the calling thread so the calling thread can exit immediately. So we extended the threading.Thread class with our own class called TaskThread. Overriding threading.Thread’s .run() method is fairly common (so common that it’s suggested in run()’s comments); we overrode run() to set an instance variable carrying an exception’s content and then we check that variable at .join() time.
The calling thread can now try/except at .join() time.
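The pattern described above can be sketched as follows. This is an illustrative version rather than the original class: the attribute name exc and the failing_task function are hypothetical. It relies only on documented threading.Thread behaviour, namely that run() invokes the target passed to the constructor.

```python
import threading
from typing import Any, Optional


class TaskThread(threading.Thread):
    """Thread that remembers an exception from run() and re-raises it in join()."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.exc: Optional[BaseException] = None

    def run(self) -> None:
        try:
            super().run()  # invokes the target passed to the constructor
        except BaseException as exc:
            # Keep the exception instead of letting it die with the child thread.
            self.exc = exc

    def join(self, timeout: Optional[float] = None) -> None:
        super().join(timeout)
        if self.exc is not None:
            # Surface the child's failure in the calling thread.
            raise self.exc


def failing_task() -> None:
    raise ValueError("inference failed")


thread = TaskThread(target=failing_task)
thread.start()
try:
    thread.join()
    outcome = "finished"
except ValueError as err:
    outcome = f"child thread failed: {err}"
```

With a plain threading.Thread, the same failure would only be reported through threading.excepthook and join() would return normally, so the caller would have no direct way to react to it.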
With these structures in place, the file containing the automation service’s primary functions was reduced from ~500 lines to ~90. Now, when we create our threadpool to consume SQS messages, we get the Task plan with task_plan = get_task_plan() and pass the task_plan into each thread. Once execution reaches the main function for performing document intelligence, what previously was a large section of difficult-to-read code now becomes:
Introducing parallel processing of these Tasks consistently shaved time off performing document intelligence (about a minute on average). The real benefit of this change, however, will come in the future as we add more and more Tasks to the pipeline that can be processed in parallel.
While we’ve reduced the time-to-audit significantly from the former state-of-the-art, we are definitely not done. Features like the above will enable us to continue reducing this time while maintaining consistent processing times. We hope this blog helps you in your workflow orchestration research.