Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dependencies = [
"mmh3>=4.0.0",
"msgspec>=0.19.0",
"msgpack>=1.1.0",
"objectstore-client>=0.0.11",
"objectstore-client>=0.0.13",
"openai>=1.3.5",
"orjson>=3.10.10",
"p4python>=2025.1.2767466",
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2612,7 +2612,7 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
name="sentry-api-0-organization-conduit-demo",
),
re_path(
r"^(?P<organization_id_or_slug>[^/]+)/objectstore/$",
r"^(?P<organization_id_or_slug>[^/]+)/objectstore/(?P<path>.*)$",
OrganizationObjectstoreEndpoint.as_view(),
name="sentry-api-0-organization-objectstore",
),
Expand Down
1 change: 1 addition & 0 deletions src/sentry/hybridcloud/apigateway/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"sentry-api-0-installable-preprod-artifact-download": 90.0,
"sentry-api-0-project-preprod-artifact-download": 90.0,
"sentry-api-0-project-preprod-artifact-size-analysis-download": 90.0,
"sentry-api-0-organization-objectstore": 90.0,
}

# stream 0.5 MB at a time
Expand Down
177 changes: 167 additions & 10 deletions src/sentry/objectstore/endpoints/organization.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,194 @@
from collections.abc import Callable, Generator
from typing import Literal
from urllib.parse import urljoin, urlparse
from wsgiref.util import is_hop_by_hop

import requests
from django.core.exceptions import SuspiciousOperation
from django.http import StreamingHttpResponse
from requests import Response as ExternalResponse
from rest_framework.request import Request
from rest_framework.response import Response

from sentry import features
from sentry import features, options
from sentry.api.api_owners import ApiOwner
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import region_silo_endpoint
from sentry.api.bases import OrganizationEndpoint
from sentry.models.organization import Organization

CHUNK_SIZE = 512 * 1024


@region_silo_endpoint
class OrganizationObjectstoreEndpoint(OrganizationEndpoint):
publish_status = {
"GET": ApiPublishStatus.EXPERIMENTAL,
"PUT": ApiPublishStatus.EXPERIMENTAL,
"POST": ApiPublishStatus.EXPERIMENTAL,
"DELETE": ApiPublishStatus.EXPERIMENTAL,
}
owner = ApiOwner.FOUNDATIONAL_STORAGE
parser_classes = () # don't attempt to parse request data, so we can access the raw wsgi.input

