Source code for bonobo.nodes.io.json

import json
from collections import OrderedDict

from bonobo.config import Method
from bonobo.config.processors import ContextProcessor, use_context
from bonobo.constants import NOT_MODIFIED
from bonobo.nodes.io.base import FileHandler
from bonobo.nodes.io.file import FileReader, FileWriter


class JsonHandler(FileHandler):
    eol = ',\n'
    prefix, suffix = '[', ']'


class LdjsonHandler(FileHandler):
    eol = '\n'
    prefix, suffix = '', ''


[docs]class JsonReader(JsonHandler, FileReader): @Method(positional=False) def loader(self, file): return json.loads(file)
[docs] def read(self, file, *, fs): yield from self.loader(file.read())
__call__ = read
[docs]class LdjsonReader(LdjsonHandler, JsonReader): """ Read a stream of line-delimited JSON objects (one object per line). Not to be mistaken with JSON-LD (where LD stands for linked data). """
[docs] def read(self, file, *, fs): yield from map(self.loader, file)
__call__ = read
[docs]@use_context class JsonWriter(JsonHandler, FileWriter): @ContextProcessor def envelope(self, context, file, *, fs): file.write(self.prefix) yield file.write(self.suffix)
[docs] def write(self, file, context, *args, fs): """ Write a json row on the next line of file pointed by ctx.file. :param ctx: :param row: """ context.setdefault('lineno', 0) fields = context.get_input_fields() if fields: prefix = self.eol if context.lineno else '' self._write_line(file, prefix + json.dumps(OrderedDict(zip(fields, args)))) context.lineno += 1 else: for arg in args: prefix = self.eol if context.lineno else '' self._write_line(file, prefix + json.dumps(arg)) context.lineno += 1 return NOT_MODIFIED
__call__ = write
[docs]@use_context class LdjsonWriter(LdjsonHandler, JsonWriter): """ Write a stream of Line-delimited JSON objects (one object per line). Not to be mistaken with JSON-LD (where LD stands for linked data). """