
[QST] How Do I Solve the Problem that Missing Values Cannot Be Converted to Int Values? #1770

Open
gukejun1 opened this issue Feb 22, 2023 · 19 comments
Labels
question Further information is requested

Comments

@gukejun1

When I run the case in the scaling-criteo example (/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py), an error is reported:

2023-02-22 16:07:04,128 - distributed.worker - WARNING - Compute Failed
Key:       ('write-processed-db337936ac67baee573cd5fd6543337d-partitiondb337936ac67baee573cd5fd6543337d', "('part_4.parquet',)")
Function:  _write_subgraph
args:      (<merlin.io.dask.DaskSubgraph object at 0x7f1772bc9100>, ('part_4.parquet',), '/raid/data/criteo/test_dask/output/train/', <Shuffle.PER_PARTITION: 0>, <fsspec.implementations.local.LocalFileSystem object at 0x7f175d0740a0>, ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26'], ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13'], ['label'], 'parquet', 0, True, '')
kwargs:    {}
Exception: "ValueError('cannot convert NA to integer')"

Traceback (most recent call last):
  File "/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py", line 190, in <module>
    main()
  File "/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py", line 167, in main
    workflow.transform(train_dataset).to_parquet(
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py", line 910, in to_parquet
    _ddf_to_dataset(
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 367, in _ddf_to_dataset
    out = client.compute(out).result()
  File "/usr/local/lib/python3.8/dist-packages/distributed/client.py", line 280, in result
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 202, in _write_subgraph
    table = subgraph[part]
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 53, in __getitem__
    return dask.get(dsk, key)
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 557, in get_sync
    return get_async(
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 89, in __call__
    return read_parquet_part(
  File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 587, in read_parquet_part
    dfs = [
  File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 588, in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py", line 88, in read_partition
    part[k] = part[k].astype(type_name.replace("Int", "int"))
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/generic.py", line 6240, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/managers.py", line 448, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/managers.py", line 352, in apply
    applied = getattr(b, f)(**kwargs)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/arrays/masked.py", line 471, in astype
    raise ValueError("cannot convert NA to integer")
ValueError: cannot convert NA to integer

Why is the error that NA cannot be converted to an int value still reported? The official example already handles missing values.
How can I solve this problem?
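
For context, the ValueError at the bottom of the traceback comes from pandas' nullable integer ("Int*") dtype; it can be reproduced in isolation with a tiny series (a minimal sketch, independent of the Criteo data):

import pandas as pd

# A nullable ("masked") integer column containing pd.NA, which is what pandas
# produces when parquet data with missing values is read back as Int32/Int64.
s = pd.Series([1, None, 3], dtype="Int32")

# Casting a masked integer array that still contains NA to a plain numpy int
# dtype fails, because numpy integers have no representation for missing values.
try:
    s.astype("int32")
except ValueError as err:
    print(err)  # cannot convert NA to integer

# Filling the missing values first makes the cast valid.
print(s.fillna(0).astype("int32"))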

gukejun1 added the question (Further information is requested) label on Feb 22, 2023
@gukejun1
Author

@rnyak
Contributor

rnyak commented Feb 22, 2023

@gukejun1 Can you provide more info about your environment? How and where did you install the Merlin libraries? Are you using a Docker image? If yes, which Docker image? Thanks.

@gukejun1
Author

gukejun1 commented Feb 22, 2023

The code is the same as this; I use the Docker image nvcr.io/nvidia/merlin/merlin-tensorflow:22.12.

@gukejun1
Author

@rnyak This is my full code:


# Imports and main() wrapper assumed from the scaling-criteo example (02_etl_with_nvtabular):
import os
import re
import shutil
import warnings

import numba
import numpy as np

import nvtabular as nvt
from nvtabular.ops import AddMetadata, Categorify, Clip, FillMissing, Normalize
from merlin.core.utils import device_mem_size, pynvml_mem_size
from merlin.schema import Tags

from dask.distributed import Client
from dask_cuda import LocalCUDACluster


def main():
    BASE_DIR = os.environ.get("BASE_DIR", "/raid/data/criteo")
    INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", BASE_DIR + "/converted/criteo")
    OUTPUT_DATA_DIR = os.environ.get("OUTPUT_DATA_DIR", BASE_DIR + "/test_dask/output")
    USE_HUGECTR = bool(os.environ.get("USE_HUGECTR", ""))
    print(USE_HUGECTR)
    stats_path = os.path.join(OUTPUT_DATA_DIR, "test_dask/stats")
    dask_workdir = os.path.join(OUTPUT_DATA_DIR, "test_dask/workdir")

    # Make sure we have a clean worker space for Dask
    if os.path.isdir(dask_workdir):
        shutil.rmtree(dask_workdir)
    os.makedirs(dask_workdir)

    # Make sure we have a clean stats space for Dask
    if os.path.isdir(stats_path):
        shutil.rmtree(stats_path)
    os.mkdir(stats_path)

    # Make sure we have a clean output path
    if os.path.isdir(OUTPUT_DATA_DIR):
        shutil.rmtree(OUTPUT_DATA_DIR)
    os.mkdir(OUTPUT_DATA_DIR)


    fname = "day_{}.parquet"
    num_days = len(
        [i for i in os.listdir(INPUT_DATA_DIR) if re.match(fname.format("[0-9]{1,2}"), i) is not None]
    )
    train_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(num_days - 1)]
    valid_paths = [
        os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(num_days - 1, num_days)
    ]

    train_paths="/raid/data/criteo/converted/criteo/day_0_40000000.parquet"
    valid_paths="/raid/data/criteo/converted/criteo/day_1_4000000.parquet"
    print(train_paths)
    print(valid_paths)

    # Dask dashboard
    dashboard_port = "8787"

    protocol = "tcp"  # "tcp" or "ucx"
    if numba.cuda.is_available():
        NUM_GPUS = list(range(len(numba.cuda.gpus)))
    else:
        NUM_GPUS = []
    visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
    device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
    device_pool_frac = 0.8
    part_mem_frac = 0.15

    # Use total device size to calculate args.device_limit_frac
    device_size = device_mem_size(kind="total")
    device_limit = int(device_limit_frac * device_size)
    device_pool_size = int(device_pool_frac * device_size)
    part_size = int(part_mem_frac * device_size)

    # Check if any device memory is already occupied
    for dev in visible_devices.split(","):
        fmem = pynvml_mem_size(kind="free", index=int(dev))
        used = (device_size - fmem) / 1e9
        if used > 1.0:
            warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

    cluster = None  # (Optional) Specify existing scheduler port
    if cluster is None:
        cluster = LocalCUDACluster(
            protocol=protocol,
            n_workers=len(visible_devices.split(",")),
            CUDA_VISIBLE_DEVICES=visible_devices,
            device_memory_limit=device_limit,
            local_directory=dask_workdir,
            dashboard_address=":" + dashboard_port,
            rmm_pool_size=(device_pool_size // 256) * 256
        )

    # Create the distributed client
    client = Client(cluster)
    print(client)

    # define our dataset schema
    CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
    CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
    LABEL_COLUMNS = ["label"]
    COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS

    num_buckets = 10000000
    categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype='int32')
    # categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype=np.zeros(0))
    cat_features = CATEGORICAL_COLUMNS >> categorify_op
    cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')
    # cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype=np.zeros(0))
    label_features = LABEL_COLUMNS >> AddMetadata(
        tags=[str(Tags.BINARY_CLASSIFICATION), "target"]
    )

    features = cat_features + cont_features + label_features
    workflow = nvt.Workflow(features)

    dict_dtypes = {}

    # The environment variable USE_HUGECTR defines, if we want to use the output for HugeCTR or another framework
    for col in CATEGORICAL_COLUMNS:
        dict_dtypes[col] = np.int64 if USE_HUGECTR else np.int32

    for col in CONTINUOUS_COLUMNS:
        dict_dtypes[col] = np.float32

    for col in LABEL_COLUMNS:
        dict_dtypes[col] = np.int32



    print(dict_dtypes)

    train_dataset = nvt.Dataset(train_paths, engine="parquet", part_size=part_size,
                                )

    valid_dataset = nvt.Dataset(valid_paths, engine="parquet", part_size=part_size,
                                )



    output_train_dir = os.path.join(OUTPUT_DATA_DIR, "train/")
    output_valid_dir = os.path.join(OUTPUT_DATA_DIR, "valid/")
    # ! mkdir -p $output_train_dir
    # ! mkdir -p $output_valid_dir

    print(workflow)
    workflow.fit(train_dataset)

    # train_dataset.fillna(0, inplace=True)

    workflow.transform(train_dataset).to_parquet(
        output_files=len(NUM_GPUS),
        output_path=output_train_dir,
        shuffle=nvt.io.Shuffle.PER_PARTITION,
        dtypes=dict_dtypes,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS,
    )

    workflow.transform(valid_dataset).to_parquet(
        output_path=output_valid_dir,
        dtypes=dict_dtypes,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS,
    )

    workflow.save(os.path.join(OUTPUT_DATA_DIR, "workflow"))


if __name__ == "__main__":
    main()

I installed the Merlin libraries from https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow.

@gukejun1
Author

gukejun1 commented Feb 23, 2023

@rnyak The training data is the first 40 million rows of day_0 in the Criteo dataset, and the validation data is the first 4 million rows of day_1. The following figure shows a preview of some of the parquet data.
[image: parquet data preview]

@rnyak
Contributor

rnyak commented Feb 24, 2023

@gukejun1 If you have null values, then normally the missing/null values should be filled when you apply the following lines in the NVT workflow:

cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')

Can you share a subset of your parquet file, say only a couple hundred rows, so that we can reproduce the issue? Thanks.
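
As a quick sanity check (a minimal sketch with made-up values, assuming a pandas DataFrame as input), FillMissing can be verified on a tiny frame before running the full Criteo workflow:

import pandas as pd
import nvtabular as nvt
from nvtabular.ops import FillMissing

# Tiny frame with a null in one continuous column (hypothetical values).
df = pd.DataFrame({"I1": [1.0, None, 3.0]})

workflow = nvt.Workflow(["I1"] >> FillMissing())
workflow.fit(nvt.Dataset(df))
out = workflow.transform(nvt.Dataset(df)).to_ddf().compute()
print(out)  # the null in I1 should be replaced by the default fill value (0)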

@gukejun1
Author

gukejun1 commented Feb 24, 2023

@rnyak day_1_100.parquet.txt
The data comes from the first 100 lines of Criteo day_1 and was converted to a parquet file using the official method, \Merlin\examples\scaling-criteo\01_download_convert.ipynb.
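
For reference, a 100-row sample like this can also be written to parquet directly; here is a minimal sketch using plain pandas rather than the official notebook (the input path and filenames are hypothetical):

import pandas as pd

# Criteo TSV layout: label, 13 integer features (I1-I13), 26 categorical features (C1-C26).
cols = ["label"] + [f"I{i}" for i in range(1, 14)] + [f"C{i}" for i in range(1, 27)]

# Hypothetical 100-row, tab-separated sample extracted from day_1.
df = pd.read_csv("day_1_100.txt", sep="\t", names=cols)
df.to_parquet("day_1_100.parquet")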

@rnyak
Contributor

rnyak commented Feb 24, 2023

@gukejun1 I used your small dataset with this notebook and everything worked fine for me; I cannot reproduce your error. Are you able to reproduce the error using only this small parquet file?

@gukejun1
Author

@rnyak Very strange.
image

@rnyak
Contributor

rnyak commented Feb 27, 2023

@gukejun1 Please note that your screenshot shows you are trying to read a .parquet.txt file, not a .parquet file, which means the file extension is not correct. It should be .parquet.

@gukejun1
Author

@rnyak It's the same file; the only difference is the file name extension. Because GitHub does not allow uploading files with the .parquet extension, I changed the extension to .txt.
image

@gukejun1
Author

@rnyak So, this code didn't work.
image

@gukejun1
Author

@rnyak Because my graphics card supports up to CUDA 11.3, I reinstalled cupy-cuda as cupy-cuda113. Is the error related to this? Does cupy-cuda113 support filling missing values?

@rnyak
Contributor

rnyak commented Feb 27, 2023

@gukejun1 What is your graphics card?

Your sample set does not have any nulls in the label column, so I am skeptical that this line gives you the error. You can remove it and test; you can do it like below.

CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
LABEL_COLUMNS = ["label"]
COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS

num_buckets = 10000000
categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype='int32')
cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')


features = cat_features + cont_features

workflow = nvt.Workflow(features)
...
...

@gukejun1
Author

gukejun1 commented Feb 27, 2023

@rnyak The error is still reported.
image
My graphics card is an NVIDIA Tesla P4.

@rnyak
Contributor

rnyak commented Feb 27, 2023

@gukejun1 cudf supports the Pascal architecture or better (Compute Capability >= 6.0); see this doc.

Can you test whether you are able to run notebooks 01 and 02 in this folder?

  • Can you share your pip list output in a .txt file, please?
  • Can you share the result of nvidia-smi?
  • Can you tell us your Docker image? Did you pull merlin-tensorflow:22.12?

Thanks.
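
As a complement to pip list, a short Python snippet (a minimal sketch; run inside the container, assuming these libraries are installed) can capture the key versions:

import cudf
import dask
import pandas
import nvtabular

print("nvtabular:", nvtabular.__version__)
print("cudf:", cudf.__version__)
print("dask:", dask.__version__)
print("pandas:", pandas.__version__)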

@rnyak
Contributor

rnyak commented Feb 27, 2023

@gukejun1 The error looks like it comes from pandas, which suggests you are running on CPU, not on GPU. Please confirm that visible_devices from the code below does not come back empty; if it is empty, that means you are not using the GPU.

 protocol = "tcp"  # "tcp" or "ucx"
    if numba.cuda.is_available():
        NUM_GPUS = list(range(len(numba.cuda.gpus)))
    else:
        NUM_GPUS = []
    visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
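
A quick way to confirm the same thing outside the workflow (a minimal check, assuming numba is available in the container):

import numba.cuda

# True only when a usable CUDA driver and at least one device are visible to this process.
print(numba.cuda.is_available())

# Prints a summary of every CUDA device numba can see (name, compute capability, ...).
numba.cuda.detect()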

@gukejun1
Author

gukejun1 commented Feb 28, 2023

@rnyak For the MovieLens case, notebooks 01 and 02 run successfully.
1. requirements.txt
2. [image: nvidia-smi output]
3. I used docker pull nvcr.io/nvidia/merlin/merlin-tensorflow:22.12 to get the Docker image.

@gukejun1
Author

gukejun1 commented Feb 28, 2023

@rnyak
It used the GPU.
image
