DEV Community

Apache SeaTunnel
Apache SeaTunnel

Posted on

Apache SeaTunnel 2.3.8 JDBC Connector Development Guide

Overview

This guide helps developers quickly understand the SeaTunnel 2.3.8 framework and develop JDBC connectors.

Prerequisites

Essential documentation (located in the repository's docs directory):

  • docs/en/concept/config.md
  • docs/en/concept/connector-v2-features.md
  • docs/en/concept/JobEnvConfig.md
  • docs/en/connector-v2/sink/Jdbc.md
  • docs/en/connector-v2/sink-common-options.md
  • docs/en/connector-v2/source-common-options.md

Core Development Workflow

  1. Clone Repository

git clone https://github.com/apache/seatunnel.git

  2. Build Project

mvn clean install -Dmaven.test.skip=true

  3. Run Example: execute fake_to_console.conf in the seatunnel-examples module:

Image description

  4. Build Distribution

Image description

JDBC Connector Implementation

Package Structure

Image description

Key Components

Focus on two core packages:

  1. catalog - Database metadata handling
  2. internal.dialect - Database-specific implementations

Catalog Implementation

Image description

MySqlCatalogFactory

`@AutoService(Factory.class)
public class MySqlCatalogFactory implements CatalogFactory {

/** Returns the identifier this catalog factory is registered under: the MySQL database identifier. */
@Override
public String factoryIdentifier() {
    final String identifier = DatabaseIdentifier.MYSQL;
    return identifier;
}

/**
 * Builds a {@link MySqlCatalog} from the supplied options.
 *
 * @param catalogName name given to the created catalog
 * @param options configuration providing the base URL, username and password
 * @return a catalog bound to the URL information parsed from the base URL
 */
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
    final String baseUrl = options.get(JdbcCatalogOptions.BASE_URL);
    // Reject a missing or blank base URL before attempting to parse it.
    final boolean hasBaseUrl = StringUtils.isNoneBlank(baseUrl);
    Preconditions.checkArgument(
            hasBaseUrl, "Miss config <base-url>! Please check your config.");

    final JdbcUrlUtil.UrlInfo parsedUrl = JdbcUrlUtil.getUrlInfo(baseUrl);
    final String username = options.get(JdbcCatalogOptions.USERNAME);
    final String password = options.get(JdbcCatalogOptions.PASSWORD);
    return new MySqlCatalog(catalogName, username, password, parsedUrl);
}

/** Exposes the option rule built from the shared JDBC catalog base rule. */
@Override
public OptionRule optionRule() {
    final OptionRule rule = JdbcCatalogOptions.BASE_RULE.build();
    return rule;
}
}
`

MySqlCatalog

Handles database metadata operations:
`
@Slf4j
public class MySqlCatalog extends AbstractJdbcCatalog {

// Template listing a table's columns in ordinal order; format args: schema name, table name.
private static final String SELECT_COLUMNS_SQL_TEMPLATE =
        "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

// Template probing whether a database exists; format arg: schema name.
private static final String SELECT_DATABASE_EXISTS =
        "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

// Template probing whether a table exists; format args: schema name, table name.
private static final String SELECT_TABLE_EXISTS =
        "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

// Server version resolved in the constructor; buildColumn consults it for DATETIME_PRECISION.
private MySqlVersion version;
// Converter created for the resolved version; used by buildColumn, getCreateTableSql and getTable.
private MySqlTypeConverter typeConverter;

public MySqlCatalog(
        String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
    super(catalogName, username, pwd, urlInfo, null);
    this.version = resolveVersion();
    this.typeConverter = new MySqlTypeConverter(version);
}

/**
 * Builds the query that checks whether a database exists.
 *
 * @param databaseName schema name substituted into the existence query
 * @return the formatted {@code information_schema.schemata} lookup
 */
@Override
protected String getDatabaseWithConditionSql(String databaseName) {
    final String sql = String.format(SELECT_DATABASE_EXISTS, databaseName);
    return sql;
}

/**
 * Builds the query that checks whether a table exists.
 *
 * @param tablePath path supplying the database and table names
 * @return the formatted {@code information_schema.tables} lookup
 */
@Override
protected String getTableWithConditionSql(TablePath tablePath) {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return String.format(SELECT_TABLE_EXISTS, database, table);
}

/** Returns the statement used to list all databases. */
@Override
protected String getListDatabaseSql() {
    final String listDatabases = "SHOW DATABASES;";
    return listDatabases;
}

/**
 * Returns the statement used to list tables.
 *
 * @param databaseName not used when building the statement; the same text is returned for
 *     every database
 * @return the {@code SHOW TABLES} statement
 */
@Override
protected String getListTableSql(String databaseName) {
    final String listTables = "SHOW TABLES;";
    return listTables;
}

/**
 * Reads the table name from the current row of a table listing.
 *
 * @param rs result set positioned on a row
 * @return the value of the first column
 * @throws SQLException if the column cannot be read
 */
@Override
protected String getTableName(ResultSet rs) throws SQLException {
    final int nameColumnIndex = 1;
    return rs.getString(nameColumnIndex);
}

/**
 * Extracts the table name from a table path.
 *
 * @param tablePath path to read
 * @return the table-name component of the path
 */
@Override
protected String getTableName(TablePath tablePath) {
    final String tableName = tablePath.getTableName();
    return tableName;
}

/**
 * Builds the query that lists a table's columns in ordinal order.
 *
 * @param tablePath path supplying the database and table names
 * @return the formatted {@code INFORMATION_SCHEMA.COLUMNS} query
 */
@Override
protected String getSelectColumnsSql(TablePath tablePath) {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return String.format(SELECT_COLUMNS_SQL_TEMPLATE, database, table);
}

/**
 * Creates the identifier for a table in this catalog.
 *
 * @param tablePath path supplying the database and table names
 * @return an identifier made of the catalog name, database name and table name
 */
@Override
protected TableIdentifier getTableIdentifier(TablePath tablePath) {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return TableIdentifier.of(catalogName, database, table);
}

/**
 * Loads the constraint keys of a table and drops the unique key named {@code PRIMARY}.
 *
 * @param metaData JDBC metadata handed to the parent implementation
 * @param tablePath path supplying database, schema and table names
 * @return the parent's constraint keys without the {@code PRIMARY} unique key
 * @throws SQLException if the parent lookup fails
 */
@Override
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
        throws SQLException {
    final List<ConstraintKey> keys =
            super.getConstraintKeys(
                    metaData,
                    tablePath.getDatabaseName(),
                    tablePath.getSchemaName(),
                    tablePath.getTableName());

    final Iterator<ConstraintKey> cursor = keys.iterator();
    while (cursor.hasNext()) {
        final ConstraintKey key = cursor.next();
        final boolean isUniqueKey =
                ConstraintKey.ConstraintType.UNIQUE_KEY.equals(key.getConstraintType());
        // Only a unique key whose constraint name is PRIMARY is filtered out.
        if (isUniqueKey && "PRIMARY".equals(key.getConstraintName())) {
            cursor.remove();
        }
    }
    return keys;
}

/**
 * Builds a SeaTunnel {@link Column} from one row of {@code INFORMATION_SCHEMA.COLUMNS}.
 *
 * @param resultSet result set positioned on the column-metadata row to read
 * @return the column produced by this catalog's type converter
 * @throws SQLException if a metadata field cannot be read
 */
@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
    String columnName = resultSet.getString("COLUMN_NAME");
    // e.g. tinyint(1) unsigned
    String columnType = resultSet.getString("COLUMN_TYPE");
    // e.g. tinyint; upper-cased with Locale.ROOT so the result does not depend on the JVM's
    // default locale (a Turkish locale would otherwise turn "i" into a dotted capital I).
    String dataType = resultSet.getString("DATA_TYPE").toUpperCase(Locale.ROOT);
    String comment = resultSet.getString("COLUMN_COMMENT");
    Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
    // Constant-first comparison: a null IS_NULLABLE value is treated as "not nullable"
    // instead of raising a NullPointerException.
    boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
    // e.g. `decimal(10, 2)` is 10
    long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
    // e.g. `decimal(10, 2)` is 2
    int numberScale = resultSet.getInt("NUMERIC_SCALE");
    // e.g. `varchar(10)` is 40
    long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
    // e.g. `timestamp(3)` is 3; the column is not read when the resolved version is 5.5
    int timePrecision =
            MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");

    // A column is either numeric or character-typed, and carries either a numeric scale or a
    // time precision, never both; the max(...) calls below rely on that.
    Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
    Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));

    MysqlType mysqlType = MysqlType.getByName(columnType);
    boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned");

    BasicTypeDefine<MysqlType> typeDefine =
            BasicTypeDefine.<MysqlType>builder()
                    .name(columnName)
                    .columnType(columnType)
                    .dataType(dataType)
                    .nativeType(mysqlType)
                    .unsigned(unsigned)
                    .length(Math.max(charOctetLength, numberPrecision))
                    .precision(numberPrecision)
                    .scale(Math.max(numberScale, timePrecision))
                    .nullable(isNullable)
                    .defaultValue(defaultValue)
                    .comment(comment)
                    .build();
    return typeConverter.convert(typeDefine);
}

/**
 * Builds the CREATE TABLE statement for a catalog table.
 *
 * @param tablePath target table path
 * @param table catalog table definition; its catalog name is passed to the builder's build step
 * @param createIndex flag forwarded to the SQL builder
 * @return the statement produced by {@code MysqlCreateTableSqlBuilder}
 */
@Override
protected String getCreateTableSql(
        TablePath tablePath, CatalogTable table, boolean createIndex) {
    return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex)
            .build(table.getCatalogName());
}

/**
 * Builds the DROP TABLE statement for a table.
 *
 * @param tablePath path supplying the database and table names
 * @return the statement with both names wrapped in backticks
 */
@Override
protected String getDropTableSql(TablePath tablePath) {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return String.format("DROP TABLE `%s`.`%s`;", database, table);
}

/**
 * Builds the CREATE DATABASE statement.
 *
 * @param databaseName name of the database to create
 * @return the statement with the name wrapped in backticks
 */
@Override
protected String getCreateDatabaseSql(String databaseName) {
    final String template = "CREATE DATABASE `%s`;";
    return String.format(template, databaseName);
}

/**
 * Builds the DROP DATABASE statement.
 *
 * @param databaseName name of the database to drop
 * @return the statement with the name wrapped in backticks
 */
@Override
protected String getDropDatabaseSql(String databaseName) {
    final String template = "DROP DATABASE `%s`;";
    return String.format(template, databaseName);
}

/**
 * Derives a catalog table from an arbitrary SQL query.
 *
 * @param sqlQuery query whose result shape describes the table
 * @return the catalog table computed by {@code CatalogUtils} over the default connection
 * @throws SQLException if the lookup fails
 */
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
    final Connection connection = getConnection(defaultUrl);
    final MySqlTypeMapper typeMapper = new MySqlTypeMapper(typeConverter);
    return CatalogUtils.getCatalogTable(connection, sqlQuery, typeMapper);
}

/**
 * Builds the TRUNCATE TABLE statement for a table.
 *
 * @param tablePath path supplying the database and table names
 * @return the statement with both names wrapped in backticks
 */
@Override
protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return String.format("TRUNCATE TABLE `%s`.`%s`;", database, table);
}

/**
 * Builds a query that fetches at most one row, used to detect whether a table holds data.
 *
 * @param tablePath path supplying the database and table names
 * @return a {@code SELECT ... LIMIT 1} statement with both names wrapped in backticks
 */
public String getExistDataSql(TablePath tablePath) {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return String.format("SELECT * FROM `%s`.`%s` LIMIT 1;", database, table);
}

/**
 * Queries the server for its version over the default connection.
 *
 * @return the parsed server version, or {@code MySqlVersion.V_5_7} when the query or parsing
 *     fails for any reason
 */
private MySqlVersion resolveVersion() {
    final MySqlVersion fallback = MySqlVersion.V_5_7;
    try (Statement stmt = getConnection(defaultUrl).createStatement();
            ResultSet rows = stmt.executeQuery("SELECT VERSION()")) {
        rows.next();
        final String rawVersion = rows.getString(1);
        return MySqlVersion.parse(rawVersion);
    } catch (Exception e) {
        // Best effort: any failure is logged and replaced by the default version.
        log.info("Failed to get mysql version, fallback to default version: {}", fallback, e);
        return fallback;
    }
}
}
`

Dialect Implementation

MySqlDialectFactory

`
@AutoService(JdbcDialectFactory.class)
public class MySqlDialectFactory implements JdbcDialectFactory {
/**
 * Reports whether this factory handles the given JDBC URL.
 *
 * @param url JDBC connection URL to inspect
 * @return {@code true} when the URL starts with {@code jdbc:mysql:}
 */
@Override
public boolean acceptsURL(String url) {
    return url.startsWith("jdbc:mysql:");
}

/** Creates a MySQL dialect using its default settings. */
@Override
public JdbcDialect create() {
    final JdbcDialect dialect = new MysqlDialect();
    return dialect;
}

/**
 * Creates the dialect matching the requested compatibility mode.
 *
 * @param compatibleMode compatibility mode; a case-insensitive match with the StarRocks
 *     identifier selects the StarRocks dialect
 * @param fieldIde field identifier setting handed to the created dialect
 * @return a {@code StarRocksDialect} for StarRocks mode, otherwise a {@code MysqlDialect}
 */
@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
    final boolean starRocksMode =
            DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode);
    return starRocksMode ? new StarRocksDialect(fieldIde) : new MysqlDialect(fieldIde);
}
}
`

MysqlDialect

Implements database-specific features:
`
public class MysqlDialect implements JdbcDialect {

/** Native MySQL types for which {@code supportDefaultValue} reports {@code false}. */
private static final List<MysqlType> NOT_SUPPORTED_DEFAULT_VALUES =
        Arrays.asList(MysqlType.BLOB, MysqlType.TEXT, MysqlType.JSON, MysqlType.GEOMETRY);

/** Field identifier setting applied by {@code quoteIdentifier}; starts as the ORIGINAL value. */
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

/** Creates a dialect that keeps the initial {@code fieldIde} value. */
public MysqlDialect() {
}

/**
 * Creates a dialect with an explicit field identifier setting.
 *
 * @param fieldIde value stored in {@code fieldIde} and later passed to {@code getFieldIde}
 */
public MysqlDialect(String fieldIde) {
    this.fieldIde = fieldIde;
}

/** Returns the name of this dialect: the MySQL database identifier. */
@Override
public String dialectName() {
    final String name = DatabaseIdentifier.MYSQL;
    return name;
}

/** Creates a new MySQL row converter on every call. */
@Override
public JdbcRowConverter getRowConverter() {
    final JdbcRowConverter converter = new MysqlJdbcRowConverter();
    return converter;
}

/**
 * Returns the shared default MySQL type converter.
 *
 * <p>NOTE(review): the instance is routed through a raw {@code TypeConverter} local, presumably
 * so it can be returned as {@code TypeConverter<BasicTypeDefine>} without a generic type
 * mismatch — confirm before inlining.
 */
@Override
public TypeConverter<BasicTypeDefine> getTypeConverter() {
    TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE;
    return typeConverter;
}

/** Creates a new MySQL type mapper on every call. */
@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
    final JdbcDialectTypeMapper mapper = new MySqlTypeMapper();
    return mapper;
}

/**
 * Quotes a field identifier with backticks after applying the configured field identifier
 * setting.
 *
 * @param identifier raw identifier
 * @return the result of {@code getFieldIde(identifier, fieldIde)} wrapped in backticks
 */
@Override
public String quoteIdentifier(String identifier) {
    final StringBuilder quoted = new StringBuilder();
    quoted.append("`").append(getFieldIde(identifier, fieldIde)).append("`");
    return quoted.toString();
}

/**
 * Quotes a database identifier with backticks, leaving the identifier itself untouched.
 *
 * @param identifier database name
 * @return the identifier wrapped in backticks
 */
@Override
public String quoteDatabaseIdentifier(String identifier) {
    final StringBuilder quoted = new StringBuilder();
    quoted.append("`").append(identifier).append("`");
    return quoted.toString();
}

/**
 * Builds the table identifier for a table path.
 *
 * @param tablePath path supplying the database and table names
 * @return the result of the two-argument {@code tableIdentifier} overload
 */
@Override
public String tableIdentifier(TablePath tablePath) {
    final String database = tablePath.getDatabaseName();
    final String table = tablePath.getTableName();
    return tableIdentifier(database, table);
}

/**
 * Builds an {@code INSERT ... ON DUPLICATE KEY UPDATE} statement.
 *
 * @param database target database
 * @param tableName target table
 * @param fieldNames columns to insert; every one of them is also listed in the update clause
 * @param uniqueKeyFields not used when building the statement
 * @return the upsert statement, always present
 */
@Override
public Optional<String> getUpsertStatement(
        String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // One `col`=VALUES(`col`) assignment per field, comma separated.
    final String assignments =
            Arrays.stream(fieldNames)
                    .map(this::quoteIdentifier)
                    .map(quoted -> quoted + "=VALUES(" + quoted + ")")
                    .collect(Collectors.joining(", "));
    return Optional.of(
            getInsertIntoStatement(database, tableName, fieldNames)
                    + " ON DUPLICATE KEY UPDATE "
                    + assignments);
}

/**
 * Prepares a forward-only, read-only statement for the query.
 *
 * <p>The {@code fetchSize} argument is not used: the fetch size is always set to
 * {@code Integer.MIN_VALUE}. NOTE(review): this is presumably the MySQL Connector/J convention
 * for streaming result sets row by row — confirm against the driver documentation.
 *
 * @param connection connection used to prepare the statement
 * @param queryTemplate SQL text to prepare
 * @param fetchSize ignored
 * @return the prepared statement
 * @throws SQLException if the statement cannot be prepared or configured
 */
@Override
public PreparedStatement creatPreparedStatement(
        Connection connection, String queryTemplate, int fetchSize) throws SQLException {
    final PreparedStatement prepared =
            connection.prepareStatement(
                    queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    prepared.setFetchSize(Integer.MIN_VALUE);
    return prepared;
}

/**
 * Extracts the table name from a table path.
 *
 * @param tablePath path to read
 * @return the table-name component only
 */
@Override
public String extractTableName(TablePath tablePath) {
    final String tableName = tablePath.getTableName();
    return tableName;
}

/**
 * Returns the connection parameters applied by default for this dialect.
 *
 * @return a new mutable map containing {@code rewriteBatchedStatements=true}
 */
@Override
public Map<String, String> defaultParameter() {
    final Map<String, String> defaults = new HashMap<>();
    defaults.put("rewriteBatchedStatements", "true");
    return defaults;
}

/**
 * Parses a textual table path.
 *
 * @param tablePath table path text
 * @return the result of {@code TablePath.of(tablePath, false)}
 */
@Override
public TablePath parse(String tablePath) {
    final TablePath parsed = TablePath.of(tablePath, false);
    return parsed;
}

/**
 * Samples values of one column and returns them sorted.
 *
 * <p>Every {@code samplingRate}-th row is kept. The {@code fetchSize} argument is not used; the
 * statement's fetch size is always {@code Integer.MIN_VALUE}.
 *
 * @param connection connection used to run the sampling query
 * @param table source table; its query is wrapped as a subquery when present, otherwise its
 *     table path is queried directly
 * @param columnName column to sample
 * @param samplingRate keep one row out of this many
 * @param fetchSize ignored
 * @return the sampled values, sorted by their natural order
 * @throws Exception if the query fails, or {@link InterruptedException} when the current thread
 *     is found interrupted while iterating
 */
@Override
public Object[] sampleDataFromColumn(
        Connection connection,
        JdbcSourceTable table,
        String columnName,
        int samplingRate,
        int fetchSize)
        throws Exception {
    final boolean hasQuery = StringUtils.isNotBlank(table.getQuery());
    final String sampleQuery =
            hasQuery
                    ? String.format(
                            "SELECT %s FROM (%s) AS T",
                            quoteIdentifier(columnName), table.getQuery())
                    : String.format(
                            "SELECT %s FROM %s",
                            quoteIdentifier(columnName),
                            tableIdentifier(table.getTablePath()));

    try (Statement statement =
            connection.createStatement(
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        statement.setFetchSize(Integer.MIN_VALUE);
        try (ResultSet rows = statement.executeQuery(sampleQuery)) {
            final List<Object> sampled = new ArrayList<>();
            // rowNumber is 1-based, so the first kept row is row number samplingRate.
            for (int rowNumber = 1; rows.next(); rowNumber++) {
                if (rowNumber % samplingRate == 0) {
                    sampled.add(rows.getObject(1));
                }
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("Thread interrupted");
                }
            }
            final Object[] sorted = sampled.toArray();
            Arrays.sort(sorted);
            return sorted;
        }
    }
}

/**
 * Estimates the number of rows of a source table.
 *
 * <p>Strategy:
 *
 * <ol>
 *   <li>No query configured: use TABLE STATUS.
 *   <li>Query without a WHERE clause and a configured, non-default table path: use TABLE
 *       STATUS.
 *   <li>Query with a WHERE clause, or a query with the default table path: use COUNT(*).
 * </ol>
 *
 * <p>NOTE(review): {@code log} is not declared in the visible part of this class; presumably
 * it comes from a Lombok {@code @Slf4j} annotation omitted from the listing — confirm.
 *
 * @param connection connection used to run the statements
 * @param table source table description
 * @return the fifth column of {@code SHOW TABLE STATUS} or the subquery count
 * @throws SQLException if a statement fails or TABLE STATUS returns no usable row
 */
@Override
public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table)
        throws SQLException {
    final String configuredQuery = table.getQuery();
    boolean useTableStats = StringUtils.isBlank(configuredQuery);
    if (!useTableStats && !configuredQuery.toLowerCase().contains("where")) {
        final TablePath configuredPath = table.getTablePath();
        useTableStats =
                configuredPath != null
                        && !TablePath.DEFAULT
                                .getFullName()
                                .equals(configuredPath.getFullName());
    }

    if (!useTableStats) {
        return SQLUtils.countForSubquery(connection, table.getQuery());
    }

    // TABLE STATUS is less accurate than COUNT(*) but more efficient for large tables.
    final TablePath tablePath = table.getTablePath();
    final String useDatabaseStatement =
            String.format("USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName()));
    final String rowCountQuery =
            String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName());

    try (Statement statement = connection.createStatement()) {
        log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement);
        statement.execute(useDatabaseStatement);
        log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
        try (ResultSet status = statement.executeQuery(rowCountQuery)) {
            final boolean usable =
                    status.next() && status.getMetaData().getColumnCount() >= 5;
            if (!usable) {
                throw new SQLException(
                        String.format(
                                "No result returned after running query [%s]",
                                rowCountQuery));
            }
            return status.getLong(5);
        }
    }
}

/**
 * Reports whether a column of the given type may carry a default value.
 *
 * @param typeBasicTypeDefine type definition whose native type is cast to {@code MysqlType}
 * @return {@code false} when the native type is listed in
 *     {@code NOT_SUPPORTED_DEFAULT_VALUES}, {@code true} otherwise
 */
@Override
public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
    final MysqlType candidate = (MysqlType) typeBasicTypeDefine.getNativeType();
    final boolean unsupported = NOT_SUPPORTED_DEFAULT_VALUES.contains(candidate);
    return !unsupported;
}

/**
 * Reports whether a default value of the given column type must be quoted.
 *
 * @param columnDefine type definition whose column type is resolved to a {@code MysqlType}
 * @return {@code true} for character, text, enum/set, blob and date/time types, {@code false}
 *     for every other type
 */
@Override
public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
    final MysqlType resolvedType = MysqlType.getByName(columnDefine.getColumnType());
    final boolean quoted;
    switch (resolvedType) {
        // character and text types
        case CHAR:
        case VARCHAR:
        case TEXT:
        case TINYTEXT:
        case MEDIUMTEXT:
        case LONGTEXT:
        // enumerated types
        case ENUM:
        case SET:
        // binary large objects
        case BLOB:
        case TINYBLOB:
        case MEDIUMBLOB:
        case LONGBLOB:
        // date and time types
        case DATE:
        case DATETIME:
        case TIMESTAMP:
        case TIME:
        case YEAR:
            quoted = true;
            break;
        default:
            quoted = false;
            break;
    }
    return quoted;
}
}
`

Type Conversion

MySqlTypeConverter handles bidirectional type mapping:

`
public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlType>> {

// ============================data types=====================
// Upper-case MySQL type names; convert() switches on them and reconvert() writes them back
// as column/data type names.
static final String MYSQL_NULL = "NULL";
static final String MYSQL_BIT = "BIT";
static final String MYSQL_BIT_UNSIGNED = "BIT UNSIGNED";

// -------------------------number----------------------------
static final String MYSQL_TINYINT = "TINYINT";
static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
static final String MYSQL_SMALLINT = "SMALLINT";
static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
static final String MYSQL_MEDIUMINT = "MEDIUMINT";
static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
static final String MYSQL_INT = "INT";
static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
static final String MYSQL_INTEGER = "INTEGER";
static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
static final String MYSQL_BIGINT = "BIGINT";
static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
static final String MYSQL_DECIMAL = "DECIMAL";
static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
static final String MYSQL_FLOAT = "FLOAT";
static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
static final String MYSQL_DOUBLE = "DOUBLE";
static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";

// -------------------------string----------------------------
public static final String MYSQL_CHAR = "CHAR";
public static final String MYSQL_VARCHAR = "VARCHAR";
static final String MYSQL_TINYTEXT = "TINYTEXT";
static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
static final String MYSQL_TEXT = "TEXT";
static final String MYSQL_LONGTEXT = "LONGTEXT";
static final String MYSQL_JSON = "JSON";
static final String MYSQL_ENUM = "ENUM";

// ------------------------------time-------------------------
static final String MYSQL_DATE = "DATE";
public static final String MYSQL_DATETIME = "DATETIME";
public static final String MYSQL_TIME = "TIME";
public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
static final String MYSQL_YEAR = "YEAR";
static final String MYSQL_YEAR_UNSIGNED = "YEAR UNSIGNED";

// ------------------------------blob-------------------------
static final String MYSQL_TINYBLOB = "TINYBLOB";
static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
static final String MYSQL_BLOB = "BLOB";
static final String MYSQL_LONGBLOB = "LONGBLOB";
static final String MYSQL_BINARY = "BINARY";
static final String MYSQL_VARBINARY = "VARBINARY";
static final String MYSQL_GEOMETRY = "GEOMETRY";

// Decimal limits: convert() falls back to DEFAULT_PRECISION/DEFAULT_SCALE when a DECIMAL
// column's precision exceeds DEFAULT_PRECISION.
public static final int DEFAULT_PRECISION = 38;
public static final int MAX_PRECISION = 65;
public static final int DEFAULT_SCALE = 18;
public static final int MAX_SCALE = 30;
public static final int MAX_TIME_SCALE = 6;
public static final int MAX_TIMESTAMP_SCALE = 6;
// Powers of two; convert() uses (power - 1) as the column length of the TEXT/BLOB families.
public static final long POWER_2_8 = 1L << 8;
public static final long POWER_2_16 = 1L << 16;
public static final long POWER_2_24 = 1L << 24;
public static final long POWER_2_32 = 1L << 32;
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
// Shared converter built for MySQL 5.7.
public static final MySqlTypeConverter DEFAULT_INSTANCE =
        new MySqlTypeConverter(MySqlVersion.V_5_7);

// MySQL version this converter was created for.
private final MySqlVersion version;

/**
 * Creates a converter for a specific MySQL version.
 *
 * @param version version stored in this converter
 */
public MySqlTypeConverter(MySqlVersion version) {
    this.version = version;
}

/** Creates a converter for MySQL 5.7. */
public MySqlTypeConverter() {
    this(MySqlVersion.V_5_7);
}

/** Returns the identifier of this converter: the MySQL database identifier. */
@Override
public String identifier() {
    final String id = DatabaseIdentifier.MYSQL;
    return id;
}

/**
 * Converts a MySQL type definition into a SeaTunnel {@link Column}.
 *
 * <p>The data type name is upper-cased, a trailing {@code ZEROFILL} is stripped and
 * {@code " UNSIGNED"} is appended when the definition is flagged unsigned; the normalized name
 * then selects the SeaTunnel data type, column length and scale.
 *
 * @param typeDefine MySQL type definition (name, column type, data type, length, precision,
 *     scale, nullability, default value, comment)
 * @return the physical column built from the definition
 * @throws RuntimeException via {@code CommonError.convertToSeaTunnelTypeError} when the
 *     normalized data type matches no known MySQL type — NOTE(review): exact exception type is
 *     declared outside this listing
 */
@Override
public Column convert(BasicTypeDefine typeDefine) {
    // Attributes that do not depend on the data type are copied over as-is.
    PhysicalColumn.PhysicalColumnBuilder builder =
            PhysicalColumn.builder()
                    .name(typeDefine.getName())
                    .sourceType(typeDefine.getColumnType())
                    .nullable(typeDefine.isNullable())
                    .defaultValue(typeDefine.getDefaultValue())
                    .comment(typeDefine.getComment());

    // Normalize the type name so it matches one of the MYSQL_* constants.
    String mysqlDataType = typeDefine.getDataType().toUpperCase();
    if (mysqlDataType.endsWith("ZEROFILL")) {
        mysqlDataType =
                mysqlDataType.substring(0, mysqlDataType.length() - "ZEROFILL".length()).trim();
    }
    if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED"))) {
        mysqlDataType = mysqlDataType + " UNSIGNED";
    }
    switch (mysqlDataType) {
        case MYSQL_NULL:
            builder.dataType(BasicType.VOID_TYPE);
            break;
        case MYSQL_BIT:
        case MYSQL_BIT_UNSIGNED:
            // Unknown length or BIT(1) maps to boolean; wider bit fields map to a byte array.
            if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                builder.dataType(BasicType.BOOLEAN_TYPE);
            } else if (typeDefine.getLength() == 1) {
                builder.dataType(BasicType.BOOLEAN_TYPE);
            } else {
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                // BIT(M) -> BYTE(M/8)
                long byteLength = typeDefine.getLength() / 8;
                // Round up when M is not a multiple of 8.
                byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
                builder.columnLength(byteLength);
            }
            break;
        case MYSQL_TINYINT:
            // Only the exact column type tinyint(1) is treated as boolean.
            if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)")) {
                builder.dataType(BasicType.BOOLEAN_TYPE);
            } else {
                builder.dataType(BasicType.BYTE_TYPE);
            }
            break;
        // Unsigned integer types share the SeaTunnel type of the next wider signed type.
        case MYSQL_TINYINT_UNSIGNED:
        case MYSQL_SMALLINT:
            builder.dataType(BasicType.SHORT_TYPE);
            break;
        case MYSQL_SMALLINT_UNSIGNED:
        case MYSQL_MEDIUMINT:
        case MYSQL_MEDIUMINT_UNSIGNED:
        case MYSQL_INT:
        case MYSQL_INTEGER:
        case MYSQL_YEAR:
        case MYSQL_YEAR_UNSIGNED:
            builder.dataType(BasicType.INT_TYPE);
            break;
        case MYSQL_INT_UNSIGNED:
        case MYSQL_INTEGER_UNSIGNED:
        case MYSQL_BIGINT:
            builder.dataType(BasicType.LONG_TYPE);
            break;
        case MYSQL_BIGINT_UNSIGNED:
            // Mapped to DECIMAL(20, 0) rather than a long.
            DecimalType intDecimalType = new DecimalType(20, 0);
            builder.dataType(intDecimalType);
            builder.columnLength(Long.valueOf(intDecimalType.getPrecision()));
            builder.scale(intDecimalType.getScale());
            break;
        case MYSQL_FLOAT:
            builder.dataType(BasicType.FLOAT_TYPE);
            break;
        case MYSQL_FLOAT_UNSIGNED:
            log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
            builder.dataType(BasicType.FLOAT_TYPE);
            break;
        case MYSQL_DOUBLE:
            builder.dataType(BasicType.DOUBLE_TYPE);
            break;
        case MYSQL_DOUBLE_UNSIGNED:
            log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
            builder.dataType(BasicType.DOUBLE_TYPE);
            break;
        case MYSQL_DECIMAL:
            Preconditions.checkArgument(typeDefine.getPrecision() > 0);

            DecimalType decimalType;
            // Precision above DEFAULT_PRECISION falls back to the default precision and scale.
            if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
                log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL);
                decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
            } else {
                decimalType =
                        new DecimalType(
                                typeDefine.getPrecision().intValue(),
                                typeDefine.getScale() == null
                                        ? 0
                                        : typeDefine.getScale().intValue());
            }
            builder.dataType(decimalType);
            builder.columnLength(Long.valueOf(decimalType.getPrecision()));
            builder.scale(decimalType.getScale());
            break;
        case MYSQL_DECIMAL_UNSIGNED:
            Preconditions.checkArgument(typeDefine.getPrecision() > 0);

            log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL_UNSIGNED);
            // The unsigned variant uses one extra digit of precision; a missing scale is 0.
            DecimalType decimalUnsignedType =
                    new DecimalType(
                            typeDefine.getPrecision().intValue() + 1,
                            typeDefine.getScale() == null
                                    ? 0
                                    : typeDefine.getScale().intValue());
            builder.dataType(decimalUnsignedType);
            builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision()));
            builder.scale(decimalUnsignedType.getScale());
            break;
        case MYSQL_ENUM:
            builder.dataType(BasicType.STRING_TYPE);
            // Falls back to a length of 100 when no positive length is known.
            if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                builder.columnLength(100L);
            } else {
                builder.columnLength(typeDefine.getLength());
            }
            break;
        case MYSQL_CHAR:
        case MYSQL_VARCHAR:
            // Without a positive length, use the 4-byte length of a single character.
            if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
            } else {
                builder.columnLength(typeDefine.getLength());
            }
            builder.dataType(BasicType.STRING_TYPE);
            break;
        // TEXT family: fixed column lengths of 2^8-1, 2^16-1, 2^24-1 and 2^32-1.
        case MYSQL_TINYTEXT:
            builder.dataType(BasicType.STRING_TYPE);
            builder.columnLength(POWER_2_8 - 1);
            break;
        case MYSQL_TEXT:
            builder.dataType(BasicType.STRING_TYPE);
            builder.columnLength(POWER_2_16 - 1);
            break;
        case MYSQL_MEDIUMTEXT:
            builder.dataType(BasicType.STRING_TYPE);
            builder.columnLength(POWER_2_24 - 1);
            break;
        case MYSQL_LONGTEXT:
            builder.dataType(BasicType.STRING_TYPE);
            builder.columnLength(POWER_2_32 - 1);
            break;
        case MYSQL_JSON:
            builder.dataType(BasicType.STRING_TYPE);
            break;
        case MYSQL_BINARY:
        case MYSQL_VARBINARY:
            // Without a positive length, default to a single byte.
            if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                builder.columnLength(1L);
            } else {
                builder.columnLength(typeDefine.getLength());
            }
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            break;
        // BLOB family: same fixed lengths as the TEXT family, mapped to byte arrays.
        case MYSQL_TINYBLOB:
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            builder.columnLength(POWER_2_8 - 1);
            break;
        case MYSQL_BLOB:
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            builder.columnLength(POWER_2_16 - 1);
            break;
        case MYSQL_MEDIUMBLOB:
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            builder.columnLength(POWER_2_24 - 1);
            break;
        case MYSQL_LONGBLOB:
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            builder.columnLength(POWER_2_32 - 1);
            break;
        case MYSQL_GEOMETRY:
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            break;
        case MYSQL_DATE:
            builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
            break;
        case MYSQL_TIME:
            builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
            builder.scale(typeDefine.getScale());
            break;
        case MYSQL_DATETIME:
        case MYSQL_TIMESTAMP:
            builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
            builder.scale(typeDefine.getScale());
            break;
        default:
            // No known MySQL type matched the normalized name.
            throw CommonError.convertToSeaTunnelTypeError(
                    DatabaseIdentifier.MYSQL, mysqlDataType, typeDefine.getName());
    }
    return builder.build();
}

/**
 * Converts a SeaTunnel {@link Column} into the MySQL type definition used to declare it.
 *
 * <p>The column's name, nullability, comment and default value are copied as-is. The native
 * type, column type string and data type are selected from the column's SeaTunnel SQL type.
 * Decimal precision/scale and time/timestamp scale that fall outside the supported range are
 * clamped, and a warning is logged each time a value is adjusted.
 *
 * <p>If the SQL type has no mapping here, the error created by {@code
 * CommonError.convertToConnectorTypeError} is thrown.
 *
 * @param column the SeaTunnel column to translate
 * @return the MySQL type definition built for {@code column}
 */
@Override
public BasicTypeDefine<MysqlType> reconvert(Column column) {
    BasicTypeDefine.BasicTypeDefineBuilder builder =
            BasicTypeDefine.<MysqlType>builder()
                    .name(column.getName())
                    .nullable(column.isNullable())
                    .comment(column.getComment())
                    .defaultValue(column.getDefaultValue());
    switch (column.getDataType().getSqlType()) {
        case NULL:
            builder.nativeType(MysqlType.NULL);
            builder.columnType(MYSQL_NULL);
            builder.dataType(MYSQL_NULL);
            break;
        case BOOLEAN:
            // Booleans are declared as tinyint(1).
            builder.nativeType(MysqlType.BOOLEAN);
            builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1));
            builder.dataType(MYSQL_TINYINT);
            builder.length(1L);
            break;
        case TINYINT:
            builder.nativeType(MysqlType.TINYINT);
            builder.columnType(MYSQL_TINYINT);
            builder.dataType(MYSQL_TINYINT);
            break;
        case SMALLINT:
            builder.nativeType(MysqlType.SMALLINT);
            builder.columnType(MYSQL_SMALLINT);
            builder.dataType(MYSQL_SMALLINT);
            break;
        case INT:
            builder.nativeType(MysqlType.INT);
            builder.columnType(MYSQL_INT);
            builder.dataType(MYSQL_INT);
            break;
        case BIGINT:
            builder.nativeType(MysqlType.BIGINT);
            builder.columnType(MYSQL_BIGINT);
            builder.dataType(MYSQL_BIGINT);
            break;
        case FLOAT:
            builder.nativeType(MysqlType.FLOAT);
            builder.columnType(MYSQL_FLOAT);
            builder.dataType(MYSQL_FLOAT);
            break;
        case DOUBLE:
            builder.nativeType(MysqlType.DOUBLE);
            builder.columnType(MYSQL_DOUBLE);
            builder.dataType(MYSQL_DOUBLE);
            break;
        case DECIMAL:
            DecimalType decimalType = (DecimalType) column.getDataType();
            long precision = decimalType.getPrecision();
            int scale = decimalType.getScale();
            // Step 1: bring the precision into range. A non-positive precision falls back to
            // the defaults; an oversized one is capped and the scale is reduced by the overflow.
            if (precision <= 0) {
                precision = DEFAULT_PRECISION;
                scale = DEFAULT_SCALE;
                log.warn(
                        "The decimal column {} type decimal({},{}) is out of range, "
                                + "which is precision less than 0, "
                                + "it will be converted to decimal({},{})",
                        column.getName(),
                        decimalType.getPrecision(),
                        decimalType.getScale(),
                        precision,
                        scale);
            } else if (precision > MAX_PRECISION) {
                scale = (int) Math.max(0, scale - (precision - MAX_PRECISION));
                precision = MAX_PRECISION;
                log.warn(
                        "The decimal column {} type decimal({},{}) is out of range, "
                                + "which exceeds the maximum precision of {}, "
                                + "it will be converted to decimal({},{})",
                        column.getName(),
                        decimalType.getPrecision(),
                        decimalType.getScale(),
                        MAX_PRECISION,
                        precision,
                        scale);
            }
            // Step 2: clamp the (possibly already adjusted) scale into [0, MAX_SCALE].
            if (scale < 0) {
                scale = 0;
                log.warn(
                        "The decimal column {} type decimal({},{}) is out of range, "
                                + "which is scale less than 0, "
                                + "it will be converted to decimal({},{})",
                        column.getName(),
                        decimalType.getPrecision(),
                        decimalType.getScale(),
                        precision,
                        scale);
            } else if (scale > MAX_SCALE) {
                scale = MAX_SCALE;
                log.warn(
                        "The decimal column {} type decimal({},{}) is out of range, "
                                + "which exceeds the maximum scale of {}, "
                                + "it will be converted to decimal({},{})",
                        column.getName(),
                        decimalType.getPrecision(),
                        decimalType.getScale(),
                        MAX_SCALE,
                        precision,
                        scale);
            }

            builder.nativeType(MysqlType.DECIMAL);
            builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL, precision, scale));
            builder.dataType(MYSQL_DECIMAL);
            builder.precision(precision);
            builder.scale(scale);
            break;
        case BYTES:
            // Pick the smallest binary type that fits the declared length. With no usable
            // length, fall back to a varbinary of half the maximum varbinary length.
            if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
                builder.nativeType(MysqlType.VARBINARY);
                builder.columnType(
                        String.format("%s(%s)", MYSQL_VARBINARY, MAX_VARBINARY_LENGTH / 2));
                builder.dataType(MYSQL_VARBINARY);
            } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) {
                builder.nativeType(MysqlType.VARBINARY);
                builder.columnType(
                        String.format("%s(%s)", MYSQL_VARBINARY, column.getColumnLength()));
                builder.dataType(MYSQL_VARBINARY);
            } else if (column.getColumnLength() < POWER_2_24) {
                builder.nativeType(MysqlType.MEDIUMBLOB);
                builder.columnType(MYSQL_MEDIUMBLOB);
                builder.dataType(MYSQL_MEDIUMBLOB);
            } else {
                builder.nativeType(MysqlType.LONGBLOB);
                builder.columnType(MYSQL_LONGBLOB);
                builder.dataType(MYSQL_LONGBLOB);
            }
            break;
        case STRING:
            // Pick the text type by declared length; an unknown length maps to longtext.
            if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
                builder.nativeType(MysqlType.LONGTEXT);
                builder.columnType(MYSQL_LONGTEXT);
                builder.dataType(MYSQL_LONGTEXT);
            } else if (column.getColumnLength() < POWER_2_8) {
                builder.nativeType(MysqlType.VARCHAR);
                builder.columnType(
                        String.format("%s(%s)", MYSQL_VARCHAR, column.getColumnLength()));
                builder.dataType(MYSQL_VARCHAR);
            } else if (column.getColumnLength() < POWER_2_16) {
                builder.nativeType(MysqlType.TEXT);
                builder.columnType(MYSQL_TEXT);
                builder.dataType(MYSQL_TEXT);
            } else if (column.getColumnLength() < POWER_2_24) {
                builder.nativeType(MysqlType.MEDIUMTEXT);
                builder.columnType(MYSQL_MEDIUMTEXT);
                builder.dataType(MYSQL_MEDIUMTEXT);
            } else {
                builder.nativeType(MysqlType.LONGTEXT);
                builder.columnType(MYSQL_LONGTEXT);
                builder.dataType(MYSQL_LONGTEXT);
            }
            break;
        case DATE:
            builder.nativeType(MysqlType.DATE);
            builder.columnType(MYSQL_DATE);
            builder.dataType(MYSQL_DATE);
            break;
        case TIME:
            builder.nativeType(MysqlType.TIME);
            builder.dataType(MYSQL_TIME);
            // Versions at or before 5.5 get the plain type without a scale suffix.
            if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
                builder.columnType(MYSQL_TIME);
            } else if (column.getScale() != null && column.getScale() > 0) {
                int timeScale = column.getScale();
                if (timeScale > MAX_TIME_SCALE) {
                    timeScale = MAX_TIME_SCALE;
                    // Report the limit that was actually applied (MAX_TIME_SCALE), not the
                    // decimal scale limit.
                    log.warn(
                            "The time column {} type time({}) is out of range, "
                                    + "which exceeds the maximum scale of {}, "
                                    + "it will be converted to time({})",
                            column.getName(),
                            column.getScale(),
                            MAX_TIME_SCALE,
                            timeScale);
                }
                builder.columnType(String.format("%s(%s)", MYSQL_TIME, timeScale));
                builder.scale(timeScale);
            } else {
                builder.columnType(MYSQL_TIME);
            }
            break;
        case TIMESTAMP:
            // SeaTunnel timestamps are declared as MySQL datetime columns.
            builder.nativeType(MysqlType.DATETIME);
            builder.dataType(MYSQL_DATETIME);
            // Versions at or before 5.5 get the plain type without a scale suffix.
            if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
                builder.columnType(MYSQL_DATETIME);
            } else if (column.getScale() != null && column.getScale() > 0) {
                int timestampScale = column.getScale();
                if (timestampScale > MAX_TIMESTAMP_SCALE) {
                    timestampScale = MAX_TIMESTAMP_SCALE;
                    log.warn(
                            "The timestamp column {} type timestamp({}) is out of range, "
                                    + "which exceeds the maximum scale of {}, "
                                    + "it will be converted to timestamp({})",
                            column.getName(),
                            column.getScale(),
                            MAX_TIMESTAMP_SCALE,
                            timestampScale);
                }
                builder.columnType(String.format("%s(%s)", MYSQL_DATETIME, timestampScale));
                builder.scale(timestampScale);
            } else {
                builder.columnType(MYSQL_DATETIME);
            }
            break;
        default:
            throw CommonError.convertToConnectorTypeError(
                    DatabaseIdentifier.MYSQL,
                    column.getDataType().getSqlType().name(),
                    column.getName());
    }

    return builder.build();
}
}
`

Contribution Guidelines

  • Task Assignment: Comment on GitHub issues to claim tasks
  • Mentorship: Work with core maintainers
  • Quick Start: run the following commands in order: (1) git clone https://github.com/apache/seatunnel.git; (2) cd seatunnel-transforms-v2/transform-python; (3) mvn clean install -DskipTests

Top comments (0)