Skip to content

augmented

Composable augmentation APIs for Rago.

Modules:

  • base

    Base classes for augmentation steps.

  • cohere

    Classes for augmentation with Cohere embeddings.

  • db

    Rago DB package.

  • fireworks

    Classes for augmentation with Fireworks embeddings.

  • openai

    Classes for augmentation with OpenAI embeddings.

  • sentence_transformer

    Classes for augmentation with Hugging Face.

  • spacy

    Classes for augmentation with SpaCy embeddings.

  • together

    Classes for augmentation with Together embeddings.

Classes:

  • Augmented

    Public augmentation wrapper that resolves a concrete backend lazily.

  • AugmentedBase

    Base class for all augmentation steps.

  • AugmentedParameters

    Parameters for configuring augmentation steps.

Augmented

Augmented(
    api_key: str = '',
    model_name: str = '',
    backend: str = '',
    engine: str = '',
    top_k: int | None = None,
    api_params: dict[str, Any] | None = None,
    db: Any = None,
    cache: Any = None,
    logs: dict[str, Any] | None = None,
)

Bases: StepBase

Public augmentation wrapper that resolves a concrete backend lazily.

Methods:

  • apply

    Apply declarative configuration to the augmentation wrapper.

  • process

    Process the current pipeline content with augmentation.

  • search

    Resolve the concrete augmenter and run search.

Source code in src/rago/augmented/__init__.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    api_key: str = '',
    model_name: str = '',
    backend: str = '',
    engine: str = '',
    top_k: int | None = None,
    api_params: dict[str, Any] | None = None,
    db: Any = None,
    cache: Any = None,
    logs: dict[str, Any] | None = None,
) -> None:
    """Record the wrapper configuration.

    Only stores the given values on the instance; no concrete
    augmentation backend is created here.

    Parameters
    ----------
    api_key : str
        Stored in the ``AugmentedParameters`` mapping.
    model_name : str
        Stored in the ``AugmentedParameters`` mapping.
    backend : str
        Backend name; kept lower-cased, or ``''`` when falsy.
    engine : str
        Engine name; kept lower-cased, or ``''`` when falsy.
    top_k : int | None
        Stored in the ``AugmentedParameters`` mapping.
    api_params : dict[str, Any] | None
        Stored in the ``AugmentedParameters`` mapping; a falsy value
        is replaced by a new empty dict.
    db : Any
        Kept as given on ``self.db``.
    cache : Any
        Kept as given on ``self.cache``.
    logs : dict[str, Any] | None
        Kept on ``self.logs``; ``None`` becomes a new empty dict.
    """
    super().__init__()

    # Names are normalised to lower case; any falsy value collapses to ''.
    self.backend = (backend or '').lower()
    self.engine = (engine or '').lower()

    extra_api_params = api_params or {}
    self.params = AugmentedParameters(
        api_key=api_key,
        model_name=model_name,
        top_k=top_k,
        api_params=extra_api_params,
    )

    self.db = db
    self.cache = cache
    # Keep the caller's dict (even an empty one); only None gets a new one.
    self.logs = {} if logs is None else logs

apply

apply(parameters: Any) -> None

Apply declarative configuration to the augmentation wrapper.

Source code in src/rago/augmented/__init__.py
68
69
70
71
72
73
74
75
76
77
78
79
def apply(self, parameters: Any) -> None:
    """Apply declarative configuration to the augmentation wrapper.

    The parent ``apply`` runs first. Each entry of
    ``config_to_dict(parameters)`` is then routed: string ``backend`` and
    ``engine`` values are stored lower-cased on the instance, ``db`` is
    stored on the instance as-is, and every other entry (including a
    non-string ``backend`` or ``engine``) goes into ``self.params.params``.
    """
    super().apply(parameters)

    for name, setting in config_to_dict(parameters).items():
        # Only string values are treated as backend/engine names.
        if name in ('backend', 'engine') and isinstance(setting, str):
            setattr(self, name, setting.lower())
            continue
        if name == 'db':
            self.db = setting
            continue
        self.params.params[name] = setting

process

process(inp: Input) -> Output

Process the current pipeline content with augmentation.

Source code in src/rago/augmented/__init__.py
122
123
124
125
126
127
128
129
130
131
132
133
def process(self, inp: Input) -> Output:
    """Process the current pipeline content with augmentation.

    The documents are read from ``inp['content']``, falling back to
    ``'data'`` and then ``'source'``. They are searched with ``inp.query``,
    and the result is stored on both ``content`` and ``data`` of an
    ``Output`` built from ``inp``.
    """
    # Resolve the fallback chain innermost-first: source -> data -> content.
    fallback = inp.get('source')
    fallback = inp.get('data', fallback)
    documents = inp.get('content', fallback)

    query = inp.query
    # A missing or falsy configured top_k is passed on as 0.
    limit = self.params.params.get('top_k', 0) or 0
    matches = self.search(query, documents, top_k=limit)

    output = Output.from_input(inp)
    output.content = matches
    output.data = matches
    return output

search

search(
    query: str, documents: Any, top_k: int = 0
) -> list[str]

Resolve the concrete augmenter and run search.

Source code in src/rago/augmented/__init__.py
117
118
119
120
def search(self, query: str, documents: Any, top_k: int = 0) -> list[str]:
    """Resolve the concrete augmenter and run search.

    The arguments are forwarded unchanged to the ``search`` method of the
    object returned by ``self._resolve()``, and its result is returned.
    """
    concrete = self._resolve()
    matches = concrete.search(query, documents, top_k=top_k)
    return matches

AugmentedBase

AugmentedBase(
    model_name: Optional[str] = None,
    db: DBBase | None = None,
    top_k: Optional[int] = None,
    api_key: str = '',
    api_params: dict[str, Any] | None = None,
    cache: Cache | None = None,
    logs: dict[str, Any] | None = None,
)

Bases: StepBase

Base class for all augmentation steps.

Methods:

  • apply

    Apply attached configuration to the step.

  • get_embedding

    Retrieve embeddings for the given texts.

  • process

    Run augmentation against the current pipeline content.

  • search

    Search for the most relevant documents.

Source code in src/rago/augmented/base.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def __init__(
    self,
    model_name: Optional[str] = None,
    db: DBBase | None = None,
    top_k: Optional[int] = None,
    api_key: str = '',
    api_params: dict[str, Any] | None = None,
    cache: Cache | None = None,
    logs: dict[str, Any] | None = None,
) -> None:
    """Initialise the shared state of an augmentation step.

    Parameters
    ----------
    model_name : str, optional
        Model identifier; ``None`` selects ``self.default_model_name``.
    db : DBBase, optional
        Vector store to use; ``None`` creates a new ``FaissDB``.
    top_k : int, optional
        ``None`` selects ``self.default_top_k``.
    api_key : str
        Stored on ``self.api_key``.
    api_params : dict[str, Any], optional
        Stored on ``self.api_params``; a falsy value is replaced by a
        new empty dict.
    cache : Cache, optional
        Stored on ``self.cache``.
    logs : dict[str, Any], optional
        Stored on ``self.logs``; ``None`` becomes a new empty dict.
    """
    super().__init__()
    self.api_key = api_key
    self.api_params = api_params or {}
    self.cache = cache
    self.logs = logs if logs is not None else {}
    # Test for None explicitly: a supplied DB object that happens to be
    # falsy must not be silently replaced by a default FaissDB.
    self.db = db if db is not None else FaissDB()
    self.top_k = top_k if top_k is not None else self.default_top_k
    self.model_name = (
        model_name if model_name is not None else self.default_model_name
    )
    self.model = None

    # Subclass hooks run only after every attribute above is in place.
    self._validate()
    self._load_optional_modules()
    self._setup()

apply

apply(parameters: Any) -> None

Apply attached configuration to the step.

Source code in src/rago/base.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def apply(self, parameters: Any) -> None:
    """Apply attached configuration to the step.

    ``None`` is ignored. A cache backend, vector DB or text splitter
    object is attached whole as ``cache``, ``db`` or ``splitter``
    respectively. Anything else is converted with ``config_to_dict`` and
    each entry is set as an attribute, with a ``None`` value for ``logs``
    replaced by an empty dict.
    """
    if parameters is None:
        return

    # The predicates are checked in this order and the first match wins.
    if _is_cache_backend(parameters):
        self.cache = parameters
    elif _is_vector_db(parameters):
        setattr(self, 'db', parameters)
    elif _is_text_splitter(parameters):
        setattr(self, 'splitter', parameters)
    else:
        for name, setting in config_to_dict(parameters).items():
            if name == 'cache':
                self.cache = setting
            elif name == 'logs':
                self.logs = {} if setting is None else setting
            else:
                setattr(self, name, setting)

