Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JsonFileFormatProperties extends FileFormatProperties {
public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
public static final String PROP_NUM_AS_STRING = "num_as_string";
public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
public static final String PROP_FILL_MISSING_COLUMNS = "fill_missing_columns";

// from ExternalFileTableValuedFunction:
private String jsonRoot = "";
Expand All @@ -44,6 +45,7 @@ public class JsonFileFormatProperties extends FileFormatProperties {
private boolean readJsonByLine;
private boolean numAsString = false;
private boolean fuzzyParse = false;
private boolean fillMissingColumns = false;
private String lineDelimiter = CsvFileFormatProperties.DEFAULT_LINE_DELIMITER;


Expand Down Expand Up @@ -77,6 +79,9 @@ public void analyzeFileFormatProperties(Map<String, String> formatProperties, bo
fuzzyParse = Boolean.valueOf(
getOrDefault(formatProperties, PROP_FUZZY_PARSE,
"", isRemoveOriginProperty)).booleanValue();
fillMissingColumns = Boolean.valueOf(
getOrDefault(formatProperties, PROP_FILL_MISSING_COLUMNS,
"false", isRemoveOriginProperty)).booleanValue();
lineDelimiter = getOrDefault(formatProperties, CsvFileFormatProperties.PROP_LINE_DELIMITER,
CsvFileFormatProperties.DEFAULT_LINE_DELIMITER, isRemoveOriginProperty);
if (Strings.isNullOrEmpty(lineDelimiter)) {
Expand Down Expand Up @@ -136,6 +141,10 @@ public boolean isFuzzyParse() {
return fuzzyParse;
}

/**
 * Reports whether the {@code fill_missing_columns} option is enabled.
 *
 * @return the flag parsed from {@link #PROP_FILL_MISSING_COLUMNS} during property analysis;
 *         {@code false} until a {@code true} value has been analyzed
 */
public boolean isFillMissingColumns() {
    return this.fillMissingColumns;
}

/**
 * Returns the line delimiter held by these format properties.
 *
 * @return the current delimiter; the field starts out as
 *         {@code CsvFileFormatProperties.DEFAULT_LINE_DELIMITER} and is reassigned during
 *         property analysis
 */
public String getLineDelimiter() {
    return this.lineDelimiter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
String.valueOf(jsonFileFormatProperties.isNumAsString()));
jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE,
String.valueOf(jsonFileFormatProperties.isFuzzyParse()));
jobProperties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS,
String.valueOf(jsonFileFormatProperties.isFillMissingColumns()));
} else {
throw new UserException("Invalid format type.");
}
Expand Down Expand Up @@ -710,6 +712,11 @@ public boolean isFuzzyParse() {
return Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_FUZZY_PARSE));
}

/**
 * Reads the {@code fill_missing_columns} flag from this job's {@code jobProperties}.
 *
 * @return {@code true} only when the stored value equals "true" ignoring case; a {@code null}
 *         value parses as {@code false}
 */
@Override
public boolean isFillMissingColumns() {
    String stored = jobProperties.get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS);
    return Boolean.parseBoolean(stored);
}

