16 changes: 16 additions & 0 deletions packages/vaex-core/vaex/__init__.py
@@ -507,6 +507,22 @@ def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, cop
copy_index=copy_index)


def from_json_arrow(file, read_options=None, parse_options=None):
"""Create a DataFrame from a JSON file using Apache Arrow.

This is a much faster alternative to `pandas.read_json(file, lines=True)`.
The file must contain newline-delimited JSON (one record per line), which is the format PyArrow's JSON reader expects; it is read eagerly, and the resulting DataFrame is held fully in memory.

:param str file: Path to the JSON file.
:param read_options: PyArrow JSON read options, see https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
:param parse_options: PyArrow JSON parse options, see https://arrow.apache.org/docs/python/generated/pyarrow.json.ParseOptions.html
:return: DataFrame
"""
import vaex.json
ds = vaex.json.DatasetJSON(file, read_options=read_options, parse_options=parse_options)
return vaex.from_dataset(ds)


@docsubst
def from_records(records : List[Dict], array_type="arrow", defaults={}) -> vaex.dataframe.DataFrame:
'''Create a dataframe from a list of dicts.
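For orientation, a minimal usage sketch of the new entry point (the file name here is hypothetical):

import vaex

# eagerly read a newline-delimited JSON file through Apache Arrow
df = vaex.from_json_arrow("events.json")
print(df.head())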
1 change: 1 addition & 0 deletions packages/vaex-core/vaex/column.py
@@ -8,6 +8,7 @@
import pyarrow as pa

import vaex
import vaex.encoding
import vaex.utils
import vaex.cache
from .array_types import supported_array_types, supported_arrow_array_types, string_types, is_string_type
12 changes: 6 additions & 6 deletions packages/vaex-core/vaex/csv.py
@@ -9,7 +9,7 @@

import vaex.dataset
import vaex.file
from vaex.dataset import Dataset, DatasetFile
from vaex.dataset import DatasetFile
from .itertools import pmap, pwait, buffer, consume, filter_none
from .multithreading import thread_count_default_io, get_main_io_pool

@@ -25,7 +25,7 @@ def file_chunks(file, chunk_size, newline_readahead):
file_size = file.tell()
file.seek(0)
begin_offset = 0

done = False
while not done:
# find the next newline boundary
@@ -50,7 +50,7 @@ def reader(file_offset=begin_offset, length=end_offset - begin_offset):

def file_chunks_mmap(file, chunk_size, newline_readahead):
"""Bytes chunks, split by chunk_size bytes, on newline boundaries

Using memory mapping (which avoids a memcpy)
"""
offset = 0
@@ -67,7 +67,7 @@ def file_chunks_mmap(file, chunk_size, newline_readahead):

file_map = mmap.mmap(file.fileno(), file_size, **kwargs)
data = memoryview(file_map)

done = False
while not done:
# find the next newline boundary
@@ -218,7 +218,7 @@ def close(self):

def _chunk_producer(self, columns, chunk_size=None, reverse=False, start=0, end=None):
pool = get_main_io_pool()

first = True
previous = None
for i, reader in enumerate(file_chunks_mmap(self.path, self.chunk_size, self.newline_readahead)):
@@ -287,7 +287,7 @@ def chunk_reader(reader=reader, first=first, previous=previous, fragment_info=fr
# we only need to cut off a piece of the end
length = end - row_start
table = table.slice(0, length)

# table = table.combine_chunks()
assert len(table)
chunks = dict(zip(table.column_names, table.columns))
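The touched function file_chunks_mmap is the mmap-based chunker whose docstring notes that memory mapping avoids a memcpy. A standalone sketch of that pattern, assuming a non-empty file named data.csv:

import mmap

with open("data.csv", "rb") as f:
    f.seek(0, 2)  # seek to the end to learn the file size
    file_size = f.tell()
    file_map = mmap.mmap(f.fileno(), file_size, access=mmap.ACCESS_READ)
    data = memoryview(file_map)  # slicing a memoryview is zero-copy
    chunk = data[:min(1024, file_size)]
    print(bytes(chunk[:40]))  # bytes() copies only this small slice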
35 changes: 23 additions & 12 deletions packages/vaex-core/vaex/dataframe.py
@@ -7027,7 +7027,7 @@ def export_csv_pandas(self, path, progress=None, chunk_size=default_chunk_size,
return

@docsubst
def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel=True, fs_options=None, fs=None):
def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel=True, fs_options=None, fs=None, backend='pandas', lines=False):
""" Exports the DataFrame to a CSV file.

:param to: filename or file object
@@ -7036,36 +7036,47 @@ def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel
:param parallel: {evaluate_parallel}
:param fs_options: {fs_options}
:param fs: {fs}
:param backend: Which backend to use for writing the JSON file. Can be "pandas" or "json".
:param lines: If True, each row is written as a separate JSON record (the JSON-lines format). If False, the DataFrame is written as a single JSON list of records.
:return:
"""

json = None # we may want to pass the module as parameter to use a faster library
import json as json_std
json = json or json_std

# not sure if we want to use pandas, it will treat datetime for us, but will convert null to nan
use_pandas = True

# each chunk serializes to a full JSON list; we cut off its '[' and ']' and write them
# back ourselves once, and we also need to put ',' between chunks
with vaex.progress.tree(progress, title="export(json)"), vaex.file.open(path=to, mode='wb', fs_options=fs_options, fs=fs) as f:
f.write(b"[")
if not lines:
f.write(b"[")
first = True
if use_pandas:
if backend == 'pandas':
for _i1, _i2, df in self.to_pandas_df(chunk_size=chunk_size, parallel=parallel):
if not first:
if (not first) and (not lines):
f.write(b", ")
first = False
f_temp = io.StringIO()
df.to_json(f_temp, orient='records')
f.write(f_temp.getvalue()[1:-1].encode('utf8'))
else:
df.to_json(f_temp, orient='records', lines=lines)
if lines:
f.write(f_temp.getvalue().encode('utf8'))
else:
f.write(f_temp.getvalue()[1:-1].encode('utf8'))
elif backend == 'json':
for _i1, _i2, records in self.to_records(chunk_size=chunk_size, parallel=parallel):
if not first:
if (not first) and (not lines):
f.write(b", ")
first = False
if lines:
    # one JSON document per row; serializing records individually avoids the
    # fragile str.replace('},', '}\n'), which would corrupt values containing '},'
    # (note: first is always False by this point, so the old guard was redundant)
    raw = "\n".join(json.dumps(record) for record in records)
else:
    # strip the enclosing '[' and ']'; they are written once around all chunks
    raw = json.dumps(records)[1:-1]
f.write(raw.encode("utf8"))
f.write(b"]")
f.write('\n'.encode('utf8'))
else:
raise ValueError(f"Unknown backend {backend}, should be 'pandas' or 'json'.")
if not lines:
f.write(b"]")


def _needs_copy(self, column_name):
import vaex.file.other
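A short usage sketch of the extended export; backend and lines are the new parameters, and the file names are illustrative:

import vaex

df = vaex.from_arrays(x=[1, 2, 3], s=["a", "b", "c"])
# one JSON record per line, serialized by the stdlib json module
df.export_json("out.jsonl", backend="json", lines=True)
# a single JSON array of records, serialized via pandas (the default)
df.export_json("out.json", backend="pandas", lines=False)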
56 changes: 54 additions & 2 deletions packages/vaex-core/vaex/json.py
@@ -4,8 +4,11 @@
import numpy as np
import pyarrow as pa

from frozendict import frozendict

import vaex
from vaex.encoding import Encoding
import vaex.dataset
import vaex.encoding


serializers = []
@@ -208,9 +208,58 @@ def default(self, obj):
class VaexJsonDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)

def object_hook(self, dct):
for serializer in serializers:
if serializer.can_decode(dct):
return serializer.decode(dct)
return dct


@vaex.dataset.register
class DatasetJSON(vaex.dataset.DatasetFile):
snake_name = "arrow-json"

def __init__(self, path, read_options=None, parse_options=None, fs=None, fs_options={}):
super(DatasetJSON, self).__init__(path, fs=fs, fs_options=fs_options)
self.read_options = read_options
self.parse_options = parse_options
self._read_file()

@property
def _fingerprint(self):
fp = vaex.file.fingerprint(self.path, fs_options=self.fs_options, fs=self.fs)
return f"dataset-{self.snake_name}-{fp}"

def _read_file(self):
import pyarrow.json

with vaex.file.open(self.path, fs=self.fs, fs_options=self.fs_options, for_arrow=True) as f:
try:
codec = pa.Codec.detect(self.path)
except Exception:
codec = None
if codec:
f = pa.CompressedInputStream(f, codec.name)
self._arrow_table = pyarrow.json.read_json(f, read_options=self.read_options, parse_options=self.parse_options)
self._columns = dict(zip(self._arrow_table.schema.names, self._arrow_table.columns))
self._set_row_count()
self._ids = frozendict({name: vaex.cache.fingerprint(self._fingerprint, name) for name in self._columns})

def _encode(self, encoding):
spec = super()._encode(encoding)
del spec["write"]
return spec

def __getstate__(self):
state = super().__getstate__()
state["read_options"] = self.read_options
state["parse_options"] = self.parse_options
return state

def __setstate__(self, state):
super().__setstate__(state)
self._read_file()

def close(self):
pass
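Because DatasetJSON forwards read_options and parse_options verbatim to pyarrow.json.read_json, callers can tune the Arrow reader through from_json_arrow; a sketch with illustrative option values and a hypothetical file name:

import pyarrow.json
import vaex

df = vaex.from_json_arrow(
    "events.json",
    read_options=pyarrow.json.ReadOptions(use_threads=True, block_size=1 << 20),
    parse_options=pyarrow.json.ParseOptions(newlines_in_values=False),
)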
9 changes: 4 additions & 5 deletions packages/vaex-core/vaex/utils.py
@@ -1,12 +1,10 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import ast
import collections
import concurrent.futures
import contextlib
import functools
import json
import math
import os
import platform
import re
@@ -27,7 +25,6 @@
import six
import yaml

from .json import VaexJsonEncoder, VaexJsonDecoder
import vaex.file


@@ -296,14 +293,15 @@ def yaml_load(f):


def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):
import vaex.json
file, path = vaex.file.file_and_path(file, mode='w', fs_options=fs_options, fs=fs)
try:
if path:
base, ext = os.path.splitext(path)
else:
ext = '.json' # default
if ext == ".json":
json.dump(data, file, indent=2, cls=VaexJsonEncoder if old_style else None)
json.dump(data, file, indent=2, cls=vaex.json.VaexJsonEncoder if old_style else None)
elif ext == ".yaml":
yaml_dump(file, data)
else:
@@ -313,14 +311,15 @@ def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):


def read_json_or_yaml(file, fs_options={}, fs=None, old_style=True):
import vaex.json
file, path = vaex.file.file_and_path(file, fs_options=fs_options, fs=fs)
try:
if path:
base, ext = os.path.splitext(path)
else:
ext = '.json' # default
if ext == ".json":
return json.load(file, cls=VaexJsonDecoder if old_style else None) or {}
return json.load(file, cls=vaex.json.VaexJsonDecoder if old_style else None) or {}
elif ext == ".yaml":
return yaml_load(file) or {}
else:
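Moving `import vaex.json` inside these helpers (instead of the old top-level `from .json import ...`) presumably avoids a circular import, since vaex.json now pulls in vaex.dataset at module load time. The helpers themselves dispatch on the file extension; a sketch with a hypothetical path:

from vaex.utils import write_json_or_yaml, read_json_or_yaml

state = {"version": 1, "columns": ["x", "y"]}
write_json_or_yaml("state.json", state)  # '.json' selects json.dump with VaexJsonEncoder
assert read_json_or_yaml("state.json") == state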
29 changes: 29 additions & 0 deletions tests/from_json_test.py
@@ -1,6 +1,8 @@
from common import *
import tempfile

import vaex


def test_from_json(ds_local):
df = ds_local
@@ -18,3 +20,30 @@ def test_from_json(ds_local):
assert tmp_df.x.tolist() == df.x.tolist()
assert tmp_df.bool.tolist() == df.bool.tolist()


@pytest.mark.parametrize("backend", ["pandas", "json"])
@pytest.mark.parametrize("lines", [False, True])
def test_from_and_export_json(tmpdir, ds_local, backend, lines):
df = ds_local
df = df.drop(columns=['datetime'])
if 'timedelta' in df:
df = df.drop(columns=['timedelta'])
if 'obj' in df:
df = df.drop(columns=['obj'])

# Create a temporary JSON file
tmp = str(tmpdir.join('test.json'))
df.export_json(tmp, backend=backend, lines=lines)

# Check that the file can be read back with the default (pandas) backend
df_read = vaex.from_json(tmp, lines=lines)
assert df.shape == df_read.shape
assert df.x.tolist() == df_read.x.tolist()
assert df.get_column_names() == df_read.get_column_names()

# If lines is True, also check that the file can be read with from_json_arrow
if lines:
df_read_arrow = vaex.from_json_arrow(tmp)
assert df.shape == df_read_arrow.shape
assert df.x.tolist() == df_read_arrow.x.tolist()
assert df.get_column_names() == df_read_arrow.get_column_names()