Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.143.237.203
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /lib/python2.7/site-packages/google/protobuf/internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python2.7/site-packages/google/protobuf/internal/descriptor_pool_test.py
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.  All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Tests for google.protobuf.descriptor_pool."""

__author__ = 'matthewtoia@google.com (Matt Toia)'

import os
import sys

try:
  import unittest2 as unittest  #PY26
except ImportError:
  import unittest

from google.protobuf import unittest_import_pb2
from google.protobuf import unittest_import_public_pb2
from google.protobuf import unittest_pb2
from google.protobuf import descriptor_pb2
from google.protobuf.internal import api_implementation
from google.protobuf.internal import descriptor_pool_test1_pb2
from google.protobuf.internal import descriptor_pool_test2_pb2
from google.protobuf.internal import factory_test1_pb2
from google.protobuf.internal import factory_test2_pb2
from google.protobuf.internal import file_options_test_pb2
from google.protobuf.internal import more_messages_pb2
from google.protobuf import descriptor
from google.protobuf import descriptor_database
from google.protobuf import descriptor_pool
from google.protobuf import message_factory
from google.protobuf import symbol_database



class DescriptorPoolTestBase(object):

  def testFindFileByName(self):
    name1 = 'google/protobuf/internal/factory_test1.proto'
    file_desc1 = self.pool.FindFileByName(name1)
    self.assertIsInstance(file_desc1, descriptor.FileDescriptor)
    self.assertEqual(name1, file_desc1.name)
    self.assertEqual('google.protobuf.python.internal', file_desc1.package)
    self.assertIn('Factory1Message', file_desc1.message_types_by_name)

    name2 = 'google/protobuf/internal/factory_test2.proto'
    file_desc2 = self.pool.FindFileByName(name2)
    self.assertIsInstance(file_desc2, descriptor.FileDescriptor)
    self.assertEqual(name2, file_desc2.name)
    self.assertEqual('google.protobuf.python.internal', file_desc2.package)
    self.assertIn('Factory2Message', file_desc2.message_types_by_name)

  def testFindFileByNameFailure(self):
    with self.assertRaises(KeyError):
      self.pool.FindFileByName('Does not exist')

  def testFindFileContainingSymbol(self):
    file_desc1 = self.pool.FindFileContainingSymbol(
        'google.protobuf.python.internal.Factory1Message')
    self.assertIsInstance(file_desc1, descriptor.FileDescriptor)
    self.assertEqual('google/protobuf/internal/factory_test1.proto',
                     file_desc1.name)
    self.assertEqual('google.protobuf.python.internal', file_desc1.package)
    self.assertIn('Factory1Message', file_desc1.message_types_by_name)

    file_desc2 = self.pool.FindFileContainingSymbol(
        'google.protobuf.python.internal.Factory2Message')
    self.assertIsInstance(file_desc2, descriptor.FileDescriptor)
    self.assertEqual('google/protobuf/internal/factory_test2.proto',
                     file_desc2.name)
    self.assertEqual('google.protobuf.python.internal', file_desc2.package)
    self.assertIn('Factory2Message', file_desc2.message_types_by_name)

    # Tests top level extension.
    file_desc3 = self.pool.FindFileContainingSymbol(
        'google.protobuf.python.internal.another_field')
    self.assertIsInstance(file_desc3, descriptor.FileDescriptor)
    self.assertEqual('google/protobuf/internal/factory_test2.proto',
                     file_desc3.name)

    # Tests nested extension inside a message.
    file_desc4 = self.pool.FindFileContainingSymbol(
        'google.protobuf.python.internal.Factory2Message.one_more_field')
    self.assertIsInstance(file_desc4, descriptor.FileDescriptor)
    self.assertEqual('google/protobuf/internal/factory_test2.proto',
                     file_desc4.name)

    file_desc5 = self.pool.FindFileContainingSymbol(
        'protobuf_unittest.TestService')
    self.assertIsInstance(file_desc5, descriptor.FileDescriptor)
    self.assertEqual('google/protobuf/unittest.proto',
                     file_desc5.name)

  def testFindFileContainingSymbolFailure(self):
    with self.assertRaises(KeyError):
      self.pool.FindFileContainingSymbol('Does not exist')

  def testFindMessageTypeByName(self):
    msg1 = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory1Message')
    self.assertIsInstance(msg1, descriptor.Descriptor)
    self.assertEqual('Factory1Message', msg1.name)
    self.assertEqual('google.protobuf.python.internal.Factory1Message',
                     msg1.full_name)
    self.assertEqual(None, msg1.containing_type)
    self.assertFalse(msg1.has_options)

    nested_msg1 = msg1.nested_types[0]
    self.assertEqual('NestedFactory1Message', nested_msg1.name)
    self.assertEqual(msg1, nested_msg1.containing_type)

    nested_enum1 = msg1.enum_types[0]
    self.assertEqual('NestedFactory1Enum', nested_enum1.name)
    self.assertEqual(msg1, nested_enum1.containing_type)

    self.assertEqual(nested_msg1, msg1.fields_by_name[
        'nested_factory_1_message'].message_type)
    self.assertEqual(nested_enum1, msg1.fields_by_name[
        'nested_factory_1_enum'].enum_type)

    msg2 = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory2Message')
    self.assertIsInstance(msg2, descriptor.Descriptor)
    self.assertEqual('Factory2Message', msg2.name)
    self.assertEqual('google.protobuf.python.internal.Factory2Message',
                     msg2.full_name)
    self.assertIsNone(msg2.containing_type)

    nested_msg2 = msg2.nested_types[0]
    self.assertEqual('NestedFactory2Message', nested_msg2.name)
    self.assertEqual(msg2, nested_msg2.containing_type)

    nested_enum2 = msg2.enum_types[0]
    self.assertEqual('NestedFactory2Enum', nested_enum2.name)
    self.assertEqual(msg2, nested_enum2.containing_type)

    self.assertEqual(nested_msg2, msg2.fields_by_name[
        'nested_factory_2_message'].message_type)
    self.assertEqual(nested_enum2, msg2.fields_by_name[
        'nested_factory_2_enum'].enum_type)

    self.assertTrue(msg2.fields_by_name['int_with_default'].has_default_value)
    self.assertEqual(
        1776, msg2.fields_by_name['int_with_default'].default_value)

    self.assertTrue(
        msg2.fields_by_name['double_with_default'].has_default_value)
    self.assertEqual(
        9.99, msg2.fields_by_name['double_with_default'].default_value)

    self.assertTrue(
        msg2.fields_by_name['string_with_default'].has_default_value)
    self.assertEqual(
        'hello world', msg2.fields_by_name['string_with_default'].default_value)

    self.assertTrue(msg2.fields_by_name['bool_with_default'].has_default_value)
    self.assertFalse(msg2.fields_by_name['bool_with_default'].default_value)

    self.assertTrue(msg2.fields_by_name['enum_with_default'].has_default_value)
    self.assertEqual(
        1, msg2.fields_by_name['enum_with_default'].default_value)

    msg3 = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory2Message.NestedFactory2Message')
    self.assertEqual(nested_msg2, msg3)

    self.assertTrue(msg2.fields_by_name['bytes_with_default'].has_default_value)
    self.assertEqual(
        b'a\xfb\x00c',
        msg2.fields_by_name['bytes_with_default'].default_value)

    self.assertEqual(1, len(msg2.oneofs))
    self.assertEqual(1, len(msg2.oneofs_by_name))
    self.assertEqual(2, len(msg2.oneofs[0].fields))
    for name in ['oneof_int', 'oneof_string']:
      self.assertEqual(msg2.oneofs[0],
                       msg2.fields_by_name[name].containing_oneof)
      self.assertIn(msg2.fields_by_name[name], msg2.oneofs[0].fields)

  def testFindTypeErrors(self):
    self.assertRaises(TypeError, self.pool.FindExtensionByNumber, '')

    # TODO(jieluo): Fix python to raise correct errors.
    if api_implementation.Type() == 'cpp':
      self.assertRaises(TypeError, self.pool.FindMethodByName, 0)
      self.assertRaises(KeyError, self.pool.FindMethodByName, '')
      error_type = TypeError
    else:
      error_type = AttributeError
    self.assertRaises(error_type, self.pool.FindMessageTypeByName, 0)
    self.assertRaises(error_type, self.pool.FindFieldByName, 0)
    self.assertRaises(error_type, self.pool.FindExtensionByName, 0)
    self.assertRaises(error_type, self.pool.FindEnumTypeByName, 0)
    self.assertRaises(error_type, self.pool.FindOneofByName, 0)
    self.assertRaises(error_type, self.pool.FindServiceByName, 0)
    self.assertRaises(error_type, self.pool.FindFileContainingSymbol, 0)
    if api_implementation.Type() == 'python':
      error_type = KeyError
    self.assertRaises(error_type, self.pool.FindFileByName, 0)

  def testFindMessageTypeByNameFailure(self):
    with self.assertRaises(KeyError):
      self.pool.FindMessageTypeByName('Does not exist')

  def testFindEnumTypeByName(self):
    enum1 = self.pool.FindEnumTypeByName(
        'google.protobuf.python.internal.Factory1Enum')
    self.assertIsInstance(enum1, descriptor.EnumDescriptor)
    self.assertEqual(0, enum1.values_by_name['FACTORY_1_VALUE_0'].number)
    self.assertEqual(1, enum1.values_by_name['FACTORY_1_VALUE_1'].number)
    self.assertFalse(enum1.has_options)

    nested_enum1 = self.pool.FindEnumTypeByName(
        'google.protobuf.python.internal.Factory1Message.NestedFactory1Enum')
    self.assertIsInstance(nested_enum1, descriptor.EnumDescriptor)
    self.assertEqual(
        0, nested_enum1.values_by_name['NESTED_FACTORY_1_VALUE_0'].number)
    self.assertEqual(
        1, nested_enum1.values_by_name['NESTED_FACTORY_1_VALUE_1'].number)

    enum2 = self.pool.FindEnumTypeByName(
        'google.protobuf.python.internal.Factory2Enum')
    self.assertIsInstance(enum2, descriptor.EnumDescriptor)
    self.assertEqual(0, enum2.values_by_name['FACTORY_2_VALUE_0'].number)
    self.assertEqual(1, enum2.values_by_name['FACTORY_2_VALUE_1'].number)

    nested_enum2 = self.pool.FindEnumTypeByName(
        'google.protobuf.python.internal.Factory2Message.NestedFactory2Enum')
    self.assertIsInstance(nested_enum2, descriptor.EnumDescriptor)
    self.assertEqual(
        0, nested_enum2.values_by_name['NESTED_FACTORY_2_VALUE_0'].number)
    self.assertEqual(
        1, nested_enum2.values_by_name['NESTED_FACTORY_2_VALUE_1'].number)

  def testFindEnumTypeByNameFailure(self):
    with self.assertRaises(KeyError):
      self.pool.FindEnumTypeByName('Does not exist')

  def testFindFieldByName(self):
    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # TODO(jieluo): Fix cpp extension to find field correctly
        # when descriptor pool is using an underlying database.
        return
    field = self.pool.FindFieldByName(
        'google.protobuf.python.internal.Factory1Message.list_value')
    self.assertEqual(field.name, 'list_value')
    self.assertEqual(field.label, field.LABEL_REPEATED)
    self.assertFalse(field.has_options)

    with self.assertRaises(KeyError):
      self.pool.FindFieldByName('Does not exist')

  def testFindOneofByName(self):
    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # TODO(jieluo): Fix cpp extension to find oneof correctly
        # when descriptor pool is using an underlying database.
        return
    oneof = self.pool.FindOneofByName(
        'google.protobuf.python.internal.Factory2Message.oneof_field')
    self.assertEqual(oneof.name, 'oneof_field')
    with self.assertRaises(KeyError):
      self.pool.FindOneofByName('Does not exist')

  def testFindExtensionByName(self):
    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # TODO(jieluo): Fix cpp extension to find extension correctly
        # when descriptor pool is using an underlying database.
        return
    # An extension defined in a message.
    extension = self.pool.FindExtensionByName(
        'google.protobuf.python.internal.Factory2Message.one_more_field')
    self.assertEqual(extension.name, 'one_more_field')
    # An extension defined at file scope.
    extension = self.pool.FindExtensionByName(
        'google.protobuf.python.internal.another_field')
    self.assertEqual(extension.name, 'another_field')
    self.assertEqual(extension.number, 1002)
    with self.assertRaises(KeyError):
      self.pool.FindFieldByName('Does not exist')

  def testFindAllExtensions(self):
    factory1_message = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory1Message')
    factory2_message = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory2Message')
    # An extension defined in a message.
    one_more_field = factory2_message.extensions_by_name['one_more_field']
    self.pool.AddExtensionDescriptor(one_more_field)
    # An extension defined at file scope.
    factory_test2 = self.pool.FindFileByName(
        'google/protobuf/internal/factory_test2.proto')
    another_field = factory_test2.extensions_by_name['another_field']
    self.pool.AddExtensionDescriptor(another_field)

    extensions = self.pool.FindAllExtensions(factory1_message)
    expected_extension_numbers = set([one_more_field, another_field])
    self.assertEqual(expected_extension_numbers, set(extensions))
    # Verify that mutating the returned list does not affect the pool.
    extensions.append('unexpected_element')
    # Get the extensions again, the returned value does not contain the
    # 'unexpected_element'.
    extensions = self.pool.FindAllExtensions(factory1_message)
    self.assertEqual(expected_extension_numbers, set(extensions))

  def testFindExtensionByNumber(self):
    factory1_message = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory1Message')
    factory2_message = self.pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory2Message')
    # An extension defined in a message.
    one_more_field = factory2_message.extensions_by_name['one_more_field']
    self.pool.AddExtensionDescriptor(one_more_field)
    # An extension defined at file scope.
    factory_test2 = self.pool.FindFileByName(
        'google/protobuf/internal/factory_test2.proto')
    another_field = factory_test2.extensions_by_name['another_field']
    self.pool.AddExtensionDescriptor(another_field)

    # An extension defined in a message.
    extension = self.pool.FindExtensionByNumber(factory1_message, 1001)
    self.assertEqual(extension.name, 'one_more_field')
    # An extension defined at file scope.
    extension = self.pool.FindExtensionByNumber(factory1_message, 1002)
    self.assertEqual(extension.name, 'another_field')
    with self.assertRaises(KeyError):
      extension = self.pool.FindExtensionByNumber(factory1_message, 1234567)

  def testExtensionsAreNotFields(self):
    with self.assertRaises(KeyError):
      self.pool.FindFieldByName('google.protobuf.python.internal.another_field')
    with self.assertRaises(KeyError):
      self.pool.FindFieldByName(
          'google.protobuf.python.internal.Factory2Message.one_more_field')
    with self.assertRaises(KeyError):
      self.pool.FindExtensionByName(
          'google.protobuf.python.internal.Factory1Message.list_value')

  def testFindService(self):
    service = self.pool.FindServiceByName('protobuf_unittest.TestService')
    self.assertEqual(service.full_name, 'protobuf_unittest.TestService')
    with self.assertRaises(KeyError):
      self.pool.FindServiceByName('Does not exist')

  def testUserDefinedDB(self):
    db = descriptor_database.DescriptorDatabase()
    self.pool = descriptor_pool.DescriptorPool(db)
    db.Add(self.factory_test1_fd)
    db.Add(self.factory_test2_fd)
    self.testFindMessageTypeByName()

  def testAddSerializedFile(self):
    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # Cpp extension cannot call Add on a DescriptorPool
        # that uses a DescriptorDatabase.
        # TODO(jieluo): Fix python and cpp extension diff.
        return
    self.pool = descriptor_pool.DescriptorPool()
    self.pool.AddSerializedFile(self.factory_test1_fd.SerializeToString())
    self.pool.AddSerializedFile(self.factory_test2_fd.SerializeToString())
    self.testFindMessageTypeByName()


  def testEnumDefaultValue(self):
    """Test the default value of enums which don't start at zero."""
    def _CheckDefaultValue(file_descriptor):
      default_value = (file_descriptor
                       .message_types_by_name['DescriptorPoolTest1']
                       .fields_by_name['nested_enum']
                       .default_value)
      self.assertEqual(default_value,
                       descriptor_pool_test1_pb2.DescriptorPoolTest1.BETA)
    # First check what the generated descriptor contains.
    _CheckDefaultValue(descriptor_pool_test1_pb2.DESCRIPTOR)
    # Then check the generated pool. Normally this is the same descriptor.
    file_descriptor = symbol_database.Default().pool.FindFileByName(
        'google/protobuf/internal/descriptor_pool_test1.proto')
    self.assertIs(file_descriptor, descriptor_pool_test1_pb2.DESCRIPTOR)
    _CheckDefaultValue(file_descriptor)

    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # Cpp extension cannot call Add on a DescriptorPool
        # that uses a DescriptorDatabase.
        # TODO(jieluo): Fix python and cpp extension diff.
        return
    # Then check the dynamic pool and its internal DescriptorDatabase.
    descriptor_proto = descriptor_pb2.FileDescriptorProto.FromString(
        descriptor_pool_test1_pb2.DESCRIPTOR.serialized_pb)
    self.pool.Add(descriptor_proto)
    # And do the same check as above
    file_descriptor = self.pool.FindFileByName(
        'google/protobuf/internal/descriptor_pool_test1.proto')
    _CheckDefaultValue(file_descriptor)

  def testDefaultValueForCustomMessages(self):
    """Check the value returned by non-existent fields."""
    def _CheckValueAndType(value, expected_value, expected_type):
      self.assertEqual(value, expected_value)
      self.assertIsInstance(value, expected_type)

    def _CheckDefaultValues(msg):
      try:
        int64 = long
      except NameError:  # Python3
        int64 = int
      try:
        unicode_type = unicode
      except NameError:  # Python3
        unicode_type = str
      _CheckValueAndType(msg.optional_int32, 0, int)
      _CheckValueAndType(msg.optional_uint64, 0, (int64, int))
      _CheckValueAndType(msg.optional_float, 0, (float, int))
      _CheckValueAndType(msg.optional_double, 0, (float, int))
      _CheckValueAndType(msg.optional_bool, False, bool)
      _CheckValueAndType(msg.optional_string, u'', unicode_type)
      _CheckValueAndType(msg.optional_bytes, b'', bytes)
      _CheckValueAndType(msg.optional_nested_enum, msg.FOO, int)
    # First for the generated message
    _CheckDefaultValues(unittest_pb2.TestAllTypes())
    # Then for a message built with from the DescriptorPool.
    pool = descriptor_pool.DescriptorPool()
    pool.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_import_public_pb2.DESCRIPTOR.serialized_pb))
    pool.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_import_pb2.DESCRIPTOR.serialized_pb))
    pool.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_pb2.DESCRIPTOR.serialized_pb))
    message_class = message_factory.MessageFactory(pool).GetPrototype(
        pool.FindMessageTypeByName(
            unittest_pb2.TestAllTypes.DESCRIPTOR.full_name))
    _CheckDefaultValues(message_class())

  def testAddFileDescriptor(self):
    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # Cpp extension cannot call Add on a DescriptorPool
        # that uses a DescriptorDatabase.
        # TODO(jieluo): Fix python and cpp extension diff.
        return
    file_desc = descriptor_pb2.FileDescriptorProto(name='some/file.proto')
    self.pool.Add(file_desc)
    self.pool.AddSerializedFile(file_desc.SerializeToString())

  def testComplexNesting(self):
    if isinstance(self, SecondaryDescriptorFromDescriptorDB):
      if api_implementation.Type() == 'cpp':
        # Cpp extension cannot call Add on a DescriptorPool
        # that uses a DescriptorDatabase.
        # TODO(jieluo): Fix python and cpp extension diff.
        return
    more_messages_desc = descriptor_pb2.FileDescriptorProto.FromString(
        more_messages_pb2.DESCRIPTOR.serialized_pb)
    test1_desc = descriptor_pb2.FileDescriptorProto.FromString(
        descriptor_pool_test1_pb2.DESCRIPTOR.serialized_pb)
    test2_desc = descriptor_pb2.FileDescriptorProto.FromString(
        descriptor_pool_test2_pb2.DESCRIPTOR.serialized_pb)
    self.pool.Add(more_messages_desc)
    self.pool.Add(test1_desc)
    self.pool.Add(test2_desc)
    TEST1_FILE.CheckFile(self, self.pool)
    TEST2_FILE.CheckFile(self, self.pool)


