#! /usr/bin/env python
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Test for google.protobuf.internal.well_known_types."""

__author__ = 'jieluo@google.com (Jie Luo)'

import collections
from datetime import datetime

try:
  # The container ABCs live in collections.abc; the aliases that used to be
  # exposed directly on ``collections`` were removed in Python 3.10.
  import collections.abc as collections_abc
except ImportError:
  # Python 2 has no collections.abc submodule.
  collections_abc = collections

try:
  import unittest2 as unittest  #PY26
except ImportError:
  import unittest

from google.protobuf import any_pb2
from google.protobuf import duration_pb2
from google.protobuf import field_mask_pb2
from google.protobuf import struct_pb2
from google.protobuf import timestamp_pb2
from google.protobuf import unittest_pb2
from google.protobuf.internal import any_test_pb2
from google.protobuf.internal import test_util
from google.protobuf.internal import well_known_types
from google.protobuf import descriptor
from google.protobuf import text_format


class _BaseTestCase(unittest.TestCase):
  """TestCase exposing assertRaisesRegex on every supported Python version.

  The old spelling assertRaisesRegexp was removed in Python 3.12, while
  Python 2's unittest only provides the old spelling. Tests in this module
  call assertRaisesRegex; it is aliased here when the runtime lacks it.
  """

  if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
    assertRaisesRegex = unittest.TestCase.assertRaisesRegexp


class TimeUtilTestBase(_BaseTestCase):
  """Shared JSON round-trip helpers for Timestamp and Duration tests."""

  def CheckTimestampConversion(self, message, text):
    """Asserts message serializes to text and text parses back to message."""
    self.assertEqual(text, message.ToJsonString())
    parsed_message = timestamp_pb2.Timestamp()
    parsed_message.FromJsonString(text)
    self.assertEqual(message, parsed_message)

  def CheckDurationConversion(self, message, text):
    """Asserts message serializes to text and text parses back to message."""
    self.assertEqual(text, message.ToJsonString())
    parsed_message = duration_pb2.Duration()
    parsed_message.FromJsonString(text)
    self.assertEqual(message, parsed_message)


class TimeUtilTest(TimeUtilTestBase):
  """Tests for the Timestamp and Duration helper methods."""

  def testTimestampSerializeAndParse(self):
    """Round-trips Timestamp values through their JSON string form."""
    message = timestamp_pb2.Timestamp()
    # Generated output should contain 3, 6, or 9 fractional digits.
    message.seconds = 0
    message.nanos = 0
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00Z')
    message.nanos = 10000000
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00.010Z')
    message.nanos = 10000
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000010Z')
    message.nanos = 10
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000000010Z')
    # Test min timestamps.
    message.seconds = -62135596800
    message.nanos = 0
    self.CheckTimestampConversion(message, '0001-01-01T00:00:00Z')
    # Test max timestamps.
    message.seconds = 253402300799
    message.nanos = 999999999
    self.CheckTimestampConversion(message, '9999-12-31T23:59:59.999999999Z')
    # Test negative timestamps.
    message.seconds = -1
    self.CheckTimestampConversion(message, '1969-12-31T23:59:59.999999999Z')

    # Parsing accepts any fractional digits as long as they fit into nano
    # precision.
    message.FromJsonString('1970-01-01T00:00:00.1Z')
    self.assertEqual(0, message.seconds)
    self.assertEqual(100000000, message.nanos)
    # Parsing accepts offsets.
    message.FromJsonString('1970-01-01T00:00:00-08:00')
    self.assertEqual(8 * 3600, message.seconds)
    self.assertEqual(0, message.nanos)

    # It is not easy to check with current time. For test coverage only.
    message.GetCurrentTime()
    self.assertNotEqual(8 * 3600, message.seconds)

  def testDurationSerializeAndParse(self):
    """Round-trips Duration values through their JSON string form."""
    message = duration_pb2.Duration()
    # Generated output should contain 3, 6, or 9 fractional digits.
    message.seconds = 0
    message.nanos = 0
    self.CheckDurationConversion(message, '0s')
    message.nanos = 10000000
    self.CheckDurationConversion(message, '0.010s')
    message.nanos = 10000
    self.CheckDurationConversion(message, '0.000010s')
    message.nanos = 10
    self.CheckDurationConversion(message, '0.000000010s')

    # Test min and max
    message.seconds = 315576000000
    message.nanos = 999999999
    self.CheckDurationConversion(message, '315576000000.999999999s')
    message.seconds = -315576000000
    message.nanos = -999999999
    self.CheckDurationConversion(message, '-315576000000.999999999s')

    # Parsing accepts any fractional digits as long as they fit into nano
    # precision.
    message.FromJsonString('0.1s')
    self.assertEqual(100000000, message.nanos)
    message.FromJsonString('0.0000001s')
    self.assertEqual(100, message.nanos)

  def testTimestampIntegerConversion(self):
    """Checks Timestamp From*/To* integer conversions at each precision."""
    message = timestamp_pb2.Timestamp()
    message.FromNanoseconds(1)
    self.assertEqual('1970-01-01T00:00:00.000000001Z',
                     message.ToJsonString())
    self.assertEqual(1, message.ToNanoseconds())

    message.FromNanoseconds(-1)
    self.assertEqual('1969-12-31T23:59:59.999999999Z',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToNanoseconds())

    message.FromMicroseconds(1)
    self.assertEqual('1970-01-01T00:00:00.000001Z',
                     message.ToJsonString())
    self.assertEqual(1, message.ToMicroseconds())

    message.FromMicroseconds(-1)
    self.assertEqual('1969-12-31T23:59:59.999999Z',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToMicroseconds())

    message.FromMilliseconds(1)
    self.assertEqual('1970-01-01T00:00:00.001Z',
                     message.ToJsonString())
    self.assertEqual(1, message.ToMilliseconds())

    message.FromMilliseconds(-1)
    self.assertEqual('1969-12-31T23:59:59.999Z',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToMilliseconds())

    message.FromSeconds(1)
    self.assertEqual('1970-01-01T00:00:01Z',
                     message.ToJsonString())
    self.assertEqual(1, message.ToSeconds())

    message.FromSeconds(-1)
    self.assertEqual('1969-12-31T23:59:59Z',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToSeconds())

    message.FromNanoseconds(1999)
    self.assertEqual(1, message.ToMicroseconds())
    # For negative values, Timestamp will be rounded down.
    # For example, "1969-12-31T23:59:59.5Z" (i.e., -0.5s) rounded to seconds
    # will be "1969-12-31T23:59:59Z" (i.e., -1s) rather than
    # "1970-01-01T00:00:00Z" (i.e., 0s).
    message.FromNanoseconds(-1999)
    self.assertEqual(-2, message.ToMicroseconds())

  def testDurationIntegerConversion(self):
    """Checks Duration From*/To* integer conversions at each precision."""
    message = duration_pb2.Duration()
    message.FromNanoseconds(1)
    self.assertEqual('0.000000001s',
                     message.ToJsonString())
    self.assertEqual(1, message.ToNanoseconds())

    message.FromNanoseconds(-1)
    self.assertEqual('-0.000000001s',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToNanoseconds())

    message.FromMicroseconds(1)
    self.assertEqual('0.000001s',
                     message.ToJsonString())
    self.assertEqual(1, message.ToMicroseconds())

    message.FromMicroseconds(-1)
    self.assertEqual('-0.000001s',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToMicroseconds())

    message.FromMilliseconds(1)
    self.assertEqual('0.001s',
                     message.ToJsonString())
    self.assertEqual(1, message.ToMilliseconds())

    message.FromMilliseconds(-1)
    self.assertEqual('-0.001s',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToMilliseconds())

    message.FromSeconds(1)
    self.assertEqual('1s', message.ToJsonString())
    self.assertEqual(1, message.ToSeconds())

    message.FromSeconds(-1)
    self.assertEqual('-1s',
                     message.ToJsonString())
    self.assertEqual(-1, message.ToSeconds())

    # Test truncation behavior.
    message.FromNanoseconds(1999)
    self.assertEqual(1, message.ToMicroseconds())

    # For negative values, Duration will be rounded towards 0.
    message.FromNanoseconds(-1999)
    self.assertEqual(-1, message.ToMicroseconds())

  def testDatetimeConverison(self):
    """Checks Timestamp <-> datetime conversion."""
    message = timestamp_pb2.Timestamp()
    dt = datetime(1970, 1, 1)
    message.FromDatetime(dt)
    self.assertEqual(dt, message.ToDatetime())

    message.FromMilliseconds(1999)
    self.assertEqual(datetime(1970, 1, 1, 0, 0, 1, 999000),
                     message.ToDatetime())

  def testTimedeltaConversion(self):
    """Checks Duration <-> timedelta conversion, including negatives."""
    message = duration_pb2.Duration()
    message.FromNanoseconds(1999999999)
    td = message.ToTimedelta()
    self.assertEqual(1, td.seconds)
    self.assertEqual(999999, td.microseconds)

    message.FromNanoseconds(-1999999999)
    td = message.ToTimedelta()
    self.assertEqual(-1, td.days)
    self.assertEqual(86398, td.seconds)
    self.assertEqual(1, td.microseconds)

    message.FromMicroseconds(-1)
    td = message.ToTimedelta()
    self.assertEqual(-1, td.days)
    self.assertEqual(86399, td.seconds)
    self.assertEqual(999999, td.microseconds)
    converted_message = duration_pb2.Duration()
    converted_message.FromTimedelta(td)
    self.assertEqual(message, converted_message)

  def testInvalidTimestamp(self):
    """Checks the errors raised for malformed or out-of-range timestamps."""
    message = timestamp_pb2.Timestamp()
    self.assertRaisesRegex(
        well_known_types.ParseError,
        'Failed to parse timestamp: missing valid timezone offset.',
        message.FromJsonString,
        '')
    self.assertRaisesRegex(
        well_known_types.ParseError,
        'Failed to parse timestamp: invalid trailing data '
        '1970-01-01T00:00:01Ztrail.',
        message.FromJsonString,
        '1970-01-01T00:00:01Ztrail')
    self.assertRaisesRegex(
        ValueError,
        'time data \'10000-01-01T00:00:00\' does not match'
        ' format \'%Y-%m-%dT%H:%M:%S\'',
        message.FromJsonString,
        '10000-01-01T00:00:00.00Z')
    self.assertRaisesRegex(
        well_known_types.ParseError,
        'nanos 0123456789012 more than 9 fractional digits.',
        message.FromJsonString,
        '1970-01-01T00:00:00.0123456789012Z')
    self.assertRaisesRegex(
        well_known_types.ParseError,
        (r'Invalid timezone offset value: \+08.'),
        message.FromJsonString,
        '1972-01-01T01:00:00.01+08',)
    self.assertRaisesRegex(
        ValueError,
        'year (0 )?is out of range',
        message.FromJsonString,
        '0000-01-01T00:00:00Z')
    message.seconds = 253402300800
    self.assertRaisesRegex(
        OverflowError,
        'date value out of range',
        message.ToJsonString)

  def testInvalidDuration(self):
    """Checks the errors raised for malformed or out-of-range durations."""
    message = duration_pb2.Duration()
    self.assertRaisesRegex(
        well_known_types.ParseError,
        'Duration must end with letter "s": 1.',
        message.FromJsonString,
        '1')
    self.assertRaisesRegex(
        well_known_types.ParseError,
        'Couldn\'t parse duration: 1...2s.',
        message.FromJsonString,
        '1...2s')
    text = '-315576000001.000000000s'
    self.assertRaisesRegex(
        well_known_types.Error,
        r'Duration is not valid\: Seconds -315576000001 must be in range'
        r' \[-315576000000\, 315576000000\].',
        message.FromJsonString,
        text)
    text = '315576000001.000000000s'
    self.assertRaisesRegex(
        well_known_types.Error,
        r'Duration is not valid\: Seconds 315576000001 must be in range'
        r' \[-315576000000\, 315576000000\].',
        message.FromJsonString,
        text)
    message.seconds = -315576000001
    message.nanos = 0
    self.assertRaisesRegex(
        well_known_types.Error,
        r'Duration is not valid\: Seconds -315576000001 must be in range'
        r' \[-315576000000\, 315576000000\].',
        message.ToJsonString)
    message.seconds = 0
    message.nanos = 999999999 + 1
    self.assertRaisesRegex(
        well_known_types.Error,
        r'Duration is not valid\: Nanos 1000000000 must be in range'
        r' \[-999999999\, 999999999\].',
        message.ToJsonString)
    message.seconds = -1
    message.nanos = 1
    self.assertRaisesRegex(
        well_known_types.Error,
        r'Duration is not valid\: Sign mismatch.',
        message.ToJsonString)


class FieldMaskTest(_BaseTestCase):
  """Tests for the FieldMask helper methods."""

  def testStringFormat(self):
    """Checks FieldMask JSON string output and parsing, incl. camel case."""
    mask = field_mask_pb2.FieldMask()
    self.assertEqual('', mask.ToJsonString())
    mask.paths.append('foo')
    self.assertEqual('foo', mask.ToJsonString())
    mask.paths.append('bar')
    self.assertEqual('foo,bar', mask.ToJsonString())

    mask.FromJsonString('')
    self.assertEqual('', mask.ToJsonString())
    mask.FromJsonString('foo')
    self.assertEqual(['foo'], mask.paths)
    mask.FromJsonString('foo,bar')
    self.assertEqual(['foo', 'bar'], mask.paths)

    # Test camel case
    mask.Clear()
    mask.paths.append('foo_bar')
    self.assertEqual('fooBar', mask.ToJsonString())
    mask.paths.append('bar_quz')
    self.assertEqual('fooBar,barQuz', mask.ToJsonString())

    mask.FromJsonString('')
    self.assertEqual('', mask.ToJsonString())
    mask.FromJsonString('fooBar')
    self.assertEqual(['foo_bar'], mask.paths)
    mask.FromJsonString('fooBar,barQuz')
    self.assertEqual(['foo_bar', 'bar_quz'], mask.paths)

  def testDescriptorToFieldMask(self):
    """Checks AllFieldsFromDescriptor lists every TestAllTypes field."""
    mask = field_mask_pb2.FieldMask()
    msg_descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    mask.AllFieldsFromDescriptor(msg_descriptor)
    self.assertEqual(75, len(mask.paths))
    self.assertTrue(mask.IsValidForDescriptor(msg_descriptor))
    for field in msg_descriptor.fields:
      self.assertIn(field.name, mask.paths)

  def testIsValidForDescriptor(self):
    """Checks IsValidForDescriptor on valid and invalid paths."""
    msg_descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    # Empty mask
    mask = field_mask_pb2.FieldMask()
    self.assertTrue(mask.IsValidForDescriptor(msg_descriptor))
    # All fields from descriptor
    mask.AllFieldsFromDescriptor(msg_descriptor)
    self.assertTrue(mask.IsValidForDescriptor(msg_descriptor))
    # Child under optional message
    mask.paths.append('optional_nested_message.bb')
    self.assertTrue(mask.IsValidForDescriptor(msg_descriptor))
    # Repeated field is only allowed in the last position of path
    mask.paths.append('repeated_nested_message.bb')
    self.assertFalse(mask.IsValidForDescriptor(msg_descriptor))
    # Invalid top level field
    mask = field_mask_pb2.FieldMask()
    mask.paths.append('xxx')
    self.assertFalse(mask.IsValidForDescriptor(msg_descriptor))
    # Invalid field in root
    mask = field_mask_pb2.FieldMask()
    mask.paths.append('xxx.zzz')
    self.assertFalse(mask.IsValidForDescriptor(msg_descriptor))
    # Invalid field in internal node
    mask = field_mask_pb2.FieldMask()
    mask.paths.append('optional_nested_message.xxx.zzz')
    self.assertFalse(mask.IsValidForDescriptor(msg_descriptor))
    # Invalid field in leaf
    mask = field_mask_pb2.FieldMask()
    mask.paths.append('optional_nested_message.xxx')
    self.assertFalse(mask.IsValidForDescriptor(msg_descriptor))

  def testCanonicalFrom(self):
    """Checks CanonicalFormFromMask sorts, de-duplicates and prunes paths."""
    mask = field_mask_pb2.FieldMask()
    out_mask = field_mask_pb2.FieldMask()
    # Paths will be sorted.
    mask.FromJsonString('baz.quz,bar,foo')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('bar,baz.quz,foo', out_mask.ToJsonString())
    # Duplicated paths will be removed.
    mask.FromJsonString('foo,bar,foo')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('bar,foo', out_mask.ToJsonString())
    # Sub-paths of other paths will be removed.
    mask.FromJsonString('foo.b1,bar.b1,foo.b2,bar')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('bar,foo.b1,foo.b2', out_mask.ToJsonString())

    # Test more deeply nested cases.
    mask.FromJsonString(
        'foo.bar.baz1,foo.bar.baz2.quz,foo.bar.baz2')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('foo.bar.baz1,foo.bar.baz2',
                     out_mask.ToJsonString())
    mask.FromJsonString(
        'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('foo.bar.baz1,foo.bar.baz2',
                     out_mask.ToJsonString())
    mask.FromJsonString(
        'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz,foo.bar')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('foo.bar', out_mask.ToJsonString())
    mask.FromJsonString(
        'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz,foo')
    out_mask.CanonicalFormFromMask(mask)
    self.assertEqual('foo', out_mask.ToJsonString())

  def testUnion(self):
    """Checks FieldMask.Union and its rejection of a non-FieldMask input."""
    mask1 = field_mask_pb2.FieldMask()
    mask2 = field_mask_pb2.FieldMask()
    out_mask = field_mask_pb2.FieldMask()
    mask1.FromJsonString('foo,baz')
    mask2.FromJsonString('bar,quz')
    out_mask.Union(mask1, mask2)
    self.assertEqual('bar,baz,foo,quz', out_mask.ToJsonString())
    # Overlap with duplicated paths.
    mask1.FromJsonString('foo,baz.bb')
    mask2.FromJsonString('baz.bb,quz')
    out_mask.Union(mask1, mask2)
    self.assertEqual('baz.bb,foo,quz', out_mask.ToJsonString())
    # Overlap with paths covering some other paths.
    mask1.FromJsonString('foo.bar.baz,quz')
    mask2.FromJsonString('foo.bar,bar')
    out_mask.Union(mask1, mask2)
    self.assertEqual('bar,foo.bar,quz', out_mask.ToJsonString())
    src = unittest_pb2.TestAllTypes()
    with self.assertRaises(ValueError):
      out_mask.Union(src, mask2)

  def testIntersect(self):
    """Checks FieldMask.Intersect on disjoint and overlapping masks."""
    mask1 = field_mask_pb2.FieldMask()
    mask2 = field_mask_pb2.FieldMask()
    out_mask = field_mask_pb2.FieldMask()
    # Test cases without overlapping.
    mask1.FromJsonString('foo,baz')
    mask2.FromJsonString('bar,quz')
    out_mask.Intersect(mask1, mask2)
    self.assertEqual('', out_mask.ToJsonString())
    # Overlap with duplicated paths.
    mask1.FromJsonString('foo,baz.bb')
    mask2.FromJsonString('baz.bb,quz')
    out_mask.Intersect(mask1, mask2)
    self.assertEqual('baz.bb', out_mask.ToJsonString())
    # Overlap with paths covering some other paths.
    mask1.FromJsonString('foo.bar.baz,quz')
    mask2.FromJsonString('foo.bar,bar')
    out_mask.Intersect(mask1, mask2)
    self.assertEqual('foo.bar.baz', out_mask.ToJsonString())
    mask1.FromJsonString('foo.bar,bar')
    mask2.FromJsonString('foo.bar.baz,quz')
    out_mask.Intersect(mask1, mask2)
    self.assertEqual('foo.bar.baz', out_mask.ToJsonString())

  def testMergeMessage(self):
    """Checks FieldMask.MergeMessage for single, nested and oneof fields."""
    # Test merge one field.
    src = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(src)
    for field in src.DESCRIPTOR.fields:
      if field.containing_oneof:
        continue
      field_name = field.name
      dst = unittest_pb2.TestAllTypes()
      # Only set one path to mask.
      mask = field_mask_pb2.FieldMask()
      mask.paths.append(field_name)
      mask.MergeMessage(src, dst)
      # The expected result message.
      msg = unittest_pb2.TestAllTypes()
      if field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
        repeated_src = getattr(src, field_name)
        repeated_msg = getattr(msg, field_name)
        if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
          for item in repeated_src:
            repeated_msg.add().CopyFrom(item)
        else:
          repeated_msg.extend(repeated_src)
      elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
        getattr(msg, field_name).CopyFrom(getattr(src, field_name))
      else:
        setattr(msg, field_name, getattr(src, field_name))
      # Only field specified in mask is merged.
      self.assertEqual(msg, dst)

    # Test merge nested fields.
    nested_src = unittest_pb2.NestedTestAllTypes()
    nested_dst = unittest_pb2.NestedTestAllTypes()
    nested_src.child.payload.optional_int32 = 1234
    nested_src.child.child.payload.optional_int32 = 5678
    mask = field_mask_pb2.FieldMask()
    mask.FromJsonString('child.payload')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertEqual(1234, nested_dst.child.payload.optional_int32)
    self.assertEqual(0, nested_dst.child.child.payload.optional_int32)

    mask.FromJsonString('child.child.payload')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertEqual(1234, nested_dst.child.payload.optional_int32)
    self.assertEqual(5678, nested_dst.child.child.payload.optional_int32)

    nested_dst.Clear()
    mask.FromJsonString('child.child.payload')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertEqual(0, nested_dst.child.payload.optional_int32)
    self.assertEqual(5678, nested_dst.child.child.payload.optional_int32)

    nested_dst.Clear()
    mask.FromJsonString('child')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertEqual(1234, nested_dst.child.payload.optional_int32)
    self.assertEqual(5678, nested_dst.child.child.payload.optional_int32)

    # Test MergeOptions.
    nested_dst.Clear()
    nested_dst.child.payload.optional_int64 = 4321
    # Message fields will be merged by default.
    mask.FromJsonString('child.payload')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertEqual(1234, nested_dst.child.payload.optional_int32)
    self.assertEqual(4321, nested_dst.child.payload.optional_int64)
    # Change the behavior to replace message fields.
    mask.FromJsonString('child.payload')
    mask.MergeMessage(nested_src, nested_dst, True, False)
    self.assertEqual(1234, nested_dst.child.payload.optional_int32)
    self.assertEqual(0, nested_dst.child.payload.optional_int64)

    # By default, fields missing in source are not cleared in destination.
    nested_dst.payload.optional_int32 = 1234
    self.assertTrue(nested_dst.HasField('payload'))
    mask.FromJsonString('payload')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertTrue(nested_dst.HasField('payload'))
    # But they are cleared when replacing message fields.
    nested_dst.Clear()
    nested_dst.payload.optional_int32 = 1234
    mask.FromJsonString('payload')
    mask.MergeMessage(nested_src, nested_dst, True, False)
    self.assertFalse(nested_dst.HasField('payload'))

    nested_src.payload.repeated_int32.append(1234)
    nested_dst.payload.repeated_int32.append(5678)
    # Repeated fields will be appended by default.
    mask.FromJsonString('payload.repeatedInt32')
    mask.MergeMessage(nested_src, nested_dst)
    self.assertEqual(2, len(nested_dst.payload.repeated_int32))
    self.assertEqual(5678, nested_dst.payload.repeated_int32[0])
    self.assertEqual(1234, nested_dst.payload.repeated_int32[1])
    # Change the behavior to replace repeated fields.
    mask.FromJsonString('payload.repeatedInt32')
    mask.MergeMessage(nested_src, nested_dst, False, True)
    self.assertEqual(1, len(nested_dst.payload.repeated_int32))
    self.assertEqual(1234, nested_dst.payload.repeated_int32[0])

    # Test Merge oneof field.
    new_msg = unittest_pb2.TestOneof2()
    dst = unittest_pb2.TestOneof2()
    dst.foo_message.qux_int = 1
    mask = field_mask_pb2.FieldMask()
    mask.FromJsonString('fooMessage,fooLazyMessage.quxInt')
    mask.MergeMessage(new_msg, dst)
    self.assertTrue(dst.HasField('foo_message'))
    self.assertFalse(dst.HasField('foo_lazy_message'))

  def testMergeErrors(self):
    """Checks MergeMessage rejects a sub-path under a non-message field."""
    src = unittest_pb2.TestAllTypes()
    dst = unittest_pb2.TestAllTypes()
    mask = field_mask_pb2.FieldMask()
    test_util.SetAllFields(src)
    mask.FromJsonString('optionalInt32.field')
    with self.assertRaises(ValueError) as e:
      mask.MergeMessage(src, dst)
    self.assertEqual('Error: Field optional_int32 in message '
                     'protobuf_unittest.TestAllTypes is not a singular '
                     'message field and cannot have sub-fields.',
                     str(e.exception))

  def testSnakeCaseToCamelCase(self):
    """Checks _SnakeCaseToCamelCase results and its rejected inputs."""
    self.assertEqual('fooBar',
                     well_known_types._SnakeCaseToCamelCase('foo_bar'))
    self.assertEqual('FooBar',
                     well_known_types._SnakeCaseToCamelCase('_foo_bar'))
    self.assertEqual('foo3Bar',
                     well_known_types._SnakeCaseToCamelCase('foo3_bar'))

    # No uppercase letter is allowed.
    self.assertRaisesRegex(
        well_known_types.Error,
        'Fail to print FieldMask to Json string: Path name Foo must '
        'not contain uppercase letters.',
        well_known_types._SnakeCaseToCamelCase,
        'Foo')
    # Any character after a "_" must be a lowercase letter.
    #   1. "_" cannot be followed by another "_".
    #   2. "_" cannot be followed by a digit.
    #   3. "_" cannot appear as the last character.
    self.assertRaisesRegex(
        well_known_types.Error,
        'Fail to print FieldMask to Json string: The character after a '
        '"_" must be a lowercase letter in path name foo__bar.',
        well_known_types._SnakeCaseToCamelCase,
        'foo__bar')
    self.assertRaisesRegex(
        well_known_types.Error,
        'Fail to print FieldMask to Json string: The character after a '
        '"_" must be a lowercase letter in path name foo_3bar.',
        well_known_types._SnakeCaseToCamelCase,
        'foo_3bar')
    self.assertRaisesRegex(
        well_known_types.Error,
        'Fail to print FieldMask to Json string: Trailing "_" in path '
        'name foo_bar_.',
        well_known_types._SnakeCaseToCamelCase,
        'foo_bar_')

  def testCamelCaseToSnakeCase(self):
    """Checks _CamelCaseToSnakeCase results and its rejected input."""
    self.assertEqual('foo_bar',
                     well_known_types._CamelCaseToSnakeCase('fooBar'))
    self.assertEqual('_foo_bar',
                     well_known_types._CamelCaseToSnakeCase('FooBar'))
    self.assertEqual('foo3_bar',
                     well_known_types._CamelCaseToSnakeCase('foo3Bar'))
    self.assertRaisesRegex(
        well_known_types.ParseError,
        'Fail to parse FieldMask: Path name foo_bar must not contain "_"s.',
        well_known_types._CamelCaseToSnakeCase,
        'foo_bar')


class StructTest(unittest.TestCase):
  """Tests for the Struct and ListValue helper methods."""

  def testStruct(self):
    """Exercises the mapping/sequence style API of Struct and ListValue."""
    struct = struct_pb2.Struct()
    self.assertIsInstance(struct, collections_abc.Mapping)
    self.assertEqual(0, len(struct))
    struct_class = struct.__class__

    struct['key1'] = 5
    struct['key2'] = 'abc'
    struct['key3'] = True
    struct.get_or_create_struct('key4')['subkey'] = 11.0
    struct_list = struct.get_or_create_list('key5')
    self.assertIsInstance(struct_list, collections_abc.Sequence)
    struct_list.extend([6, 'seven', True, False, None])
    struct_list.add_struct()['subkey2'] = 9
    struct['key6'] = {'subkey': {}}
    struct['key7'] = [2, False]

    self.assertEqual(7, len(struct))
    self.assertIsInstance(struct, well_known_types.Struct)
    self.assertEqual(5, struct['key1'])
    self.assertEqual('abc', struct['key2'])
    self.assertIs(True, struct['key3'])
    self.assertEqual(11, struct['key4']['subkey'])
    inner_struct = struct_class()
    inner_struct['subkey2'] = 9
    self.assertEqual([6, 'seven', True, False, None, inner_struct],
                     list(struct['key5'].items()))
    self.assertEqual({}, dict(struct['key6']['subkey'].fields))
    self.assertEqual([2, False], list(struct['key7'].items()))

    serialized = struct.SerializeToString()
    struct2 = struct_pb2.Struct()
    struct2.ParseFromString(serialized)

    self.assertEqual(struct, struct2)
    for key, value in struct.items():
      self.assertIn(key, struct)
      self.assertIn(key, struct2)
      self.assertEqual(value, struct2[key])

    self.assertEqual(7, len(struct.keys()))
    self.assertEqual(7, len(struct.values()))
    for key in struct.keys():
      self.assertIn(key, struct)
      self.assertIn(key, struct2)
      self.assertEqual(struct[key], struct2[key])

    item = (next(iter(struct.keys())), next(iter(struct.values())))
    self.assertEqual(item, next(iter(struct.items())))

    self.assertIsInstance(struct2, well_known_types.Struct)
    self.assertEqual(5, struct2['key1'])
    self.assertEqual('abc', struct2['key2'])
    self.assertIs(True, struct2['key3'])
    self.assertEqual(11, struct2['key4']['subkey'])
    self.assertEqual([6, 'seven', True, False, None, inner_struct],
                     list(struct2['key5'].items()))

    struct_list = struct2['key5']
    self.assertEqual(6, struct_list[0])
    self.assertEqual('seven', struct_list[1])
    self.assertEqual(True, struct_list[2])
    self.assertEqual(False, struct_list[3])
    self.assertEqual(None, struct_list[4])
    self.assertEqual(inner_struct, struct_list[5])

    struct_list[1] = 7
    self.assertEqual(7, struct_list[1])

    struct_list.add_list().extend([1, 'two', True, False, None])
    self.assertEqual([1, 'two', True, False, None],
                     list(struct_list[6].items()))
    struct_list.extend([{'nested_struct': 30}, ['nested_list', 99], {}, []])
    self.assertEqual(11, len(struct_list.values))
    self.assertEqual(30, struct_list[7]['nested_struct'])
    self.assertEqual('nested_list', struct_list[8][0])
    self.assertEqual(99, struct_list[8][1])
    self.assertEqual({}, dict(struct_list[9].fields))
    self.assertEqual([], list(struct_list[10].items()))
    struct_list[0] = {'replace': 'set'}
    struct_list[1] = ['replace', 'set']
    self.assertEqual('set', struct_list[0]['replace'])
    self.assertEqual(['replace', 'set'], list(struct_list[1].items()))

    text_serialized = str(struct)
    struct3 = struct_pb2.Struct()
    text_format.Merge(text_serialized, struct3)
    self.assertEqual(struct, struct3)

    struct.get_or_create_struct('key3')['replace'] = 12
    self.assertEqual(12, struct['key3']['replace'])

    # Tests empty list.
    struct.get_or_create_list('empty_list')
    empty_list = struct['empty_list']
    self.assertEqual([], list(empty_list.items()))
    list2 = struct_pb2.ListValue()
    list2.add_list()
    empty_list = list2[0]
    self.assertEqual([], list(empty_list.items()))

    # Tests empty struct.
    struct.get_or_create_struct('empty_struct')
    empty_struct = struct['empty_struct']
    self.assertEqual({}, dict(empty_struct.fields))
    list2.add_struct()
    empty_struct = list2[1]
    self.assertEqual({}, dict(empty_struct.fields))

    self.assertEqual(9, len(struct))
    del struct['key3']
    del struct['key4']
    self.assertEqual(7, len(struct))
    self.assertEqual(6, len(struct['key5']))
    del struct['key5'][1]
    self.assertEqual(5, len(struct['key5']))
    self.assertEqual([6, True, False, None, inner_struct],
                     list(struct['key5'].items()))

  def testMergeFrom(self):
    """Checks Struct.update with a plain dict, including key replacement."""
    struct = struct_pb2.Struct()
    struct_class = struct.__class__

    dictionary = {
        'key1': 5,
        'key2': 'abc',
        'key3': True,
        'key4': {'subkey': 11.0},
        'key5': [6, 'seven', True, False, None, {'subkey2': 9}],
        'key6': [['nested_list', True]],
        'empty_struct': {},
        'empty_list': []
    }
    struct.update(dictionary)
    self.assertEqual(5, struct['key1'])
    self.assertEqual('abc', struct['key2'])
    self.assertIs(True, struct['key3'])
    self.assertEqual(11, struct['key4']['subkey'])
    inner_struct = struct_class()
    inner_struct['subkey2'] = 9
    self.assertEqual([6, 'seven', True, False, None, inner_struct],
                     list(struct['key5'].items()))
    self.assertEqual(2, len(struct['key6'][0].values))
    self.assertEqual('nested_list', struct['key6'][0][0])
    self.assertEqual(True, struct['key6'][0][1])
    empty_list = struct['empty_list']
    self.assertEqual([], list(empty_list.items()))
    empty_struct = struct['empty_struct']
    self.assertEqual({}, dict(empty_struct.fields))

    # According to documentation: "When parsing from the wire or when merging,
    # if there are duplicate map keys the last key seen is used".
    duplicate = {
        'key4': {'replace': 20},
        'key5': [[False, 5]]
    }
    struct.update(duplicate)
    self.assertEqual(1, len(struct['key4'].fields))
    self.assertEqual(20, struct['key4']['replace'])
    self.assertEqual(1, len(struct['key5'].values))
    self.assertEqual(False, struct['key5'][0][0])
    self.assertEqual(5, struct['key5'][0][1])


class AnyTest(unittest.TestCase):
  """Tests for the Any helper methods."""

  def testAnyMessage(self):
    """Checks Pack, Is and Unpack on an Any field."""
    # Creates and sets message.
    msg = any_test_pb2.TestAny()
    msg_descriptor = msg.DESCRIPTOR
    all_types = unittest_pb2.TestAllTypes()
    all_descriptor = all_types.DESCRIPTOR
    all_types.repeated_string.append(u'\u00fc\ua71f')
    # Packs to Any.
    msg.value.Pack(all_types)
    self.assertEqual(msg.value.type_url,
                     'type.googleapis.com/%s' % all_descriptor.full_name)
    self.assertEqual(msg.value.value,
                     all_types.SerializeToString())
    # Tests Is() method.
    self.assertTrue(msg.value.Is(all_descriptor))
    self.assertFalse(msg.value.Is(msg_descriptor))
    # Unpacks Any.
    unpacked_message = unittest_pb2.TestAllTypes()
    self.assertTrue(msg.value.Unpack(unpacked_message))
    self.assertEqual(all_types, unpacked_message)
    # Unpacks to different type.
    self.assertFalse(msg.value.Unpack(msg))
    # Only Any messages have Pack method.
    try:
      msg.Pack(all_types)
    except AttributeError:
      pass
    else:
      # Report a broken expectation as a test failure rather than an error.
      self.fail('%s should not have Pack method.' %
                msg_descriptor.full_name)

  def testMessageName(self):
    """Checks Any.TypeName returns the packed message's full name."""
    # Creates and sets message.
    submessage = any_test_pb2.TestAny()
    submessage.int_value = 12345
    msg = any_pb2.Any()
    msg.Pack(submessage)
    self.assertEqual(msg.TypeName(), 'google.protobuf.internal.TestAny')

  def testPackWithCustomTypeUrl(self):
    """Checks Pack with custom, slash-terminated and empty URL prefixes."""
    submessage = any_test_pb2.TestAny()
    submessage.int_value = 12345
    msg = any_pb2.Any()
    # Pack with a custom type URL prefix.
    msg.Pack(submessage, 'type.myservice.com')
    self.assertEqual(msg.type_url,
                     'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name)
    # Pack with a custom type URL prefix ending with '/'.
    msg.Pack(submessage, 'type.myservice.com/')
    self.assertEqual(msg.type_url,
                     'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name)
    # Pack with an empty type URL prefix.
    msg.Pack(submessage, '')
    self.assertEqual(msg.type_url,
                     '/%s' % submessage.DESCRIPTOR.full_name)
    # Test unpacking the type.
    unpacked_message = any_test_pb2.TestAny()
    self.assertTrue(msg.Unpack(unpacked_message))
    self.assertEqual(submessage, unpacked_message)

  def testPackDeterministic(self):
    """Checks deterministic Pack/serialize output against golden bytes."""
    submessage = any_test_pb2.TestAny()
    for i in range(10):
      submessage.map_value[str(i)] = i * 2
    msg = any_pb2.Any()
    msg.Pack(submessage, deterministic=True)
    serialized = msg.SerializeToString(deterministic=True)
    golden = (b'\n4type.googleapis.com/google.protobuf.internal.TestAny\x12F'
              b'\x1a\x05\n\x010\x10\x00\x1a\x05\n\x011\x10\x02\x1a\x05\n\x01'
              b'2\x10\x04\x1a\x05\n\x013\x10\x06\x1a\x05\n\x014\x10\x08\x1a'
              b'\x05\n\x015\x10\n\x1a\x05\n\x016\x10\x0c\x1a\x05\n\x017\x10'
              b'\x0e\x1a\x05\n\x018\x10\x10\x1a\x05\n\x019\x10\x12')
    self.assertEqual(golden, serialized)


if __name__ == '__main__':
  unittest.main()