Skip to content

base

Base classes for composable Rago pipelines.

Classes:

  • ParametersBase

    Base class for declarative step configuration.

  • Pipeline

    Composable pipeline of arbitrary Rago steps.

  • PipelineBase

    Base pipeline class that can be built declaratively.

  • StepBase

    Abstract base for all pipeline steps.

Functions:

  • config_to_dict

    Normalize step configuration objects into a plain dictionary.

  • ensure_list

    Normalize scalar or iterable values into a list.

ParametersBase

ParametersBase(**kwargs: Any)

Bases: UserDict[str, Any]

Base class for declarative step configuration.

Methods:

  • apply

    Merge additional configuration into this object.

  • process

    Return the input unchanged for configuration-only objects.

Attributes:

  • params (dict[str, Any]) –

    Expose the underlying parameter mapping.

Source code in src/rago/base.py
62
63
def __init__(self, **kwargs: Any) -> None:
    """Initialize the configuration from keyword arguments."""
    # The keyword arguments are handed to the parent initializer as a
    # single mapping rather than being re-expanded.
    super().__init__(kwargs)

params property

params: dict[str, Any]

Expose the underlying parameter mapping.

apply

apply(parameters: Any) -> None

Merge additional configuration into this object.

Source code in src/rago/base.py
86
87
88
def apply(self, parameters: Any) -> None:
    """Fold another configuration object into this one.

    ``parameters`` is first normalized with ``config_to_dict``; the
    resulting entries are then written into ``self.data`` via
    ``dict.update``, so keys that already exist are overwritten.
    """
    incoming = config_to_dict(parameters)
    self.data.update(incoming)

process

process(inp: Input) -> Output

Return the input unchanged for configuration-only objects.

Source code in src/rago/base.py
90
91
92
def process(self, inp: Input) -> Output:
    """Return the input unchanged for configuration-only objects.

    No state of this object is read here: the result is whatever
    ``inp.to_output()`` produces.
    """
    return inp.to_output()

Pipeline

Pipeline()

Bases: PipelineBase

Composable pipeline of arbitrary Rago steps.

Methods:

  • process

    Run this pipeline when embedded as a step.

  • prompt

    Run the pipeline and return the primary result value.

  • run

    Run all configured steps for the given query and source.

Source code in src/rago/base.py
102
103
104
def __init__(self) -> None:
    """Create a pipeline with no steps and no global parameters."""
    # Steps registered on the pipeline; starts empty.
    self.stack: list[StepBase] = []
    # Pipeline-wide parameter objects; starts empty.
    # NOTE(review): how these are consumed is not visible here - confirm.
    self.global_params: list[Any] = []

process

process(inp: Input) -> Output

Run this pipeline when embedded as a step.

Source code in src/rago/base.py
137
138
139
140
141
142
143
144
145
146
def process(self, inp: Input) -> Output:
    """Execute this pipeline on ``inp`` so it can be nested as a step.

    ``source`` is read from the input. ``data`` is taken from the
    ``'data'`` key, falling back to ``'content'`` and then to ``source``
    when a key is absent. Every other entry except ``'query'`` is
    forwarded to ``run`` as a keyword argument.
    """
    reserved = frozenset({'content', 'data', 'query', 'source'})

    source = inp.get('source')
    fallback = inp.get('content', source)
    data = inp.get('data', fallback)

    # Collect the entries that are not handled explicitly above.
    passthrough = {}
    for key, value in inp.items():
        if key in reserved:
            continue
        passthrough[key] = value

    return self.run(inp.query, source=source, data=data, **passthrough)

prompt

prompt(
    query: str,
    source: Any = None,
    data: Any = None,
    **kwargs: Any,
) -> Any

Run the pipeline and return the primary result value.

Source code in src/rago/base.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def prompt(
    self,
    query: str,
    source: Any = None,
    data: Any = None,
    **kwargs: Any,
) -> Any:
    """Execute the pipeline and hand back its most relevant value.

    All arguments are forwarded to ``run``. From the resulting output
    the ``result`` entry is preferred, then ``content``; when neither
    key is present the whole output object is returned.
    """
    output = self.run(query=query, source=source, data=data, **kwargs)
    # Probe the keys in priority order and return the first one present.
    for key in ('result', 'content'):
        if key in output:
            return getattr(output, key)
    return output

run

run(
    query: str = '',
    source: Any = None,
    data: Any = None,
    **kwargs: Any,
) -> Output

Run all configured steps for the given query and source.

Source code in src/rago/base.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def run(
    self,
    query: str = '',
    source: Any = None,
    data: Any = None,
    **kwargs: Any,
) -> Output:
    """Feed the query and source through every step in ``self.stack``.

    A missing ``source`` defaults to ``data``. The initial content is
    the ``content`` keyword argument if given (and not ``None``),
    otherwise ``data``, otherwise ``source``; it seeds both the ``data``
    and ``content`` fields of the first output. Each step then receives
    the previous output converted to an input, with ``query`` reset to
    the original query, and the last step's output is returned.
    """
    payload = kwargs.pop('content', None)

    if source is None and data is not None:
        source = data
    if payload is None:
        payload = source if data is None else data

    result = Output(
        query=query,
        source=source,
        data=payload,
        content=payload,
        **kwargs,
    )

    for stage in self.stack:
        stage_input = result.as_input()
        # Every step sees the original query, whatever the previous
        # step left in its output.
        stage_input.query = query
        result = stage.process(stage_input)

    return result

PipelineBase

PipelineBase()

Bases: ABC

Base pipeline class that can be built declaratively.

Methods:

  • process

    Run this pipeline when embedded as a step.

  • run

    Run the pipeline.