class DefaultDescriptorPoolTest(DescriptorPoolTestBase, unittest.TestCase):

  def setUp(self):
    self.pool = descriptor_pool.Default()
    self.factory_test1_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test1_pb2.DESCRIPTOR.serialized_pb)
    self.factory_test2_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test2_pb2.DESCRIPTOR.serialized_pb)

  def testFindMethods(self):
    self.assertIs(
        self.pool.FindFileByName('google/protobuf/unittest.proto'),
        unittest_pb2.DESCRIPTOR)
    self.assertIs(
        self.pool.FindMessageTypeByName('protobuf_unittest.TestAllTypes'),
        unittest_pb2.TestAllTypes.DESCRIPTOR)
    self.assertIs(
        self.pool.FindFieldByName(
            'protobuf_unittest.TestAllTypes.optional_int32'),
        unittest_pb2.TestAllTypes.DESCRIPTOR.fields_by_name['optional_int32'])
    self.assertIs(
        self.pool.FindEnumTypeByName('protobuf_unittest.ForeignEnum'),
        unittest_pb2.ForeignEnum.DESCRIPTOR)
    self.assertIs(
        self.pool.FindExtensionByName(
            'protobuf_unittest.optional_int32_extension'),
        unittest_pb2.DESCRIPTOR.extensions_by_name['optional_int32_extension'])
    self.assertIs(
        self.pool.FindOneofByName('protobuf_unittest.TestAllTypes.oneof_field'),
        unittest_pb2.TestAllTypes.DESCRIPTOR.oneofs_by_name['oneof_field'])
    self.assertIs(
        self.pool.FindServiceByName('protobuf_unittest.TestService'),
        unittest_pb2.DESCRIPTOR.services_by_name['TestService'])