def get(
self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
return self._proxy("GET", path, request)

def get(self, request: Request, organization: Organization) -> Response:
def put(
self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
# TODO: implement
return Response(status=200)
return self._proxy("PUT", path, request)

def put(self, request: Request, organization: Organization) -> Response:
def post(
self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
# TODO: implement
return Response(status=200)
return self._proxy("POST", path, request)

def delete(self, request: Request, organization: Organization) -> Response:
def delete(
self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
# TODO: implement
return Response(status=200)
return self._proxy("DELETE", path, request)

def _proxy(
self,
method: Literal["GET", "PUT", "POST", "DELETE"],
path: str,
request: Request,
) -> Response | StreamingHttpResponse:

target_url = get_target_url(path)

headers = dict(request.headers)
if method in ("PUT", "POST") and not headers.get("Transfer-Encoding") == "chunked":
return Response("Only Transfer-Encoding: chunked is supported", status=400)

headers.pop("Host", None)
headers.pop("Content-Length", None)
headers.pop("Transfer-Encoding", None)

stream = None
if method in ("PUT", "POST"):
wsgi_input = request.META.get("wsgi.input")
if not wsgi_input:
return Response("Expected a request body", status=400)
stream = ChunkedEncodingDecoder(wsgi_input._read)

response = requests.request(
method,
url=target_url,
headers=headers,
data=stream,
params=dict(request.GET) if request.GET else None,
stream=True,
allow_redirects=False,
)
return stream_response(response)


class ChunkedEncodingDecoder:
"""
Wrapper around a read function returning chunked transfer encoded data.
Provides a file-like interface to the decoded data stream.
"""

def __init__(self, read: Callable[[int], bytes]):
self._read = read
self._done = False
self._current_chunk_remaining = 0

def read(self, size: int = -1) -> bytes:
if self._done:
return b""
if size == -1:
self._done = True
return self._read(-1)

read = 0
buffer: list[bytes] = []
while read < size:
if self._current_chunk_remaining == 0:
# Read next chunk size line
size_line = b""
while not size_line.endswith(b"\r\n"):
byte = self._read(1)
if not byte:
self._done = True
return b"".join(buffer)
size_line += byte

try:
chunk_size = int(size_line.strip(), 16)
except ValueError:
self._done = True
return b"".join(buffer)

if chunk_size == 0:
self._read(2) # Read trailing \r\n
self._done = True
return b"".join(buffer)

self._current_chunk_remaining = chunk_size
else:
to_read = min(self._current_chunk_remaining, size - read)
chunk = self._read(to_read)
if not chunk:
self._done = True
break
buffer.append(chunk)
read += len(chunk)
self._current_chunk_remaining -= len(chunk)

if self._current_chunk_remaining == 0:
self._read(2) # Read trailing \r\n

return b"".join(buffer)


def get_target_url(path: str) -> str:
base = options.get("objectstore.config")["base_url"].rstrip("/")
base_parsed = urlparse(base)

target = urljoin(base, path)
target_parsed = urlparse(target)

if (
target_parsed.scheme != base_parsed.scheme
or target_parsed.netloc != base_parsed.netloc
or not target.startswith(base)
):
raise SuspiciousOperation("Possible SSRF attempt")
if ".." in path:
raise SuspiciousOperation("Possible path traversal attempt")

return target


def stream_response(response: ExternalResponse) -> StreamingHttpResponse:
def stream() -> Generator[bytes]:
response.raw.decode_content = False
while True:
chunk = response.raw.read(CHUNK_SIZE)
if not chunk:
break
yield chunk

streamed_response = StreamingHttpResponse(
streaming_content=stream(),
status=response.status_code,
)

for header, value in response.headers.items():
if not is_hop_by_hop(header):
streamed_response[header] = value

return streamed_response
2 changes: 1 addition & 1 deletion static/app/utils/api/knownSentryApiUrls.generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ export type KnownSentryApiUrls =
| '/organizations/$organizationIdOrSlug/notifications/actions/'
| '/organizations/$organizationIdOrSlug/notifications/actions/$actionId/'
| '/organizations/$organizationIdOrSlug/notifications/available-actions/'
| '/organizations/$organizationIdOrSlug/objectstore/'
| '/organizations/$organizationIdOrSlug/objectstore/$path'
| '/organizations/$organizationIdOrSlug/onboarding-continuation-email/'
| '/organizations/$organizationIdOrSlug/onboarding-tasks/'
| '/organizations/$organizationIdOrSlug/ondemand-rules-stats/'
Expand Down
100 changes: 90 additions & 10 deletions tests/sentry/objectstore/endpoints/test_organization.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,103 @@
from sentry.testutils.cases import APITestCase
import pytest
import requests
from django.urls import reverse
from objectstore_client import Client, RequestError, Session, Usecase
from pytest_django.live_server_helper import LiveServer

from sentry.testutils.cases import TransactionTestCase
from sentry.testutils.helpers.features import with_feature
from sentry.testutils.silo import region_silo_test
from sentry.testutils.skips import requires_objectstore


@pytest.fixture(scope="function")
def local_live_server(request: pytest.FixtureRequest, live_server: LiveServer) -> None:
if hasattr(request, "cls"):
request.cls.live_server = live_server
request.node.live_server = live_server


@region_silo_test
class OrganizationObjectstoreEndpointTest(APITestCase):
@requires_objectstore
@pytest.mark.usefixtures("local_live_server")
class OrganizationObjectstoreEndpointTest(TransactionTestCase):
endpoint = "sentry-api-0-organization-objectstore"
live_server: LiveServer

def setUp(self) -> None:
super().setUp()
self.login_as(user=self.user)
self.organization = self.create_organization(owner=self.user)
self.api_key = self.create_api_key(
organization=self.organization,
scope_list=["org:admin"],
)

def test_feature_flag_disabled(self):
"""Without feature flag, returns 404"""
response = self.get_response(self.organization.slug)
assert response.status_code == 404
def get_endpoint_url(self) -> str:
path = reverse(
self.endpoint,
kwargs={
"organization_id_or_slug": self.organization.id,
"path": "",
},
)
return f"{self.live_server.url}{path}"

def get_auth_headers(self) -> dict[str, str]:
auth_header = self.create_basic_auth_header(self.api_key.key)
return {"Authorization": auth_header.decode()}

def get_session(self) -> Session:
client = Client(
self.get_endpoint_url(), connection_kwargs={"headers": self.get_auth_headers()}
)
session = client.session(Usecase("test"), org=self.organization.id)
return session

@with_feature("organizations:objectstore-endpoint")
def test_feature_flag_enabled(self):
"""With feature flag, endpoint is accessible"""
response = self.get_response(self.organization.slug)
assert response.status_code == 200
def test_health(self):
url = self.get_endpoint_url() + "health"
res = requests.get(url, headers=self.get_auth_headers())
res.raise_for_status()

@with_feature("organizations:objectstore-endpoint")
def test_full_cycle(self):
session = self.get_session()

object_key = session.put(b"test data")
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == b"test data"

new_key = session.put(b"new data", key=object_key)
assert new_key == object_key

retrieved = session.get(object_key)
assert retrieved.payload.read() == b"new data"

session.delete(object_key)

with pytest.raises(RequestError):
session.get(object_key)

@with_feature("organizations:objectstore-endpoint")
def test_uncompressed(self):
session = self.get_session()

object_key = session.put(b"test data", compression="none")
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == b"test data"

@with_feature("organizations:objectstore-endpoint")
def test_large_payload(self):
session = self.get_session()
data = b"A" * 1_000_000

object_key = session.put(data)
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == data
6 changes: 3 additions & 3 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading