Projects
openEuler:Mainline
subunit
_service:tar_scm:Drop-compatibility-wrappers-fo...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:tar_scm:Drop-compatibility-wrappers-for-memoryview-in-older.patch of Package subunit
From 2dac1001c1f1a48235af10c7ccf0b36e59602ded Mon Sep 17 00:00:00 2001 From: starlet-dx <15929766099@163.com> Date: Fri, 4 Aug 2023 17:55:15 +0800 Subject: [PATCH 1/1] Drop compatibility wrappers for memoryview in older Python versions Refer: https://github.com/testing-cabal/subunit/commit/f315464df26e4122758da26f40a76aa19a08ff60 --- python/subunit/tests/test_subunit_tags.py | 2 +- python/subunit/tests/test_test_protocol2.py | 4 +-- python/subunit/v2.py | 39 ++++----------------- 3 files changed, 10 insertions(+), 35 deletions(-) diff --git a/python/subunit/tests/test_subunit_tags.py b/python/subunit/tests/test_subunit_tags.py index a16edc1..e62b9fe 100644 --- a/python/subunit/tests/test_subunit_tags.py +++ b/python/subunit/tests/test_subunit_tags.py @@ -65,7 +65,7 @@ class TestSubUnitTags(testtools.TestCase): self.original.seek(0) self.assertEqual( 0, subunit.tag_stream(self.original, self.filtered, ["quux"])) - self.assertThat(reference, Contains(self.filtered.getvalue())) + self.assertThat(reference, Contains(bytes(self.filtered.getvalue()))) def test_remove_tag(self): reference = BytesIO() diff --git a/python/subunit/tests/test_test_protocol2.py b/python/subunit/tests/test_test_protocol2.py index f970ed6..aa640e3 100644 --- a/python/subunit/tests/test_test_protocol2.py +++ b/python/subunit/tests/test_test_protocol2.py @@ -312,10 +312,10 @@ class TestByteStreamToStreamResult(TestCase): source, non_subunit_name="stdout").run(result) self.assertEqual(b'', source.read()) self.assertEqual(events, result._events) - #- any file attachments should be byte contents [as users assume that]. + #- any file attachments should be bytes equivalent [as users assume that]. for event in result._events: if event[5] is not None: - self.assertIsInstance(event[6], bytes) + bytes(event[6]) def check_event(self, source_bytes, test_status=None, test_id="foo", route_code=None, timestamp=None, tags=None, mime_type=None, diff --git a/python/subunit/v2.py b/python/subunit/v2.py index c299cab..81be733 100644 --- a/python/subunit/v2.py +++ b/python/subunit/v2.py @@ -56,22 +56,6 @@ _nul_test_broken = {} _PY3 = (sys.version_info >= (3,)) -def has_nul(buffer_or_bytes): - """Return True if a null byte is present in buffer_or_bytes.""" - # Simple "if NUL_ELEMENT in utf8_bytes:" fails on Python 3.1 and 3.2 with - # memoryviews. See https://bugs.launchpad.net/subunit/+bug/1216246 - buffer_type = type(buffer_or_bytes) - broken = _nul_test_broken.get(buffer_type) - if broken is None: - reference = buffer_type(b'\0') - broken = not NUL_ELEMENT in reference - _nul_test_broken[buffer_type] = broken - if broken: - return b'\0' in buffer_or_bytes - else: - return NUL_ELEMENT in buffer_or_bytes - - def read_exactly(stream, size): """Read exactly size bytes from stream. @@ -391,35 +375,26 @@ class ByteStreamToStreamResult(object): file_bytes=(error.args[0]).encode('utf8'), mime_type="text/plain;charset=utf8") - def _to_bytes(self, data, pos, length): - """Return a slice of data from pos for length as bytes.""" - # memoryview in 2.7.3 and 3.2 isn't directly usable with struct :(. - # see https://bugs.launchpad.net/subunit/+bug/1216163 - result = data[pos:pos+length] - if type(result) is not bytes: - return result.tobytes() - return result - def _parse_varint(self, data, pos, max_3_bytes=False): # because the only incremental IO we do is at the start, and the 32 bit # CRC means we can always safely read enough to cover any varint, we # can be sure that there should be enough data - and if not it is an # error not a normal situation. - data_0 = struct.unpack(FMT_8, self._to_bytes(data, pos, 1))[0] + data_0 = struct.unpack(FMT_8, data[pos:pos+1])[0] typeenum = data_0 & 0xc0 value_0 = data_0 & 0x3f if typeenum == 0x00: return value_0, 1 elif typeenum == 0x40: - data_1 = struct.unpack(FMT_8, self._to_bytes(data, pos+1, 1))[0] + data_1 = struct.unpack(FMT_8, data[pos+1:pos+2])[0] return (value_0 << 8) | data_1, 2 elif typeenum == 0x80: - data_1 = struct.unpack(FMT_16, self._to_bytes(data, pos+1, 2))[0] + data_1 = struct.unpack(FMT_16, data[pos+1:pos+3])[0] return (value_0 << 16) | data_1, 3 else: if max_3_bytes: raise ParseError('3 byte maximum given but 4 byte value found.') - data_1, data_2 = struct.unpack(FMT_24, self._to_bytes(data, pos+1, 3)) + data_1, data_2 = struct.unpack(FMT_24, data[pos+1:pos+4]) result = (value_0 << 24) | data_1 << 8 | data_2 return result, 4 @@ -464,7 +439,7 @@ class ByteStreamToStreamResult(object): # One packet could have both file and status data; the Python API # presents these separately (perhaps it shouldn't?) if flags & FLAG_TIMESTAMP: - seconds = struct.unpack(FMT_32, self._to_bytes(body, pos, 4))[0] + seconds = struct.unpack(FMT_32, body[pos:pos+4])[0] nanoseconds, consumed = self._parse_varint(body, pos+4) pos = pos + 4 + consumed timestamp = EPOCH + datetime.timedelta( @@ -496,7 +471,7 @@ class ByteStreamToStreamResult(object): file_name, pos = self._read_utf8(body, pos) content_length, consumed = self._parse_varint(body, pos) pos += consumed - file_bytes = self._to_bytes(body, pos, content_length) + file_bytes = body[pos:pos+content_length] if len(file_bytes) != content_length: raise ParseError('File content extends past end of packet: ' 'claimed %d bytes, %d available' % ( @@ -531,7 +506,7 @@ class ByteStreamToStreamResult(object): 'UTF8 string at offset %d extends past end of packet: ' 'claimed %d bytes, %d available' % (pos - 2, length, len(utf8_bytes))) - if has_nul(utf8_bytes): + if NUL_ELEMENT in utf8_bytes: raise ParseError('UTF8 string at offset %d contains NUL byte' % ( pos-2,)) try: -- 2.30.0
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2