class CreateDescriptorPoolTest(DescriptorPoolTestBase, unittest.TestCase):

  def setUp(self):
    self.pool = descriptor_pool.DescriptorPool()
    self.factory_test1_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test1_pb2.DESCRIPTOR.serialized_pb)
    self.factory_test2_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test2_pb2.DESCRIPTOR.serialized_pb)
    self.pool.Add(self.factory_test1_fd)
    self.pool.Add(self.factory_test2_fd)

    self.pool.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_import_public_pb2.DESCRIPTOR.serialized_pb))
    self.pool.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_import_pb2.DESCRIPTOR.serialized_pb))
    self.pool.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_pb2.DESCRIPTOR.serialized_pb))


class SecondaryDescriptorFromDescriptorDB(DescriptorPoolTestBase,
                                          unittest.TestCase):

  def setUp(self):
    self.factory_test1_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test1_pb2.DESCRIPTOR.serialized_pb)
    self.factory_test2_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test2_pb2.DESCRIPTOR.serialized_pb)
    db = descriptor_database.DescriptorDatabase()
    db.Add(self.factory_test1_fd)
    db.Add(self.factory_test2_fd)
    db.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_import_public_pb2.DESCRIPTOR.serialized_pb))
    db.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_import_pb2.DESCRIPTOR.serialized_pb))
    db.Add(descriptor_pb2.FileDescriptorProto.FromString(
        unittest_pb2.DESCRIPTOR.serialized_pb))
    self.pool = descriptor_pool.DescriptorPool(descriptor_db=db)


