Going Beyond MapReduce for Hadoop ETL Pt.2 : Introducing Apache YARN and Apache Tez
In the first post in this three part series on going beyond MapReduce for Hadoop ETL, I looked at how a typical Apache Pig script gets compiled into a series of MapReduce jobs, and those MapReduce jobs pass data between themselves by writing intermediate resultsets to disk (HDFS, the Hadoop cluster file system). As a reminder, here’s the Pig script we’re working with:
register /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar raw_logs = LOAD '/user/mrittman/rm_logs' USING TextLoader AS (line:chararray); logs_base = FOREACH raw_logs GENERATE FLATTEN (REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"') )AS (remoteAddr: chararray, remoteLogname: chararray, user: chararray,time: chararray, request: chararray, status: chararray, bytes_string: chararray,referrer:chararray,browser: chararray); logs_base_nobots = FILTER logs_base BY NOT (browser matches '.*(spider|robot|bot|slurp|bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*'); logs_base_page = FOREACH logs_base_nobots GENERATE SUBSTRING(time,0,2) as day, SUBSTRING(time,3,6) as month, SUBSTRING(time,7,11) as year, FLATTEN(STRSPLIT(request,' ',5)) AS (method:chararray, request_page:chararray, protocol:chararray), remoteAddr, status; logs_base_page_cleaned = FILTER logs_base_page BY NOT (SUBSTRING(request_page,0,3) == '/wp' or request_page == '/' or SUBSTRING(request_page,0,7) == '/files/' or SUBSTRING(request_page,0,12) == '/favicon.ico'); logs_base_page_cleaned_by_page = GROUP logs_base_page_cleaned BY request_page; page_count = FOREACH logs_base_page_cleaned_by_page GENERATE FLATTEN(group) as request_page, COUNT(logs_base_page_cleaned) as hits; page_count_sorted = ORDER page_count BY hits DESC; page_count_top_10 = LIMIT page_count_sorted 10; posts = LOAD '/user/mrittman/posts.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage() as (post_id:int,title:chararray,post_date:chararray,post_type:chararray,author:chararray,url:chararray,generated_url:chararray); posts_cleaned = FOREACH posts GENERATE CONCAT(generated_url,'/') as page_url,author as author, title as title; pages_and_post_details = JOIN page_count by request_page, posts_cleaned by page_url; pages_and_posts_trim = FOREACH pages_and_post_details GENERATE page_count::request_page as request_page, posts_cleaned::author as author, posts_cleaned::title as title, page_count::hits as hits; pages_and_posts_sorted = ORDER pages_and_posts_trim BY hits DESC; pages_and_post_top_10 = LIMIT pages_and_posts_sorted 10; store pages_and_post_top_10 into 'top_10s/pages';
When you submit the script for execution using the Pig client, it then parses the Pig Latin script, logically optimizes it and then compiles it into MapReduce programs; these programs are then sent in-turn to the Hadoop 1.0 Job Manager which then sends them for execution on the Hadoop cluster - in the case of our Pig script, there’s five MapReduce programs generated in-total.
Each one of these MapReduce programs are what’s called “directed acyclic graphs”, or DAGs. A directed acyclic graph is a programming style within distributed computing where processing is broken down into functions that can be run independently of one another, as long as one is not an ancestor of another. In MapReduce terms, this means that all mappers can run independently of each other (and therefore on different nodes in a cluster) and it’s only the reducers that have to wait for their ancestors to finish before they can also start their work independently of the other reducers. It’s a great programming model for processing large amounts of data with fault tolerance across a cluster and it was the key insight that made MapReduce possible and the “big data” systems that we work with today.
A Pig script that generates five MapReduce jobs then effectively creates five DAGs, with each one using files (HDFS) to persist and hand-off data between them and JVMs being spun-up for the individual map and reduce jobs. As such, there’s no way for this version of MapReduce and the Hadoop framework it runs on to consider the overall dataflow, and each DAG has to run in isolation.
To address this issue, version 2.0 of Hadoop introduced a new feature called Apache YARN, or “yet another resource negotiator”. YARN took on the resource management and job scheduling/monitoring parts of Hadoop and made it so that YARN then effectively became an “operating system” for Hadoop that then allowed frameworks to run on it; initially MapReduce2 (reworked to run on YARN), but since then other ones like Apache Tez, and Apache Spark.
YARN also crucially supported frameworks that used DAGs that describe the entire dataflow, not just individual MapReduce jobs, and a new framework that came out of this that used that new capability was Apache Tez. Tez is a generalisation of the MapReduce distributed compute framework that supports these dataflow-style DAGs and runs either MapReduce code unaltered, or has its own API for describing DAGs using vertexes (logic and resources) or edges (connections). Both Hive and Pig have been ported to run on Tez, and in Pig’s case this means another type of compiler is added to the existing MapReduce one, and Pig scripts executed on Tez can now submit a DAG that encompasses all stages in the dataflow and the reducers and be linked-together via in-memory passing of intermediate steps, rather than having to write those intermediate steps to disk.
In practice, what this means is that if your version of Pig or Hive has been updated to also run on Tez, you can run your code unaltered on Tez and typically see a 2-3x performance improvement without any changes to your code or application logic. Hortonworks have been the main Hadoop vendor backing Tez and their new Hortonworks Data Platform 2.2 comes with Tez support, so I took my Pig script and ran it on a five-node cluster to get an initial timing and it took 4 min 17 secs to run, again generating the same five MapReduce jobs that I saw on the Cloudera CDH5 cluster. Then, running it using Tez as the execution engine (pig -x tez analyzeblog.pig) the same script took 2mins 25secs to run, around twice as fast as when we ran it using regular MapReduce.
And Tez is great for getting existing Pig and Hive scripts to run faster - if your platform supports it, you should use it instead of MapReduce as your execution engine; MapReduce code submitted “as is” will benefit from better YARN container re-use, and Hive and Pig scripts that run on the Tez execution engine can run as a single DAG and use memory, rather than disk, to pass data between jobs in the DAG. For ETL and analytic jobs that you’re creating from new though, Apache Spark is arguably the framework you should look to use instead of Tez, and tomorrow we’ll find out why.