diff options
Diffstat (limited to 'python/google/protobuf/internal/decoder_test.py')
-rwxr-xr-x | python/google/protobuf/internal/decoder_test.py | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/python/google/protobuf/internal/decoder_test.py b/python/google/protobuf/internal/decoder_test.py new file mode 100755 index 00000000..e36a96fc --- /dev/null +++ b/python/google/protobuf/internal/decoder_test.py @@ -0,0 +1,230 @@ +# Protocol Buffers - Google's data interchange format +# Copyright 2008 Google Inc. +# http://code.google.com/p/protobuf/ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test for google.protobuf.internal.decoder.""" + +__author__ = 'robinson@google.com (Will Robinson)' + +import struct +import unittest +from google.protobuf.internal import wire_format +from google.protobuf.internal import encoder +from google.protobuf.internal import decoder +import logging +import mox +from google.protobuf.internal import input_stream +from google.protobuf import message + + +class DecoderTest(unittest.TestCase): + + def setUp(self): + self.mox = mox.Mox() + self.mock_stream = self.mox.CreateMock(input_stream.InputStream) + self.mock_message = self.mox.CreateMock(message.Message) + + def testReadFieldNumberAndWireType(self): + # Test field numbers that will require various varint sizes. + for expected_field_number in (1, 15, 16, 2047, 2048): + for expected_wire_type in range(6): # Highest-numbered wiretype is 5. + e = encoder.Encoder() + e._AppendTag(expected_field_number, expected_wire_type) + s = e.ToString() + d = decoder.Decoder(s) + field_number, wire_type = d.ReadFieldNumberAndWireType() + self.assertEqual(expected_field_number, field_number) + self.assertEqual(expected_wire_type, wire_type) + + def ReadScalarTestHelper(self, test_name, decoder_method, expected_result, + expected_stream_method_name, + stream_method_return, *args): + """Helper for testReadScalars below. + + Calls one of the Decoder.Read*() methods and ensures that the results are + as expected. + + Args: + test_name: Name of this test, used for logging only. + decoder_method: Unbound decoder.Decoder method to call. + expected_result: Value we expect returned from decoder_method(). + expected_stream_method_name: (string) Name of the InputStream + method we expect Decoder to call to actually read the value + on the wire. + stream_method_return: Value our mocked-out stream method should + return to the decoder. + args: Additional arguments that we expect to be passed to the + stream method. + """ + logging.info('Testing %s scalar input.\n' + 'Calling %r(), and expecting that to call the ' + 'stream method %s(%r), which will return %r. Finally, ' + 'expecting the Decoder method to return %r'% ( + test_name, decoder_method, + expected_stream_method_name, args, stream_method_return, + expected_result)) + + d = decoder.Decoder('') + d._stream = self.mock_stream + if decoder_method in (decoder.Decoder.ReadString, + decoder.Decoder.ReadBytes): + self.mock_stream.ReadVarUInt32().AndReturn(len(stream_method_return)) + # We have to use names instead of methods to work around some + # mox weirdness. (ResetAll() is overzealous). + expected_stream_method = getattr(self.mock_stream, + expected_stream_method_name) + expected_stream_method(*args).AndReturn(stream_method_return) + + self.mox.ReplayAll() + self.assertEqual(expected_result, decoder_method(d)) + self.mox.VerifyAll() + self.mox.ResetAll() + + def testReadScalars(self): + test_string = 'I can feel myself getting sutpider.' + scalar_tests = [ + ['int32', decoder.Decoder.ReadInt32, 0, 'ReadVarint32', 0], + ['int64', decoder.Decoder.ReadInt64, 0, 'ReadVarint64', 0], + ['uint32', decoder.Decoder.ReadUInt32, 0, 'ReadVarUInt32', 0], + ['uint64', decoder.Decoder.ReadUInt64, 0, 'ReadVarUInt64', 0], + ['fixed32', decoder.Decoder.ReadFixed32, 0xffffffff, + 'ReadLittleEndian32', 0xffffffff], + ['fixed64', decoder.Decoder.ReadFixed64, 0xffffffffffffffff, + 'ReadLittleEndian64', 0xffffffffffffffff], + ['sfixed32', decoder.Decoder.ReadSFixed32, -1, + 'ReadLittleEndian32', 0xffffffff], + ['sfixed64', decoder.Decoder.ReadSFixed64, -1, + 'ReadLittleEndian64', 0xffffffffffffffff], + ['float', decoder.Decoder.ReadFloat, 0.0, + 'ReadString', struct.pack('f', 0.0), 4], + ['double', decoder.Decoder.ReadDouble, 0.0, + 'ReadString', struct.pack('d', 0.0), 8], + ['bool', decoder.Decoder.ReadBool, True, 'ReadVarUInt32', 1], + ['enum', decoder.Decoder.ReadEnum, 23, 'ReadVarUInt32', 23], + ['string', decoder.Decoder.ReadString, + test_string, 'ReadString', test_string, len(test_string)], + ['bytes', decoder.Decoder.ReadBytes, + test_string, 'ReadString', test_string, len(test_string)], + # We test zigzag decoding routines more extensively below. + ['sint32', decoder.Decoder.ReadSInt32, -1, 'ReadVarUInt32', 1], + ['sint64', decoder.Decoder.ReadSInt64, -1, 'ReadVarUInt64', 1], + ] + # Ensure that we're testing different Decoder methods and using + # different test names in all test cases above. + self.assertEqual(len(scalar_tests), len(set(t[0] for t in scalar_tests))) + self.assertEqual(len(scalar_tests), len(set(t[1] for t in scalar_tests))) + for args in scalar_tests: + self.ReadScalarTestHelper(*args) + + def testReadMessageInto(self): + length = 23 + def Test(simulate_error): + d = decoder.Decoder('') + d._stream = self.mock_stream + self.mock_stream.ReadVarUInt32().AndReturn(length) + sub_buffer = object() + self.mock_stream.GetSubBuffer(length).AndReturn(sub_buffer) + + if simulate_error: + self.mock_message.MergeFromString(sub_buffer).AndReturn(length - 1) + self.mox.ReplayAll() + self.assertRaises( + message.DecodeError, d.ReadMessageInto, self.mock_message) + else: + self.mock_message.MergeFromString(sub_buffer).AndReturn(length) + self.mock_stream.SkipBytes(length) + self.mox.ReplayAll() + d.ReadMessageInto(self.mock_message) + + self.mox.VerifyAll() + self.mox.ResetAll() + + Test(simulate_error=False) + Test(simulate_error=True) + + def testReadGroupInto_Success(self): + # Test both the empty and nonempty cases. + for num_bytes in (5, 0): + field_number = expected_field_number = 10 + d = decoder.Decoder('') + d._stream = self.mock_stream + sub_buffer = object() + self.mock_stream.GetSubBuffer().AndReturn(sub_buffer) + self.mock_message.MergeFromString(sub_buffer).AndReturn(num_bytes) + self.mock_stream.SkipBytes(num_bytes) + self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag( + field_number, wire_format.WIRETYPE_END_GROUP)) + self.mox.ReplayAll() + d.ReadGroupInto(expected_field_number, self.mock_message) + self.mox.VerifyAll() + self.mox.ResetAll() + + def ReadGroupInto_FailureTestHelper(self, bytes_read): + d = decoder.Decoder('') + d._stream = self.mock_stream + sub_buffer = object() + self.mock_stream.GetSubBuffer().AndReturn(sub_buffer) + self.mock_message.MergeFromString(sub_buffer).AndReturn(bytes_read) + return d + + def testReadGroupInto_NegativeBytesReported(self): + expected_field_number = 10 + d = self.ReadGroupInto_FailureTestHelper(bytes_read=-1) + self.mox.ReplayAll() + self.assertRaises(message.DecodeError, + d.ReadGroupInto, expected_field_number, + self.mock_message) + self.mox.VerifyAll() + + def testReadGroupInto_NoEndGroupTag(self): + field_number = expected_field_number = 10 + num_bytes = 5 + d = self.ReadGroupInto_FailureTestHelper(bytes_read=num_bytes) + self.mock_stream.SkipBytes(num_bytes) + # Right field number, wrong wire type. + self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag( + field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)) + self.mox.ReplayAll() + self.assertRaises(message.DecodeError, + d.ReadGroupInto, expected_field_number, + self.mock_message) + self.mox.VerifyAll() + + def testReadGroupInto_WrongFieldNumberInEndGroupTag(self): + expected_field_number = 10 + field_number = expected_field_number + 1 + num_bytes = 5 + d = self.ReadGroupInto_FailureTestHelper(bytes_read=num_bytes) + self.mock_stream.SkipBytes(num_bytes) + # Wrong field number, right wire type. + self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag( + field_number, wire_format.WIRETYPE_END_GROUP)) + self.mox.ReplayAll() + self.assertRaises(message.DecodeError, + d.ReadGroupInto, expected_field_number, + self.mock_message) + self.mox.VerifyAll() + + def testSkipBytes(self): + d = decoder.Decoder('') + num_bytes = 1024 + self.mock_stream.SkipBytes(num_bytes) + d._stream = self.mock_stream + self.mox.ReplayAll() + d.SkipBytes(num_bytes) + self.mox.VerifyAll() + +if __name__ == '__main__': + unittest.main() |