class ProtoFile(object):

  def __init__(self, name, package, messages, dependencies=None,
               public_dependencies=None):
    self.name = name
    self.package = package
    self.messages = messages
    self.dependencies = dependencies or []
    self.public_dependencies = public_dependencies or []

  def CheckFile(self, test, pool):
    file_desc = pool.FindFileByName(self.name)
    test.assertEqual(self.name, file_desc.name)
    test.assertEqual(self.package, file_desc.package)
    dependencies_names = [f.name for f in file_desc.dependencies]
    test.assertEqual(self.dependencies, dependencies_names)
    public_dependencies_names = [f.name for f in file_desc.public_dependencies]
    test.assertEqual(self.public_dependencies, public_dependencies_names)
    for name, msg_type in self.messages.items():
      msg_type.CheckType(test, None, name, file_desc)


class EnumType(object):

  def __init__(self, values):
    self.values = values

  def CheckType(self, test, msg_desc, name, file_desc):
    enum_desc = msg_desc.enum_types_by_name[name]
    test.assertEqual(name, enum_desc.name)
    expected_enum_full_name = '.'.join([msg_desc.full_name, name])
    test.assertEqual(expected_enum_full_name, enum_desc.full_name)
    test.assertEqual(msg_desc, enum_desc.containing_type)
    test.assertEqual(file_desc, enum_desc.file)
    for index, (value, number) in enumerate(self.values):
      value_desc = enum_desc.values_by_name[value]
      test.assertEqual(value, value_desc.name)
      test.assertEqual(index, value_desc.index)
      test.assertEqual(number, value_desc.number)
      test.assertEqual(enum_desc, value_desc.type)
      test.assertIn(value, msg_desc.enum_values_by_name)


