Skip to content

Fix flaky test_csv_to_json in Beam YAML SDK#38617

Draft
shunping wants to merge 1 commit into
apache:masterfrom
shunping:fix-test-csv-to-json
Draft

Fix flaky test_csv_to_json in Beam YAML SDK#38617
shunping wants to merge 1 commit into
apache:masterfrom
shunping:fix-test-csv-to-json

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

Below logs is from https://github.com/apache/beam/actions/runs/26350925447/job/77568997003?pr=38612

=================================== FAILURES ===================================
____________________ YamlTransformE2ETest.test_csv_to_json _____________________
[gw3] linux -- Python 3.11.15 /runner/_work/beam/beam/sdks/python/test-suites/tox/py311/build/srcs/sdks/python/target/.tox-py311/py311/bin/python

self = <apache_beam.yaml.yaml_transform_test.YamlTransformE2ETest testMethod=test_csv_to_json>

    def test_csv_to_json(self):
      try:
        import pandas as pd
      except ImportError:
        raise unittest.SkipTest('Pandas not available.')
    
      with tempfile.TemporaryDirectory() as tmpdir:
        data = pd.DataFrame([{'label': f'{i}a', 'rank': i} for i in range(1024)])
    
        input = os.path.join(tmpdir, 'input.csv')
        output = os.path.join(tmpdir, 'output.json')
        data.to_csv(input, index=False)
        with open(input, 'r') as f:
          lines = f.readlines()
        _LOGGER.debug("input.csv has these {lines} lines.")
        self.assertEqual(len(lines), len(data) + 1)  # +1 for header
    
        with beam.Pipeline() as p:
          result = p | YamlTransform(
              '''
              type: chain
              transforms:
                - type: ReadFromCsv
                  config:
                      path: %s
                - type: WriteToJson
                  config:
                      path: %s
                      num_shards: 1
                - type: LogForTesting
              ''' % (repr(input), repr(output)))
        all_output = list(glob.glob(output + "-*"))
        file_and_size = {f: os.path.getsize(f) for f in all_output}
        self.assertEqual(
            len(all_output),
            1,
            msg=f"Expected 1 shard file, but found {len(all_output)}. "
            f"Files & sizes (bytes): {file_and_size}")
        output_shard = all_output[0]
        result = pd.read_json(
            output_shard, orient='records',
            lines=True).sort_values('rank').reindex()
>       pd.testing.assert_frame_equal(data, result)

apache_beam/yaml/yaml_transform_test.py:291: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
testing.pyx:55: in pandas._libs.testing.assert_almost_equal
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   AssertionError: DataFrame.index are different
E   
E   DataFrame.index values are different (48.53516 %)
E   [left]:  RangeIndex(start=0, stop=1024, step=1)
E   [right]: Index([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,
E          ...
E          763, 764, 765, 766, 767, 768, 769, 770, 771, 772],
E         dtype='int64', length=1024)
E   At positional index 527, first diff: 527 != 773

testing.pyx:173: AssertionError

Sorting this DataFrame by `rank` preserves the permuted index values.
Using `.reindex()` does not reset the index, causing `assert_frame_equal`
to fail when comparing against the original sequential DataFrame.

Replaced `.reindex()` with `.reset_index(drop=True)` to correctly discard
the permuted index and reset it to a sequential RangeIndex matching the
original input.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant