Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dependencies = [
"mmh3>=4.0.0",
"msgspec>=0.19.0",
"msgpack>=1.1.0",
"objectstore-client>=0.0.11",
"objectstore-client>=0.0.13",
"openai>=1.3.5",
"orjson>=3.10.10",
"p4python>=2025.1.2767466",
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2612,7 +2612,7 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
name="sentry-api-0-organization-conduit-demo",
),
re_path(
r"^(?P<organization_id_or_slug>[^/]+)/objectstore/$",
r"^(?P<organization_id_or_slug>[^/]+)/objectstore/(?P<path>.*)$",
OrganizationObjectstoreEndpoint.as_view(),
name="sentry-api-0-organization-objectstore",
),
Expand Down
1 change: 1 addition & 0 deletions src/sentry/hybridcloud/apigateway/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"sentry-api-0-installable-preprod-artifact-download": 90.0,
"sentry-api-0-project-preprod-artifact-download": 90.0,
"sentry-api-0-project-preprod-artifact-size-analysis-download": 90.0,
"sentry-api-0-organization-objectstore": 90.0,
}

# stream 0.5 MB at a time
Expand Down
215 changes: 200 additions & 15 deletions src/sentry/objectstore/endpoints/organization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
from collections.abc import Callable, Generator
from typing import Any
from urllib.parse import urlparse
from wsgiref.util import is_hop_by_hop

import requests
from django.http import StreamingHttpResponse
from requests import Response as ExternalResponse
from rest_framework.request import Request
from rest_framework.response import Response

from sentry import features
# TODO(granian): Remove this and related code paths when we fully switch from uwsgi to granian
uwsgi: Any = None
try:
import uwsgi
except ImportError:
pass

from sentry import features, options
from sentry.api.api_owners import ApiOwner
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import region_silo_endpoint
Expand All @@ -14,24 +29,194 @@ class OrganizationObjectstoreEndpoint(OrganizationEndpoint):
publish_status = {
"GET": ApiPublishStatus.EXPERIMENTAL,
"PUT": ApiPublishStatus.EXPERIMENTAL,
"POST": ApiPublishStatus.EXPERIMENTAL,
"DELETE": ApiPublishStatus.EXPERIMENTAL,
}
owner = ApiOwner.FOUNDATIONAL_STORAGE
parser_classes = () # don't attempt to parse request data, so we can access the raw body in wsgi.input

def get(self, request: Request, organization: Organization) -> Response:
def _check_flag(self, request: Request, organization: Organization) -> Response | None:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
# TODO: implement
return Response(status=200)
return Response(
{
"error": "This endpoint requires the organizations:objectstore-endpoint feature flag."
},
status=403,
)
return None