class MessageType(object):

  def __init__(self, type_dict, field_list, is_extendable=False,
               extensions=None):
    self.type_dict = type_dict
    self.field_list = field_list
    self.is_extendable = is_extendable
    self.extensions = extensions or []

  def CheckType(self, test, containing_type_desc, name, file_desc):
    if containing_type_desc is None:
      desc = file_desc.message_types_by_name[name]
      expected_full_name = '.'.join([file_desc.package, name])
    else:
      desc = containing_type_desc.nested_types_by_name[name]
      expected_full_name = '.'.join([containing_type_desc.full_name, name])

    test.assertEqual(name, desc.name)
    test.assertEqual(expected_full_name, desc.full_name)
    test.assertEqual(containing_type_desc, desc.containing_type)
    test.assertEqual(desc.file, file_desc)
    test.assertEqual(self.is_extendable, desc.is_extendable)
    for name, subtype in self.type_dict.items():
      subtype.CheckType(test, desc, name, file_desc)

    for index, (name, field) in enumerate(self.field_list):
      field.CheckField(test, desc, name, index, file_desc)

    for index, (name, field) in enumerate(self.extensions):
      field.CheckField(test, desc, name, index, file_desc)


class EnumField(object):

  def __init__(self, number, type_name, default_value):
    self.number = number
    self.type_name = type_name
    self.default_value = default_value

  def CheckField(self, test, msg_desc, name, index, file_desc):
    field_desc = msg_desc.fields_by_name[name]
    enum_desc = msg_desc.enum_types_by_name[self.type_name]
    test.assertEqual(name, field_desc.name)
    expected_field_full_name = '.'.join([msg_desc.full_name, name])
    test.assertEqual(expected_field_full_name, field_desc.full_name)
    test.assertEqual(index, field_desc.index)
    test.assertEqual(self.number, field_desc.number)
    test.assertEqual(descriptor.FieldDescriptor.TYPE_ENUM, field_desc.type)
    test.assertEqual(descriptor.FieldDescriptor.CPPTYPE_ENUM,
                     field_desc.cpp_type)
    test.assertTrue(field_desc.has_default_value)
    test.assertEqual(enum_desc.values_by_name[self.default_value].number,
                     field_desc.default_value)
    test.assertFalse(enum_desc.values_by_name[self.default_value].has_options)
    test.assertEqual(msg_desc, field_desc.containing_type)
    test.assertEqual(enum_desc, field_desc.enum_type)
    test.assertEqual(file_desc, enum_desc.file)


