feat(parquet): opt-in streaming reads for large data pages #880

Open
joechenrh wants to merge 42 commits into apache:main from joechenrh:streaming-large-pages

@joechenrh

Contributor

Rationale for this change

Addresses #865. Today the reader materializes each full uncompressed data page before decoding, so peak memory scales with the page's uncompressed size — a problem for files with very large data pages (e.g. a 1 GB compressed / 1.4 GB uncompressed page). This adds an opt-in path that keeps peak memory near the requested batch size for the common large-page shape.

Approach was discussed and agreed on the issue.

What changes are included in this PR

Opt-in via ReaderProperties.EnablePageStreaming (default false, no behavior change). When set, eligible data pages skip full-page decompression and decode their values incrementally; every ineligible page falls back to the existing materialized path.

Scope (first cut):

  • Data Page V1 + V2, PLAIN encoding
  • Physical types byte_array and fixed_len_byte_array
  • Codecs UNCOMPRESSED / GZIP / BROTLI / ZSTD (explicit allowlist — Snappy is a raw block whereas its NewReader is the framed format, and LZ4_RAW has no streaming reader)
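
The scope above amounts to a two-level eligibility check: a part that is constant for the column chunk and a part that depends on the page. A minimal sketch follows, assuming hypothetical enum stand-ins (`Codec`, `PhysicalType`, `Encoding`) rather than the real parquet types, and leaving out the encryption condition that the commits also apply; the function names only echo the ones used later in this PR, and the signatures are illustrative.

```go
package main

import "fmt"

// Hypothetical stand-ins for the parquet enums; illustrative only.
type Codec int
type PhysicalType int
type Encoding int

const (
	Uncompressed Codec = iota
	Snappy
	Gzip
	Brotli
	Zstd
	Lz4Raw
)

const (
	Int32 PhysicalType = iota
	ByteArray
	FixedLenByteArray
)

const (
	Plain Encoding = iota
	RLEDictionary
)

// columnCanStream covers the chunk-constant inputs: the opt-in flag, the
// physical type, and the explicit codec allowlist.
func columnCanStream(enabled bool, pt PhysicalType, c Codec) bool {
	if !enabled {
		return false
	}
	if pt != ByteArray && pt != FixedLenByteArray {
		return false
	}
	switch c {
	case Uncompressed, Gzip, Brotli, Zstd:
		return true
	default:
		// Snappy, LZ4_RAW, and anything unknown use the whole-page read.
		return false
	}
}

// pageCanStream adds the per-page condition: only PLAIN pages stream.
func pageCanStream(columnOK bool, enc Encoding) bool {
	return columnOK && enc == Plain
}

func main() {
	col := columnCanStream(true, ByteArray, Zstd)
	fmt.Println(pageCanStream(col, Plain))
	fmt.Println(pageCanStream(col, RLEDictionary))
	fmt.Println(columnCanStream(true, ByteArray, Snappy))
}
```

Using an allowlist rather than a denylist means a newly registered codec stays on the whole-page path until someone opts it in.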

Design:

  • parquet/internal/encoding/streaming: a small ValueBuffer — an incremental byte source that reads from an io.Reader, slides as values are consumed, grows to fit a single oversized value, and owns the page's stream (drained + closed on release).
  • The materialized PlainByteArrayDecoder / PlainFixedLenByteArrayDecoder keep their []byte logic byte-for-byte, gaining only a src field + a one-line guard in Decode/Discard. Streaming decode lives beside them; SetData resets src, so a cached decoder reused for a materialized page reverts. Other decoders are untouched.
  • Page reader: for eligible pages, V2 levels are read raw (uncompressed, header byte-lengths) and V1 levels are peeled off the decompressed stream using the same length rules as LevelDecoder.SetData; values become a ValueBuffer over the compressed region. On page release the underlying stream is drained + closed so the reader lands on the next page header even if values were skipped.
  • Column reader: the same cached PLAIN decoder is fed via SetSource (streaming) or SetData (materialized). The public Page/DataPage interfaces are unchanged (the streaming accessor is reached via an unexported interface, so external implementations don't break).
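
To make the ValueBuffer idea concrete, here is a minimal sketch of a sliding, growable window over an `io.Reader`, used to decode PLAIN byte_array values (4-byte little-endian length, then the bytes). The type `slidingBuffer` and the helpers `decodeN` and `encodePlain` are hypothetical simplifications, not the PR's code: there is no stream ownership, no allocator, and every value is copied out.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"testing/iotest"
)

// slidingBuffer is a hypothetical, simplified stand-in for ValueBuffer.
type slidingBuffer struct {
	r   io.Reader
	buf []byte
	off int // start of unconsumed bytes
	end int // end of valid bytes
}

func newSlidingBuffer(r io.Reader, size int) *slidingBuffer {
	return &slidingBuffer{r: r, buf: make([]byte, size)}
}

// Advance marks n bytes of the current window as consumed.
func (s *slidingBuffer) Advance(n int) { s.off += n }

// Fill returns a window with at least need unconsumed bytes. It slides the
// unconsumed tail to the front and grows the buffer when a single value is
// larger than the buffer. The window is only valid until the next Fill.
func (s *slidingBuffer) Fill(need int) ([]byte, error) {
	if s.end-s.off >= need {
		return s.buf[s.off:s.end], nil
	}
	if need > len(s.buf) {
		grown := make([]byte, need)
		s.end = copy(grown, s.buf[s.off:s.end])
		s.buf = grown
	} else {
		s.end = copy(s.buf, s.buf[s.off:s.end])
	}
	s.off = 0
	for s.end < need {
		n, err := s.r.Read(s.buf[s.end:])
		s.end += n
		if err != nil && s.end < need {
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
	}
	return s.buf[:s.end], nil
}

// decodeN reads n PLAIN byte_array values, copying each one out because the
// window is reused on the next Fill.
func decodeN(r io.Reader, bufSize, n int) ([]string, error) {
	sb := newSlidingBuffer(r, bufSize)
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		w, err := sb.Fill(4)
		if err != nil {
			return nil, err
		}
		vlen := int(binary.LittleEndian.Uint32(w))
		sb.Advance(4)
		if w, err = sb.Fill(vlen); err != nil {
			return nil, err
		}
		out = append(out, string(w[:vlen]))
		sb.Advance(vlen)
	}
	return out, nil
}

// encodePlain builds a PLAIN byte_array value region for the demo.
func encodePlain(vals ...string) []byte {
	var b []byte
	for _, v := range vals {
		b = binary.LittleEndian.AppendUint32(b, uint32(len(v)))
		b = append(b, v...)
	}
	return b
}

// demo decodes three values through an 8-byte buffer fed one byte at a time;
// the middle value is larger than the buffer and forces a grow.
func demo() []string {
	data := encodePlain("ab", "hello world", "")
	vals, err := decodeN(iotest.OneByteReader(bytes.NewReader(data)), 8, 3)
	if err != nil {
		panic(err)
	}
	return vals
}

func main() {
	fmt.Printf("%q\n", demo())
}
```

The one-byte-at-a-time reader mirrors the test strategy described in this PR: it forces every refill and slide boundary to be exercised.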

Are these changes tested?

Yes.

  • All existing encoding tests pass unchanged (materialized parity); the streaming decoders are additionally exercised with a one-byte-at-a-time reader and oversized values.
  • End-to-end round-trip (parquet/file/streaming_page_test.go): streaming vs materialized reads are asserted identical across V1/V2 and each codec, for required and nullable columns (the nullable case covers the V1 def-level peel + streaming DecodeSpaced), including values larger than a page and the stream buffer.

Are there any user-facing changes?

One new opt-in field, ReaderProperties.EnablePageStreaming (default false). No change to existing behavior or public interfaces.

Follow-ups (out of scope here): fixed-width numeric PLAIN types, raw-Snappy and LZ4_RAW streaming, and a memory benchmark.

joechenrh and others added 11 commits July 1, 2026 13:42
Add a small streaming util package (parquet/internal/encoding/streaming) with
ValueBuffer, an incremental byte source (streamBuffer) that reads from an io.Reader,
slides as values are consumed, grows to fit oversized values, and owns the page's
stream (Close runs a caller-supplied cleanup). Plus a Decoder interface (SetSource).

The PLAIN BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY decoders gain a streaming path,
implemented as methods in their own files: the materialized []byte logic is kept
byte-for-byte and only gains a `src streaming.ValueBuffer` field plus a one-line
guard in Decode/Discard dispatching to decodeStreaming. SetData resets src, so a
cached decoder reused for a materialized page reverts.

Materialized parity: all existing encoding tests pass; streaming verified for
byte_array and FLBA with a one-byte-at-a-time reader and oversized values.

Refs apache#865

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add ReaderProperties.EnablePageStreaming (default off). Eligible data pages —
PLAIN-encoded V1/V2, byte_array/fixed_len_byte_array, unencrypted, with a
streaming-capable codec (UNCOMPRESSED/GZIP/BROTLI/ZSTD) — skip full-page
decompression in serializedPageReader.Next(): the rep/def levels are materialized
(V2 by header byte-lengths; V1 peeled off the decompressed stream by readLevelData
using the same length rules as LevelDecoder.SetData) and the values are exposed as a
streaming.ValueBuffer over the compressed region. The page holds just that source;
on release drainAndClose consumes any unread compressed bytes and closes the
decompressor so the reader lands on the next page header. Ineligible pages fall back
to the materialized path.
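
The drain-on-release step can be sketched with the standard library alone. This assumes the page's compressed region is exposed as an `io.LimitReader` over the file reader, as the description suggests; the signature of `drainAndClose` below is illustrative and not the one in the PR.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// drainAndClose consumes whatever is left of a page's bounded region and then
// closes the decompressor, if there is one. Draining the raw (limit) reader,
// not the decompressed stream, is what moves the underlying reader to the
// next page header.
func drainAndClose(rawSrc io.Reader, closer io.Closer) error {
	_, err := io.Copy(io.Discard, rawSrc)
	if closer != nil {
		if cerr := closer.Close(); err == nil {
			err = cerr
		}
	}
	return err
}

// demo reads part of "page 1", releases it, and returns what the underlying
// reader yields next.
func demo() string {
	file := strings.NewReader("PAGE1-valuesPAGE2")
	page := io.LimitReader(file, 12) // page 1 occupies the first 12 bytes

	head := make([]byte, 5)
	if _, err := io.ReadFull(page, head); err != nil {
		panic(err)
	}
	// The caller skips the remaining values; release must still consume them.
	if err := drainAndClose(page, nil); err != nil {
		panic(err)
	}
	rest, err := io.ReadAll(file)
	if err != nil {
		panic(err)
	}
	return string(rest)
}

func main() {
	fmt.Println(demo())
}
```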

initDataDecoder resolves the decoder and delegates value feeding to setDecoderData,
which routes streaming pages to SetSource and materialized pages to SetData on the
same cached PLAIN decoder. End-to-end round-trip tests assert streaming and
materialized reads agree across V1/V2 and codecs, for required and nullable columns.

Refs apache#865

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…elData

Fold the chunk-constant streaming inputs (streamEnabled + physicalType +
compressType) into a single canStream bool, evaluated once when the page reader is
built via streamablePhysicalType/streamableCodec; streamingEligible is now one
expression. Remove the unused LevelData accessor and its DataPage interface method
(the level decoders read Data(), which for a streaming page is already the level
region), and trim a few comments.

Refs apache#865

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Rename canStream -> columnCanStream (the column/chunk-constant part: property +
physical type + codec, evaluated once) and streamingEligible -> pageCanStream (the
per-page check that adds the page's PLAIN encoding), so the two names spell out
which dimension each judges.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e per path

The streaming and materialized branches of Next() built the DataPageV1/V2 literal
twice with identical fields except buf and valueSource. Construct the page once with
the common fields, then set buf (materialized) or buf + valueSource (streaming) in
the respective branch.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The V1 and V2 streaming branches set buf and valueSource identically (buf wraps the
level region, valueSource is a stream buffer drained + closed on release), differing
only in how the level bytes, value reader, limit, and closer are obtained. Extract
that shared wiring into page.setStreamingValues.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Mirror V1: build one io.LimitReader over the whole compressed page, read V2's raw
(uncompressed) levels off it, then take the values from what remains, instead of
reading levels directly from p.r plus a second limit reader sized to the value
region. Same bytes consumed; V1 and V2 now start identically.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Both branches build the value reader/closer the same way (decompressor over the
page limit reader when compressed, else the limit itself); extract that into
page reader's valueStream. V1 passes true (its whole body is under the codec,
a no-op passthrough when the column is uncompressed); V2 passes the header's
IsCompressed. Also drop the now-obvious doc comments on the streaming helpers.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Fix the stale "see streaming_decoder.go" reference in the decoders' src field docs
(those methods live in the decoder files now) and drop the redundant impl-method doc
for page.ValueSource (the DataPage interface already documents it).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adding ValueSource to the exported DataPage interface would break any external
type that implements it. Access it instead through an unexported streamingPage
interface that the built-in *DataPageV1/*DataPageV2 satisfy; DataPage is unchanged
from upstream.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@joechenrh joechenrh requested a review from zeroshade as a code owner July 1, 2026 07:16
joechenrh and others added 14 commits July 1, 2026 15:36
Address review feedback on the streaming read path:

- Gate streaming to the internal Column() path so the public
  GetColumnPageReader() always returns materialized pages, preserving
  Page.Data()'s "whole page" contract. Add a regression test.
- Close() now releases the current page, draining and closing a
  streaming page's value stream; the V1 level-read error path drains
  the page limit via drainAndClose.
- Return an error instead of panicking when a streaming page's decoder
  is not streaming-capable (checked type assertion).
- Enable streaming only when the actual codec implements
  compress.StreamingCodec (a codec registered via RegisterCodec need
  not), and disable it on Reset, which lacks the column schema.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Back out gating page streaming to the internal Column() path. Making the
public GetColumnPageReader keep a materialized-only Data() contract while
Column() streams would require either that split or a lazy-materializing
Data(); the level decoders read the level region through Page.Data(), so
an honest full-page Data() is not a small change. Leave the Data()
semantics of streaming pages for review to decide.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add page.LevelBuffer() and have initLevelDecodersV1/V2 read the rep/def
level region through it instead of Page.Data(). No internal path now
calls Data() on a streaming page, so the public semantics of Data() for
streaming pages can be decided independently.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Move the level-region accessor off the shared base page onto unexported
levelBuffer() methods on *DataPageV1/*DataPageV2, so dictionary pages no
longer expose it and it stays out of the public API.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
columnChunkReader.Close already releases its current page (the same
object the page reader holds), so having serializedPageReader.Close
release curPage too freed the page's buffer twice and could panic. The
column reader's release already drains a streaming page's value stream,
so drop the redundant release from the page reader.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Extract a dataPageBase embedded struct (embedded by DataPageV1/V2 but not
DictionaryPage) holding the value source and the levelBuffer/valueBuffer
accessors, so dictionary pages no longer carry streaming members. Rename
the accessor to valueBuffer() (field valueBuf) to match its type and pair
with levelBuffer(). initDataDecoder/setDecoderData now take the concrete
*dataPageBase, so the streamingPage interface and its type assertion are
gone.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…face

Add an unexported valueBuffer() to DataPage (safe: DataPage is already
unimplementable outside the package since Page.Encoding returns an
internal type, and the only implementers are *DataPageV1/V2). This lets
initDataDecoder/setDecoderData take DataPage instead of the concrete
*dataPageBase, so processPage keeps its single shared call after the
type switch.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Fill(consumed, need) welded together "commit consumed bytes" and "ensure
need bytes"; the flush call Fill(pos, 0) was just an obscure advance.
Split into Advance(n) (mark consumed) and Fill(need) (ensure/return the
window). The streaming decoders now Advance before refilling and end with
a plain Advance(pos).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Fill read only until need was met, so a reader returning small chunks
would refill (and slide) nearly per value. Read until the buffer is full
(or EOF) instead, making the batch size a property of the buffer rather
than of the reader. Safe because the value stream is a bounded page region.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The streaming decoders copied every value out because the buffer slides
and gets overwritten on refill. Add FillOwned: it returns bytes that stay
valid after later calls, so decodeStreaming aliases values (3-index cap,
non-nil empties) exactly like the materialized path — dropping a per-value
alloc+copy. A shared bit tracks whether the buffer has been handed out for
aliasing; while set, a refill moves to a fresh buffer (the old one stays
alive through its aliases) instead of sliding in place. Discard keeps
using the reuse-in-place Fill. White-box tests cover the alias invariant.
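
The "3-index cap" mentioned above is the Go full slice expression `buf[low:high:max]`. A small sketch of why it matters when values alias a shared buffer; `aliasValue` is a hypothetical helper name.

```go
package main

import "fmt"

// aliasValue returns buf[start:end] with its capacity capped at end, so an
// append by the caller must reallocate instead of overwriting the bytes of
// the next value in the shared buffer.
func aliasValue(buf []byte, start, end int) []byte {
	return buf[start:end:end]
}

// demo returns the shared buffer contents after appending to a capped alias
// and to an uncapped one.
func demo() (capped, loose string) {
	a := []byte("aaabbb")
	v := aliasValue(a, 0, 3)
	v = append(v, 'X') // cap(v) == 3, so this copies to a new array
	_ = v

	b := []byte("aaabbb")
	w := b[0:3]
	w = append(w, 'X') // cap(w) == 6, so this writes into b[3]
	_ = w

	return string(a), string(b)
}

func main() {
	capped, loose := demo()
	fmt.Println(capped, loose)
}
```

With the capped alias the neighbouring value stays intact (`aaabbb`); without the cap the append clobbers it (`aaaXbb`).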

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
discardStreaming was the only caller; start it from a nil window like
decodeStreaming so the first Fill fetches the bytes. Removes Bytes from
the interface and trims the value_buffer comments.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
discardStreaming was the only caller; start it from a nil window like
decodeStreaming so the first Fill fetches the bytes. Remove Bytes from
the interface and trim verbose comments in value_buffer and the decoders.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@zeroshade zeroshade left a comment

Member

Thanks for tackling #865 — the incremental-decode design is clean and the commit series is easy to follow.

I reviewed the read path in depth and ran the new streaming tests plus the existing encoding and parquet/file suites with -race; all pass and materialized parity holds. The drain-before-advance ordering in Next() (release the previous page, then read the next header), the codec allowlist (correctly excluding raw-block Snappy even though it implements StreamingCodec), the V1 level byte-length parity with LevelDecoder.SetData, and the DecodeSpaced -> Decode streaming dispatch all check out.

A few things I'd like addressed before merge:

  1. Page.Data() contract on streaming pages (blocking). A streaming page's buf holds only the rep/def level region, so the public Page.Data() returns truncated bytes when a page is obtained via the public GetColumnPageReader with EnablePageStreaming set. Since internal callers now go through levelBuffer()/valueBuffer(), I'd like us to either gate streaming to the internal Column() path or document Data() explicitly (see inline).
  2. Unbounded allocation from an untrusted RLE length prefix in readLevelData.
  3. Test coverage for BIT_PACKED levels, the skip/Discard path, repeated columns, and ineligible-codec fallback.

Details inline. Nice work overall — with the Data() semantics resolved this is close.

mem: r.props.Allocator(),
// Streaming (EnablePageStreaming) inputs; only the unencrypted path is
// ever streaming-eligible, so columnCanStream is left false elsewhere.
columnCanStream: r.props.EnablePageStreaming &&

Member

Enabling streaming here also affects the public GetColumnPageReader: a streaming DataPageV1/V2's buf holds only the rep/def level region, so Page.Data() returns just the levels instead of the whole page. That's a silent semantic change to a public method for anyone reading Data() off a data page with EnablePageStreaming set.

Now that the internal callers read levels/values through levelBuffer()/valueBuffer(), the original blocker to keeping GetColumnPageReader materialized-only (the revert in 2f5aab9) looks gone. Could we gate streaming to the internal Column() path, or — failing that — document Data() as level-only/undefined for streaming pages? This is the main thing I'd like resolved before merge.

Contributor Author

I've added documentation to both EnablePageStreaming and Data() to clarify the change. My reasoning is below; let me know if you see it differently:

pqarrow also uses GetColumnPageReader, so I prefer to document this semantic change explicitly. Also, since the new option is off by default, existing downstream code shouldn't be affected when bumping the package. A user (or an AI agent) who wants to enable the option can find this change in the doc comment on EnablePageStreaming.

n := int(binary.LittleEndian.Uint32(hdr[:]))
start := len(buf)
buf = append(buf, hdr[:]...)
buf = append(buf, make([]byte, n)...)

Member

n comes straight from the 4-byte RLE length prefix in the page and is used to make([]byte, n) before any bounds check, so a corrupt/malicious V1 page with a bogus prefix (e.g. ~4 GiB) triggers a huge allocation before the following io.ReadFull fails.

The materialized LevelDecoder.SetData guards this with nbytes > len(data)-4 against the already-bounded page buffer; the streaming path has no equivalent ceiling (V1 carries no per-level byte length in the header). Please bound n (e.g. against lenCompressed/lenUncompressed) before allocating.
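
A minimal sketch of the requested bound, assuming the ceiling is the page's uncompressed size and that the 4-byte prefix stays in the returned level region. `readRLELevels` is a hypothetical name and signature, not the PR's readLevelData.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readRLELevels reads a V1 RLE level block: a 4-byte little-endian length
// prefix followed by that many bytes. The prefix is checked against maxBytes
// (assumed here to be the uncompressed page size) before anything is
// allocated, so a bogus prefix fails fast instead of triggering a huge make.
func readRLELevels(r io.Reader, maxBytes int) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := int64(binary.LittleEndian.Uint32(hdr[:]))
	if n > int64(maxBytes)-4 {
		return nil, fmt.Errorf("level length %d exceeds page bound %d", n, maxBytes)
	}
	buf := make([]byte, 4+n)
	copy(buf, hdr[:])
	if _, err := io.ReadFull(r, buf[4:]); err != nil {
		return nil, err
	}
	return buf, nil
}

// demo returns the length of a valid level region and whether a corrupt
// prefix was rejected.
func demo() (int, bool) {
	good := []byte{3, 0, 0, 0, 0xAA, 0xBB, 0xCC, 0x01}
	lv, err := readRLELevels(bytes.NewReader(good), 16)
	if err != nil {
		panic(err)
	}
	bogus := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0x00}
	_, err = readRLELevels(bytes.NewReader(bogus), 16)
	return len(lv), err != nil
}

func main() {
	n, rejected := demo()
	fmt.Println(n, rejected)
}
```

Comparing in `int64` keeps the check correct on 32-bit platforms, where a prefix near 4 GiB would otherwise wrap when converted to `int`.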

Contributor Author

Yeah, updated in 3cd0076 to bound the allocation by the uncompressed size.

Comment thread parquet/file/page_reader.go Outdated
}
case format.Encoding_BIT_PACKED:
bitWidth := bits.Len64(uint64(maxLvl))
n := int(bitutil.BytesForBits(int64(numValues) * int64(bitWidth)))

Member

Minor/consistency: LevelDecoder.SetData computes this length with the overflow-checked shared_utils.Mul(nbuffered, bitWidth), whereas here it's a raw int64(numValues) * int64(bitWidth). Not exploitable for real level bit-widths, but matching the sibling code's checked multiply would keep the two paths in sync.
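
For illustration, an overflow-checked sizing of a BIT_PACKED level region using only `math/bits`. This is a stand-in for the project's own checked-multiply helper, whose exact signature is not shown in this thread; `checkedMul` and `bitPackedLevelBytes` are hypothetical names, and the level is assumed to be non-negative.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"math/bits"
)

var errOverflow = errors.New("multiplication overflows int64")

// checkedMul multiplies two non-negative int64 values and reports overflow
// instead of wrapping.
func checkedMul(a, b int64) (int64, error) {
	if a < 0 || b < 0 {
		return 0, errors.New("negative operand")
	}
	hi, lo := bits.Mul64(uint64(a), uint64(b))
	if hi != 0 || lo > math.MaxInt64 {
		return 0, errOverflow
	}
	return int64(lo), nil
}

// bitPackedLevelBytes returns the byte length of numValues levels packed at
// the bit width needed to represent maxLevel.
func bitPackedLevelBytes(numValues int64, maxLevel int16) (int64, error) {
	bitWidth := int64(bits.Len64(uint64(maxLevel)))
	nbits, err := checkedMul(numValues, bitWidth)
	if err != nil {
		return 0, err
	}
	n := nbits / 8
	if nbits%8 != 0 {
		n++ // round up without risking overflow in nbits+7
	}
	return n, nil
}

func main() {
	n, err := bitPackedLevelBytes(10, 3) // 10 values at 2 bits each
	fmt.Println(n, err)
	_, err = bitPackedLevelBytes(math.MaxInt64, 3)
	fmt.Println(err)
}
```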

Contributor Author

Nice catch. I didn't notice that my AI had modified this part during review. Updated in aeeb992.


// NewStreamBuffer returns a ValueBuffer over r.
func NewStreamBuffer(r io.Reader, onClose func() error) ValueBuffer {
return &streamBuffer{r: r, onClose: onClose, buf: make([]byte, defaultStreamBufferSize)}

Member

Two notes on the backing buffer:

  • It's a fresh 1 MiB (defaultStreamBufferSize) allocation per streaming page via the Go allocator rather than the configured memory.Allocator, so it's untracked by CheckedAllocator and adds GC churn on files with many eligible pages. Consider pooling it (and/or sizing relative to the read batch).
  • Worth calling out in the EnablePageStreaming doc that peak memory is really batch + ~1 MiB + largest-value growth + levels, not just the batch size.

Contributor Author

In 1b82270, I capped the buffer size at the uncompressed page size and now manage all the buffers through the Allocator. The EnablePageStreaming comment is also updated to match the new behavior.

Contributor Author

Oh, maybe we can't use the allocator here.

Contributor Author

Updated: we should store the buffer inside the valueBuffer and reuse it for each Decode call.

// Varied sizes: several small, some > 4096 (stream buffer), some > 1024 (page).
values := makeStreamTestValues([]int{65, 100, 5000, 200, 9000, 50, 4096, 1})

cases := []struct {

Member

Since this ships new decode/peel logic, a few gaps I'd like covered:

  • BIT_PACKED levels — only RLE levels are exercised, but readLevelData has a BIT_PACKED branch.
  • Skip path: discardStreaming (via SeekToRow/skipRows) isn't covered; the tests only do full ReadBatch.
  • Repeated columns (maxRepLevel > 0) — the V1 rep-level peel in readLevelData is untested (current cases are flat required/optional).
  • Ineligible-codec fallback — a case with Snappy or LZ4_RAW + EnablePageStreaming=true asserting correct (materialized) reads would lock in the allowlist behavior.


@zeroshade zeroshade dismissed their stale review July 2, 2026 16:30

Accidental duplicate submission — dismissing this copy; the identical active review remains.

readLevelData used the RLE length prefix to make([]byte, n) with no
bounds check, so a corrupt V1 page with a bogus prefix could trigger a
huge allocation before the following ReadFull failed. Bound the level
region by the uncompressed page size (as LevelDecoder.SetData does
against the page buffer), erroring on an out-of-range length. White-box
tests cover the reject and accept paths.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
joechenrh and others added 5 commits July 3, 2026 15:21
Match LevelDecoder.SetData: size the BIT_PACKED level region with the
overflow-checked utils.Mul instead of a raw int64 multiply, erroring on
overflow before allocating.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Merge the reject/accept cases into one test that varies only maxBytes over
a shared level region, and drop the redundant comments.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Allocate the streaming ValueBuffer's backing buffers from the configured
memory.Allocator instead of make, tracking each allocation and freeing it
on Close, so they are visible to CheckedAllocator and can be pooled. Cap
the initial buffer at min(1 MiB, uncompressed page size) so small pages
don't each grab a full buffer. Aliased values are now valid until the page
is released, matching the existing ReadBatchInPage (alias) / ReadBatch
(clone) contract.

Reorder NewStreamBuffer params (mem, r, maxSize, onClose), document the
real peak memory on EnablePageStreaming, and reword "materialized" to
"non-streaming"/"read whole". CheckedAllocator alloc/free-balance tests
added (unit and end-to-end).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
With EnablePageStreaming, a streaming data page's buf holds only the
rep/def level region, so Page.Data returns just the levels. Document that
on the interface rather than gating streaming off the public
GetColumnPageReader, which would also disable streaming for the pqarrow
(arrow) read path. Internal decoders read via levelBuffer()/valueBuffer()
and never call Data() on a streaming page; the property is off by default,
so existing readers are unaffected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Cross-reference the level-only Page.Data behavior at the property where
streaming is enabled, so callers see it at the decision point.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@joechenrh

joechenrh commented Jul 3, 2026

Contributor Author

I have tried my best to control the size (and the scope) of this PR, but it still exceeds 1,500 lines 😢. Since it's not a small fix, I think it needs more testing before merging.

joechenrh and others added 11 commits July 3, 2026 18:01
…le codec

Add the streaming coverage requested in review:

- readLevelData BIT_PACKED branch (white-box; the writer only emits RLE)
- ineligible-codec fallback: Snappy/LZ4_RAW with EnablePageStreaming must
  match the materialized read
- skip path: discardStreaming via Skip
- repeated column (maxRepLevel > 0): the V1 rep-level peel

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Point the caveat at the low-level GetColumnPageReader path and note that
reading via RowGroup.Column()/ReadBatch is unaffected, so callers can tell
at a glance whether enabling streaming touches their code.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Rename NewStreamBuffer's maxSize -> sizeHint (it's the initial size, not a
  hard cap; the buffer still grows for an oversized value).
- Rename setStreamingValues/drainAndClose limit -> rawSrc.
- Move streamablePhysicalType/streamableCodec to row_group_reader.go (their
  only caller).
- Reword the Fill lifetime contract, the V2 level-read comment, a decoder-test
  comment, and trim the EnablePageStreaming doc.
- Rename a test's shadowing `const cap` to batchSize.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Close() releases curPage so a direct PageReader user's streaming value
  stream (buffers + decompressor) is freed; Release() is made idempotent so
  the column-reader path (which also releases) doesn't double-free.
- SeekToPageWithRow releases curPage before repositioning p.r, so a streaming
  page's drain-on-release reads from the old position, not the seeked-to
  offset (corrupted reads on >1 MiB partially-read pages).

Tests: direct-Close allocator balance, cross-page skip, SeekToRow parity, and
a large-page partial-read seek that reproduces the drain bug.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
discardStreaming used Fill(n*typeLen), which makes streamBuffer allocate the
whole skipped region contiguously — a page-level burst on wide columns.
Discard in buffer-bounded chunks instead (the bytes need not be contiguous),
so peak stays ~the stream buffer regardless of n or typeLen.

Add TestPageStreamingFLBASkip: FLBA round-trip + cross-page skip parity.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The streaming NewReader allocated a fresh zstd.Decoder per data page, while
the block-decode path reuses a singleton. Pool the streaming decoders and
Reset them per page (Close returns to the pool via Reset(nil) instead of
freeing), so EnablePageStreaming doesn't pay decoder init on columns with
many small pages. Concurrency 1 keeps pooled decoders goroutine-free.
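
The pooling pattern can be illustrated with the standard library. This sketch swaps in `compress/gzip` for zstd, because `*gzip.Reader` has a `Reset` method in the standard library; `pooledReader` and `newPooledReader` are hypothetical names, and the guard in `Close` shows one way to make a double Close harmless.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// gzipPool holds idle *gzip.Reader values (a stand-in for zstd decoders).
var gzipPool sync.Pool

// pooledReader wraps a decoder borrowed from the pool.
type pooledReader struct {
	zr *gzip.Reader
}

// newPooledReader reuses a pooled decoder via Reset when one is available and
// only allocates a new one otherwise.
func newPooledReader(src io.Reader) (*pooledReader, error) {
	if v := gzipPool.Get(); v != nil {
		zr := v.(*gzip.Reader)
		if err := zr.Reset(src); err != nil {
			return nil, err // drop the decoder rather than pooling it again
		}
		return &pooledReader{zr: zr}, nil
	}
	zr, err := gzip.NewReader(src)
	if err != nil {
		return nil, err
	}
	return &pooledReader{zr: zr}, nil
}

func (p *pooledReader) Read(b []byte) (int, error) {
	if p.zr == nil {
		return 0, io.ErrClosedPipe
	}
	return p.zr.Read(b)
}

// Close returns the decoder to the pool. A second Close is a no-op, so the
// same decoder can never be Put twice.
func (p *pooledReader) Close() error {
	if p.zr == nil {
		return nil
	}
	gzipPool.Put(p.zr)
	p.zr = nil
	return nil
}

// gz compresses s for the demo.
func gz(s string) []byte {
	var b bytes.Buffer
	w := gzip.NewWriter(&b)
	w.Write([]byte(s))
	w.Close()
	return b.Bytes()
}

// readPage decodes one "page" through a pooled decoder.
func readPage(data []byte) string {
	pr, err := newPooledReader(bytes.NewReader(data))
	if err != nil {
		panic(err)
	}
	defer pr.Close() // second Close below makes this one a no-op
	out, err := io.ReadAll(pr)
	if err != nil {
		panic(err)
	}
	pr.Close()
	return string(out)
}

func main() {
	fmt.Println(readPage(gz("page one")), readPage(gz("page two")))
}
```

`sync.Pool` may drop idle entries at any time, so correctness must never depend on a decoder actually being reused.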

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… test

Remove the self-evident pool/Close/Reset comments, and drop the dedicated
pool-reuse test (globalEncoderPool has none either); the zstd streaming
round-trip/seek/skip tests already exercise the pooled decoder.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Collapse the four byte-array writer helpers into one parameterized
  writeByteArrayStream (repetition/levels/page size), with a thin
  writeStreamTestColumn wrapper for the common case.
- Extract the shared ReadBatch drain loop into collectByteArray.
- Drop test doc comments that only restate a self-explaining test name.

No coverage change; ~120 fewer test lines.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- zstdcloser.Close early-returns when the decoder is already returned to the
  pool, so a double Close can't Put the same decoder twice.
- Add TestPageStreamingEngaged: a required column's streaming page Data() holds
  only the (empty) level region, so the test fails if eligibility silently
  falls back to the whole-page read.
- Add streaming decode error-path tests (truncated value, negative length,
  truncated FLBA).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The streaming value buffer handed values out via FillOwned and kept every
backing buffer alive until the page closed, so peak memory was the whole
uncompressed page — no better than reading it at once.

Redesign the value buffer as a reusable chunk allocator: bytes from Fill stay
valid until Recycle, and Decode aliases values in place (no copy), calling
Recycle at entry to reclaim the previous batch. Because Decode again fills a
full batch, level/value counts stay aligned and no pending coordination is
needed. readBatchInPage clips the batch on a streaming page toward the average
value width so one Decode's aliased values stay near the buffer size
(best-effort; skewed sizes may overshoot). Decoded values are unchanged.
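
One way the batch clip could work is sketched below. The exact formula in the PR is not shown in this conversation, so the arithmetic here is an assumption: the batch is only clipped when the value region exceeds one buffer, and it is clipped to the number of average-width values that fit in that buffer. `clipBatch` and its parameters are hypothetical.

```go
package main

import "fmt"

// clipBatch limits a requested batch so that one decode call aliases roughly
// bufSize bytes, based on the page's average value width. This is a
// best-effort bound: skewed value sizes can still overshoot.
func clipBatch(batch int, pageValues, valueRegionBytes, bufSize int64) int {
	// Small pages fit in one buffer, so there is nothing to clip.
	if batch <= 0 || pageValues <= 0 || valueRegionBytes <= bufSize {
		return batch
	}
	avg := valueRegionBytes / pageValues
	if avg < 1 {
		avg = 1
	}
	fit := bufSize / avg
	if fit < 1 {
		fit = 1 // always make progress, even for a single oversized value
	}
	if int64(batch) > fit {
		return int(fit)
	}
	return batch
}

func main() {
	const mib = 1 << 20
	// 1000 values in a 100 MiB value region, 1 MiB buffer.
	fmt.Println(clipBatch(1024, 1000, 100*mib, mib))
	// A 512 KiB value region fits in the buffer, so the batch is unchanged.
	fmt.Println(clipBatch(1024, 1000, mib/2, mib))
}
```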

- value buffer: Fill/Advance/Skip/Recycle chunk allocator, corrupt-length bound
- add streaming.ClipBatch; only clips when the value region exceeds one buffer
- tests: record-reader (pqarrow path) streaming read, buffer + clip edge cases

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- V2 streaming rejects a level region larger than the page body before allocating,
  so a corrupt header can't drive a huge allocation (V1 already bounded this)
- releaseValueStream resets the page clip, so a pooled page reused as a
  non-streaming page is not clipped and under-read
- Reset releases a live streaming page's value stream instead of dropping it

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@zeroshade zeroshade left a comment

Member

Re-review of the latest push — thanks for the very thorough follow-up. All three of my blocking items are addressed, and I verified locally (built ./parquet/..., ran the TestPageStreaming* suite with PARQUET_TEST_DATA set — green):

  1. Page.Data() contract — documented on both EnablePageStreaming and Data(), with streaming kept to the eligible internal path via columnCanStream. Your rationale (off by default, pqarrow uses the same accessor, documented semantic) is sound.
  2. Unbounded RLE level allocation — now bounded by the uncompressed page size in the streaming level read (setStreamingValues), and the BIT_PACKED length uses the checked multiply. Both inline points resolved.
  3. Coverage: TestPageStreaming{ByteArraySkip,FLBASkip,Repeated,ReadBatchInPageRepeated,IneligibleCodecFallback,AllocatorBalance,PeakMemoryBounded,PageReaderCloseFreesStream} cover the skip / repeated / ineligible-codec / allocator-tracking paths I asked for.

Scope note: this pass verified the above + ran the streaming suite; I haven't deep-audited the newer perf commits (zstd decoder pooling, chunked FLBA skip, V2 streaming hardening). Since it's large and you mentioned wanting more testing, I'll hold a formal approve until you say it's ready — ping me for a final pass. Great iteration.
