Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] 写UDAF的时候,如果不加group by level就能正常接收参数,如果加了group by level就报301 #12508

Open
1 of 2 tasks
sky-sheep opened this issue May 11, 2024 · 1 comment

Comments

@sky-sheep
Copy link

Search before asking

  • I searched in the issues and found nothing similar.

Version

1.3.2

Describe the bug and provide the minimal reproduce step

写了一个UDAF,用来统计数量,加了一个参数filter用来过滤数据,如果不加group by level就能够正常执行,如果加了level就不能够正常执行,抛出异常301
不加的sql如下所示:
881715313071_ pic
如果加了 group by level就会报错,如下所示:
871715313067_ pic
我用count这种内置的聚合函数就没问题,即使参数filter这种没有定义的,而且,这个countFill也能够正常接收参数,filter能够传进去,
951715313417_ pic
报错日志如下:
1141715318978_ pic
UDAF代码如下:
`package org.apache.iotdb.udf.api;

import org.apache.iotdb.udf.api.customizer.config.UDAFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.iotdb.udf.api.utils.ExecuteEval;
import org.apache.iotdb.udf.api.utils.ResultValue;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.BitMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/**

  • 个数统计函数

  • 获取所有符合条件的个数

  • @author ylx
    */
    public class CountFill implements UDAF {

    private Logger logger = LoggerFactory.getLogger(CountFill.class);

    private Type dataType;

    private List filter;
    static class CountState implements State {

     Long count;
    
     @Override
     public void reset() {
         count = 0L;
     }
    
     @Override
     public byte[] serialize() {
         ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
         buffer.putLong(count);
    
         return buffer.array();
     }
    
     @Override
     public void deserialize(byte[] bytes) {
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         count = buffer.getLong();
     }
    

    }

    /**

    • 在初始化方法beforeStart调用前执行,用于检测UDFParameters中用户输入的参数是否合法。该方法与 UDTF 的validate相同。
      */
      @OverRide
      public void validate(UDFParameterValidator validator) throws Exception {
      validator
      .validateInputSeriesNumber(1)
      .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE,Type.TEXT);
      }

    /**

    • 初始化方法,在 UDAF 处理输入数据前,调用用户自定义的初始化行为。与 UDTF 不同的是,这里的 configuration 是 UDAFConfiguration 类型
    • @param parameters
    • @param configurations
      */
      @OverRide
      public void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) {
      String inputFilter = parameters.getString("filter");
      if (inputFilter !=null ){
      filter = (Arrays.asList(inputFilter.split(",")));
      }
      dataType = parameters.getDataType(0);
      configurations.setOutputDataType(Type.INT64);
      }

    /**

    • 创建State对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。
      */
      @OverRide
      public State createState() {
      CountState countState = new CountState();
      countState.count = 0L;
      return countState;
      }

    /**

    • 根据传入的数据Column[]批量地更新State对象,注意 column[0] 总是代表时间列。另外BitMap表示之前已经被过滤掉的数据,您在编写该方法时需要手动判断对应的数据是否被过滤掉
    • values
    • @param state state to be updated
    • @param columns input columns from IoTDB TsBlock, time column is always the last column, the
    •            remaining columns are their parameter value columns
      
    • @param bitMap define some filtered position in columns
      */
      @OverRide
      public void addInput(State state, Column[] columns, BitMap bitMap) {
      CountState countState = (CountState) state;
      switch (dataType) {
      case INT32:
      case INT64:
      case FLOAT:
      case DOUBLE:
      case TEXT:
      case BOOLEAN:
      default:
      addIntInput(countState, columns, bitMap);
      break;
      }
      }

    public void addIntInput(CountState state, Column[] columns, BitMap bitMap) {
    int count = columns[0].getPositionCount();
    for (int i = 0; i < count; i++) {
    Boolean filterFlag = true;
    if (bitMap != null && !bitMap.isMarked(i)) {
    continue;
    }
    if (filter != null && filter.size()>0){
    for (String fill : filter) {
    String comparisonOperators = ExecuteEval.findComparisonOperators(fill);
    String replace = fill.replace(comparisonOperators, "").trim();
    if (ExecuteEval.isString(replace,dataType)){
    // 如果是字符串,则将其拼接上单引号
    replace = ("'"+replace+"'").trim();
    switch (dataType){
    case INT32:
    // int类型
    filterFlag =ExecuteEval.evalString("('"+columns[0].getInt(i)+"'"+comparisonOperators+replace+")?true:false");
    break;
    case INT64:
    filterFlag =ExecuteEval.evalString("('"+columns[0].getLong(i)+"'"+comparisonOperators+replace+")?true:false");
    break;
    case FLOAT:
    filterFlag =ExecuteEval.evalString("('"+columns[0].getFloat(i)+"'"+comparisonOperators+replace+")?true:false");
    break;
    case DOUBLE:
    filterFlag =ExecuteEval.evalString("('"+columns[0].getDouble(i)+"'"+comparisonOperators+replace+")?true:false");
    break;
    case TEXT:
    filterFlag =ExecuteEval.evalString("('"+columns[0].getObject(i)+"'"+comparisonOperators+replace+")?true:false");
    break;
    default:
    break;
    }

                 }else {
                     switch (dataType){
                         case INT32:
                             // int类型
                             filterFlag =ExecuteEval.evalString("("+columns[0].getInt(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case INT64:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getLong(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case FLOAT:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getFloat(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case DOUBLE:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getDouble(i)+comparisonOperators+replace+")?true:false");
                             break;
                         case TEXT:
                             filterFlag =ExecuteEval.evalString("("+columns[0].getObject(i)+comparisonOperators+replace+")?true:false");
                             break;
                         default:
                             break;
                     }
                 }
             }
         }
         if (!columns[0].isNull(i) && filterFlag) {
             state.count++;
         }
     }
    

    }

    /**

    • 将rhs状态合并至state状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State对象,然后调用该方法合并成完整的State。

    • @param state current state

    • @param rhs right-hand-side state to be merged
      */
      @OverRide
      public void combineState(State state, State rhs) {
      CountState avgState = (CountState) state;
      CountState avgRhs = (CountState) rhs;

      avgState.count += avgRhs.count;
      }

    /**

    • 根据State中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。

    • @param state final state

    • @param resultValue used to collect output data points
      */
      @OverRide
      public void outputFinal(State state, ResultValue resultValue) {
      CountState avgState = (CountState) state;

      resultValue.setLong(avgState.count);
      }

}`

What did you expect to see?

What did you see instead?

Anything else?

Are you willing to submit a PR?

  • I'm willing to submit a PR!
Copy link

Hi, this is your first issue in IoTDB project. Thanks for your report. Welcome to join the community!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant