Next generation tools for data science



Since inception, this blog has defined “data science” as inference derived from data too big to fit on a single machine. The ability to handle large data is therefore central to our notion of data science. While MapReduce remains a fundamental tool, many interesting analyses require more than it can offer. For instance, the well-known Mantel-Haenszel estimator cannot be implemented in a single MapReduce. Apache Spark and Google Cloud Dataflow represent two alternatives as “next generation” data processing frameworks. This post compares the two, drawing on the author's first-hand experience and subsequent background research.


That MapReduce was the solution for writing data processing pipelines scalable to hundreds of terabytes (or more) is evidenced by its massive uptake, both inside Google and outside of Google in the form of Hadoop/MapReduce (for some, “Hadoop” and “data science” are synonyms). Nonetheless, it did not take long for the pain of writing and running many-stage MapReduce programs to become apparent. MapReduce's limitations motivated two different groups, AMPLab (Apache Spark) and Google (Cloud Dataflow), to write next-generation data processing frameworks. The two groups came at the problem from different vantage points, and this can be seen in both obvious and subtle differences in the end products. Dataflow was designed as a marriage of Google's frameworks for expressing complex batch pipelines (FlumeJava) and streaming pipelines (MillWheel). Spark was developed at UC Berkeley to enable exploratory analysis and machine learning at scale.

Six months ago, Google donated the Cloud Dataflow programming model and SDKs to the Apache Software Foundation, resulting in the incubating project Apache Beam. Going forward, Apache Beam will become the way to express data processing pipelines, while Cloud Dataflow remains a fully managed service for executing those pipelines. Accordingly, I'll refer to the programming model and API as Beam, and to the service that runs on GCP as Dataflow. Example code in this post uses the current Dataflow SDK, but similar concepts and programming patterns hold in Apache Beam.

In this post, I'm going to clarify the similarities and differences between Spark and Beam/Dataflow. For framing purposes, Spark's sweet spot is quickly developing exploratory/interactive analysis and iterative algorithms, e.g., gradient descent and MCMC, while Dataflow's sweet spot is processing streaming data and highly-optimized, robust, fixed pipelines.

The rest of the post is organized as follows: the most important difference between the two, a motivating example with implementations in Spark and Beam/Dataflow, an example of a clustering algorithm written in Spark, examples of streaming processing in Spark and Dataflow, and finally, other major differences between the frameworks.

When MapReduce just isn’t sufficient

MapReduce is a great tool for single-stage parallel data processing. A few pain points emerge when trying to use MapReduce beyond small pipelines:

  • Expressing complex pipelines requires significant boilerplate, separate programs, and files as the “interface” between stages.
  • Writing intermediate results to disk between stages of a pipeline is a serious bottleneck and forces the user to hand-optimize the placement of these divisions.
  • Performing exploratory analysis requires reading from and writing to disk, which is slow.
  • Expressing streaming pipelines (low-latency and infinite data sources) is not supported.
  • Multi-stage pipelines are easy to stumble into; take, for example, trying to measure the effects of a randomized experiment on ratios of metrics using Mantel-Haenszel (see more below).

Both Beam and Spark solve most of these problems by optimizing away many steps and keeping important intermediate results in memory (PCollections in Dataflow, resilient distributed datasets (RDDs) in Spark) instead of writing them to disk just to load them back into memory again.

The core difference: graph evaluation

The fundamental and enduring difference between Spark and Beam is that Spark only builds as much of the computation graph as needed (lazy evaluation, triggered by actions), whereas Beam builds the entire computation graph before it is optimized and sent to the service or cluster to be executed. Many significant differences between the two are a consequence of this. It is therefore easy to use Spark for interactive analysis (from a Python or Scala shell) and to prototype ML algorithms, but harder to do so in Dataflow. To be clear, it is not that one cannot do ML with Dataflow: single-pass, streaming algorithms are fine. But many ML algorithms, such as basic gradient descent and MCMC, make data-driven decisions about how many iterations to run until convergence. Spark gives the user greater flexibility here.

On the other hand, the benefit of requiring a complete computation graph is that systems executing Beam pipelines, like Dataflow, can optimize the whole graph. More importantly, decoupling graph construction from execution lets Beam create a language-agnostic representation of the computation graph. Thus, Beam graphs can be executed on other distributed processing back-ends, even including Spark. This property is what enabled the creation of the Apache Beam project.

Another consequence of Beam's decision to require specification of the full computation graph is that Dataflow can be offered as a service. This running-a-pipeline-job-as-a-service takes a graph specification and data inputs, and produces outputs. The Cloud Dataflow service can be thought of as a black box in which details such as the number of VMs to use, overall or per stage, and the distribution of work can all be left up to the service. Of course, most of these details can be specified if desired; see here for a more expansive discussion. This is in stark contrast to Spark, which is cluster-oriented rather than job-oriented. Running a fixed pipeline in Spark requires spinning up a Spark cluster, running the pipeline, and tearing the cluster down. Even though there are tools to make this process easier (see Cloud Dataproc on GCP or Databricks on AWS), cluster management remains the responsibility of the user.

A motivating example for going beyond Hadoop/MapReduce

To compare the two frameworks, I'm going to solve the same problem twice, then highlight key differences. Suppose we have an online auction website with a wide variety of products for sale. Users browse the pictures and specifications of products in the inventory and may choose to bid on them. Because it is an auction site, prices for any product fluctuate all the time. Products range in price from a few dollars (emoji eraser kits) to thousands (nitro coffee kits), so an important way we track them is by product category.

We want to run a randomized controlled experiment to determine the effect of high-resolution photos on our sales. Say we wish to determine the impact of the experiment on average sale price. The naive thing to do would be simply to compare the average selling price of treatment to that of control. However, this could be misleading: this measure will show a difference even if the price of each individual product remained constant and only the mix of products sold was affected. To be concrete, imagine the experiment caused us to sell relatively more nitro coffee kits than emoji kits. The Mantel-Haenszel estimator (also known as Cochran-Mantel-Haenszel) is meant precisely to address this problem by adjusting for the mix of items being compared between treatment and control. The basic idea is first to compute the average price separately in treatment and control for each individual stratum (aka “slice”), and then to take the ratio of a weighted combination of prices in treatment to that of control. The key is that the combination weights are the same for treatment and control, and they are chosen in a way that reduces estimator variance.

Widely used in medicine for count data, the MH estimator and its generalizations are ubiquitous within data science at Google. In a future post we will cover applications and extensions of MH to analytic problems at Google. For now, we'll just go ahead and compute point MH estimates for this (non-count) price data, using product categories as our slices. Say in product category $i$ we sell $n_{t,i}$ and $n_{c,i}$ items in treatment and control respectively, for a total sale price of $X_{t,i}$ and $X_{c,i}$. The MH estimate for the (mix-adjusted) ratio of prices between treatment and control is
$$ MH(t,c)=\frac{\sum_i w_i \frac{X_{t,i}}{n_{t,i}}}{\sum_i w_i \frac{X_{c,i}}{n_{c,i}}} $$
where the weight $w_i$ is the harmonic mean of $n_{t,i}$ and $n_{c,i}$, i.e., $w_i = \frac{2\,n_{t,i}\,n_{c,i}}{n_{t,i}+n_{c,i}}$. Substituting the weights and cancelling the common factor of 2, we can rewrite the formula to facilitate computation as
$$ MH(t,c)=\frac{\sum_i X_{t,i} \left(\frac{n_{c,i}}{n_{t,i} + n_{c,i}}\right)}{\sum_i X_{c,i} \left(\frac{n_{t,i}}{n_{t,i} + n_{c,i}}\right)} $$
taking care to omit terms where $n_{t,i} + n_{c,i} = 0$. Let's start with a csv of the following schema:

 exp_id, product_id, price, sale_count
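Before parallelizing anything, it helps to sanity-check the formula on toy numbers. Below is a minimal pure-Python sketch (the numbers and the `mh_ratio` helper are mine, not from the notebook): per-item prices are identical in treatment and control, but treatment sells relatively more of the expensive product, so the naive ratio of average prices is far from 1 while the MH estimate correctly reports no price effect.

```python
# Each row: (category, total_price, item_count); numbers are hypothetical.
treatment = [('eraser', 20.0, 10), ('nitro', 3000.0, 3)]   # $2 and $1000 per item
control   = [('eraser', 40.0, 20), ('nitro', 1000.0, 1)]   # $2 and $1000 per item

def mh_ratio(t_rows, c_rows):
    t = {cat: (x, n) for cat, x, n in t_rows}
    c = {cat: (x, n) for cat, x, n in c_rows}
    num = den = 0.0
    for cat in set(t) & set(c):
        (xt, nt), (xc, nc) = t[cat], c[cat]
        if nt + nc == 0:
            continue  # omit empty strata, as the formula requires
        num += xt * nc / (nt + nc)
        den += xc * nt / (nt + nc)
    return num / den

# Naive comparison of overall average prices is confounded by the product mix:
naive = (sum(x for _, x, _ in treatment) / sum(n for _, _, n in treatment)) / \
        (sum(x for _, x, _ in control) / sum(n for _, _, n in control))
print(round(naive, 2))                         # 4.69: mix shift masquerades as a price effect
print(round(mh_ratio(treatment, control), 2))  # 1.0: MH adjusts for the mix
```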
If I wrote a MapReduce pipeline to calculate the MH ratio of prices, it would require three MapReduce programs:

  1. Map (if header, skip; else key by (exp_id, product_id) with value (price, sale_count)) → Reduce (sum reducer)
  2. Map (key by product_id) → Reduce (no-op; the value is the iterable of per-experiment totals)
  3. Map (compute the per-product numerator and denominator terms) → Reduce (sum reducer)

The net result would be a significant amount of code, considering that a trivial word-count MapReduce requires roughly 60 lines of mostly boilerplate Java code. That's a lot of code that I'm not going to write. For the curious reader, much has been written about Hadoop/MapReduce versus Spark, such as this post at Dattamsha.

A mashup of Apache Spark and Dataflow/Beam

MH in Python

Readers who prefer Python to prose can see the full Jupyter notebook solution in my Spark vs Dataflow github project. To make the comparison easier, I separate the business logic from the parallelized code. There are two business-logic functions, calc_numerator and calc_denominator, that take all of the count and price data for a particular product and calculate the numerator (respectively, denominator) of the above formula.
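Those two functions are not reproduced in this post. For readers who want something concrete, here is a plausible sketch of what they might look like, under two assumptions of mine (not the notebook's): each grouped record is a `(product_id, iterable of (exp_id, total_price, sale_count))` pair, and `exp_id` 1 marks treatment while 0 marks control.

```python
def _split_exps(grouped):
    # grouped: (product_id, iterable of (exp_id, total_price, sale_count)).
    # Accumulate treatment and control totals separately.
    t_price = t_n = c_price = c_n = 0.0
    for exp_id, price, n in grouped[1]:
        if exp_id == 1:          # assumed treatment marker
            t_price, t_n = t_price + price, t_n + n
        else:                    # assumed control marker
            c_price, c_n = c_price + price, c_n + n
    return t_price, t_n, c_price, c_n

def calc_numerator(grouped):
    # X_{t,i} * n_{c,i} / (n_{t,i} + n_{c,i}), omitting empty strata.
    t_price, t_n, c_price, c_n = _split_exps(grouped)
    return t_price * c_n / (t_n + c_n) if t_n + c_n else 0.0

def calc_denominator(grouped):
    # X_{c,i} * n_{t,i} / (n_{t,i} + n_{c,i}), omitting empty strata.
    t_price, t_n, c_price, c_n = _split_exps(grouped)
    return c_price * t_n / (t_n + c_n) if t_n + c_n else 0.0

# Example: one product sold 3 times for $6 total in treatment, twice for $4 in control.
rec = ('prod_42', [(1, 6.0, 3), (0, 4.0, 2)])
print(calc_numerator(rec), calc_denominator(rec))  # 2.4 2.4
```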

MH in Apache Spark

The first thing to do for Spark is to start the Spark shell, which involves adding a few things to the PATH and running execfile on pyspark/ (see the notebook for details). Next comes the Spark code:

from operator import add

# We want to calculate MH(v_{t,i}, n_{t,i}, v_{c,i}, n_{c,i}), where t and c are
# treatment and control. v and n in our case are the total sale price and sale_count.

input_rdd = sc.textFile('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE))
header = input_rdd.first()  # Remove the first line.
parsed_input_rdd = input_rdd.filter(lambda x: x != header) \
                            .map(lambda x: convert_line(x.split(',')))

(exp, prod, price, sale_count) = (0, 1, 2, 3)  # Column indices in the csv.
transformed = lambda x: ((x[exp], x[prod]),
                                           (x[sale_count]*x[price], x[sale_count])))

(sp, clks) = (0, 1)     # sale price and sale_count
(ep, spc) = (0, 1)      # exp_id&product_id, sp&sale_count
(exp2, prod2) = (0, 1)  # exp_id, product_id

# For each product cross exp_id, sum the sale prices and sale_count.
grouped_result = transformed.reduceByKey(lambda x, y: (x[sp]+y[sp], x[clks]+y[clks]))
grouped_by_product = lambda x: ((x[ep][prod2]),
                                                  (x[ep][exp2], x[spc][sp], x[spc][clks]))).groupByKey()

numerator_sum = lambda x: calc_numerator(x)).reduce(add)
denominator_sum = lambda x: calc_denominator(x)).reduce(add)
effect = numerator_sum / denominator_sum
print(numerator_sum, denominator_sum, effect)

Spark functions fall into two categories: transformations, which lazily describe how the data should be manipulated or reorganized (e.g., map, filter, reduceByKey, groupByKey), and actions, which return results to the driver (e.g., first, reduce, collect). The significance of the distinction is that transformations have no immediate effect and are only evaluated when an action is reached; that is, Spark does lazy evaluation.

Excluding the business logic, the program is just a handful of lines of code.

  • sc.textFile(…) transforms a file on disk into an RDD (the distributed data structure in Spark).
  • input_rdd.first() acts on the RDD, returning the first (header) element to the driver (my notebook).
  • input_rdd.filter(…).map(…) transforms input_rdd, removing the header and then converting each csv line into floats and ints.
  •…) transforms records into key-value tuples ((exp_id, product_id), (total price, sale_count)).
  • transformed.reduceByKey(…) produces the total sale price and sale_count by (exp_id, product_id).
  •…).groupByKey() produces the same data, only grouped by product_id instead of (exp_id, product_id).
  •…).reduce(add) transforms the data per product_id into the numerator (or denominator) of the MH calculation and then performs the action of summing the results with the add function; this reduce action is what triggers execution of the whole pipeline.

MH in Apache Beam

The organization of the Dataflow code is quite similar to Spark overall, with a handful of subtle but important distinctions. One of the less interesting differences is that Dataflow does not yet have a sum function for the reduce, so I have to write my own (t_sum). Note that this code uses the Dataflow SDK (not the new Beam SDK).

import apache_beam as beam

def t_sum(values):
    result = [0, 0]
    for v in values:
        result[0] += v[0]
        result[1] += v[1]
    return (result[0], result[1])

# Create a pipeline executing on a direct runner (local, non-cloud).
# DirectPipelineRunner is the default runner; I'm setting it here to show how one
# would change it to run on the Dataflow service.
pipeline_options = beam.utils.options.PipelineOptions(['--runner=DirectPipelineRunner'])
p = beam.Pipeline(options=pipeline_options)

(exp, prod, price, sale_count) = (0, 1, 2, 3)  # Column indices in the csv.

parsed_input_rdd = (p
    | 'load data' >> beam.Read('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE)))
    | 'filter header' >> beam.Filter(lambda x: x[0] != '#')
    | 'split line' >> beam.Map(lambda x: convert_line(x.split(','))))
transformed = (parsed_input_rdd
    | 'reshape' >> beam.Map(lambda x: ((x[exp], x[prod]),
                                       (x[price]*x[sale_count], x[sale_count]))))

(sp, clks) = (0, 1)     # sale price and sale_count
(ep, spc) = (0, 1)      # exp_id&product_id, sp&sale_count
(exp2, prod2) = (0, 1)  # exp_id, product_id

# For each product cross exp_id, sum the sale prices and sale_count.
grouped_result = (transformed
    | 'combine per product/id' >> beam.CombinePerKey(t_sum))
grouped_by_product = (grouped_result
    | 'keyByExpProduct' >> beam.Map(lambda x: ((x[ep][prod2]),
                                               (x[ep][exp2], x[spc][sp], x[spc][clks])))
    | 'group' >> beam.GroupByKey())

numerator_sum = (grouped_by_product
    | 'MapForNum' >> beam.Map(lambda x: calc_numerator(x))
    | 'CombineNum' >> beam.CombineGlobally(sum))
numerator_sum | 'save numerator' >> beam.Write('./numerator_sum'))

denominator_sum = (grouped_by_product
    | 'MapForDenom' >> beam.Map(lambda x: calc_denominator(x))
    | 'CombineDenom' >> beam.CombineGlobally(sum))
denominator_sum | 'save denominator' >> beam.Write('./denominator_sum'))  # Optimize the constructed graph and execute it (locally, in our case).

Beam is conceptually two pieces: pipeline construction and pipeline execution. beam.Pipeline() returns a pipeline, say p, on which to build (using beam.Map, beam.GroupByKey, and so on), and executes the pipeline on a cluster, through the Dataflow service, or, in our case, locally.

  • beam.Pipeline(options=pipeline_options) starts constructing a pipeline that will run locally.
  • p | beam.Read(…) | beam.Filter(…) | beam.Map(…) adds reading the file, filtering lines that look like the header (starting with ‘#’), and converting each line into floats and ints to the graph.
  • parsed_input_rdd | beam.Map(…) adds keying each record by (exp_id, product_id) to the graph.
  • transformed | beam.CombinePerKey(…) | beam.Map(…) | beam.GroupByKey() adds summing sale_count and price by (exp_id, product_id) and regrouping by product_id to the graph.
  • grouped_by_product | beam.Map(…) | beam.CombineGlobally(…) adds calculating the numerator/denominator values and their global sums to the graph.
  • numerator_sum | beam.Write(…) adds a sink for the numerator (there is a matching output for the denominator).
  • optimizes the constructed graph and ships it to be executed (in our case, on the local machine).

Comparison of the implementations

At a high level, both implementations look similar and use the same basic parallel operations. The main difference to point out is that Spark executes the graph only on actions, e.g., reduce, whereas the runner executing Beam, e.g., Cloud Dataflow, executes the whole graph (when run is invoked). Another difference is that Dataflow graphs require sources and sinks, which means results must be piped to files and cannot be returned to the calling program (except with the local runner).

Spark's sweet spot: iterative algorithms

Spark started as a solution for doing exploratory analysis and machine learning. One of the simplest ways to show this off is the clustering technique k-means. K-means works by repeatedly applying two steps, assigning points to clusters and updating cluster centers, until the locations of the cluster centers have stabilized. Below is the core of the algorithm written in Spark.

import copy
import math

# Load the data, remove the first line, and pick initial locations for the
# clusters (elided; raw_data and the initial c_centers come from the notebook's
# setup). We put the (x, y) points in pt_rdd, an RDD of PtAgg (a class
# containing x, y, and count).
pt_rdd = lambda x: PtAgg(x[0], x[1], 1))
MAX_STEPS = 100;  MIN_DELTA = 0.001; delta = 1.0; step = 0
while delta > MIN_DELTA and step < MAX_STEPS:
    step += 1
    c_centers_old = copy.deepcopy(c_centers)
    b_c_centers = sc.broadcast(c_centers_old)
    # For every point, find the cluster it is closest to and add to that
    # cluster's total x, y, and count.
    totals = lambda x: pick_closest_center(x, b_c_centers.value)).reduce(
        lambda a, b: center_reduce(a, b))
    # Now update the location of the centers as the mean of all the points
    # closest to each (unless there are none, in which case pick a new random spot).
    c_centers = [t.Normalize() if t.cnt != 0 else random_point_location() for t in totals]
    # Compute the distance that each cluster center moved, then use the max of
    # those as the delta in the stopping condition.
    deltas = [math.sqrt(c.DistSqr(c_old)) for c, c_old in zip(c_centers, c_centers_old)]
    delta = max(deltas)

s = ' '.join([str(x) for x in c_centers])
print('final centers: {0}'.format(s))

  •…) maps the raw input into the PtAgg class we use to represent points.
  • sc.broadcast(c_centers_old) sends the locations of the cluster centers to all machines holding RDDs.
  •…).reduce(…) maps each point to the center it is closest to and then reduces to produce the totals of the points closest to each center (the result is returned to the driver script).
  • [t.Normalize() if t.cnt != 0 else random_point_location() for t in totals] updates the new cluster locations.

As it happens, I didn't actually need to write that code, because Spark has an ML library (mllib) containing many algorithms, including k-means. In practice, k-means is just a few function calls; details are here.

Here is where the difference between Spark and Beam is most apparent: no iterative algorithm that may stop early, including k-means, can be expressed in Beam. This is because Beam builds the computation graph, then optimizes it and ships it to be executed. With iterative algorithms, the full graph structure cannot be known beforehand (we don't know how many loops will be needed), so it cannot be expressed in Beam. It is possible to express “loops” in Dataflow, but only a fixed number of them can be added.
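The data-driven stopping rule is the crux. A minimal sketch (my own toy example, minimizing f(x) = (x - 3)² by gradient descent) shows a loop whose iteration count is only known at run time, once the updates become small, which is exactly what a fully pre-specified graph cannot express:

```python
def minimize(start, lr=0.1, tol=1e-6, max_steps=10_000):
    # Gradient descent on f(x) = (x - 3)^2; the number of iterations
    # depends on the data, not on anything known when the "graph" is built.
    x, steps = start, 0
    while steps < max_steps:
        step_size = lr * 2 * (x - 3)   # gradient of (x - 3)^2 is 2(x - 3)
        x -= step_size
        steps += 1
        if abs(step_size) < tol:       # data-driven convergence check
            break
    return x, steps

x, steps = minimize(0.0)
print(round(x, 3))  # 3.0
```

In Spark, the analogous driver loop simply issues more actions until convergence; in Beam, each extra iteration would have to be a stage baked into the graph in advance.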

Beam/Dataflow's sweet spot: streaming processing

Streaming processing is an increasingly important topic for data science. After all, who among us really wants to wait for a daily batch pipeline to tell us how our live traffic experiments are doing? For a great introduction to streaming processing, I highly encourage reading Tyler Akidau's two blog posts on the subject, The world beyond batch: Streaming 101 and 102.

When Dataflow was launched, its strongest selling point was streaming (just read the paper). That is not to say that Spark doesn't support streaming processing or that it lacks a unified engine for batch and streaming. The significant step Dataflow initially made was (1) a truly unified batch and streaming API, (2) support for event-time processing of data, that is, analysis of results windowed by when events occurred rather than when they reached the analysis machine, and (3) a focus on watermarks (a way of tracking collection progress on unbounded data) and window completeness (has all the data for this time interval been collected?). More practically, Dataflow, now Beam, has a streaming API that cleanly separates the basic questions of streaming processing: what, where, when, and how. It is worth noting that different runners currently have different levels of support, e.g., Dataflow supports streaming for Java but not yet for Python.
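Event-time windowing can be illustrated without any framework. The sketch below (a toy of mine, not the Beam API) assigns each game event to a fixed one-hour window by the timestamp at which it occurred, so a late-arriving event still lands in the window where it belongs:

```python
from collections import defaultdict

def hourly_scores(events):
    # events: (team, score, event_time_seconds) tuples, possibly arriving
    # out of order; window by when the points were scored (event time).
    windows = defaultdict(int)
    for team, score, event_time in events:
        window_start = (event_time // 3600) * 3600  # fixed one-hour windows
        windows[(team, window_start)] += score
    return dict(windows)

# The last event was scored in hour 0 but arrives after an hour-1 event;
# event-time windowing still credits it to the hour-0 window.
events = [('red', 5, 100), ('red', 3, 3700), ('red', 2, 200)]
print(hourly_scores(events))  # {('red', 0): 7, ('red', 3600): 3}
```

What Beam adds on top of this bookkeeping is the hard part: watermarks to decide when a window is probably complete, and triggers to control when (and how often) its result is emitted.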

All this is easiest to show using the examples the Dataflow team used in their recent blog post, which is also an excellent read. The starkest contrast between the two is shown by their example of calculating hourly team scores for an online game.

Other differences in framework “philosophy”

There are a number of design choices showing that Spark favors fast development by default and requires users to opt in to performance, whereas Dataflow favors high performance by default at the cost of slower development time.


Dataflow avoids the need for caching results by “fusing” sibling stages that would need the same input. Fusion comes with the limitation that mutable types should not be used, because values are shared among fused siblings and modification would cause invalid results (this is guarded against at run time). This means writing correct Beam code with mutable types requires significantly more care. Spark re-computes by default, which can be much slower but does not come with correctness issues. Spark does allow for many different caching strategies, should caching be desired.
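The hazard is easy to simulate in plain Python (an illustration of the failure mode, not Dataflow itself): if two fused stages receive the same object rather than independent copies, an in-place mutation in one stage corrupts the other's input.

```python
record = {'clicks': 10}

def stage_a(rec):
    rec['clicks'] += 1      # mutates its input in place
    return rec['clicks']

def stage_b(rec):
    return rec['clicks'] * 2

# With fusion, both stages may share one object; stage_b sees stage_a's
# mutation instead of the original record.
a_out, b_out = stage_a(record), stage_b(record)
print(a_out, b_out)  # 11 22: stage_b should have seen 10 and returned 20

# Spark-style re-computation from an immutable source avoids the problem.
fresh = lambda: {'clicks': 10}
print(stage_a(fresh()), stage_b(fresh()))  # 11 20
```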


Dataflow uses a fast, language-agnostic, serialized byte-level equality for comparing classes in GroupByKey and other steps. This becomes a big issue if the program uses grouping operations on classes that contain hashmaps, sets, or anything else that causes semantic equality to differ from byte equality. Spark deserializes classes and uses the class's comparison operator for grouping by default, and lets the user opt in to byte-level equality, making the opposite tradeoff.
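A pure-Python analogue of the pitfall (an illustration only; Dataflow's coders are language-specific): two dicts that compare equal semantically can serialize to different bytes, so grouping on serialized bytes splits what should be one key.

```python
import json

a = {'x': 1, 'y': 2}
b = {'y': 2, 'x': 1}  # same mapping, different insertion order

print(a == b)                          # True: semantic equality
print(json.dumps(a) == json.dumps(b))  # False: the serialized bytes differ

# Grouping on the serialized form treats one semantic key as two:
groups = {}
for d in (a, b):
    groups.setdefault(json.dumps(d), []).append(d)
print(len(groups))  # 2
```

The usual mitigation on the user's side is a canonical serialization, e.g., `json.dumps(d, sort_keys=True)`, so that semantically equal values serialize to identical bytes.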

Overall performance

Unfortunately, we currently can't make any definitive statements about which framework has better overall performance. The only study that compared Beam on the Dataflow service to Spark used an older version of Spark (1.3). Given the significant performance increases in Spark 2.0, it is unclear which framework has better performance at present.


Spark and Dataflow both address the limitations of MapReduce, from different viewpoints. Which tool is most appropriate will depend on the project. If the focus is interactivity, data exploration, or algorithm development, Spark is probably the better option. If the focus is complex streaming processing (especially event time), then Beam would seem a better fit. For running scalable production pipelines, Beam on Cloud Dataflow is likely the better choice.


