
One of the wonderful and terrible things about Hadoop (or anything else at Big Data scale) is that there are very few boundary cases for performance optimization. If your dataflow has the wrong shape, it is typically so catastrophically inefficient as to be unworkable. Otherwise, Hadoop’s scalability makes the price of simply throwing more hardware at the problem competitive with investing to optimize it, especially for exploratory analytics. That’s why the repeated recommendation of this book is: Get the algorithm right, get the contextability right and size your cluster to your job.

When you begin to productionize the results of all your clever work, it becomes valuable to hunt down those 10-percent and 30-percent improvements. Debug loop time is also important, though, so it is useful to get a feel for when and why early optimization is justified.

The first part of this chapter discusses Tuning for the Wise and Lazy — showing you how to answer the question, “Is my job slow and should I do anything about it?” Next, we will discuss Tuning for the Brave and Foolish, diving into the details of Hadoop’s maddeningly numerous, often twitchy, configuration knobs. There are low-level setting changes that can dramatically improve runtime; we will show you how to recognize them and how to determine what per-job overrides to apply. Lastly, we will discuss a formal approach for diagnosing the health and performance of your Hadoop cluster and your Hadoop jobs to help you confidently and efficiently identify the source of deeper flaws or bottlenecks.

Tuning For The Wise and Lazy

Before tuning your job, it is important to first understand whether it is slow and if so, where the bottleneck arises. Generally speaking, your Hadoop job will either be CPU-bound within your code, memory-bound within your code, bound by excessive Reducer data (causing too many merge passes) or throughput-bound within the framework or underlying system. (In the rare cases it is not one of those, you would next apply the more comprehensive “USE method” (TODO: REF) described later in this chapter.) But for data-intensive jobs, the fundamental upper bound on Hadoop’s performance is its effective throughput from disk. You cannot make your job process data faster than it can load it.

Here is the conceptual model we will use: Apart from necessary overhead, job coordination and so forth, a Hadoop job:

  • Streams data to the Mapper, either locally from disk or remotely over the network;

  • Runs that data through your code;

  • Spills the midstream data to disk one or more times;

  • Applies combiners, if any;

  • Merge/Sorts the spills and sends them over the network to the Reducer.

The Reducer:

  • Writes each Mapper’s output to disk;

  • Performs some number of Merge/Sort passes;

  • Reads the data from disk;

  • Runs that data through your code;

  • Writes its output to the DataNode, which writes that data once to disk and twice through the network;

  • Receives two other Reducers’ output from the network and writes those to disk.

Hadoop is, of course, pipelined; to every extent possible, those steps are happening at the same time. What we will do, then, is layer in these stages one by one, at each point validating that your job is as fast as your disk until we hit the stage where it is not.

Fixed Overhead

The first thing we want is a job that does nothing; this will help us understand the fixed overhead costs. Actually, what we will run is a job that does almost nothing; it is useful to know that your test really did run.

(TODO:  Disable combining splits)
Load 10_tiny_files
Filter most of it out
Store to disk

(TODO:  Restrict to 50 Mapper slots or rework)
Load 10000_tiny_files
Filter most of it out
Store to disk

(TODO: is there a way to limit the number of Reduce slots in Pig? Otherwise, revisit the below.)
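Rendered as Pig, those probe jobs might look something like the following sketch. The input paths are hypothetical stand-ins for whatever tiny files you generate, the filter pattern is simply something that matches almost nothing, and pig.splitCombination is our best guess at the Pig property that disables split combining — verify it against your Pig version. (Restricting the job to 50 Mapper slots is a cluster-side setting, per the TODO above.)

-- hedged sketch of the "do almost nothing" probe jobs; paths and pattern are hypothetical
SET pig.splitCombination false;  -- assumed property name: do not combine tiny files into one split

tiny_10    = LOAD '/tmp/tuning/tiny_files_10'    USING TextLoader() AS (line:chararray);
kept_10    = FILTER tiny_10    BY (line MATCHES '.*almost_nothing_matches_this.*');
STORE kept_10    INTO '/tmp/tuning/fixed_overhead_10';

tiny_10000 = LOAD '/tmp/tuning/tiny_files_10000' USING TextLoader() AS (line:chararray);
kept_10000 = FILTER tiny_10000 BY (line MATCHES '.*almost_nothing_matches_this.*');
STORE kept_10000 INTO '/tmp/tuning/fixed_overhead_10000';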

In (TODO: REF), there is a performance comparison worksheet that you should copy and fill in as we go along. It lists the performance figures for several reference clusters on both cloud and dedicated environments for each of the tests we will perform. If your figures do not compare well with the appropriate reference cluster, it is probably worthwhile adjusting the overall configuration. Assuming your results are acceptable, you can tape the worksheet to your wall and use it to baseline all the jobs you will write in the future. The rest of the chapter will assume that your cluster is large enough to warrant tuning but not grossly larger than the largest reference cluster.

If you run the Pig script above (TODO: REF), Hadoop will execute two jobs: one with 10 Mappers and no Reducers and another with 10,000 Mappers and no Reducers. From the Hadoop Job Tracker page for your job, click on the link showing the number of Map tasks to see the full task listing. All 10 tasks for the first job should have started at the same time and uniformly finished a few seconds after that. Back on the main screen, you should see that the total job completion time was more or less identical to that of the slowest Map task.

The second job ran its 10,000 Map tasks through a purposefully restricted 50 Mapper slots — so each Mapper slot will have processed around 200 files. If you click through to the Task listing, the first wave of tasks should start simultaneously and all of them should run in the same amount of time that the earlier test did.

(TODO: show how to find out if one node is way slower)

Even in this trivial case, there is more variance in launch and run times than you might first suspect (if you do not see it here, you certainly will in the next example — but for continuity, we will discuss it now). If that splay — the delay between the bulk of the tasks finishing and the final task finishing — is larger than the runtime of a typical task, it may indicate a problem; as long as it is only a few seconds, though, don’t sweat it. If you are interested in a minor but worth-it tweak, adjust the mapred.job.reuse.jvm.num.tasks setting to ‘10’. This causes each Mapper slot to use the same child JVM for multiple tasks in a row, minimizing the brief but noticeable impact of JVM startup time. If you are writing your own native Java code, you might know a reason to force no reuse (the default), but it is generally harmless for any well-behaved program.
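If you want that override to travel with the job rather than live in the cluster configuration, a one-line Pig override (assuming the Hadoop 1.x property name above) is all it takes:

-- reuse each child JVM for up to 10 consecutive tasks; set back to 1 (the default) to disable
SET mapred.job.reuse.jvm.num.tasks 10;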

On the Job screen, you should see that the second job’s total runtime was about 200 times that of the first and not much more than 200 times the typical task’s runtime; if not, you may be putting pressure on the Job Tracker. Rerun your job and watch the Job Tracker’s heap size; you would like the Job Tracker heap to spend most of its life below, say, 50 percent, because any significant excursions toward 100 percent will unnecessarily impede cluster performance. The 1-GB out-of-the-box setting is fairly small; for production use, we recommend at least 3 GB of heap on a dedicated machine with at least 7 GB total RAM.

If the Job coordination overhead is unacceptable but the Job Tracker heap is not to blame, a whole host of other factors might be involved; apply the USE method, described (TODO: REF).

Mapper Input

Now that we’ve done almost nothing, let’s do almost something — read in a large amount of data, writing just enough to disk to know that we really were there.

Load 100 GB from disk
Filter all but 100 MB
Store it to disk
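As Pig, the probe might look like the following; the input path and the filter pattern are hypothetical placeholders for the GitHub archive data and a pattern that keeps only a sliver of it.

-- stream the full dataset through the Mappers but keep almost nothing; path and pattern are hypothetical
events = LOAD '/data/gold/github_archive' USING TextLoader() AS (line:chararray);
slim   = FILTER events BY (line MATCHES '.*almost_nothing_matches_this.*');
STORE slim INTO '/tmp/tuning/mapper_input_probe';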

Run that job on the 100-GB GitHub archive dataset. (TODO: Check that it will do speculative execution.) Once the job completes, you will see as many successful Map tasks as there were HDFS blocks in the input; if you are running a 128-MB block size, this will be about (TODO: How many blocks are there?).

Again, each Map task should complete in a uniform amount of time, and the job as a whole should take about ‘length_of_Map_task * number_of_Map_tasks / number_of_Mapper_slots’. The Map phase does not end until every Mapper task has completed and, as we saw in the previous example, even in typical cases, there is some amount of splay in runtimes.

(TODO: Move some of JT and Nanette’s optimizations forward to this chapter). Like the chimpanzees at quitting time, the Map phase cannot finish until all Mapper tasks have completed.

You will probably notice a half-dozen or so killed attempts as well. The ‘TODO: name of speculative execution setting’, which we recommend enabling, causes Hadoop to opportunistically launch a few duplicate attempts for the last few tasks in a job. The faster job cycle time justifies the small amount of duplicate work.

Check that there are few non-local Map tasks — Hadoop tries to assign Map attempts (TODO: check tasks versus attempts) to run on a machine whose DataNode holds that input block, thus avoiding a trip across the network (or in the chimpanzees’ case, down the hallway). It is not that costly, but if you are seeing a large number of non-local tasks on a lightly-loaded cluster, dig deeper.

Dividing the size of an HDFS block by the average runtime of a full-block Map task gives you the Mapper’s data rate. In this case, since we did almost nothing and wrote almost nothing, that value is your cluster’s effective top speed. This has two implications: First, you cannot expect a data-intensive job to run faster than its top speed. Second, there should be apparent reasons for any job that runs much slower than its top speed. Tuning Hadoop is basically about making sure no other part of the system is slower than the fundamental limit at which it can stream from disk.
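To make that concrete with purely illustrative numbers: if a full-block Map task averages 40 seconds against a 128-MB block, the per-slot top speed is roughly 128 MB / 40 s ≈ 3 MB/s; a machine with eight Map slots then streams about 25 MB/s, and a 30-machine cluster about 750 MB/s. A data-intensive job running far below the figure you measure this way deserves an explanation.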

While setting up your cluster, it might be worth baselining Hadoop’s top speed against the effective speed of your disk and your network. Follow the instructions for the ‘scripts/baseline_performance’ script (TODO: write script) from the example code above. It uses a few dependable user-level processes to measure the effective data rate to disk (‘dd’ and ‘cp’) and the effective network rate (‘nc’ and ‘scp’). (We have purposely used user-level processes to account for system overhead; if you want to validate that as well, use a benchmark like Bonnie++ (TODO: link)). If you are on dedicated hardware, the network throughput should be comfortably larger than the disk throughput. If you are on cloud machines, this, unfortunately, might not hold, but it should not be atrociously lower.

If the effective top speed you measured above is not within (TODO: figure out healthy percent) percent, dig deeper; otherwise, record each of these numbers on your performance comparison chart.

If you’re setting up your cluster, take the time to generate enough additional data to keep your cluster fully saturated for 20 or more minutes and then ensure that each machine processed about the same amount of data. There is a lot more variance in effective performance among machines than you might expect, especially in a public cloud environment; this check can also catch a machine with faulty hardware or setup. It is a crude but effective benchmark; if you’re investing heavily in a cluster, consider running a comprehensive benchmarking suite on all the nodes — the chapter on Stupid Hadoop Tricks shows how (TODO: REF).

The Many Small Files Problem

One of the most pernicious ways to impair a Hadoop cluster’s performance is the “many-small-files” problem. With a 128-MB block size (which we will assume for the following discussion), a 128-MB file takes one block (obviously), a 1-byte file takes one block and a 128-MB+1 byte file takes two blocks, one of them full, the other with one solitary byte.

Storing 10 GB of data in, say, 100 files is harmless — the average block occupancy is a mostly-full 100 MB. Storing that same 10GB in say 10,000 files is, however, harmful in several ways. At the heart of the Namenode is a table that lists every file and block. As you would expect, the memory usage of that table roughly corresponds to the number of files plus the number of blocks, so the many-small-files example uses about 100 times as much memory as warranted. Engage in that bad habit often enough and you will start putting serious pressure on the Namenode heap and lose your job shortly thereafter. What is more, the many-small-files version will require 10,000 Map tasks, causing memory pressure on the Job Tracker and a job whose runtime is dominated by task overhead. Lastly, there is the simple fact that working with 10,000 things is more annoying than working with 100 — it takes up space in datanode heartbeats, client requests, your terminal screen and your head.

This situation is easier to arrive at than you might expect; in fact, you just did. The 100-GB job you just ran most likely used about 800 Map tasks yet output only a few MB of data. Any time your Mapper output is significantly smaller than its input — for example, when you apply a highly-restrictive filter to a large input — your output files will have poor occupancy.

A sneakier version of this is a slightly “expansive” Mapper-Only job. A job whose Mappers turned a 128-MB block into, say, 150 MB of output data would reduce the block occupancy by nearly half and require nearly double the Mapper slots in the following jobs. Done once, that is merely annoying but in a workflow that iterates or has many stages, the cascading dilution could become dangerous.

You can audit your HDFS to see if this is an issue using the ‘hadoop fsck [directory]’ command. Running that command against the directory holding the GitHub data should show 100 GB of data in about 800 blocks. Running it against your last job’s output should show only a few MB of data in an equivalent number of blocks.

You can always distill a set of files by doing ‘group_by’ with a small number of Reducers using the record itself as a key. Pig and Hive both have settings to mitigate the many-small-files problem. In Pig, the (TODO: find name of option) setting will feed multiple small files to the same Mapper; in Hive (TODO: look up what to do in Hive). In both cases, we recommend modifying your configuration to make that the default and disable it on a per-job basis when warranted.
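As a sketch of that distillation in Pig — paths are hypothetical, and pig.splitCombination / pig.maxCombinedSplitSize are our best guesses at the relevant property names, so verify them for your Pig version:

-- repack a poorly-occupied output into ten well-filled files; paths are hypothetical
SET pig.splitCombination     true;       -- assumed name: feed many small files to each Mapper
SET pig.maxCombinedSplitSize 134217728;  -- assumed name: aim for about one 128-MB block per split

recs      = LOAD '/tmp/tuning/poorly_packed_output' USING TextLoader() AS (line:chararray);
grpd      = GROUP recs BY line PARALLEL 10;          -- the record itself is the key
distilled = FOREACH grpd GENERATE FLATTEN(recs);
STORE distilled INTO '/tmp/tuning/distilled_output';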

Midstream Data

Now let’s start to understand the performance of a proper Map/Reduce job. Run the following script, again, against the 100 GB GitHub data.

Parallel 50
Disable optimizations for pushing up filters and for Combiners
Load 100 GB of data
Group by record itself
Filter out almost everything
Store data

The purpose of that job is to send 100 GB of data at full speed through the Mappers and midstream processing stages but to do almost nothing in the Reducers and write almost nothing to disk. To keep Pig from “helpfully” economizing the amount of midstream data, you will notice in the script we disabled some of its optimizations. The number of Map tasks and their runtime should be effectively the same as in the previous example, and all the sanity checks we’ve given so far should continue to apply. The overall runtime of the Map phase should only be slightly longer (TODO: how much is slightly?) than in the previous Map-only example, depending on how well your network is able to outpace your disk.
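A hedged Pig rendering of that script follows. The path is a hypothetical placeholder, default_parallel stands in for “Parallel 50,” and pig.exec.nocombiner is our best guess at the Combiner switch (filter-pushing is typically disabled from the command line, e.g. with Pig’s -optimizer_off option) — verify both against your version.

-- shuffle the full dataset at speed, but emit almost nothing; names below are best guesses
SET default_parallel    50;
SET pig.exec.nocombiner true;   -- assumed name of the switch that keeps Pig from using Combiners

events = LOAD '/data/gold/github_archive' USING TextLoader() AS (line:chararray);
grpd   = GROUP events BY line;  -- group on the record itself, so all 100 GB crosses the network
tiny   = FILTER grpd BY (group MATCHES '.*almost_nothing_matches_this.*');
STORE tiny INTO '/tmp/tuning/midstream_probe';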

It is an excellent idea to get into the habit of predicting the record counts and data sizes in and out of both Mapper and Reducer based on what you believe Hadoop will be doing to each record and then comparing to what you see on the Job Tracker screen. In this case, you will see identical record counts for Mapper input, Mapper output and Reducer input and nearly identical data sizes for HDFS bytes read, Mapper output, Mapper file bytes written and Reducer input. The reason for the small discrepancies is that, for the file system metrics, Hadoop is recording everything that is read or written, including logged files and so forth.

Midway or so through the job — well before the finish of the Map phase — you should see the Reducer tasks start up; their eagerness can be adjusted using the (TODO: name of setting) setting. By starting them early, the Reducers are able to begin merge/sorting the various Map task outputs in parallel with the Map phase. If you err low on this setting, you will disappoint your coworkers by consuming Reducer slots with lots of idle time early, but that is better than starting them too late, which will sabotage parallelism.
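On Hadoop 1.x-era clusters that knob is, to the best of our recollection, mapred.reduce.slowstart.completed.maps — the fraction of Map tasks that must complete before Reducers are launched (the out-of-the-box value is quite low). A per-job override in Pig might look like:

-- illustrative value: hold Reducer launch until ~70% of Map tasks have finished
SET mapred.reduce.slowstart.completed.maps 0.70;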

Visit the Reducer tasks listing. Each Reducer task should have taken a uniform amount of time, very much longer than the length of the Map tasks. Open a few of those tasks in separate browser tabs and look at their counters; each should have roughly the same input record count and data size. It is annoying that this information is buried as deeply as it is because it is probably the single most important indicator of a flawed job; we will discuss it in detail a bit later on.

Spills

First, though, let’s finish understanding the data’s detailed journey from Mapper to Reducer. As a Map task outputs records, Hadoop sorts them in the fixed-size io.sort buffer. Hadoop files records into the buffer in partitioned, sorted order as it goes. When that buffer fills up (or the attempt completes), Hadoop begins writing to a new empty io.sort buffer and, in parallel, “spills” that buffer to disk. As the Map task concludes, Hadoop merge/sorts these spills (if there were more than one) and sends the sorted chunks to each Reducer for further merge/sorting.

The Job Tracker screen shows the number of Mapper spills. If the number of spills equals the number of Map tasks, all is good — the Mapper output is checkpointed to disk before being dispatched to the Reducer. If the size of your Map output data is large, having multiple spills is the natural outcome of using memory efficiently; that data was going to be merge/sorted anyway, so it is a sound idea to do it on the Map side where you are confident it will have a uniform size distribution.

(TODO: do combiners show as multiple spills?)

What you hate to see, though, are Map tasks with two or three spills. As soon as you have more than one spill, the data has to be initially flushed to disk as output, then read back in full and written again in full for at least one merge/sort pass. Even the first extra spill can cause roughly a 30-percent increase in Map task runtime.

There are two frequent causes of unnecessary spills. First is the obvious one: Mapper output size that slightly outgrows the io.sort buffer size. We recommend sizing the io.sort buffer to comfortably accommodate Map task output slightly larger than your typical HDFS block size — the next section (TODO: REF) shows you how to calculate it. In the significant majority of jobs that involve a Reducer, the Mapper output is the same or nearly the same size as its input — JOINs or GROUPs that are direct, are preceded by a projection or filter, or add only a few derived fields. If you see many of your Map tasks tripping slightly over that limit, it is probably worth requesting a larger io.sort buffer specifically for your job.

There is also a disappointingly sillier way to cause unnecessary spills: The io.sort buffer holds both the records it will later spill to disk and an index to maintain the sorted order. An unfortunate early design decision set a fixed size on both of those, with fairly confusing control knobs. The ‘io.sort.record.percent’ (TODO: check name of setting) setting gives the size of that index as a fraction of the sort buffer. Hadoop spills to disk when either the portion devoted to records or the portion devoted to the index becomes full. If your output is long and skinny — cumulatively not much more than an HDFS block, but with a typical record size smaller than, say, 100 bytes — you will end up spilling multiple small chunks to disk when you could have easily afforded to increase the size of the bookkeeping buffer.

There are lots of ways to cause long, skinny output, but set a special trigger in your mind for cases where you have long, skinny input, turn an adjacency-listed graph into an edge-listed graph, or otherwise FLATTEN bags of records on the Mapper side. In each of these cases, the later section (TODO: REF) will show you how to calculate it.
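A per-job override of both knobs might look like the following; the property names are the Hadoop 1.x-era ones as we recall them, and the values are purely illustrative — size them from your own Map output as described in the section referenced above.

-- illustrative values only: a roomier sort buffer, with a larger index share for skinny records
SET io.sort.mb             200;    -- size of the map-side sort buffer, in MB; must fit in the child heap
SET io.sort.record.percent 0.15;   -- fraction of that buffer reserved for the record index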

(TODO: either here or later, talk about the surprising cases where you fill up MapRed scratch space or FS.S3.buffer.dir and the rest of the considerations about where to put this).

Combiners

It is a frequent case that the Reducer output is smaller than its input (and kind of annoying that the word “Reducer” was chosen, since it also frequently is not smaller). “Algebraic” aggregations such as COUNT, AVG and so forth — and many other operations — can implement part of the Reducer work on the Map side, greatly lessening the amount of data sent to the Reducer.

Pig and Hive are written to use Combiners whenever generically appropriate. Applying a Combiner requires extra passes over your data on the Map side, though, and so, in some cases, can cost much more time than it saves.

If you ran a distinct operation over a data set with 50-percent duplicates, the Combiner is easily justified since many duplicate pairs will be eliminated early. If, however, only a tiny fraction of records are duplicated, only a disappearingly-tiny fraction will occur on the same Mapper, so you will have spent disk and CPU without reducing the data size.

Whenever your Job Tracker output shows that Combiners are being applied, check that the Reducer input data is, in fact, diminished. (TODO: check which numbers show this) If Pig or Hive have guessed badly, disable the (TODO: name of setting) setting in Pig or the (TODO: name of setting) setting in Hive.

Reducer Merge (aka Shuffle and Sort)

We are now ready to dig into the stage with the most significant impact on job performance, the merge/sort performed by the Reducer before processing. In almost all the rest of the cases discussed here, an inefficient choice causes only a marginal impact on runtime. Bring down too much data on your Reducers, however, and you will find that, two hours into the execution of what you thought was a one-hour job, a handful of Reducers indicate they have four hours left to run.

First, let’s understand what is going on and describe healthy execution; then, we will discuss various ways it can go wrong and how to address them.

As you just saw, data arrives from the Mappers pre-sorted. The Reducer reads it into its own sort buffer. Once a threshold (controlled by the (TODO: name of setting) setting) of data has been received, the Reducer commissions a new sort buffer and separately spills the data to disk, merge/sorting the Mapper chunks as it goes. (TODO: check that this first merge/sort happens on spill)

After enough of these spills (controlled by the (TODO: setting) setting), the Reducer begins merge/sorting the spills into larger combined chunks. All of this activity is happening in parallel, so by the time the last Map task output is received, the typical healthy situation is to have a modest number of large sorted chunks and one small-ish chunk holding the dregs of the final spill. Once the number of chunks is below the (TODO: look up name of setting) threshold, the merge/sort is complete — it does not need to fully merge the data into a single file on disk. Instead, it opens an input stream onto each of those final chunks, consuming them in sort order.

Notice that the Reducer flushes the last spill of received Map data to disk, then immediately starts reconsuming it. If the memory needs of your Reducer are modest, you can instruct Hadoop to use the sort buffer directly in the final merge, eliminating the cost and delay of that final spill. It is a nice marginal improvement when it works; but if you are wrong about how modest your Reducer’s memory needs are, the negative consequences are high, and if your Reducers have to perform multiple merge/sort passes anyway, the benefits are insignificant.

For a well-tested job heading to production that requires one or fewer merge/sort passes, you may judiciously (TODO: describe how to adjust this).

(TODO: discuss buffer sizes here or in Brave and Foolish section) (TODO: there is another setting that I’m forgetting here - what is it?)

Once your job has concluded, you can find the number of merge/sort passes by consulting the Reduce tasks counters (TODO: DL screenshot and explanation). During the job, however, the only good mechanism is to examine the Reducer logs directly. At some reasonable time after the Reducer has started, you will see it initiate spills to disk (TODO: tell what the log line looks like). At some later point, it will begin merge/sorting those spills (TODO: tell what the log line looks like).

The CPU burden of a merge/sort is disappearingly small against the dominating cost of reading then writing the data to disk. If, for example, your job only triggered one merge/sort pass halfway through receiving its data, the cost of the merge/sort is effectively one and a half times the base cost of writing that data at top speed to disk: all of the data was spilled once, half of it was rewritten as merged output. Comparing the total size of data received by the Reducer to the merge/sort settings will let you estimate the expected number of merge/sort passes; that number, along with the “top speed” figure you collected above, will, in turn, allow you to estimate how long the Reduce should take. Much of this action happens in parallel with the Map phase, but it is competing with your Mappers’ mapping, spilling and everything else happening on the machine.
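To make that concrete with purely illustrative numbers: a Reducer that receives 40 GB against an effective sort buffer of around 400 MB will spill roughly 100 chunks; with a merge factor of 10, those must first be combined into about ten larger chunks before the final merge can stream them, so nearly every byte is written once at spill time and once again during that intermediate pass. A Reducer receiving only 4 GB would spill about ten chunks and skip the intermediate pass entirely.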

A healthy, data-intensive job will have Mappers with nearly top speed throughput, the expected number of merge/sort passes and the merge/sort should conclude shortly after the last Map input is received. (TODO: tell what the log line looks like). In general, if the amount of data each Reducer receives is less than a factor of two to three times its share of machine RAM, (TODO: should I supply a higher-fidelity thing to compare against?) all those conditions should hold. Otherwise, consult the USE method (TODO: REF).

If the merge/sort phase is killing your job’s performance, it is most likely because either all of your Reducers are receiving more data than they can accommodate or because some of your Reducers are receiving far more than their fair share. We will take the uniform distribution case first.

The best fix to apply is to send less data to your Reducers. The chapters on writing Map/Reduce jobs (TODO: REF or whatever we are calling Chapter 5) and the chapter on advanced Pig (TODO: REF or whatever we are calling that now) both have generic recommendations for how to send around less data and throughout the book, we have described powerful methods in a domain-specific context which might translate to your problem.

If you cannot lessen the data burden, well, the laws of physics and economics must be obeyed. The cost of a merge/sort is ‘O(N log N)’. In a healthy job, however, most of the merge/sort has been paid down by the time the final merge pass begins, so up to that limit, your Hadoop job should run in ‘O(N)’ time, governed by its top speed.

The cost of excessive merge passes, however, accrues directly to the total runtime of the job. Even though there are other costs that increase with the number of machines, the benefits of avoiding excessive merge passes are massive. A cloud environment makes it particularly easy to arbitrage the laws of physics against the laws of economics — it costs the same to run 60 machines for two hours as it does to run ten machines for twelve hours. As long as your runtime stays roughly linear with the increased number of machines, you should always size your cluster to your job, not the other way around. The thresholding behavior of excessive merge passes makes it exceptionally valuable to do so. This is why we feel exploratory data analytics is far more efficiently done in an elastic cloud environment, even given the quite significant performance hit you take. Any physical cluster is both too large and too small: you are overpaying for your cluster overnight while your data scientists sleep, and you are overpaying your data scientists to hold roller chair sword fights while their undersized cluster runs. Our rough rule of thumb is to have no more than two to three times as much total Reducer data as you have total child heap size on all the Reducer machines you’ll use.
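As a worked example with illustrative numbers: ten worker machines, each with six Reducer slots and a 1-GB child heap, give you 60 GB of total Reducer heap; by that rule of thumb, a job that must bring much more than 120-180 GB down to its Reducers is a candidate for more machines (or larger heaps per slot) rather than heroic tuning.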

(TODO: complete)

Skewed Data and Stuck Reducers

(TODO: complete)

Reducer Processing

(TODO: complete)

Commit and Replication

(TODO: complete)

Pathology of Tuning (aka "when you should touch that dial")

Mapper

A few map tasks take noticeably longer than all the rest

Typically, the only good reason for a map task to run much longer than its peers is if it’s processing a lot more data.

The jobtracker assigns blocks in decreasing order of size to help prevent the whole job waiting on one last mapper. If your input format is non-splittable (e.g., it’s .bz2 compressed) and some files are huge, those files will take proportionally longer. If these are so imbalanced as to cause trouble, I can’t offer much more than a) don’t write imbalanced files, or b) switch to a splittable format.

If some map tasks are very slow, but aren’t processing proportionally more data, look for the following:

  • Check the logs of the slow tasks — did they hit a patch of bad records and spend all their time rescuing a ton of exceptions?

  • Some records take far longer to process than others — for example a complex regexp (regular expression) that requires a lot of backtracking.

  • If the slow tasks always occur on the same machine(s), they might be failing.

  • Check the logs for both the child processes and the daemons — are there timeouts or other errors?

Tons of tiny little mappers

For jobs with a reduce step, the number of map tasks times the heap size of each mapper should be about twice the size of the input data.

CombineFileInputFormat

…​TODO…​

Many non-local mappers

  • A prior stage used one (or very few) reducers

  • You recently enlarged your cluster

  • HDFS is nearly full

  • Overloaded data nodes

Map tasks "spill" multiple times

Each mapper is responsible for sorting the individual records it emits, allowing the reducers to just do a merge sort. It does so by filing records into a fixed-size sort buffer (the analog of the inbox sorter on the back of each elephant). Each time the sort buffer overflows, the mapper will "spill" it to disk; once finished, it merge sorts those spills to produce its final output. Those merge passes are not cheap — the full map output is re-read and re-written. If the midflight data size is several times your allocable heap, then those extra merge passes are necessary: you should smile at how well Hadoop is leveraging what RAM you have.

However, if a mapper’s midflight size even slightly exceeds the sort buffer size, it will trigger an extra spill, causing a 20-40% performance hit.

On the jobtracker, check that the ‘Spilled Records’ counter is not much larger than the ‘Map output records’ counter; if it is, the mappers are spilling more than once.

Job output files that are each slightly larger than an HDFS block

If your mapper task is slightly expansive (outputs more data than it reads), you may end up with an output that, for every input block, emits one full block plus one nearly-empty block. For example, a task whose output is about 120% of its input will have an output block ratio of 60% — 40% of the disk space is wasted, and downstream map tasks will be inefficient.

  1. Check this by comparing (TODO: grab actual terms) HDFS bytes read with mapper output size.

You can check the block ratio of your output dataset with hadoop fsck -blocks

(TODO: capture output)

If your mapper task is expansive and the ratio is less than about 60%, you may want to set a min split size of several blocks on this job, so that each task’s one partly-filled trailing block is amortized across several full ones.

Alternatively, turn up the min split size on the next stage, sized so that each of its map tasks reads a healthy multiple of the block size despite the poor occupancy.

Reducer

Tons of data to a few reducers (high skew)

  • Did you set a partition key?

  • Can you use a finer partition key?

  • Can you use a combiner?

  • Are there lots of records with a NULL key?

    • Here’s a great way to get null keys: j = JOIN a BY akey LEFT OUTER, b BY bkey; res = FOREACH j GENERATE bkey AS key, a::aval, b::bval; grouped = GROUP res BY key;. Whenever there’s a key in a with no match in b, bkey will be null and the final GROUP statement will be painful. You probably meant to say GENERATE akey AS key — see the corrected sketch just below this list.

  • Does your data have intrinsically high skew? If records follow a long-tail distribution, the heaviest keys may dominate a few unlucky reducers no matter how you partition.

  • Do you have an "LA and New York" problem? If you’re unlucky, the two largest keys might be hashed to the same reducer. Try running with one fewer reducer — it should jostle the keys onto different machines.

If you have irrevocably high skew, Pig offers a skewed join operator.
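Two Pig sketches, reusing the relation and field names from the NULL-key bullet above: the corrected join that keeps rows with no match in b from collapsing onto a single NULL key, and the skewed-join variant.

-- corrected version: take the key from the left (outer) side of the join
j        = JOIN a BY akey LEFT OUTER, b BY bkey;
res      = FOREACH j GENERATE a::akey AS key, a::aval, b::bval;
grouped  = GROUP res BY key;

-- for intrinsically skewed keys, let Pig sample the key distribution and split the heavy ones
big_join = JOIN a BY akey, b BY bkey USING 'skewed';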

Reducer merge (sort+shuffle) is longer than Reducer processing

Output Commit phase is longer than Reducer processing

Way more total data to reducers than cumulative cluster RAM

If you are generating a huge amount of midflight data for a merely-large amount of reducer output data, you might be a candidate for a better algorithm.

In the graph analytics chapter, we talk about "counting triangles": how many of your friends-of-friends are also direct friends? More than a million people follow Britney Spears and Justin Bieber on Twitter. If they follow each other (TODO: verify that they do), the "obvious" way of counting shared friends will result in trillions (10^12) of candidates — but only millions of results. This is an example of "multiplying the short tails" of two distributions. The graph analytics chapter shows you one pattern for handling this.

If you can’t use a better algorithm, then as they said in Jaws: "you’re going to need a bigger boat".

System

Excessive Swapping

  • don’t use swap — deprovision it.

  • if you’re going to use swap, set swappiness and overcommit

Otherwise, treat the presence of any significant swap activity as if the system were Out of Memory / No C+B reserve.

Out of Memory / No C+B reserve

Your system is out of memory if any of the following occurs:

  • there is no remaining reserve for system caches+buffers

  • significant swap activity

  • OOM killer (the operating system’s Out Of Memory handler) kills processes

For Hadoop’s purposes, if the OS has no available space for caches+buffers, it has already run out of system RAM — even if it is not yet swapping or OOMing.

  • check overcommit

You may have to reduce slots, or reduce heap per slot.

Stop-the-world (STW) Garbage collections

STW garbage collection on a too-large heap can lead to socket timeouts, increasing the load and memory pressure on other machines, leading to a cascading degradation of the full cluster.

Checklist

  • Unless your map task is CPU-intensive, mapper task throughput should be comparable to baseline throughput.

  • The number of non-local map tasks is small.

  • Map tasks take more than a minute or so.

  • Either 'Spilled bytes' and 'mapper output bytes' are nearly equal, or 'Spilled bytes' is three or more times 'mapper output bytes'.

  • The size of each output file is not close-to-but-above the HDFS block size

Other

Basic Checks

  • enough disk space

  • hadoop native libraries are discovered (org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library appears in the logs).

  • midstream data uses snappy compression (org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library is available appears in the logs).