Vladimir Ozerov

Posted on • Originally published at querifylabs.com

Custom traits in Apache Calcite

Abstract

Physical properties are an essential part of the optimization process that allows you to explore more alternative plans.

Apache Calcite comes with convention and collation (sort order) properties. Many query engines require custom properties. For example, distributed and heterogeneous engines that we often see in our daily practice need to carefully plan the movement of data between machines and devices, which requires a custom property to describe data location.

In this blog post, we will explore how to define, register, and enforce a custom property, also known as a trait, with the Apache Calcite cost-based optimizer.

Physical Properties

We start our journey by looking at an example of a common physical property: sort order.

Query optimizers work with relational operators, such as Scan, Project, Filter, and Join. During the optimization, an operator may require its input to satisfy a specific condition. To check whether the condition is satisfied, operators may expose physical properties - plain values associated with an operator. Operators may compare the desired and actual properties of their inputs and enforce the desired property by injecting a special enforcer operator on top of the input.

Consider the join operator t1 JOIN t2 ON t1.a = t2.b. We could use a merge join if both inputs are sorted on their join attributes, t1.a and t2.b, respectively. We may define the collation property for every operator, describing the sort order of produced rows:

Join[t1.a=t2.b]
  Input[t1]      [SORTED by t1.a]
  Input[t2]      [NOT SORTED]

The merge join operator may enforce the sorting on t1.a and t2.b on its inputs. Since the first input is already sorted on t1.a, it remains unchanged. The second input is not sorted, so the enforcer Sort operator is injected, making a merge join possible:

MergeJoin[t1.a=t2.b]  
  Input[t1]           [SORTED by t1.a]
  Sort[t2.b]          [SORTED by t2.b]
    Input[t2]         [NOT SORTED]
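
The decision made by the merge join can be sketched in a few lines of plain Java. This is a toy model, not the Apache Calcite API: the Op class and the enforceSort method are hypothetical names introduced only for illustration. An operator exposes its sort order as a plain value, and the enforcer is injected only when the actual value differs from the required one.

```java
// Toy model only: these classes are hypothetical and are not part of Apache Calcite.
final class SortEnforcerSketch {

    /** A relational operator with a name, a sort order property, and an optional input. */
    static final class Op {
        final String name;
        final String sortedBy; // physical property; null means NOT SORTED
        final Op input;        // null for leaf operators

        Op(String name, String sortedBy, Op input) {
            this.name = name;
            this.sortedBy = sortedBy;
            this.input = input;
        }

        @Override
        public String toString() {
            String self = name + " [" + (sortedBy == null ? "NOT SORTED" : "SORTED by " + sortedBy) + "]";
            return input == null ? self : self + " <- " + input;
        }
    }

    /** Returns the operator unchanged if it already has the required order, otherwise wraps it in a Sort. */
    static Op enforceSort(Op op, String requiredColumn) {
        if (requiredColumn.equals(op.sortedBy)) {
            return op;
        }
        return new Op("Sort[" + requiredColumn + "]", requiredColumn, op);
    }

    public static void main(String[] args) {
        Op t1 = new Op("Input[t1]", "t1.a", null);
        Op t2 = new Op("Input[t2]", null, null);

        // Already sorted on t1.a: returned as is.
        System.out.println(enforceSort(t1, "t1.a"));
        // Not sorted: a Sort enforcer is placed on top.
        System.out.println(enforceSort(t2, "t2.b"));
    }
}
```

The sketch compares properties by equality. A real optimizer uses a weaker "satisfies" check, which we will meet when we define our own trait.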

Apache Calcite API

In Apache Calcite, properties are defined by the RelTrait and RelTraitDef classes. RelTrait is a concrete value of the property. RelTraitDef is a property definition, which describes the property name, the expected Java class of the property, the default value of the property, and how to enforce the property. Property definitions are registered in the planner via the RelOptPlanner.addRelTraitDef method. The planner ensures that every operator has a specific value for every registered property definition, whether it is the default value or not.

All properties of a node are organized in an immutable data structure, RelTraitSet. This class has convenient methods to add and update properties with copying semantics. You may access the properties of a concrete operator using the RelOptNode.getTraitSet method.

To enforce a specific property on the operator during planning, you should do the following from within the rule:

  1. Get the current properties of a node using the RelOptNode.getTraitSet method.
  2. Create a new instance of RelTraitSet with updated properties.
  3. Enforce the properties by calling the RelOptRule.convert method.
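
The copying semantics behind the first two steps can be illustrated with a small standalone sketch. TraitSetSketch below is a hypothetical stand-in written for this post, not the real RelTraitSet: it only shows that an update returns a new set and leaves the original untouched. The trait names and values are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: a hypothetical stand-in for the idea behind RelTraitSet, not the Calcite class.
final class TraitSetSketch {
    private final Map<String, String> traits; // trait definition name -> trait value

    private TraitSetSketch(Map<String, String> traits) {
        this.traits = traits;
    }

    static TraitSetSketch empty() {
        return new TraitSetSketch(new LinkedHashMap<>());
    }

    /** Copying update: returns a new set and leaves this one untouched. */
    TraitSetSketch replace(String traitDef, String value) {
        Map<String, String> copy = new LinkedHashMap<>(traits);
        copy.put(traitDef, value);
        return new TraitSetSketch(copy);
    }

    String get(String traitDef) {
        return traits.get(traitDef);
    }

    @Override
    public String toString() {
        return traits.toString();
    }

    public static void main(String[] args) {
        // Step 1: the current properties of the node.
        TraitSetSketch current = empty()
            .replace("convention", "NONE")
            .replace("collation", "[]");

        // Step 2: a new trait set with the updated property.
        TraitSetSketch desired = current.replace("collation", "[b ASC]");

        // Step 3 is not modeled here: in a rule, the desired set is handed to the planner.
        System.out.println(current); // {convention=NONE, collation=[]}
        System.out.println(desired); // {convention=NONE, collation=[b ASC]}
    }
}
```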

Finally, before invoking the planner program, you may define the desired properties of the root operator of the optimized relational tree. After the optimization, the planner will either return the operator that satisfies these properties or throw an exception.

Internally, Apache Calcite enforces properties by adding a special AbstractConverter operator with the desired traits on top of the target operator.

AbstractConverter [SORTED by t2.b]
  Input[t2]       [NOT SORTED]

To transform the AbstractConverter into a real enforcer node, such as Sort, you should add the built-in ExpandConversionRule rule to your optimization program. This rule will attempt to expand the AbstractConverter into a sequence of enforcers to satisfy the desired traits. We have only one unsatisfied property in our example, so the converter expands into a single Sort operator.

Sort[t2.b]        [SORTED by t2.b]
  Input[t2]       [NOT SORTED]
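
To see why the expansion may produce a sequence of enforcers, consider the following simplified sketch. It is a toy model with hypothetical names, not the implementation of ExpandConversionRule. It assumes a second property, distribution, next to the collation, and it compares values by plain equality. For every property whose actual value differs from the desired one, one enforcer is stacked on top of the plan.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: hypothetical names, not Calcite's ExpandConversionRule.
final class ConverterExpansionSketch {

    /** Wraps the plan in one enforcer per property whose actual value differs from the desired one. */
    static String expand(String plan, Map<String, String> actual, Map<String, String> desired) {
        String result = plan;
        for (Map.Entry<String, String> want : desired.entrySet()) {
            if (!want.getValue().equals(actual.get(want.getKey()))) {
                result = enforcerFor(want.getKey()) + "[" + want.getValue() + "](" + result + ")";
            }
        }
        return result;
    }

    private static String enforcerFor(String property) {
        switch (property) {
            case "distribution": return "Exchange";
            case "collation": return "Sort";
            default: throw new IllegalArgumentException("No enforcer for " + property);
        }
    }

    public static void main(String[] args) {
        Map<String, String> actual = new LinkedHashMap<>();
        actual.put("distribution", "SINGLETON");
        actual.put("collation", "NONE");

        // Distribution is listed first, so a Sort, if needed, ends up on top.
        Map<String, String> desired = new LinkedHashMap<>();
        desired.put("distribution", "SINGLETON");
        desired.put("collation", "t2.b");

        // Only the collation differs, so a single Sort is injected.
        System.out.println(expand("Input[t2]", actual, desired)); // Sort[t2.b](Input[t2])

        // Now both properties differ, so two enforcers are stacked.
        actual.put("distribution", "PARTITIONED");
        System.out.println(expand("Input[t2]", actual, desired)); // Sort[t2.b](Exchange[SINGLETON](Input[t2]))
    }
}
```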

You may use your own custom expansion rule if needed. See the Apache Flink custom rule for an example.

Custom Property

Now that we understand the purpose of properties and which Apache Calcite API to use, we will define, register, and enforce our custom property.

Consider that we have a distributed database, where every relational operator might be distributed between nodes in one of two ways:

  1. PARTITIONED - relation is partitioned between nodes. Every tuple (row) resides on one of the nodes. An example is a typical distributed data structure.
  2. SINGLETON - relation is located on a single node. An example is a cursor that delivers the final result to the user application.

In our example, we would like to ensure that the top operator always has a SINGLETON distribution, simulating the results' delivery to a single node.

Enforcer

First, we define the enforcer operator. To ensure the SINGLETON distribution, we need to move data from all nodes to a single node. In distributed databases, data movement operators are often called Exchange. The minimal requirement for a custom operator in Apache Calcite is to define the constructor and the copy method.

public class ExchangeRel extends SingleRel {
    public ExchangeRel(
        RelOptCluster cluster,
        RelTraitSet traits,
        RelNode input
    ) {
        super(cluster, traits, input);
    }

    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        return new ExchangeRel(getCluster(), traitSet, inputs.get(0));
    }
}

Trait

Next, we define our custom trait and trait definition. Our implementation must adhere to the following rules:

  1. The trait must refer to a common trait definition instance in the method getTraitDef.
  2. The trait must override the satisfies method to define whether the current trait satisfies the target trait. If not, the enforcer will be used.
  3. The trait definition must declare the expected Java class of the trait in the getTraitClass method.
  4. The trait definition must declare the default value of the trait in the getDefault method.
  5. The trait definition must implement the method convert, which Apache Calcite will invoke to create the enforcer if the current trait doesn't satisfy the desired trait. If there is no valid conversion between traits, null should be returned.

Below is the source code of our trait. We define two concrete values, PARTITIONED and SINGLETON. We also define the special value ANY, which we use as the default. We say that both PARTITIONED and SINGLETON satisfy ANY but PARTITIONED and SINGLETON do not satisfy each other.

public class Distribution implements RelTrait {

    public static final Distribution ANY = new Distribution(Type.ANY);
    public static final Distribution PARTITIONED = new Distribution(Type.PARTITIONED);
    public static final Distribution SINGLETON = new Distribution(Type.SINGLETON);

    private final Type type;

    private Distribution(Type type) {
        this.type = type;
    }

    @Override
    public RelTraitDef getTraitDef() {
        return DistributionTraitDef.INSTANCE;
    }

    @Override
    public boolean satisfies(RelTrait toTrait) {
        Distribution toTrait0 = (Distribution) toTrait;

        if (toTrait0.type == Type.ANY) {
            return true;
        }

        return this.type.equals(toTrait0.type);
    }

    @Override
    public void register(RelOptPlanner planner) {
        // No-op: RelTrait requires this method, but our trait has no rules to register.
    }

    enum Type {
        ANY,
        PARTITIONED,
        SINGLETON
    }
}
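
To double-check the rules above, here is a standalone copy of the satisfies logic with the Calcite interfaces stripped away, so that it runs on its own. The SatisfiesSketch class is a hypothetical helper for this post. It prints the result for every pair of values.

```java
// Standalone copy of the satisfies logic, without the Calcite interfaces.
final class SatisfiesSketch {
    enum Type { ANY, PARTITIONED, SINGLETON }

    /** Every value satisfies ANY; otherwise the two values must be the same. */
    static boolean satisfies(Type from, Type to) {
        return to == Type.ANY || from == to;
    }

    public static void main(String[] args) {
        for (Type from : Type.values()) {
            for (Type to : Type.values()) {
                System.out.println(from + " satisfies " + to + ": " + satisfies(from, to));
            }
        }
    }
}
```

Note that the relation is not symmetric: PARTITIONED satisfies ANY, but ANY does not satisfy PARTITIONED, so an operator with the default distribution still needs an enforcer when a concrete distribution is requested.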

Our trait definition defines the convert function, which injects the ExchangeRel enforcer if the current property doesn't satisfy the target one.

public class DistributionTraitDef extends RelTraitDef<Distribution> {

    public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();

    private DistributionTraitDef() {
        // No-op.
    }

    @Override
    public Class<Distribution> getTraitClass() {
        return Distribution.class;
    }

    @Override
    public String getSimpleName() {
        return "DISTRIBUTION";
    }

    @Override
    public RelNode convert(
        RelOptPlanner planner,
        RelNode rel,
        Distribution toTrait,
        boolean allowInfiniteCostConverters
    ) {
        Distribution fromTrait = rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);

        if (fromTrait.satisfies(toTrait)) {
            return rel;
        }

        return new ExchangeRel(
            rel.getCluster(),
            rel.getTraitSet().plus(toTrait),
            rel
        );
    }

    @Override
    public boolean canConvert(
        RelOptPlanner planner,
        Distribution fromTrait,
        Distribution toTrait
    ) {
        return true;
    }

    @Override
    public Distribution getDefault() {
        return Distribution.ANY;
    }
}

You would likely have more distribution types, dedicated distribution columns, and different exchange types in production implementations. You may refer to Apache Flink as an example of a real distribution trait.

Putting It All Together

Let's see the new trait in action. The complete source code is available here.

First, we create a schema with a couple of tables - one with PARTITIONED distribution and another with SINGLETON distribution. We use custom table and schema implementation, similar to the ones we used in the previous blog post.

// Table with PARTITIONED distribution.
Table table1 = Table.newBuilder("table1", Distribution.PARTITIONED)
  .addField("field", SqlTypeName.DECIMAL).build();

// Table with SINGLETON distribution.
Table table2 = Table.newBuilder("table2", Distribution.SINGLETON)
  .addField("field", SqlTypeName.DECIMAL).build();

Schema schema = Schema.newBuilder("schema").addTable(table1).addTable(table2).build();

Then we create a planner instance and register our custom trait definition in it.

VolcanoPlanner planner = new VolcanoPlanner();

planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(DistributionTraitDef.INSTANCE);

Finally, we create a table scan operator for each of our tables and enforce the SINGLETON distribution. Notice that we use the aforementioned ExpandConversionRule in our optimization program. Otherwise, the enforcement will not work.

// Use the built-in rule that will expand abstract converters.
RuleSet rules = RuleSets.ofList(AbstractConverter.ExpandConversionRule.INSTANCE);

// Prepare the desired traits with the SINGLETON distribution.
// Here, `node` is the table scan operator created for one of the tables.
RelTraitSet desiredTraits = node.getTraitSet().plus(Distribution.SINGLETON);

// Use the planner to enforce the desired traits
RelNode optimizedNode = Programs.of(rules).run(
    planner,
    node,
    desiredTraits,
    Collections.emptyList(),
    Collections.emptyList()
);

Now we run the TraitTest from the sample project to see this in action. For the PARTITIONED table, the planner has added the ExchangeRel to enforce the SINGLETON distribution.

BEFORE:
2:LogicalTableScan(table=[[schema, partitioned]])

AFTER:
7:ExchangeRel
  2:LogicalTableScan(table=[[schema, partitioned]])

But the table with the SINGLETON distribution remains unchanged because it already has the desired distribution.

BEFORE:
0:LogicalTableScan(table=[[schema, singleton]])

AFTER:
0:LogicalTableScan(table=[[schema, singleton]])

Congratulations! Our custom property is ready.

Summary

Physical properties are an important concept in query optimization that allows you to explore more alternative plans.

In this blog post, we demonstrated how to define a custom physical property in Apache Calcite. We created custom RelTraitDef and RelTrait classes, registered the trait definition in the planner, and used a custom operator to enforce the desired value of the property.

However, we omitted one crucial question - how to propagate properties between operators? It turns out, Apache Calcite cannot do this well, and you will have to make a tough decision choosing between several non-ideal solutions. We will discuss property propagation in detail in future posts. Stay tuned!

We are always ready to help you with your SQL query optimizer design. Just let us know.