def put(self, request: Request, organization: Organization) -> Response:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
# TODO: implement
return Response(status=200)
def get(
    self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
    """
    Handle GET: proxy the request for `path` to Objectstore.

    Returns the response produced by `_check_flag` when it yields one,
    otherwise the response produced by `_proxy`.
    """
    denied = self._check_flag(request, organization)
    # `or` short-circuits, so the proxy call only happens when the flag check passed.
    return denied or self._proxy(request, path)

def delete(self, request: Request, organization: Organization) -> Response:
if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
return Response(status=404)
# TODO: implement
return Response(status=200)
def put(
    self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
    """
    Handle PUT: proxy the request for `path` to Objectstore.

    Returns the response produced by `_check_flag` when it yields one,
    otherwise the response produced by `_proxy`.
    """
    denied = self._check_flag(request, organization)
    # `or` short-circuits, so the proxy call only happens when the flag check passed.
    return denied or self._proxy(request, path)

def post(
    self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
    """
    Handle POST: proxy the request for `path` to Objectstore.

    Returns the response produced by `_check_flag` when it yields one,
    otherwise the response produced by `_proxy`.
    """
    denied = self._check_flag(request, organization)
    # `or` short-circuits, so the proxy call only happens when the flag check passed.
    return denied or self._proxy(request, path)

def delete(
    self, request: Request, organization: Organization, path: str
) -> Response | StreamingHttpResponse:
    """
    Handle DELETE: proxy the request for `path` to Objectstore.

    Returns the response produced by `_check_flag` when it yields one,
    otherwise the response produced by `_proxy`.
    """
    denied = self._check_flag(request, organization)
    # `or` short-circuits, so the proxy call only happens when the flag check passed.
    return denied or self._proxy(request, path)

def _proxy(
    self,
    request: Request,
    path: str,
) -> Response | StreamingHttpResponse:
    """
    Forward the incoming request to Objectstore and stream the upstream response back.

    The target URL is built by `get_target_url(path)`. The request method and
    query parameters are forwarded as-is. Request headers are forwarded except
    for `Host`, `Content-Length` and hop-by-hop headers (which include
    `Transfer-Encoding`). For PUT and POST the body is streamed from
    `wsgi.input`, and only `Transfer-Encoding: chunked` uploads are accepted.

    Returns a 400 `Response` for a non-chunked PUT/POST, otherwise the
    `StreamingHttpResponse` built by `stream_response`.
    """
    assert request.method
    target_url = get_target_url(path)

    incoming_headers = dict(request.headers)

    # Inspect Transfer-Encoding before it is filtered out below.
    is_chunked = (incoming_headers.get("Transfer-Encoding") or "").lower() == "chunked"
    if request.method in ("PUT", "POST") and not is_chunked:
        return Response({"error": "Only Transfer-Encoding: chunked is supported"}, status=400)

    # Hop-by-hop headers describe the client<->Sentry connection only and must not be
    # forwarded; this mirrors the filtering that `stream_response` applies on the way back.
    # Host and Content-Length are dropped so they can be recomputed for the upstream request.
    headers = {
        name: value
        for name, value in incoming_headers.items()
        if name.lower() not in ("host", "content-length") and not is_hop_by_hop(name)
    }

    stream: Generator[bytes] | ChunkedEncodingDecoder | None = None
    wsgi_input = request.META.get("wsgi.input")

    if "granian" in request.META.get("SERVER_SOFTWARE", "").lower():
        stream = wsgi_input
    # uwsgi and wsgiref will respectively raise an exception and hang when attempting to read wsgi.input while there's no body.
    # For now, support bodies only on PUT and POST requests.
    elif request.method in ("PUT", "POST"):
        if uwsgi:

            def stream_generator():
                # uwsgi decodes the chunked body itself; an empty chunk marks the end.
                while True:
                    chunk = uwsgi.chunked_read()
                    if not chunk:
                        break
                    yield chunk

            stream = stream_generator()
        else:  # assumes wsgiref, used in dev/test mode
            stream = ChunkedEncodingDecoder(wsgi_input._read)  # type: ignore[union-attr]

    # NOTE(review): no `timeout` is passed, so a stalled upstream blocks this worker
    # indefinitely — confirm whether a connect/read timeout should be configured here.
    response = requests.request(
        request.method,
        url=target_url,
        headers=headers,
        data=stream,
        params=dict(request.GET) if request.GET else None,
        stream=True,
        allow_redirects=False,
    )
    return stream_response(response)


def get_target_url(path: str) -> str:
    """
    Build the Objectstore URL for a user-supplied relative `path`.

    The base URL comes from the `objectstore.config` option (`base_url` key),
    with trailing slashes removed. Only the path component of `path` is kept.
    """
    base_url = options.get("objectstore.config")["base_url"].rstrip("/")
    # `path` should be a relative path, only grab that part
    relative_path = urlparse(path).path
    # Plain concatenation on purpose: traversal patterns such as "/../" are passed through
    # literally instead of being resolved here. Handling them correctly is Objectstore's job.
    return f"{base_url}/{relative_path}"


def stream_response(external_response: ExternalResponse) -> StreamingHttpResponse:
    """
    Wrap an upstream `requests` response in a `StreamingHttpResponse`.

    The body is relayed in 512 KiB reads from the raw upstream stream with
    content decoding disabled, so the bytes are passed through as received.
    The upstream status code is preserved. Upstream headers are copied except
    for `Server` and hop-by-hop headers.

    The upstream response is closed once the body generator finishes or is
    closed early, so the underlying connection is not leaked.
    """
    CHUNK_SIZE = 512 * 1024

    def stream_generator() -> Generator[bytes]:
        # Pass the payload through untouched; any Content-Encoding header is forwarded below.
        external_response.raw.decode_content = False
        try:
            while chunk := external_response.raw.read(CHUNK_SIZE):
                yield chunk
        finally:
            # Runs on normal exhaustion and when the generator is closed before the end.
            external_response.close()

    response = StreamingHttpResponse(
        streaming_content=stream_generator(),
        status=external_response.status_code,
    )

    for header, value in external_response.headers.items():
        if header.lower() == "server" or is_hop_by_hop(header):
            continue
        response[header] = value

    return response


class ChunkedEncodingDecoder:
"""
Wrapper around a read function that returns chunked transfer encoded data.
Provides a file-like interface to the decoded data stream.
This should only be needed in dev/test mode, when we need to manually decode wsgi.input.
"""

def __init__(self, read: Callable[[int], bytes]):
self._read = read
self._done = False
self._current_chunk_remaining = 0

def read(self, size: int = -1) -> bytes:
if self._done:
return b""

read = 0
buffer: list[bytes] = []
while size == -1 or read < size:
if self._current_chunk_remaining == 0:
# Read next chunk size line
size_line = b""
while not size_line.endswith(b"\r\n"):
byte = self._read(1)
if not byte:
self._done = True
return b"".join(buffer)
size_line += byte

chunk_size = int(size_line.strip(), 16)
if chunk_size == 0:
trail = self._read(2)
if trail != b"\r\n":
raise ValueError("Malformed chunk encoded stream")
self._done = True
return b"".join(buffer)

self._current_chunk_remaining = chunk_size
else:
# Read (part of) next chunk
to_read = (
min(self._current_chunk_remaining, size - read)
if size != -1
else self._current_chunk_remaining
)
chunk = self._read(to_read)
if not chunk:
raise ValueError("Unexpected end of stream")
buffer.append(chunk)
read += len(chunk)
self._current_chunk_remaining -= len(chunk)

if self._current_chunk_remaining == 0:
trail = self._read(2)
if trail != b"\r\n":
raise ValueError("Malformed chunk encoded stream")

return b"".join(buffer)
2 changes: 1 addition & 1 deletion static/app/utils/api/knownSentryApiUrls.generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ export type KnownSentryApiUrls =
| '/organizations/$organizationIdOrSlug/notifications/actions/'
| '/organizations/$organizationIdOrSlug/notifications/actions/$actionId/'
| '/organizations/$organizationIdOrSlug/notifications/available-actions/'
| '/organizations/$organizationIdOrSlug/objectstore/'
| '/organizations/$organizationIdOrSlug/objectstore/$path'
| '/organizations/$organizationIdOrSlug/onboarding-continuation-email/'
| '/organizations/$organizationIdOrSlug/onboarding-tasks/'
| '/organizations/$organizationIdOrSlug/ondemand-rules-stats/'
Expand Down
100 changes: 90 additions & 10 deletions tests/sentry/objectstore/endpoints/test_organization.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,103 @@
from sentry.testutils.cases import APITestCase
import pytest
import requests
from django.urls import reverse
from objectstore_client import Client, RequestError, Session, Usecase
from pytest_django.live_server_helper import LiveServer

from sentry.testutils.cases import TransactionTestCase
from sentry.testutils.helpers.features import with_feature
from sentry.testutils.silo import region_silo_test
from sentry.testutils.skips import requires_objectstore


@pytest.fixture(scope="function")
def local_live_server(request: pytest.FixtureRequest, live_server: LiveServer) -> None:
    """
    Expose the `live_server` fixture as a `live_server` attribute.

    The attribute is set on the requesting test class (when the test is
    defined in one) and on the test node, so unittest-style test cases can
    reach the server via `self.live_server`.
    """
    # `request.cls` exists but is None for tests that are not defined in a class,
    # so check the value rather than the attribute's presence.
    test_class = getattr(request, "cls", None)
    if test_class is not None:
        test_class.live_server = live_server
    request.node.live_server = live_server


@region_silo_test
class OrganizationObjectstoreEndpointTest(APITestCase):
@requires_objectstore
@pytest.mark.usefixtures("local_live_server")
class OrganizationObjectstoreEndpointTest(TransactionTestCase):
endpoint = "sentry-api-0-organization-objectstore"
live_server: LiveServer

def setUp(self) -> None:
super().setUp()
self.login_as(user=self.user)
self.organization = self.create_organization(owner=self.user)
self.api_key = self.create_api_key(
organization=self.organization,
scope_list=["org:admin"],
)

def test_feature_flag_disabled(self):
"""Without feature flag, returns 404"""
response = self.get_response(self.organization.slug)
assert response.status_code == 404
def get_endpoint_url(self) -> str:
path = reverse(
self.endpoint,
kwargs={
"organization_id_or_slug": self.organization.id,
"path": "",
},
)
return f"{self.live_server.url}{path}"

def get_auth_headers(self) -> dict[str, str]:
auth_header = self.create_basic_auth_header(self.api_key.key)
return {"Authorization": auth_header.decode()}

def get_session(self) -> Session:
client = Client(
self.get_endpoint_url(), connection_kwargs={"headers": self.get_auth_headers()}
)
session = client.session(Usecase("test"), org=self.organization.id)
return session

@with_feature("organizations:objectstore-endpoint")
def test_feature_flag_enabled(self):
"""With feature flag, endpoint is accessible"""
response = self.get_response(self.organization.slug)
assert response.status_code == 200
def test_health(self):
url = self.get_endpoint_url() + "health"
res = requests.get(url, headers=self.get_auth_headers())
res.raise_for_status()

@with_feature("organizations:objectstore-endpoint")
def test_full_cycle(self):
session = self.get_session()

object_key = session.put(b"test data")
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == b"test data"

new_key = session.put(b"new data", key=object_key)
assert new_key == object_key

retrieved = session.get(object_key)
assert retrieved.payload.read() == b"new data"

session.delete(object_key)

with pytest.raises(RequestError):
session.get(object_key)

@with_feature("organizations:objectstore-endpoint")
def test_uncompressed(self):
session = self.get_session()

object_key = session.put(b"test data", compression="none")
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == b"test data"

@with_feature("organizations:objectstore-endpoint")
def test_large_payload(self):
session = self.get_session()
data = b"A" * 1_000_000

object_key = session.put(data)
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == data
Loading
Loading