Managing Workflow for Big Data
keywords: Big Data, Workflow
This article provides a more current discussion than a presentation that I gave at the Atlanta, Georgia Node Meetup on February 17, 2017 - The video for that presentation (called"Defeating the Janitorial Challenges of Big Data") is available at:
Most of the blog threads that I see about the implementation of Big Data systems concentrate on the algorithms employed or (more often and more opinionated) on the relative merits of the platforms that are most commonly used to toss petabytes about.
Here I focus on the overall architecture and implementation of a (relatively small) big data system and, in particular, how its unusually complex workflow is managed.
The solution provided by Headspace Sprockets, LLC. was motivated by the needs of a major consumer data processor client. The client's business revolves around a unique online analytics platform encompassing a very rich form of exploratory data analysis and sampling.
The source data that is ingested totals in size about 1 terabyte. They are provided as over 500 separate datasets, updated separately. Most of these are CSV files ranging in size from 1 to 20 million records and with 10 to 1000 fields. About 25 geographic region databases each containing 5000 to 100,000 GeoJSON multi-polygons and dozens of auxiliary databases are also used as source material. In addition, United States Census Bureau TIGER/Line Shapefiles are extensively used.
Data preparation including scalar and geospatial inference create datasets which total in size to about 10 terabytes. This is "small" big data, but the work performed during data prep is very complex compared to common ETL. Some indication of the complexity is from the structure of the output of this processing:
- 100+ new datasets totaling 400 Million records
- 2 Million "enumerative counters" assigned to taxonomic structures
- 5 Billion inferred region assignments (using latitude / longitude)
- 10,000 newly inferred regions (GeoJSON polygons)
- 10,000 "smoothed" GeoJSON regions
The source data changes irregularly but it is common that 5-10% of the records may be modified or in need of reconsideration each day and that a much higher proportion of the records may be modified upon some events that occur weekly and monthly.
The customer has a requirement that stipulates a data preparation turn-around time of fractions of a day for most incremental workflows and of approximately a full day for a complete re-build from scratch.
Clearly, these are unusual but not unique workflow requirements for big data processing.
Sketch of the Solution
I undertook both a literature search and a product search to discover as much as I could about the state of the art. What I found led me to believe that we needed to craft our own solution, but I did find a wonderful starting point: ActionHero.
ActionHero (https://www.actionherojs.com/) is an open sourced node.js API server. The most salient features of ActionHero that we found attractive were:
- Asynchronous task processing for long lived processing;
- Clustering (both local and distributed) support that enables scalability;
- Job queue management using Redis;
- Middleware hooks that enable injection of processing before and after task enqueuing and before and after task execution;
- Extremely clean project structure architecture for development, testing and deployment;
- Ease of incorporating process configuration and initialization routines, and
- An actively supported code base with solid documentation.
We created about 100 different types of tasks based on an analysis of how work may be best divided into reusable and manageable chunks. Each type of task provides a distinct form of data processing including forms of transformation, reconciliation, pattern analysis and inference, geo-spatial inference, and the like.
There are multiple workflows that are defined for different situations and requirements. A common workflow is an orchestration of these task types into a flow of 5200 task instance invocations where each invocation has typically from 1 to 50 immediate dependencies (task instance invocations that must complete before this task instance invocation may run).
Invocations of a task of a particular type may have very different computation requirements and run times - often more than a factor of 10 difference. The load of a task invocation is only partially predictable based on previous performance. Many of these tasks are used within different contexts; For example, a typical single task type takes six different parameters that translate into one of a few hundred permutations of input source and operating characteristics.
Workflow execution is organized into four phases:
- Situation sensing;
- Plan generation;
- Plan compilation; and
- Plan execution.
Each of these is described below.
Execution of all tasks from a cold start is time consuming - currently about 35 hours for our client's data. This has necessitated the development of an inference mechanism which is able to react to events and only flag for execution the tasks that are actually required. Events include but are not limited to:
- The metadata which specifies data models and sources has changed. Rebuild everything.
- One or more data sources has changed that affect broadly applied geo-spatial inferences. For example, annual changes to Census Bureau TIGER/Line data. This forces some "vertical" recomputation over large portions of the datasets but much work related to other "vertical" portions may be avoided. Some inferred geo-spatial regions may need to be recomputed.
- A particular data source has changed. A thread of dependent tasks and changed data needs to be determined to minimize the computational impact of the change.
Events may be combined into a run. Time for processing varies widely (measured from initiation of the processing to the point where all datasets are available). However, judicious planning allows subsets of the data to be ready for use (in an analytic database) at interim points in the overall process.
Plan generation is the creation of a particular instance of a plan based on the output of the situation sensing phase and a knowledge of the possible orderings of workflow tasks.
Workflow tasks are organized principally by precedence relations. Each task has 0 or more immediate predecessors. For our client, I have developed a sub-system which allows the precedence relations to be specified in a relatively compact form (about 40 pages of code) which is then compiled into a kind of execution plan (just under 2000 pages for a full cold-start rebuild).
A common small workflow for our client has 106 tasks. A rendering of the interdependencies is illustrated below.
Often the workflow is an order of magnitude larger. Its rendering is below.
A full cold-start rebuild cannot be meaningfully rendered having several times as many task nodes and considerably denser task precedence relationships.
Plan compilation consists of taking a plan from the last phase and combining it with further information regarding the structure of multiple priority queues and something we call "lanes". This allows an execution plan to be compiled that will dynamically attempt to minimize overall execution time.
A rather simple but very effective way of ensuring that some scarce resources are used effectively is through the creation of workflow lanes. Without getting too deep into an explanation here, imagine that there is a resource which can only be used in a serialized way - only one task is allowed to execute at a time. This is not strictly a precedence relation - we are not imposing a constraint on the order of task execution - merely that only one task is allowed at a time to be executed that uses that resource. We designate that those tasks are constrained to execute in a lane that is associated with the resource.
The heart of plan compilation is a topological sort which is used to verify that the plan is sound (is acyclic and fully connected) and the creation of an execution plan in a form which may be launched.
Plan execution consists of a very sophisticated way of dynamically packing as much work as possible into the job streams (typically being executed using approximately 50 to 100 cores).
ActionHero provides features for both task queue management and for task middleware. I have needed to largely bypass the use of its task queue management in favor of my own. I do make great use of task middleware. These are programmatic hooks for the start and completion of enqueue operations and actual task execution. In particular, plan execution makes use of the task completion middleware hook. As each task completes:
- an inventory is made of tasks that are not waiting for any unfulfilled dependencies,
- a partial topological sort is made to prioritize those tasks,
- a determination is made of what execution processors are available that are appropriate respecting constraints imposed by lanes, and
- finally, tasks are enqueued using a minimal commitment strategy.
During execution of a workflow, checks are performed on the health of the processing and to report progress.
Future Improvements and Variations
I have built what may be viewed as at least four generations of successively more efficient workflow management. The workflow system has grown in sophistication and in generality.
Future improvements may focus on better predictions of task execution times to enable even greater packing of work into dynamic priority queues to minimize time to interim results or overall time or both.