Run non-native code in Apache Beam/Dataflow

Running external (non-native) code in Apache Beam/Dataflow is not straightforward, especially with little documentation or support available online. Apache Beam supports Python and Java natively, so migrating your existing workloads to a distributed processing engine is easier if you are already using one of these languages. However, many parallel workloads use non-native binaries/libraries written in C++. To run these workloads in Apache Beam or Dataflow, you first need the runtime installed on the VM, and only then can you use it in the pipeline. Google has a nice article related to this with more details.

If you have a custom installation of Beam, you can easily install the runtime on the VM and then trigger your job. With the serverless model of Google Dataflow, you don’t have access to the VM, and you also cannot change the container image that Dataflow uses to run your pipeline.

Not all hope is lost: you can programmatically install the runtime dependencies to work around this and run non-native code in Apache Beam/Dataflow.

For this example, instead of running a C++ binary, I am installing and running Python dependencies from a Java Beam/Dataflow pipeline. The idea is the same: Python is non-native for a Java Dataflow runtime. So, let’s check the code.

package com.test.nonnative.dependencies;

import org.apache.beam.sdk.transforms.DoFn;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class SetupPythonFn extends DoFn<String, String> {
    @Setup
    public void setup() throws Exception {
        // Runs once per DoFn instance, before any elements are processed on this worker:
        // install the Python runtime and scikit-learn, then sanity-check that the import works.
        this.executeShellCommand("apt update && apt -y install python3-pip && apt -y install python3 && pip3 install -U scikit-learn");
        this.executeShellCommand("python3 -c 'from sklearn import datasets'");
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        c.output("");
    }

    private void executeShellCommand(String command) throws Exception {
        try {
            System.out.println("Executing shell command " + command);
            
            Process process = Runtime.getRuntime().exec(new String[]{"/bin/bash", "-c", command});

            // Capture the command's stdout so it shows up in the worker logs.
            StringBuilder output = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    output.append(line).append("\n");
                }
            }

            int exitVal = process.waitFor();
            if (exitVal == 0) {
                System.out.println("Success!");
                System.out.println(output.toString());
            } else {
                System.out.println(output.toString());
                System.out.println("Exit code is non-zero");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Beam’s ParDo transform allows you to specify a custom function to apply to your partition of data. This custom function has to extend the DoFn abstract class (see the documentation). A DoFn has its own lifecycle, and different events are fired at different times. More information can be found in the documentation, but the most important event is tagged with the @ProcessElement annotation: it is raised for each row of your partitioned data so that you can apply your custom transformation to it.
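
To make the lifecycle concrete, here is a minimal sketch (not part of the pipeline in this post) that just logs each DoFn lifecycle hook; the class name and log messages are illustrative only.

package com.test.nonnative.dependencies;

import org.apache.beam.sdk.transforms.DoFn;

public class LifecycleLoggingFn extends DoFn<String, String> {
    @Setup
    public void setup() {
        // Called once per DoFn instance when the worker creates it --
        // the right place to install runtimes or open expensive resources.
        System.out.println("@Setup");
    }

    @StartBundle
    public void startBundle() {
        // Called before each bundle of elements.
        System.out.println("@StartBundle");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Called once per element.
        c.output(c.element());
    }

    @FinishBundle
    public void finishBundle() {
        // Called after each bundle of elements.
        System.out.println("@FinishBundle");
    }

    @Teardown
    public void teardown() {
        // Called when the DoFn instance is discarded -- clean up resources here.
        System.out.println("@Teardown");
    }
}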

A DoFn also comes with other handy events like @Setup and @Teardown, where you can initialize external resources and clean them up, respectively. These events are not raised for each input row: @Setup runs once per DoFn instance, which in practice means roughly once per worker thread that Dataflow spins up (bundles get their own @StartBundle/@FinishBundle hooks). In the code above, I am using @Setup to install my dependencies by executing two shell commands. The first command installs Python 3, pip and scikit-learn using the native Debian package manager:

apt update && apt -y install python3-pip && apt -y install python3 && pip3 install -U scikit-learn

The second command executes an import statement inside a Python interpreter. You can do whatever you like here; I import from scikit-learn simply to show that the Python runtime environment is ready:

python3 -c 'from sklearn import datasets'
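
Once the runtime is in place, you can shell out to it from @ProcessElement for every element. The following is a minimal sketch, not part of the original code: the inline Python one-liner and the way output is emitted are placeholders for whatever your workload actually needs, and in practice you would combine this with the @Setup shown above (or keep both in the same DoFn).

package com.test.nonnative.dependencies;

import org.apache.beam.sdk.transforms.DoFn;
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Illustrative only: pipe each element through a Python one-liner and emit its stdout.
public class RunPythonFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        Process process = Runtime.getRuntime().exec(new String[]{
                "python3", "-c",
                "import sys; print(sys.argv[1].upper())", // placeholder Python logic
                c.element()});

        // Emit whatever the Python process prints to stdout.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                c.output(line);
            }
        }
        process.waitFor();
    }
}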

Here is the pipeline code itself. Nothing fancy: it just reads a file and applies the custom SetupPythonFn to the resulting collection.

package com.test.nonnative.dependencies;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.ParDo;

public class TestPipeline {
    public interface TestPipelineOptions extends PipelineOptions {
        @Description("Input for the pipeline")
        @Validation.Required
        String getInputFilePath();
        void setInputFilePath(String input);
    }

    public static void main(String[] args) {
        PipelineOptionsFactory.register(TestPipelineOptions.class);
        TestPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TestPipelineOptions.class);

        Pipeline p = Pipeline.create(options);
        p
            // Read the input file line by line.
            .apply(TextIO.read().from(options.getInputFilePath()))
            // Install the Python runtime on each worker, then process the elements.
            .apply(ParDo.of(new SetupPythonFn()));
        p.run();
    }
}
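
For completeness, a job like this is typically submitted to Dataflow along the following lines. This is a sketch assuming a Maven project with the Dataflow runner on the classpath; the project, region and bucket values are placeholders you would replace with your own.

mvn compile exec:java \
  -Dexec.mainClass=com.test.nonnative.dependencies.TestPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=my-gcp-project \
    --region=us-central1 \
    --tempLocation=gs://my-bucket/temp \
    --inputFilePath=gs://my-bucket/input.txt"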

Note: it can be quite difficult to figure out the right commands to execute because they depend on the underlying OS image being used. To make this easier, find out from the UI which container Dataflow is using, then run it locally yourself and play with it. Figure out all the commands up front and save yourself some time :). For the above example, I used the following

docker run -it --entrypoint /bin/bash gcr.io/cloud-dataflow/v1beta3/beam-java-batch:beam-2.18.0
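
Inside that shell you can, for example, check the base OS and dry-run the install commands before putting them into @Setup. The commands below are just the ones used in this post; adjust them to your own dependencies.

cat /etc/os-release
apt update && apt -y install python3 python3-pip && pip3 install -U scikit-learn
python3 -c 'import sklearn; print(sklearn.__version__)'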

Generally, for use cases like this, an environment where you have more control over the infrastructure is a better fit. Weigh up whether a serverless distributed engine like Dataflow is right for your workload, or whether an engine that gives you more control is the better choice.
