Port last integration tests to python (#2966)

* port custom ca cert test to python
This commit is contained in:
Hubert Deng 2024-04-16 15:51:23 -07:00 committed by GitHub
parent 3d63c9b79e
commit b5237d2a62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 301 additions and 127 deletions

View File

@ -1,6 +1,7 @@
import subprocess
import os
import subprocess
import time
import httpx
import pytest
@ -10,8 +11,10 @@ TEST_USER = "test@example.com"
TEST_PASS = "test123TEST"
TIMEOUT_SECONDS = 60
def pytest_addoption(parser):
parser.addoption("--customizations", default="disabled")
parser.addoption("--customizations", default="disabled")
@pytest.fixture(scope="session", autouse=True)
def configure_self_hosted_environment(request):
@ -28,23 +31,23 @@ def configure_self_hosted_environment(request):
raise AssertionError("timeout waiting for self-hosted to come up")
if request.config.getoption("--customizations") == "enabled":
os.environ['TEST_CUSTOMIZATIONS'] = "enabled"
script_content = '''\
os.environ["TEST_CUSTOMIZATIONS"] = "enabled"
script_content = """\
#!/bin/bash
touch /created-by-enhance-image
apt-get update
apt-get install -y gcc libsasl2-dev python-dev libldap2-dev libssl-dev
'''
"""
with open('sentry/enhance-image.sh', 'w') as script_file:
with open("sentry/enhance-image.sh", "w") as script_file:
script_file.write(script_content)
# Set executable permissions for the shell script
os.chmod('sentry/enhance-image.sh', 0o755)
os.chmod("sentry/enhance-image.sh", 0o755)
# Write content to the requirements.txt file
with open('sentry/requirements.txt', 'w') as req_file:
req_file.write('python-ldap\n')
os.environ['MINIMIZE_DOWNTIME'] = "1"
with open("sentry/requirements.txt", "w") as req_file:
req_file.write("python-ldap\n")
os.environ["MINIMIZE_DOWNTIME"] = "1"
subprocess.run(["./install.sh"], check=True)
# Create test user
subprocess.run(
@ -68,7 +71,8 @@ apt-get install -y gcc libsasl2-dev python-dev libldap2-dev libssl-dev
text=True,
)
@pytest.fixture()
def setup_backup_restore_env_variables():
os.environ['SENTRY_DOCKER_IO_DIR'] = os.path.join(os.getcwd(), 'sentry')
os.environ['SKIP_USER_CREATION'] = "1"
os.environ["SENTRY_DOCKER_IO_DIR"] = os.path.join(os.getcwd(), "sentry")
os.environ["SKIP_USER_CREATION"] = "1"

View File

@ -1,46 +0,0 @@
#!/usr/bin/env bash
set -e
export COMPOSE_FILE=docker-compose.yml:_integration-test/custom-ca-roots/docker-compose.test.yml
TEST_NGINX_CONF_PATH=_integration-test/custom-ca-roots/nginx
CUSTOM_CERTS_PATH=certificates
# generate tightly constrained CA
# NB: `-addext` requires LibreSSL 3.1.0+, or OpenSSL (brew install openssl)
openssl req -x509 -new -nodes -newkey rsa:2048 -keyout $TEST_NGINX_CONF_PATH/ca.key \
-sha256 -days 1 -out $TEST_NGINX_CONF_PATH/ca.crt -batch \
-subj "/CN=TEST CA *DO NOT TRUST*" \
-addext "keyUsage = critical, keyCertSign, cRLSign" \
-addext "nameConstraints = critical, permitted;DNS:self.test"
## Lines like the following are debug helpers ...
# openssl x509 -in nginx/ca.crt -text -noout
mkdir -p $CUSTOM_CERTS_PATH
cp $TEST_NGINX_CONF_PATH/ca.crt $CUSTOM_CERTS_PATH/test-custom-ca-roots.crt
# generate server certificate
openssl req -new -nodes -newkey rsa:2048 -keyout $TEST_NGINX_CONF_PATH/self.test.key \
-addext "subjectAltName=DNS:self.test" \
-out $TEST_NGINX_CONF_PATH/self.test.req -batch -subj "/CN=Self Signed with CA Test Server"
# openssl req -in nginx/self.test.req -text -noout
openssl x509 -req -in $TEST_NGINX_CONF_PATH/self.test.req -CA $TEST_NGINX_CONF_PATH/ca.crt -CAkey $TEST_NGINX_CONF_PATH/ca.key \
-extfile <(printf "subjectAltName=DNS:self.test") \
-CAcreateserial -out $TEST_NGINX_CONF_PATH/self.test.crt -days 1 -sha256
# openssl x509 -in nginx/self.test.crt -text -noout
# sanity check that signed certificate passes OpenSSL's validation
openssl verify -CAfile $TEST_NGINX_CONF_PATH/ca.crt $TEST_NGINX_CONF_PATH/self.test.crt
# self signed certificate, for sanity check of not just accepting all certs
openssl req -x509 -newkey rsa:2048 -nodes -days 1 -keyout $TEST_NGINX_CONF_PATH/fake.test.key \
-out $TEST_NGINX_CONF_PATH/fake.test.crt -addext "subjectAltName=DNS:fake.test" -subj "/CN=Self Signed Test Server"
# openssl x509 -in nginx/fake.test.crt -text -noout
cp _integration-test/custom-ca-roots/test.py sentry/test-custom-ca-roots.py
docker compose --ansi never up -d fixture-custom-ca-roots

View File

@ -1,4 +0,0 @@
#!/usr/bin/env bash
$dc rm -s -f -v fixture-custom-ca-roots
rm -f certificates/test-custom-ca-roots.crt sentry/test-custom-ca-roots.py
unset COMPOSE_FILE

View File

@ -1,15 +1,16 @@
import unittest
import requests
class CustomCATests(unittest.TestCase):
def test_valid_self_signed(self):
self.assertEqual(requests.get("https://self.test").text, 'ok')
self.assertEqual(requests.get("https://self.test").text, "ok")
def test_invalid_self_signed(self):
with self.assertRaises(requests.exceptions.SSLError):
requests.get("https://fail.test")
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@ -1,27 +0,0 @@
#!/usr/bin/env bash
set -ex
source install/_lib.sh
source install/dc-detect-version.sh
echo "${_group}Test that sentry-admin works..."
echo "Global help documentation..."
global_help_doc=$(/bin/bash --help)
if ! echo "$global_help_doc" | grep -q "^Usage: ./sentry-admin.sh"; then
echo "Assertion failed: Incorrect binary name in global help docs"
exit 1
fi
if ! echo "$global_help_doc" | grep -q "SENTRY_DOCKER_IO_DIR"; then
echo "Assertion failed: Missing SENTRY_DOCKER_IO_DIR global help doc"
exit 1
fi
echo "Command-specific help documentation..."
command_help_doc=$(/bin/bash permissions --help)
if ! echo "$command_help_doc" | grep -q "^Usage: ./sentry-admin.sh permissions"; then
echo "Assertion failed: Incorrect binary name in command-specific help docs"
exit 1
fi

View File

@ -1,25 +1,69 @@
import subprocess
import os
import subprocess
def test_sentry_admin(setup_backup_restore_env_variables):
sentry_admin_sh = os.path.join(os.getcwd(), "sentry-admin.sh")
output = subprocess.run(
[sentry_admin_sh, "--help"], check=True, capture_output=True, encoding="utf8"
).stdout
assert "Usage: ./sentry-admin.sh" in output
assert "SENTRY_DOCKER_IO_DIR" in output
output = subprocess.run(
[sentry_admin_sh, "permissions", "--help"],
check=True,
capture_output=True,
encoding="utf8",
).stdout
assert "Usage: ./sentry-admin.sh permissions" in output
def test_backup(setup_backup_restore_env_variables):
# Docker was giving me permissioning issues when trying to create this file and write to it even after giving read + write access
# to group and owner. Instead, try creating the empty file and then give everyone write access to the backup file
file_path = os.path.join(os.getcwd(), 'sentry', 'backup.json')
sentry_admin_sh = os.path.join(os.getcwd(), 'sentry-admin.sh')
open(file_path, 'a', encoding='utf8').close()
file_path = os.path.join(os.getcwd(), "sentry", "backup.json")
sentry_admin_sh = os.path.join(os.getcwd(), "sentry-admin.sh")
open(file_path, "a", encoding="utf8").close()
os.chmod(file_path, 0o666)
assert os.path.getsize(file_path) == 0
subprocess.run([sentry_admin_sh, "export", "global", "/sentry-admin/backup.json", "--no-prompt"], check=True)
subprocess.run(
[
sentry_admin_sh,
"export",
"global",
"/sentry-admin/backup.json",
"--no-prompt",
],
check=True,
)
assert os.path.getsize(file_path) > 0
def test_import(setup_backup_restore_env_variables):
# Bring postgres down and recreate the docker volume
subprocess.run(["docker", "compose", "--ansi", "never", "stop", "postgres"], check=True)
subprocess.run(["docker", "compose", "--ansi", "never", "rm", "-f", "-v", "postgres"], check=True)
subprocess.run(
["docker", "compose", "--ansi", "never", "stop", "postgres"], check=True
)
subprocess.run(
["docker", "compose", "--ansi", "never", "rm", "-f", "-v", "postgres"],
check=True,
)
subprocess.run(["docker", "volume", "rm", "sentry-postgres"], check=True)
subprocess.run(["docker", "volume", "create", "--name=sentry-postgres"], check=True)
subprocess.run(["docker", "compose", "--ansi", "never", "run", "web", "upgrade", "--noinput"], check=True)
subprocess.run(
["docker", "compose", "--ansi", "never", "run", "web", "upgrade", "--noinput"],
check=True,
)
subprocess.run(["docker", "compose", "--ansi", "never", "up", "-d"], check=True)
sentry_admin_sh = os.path.join(os.getcwd(), 'sentry-admin.sh')
subprocess.run([sentry_admin_sh, "import", "global", "/sentry-admin/backup.json", "--no-prompt"], check=True)
sentry_admin_sh = os.path.join(os.getcwd(), "sentry-admin.sh")
subprocess.run(
[
sentry_admin_sh,
"import",
"global",
"/sentry-admin/backup.json",
"--no-prompt",
],
check=True,
)

View File

@ -1,14 +1,22 @@
import subprocess
import datetime
import json
import os
import re
import shutil
import subprocess
import time
from functools import lru_cache
from bs4 import BeautifulSoup
from typing import Callable
import httpx
import pytest
import sentry_sdk
import time
import json
import re
from typing import Callable
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
SENTRY_CONFIG_PY = "sentry/sentry.conf.py"
SENTRY_TEST_HOST = os.getenv("SENTRY_TEST_HOST", "http://localhost:9000")
@ -137,25 +145,218 @@ def test_cleanup_crons_running():
assert len(cleanup_crons) > 0
def test_custom_cas():
try:
subprocess.run(["./_integration-test/custom-ca-roots/setup.sh"], check=True)
subprocess.run(
[
"docker",
"compose",
"--ansi",
"never",
"run",
"--no-deps",
"web",
"python3",
"/etc/sentry/test-custom-ca-roots.py",
],
check=True,
def test_custom_certificate_authorities():
# Set environment variable
os.environ["COMPOSE_FILE"] = (
"docker-compose.yml:_integration-test/custom-ca-roots/docker-compose.test.yml"
)
test_nginx_conf_path = "_integration-test/custom-ca-roots/nginx"
custom_certs_path = "certificates"
# Generate tightly constrained CA
ca_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
ca_name = x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, "TEST CA *DO NOT TRUST*")]
)
ca_cert = (
x509.CertificateBuilder()
.subject_name(ca_name)
.issuer_name(ca_name)
.public_key(ca_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=1))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(
x509.KeyUsage(
digital_signature=False,
key_encipherment=False,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
finally:
subprocess.run(["./_integration-test/custom-ca-roots/teardown.sh"], check=True)
.add_extension(
x509.NameConstraints([x509.DNSName("self.test")], None), critical=True
)
.sign(private_key=ca_key, algorithm=hashes.SHA256(), backend=default_backend())
)
ca_key_path = f"{test_nginx_conf_path}/ca.key"
ca_crt_path = f"{test_nginx_conf_path}/ca.crt"
with open(ca_key_path, "wb") as key_file:
key_file.write(
ca_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
with open(ca_crt_path, "wb") as cert_file:
cert_file.write(ca_cert.public_bytes(serialization.Encoding.PEM))
# Create custom certs path and copy ca.crt
os.makedirs(custom_certs_path, exist_ok=True)
shutil.copyfile(ca_crt_path, f"{custom_certs_path}/test-custom-ca-roots.crt")
# Generate server key and certificate
self_test_key_path = os.path.join(test_nginx_conf_path, "self.test.key")
self_test_csr_path = os.path.join(test_nginx_conf_path, "self.test.csr")
self_test_cert_path = os.path.join(test_nginx_conf_path, "self.test.crt")
self_test_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
self_test_req = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(
NameOID.COMMON_NAME, "Self Signed with CA Test Server"
)
]
)
)
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("self.test")]), critical=False
)
.sign(self_test_key, hashes.SHA256())
)
self_test_cert = (
x509.CertificateBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(
NameOID.COMMON_NAME, "Self Signed with CA Test Server"
)
]
)
)
.issuer_name(ca_cert.issuer)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=1))
.public_key(self_test_req.public_key())
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("self.test")]), critical=False
)
.sign(private_key=ca_key, algorithm=hashes.SHA256())
)
# Save server key, CSR, and certificate
with open(self_test_key_path, "wb") as key_file:
key_file.write(
self_test_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
with open(self_test_csr_path, "wb") as csr_file:
csr_file.write(self_test_req.public_bytes(serialization.Encoding.PEM))
with open(self_test_cert_path, "wb") as cert_file:
cert_file.write(self_test_cert.public_bytes(serialization.Encoding.PEM))
# Generate server key and certificate for fake.test
fake_test_key_path = os.path.join(test_nginx_conf_path, "fake.test.key")
fake_test_cert_path = os.path.join(test_nginx_conf_path, "fake.test.crt")
fake_test_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
fake_test_cert = (
x509.CertificateBuilder()
.subject_name(
x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, "Self Signed Test Server")]
)
)
.issuer_name(
x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, "Self Signed Test Server")]
)
)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=1))
.public_key(fake_test_key.public_key())
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("fake.test")]), critical=False
)
.sign(private_key=fake_test_key, algorithm=hashes.SHA256())
)
# Save server key and certificate for fake.test
with open(fake_test_key_path, "wb") as key_file:
key_file.write(
fake_test_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
# Our asserts for this test case must be executed within the web container, so we are copying a python test script into the mounted sentry directory
with open(fake_test_cert_path, "wb") as cert_file:
cert_file.write(fake_test_cert.public_bytes(serialization.Encoding.PEM))
shutil.copyfile(
"_integration-test/custom-ca-roots/test.py",
"sentry/test-custom-ca-roots.py",
)
subprocess.run(
["docker", "compose", "--ansi", "never", "up", "-d", "fixture-custom-ca-roots"],
check=True,
)
subprocess.run(
[
"docker",
"compose",
"--ansi",
"never",
"run",
"--no-deps",
"web",
"python3",
"/etc/sentry/test-custom-ca-roots.py",
],
check=True,
)
subprocess.run(
[
"docker",
"compose",
"--ansi",
"never",
"rm",
"-s",
"-f",
"-v",
"fixture-custom-ca-roots",
],
check=True,
)
# Remove files
os.remove(f"{custom_certs_path}/test-custom-ca-roots.crt")
os.remove("sentry/test-custom-ca-roots.py")
# Unset environment variable
if "COMPOSE_FILE" in os.environ:
del os.environ["COMPOSE_FILE"]
def test_receive_transaction_events(client_login):
@ -235,7 +436,7 @@ def test_customizations():
"python",
"-c",
"import ldap",
]
],
]
for command in commands:
result = subprocess.run(command, check=False)

View File

@ -19,7 +19,8 @@ on the host filesystem. Commands that write files should write them to the '/sen
# Actual invocation that runs the command in the container.
invocation() {
$dc run -v "$VOLUME_MAPPING" --rm -T -e SENTRY_LOG_LEVEL=CRITICAL web "$@"
output=$($dc run -v "$VOLUME_MAPPING" --rm -T -e SENTRY_LOG_LEVEL=CRITICAL web "$@" 2>&1)
echo "$output"
}
# Function to modify lines starting with `Usage: sentry` to say `Usage: ./sentry-admin.sh` instead.