Bases: GenericWriter
Write outputs to a Parquet file.
Source code in src/unitorch/cli/writers/__init__.py
141
142
143
144
145
146
147
148
149
150
151
152
153
def __init__(
    self,
    output_file: str,
    nrows_per_sample: Optional[int] = None,
    columns: Optional[List[str]] = None,
    schema: Optional[str] = None,
    compression: Optional[str] = "snappy",
):
    """Record the writer settings; no Parquet writer is created here.

    Args:
        output_file: Stored as ``self.output_file``.
        nrows_per_sample: Accepted but not used by this constructor.
        columns: Stored as ``self.columns``; ``None`` disables column
            selection in the processing methods.
        schema: Python expression, given as a string, that is evaluated
            with ``eval`` to produce ``self.pq_schema``. ``None`` leaves
            the schema unset.
        compression: Stored as ``self.compression`` (default ``"snappy"``).
    """
    self.output_file = output_file
    self.columns = columns
    self.compression = compression
    self.skip_n_samples = 0
    # The writer stays unset until it is created by a later call.
    self.pq_writer = None
    if schema is None:
        self.pq_schema = None
    else:
        # NOTE(review): eval() runs arbitrary Python code. Only pass schema
        # strings that come from trusted configuration.
        self.pq_schema = eval(schema)
|
columns
instance-attribute
skip_n_samples
instance-attribute
output_file
instance-attribute
output_file = output_file
pq_writer
instance-attribute
pq_schema
instance-attribute
pq_schema = None if schema is None else eval(schema)
compression
instance-attribute
compression = compression
from_config
classmethod
from_config(config, **kwargs)
Source code in src/unitorch/cli/writers/__init__.py
@classmethod
@config_defaults_init("core/writer/parquet")
def from_config(cls, config, **kwargs):
    """Alternate constructor driven by a config object.

    The body is intentionally empty.
    NOTE(review): presumably ``config_defaults_init`` supplies the actual
    construction logic using the "core/writer/parquet" section — confirm
    against the decorator's definition.
    """
    pass
|
process_start
process_start(outputs: WriterOutputs)
Source code in src/unitorch/cli/writers/__init__.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def process_start(self, outputs: "WriterOutputs"):
    """Create the Parquet writer and write the first batch of outputs.

    The outputs are converted to a pandas DataFrame. When ``self.columns``
    is set, only the listed columns that exist in the frame are kept, in
    the order given by ``self.columns``. The schema is inferred from this
    first batch only when no schema was configured.

    Args:
        outputs: Batch to write; must provide ``to_pandas()``.
    """
    dataframe = outputs.to_pandas()
    if self.columns is not None:
        available = set(dataframe.columns)
        dataframe = dataframe[[c for c in self.columns if c in available]]

    # Infer the schema only when none was configured. A schema-less
    # conversion is wasted work otherwise, and it can fail on data that
    # only converts under the explicit schema.
    if self.pq_schema is None:
        self.pq_schema = pa.Table.from_pandas(dataframe).schema
    pa_table = pa.Table.from_pandas(dataframe, schema=self.pq_schema)

    self.pq_writer = pq.ParquetWriter(
        self.output_file,
        self.pq_schema,
        version="1.0",
        use_dictionary=False,
        flavor="spark",
        compression=self.compression,
        use_compliant_nested_type=True,
    )
    self.pq_writer.write_table(pa_table)
|
process_end
Source code in src/unitorch/cli/writers/__init__.py
def process_end(self):
    """Close the Parquet writer, if one was opened.

    Does nothing when ``self.pq_writer`` is ``None``, so ending a run that
    never created a writer no longer raises ``AttributeError``. The
    reference is cleared after closing so a repeated call is harmless.
    """
    writer = self.pq_writer
    if writer is None:
        return
    writer.close()
    self.pq_writer = None
|
process_chunk
process_chunk(outputs: WriterOutputs)
Source code in src/unitorch/cli/writers/__init__.py
184
185
186
187
188
189
190
def process_chunk(self, outputs: "WriterOutputs"):
    """Write one more batch of outputs through the existing writer.

    Requires ``self.pq_writer`` to be set already (checked with an
    assertion). The batch is converted to a pandas DataFrame, optionally
    narrowed to the entries of ``self.columns`` that exist in the frame,
    converted to an Arrow table under ``self.pq_schema`` and written.

    Args:
        outputs: Batch to write; must provide ``to_pandas()``.
    """
    assert self.pq_writer is not None
    frame = outputs.to_pandas()
    wanted = self.columns
    if wanted is not None:
        present = set(frame.columns)
        # Keep the configured order and skip names missing from the batch.
        keep = [name for name in wanted if name in present]
        frame = frame[keep]
    table = pa.Table.from_pandas(frame, schema=self.pq_schema)
    self.pq_writer.write_table(table)
|