class MessageField(object):

  def __init__(self, number, type_name):
    self.number = number
    self.type_name = type_name

  def CheckField(self, test, msg_desc, name, index, file_desc):
    field_desc = msg_desc.fields_by_name[name]
    field_type_desc = msg_desc.nested_types_by_name[self.type_name]
    test.assertEqual(name, field_desc.name)
    expected_field_full_name = '.'.join([msg_desc.full_name, name])
    test.assertEqual(expected_field_full_name, field_desc.full_name)
    test.assertEqual(index, field_desc.index)
    test.assertEqual(self.number, field_desc.number)
    test.assertEqual(descriptor.FieldDescriptor.TYPE_MESSAGE, field_desc.type)
    test.assertEqual(descriptor.FieldDescriptor.CPPTYPE_MESSAGE,
                     field_desc.cpp_type)
    test.assertFalse(field_desc.has_default_value)
    test.assertEqual(msg_desc, field_desc.containing_type)
    test.assertEqual(field_type_desc, field_desc.message_type)
    test.assertEqual(file_desc, field_desc.file)
    # TODO(jieluo): Fix python and cpp extension diff for message field
    # default value.
    if api_implementation.Type() == 'cpp':
      test.assertRaises(
          NotImplementedError, getattr, field_desc, 'default_value')


class StringField(object):

  def __init__(self, number, default_value):
    self.number = number
    self.default_value = default_value

  def CheckField(self, test, msg_desc, name, index, file_desc):
    field_desc = msg_desc.fields_by_name[name]
    test.assertEqual(name, field_desc.name)
    expected_field_full_name = '.'.join([msg_desc.full_name, name])
    test.assertEqual(expected_field_full_name, field_desc.full_name)
    test.assertEqual(index, field_desc.index)
    test.assertEqual(self.number, field_desc.number)
    test.assertEqual(descriptor.FieldDescriptor.TYPE_STRING, field_desc.type)
    test.assertEqual(descriptor.FieldDescriptor.CPPTYPE_STRING,
                     field_desc.cpp_type)
    test.assertTrue(field_desc.has_default_value)
    test.assertEqual(self.default_value, field_desc.default_value)
    test.assertEqual(file_desc, field_desc.file)


class ExtensionField(object):

  def __init__(self, number, extended_type):
    self.number = number
    self.extended_type = extended_type

  def CheckField(self, test, msg_desc, name, index, file_desc):
    field_desc = msg_desc.extensions_by_name[name]
    test.assertEqual(name, field_desc.name)
    expected_field_full_name = '.'.join([msg_desc.full_name, name])
    test.assertEqual(expected_field_full_name, field_desc.full_name)
    test.assertEqual(self.number, field_desc.number)
    test.assertEqual(index, field_desc.index)
    test.assertEqual(descriptor.FieldDescriptor.TYPE_MESSAGE, field_desc.type)
    test.assertEqual(descriptor.FieldDescriptor.CPPTYPE_MESSAGE,
                     field_desc.cpp_type)
    test.assertFalse(field_desc.has_default_value)
    test.assertTrue(field_desc.is_extension)
    test.assertEqual(msg_desc, field_desc.extension_scope)
    test.assertEqual(msg_desc, field_desc.message_type)
    test.assertEqual(self.extended_type, field_desc.containing_type.name)
    test.assertEqual(file_desc, field_desc.file)


