Back to Blog Home

Sentry for Data: Easier, Faster Apache Beam Debugging

Osmar Coronel image
Tony Xiao image
Syd Ryan image

Osmar Coronel, Tony Xiao, Syd Ryan -

In our Sentry for Data series, we explain precisely why Sentry is the perfect tool for your data team. The present post focuses on how we used Sentry to make debugging Apache Beam easier (and faster).

Since its creation, Sentry has embraced a single vision: help all developer teams build the best software, faster. We want to give developers the information they need to resolve issues quickly, without having to dig through noisy log lines. When the code that powers data pipelines breaks, data engineers often need more context than is available in logs to solve the issue.

Sentry’s data team feels the pains of searching logs first-hand. We build pipelines that run computations (aggregations, sums, etc.) over streaming data where the data stream never actually “ends.” We chose Apache Beam as our execution framework to manipulate, shape, aggregate, and estimate data in real time. Beam provides out-of-the-box support for technologies we already use (BigQuery and PubSub), which allows the team to focus on understanding our data.

While we appreciate these features, errors in Beam get written to traditional log files. In an attempt to capture these lost errors, we eliminated the need to search through logs to debug Beam by integrating Sentry into Beam’s Python and Java SDKs. Hooray!

Sentry + Beam + Python

Beam’s distributed execution model makes it tricky to instrument; the python SDK serializes our user code and uploads it for Google Dataflow to execute. As a result, injecting Sentry code into Beam is limited to a few files and the injected code has to be formatted with specific signatures.

With these restrictions in mind, we injected Sentry into the ParDo class, which prevented the integration from catching any errors derived from classes who do not inherit ParDo. For example, errors from any class that inherits from PTransform (and executes process functions) would not be included.

Installation

When running your pipeline, include the Beam Integration to your Sentry init.

import sentry_sdk
from sentry_sdk.integrations.beam import BeamIntegration

# Sentry init
integrations = [BeamIntegration()]
sentry_sdk.init(dsn="YOUR DSN", integrations=integrations)

Note: Make sure that you install the sentry_sdk in all your workers by running the --requirements_file flag, with https://github.com/getsentry/sentry-python/releases/tag/0.11.1

Errors should now be directed and reported to Sentry. In the example below, we expected to look up a key in a dictionary, but all_loaded quickly uncovers that we’re actually indexing into a string (oops!).

Sentry Issue display of Dataflow error received via Beam integration

Error received from Dataflow using the Beam integration as the mechanism.

Sentry + Beam + Java

Debugging exceptions in Java is intimidating — period. When working with Beam, we have to think about the distributed nature of the workers and provide sufficient context for the problem to be actionable.

xkcd comic about data pipeline

Credit: xkcd.com

Installation

Let’s see how we can use Sentry to help us debug Java exceptions. First, install the Sentry’s Beam integration.

git clone --branch=sentry-beam https://github.com/getsentry/sentry-java.git
cd sentry-java/sentry-beam
mvn clean install

Now add the following to your pom.xml with the correct version.

<dependency>
    <groupId>io.sentry</groupId>
    <artifactId>sentry-beam</artifactId>
    <version>VERSION</version>
</dependency>

Because of the distributed nature of Beam, setting SENTRY_DSN won’t be sufficient for the runners to pick it up. We have to specify SENTRY_DSN as an option so we can use it in the runners.

public interface MyOptions extends PipelineOptions {
    @Description("The Sentry DSN")
    String getSentryDsn();
    void setSentryDsn(String value);
}

public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(MyOptions.class);

    // create and run your pipeline using these options
}

Sentry DSN can now be designated at the command line with -Dexec.args="--sentryDsn=noop://localhost/1".

For starters, let’s wrap our main function to catch any errors that already occurred.

public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(MyOptions.class);

    Sentry.init(options.getSentryDsn());

    try {
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(...) // apply your PTransforms
        pipeline.run().waitUntilFinish()
    } catch (Exception e) {
        Sentry.capture(e);
        throw e;
    }
}

Now we can capture any exceptions that propagate up our main function.

However, exceptions thrown in the runners (except the DirectRunner) do not propagate up to our main function. They are consumed at the runner level and, as a result, not reported to Sentry. Let’s fix this.

static class MyDoFn extends DoFn<String, String> {
    private String sentryDsn;

    // You'll have to pass the dsn to the `DoFn` so that
    // it is serialized and accessible on the runner
    public MyDoFn(String sentryDsn) {
        this.sentryDsn = sentryDsn;
    }

    @Setup
    public void setup() {
        // Make sure to re-initialize Sentry with your DSN.
        Sentry.init(sentryDsn);
    }

    @ProcessElement
    public void processElement(
        @Element String element,
        OutputReceiver<String> receiver,

        // these are optional arguments
        @Timestamp Instant timestamp,
        BoundedWindow boundedWindow,
        PaneInfo paneInfo,
        PipelineOptions pipelineOptions,
    ) {
        try {
            // apply your transformations here
        } catch (Exception e) {
            // Use `SentryBeam` here to attach additional tags
            SentryBeam.withTimestamp(timestamp)
                    .withBoundedWindow(boundedWindow)
                    .withPaneInfo(paneInfo)
                    .withPipelineOptions(pipelineOptions)
                    .capture(e);

            // When using `DirectRunner`, throwing this exception
            // here will cause it to propagate to the `main` function
            throw e;
        }
    }
}

Context about errors in Beam should now appear in Sentry’s UI.

Tags associated with error received through Beam integration.

Tags add useful context and provide an efficient interface for searching exceptions.

Deploy data pipelines with confidence

We’ve found this tooling to be essential to our success building data pipelines on top of Apache Beam. Hopefully these integrations will help you deploy with confidence as well.

If you have any feedback, want more features, or need help using the tool, open an issue on the GitHub repository or shout out to our support engineers. They’re here to help. And also to code. But mostly to help.

We’re also attending Beam Summit in Las Vegas September 11th and 12th. Reach out if you’d like to chat more about Beam + Sentry.

Share

Share on Twitter
Share on Facebook
Share on HackerNews
Share on LinkedIn

Published

Sentry Sign Up CTA

Code breaks, fix it faster

Sign up for Sentry and monitor your application in minutes.

Try Sentry Free

Topics

SDK Updates

The best way to debug slow web pages

Listen to the Syntax Podcast

Of course we sponsor a developer podcast. Check it out on your favorite listening platform.

Listen To Syntax
    TwitterGitHubDribbbleLinkedinDiscord
© 2024 • Sentry is a registered Trademark
of Functional Software, Inc.