Hubspot cell balancer & normalizer #126
base: hubspot-2.5
Conversation
Prioritize balance by region and THEN evening out cell isolation
… if it goes negative
Remove all the debugging changes, generally make ready for real review & merge
Sorry mate, this is dense, I've done what I can this afternoon.
One macro-comment is that we probably don't want this merged into this repo, but to ship it as an extension, like we do with a coprocessor or custom filter (if we have custom filters). Instead of patching the balancer or normalizer, you'd swap in your own implementation via configuration injection, i.e.,
<name>hbase.master.normalizer.class</name>
@@ -0,0 +1,252 @@
package org.apache.hadoop.hbase.hubspot;

import org.agrona.collections.Int2IntCounterMap;
I'm not sure - I used this after seeing it in the balancer cluster state representation. It seems to be a fairly efficient counter map
long regionId = obj.get("regionId").getAsLong();
int replicaId = obj.get("replicaId").getAsInt();
JsonObject tableName = obj.get("tableName").getAsJsonObject();
JsonArray startKey = obj.get("startKey").getAsJsonArray();
Actually, you may want to extend/use/copy the existing JSON configuration produced by GsonFactory; it provides byte[] serialization.
private HubSpotCellUtilities() {}

public static String toCellSetString(Set<Short> cells) {
We have a name conflict -- HBase already uses the term "Cell" extensively, to mean a key-value pair with all of its trimmings. If you want this code to colocate in HBase, I recommend a name like "tenant partition" or something like that.
}

public static boolean isStopInclusive(byte[] endKey) {
  return (endKey == null || endKey.length != 2) && (endKey == null || endKey.length <= 2
What's with this magic number 2? Oh, your cell prefix is byte[2]. Maybe put this in a named constant?
  return 0;
}

Set<Short> cellsInRegions = Arrays.stream(regionInfos)
In general, I wonder if it'll be easier to represent the cells as a fixed-length BitSet.
RACK
RACK,
// HubSpot addition
HUBSPOT_CELL
Weird, I wonder why Region Server Group is not accounted for here.
Arrays.stream(cluster.regions)
  .flatMap(region -> HubSpotCellUtilities.toCells(region.getStartKey(), region.getEndKey(), HubSpotCellUtilities.MAX_CELL_COUNT).stream())
  .forEach(cellOnRegion -> cellCounts[cellOnRegion]++);
double[] cellPercents = new double[HubSpotCellUtilities.MAX_CELL_COUNT];
unused.
List<Map<Short, Integer>> cellGroupSizesPerServer,
int targetRegionsPerServer
) {
Optional<Integer> fromServerMaybe = pickOverloadedServer(cluster, targetRegionsPerServer, ComparisonMode.ALLOW_OFF_BY_ONE);
nit: OptionalInt.
  return BalanceAction.NULL_ACTION;
}
int toServer = toServerMaybe.get();
short cell = pickMostFrequentCell(cluster, cellCounts, cellGroupSizesPerServer.get(fromServer));
Should this filter by Cells that are already over-represented on the target host and return a NULL_ACTION instead of making an oversaturated situation worse?
enum ComparisonMode {
  STRICT,
  ALLOW_OFF_BY_ONE
off-by-one is generally enough slop?
Small clusters may not have enough regions/cell to support lower isolation
Revert "Remove all the debugging changes, generally make ready for real review & merge"
Overview
HubSpot is experimenting internally with a cellular architecture for some of our most critical HBase tables. At a high level, this involves using the first two bytes of a row key to partition the set of rows into n (fixed to 360 in this early draft) distinct partitions. Two pieces make this work: invariants establishing that each region is entirely contained by a single cell (so no merging across cell lines), combined with an update to the balancer that makes region-to-server assignment cell aware. Together these let us trade off performance (maximized by a uniform distribution of data to regions to servers) against isolation (maximized by placing all of a cell's data on a minimal set of servers with no overlap with other cells), with minimal (or even zero) impact on the size of the cluster required or the operational burden of running it. There are two major parts to this PR, and I'll address them in separate sections. At a high level, here are the outcomes this PR enables:
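To make the partitioning concrete, here is a minimal sketch of deriving a cell id from the 2-byte row key prefix. The class and method names, and the modulo mapping, are illustrative assumptions, not the PR's actual code:

```java
// Hypothetical sketch of cell derivation; the real mapping in the PR may differ.
public final class CellIdSketch {
  // Fixed cell count from the early draft described above.
  public static final int MAX_CELL_COUNT = 360;

  // Interpret the first two bytes of the row key as an unsigned big-endian value,
  // then map it into [0, MAX_CELL_COUNT).
  public static short toCell(byte[] rowKey) {
    if (rowKey == null || rowKey.length < 2) {
      throw new IllegalArgumentException("row key must carry a 2-byte cell prefix");
    }
    int prefix = ((rowKey[0] & 0xFF) << 8) | (rowKey[1] & 0xFF);
    return (short) (prefix % MAX_CELL_COUNT);
  }
}
```

Because the prefix is the leading bytes of the key, all rows of a cell are contiguous in the table's sort order, which is what lets regions stay cell-aligned.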
In testing, the cluster we use has 550-600 region servers, 31,000-34,000 regions, and is about 130TB compressed.
Normalizer
We assume that the initial set of regions in the cluster are cell-aligned (that is, all of the data within every region comes from a single cell, but a single cell may (and probably will) require more than one region to represent it). Then, we update the normalizer such that routine cluster operations (merging regions that are too small, or splitting regions that are too large) cannot break it. Because every region starts contained within a cell, we do not need to adjust the split logic. We do need to adjust merging. In particular: when computing runs of small regions that may be merged, cell aware tables should stop when a run crosses a boundary and subsequent regions no longer belong to the same cell. These changes are contained within the SimpleRegionNormalizer, and are relatively simple.
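The merge rule above can be illustrated with a small sketch (a hedged illustration, not the actual SimpleRegionNormalizer diff): a run of small adjacent regions must close as soon as the next region belongs to a different cell.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: each region is represented by just its cell id here.
public final class MergeRunSketch {
  // Group adjacent regions into mergeable runs, never letting a run cross
  // a cell boundary.
  public static List<List<Short>> mergeRuns(List<Short> regionCells) {
    List<List<Short>> runs = new ArrayList<>();
    List<Short> run = new ArrayList<>();
    for (short cell : regionCells) {
      if (!run.isEmpty() && run.get(0) != cell) {
        runs.add(run);            // cell boundary crossed: close the current run
        run = new ArrayList<>();
      }
      run.add(cell);
    }
    if (!run.isEmpty()) runs.add(run);
    return runs;
  }
}
```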
Balancer
The cell-aware balancer represents the bulk of the complexity for this change. A quick review of HBase's balancer: HBase models the balance of a cluster (or of a table on a cluster) as an objective function, which is a scaled sum of constituent parts. A static threshold determines when the table is unbalanced enough to require rebalancing. When a rebalance is required, a stochastic greedy algorithm approximates gradient descent: generator functions are randomly selected to propose iterative cluster state changes, which are accepted only if they improve the global balance state for the cluster (differentiating the approach from e.g. simulated annealing, which might permit a temporary worsening to avoid getting caught in a local optimum).
So, this PR introduces a new cost function (the HubSpotCellCostFunction) and a new generator (HubSpotCellBasedCandidateGenerator) to propose iterative mutative steps for the cluster.
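The accept-only-if-better loop can be sketched generically. This is a simplified illustration; the actual StochasticLoadBalancer harness has more machinery (step budgets, action undo, multiple generators) than shown:

```java
import java.util.function.ToDoubleFunction;
import java.util.function.UnaryOperator;

// Minimal sketch of a stochastic greedy harness: propose a mutation, keep it
// only if the total cost improves (no annealing-style temporary worsening).
public final class GreedyHarnessSketch {
  public static <S> S optimize(S state, UnaryOperator<S> propose,
                               ToDoubleFunction<S> cost, int steps) {
    S best = state;
    double bestCost = cost.applyAsDouble(best);
    for (int i = 0; i < steps; i++) {
      S candidate = propose.apply(best);
      double c = cost.applyAsDouble(candidate);
      if (c < bestCost) {         // accept only strict improvements
        best = candidate;
        bestCost = c;
      }
    }
    return best;
  }
}
```

In the real balancer, `propose` corresponds to a randomly chosen candidate generator and `cost` to the scaled sum of cost functions.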
Cell cost function
Our cost function captures two guiding principles for a cellular table, with s servers, r regions, and c cells:

1. Every server should carry a balanced share of regions (⌊r/s⌋ or ⌊r/s⌋ + 1)
2. The number of distinct cells on any one server should stay within a configurable bound

Our cost function has two parts, one for each of these principles. To capture (1), it includes the raw count of servers whose region count falls outside the two permissible values listed above. To capture (2), we consider the concept of a server-cell (i.e. the unique instance of a given cell on a given server), and a desired capacity defined by the max cells per server scaled by s. In our cluster (with 585 servers), if the bound on cells per server were 36 (10%), then the capacity in server-cells would be 21,060. The imbalance of the cluster (from a cell-to-server perspective) is represented as the number of overloaded server-cells on the cluster, scaled by that capacity. A server-cell is overloaded if the server (excluding that cell) already holds the configurable per-server bound of cells. As a concrete example: if a server has 50 regions (each for a different cell), 50 distinct cells, and the bound is 36, then that server has 14 excess server-cells. The sum of excess server-cells over all servers, scaled by the overall server-cell capacity, gives a normalized metric (ranging from 0 to 1) describing imbalance.
This construction relies on the step generator acting to maximize the count of distinct cells per server, and the stochastic harness to discard proposed solutions that might go over the configured limit.
Cell step generator
The step generator is by far the most complex component of this change. It is highly stochastic: at every step when "pick" is used, we utilize a reservoir random online sampling approach to do an efficient single-pass random selection (subject to some filtering/optimizing criterion). With that in mind, the step generator operates as follows.
Consider a cellular table, with s servers, r regions, and c cells:

1. If there are any underloaded servers (count of regions less than ⌊r/s⌋):
   i. If there are any servers with excess supply (count of regions more than ⌊r/s⌋), pick a server with the most regions, and move a region from it to the underloaded server
2. If there are any overloaded servers (count of regions more than ⌊r/s⌋ + 1):
   i. If there are any servers with excess capacity (count of regions exactly ⌊r/s⌋), pick one such server and move a region from the overloaded server
3. Otherwise:
   i. pick a server with the most cells
   ii. pick the least frequent cell (by representing region) on that server
   iii. pick another server and cell such that the other server's cell is present on the picked server from (i)
   iv. swap our selected server/cell pairs
4. Otherwise:
   i. pick a server with the fewest cells
   ii. pick the most frequent cell (by representing region) on that server
   iii. pick another server and cell such that the other cell is not present on our selected server
   iv. swap our selected server/cell pairs
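The cascade above can be outlined roughly as follows. This is a hypothetical condensation: the real HubSpotCellBasedCandidateGenerator interleaves the randomized "pick" logic at every step, and the choice between the two swap flavors is itself part of the generator's logic:

```java
// Hypothetical outline of the generator's move selection; floor = ⌊r/s⌋.
public final class GeneratorCascadeSketch {
  public enum Move {
    MOVE_TO_UNDERLOADED,    // step 1: fill a server below floor
    MOVE_FROM_OVERLOADED,   // step 2: drain a server above floor + 1
    SWAP_CELLS              // steps 3/4: region counts balanced, swap for cells
  }

  public static Move choose(int[] regionsPerServer, int totalRegions) {
    int floor = totalRegions / regionsPerServer.length;
    boolean anyUnder = false;
    boolean anyOver = false;
    for (int r : regionsPerServer) {
      if (r < floor) anyUnder = true;
      if (r > floor + 1) anyOver = true;
    }
    if (anyUnder) return Move.MOVE_TO_UNDERLOADED;
    if (anyOver) return Move.MOVE_FROM_OVERLOADED;
    return Move.SWAP_CELLS;
  }
}
```

The key invariant is that region-count balance (steps 1 and 2) is restored before any cell-motivated swaps are proposed, matching the "balance by region THEN even out cell isolation" ordering in the commit history.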
This is not recommended for target cell-per-server counts approaching 1 (anything below 4 is unlikely to converge), but should work very well for bounds that represent 30-70% of a given server's region capacity.
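The "pick" primitive mentioned above (an efficient single-pass random selection subject to a filter) can be sketched with reservoir sampling with a reservoir of one. Names here are illustrative; the PR's implementation details may differ:

```java
import java.util.Random;
import java.util.function.IntPredicate;

// Sketch of single-pass uniform random selection over filtered candidates
// (reservoir sampling, k = 1).
public final class ReservoirPickSketch {
  // Returns an index in [0, n) satisfying accept, chosen uniformly at random
  // among all accepted indices in one pass; -1 if none match.
  public static int pick(int n, IntPredicate accept, Random rng) {
    int chosen = -1;
    int seen = 0;
    for (int i = 0; i < n; i++) {
      if (!accept.test(i)) continue;
      seen++;
      if (rng.nextInt(seen) == 0) {
        chosen = i;               // keep the i-th match with probability 1/seen
      }
    }
    return chosen;
  }
}
```

This keeps each selection O(n) with O(1) memory, which matters when the generator runs many picks per proposed step across tens of thousands of regions.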
Future work
TableDescriptor. We are already starting to think about the concept of balancer conditionals (see initial work here), and a future implementation of this, well generalized, might build on that concept. In keeping with this concept, the normalizer currently offers restrictions on how regions can be split (e.g. the DelimitedKeyPrefixRegionSplitRestriction), but there is no corresponding restriction on merges. A simple prefix-based merge restriction could also be implemented to simplify how we maintain groupings and isolation.