From bec6e5631e19447595a650ee18a79865300af5e9 Mon Sep 17 00:00:00 2001 From: "J. McConnell" Date: Tue, 29 Jun 2021 17:32:39 -0400 Subject: [PATCH] Create a jar from the current REPL session when reading a text file from GCS. Beam attempts to stage the jar for the current session as it is on the classpath, but it has not been created at this point. We need to create it as we do with run(). --- .../com/spotify/scio/repl/ReplScioContext.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/scio-repl/src/main/scala/com/spotify/scio/repl/ReplScioContext.scala b/scio-repl/src/main/scala/com/spotify/scio/repl/ReplScioContext.scala index a70ee1bb05..8c73568e74 100644 --- a/scio-repl/src/main/scala/com/spotify/scio/repl/ReplScioContext.scala +++ b/scio-repl/src/main/scala/com/spotify/scio/repl/ReplScioContext.scala @@ -17,14 +17,16 @@ package com.spotify.scio.repl +import com.spotify.scio.values.SCollection + import java.io.File import java.nio.file.Files import java.util.jar.JarOutputStream import java.io.FileOutputStream import java.util.jar.JarEntry - import org.apache.beam.sdk.options.PipelineOptions import com.spotify.scio.{ScioContext, ScioExecutionContext} +import org.apache.beam.sdk.io.Compression import scala.reflect.io.AbstractFile import scala.reflect.io.Path @@ -47,6 +49,15 @@ class ReplScioContext private (options: PipelineOptions, replOutputDir: String, super.run() } + /** + * Get an SCollection for a text file. + * @group input + */ + override def textFile(path: String, compression: Compression): SCollection[String] = { + createJar() + super.textFile(path, compression) + } + /** Ensure an operation is called before the pipeline is closed. */ override private[scio] def requireNotClosed[T](body: => T): T = { require(