class AddDescriptorTest(unittest.TestCase):

  def _TestMessage(self, prefix):
    pool = descriptor_pool.DescriptorPool()
    pool.AddDescriptor(unittest_pb2.TestAllTypes.DESCRIPTOR)
    self.assertEqual(
        'protobuf_unittest.TestAllTypes',
        pool.FindMessageTypeByName(
            prefix + 'protobuf_unittest.TestAllTypes').full_name)

    # AddDescriptor is not recursive.
    with self.assertRaises(KeyError):
      pool.FindMessageTypeByName(
          prefix + 'protobuf_unittest.TestAllTypes.NestedMessage')

    pool.AddDescriptor(unittest_pb2.TestAllTypes.NestedMessage.DESCRIPTOR)
    self.assertEqual(
        'protobuf_unittest.TestAllTypes.NestedMessage',
        pool.FindMessageTypeByName(
            prefix + 'protobuf_unittest.TestAllTypes.NestedMessage').full_name)

    # Files are implicitly also indexed when messages are added.
    self.assertEqual(
        'google/protobuf/unittest.proto',
        pool.FindFileByName(
            'google/protobuf/unittest.proto').name)

    self.assertEqual(
        'google/protobuf/unittest.proto',
        pool.FindFileContainingSymbol(
            prefix + 'protobuf_unittest.TestAllTypes.NestedMessage').name)

  @unittest.skipIf(api_implementation.Type() == 'cpp',
                   'With the cpp implementation, Add() must be called first')
  def testMessage(self):
    self._TestMessage('')
    self._TestMessage('.')

  def _TestEnum(self, prefix):
    pool = descriptor_pool.DescriptorPool()
    pool.AddEnumDescriptor(unittest_pb2.ForeignEnum.DESCRIPTOR)
    self.assertEqual(
        'protobuf_unittest.ForeignEnum',
        pool.FindEnumTypeByName(
            prefix + 'protobuf_unittest.ForeignEnum').full_name)

    # AddEnumDescriptor is not recursive.
    with self.assertRaises(KeyError):
      pool.FindEnumTypeByName(
          prefix + 'protobuf_unittest.ForeignEnum.NestedEnum')

    pool.AddEnumDescriptor(unittest_pb2.TestAllTypes.NestedEnum.DESCRIPTOR)
    self.assertEqual(
        'protobuf_unittest.TestAllTypes.NestedEnum',
        pool.FindEnumTypeByName(
            prefix + 'protobuf_unittest.TestAllTypes.NestedEnum').full_name)

    # Files are implicitly also indexed when enums are added.
    self.assertEqual(
        'google/protobuf/unittest.proto',
        pool.FindFileByName(
            'google/protobuf/unittest.proto').name)

    self.assertEqual(
        'google/protobuf/unittest.proto',
        pool.FindFileContainingSymbol(
            prefix + 'protobuf_unittest.TestAllTypes.NestedEnum').name)

  @unittest.skipIf(api_implementation.Type() == 'cpp',
                   'With the cpp implementation, Add() must be called first')
  def testEnum(self):
    self._TestEnum('')
    self._TestEnum('.')

  @unittest.skipIf(api_implementation.Type() == 'cpp',
                   'With the cpp implementation, Add() must be called first')
  def testService(self):
    pool = descriptor_pool.DescriptorPool()
    with self.assertRaises(KeyError):
      pool.FindServiceByName('protobuf_unittest.TestService')
    pool.AddServiceDescriptor(unittest_pb2._TESTSERVICE)
    self.assertEqual(
        'protobuf_unittest.TestService',
        pool.FindServiceByName('protobuf_unittest.TestService').full_name)

  @unittest.skipIf(api_implementation.Type() == 'cpp',
                   'With the cpp implementation, Add() must be called first')
  def testFile(self):
    pool = descriptor_pool.DescriptorPool()
    pool.AddFileDescriptor(unittest_pb2.DESCRIPTOR)
    self.assertEqual(
        'google/protobuf/unittest.proto',
        pool.FindFileByName(
            'google/protobuf/unittest.proto').name)

    # AddFileDescriptor is not recursive; messages and enums within files must
    # be explicitly registered.
    with self.assertRaises(KeyError):
      pool.FindFileContainingSymbol(
          'protobuf_unittest.TestAllTypes')

  def testEmptyDescriptorPool(self):
    # Check that an empty DescriptorPool() contains no messages.
    pool = descriptor_pool.DescriptorPool()
    proto_file_name = descriptor_pb2.DESCRIPTOR.name
    self.assertRaises(KeyError, pool.FindFileByName, proto_file_name)
    # Add the above file to the pool
    file_descriptor = descriptor_pb2.FileDescriptorProto()
    descriptor_pb2.DESCRIPTOR.CopyToProto(file_descriptor)
    pool.Add(file_descriptor)
    # Now it exists.
    self.assertTrue(pool.FindFileByName(proto_file_name))

  def testCustomDescriptorPool(self):
    # Create a new pool, and add a file descriptor.
    pool = descriptor_pool.DescriptorPool()
    file_desc = descriptor_pb2.FileDescriptorProto(
        name='some/file.proto', package='package')
    file_desc.message_type.add(name='Message')
    pool.Add(file_desc)
    self.assertEqual(pool.FindFileByName('some/file.proto').name,
                     'some/file.proto')
    self.assertEqual(pool.FindMessageTypeByName('package.Message').name,
                     'Message')
    # Test no package
    file_proto = descriptor_pb2.FileDescriptorProto(
        name='some/filename/container.proto')
    message_proto = file_proto.message_type.add(
        name='TopMessage')
    message_proto.field.add(
        name='bb',
        number=1,
        type=descriptor_pb2.FieldDescriptorProto.TYPE_INT32,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL)
    enum_proto = file_proto.enum_type.add(name='TopEnum')
    enum_proto.value.add(name='FOREIGN_FOO', number=4)
    file_proto.service.add(name='TopService')
    pool = descriptor_pool.DescriptorPool()
    pool.Add(file_proto)
    self.assertEqual('TopMessage',
                     pool.FindMessageTypeByName('TopMessage').name)
    self.assertEqual('TopEnum', pool.FindEnumTypeByName('TopEnum').name)
    self.assertEqual('TopService', pool.FindServiceByName('TopService').name)

  def testFileDescriptorOptionsWithCustomDescriptorPool(self):
    # Create a descriptor pool, and add a new FileDescriptorProto to it.
    pool = descriptor_pool.DescriptorPool()
    file_name = 'file_descriptor_options_with_custom_descriptor_pool.proto'
    file_descriptor_proto = descriptor_pb2.FileDescriptorProto(name=file_name)
    extension_id = file_options_test_pb2.foo_options
    file_descriptor_proto.options.Extensions[extension_id].foo_name = 'foo'
    pool.Add(file_descriptor_proto)
    # The options set on the FileDescriptorProto should be available in the
    # descriptor even if they contain extensions that cannot be deserialized
    # using the pool.
    file_descriptor = pool.FindFileByName(file_name)
    options = file_descriptor.GetOptions()
    self.assertEqual('foo', options.Extensions[extension_id].foo_name)
    # The object returned by GetOptions() is cached.
    self.assertIs(options, file_descriptor.GetOptions())

  def testAddTypeError(self):
    pool = descriptor_pool.DescriptorPool()
    with self.assertRaises(TypeError):
      pool.AddDescriptor(0)
    with self.assertRaises(TypeError):
      pool.AddEnumDescriptor(0)
    with self.assertRaises(TypeError):
      pool.AddServiceDescriptor(0)
    with self.assertRaises(TypeError):
      pool.AddExtensionDescriptor(0)
    with self.assertRaises(TypeError):
      pool.AddFileDescriptor(0)


