Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,23 @@ object ArrowUtils extends LazyLogging {

/**
  * Converts an Arrow Schema into Texera Schema.
  *
  * The Arrow field type alone cannot identify every Texera type: LARGE_BINARY and
  * ANY both ride on Utf8. A recognised `texera_type` entry in the field metadata
  * therefore takes precedence; fields without metadata, without that key, or with
  * an unrecognised value fall back to the type derived from the Arrow field type.
  *
  * @param arrowSchema The Arrow Schema to be converted.
  * @return A Texera Schema.
  */
def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema =
  Schema(
    arrowSchema.getFields.asScala.map { field =>
      // Both the metadata map and the looked-up value may be null, so each is
      // wrapped in Option before matching on the tag.
      val taggedType = Option(field.getMetadata)
        .flatMap(m => Option(m.get("texera_type")))
        .collect {
          case "LARGE_BINARY" => AttributeType.LARGE_BINARY
          case "ANY"          => AttributeType.ANY
        }

      val attributeType = taggedType.getOrElse(toAttributeType(field.getType))
      new Attribute(field.getName, attributeType)
    }.toList
  )
Expand Down Expand Up @@ -232,16 +236,22 @@ object ArrowUtils extends LazyLogging {

/**
* Converts an Amber schema into Arrow schema.
* Stores AttributeType in field metadata to preserve LARGE_BINARY type information.
* Stores AttributeType in field metadata to preserve LARGE_BINARY and ANY,
* which both collapse onto Utf8 in Arrow.
*
* @param schema The Texera Schema.
* @return An Arrow Schema.
*/
def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = {
val arrowFields = schema.getAttributes.map { attribute =>
val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) {
val metadataTag = attribute.getType match {
case AttributeType.LARGE_BINARY => "LARGE_BINARY"
case AttributeType.ANY => "ANY"
case _ => null
}
val metadata = if (metadataTag != null) {
val map = new util.HashMap[String, String]()
map.put("texera_type", "LARGE_BINARY")
map.put("texera_type", metadataTag)
map
} else null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,26 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
)
}

it should "preserve ANY through the metadata-based path" in {
  // Round trip: Texera schema -> Arrow schema -> Texera schema. The ANY type is
  // expected to survive, i.e. it must not come back as STRING.
  val original = Schema(List(new Attribute("v", AttributeType.ANY)))
  val recovered = ArrowUtils.toTexeraSchema(ArrowUtils.fromTexeraSchema(original))
  recovered.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe List(
    ("v", AttributeType.ANY)
  )
}

it should "attach texera_type=ANY metadata to ANY fields and only those" in {
  // One ANY column next to a plain STRING column: only the former should be tagged.
  val attributes = List(
    new Attribute("v", AttributeType.ANY),
    new Attribute("name", AttributeType.STRING)
  )
  val arrowFields = ArrowUtils.fromTexeraSchema(Schema(attributes)).getFields.asScala.toList

  // Look a converted field up by column name; throws if it is missing, which fails the test.
  def fieldNamed(columnName: String) = arrowFields.find(_.getName == columnName).get

  val anyField = fieldNamed("v")
  val stringField = fieldNamed("name")

  anyField.getMetadata.get("texera_type") shouldBe "ANY"
  // getMetadata is wrapped in Option so that a null map counts as "no tag".
  Option(stringField.getMetadata).exists(_.containsKey("texera_type")) shouldBe false
}
}