Source code in src/rago/base.py
102
103
104
def __init__(self) -> None:
    """Create a pipeline with no steps and no global parameters."""
    # Steps registered on the pipeline; starts empty.
    self.stack: list[StepBase] = []
    # Pipeline-wide parameter objects; starts empty.
    # NOTE(review): how these are consumed is not visible here - confirm.
    self.global_params: list[Any] = []

process

process(inp: Input) -> Output

Run this pipeline when embedded as a step.

Source code in src/rago/base.py
137
138
139
140
141
142
143
144
145
146
def process(self, inp: Input) -> Output:
    """Execute this pipeline on ``inp`` so it can be nested as a step.

    ``source`` is read from the input. ``data`` is taken from the
    ``'data'`` key, falling back to ``'content'`` and then to ``source``
    when a key is absent. Every other entry except ``'query'`` is
    forwarded to ``run`` as a keyword argument.
    """
    reserved = frozenset({'content', 'data', 'query', 'source'})

    source = inp.get('source')
    fallback = inp.get('content', source)
    data = inp.get('data', fallback)

    # Collect the entries that are not handled explicitly above.
    passthrough = {}
    for key, value in inp.items():
        if key in reserved:
            continue
        passthrough[key] = value

    return self.run(inp.query, source=source, data=data, **passthrough)

run abstractmethod

run(
    query: str = '',
    source: Any = None,
    data: Any = None,
    **kwargs: Any,
) -> Output

Run the pipeline.

Source code in src/rago/base.py
127
128
129
130
131
132
133
134
135
@abstractmethod
def run(
    self,
    query: str = '',
    source: Any = None,
    data: Any = None,
    **kwargs: Any,
) -> Output:
    """Run the pipeline.

    Declared abstract with no body, so the behavior is supplied entirely
    by the overriding implementation. All parameters are optional:
    ``query`` defaults to an empty string, ``source`` and ``data`` to
    ``None``, and any further keyword arguments are accepted.
    """

StepBase

StepBase()

Bases: ABC

Abstract base for all pipeline steps.

Methods:

  • apply

    Apply attached configuration to the step.

  • process

    Process the current pipeline input.

Source code in src/rago/base.py
154
155
156
def __init__(self) -> None:
    """Initialize the step with no cache and an empty log mapping."""
    # No cache is attached at construction time.
    self.cache: Any = None
    # Log storage keyed by string; starts empty.
    self.logs: dict[str, Any] = {}

apply

apply(parameters: Any) -> None

Apply attached configuration to the step.

Source code in src/rago/base.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def apply(self, parameters: Any) -> None:
    """Attach a configuration object or its settings to this step.

    ``None`` is ignored. An object recognized as a cache backend, a
    vector database or a text splitter (checked in that order) is stored
    whole on ``cache``, ``db`` or ``splitter`` respectively. Anything
    else is normalized with ``config_to_dict`` and each entry becomes an
    attribute of the step; a ``logs`` entry of ``None`` is stored as an
    empty dict.
    """
    if parameters is None:
        return

    # Whole-object attachments, probed in priority order; the first
    # predicate that matches wins and ends the call.
    attachments = (
        (_is_cache_backend, 'cache'),
        (_is_vector_db, 'db'),
        (_is_text_splitter, 'splitter'),
    )
    for matches, attribute in attachments:
        if matches(parameters):
            setattr(self, attribute, parameters)
            return

    for name, setting in config_to_dict(parameters).items():
        # ``logs`` must always be a mapping, never ``None``.
        replace_with_empty = name == 'logs' and setting is None
        setattr(self, name, {} if replace_with_empty else setting)

process abstractmethod

process(inp: Input) -> Output

Process the current pipeline input.

Source code in src/rago/base.py
178
179
180
@abstractmethod
def process(self, inp: Input) -> Output:
    """Process the current pipeline input.

    Declared abstract with no body, so the behavior is supplied entirely
    by the overriding implementation.
    """

config_to_dict

config_to_dict(parameters: Any) -> dict[str, Any]

Normalize step configuration objects into a plain dictionary.

Source code in src/rago/base.py
32
33
34
35
36
37
38
39
40
def config_to_dict(parameters: Any) -> dict[str, Any]:
    """Convert a step configuration object into a fresh plain ``dict``.

    ``None`` and unsupported objects yield an empty dict. A
    ``ParametersBase`` contributes a copy of its ``params`` mapping, and
    any other ``Mapping`` is copied directly.
    """
    if parameters is None:
        return {}

    if isinstance(parameters, ParametersBase):
        entries = parameters.params
    elif isinstance(parameters, Mapping):
        entries = parameters
    else:
        # Anything that is not a mapping carries no settings.
        entries = {}
    return dict(entries)

ensure_list

ensure_list(value: Any) -> list[Any]

Normalize scalar or iterable values into a list.

Source code in src/rago/base.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def ensure_list(value: Any) -> list[Any]:
    """Normalize scalar or iterable values into a list.

    Parameters
    ----------
    value : Any
        The value to normalize.

    Returns
    -------
    list[Any]
        * ``[]`` when ``value`` is ``None``;
        * ``value`` itself (the same object, not a copy) when it is
          already a list;
        * ``[value]`` when it is a ``str``, ``bytes`` or ``bytearray``,
          or is not iterable at all;
        * a new list of its items for any other iterable (tuples, sets,
          generators, mappings - the latter yield their keys).
    """
    if value is None:
        return []
    if isinstance(value, list):
        return value
    # Text and binary blobs are iterable, but each represents a single
    # value; iterating them would produce characters or integer byte
    # values rather than the payload the caller passed in.
    if isinstance(value, (str, bytes, bytearray)):
        return [value]
    if isinstance(value, Iterable):
        return list(value)
    return [value]