TEST1_FILE = ProtoFile(
    'google/protobuf/internal/descriptor_pool_test1.proto',
    'google.protobuf.python.internal',
    {
        'DescriptorPoolTest1': MessageType({
            'NestedEnum': EnumType([('ALPHA', 1), ('BETA', 2)]),
            'NestedMessage': MessageType({
                'NestedEnum': EnumType([('EPSILON', 5), ('ZETA', 6)]),
                'DeepNestedMessage': MessageType({
                    'NestedEnum': EnumType([('ETA', 7), ('THETA', 8)]),
                }, [
                    ('nested_enum', EnumField(1, 'NestedEnum', 'ETA')),
                    ('nested_field', StringField(2, 'theta')),
                ]),
            }, [
                ('nested_enum', EnumField(1, 'NestedEnum', 'ZETA')),
                ('nested_field', StringField(2, 'beta')),
                ('deep_nested_message', MessageField(3, 'DeepNestedMessage')),
            ])
        }, [
            ('nested_enum', EnumField(1, 'NestedEnum', 'BETA')),
            ('nested_message', MessageField(2, 'NestedMessage')),
        ], is_extendable=True),

        'DescriptorPoolTest2': MessageType({
            'NestedEnum': EnumType([('GAMMA', 3), ('DELTA', 4)]),
            'NestedMessage': MessageType({
                'NestedEnum': EnumType([('IOTA', 9), ('KAPPA', 10)]),
                'DeepNestedMessage': MessageType({
                    'NestedEnum': EnumType([('LAMBDA', 11), ('MU', 12)]),
                }, [
                    ('nested_enum', EnumField(1, 'NestedEnum', 'MU')),
                    ('nested_field', StringField(2, 'lambda')),
                ]),
            }, [
                ('nested_enum', EnumField(1, 'NestedEnum', 'IOTA')),
                ('nested_field', StringField(2, 'delta')),
                ('deep_nested_message', MessageField(3, 'DeepNestedMessage')),
            ])
        }, [
            ('nested_enum', EnumField(1, 'NestedEnum', 'GAMMA')),
            ('nested_message', MessageField(2, 'NestedMessage')),
        ]),
    })


TEST2_FILE = ProtoFile(
    'google/protobuf/internal/descriptor_pool_test2.proto',
    'google.protobuf.python.internal',
    {
        'DescriptorPoolTest3': MessageType({
            'NestedEnum': EnumType([('NU', 13), ('XI', 14)]),
            'NestedMessage': MessageType({
                'NestedEnum': EnumType([('OMICRON', 15), ('PI', 16)]),
                'DeepNestedMessage': MessageType({
                    'NestedEnum': EnumType([('RHO', 17), ('SIGMA', 18)]),
                }, [
                    ('nested_enum', EnumField(1, 'NestedEnum', 'RHO')),
                    ('nested_field', StringField(2, 'sigma')),
                ]),
            }, [
                ('nested_enum', EnumField(1, 'NestedEnum', 'PI')),
                ('nested_field', StringField(2, 'nu')),
                ('deep_nested_message', MessageField(3, 'DeepNestedMessage')),
            ])
        }, [
            ('nested_enum', EnumField(1, 'NestedEnum', 'XI')),
            ('nested_message', MessageField(2, 'NestedMessage')),
        ], extensions=[
            ('descriptor_pool_test',
             ExtensionField(1001, 'DescriptorPoolTest1')),
        ]),
    },
    dependencies=['google/protobuf/internal/descriptor_pool_test1.proto',
                  'google/protobuf/internal/more_messages.proto'],
    public_dependencies=['google/protobuf/internal/more_messages.proto'])


if __name__ == '__main__':
  unittest.main()

Youez - 2016 - github.com/yon3zu
LinuXploit