@Override
public int getSendBatchParallelism() {
return sendBatchParallelism;
Expand Down Expand Up @@ -1809,6 +1816,7 @@ public String getShowCreateInfo() {
appendProperties(sb, JsonFileFormatProperties.PROP_NUM_AS_STRING, isNumAsString(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_FUZZY_PARSE, isFuzzyParse(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_JSON_ROOT, getJsonRoot(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, isFillMissingColumns(), false);
appendProperties(sb, LoadCommand.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadCommand.TIMEZONE, getTimezone(), false);
appendProperties(sb, LoadCommand.EXEC_MEM_LIMIT, getMemLimit(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
Expand Down Expand Up @@ -116,7 +117,7 @@ private void handleSequenceColumn(List<NereidsImportColumnDesc> columnDescList,
.filter(c -> c.getColumnName().equalsIgnoreCase(finalSequenceCol)).findAny();
// if `columnDescs.descs` is empty, that means it's not a partial update load, and user not specify
// column name.
if (foundCol.isPresent() || shouldAddSequenceColumn(columnDescList)) {
if (foundCol.isPresent() || shouldAddSequenceColumn(columnDescList, fileGroup)) {
columnDescList.add(new NereidsImportColumnDesc(Column.SEQUENCE_COL,
new UnboundSlot(sequenceCol)));
} else if (!fileGroupInfo.isFixedPartialUpdate()) {
Expand Down Expand Up @@ -195,9 +196,16 @@ private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, Ne
// If user does not specify the file field names, generate it by using base schema of table.
// So that the following process can be unified
boolean specifyFileFieldNames = copiedColumnExprs.stream().anyMatch(p -> p.isColumn());
if (!specifyFileFieldNames) {
if (!specifyFileFieldNames || isFillMissingColumns(fileGroup)) {
Set<String> existingColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
for (NereidsImportColumnDesc desc : copiedColumnExprs) {
existingColumns.add(desc.getColumnName());
}
List<Column> columns = tbl.getBaseSchema(false);
for (Column column : columns) {
if (existingColumns.contains(column.getName())) {
continue;
}
if (constantMappingColumns.contains(column.getName())) {
// Skip this column because user has already specified a constant mapping expression for it
// in the COLUMNS parameter (e.g., "column_name = 'constant_value'")
Expand Down Expand Up @@ -420,15 +428,37 @@ private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, Ne
}

/**
* if not set sequence column and column size is null or only have deleted sign ,return true
* Returns true when the sequence column should be auto-added, i.e.,
* if not set sequence column and column size is null or only have deleted sign,
* or fill_missing_columns is enabled, meaning schema will be auto-filled.
*/
private boolean shouldAddSequenceColumn(List<NereidsImportColumnDesc> columnDescList) {
private boolean shouldAddSequenceColumn(List<NereidsImportColumnDesc> columnDescList,
        NereidsBrokerFileGroup fileGroup) {
    // fill_missing_columns enabled, or no column descriptors at all: add the sequence column.
    if (isFillMissingColumns(fileGroup) || columnDescList.isEmpty()) {
        return true;
    }
    // Otherwise only a lone descriptor can qualify, and it must be the delete-sign column.
    if (columnDescList.size() != 1) {
        return false;
    }
    String onlyColumnName = columnDescList.get(0).getColumnName();
    return onlyColumnName.equalsIgnoreCase(Column.DELETE_SIGN);
}

/**
 * Tells whether {@code fill_missing_columns} is switched on for the given file group.
 *
 * <p>The option is only meaningful for JSON: when the group's format properties are not a
 * {@link JsonFileFormatProperties} instance (including {@code null}, for which
 * {@code instanceof} is false) the result is {@code false}.
 *
 * @param fileGroup file group whose file format properties are inspected
 * @return {@code true} if the format properties are JSON properties with
 *         fill_missing_columns enabled, {@code false} otherwise
 */
private boolean isFillMissingColumns(NereidsBrokerFileGroup fileGroup) {
    // Defined exactly once: a second method with this same signature does not compile.
    return fileGroup.getFileFormatProperties() instanceof JsonFileFormatProperties
            && ((JsonFileFormatProperties) fileGroup.getFileFormatProperties()).isFillMissingColumns();
}

private TFileFormatType formatType(String fileFormat) throws UserException {
if (fileFormat == null) {
// get file format by the file path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.add(CreateRoutineLoadInfo.STRICT_MODE)
.add(CreateRoutineLoadInfo.TIMEZONE)
.add(CreateRoutineLoadInfo.WORKLOAD_GROUP)
.add(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS)
.add(JsonFileFormatProperties.PROP_JSON_PATHS)
.add(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY)
.add(JsonFileFormatProperties.PROP_NUM_AS_STRING)
Expand Down Expand Up @@ -313,6 +314,15 @@ private void checkJobProperties() throws UserException {
analyzedJobProperties.put(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
String.valueOf(emptyFieldAsNull));
}

if (jobProperties.containsKey(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS)) {
String val = jobProperties.get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS);
if (!"true".equalsIgnoreCase(val) && !"false".equalsIgnoreCase(val)) {
throw new AnalysisException(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS
+ " must be 'true' or 'false', but found: " + val);
}
analyzedJobProperties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, val);
}
}

private void checkDataSourceProperties() throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class CreateRoutineLoadInfo {
.add(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY)
.add(JsonFileFormatProperties.PROP_NUM_AS_STRING)
.add(JsonFileFormatProperties.PROP_FUZZY_PARSE)
.add(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS)
.add(JsonFileFormatProperties.PROP_JSON_ROOT)
.add(CsvFileFormatProperties.PROP_ENCLOSE)
.add(CsvFileFormatProperties.PROP_ESCAPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public interface LoadTaskInfo {

boolean isFuzzyParse();

boolean isFillMissingColumns();

boolean isNumAsString();

boolean isReadJsonByLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,29 @@ public void testAnalyzeFileFormatPropertiesEmpty() throws AnalysisException {
Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
Assert.assertEquals(false, jsonFileFormatProperties.isFillMissingColumns());
Assert.assertEquals(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER,
jsonFileFormatProperties.getLineDelimiter());
}

@Test
public void testAnalyzeFileFormatPropertiesFillMissingColumnsTrue() throws AnalysisException {
    // Supply only fill_missing_columns = "true" and let analysis pick it up.
    Map<String, String> props = new HashMap<>();
    props.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "true");
    jsonFileFormatProperties.analyzeFileFormatProperties(props, true);

    Assert.assertTrue(jsonFileFormatProperties.isFillMissingColumns());
}

@Test
public void testAnalyzeFileFormatPropertiesFillMissingColumnsFalse() throws AnalysisException {
    // An explicit "false" must leave the flag disabled after analysis.
    Map<String, String> props = new HashMap<>();
    props.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "false");
    jsonFileFormatProperties.analyzeFileFormatProperties(props, true);

    Assert.assertFalse(jsonFileFormatProperties.isFillMissingColumns());
}

@Test
public void testAnalyzeFileFormatPropertiesValidJsonRoot() throws AnalysisException {
Map<String, String> properties = new HashMap<>();
Expand Down Expand Up @@ -159,6 +178,7 @@ public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisExcept
properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
properties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "true");

jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);

Expand All @@ -168,6 +188,7 @@ public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisExcept
Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
Assert.assertEquals(true, jsonFileFormatProperties.isFillMissingColumns());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,60 @@ public void testGetShowCreateInfo() throws UserException {
System.out.println(showCreateInfo);
Assert.assertEquals(expect, showCreateInfo);
}

@Test
public void testGetShowCreateInfoWithFillMissingColumns() throws UserException {
    // Build a Kafka routine load job and pin the two batch/error limits via reflection.
    KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(111L, "test_load", 1,
            11, "localhost:9092", "test_topic", UserIdentity.ADMIN);
    Deencapsulation.setField(job, "maxErrorNum", 10);
    Deencapsulation.setField(job, "maxBatchRows", 10);

    // Job properties with fill_missing_columns switched on; everything else mirrors defaults
    // used by the expected SHOW CREATE text below.
    Map<String, String> props = Maps.newHashMap();
    props.put(org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "true");
    props.put("format", "json");
    props.put("strip_outer_array", "false");
    props.put("num_as_string", "false");
    props.put("fuzzy_parse", "false");
    props.put("strict_mode", "false");
    props.put("timezone", "Asia/Shanghai");
    props.put("desired_concurrent_number", "0");
    props.put("max_error_number", "10");
    props.put("max_filter_ratio", "1.0");
    props.put("max_batch_interval", "60");
    props.put("max_batch_rows", "10");
    props.put("max_batch_size", "1073741824");
    props.put("exec_mem_limit", "2147483648");
    Deencapsulation.setField(job, "jobProperties", props);

    // The statement the job is expected to render, fill_missing_columns included.
    String expected = "CREATE ROUTINE LOAD test_load ON 11\n"
            + "WITH APPEND\n"
            + "PROPERTIES\n"
            + "(\n"
            + "\"desired_concurrent_number\" = \"0\",\n"
            + "\"max_error_number\" = \"10\",\n"
            + "\"max_filter_ratio\" = \"1.0\",\n"
            + "\"max_batch_interval\" = \"60\",\n"
            + "\"max_batch_rows\" = \"10\",\n"
            + "\"max_batch_size\" = \"1073741824\",\n"
            + "\"format\" = \"json\",\n"
            + "\"strip_outer_array\" = \"false\",\n"
            + "\"num_as_string\" = \"false\",\n"
            + "\"fuzzy_parse\" = \"false\",\n"
            + "\"fill_missing_columns\" = \"true\",\n"
            + "\"strict_mode\" = \"false\",\n"
            + "\"timezone\" = \"Asia/Shanghai\",\n"
            + "\"exec_mem_limit\" = \"2147483648\"\n"
            + ")\n"
            + "FROM KAFKA\n"
            + "(\n"
            + "\"kafka_broker_list\" = \"localhost:9092\",\n"
            + "\"kafka_topic\" = \"test_topic\"\n"
            + ");";

    String actual = job.getShowCreateInfo();
    System.out.println(actual);
    Assert.assertEquals(expected, actual);
}

@Test
public void testParseUniqueKeyUpdateMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
Expand Down Expand Up @@ -98,14 +99,15 @@ public void testValidate() {
jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY, "1048576000");
jobProperties.put(CreateRoutineLoadInfo.STRICT_MODE, "false");
jobProperties.put(CreateRoutineLoadInfo.TIMEZONE, "Asia/Shanghai");
jobProperties.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "true");

Map<String, String> dataSourceProperties = Maps.newHashMap();
LabelNameInfo labelNameInfo = new LabelNameInfo("testDb", "label1");

AlterRoutineLoadCommand command = new AlterRoutineLoadCommand(labelNameInfo, jobProperties, dataSourceProperties);
Assertions.assertDoesNotThrow(() -> command.validate(connectContext));

Assertions.assertEquals(8, command.getAnalyzedJobProperties().size());
Assertions.assertEquals(9, command.getAnalyzedJobProperties().size());
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY));
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY));
Expand All @@ -114,5 +116,34 @@ public void testValidate() {
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.STRICT_MODE));
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.TIMEZONE));
Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS));
Assertions.assertEquals("true", command.getAnalyzedJobProperties().get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS));
}

@Test
public void testValidateFillMissingColumnsFalse() {
    runBefore();

    // ALTER carrying only fill_missing_columns = "false".
    Map<String, String> alteredJobProps = Maps.newHashMap();
    alteredJobProps.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "false");
    Map<String, String> noDataSourceProps = Maps.newHashMap();
    LabelNameInfo label = new LabelNameInfo("testDb", "label1");

    AlterRoutineLoadCommand command = new AlterRoutineLoadCommand(label, alteredJobProps, noDataSourceProps);
    Assertions.assertDoesNotThrow(() -> command.validate(connectContext));

    // The validated value is carried over verbatim into the analyzed properties.
    String analyzedValue = command.getAnalyzedJobProperties().get(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS);
    Assertions.assertEquals("false", analyzedValue);
}

@Test
public void testValidateFillMissingColumnsInvalidValue() {
    runBefore();

    // A value other than "true"/"false" is expected to be rejected during validation.
    Map<String, String> alteredJobProps = Maps.newHashMap();
    alteredJobProps.put(JsonFileFormatProperties.PROP_FILL_MISSING_COLUMNS, "invalid");
    Map<String, String> noDataSourceProps = Maps.newHashMap();
    LabelNameInfo label = new LabelNameInfo("testDb", "label1");

    AlterRoutineLoadCommand command = new AlterRoutineLoadCommand(label, alteredJobProps, noDataSourceProps);
    Assertions.assertThrows(org.apache.doris.common.AnalysisException.class,
            () -> command.validate(connectContext));
}
}
Loading