get_embedding

get_embedding(content: list[str]) -> EmbeddingType

Retrieve embeddings for the given texts.

Source code in src/rago/augmented/base.py
127
128
129
def get_embedding(self, content: list[str]) -> EmbeddingType:
    """Retrieve embeddings for the given texts.

    This implementation is a placeholder that never returns.

    Parameters
    ----------
    content : list[str]
        Texts to embed.

    Raises
    ------
    NotImplementedError
        Always.
    """
    # NotImplementedError is still an Exception subclass, so callers that
    # catch Exception keep working; the message is unchanged.
    raise NotImplementedError('Method not implemented.')

process

process(inp: Input) -> Output

Run augmentation against the current pipeline content.

Source code in src/rago/augmented/base.py
154
155
156
157
158
159
160
161
162
def process(self, inp: Input) -> Output:
    """Run augmentation against the current pipeline content.

    The query is coerced with ``str``. The documents are read from
    ``inp['content']``, falling back to ``'data'`` and then ``'source'``,
    and passed through ``ensure_list`` before searching with
    ``self.top_k``. The result is stored on both ``content`` and ``data``
    of an ``Output`` built from ``inp``.
    """
    query = str(inp.query)

    # Resolve the fallback chain innermost-first: source -> data -> content.
    fallback = inp.get('source')
    fallback = inp.get('data', fallback)
    documents = ensure_list(inp.get('content', fallback))

    matches = self.search(query, documents, top_k=self.top_k)

    output = Output.from_input(inp)
    output.content = matches
    output.data = matches
    return output

search abstractmethod

search(
    query: str, documents: Any, top_k: int = 0
) -> list[str]

Search for the most relevant documents.

Source code in src/rago/augmented/base.py
131
132
133
@abstractmethod
def search(self, query: str, documents: Any, top_k: int = 0) -> list[str]:
    """Search for the most relevant documents.

    Declared with ``@abstractmethod`` and given no body here, so the
    implementation is expected to come from subclasses.

    Parameters
    ----------
    query : str
        The search query.
    documents : Any
        The documents to search.
    top_k : int, default 0
        Presumably the maximum number of results; how ``0`` is treated
        is not visible here -- TODO confirm against the implementations.

    Returns
    -------
    list[str]
        The selected documents, per the return annotation.
    """

AugmentedParameters

AugmentedParameters(**kwargs: Any)

Bases: ParametersBase

Parameters for configuring augmentation steps.

Methods:

  • apply

    Merge additional configuration into this object.

  • process

    Return the input unchanged for configuration-only objects.

Attributes:

  • params (dict[str, Any]) –

    Expose the underlying parameter mapping.

Source code in src/rago/base.py
62
63
def __init__(self, **kwargs: Any) -> None:
    """Collect keyword arguments and hand them to the parent initializer.

    The keywords are passed on as one dict in a single positional
    argument, not re-expanded as keywords.
    """
    super().__init__(kwargs)

params property

params: dict[str, Any]

Expose the underlying parameter mapping.

apply

apply(parameters: Any) -> None

Merge additional configuration into this object.

Source code in src/rago/base.py
86
87
88
def apply(self, parameters: Any) -> None:
    """Merge additional configuration into this object.

    ``parameters`` is converted with ``config_to_dict`` and the resulting
    entries are written into ``self.data`` via ``update``.
    """
    incoming = config_to_dict(parameters)
    self.data.update(incoming)

process

process(inp: Input) -> Output

Return the input unchanged for configuration-only objects.

Source code in src/rago/base.py
90
91
92
def process(self, inp: Input) -> Output:
    """Pass the input through for configuration-only objects.

    Returns whatever ``inp.to_output()`` produces; no instance state is
    read.
    """
    passthrough = inp.to_output()
    return passthrough