Classes

socialgene.base.socialgene

SocialGene

Bases: Molbio, CompareProtein, SequenceParser, Neo4jQuery, HmmerParser

Main class for building, storing, and working with protein and genomic info

Source code in socialgene/base/socialgene.py
class SocialGene(Molbio, CompareProtein, SequenceParser, Neo4jQuery, HmmerParser):
    """Main class for building, storing, and working with protein and genomic info"""

    # _export_table_names exports tables for the nextflow pipeline
    _export_table_names = [
        "table_protein_to_go",
        "table_assembly_to_locus",
        "table_loci",
        "table_locus_to_protein",
        "table_assembly",
        "table_assembly_to_taxid",
        "table_protein_ids",
    ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.protein_comparison = []

    ########################################
    # Get info
    ########################################

    def get_loci_ids(self):
        return list(self.loci.keys())

    def get_all_gene_clusters(self):
        for i in self.assemblies.values():
            for j in i.loci.values():
                for k in j.gene_clusters:
                    yield k

    @property
    def protein_iter(self):
        for i in self.proteins.values():
            yield i

    ########################################
    # Filter
    ########################################

    def get_protein_domains_from_db(self, protein_id_list):
        # Search neo4j for a protein with the same hash, if it exists,
        # retrieve the domain info for it
        get_protein_domains_result = self.query_neo4j(
            cypher_name="get_protein_domains",
            param=protein_id_list,
        )
        temp = {
            "seq_pro_score": float(0),
            "evalue": float(0),
            "domain_bias": float(0),
            "domain_score": float(0),
            "seq_pro_bias": float(0),
            "hmm_from": int(0),
            "hmm_to": int(0),
            "ali_from": int(0),
            "ali_to": int(0),
            "env_from": int(0),
            "env_to": int(0),
        }
        if get_protein_domains_result:
            for protein in get_protein_domains_result:
                for domain in protein["domains"]:
                    new_dict = temp | domain["domain_properties"]
                    new_dict["hmm_id"] = domain["hmm_id"]
                    self.proteins[protein["p1.uid"]].add_domain(
                        **new_dict,
                    )
        else:
            log.info(
                "None of the input protein ids were found or had domains in the database"
            )

    def annotate_proteins_with_neo4j(
        self,
        protein_uids: List[str] = None,
        chunk_size: int = 1000,
        annotate_all: bool = False,
        progress: bool = False,
    ):
        """
        The function `annotate_proteins_with_neo4j` takes a list of protein hash IDs, queries a database
        for matching proteins, and retrieves their HMM annotations.

        Args:
          protein_uids (List[str]): A list of protein hash IDs. These are unique identifiers for proteins in the database that you want to annotate.
          chunk_size (int): The `chunk_size` parameter determines the number of proteins to query at a time. Proteins are divided into chunks to improve efficiency when querying the database.  Defaults to 1000
          annotate_all (bool): The `annotate_all` parameter is a boolean flag that determines whether to annotate all proteins in the database or not. If set to `True`, all proteins in the database will be annotated. If set to `False`, you need to provide a list of protein hash IDs in `protein_uids`. Defaults to False
          progress (bool): The `progress` parameter is a boolean flag that determines whether or not to display a progress bar during the execution of the function. If `progress` is set to `True`, a progress bar will be displayed to track the progress of the function; if set to `False`, no progress bar is shown. Defaults to False

        """
        if protein_uids is None and annotate_all:
            protein_uids = self.get_all_feature_uids()
        elif isinstance(protein_uids, str):
            protein_uids = [protein_uids]
        log.info(
            f"Searching database for HMM annotations of {len(protein_uids)} proteins."
        )
        search_result = search_protein_hash(protein_uids)
        if not any([i for i in search_result.values()]):
            log.info("No identical proteins found in the database.")
        else:
            # log.info(
            #     f"{len([i for i in search_result.values() if i])} of {len(protein_uids)} searched proteins were found in the database, pulling their HMM annotations into python..."
            # )
            # get the number of chunks needed to query "chunk_size" proteins at a time
            prot_len = len(protein_uids)
            n_chunks = prot_len // chunk_size
            if n_chunks == 0:
                n_chunks = 1
            if n_chunks > (len(protein_uids) - 1):
                chunked_list = chunk_a_list_with_numpy(
                    input_list=protein_uids, n_chunks=n_chunks
                )
            else:
                chunked_list = [protein_uids]
            del protein_uids
            if progress:
                with Progress(transient=True) as pg:
                    task = pg.add_task("Progress...", total=n_chunks)
                    for protein_id_list in chunked_list:
                        self.get_protein_domains_from_db(
                            protein_id_list=protein_id_list
                        )
                        pg.update(task, advance=1)
            else:
                for protein_id_list in chunked_list:
                    self.get_protein_domains_from_db(protein_id_list=protein_id_list)
        return search_result

    def annotate(
        self, use_neo4j_precalc: bool = False, neo4j_chunk_size: int = 1000, **kwargs
    ):
        """
        The `annotate` function is a convenience function that retrieves HMMER annotation for all
        proteins in a Socialgene object, either from a Neo4j database or by using HMMER directly.

        Args:
          use_neo4j_precalc (bool): A boolean flag indicating whether to use precalculated domain
        information from Neo4j for annotation. If set to True, the domains info will be retrieved from
        Neo4j, and proteins not found in Neo4j will be analyzed with HMMER. If set to False, all
        proteins will be analyzed with HMMER. Defaults to False
          neo4j_chunk_size (int): The `neo4j_chunk_size` parameter determines the number of proteins
        that will be sent to Neo4j at a time for annotation. Defaults to 1000
        """
        if use_neo4j_precalc:
            db_res = self.annotate_proteins_with_neo4j(
                annotate_all=True,
                chunk_size=neo4j_chunk_size,
            )
            n_not_in_db = [k for k, v in db_res.items() if not v]
        else:
            n_not_in_db = list(self.proteins.keys())
        if n_not_in_db:
            self.annotate_proteins_with_hmmscan(
                protein_id_list=n_not_in_db,
                **kwargs,
            )

    def annotate_proteins_with_hmmscan(
        self, protein_id_list, hmm_filepath, cpus=None, **kwargs
    ):
        """Run hmmscan on Protein objects

        Args:
            protein_id_list (list): list of protein hash ids to run hmmscan on
            hmm_filepath (str): path to file of HMMs
            cpus (int): number of parallel processes to spawn

        """
        log.info(f"Annotating {len(protein_id_list)} proteins with HMMER's hmmscan")
        if cpus is None:
            if cpu_count() > 2:
                cpus = cpu_count() - 1
            else:
                cpus = 1
        # create a list of proteins as FASTA
        temp1 = [
            self.proteins[i].fasta_string_defline_uid
            for i in protein_id_list
            if self.proteins[i].sequence
        ]
        if not temp1:
            log.info("None of the input sequences contain an amino acid sequence")
            return
        with tempfile.NamedTemporaryFile() as temp_path:
            hmmer_temp = HMMER(hmm_filepath=hmm_filepath)
            hmmer_temp.hmmscan(
                fasta_path="-",
                input="\n".join(temp1).encode(),
                domtblout_path=temp_path.name,
                overwrite=True,
                cpus=cpus,
                **kwargs,
            )
            # parse the resulting domtblout file, saving results to the class proteins/domains
            self.parse_hmmout(temp_path.name, hmmsearch_or_hmmscan="hmmscan")

    def get_proteins_from_neo4j(self):
        try:
            ncount = 0
            with GraphDriver() as db:
                for i in db.run(
                    """
                    MATCH (p1:protein)
                    RETURN p1.uid as uid
                    """,
                ):
                    _ = self.add_protein(uid=i["uid"])
                    ncount += 1
            log.info(f"Retrieved {ncount} proteins from Neo4j")
        except Exception:
            log.debug("Error trying to retrieve proteins from Neo4j")

    def add_sequences_from_neo4j(self):
        try:
            with GraphDriver() as db:
                for i in db.run(
                    """
                    MATCH (p1:protein)
                    WHERE p1.uid in $uid
                    RETURN p1.uid as uid, p1.sequence as sequence
                    """,
                    uid=[i.uid for i in self.protein_iter],
                ):
                    if i["uid"] in self.proteins:
                        self.proteins[i["uid"]].sequence = i["sequence"]
        except Exception:
            log.debug(f"Error trying to retrieve domains for {self.uid}")

    def hydrate_from_proteins(self):
        """Given a SocialGene object with proteins, retrieve from a running Neo4j database all locus and assembly info for those proteins"""
        for result in Neo4jQuery.query_neo4j(
            cypher_name="retrieve_protein_locations",
            param=list(self.proteins.keys()),
        ):
            self.add_assembly(uid=result["assembly"], parent=self)
            for locus in result["loci"]:
                _ = self.assemblies[result["assembly"]].add_locus(
                    external_id=locus["locus"]
                )
                for feature in locus["features"]:
                    _ = (
                        self.assemblies[result["assembly"]]
                        .loci[locus["locus"]]
                        .add_feature(
                            type="protein",
                            uid=feature["external_id"],
                            start=feature["locus_start"],
                            end=feature["locus_end"],
                            strand=feature["strand"],
                        )
                    )

    def fill_given_locus_range(self, locus_uid, start, end):
        """Given a locus uid that's in the database, pull assembly, locus, and protein info for those proteins"""

        with GraphDriver() as db:
            assembly_uid = db.run(
                """
                MATCH (a1:assembly)<-[:ASSEMBLES_TO]-(n1:nucleotide {uid: $locus_uid})
                RETURN a1.uid as a_uid
            """,
                locus_uid=locus_uid,
            ).single()
        if not assembly_uid:
            raise ValueError("No assembly found in database")
        else:
            assembly_uid = assembly_uid.value()
        self.add_assembly(uid=assembly_uid, parent=self)
        with GraphDriver() as db:
            res = db.run(
                """
                MATCH (n1:nucleotide {uid: $locus_uid})
                RETURN n1 as nucleotide_node
            """,
                locus_uid=locus_uid,
            ).single()
        if not res:
            raise ValueError(f"{locus_uid} not found in database")
        external_id = res.value().get("external_id")
        self.assemblies[assembly_uid].add_locus(external_id=external_id)
        self.assemblies[assembly_uid].loci[external_id].metadata.update(
            dict(res.value())
        )

        with GraphDriver() as db:
            res = db.run(
                """
                MATCH (n1:nucleotide {uid: $locus_uid})-[e1:ENCODES]->(p1:protein)
                WHERE e1.start >= $start AND e1.start <= $end
                RETURN e1, p1
            """,
                locus_uid=locus_uid,
                start=start,
                end=end,
            )
            for feature in res:
                _ = self.add_protein(uid=feature.value().end_node["uid"])
                self.assemblies[assembly_uid].loci[external_id].add_feature(
                    type="protein",
                    uid=feature.value().end_node["uid"],
                    **feature.value(),
                )

        return {"assembly": assembly_uid, "locus": external_id}

    def _drop_all_cross_origin(self):
        # Drop likely cross-origin proteins (proteins that are longer than 100k bp)
        # have to remove from both gene cluster and locus
        for ak, av in self.assemblies.items():
            for nk, nv in av.loci.items():
                self.assemblies[ak].loci[nk].features = {
                    i for i in nv.features if abs(i.end - i.start) < 100000
                }
                for gene_cluster in self.assemblies[ak].loci[nk].gene_clusters:
                    gene_cluster.features = {
                        i
                        for i in gene_cluster.features
                        if abs(i.end - i.start) < 100000
                    }

    def hydrate_protein_info(self):
        """Pull name (original identifier) and description of proteins from Neo4j"""
        for result in Neo4jQuery.query_neo4j(
            cypher_name="get_protein_info",
            param=list(self.proteins.keys()),
        ):
            self.proteins[result["external_id"]].external_id = result["name"]
            self.proteins[result["external_id"]].description = result["description"]

    ########################################
    # File Outputs
    ########################################

    def export_all_domains_as_tsv(self, outpath, **kwargs):
        """
        The function exports all domains as a TSV file, sorted by protein ID and mean envelope position.

        Args:
          outpath: The `outpath` parameter is the path to the output file where the TSV (Tab-Separated
        Values) data will be written. It specifies the location and name of the file.
        """
        _domain_counter = 0
        with open_write(outpath, **kwargs) as f:
            tsv_writer = csv.writer(f, delimiter="\t")
            # sort to standardize the write order
            ordered_prot_ids = list(self.proteins.keys())
            ordered_prot_ids.sort()
            for prot_id in ordered_prot_ids:
                # sort to standardize the write order
                for domain in self.proteins[
                    prot_id
                ].domain_list_sorted_by_mean_envelope_position:
                    _domain_counter += 1
                    _temp = [prot_id]
                    _temp.extend(list(domain.all_attributes().values()))
                    tsv_writer.writerow(_temp)
        log.info(f"Wrote {str(_domain_counter)} domains to {outpath}")

    def ferment_pickle(self, outpath):
        """
        The function `ferment_pickle` saves a SocialGene object to a Python pickle file.

        Args:
          outpath: The `outpath` parameter is a string that represents the path where the pickled object will be saved. It should include the file name and extension. For example, "/path/to/save/object.pickle".
        """
        with open(outpath, "wb") as handle:
            pickle.dump(self, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def eat_pickle(inpath):
        """
        The `eat_pickle` function reads a saved SocialGene pickle file from the given path and returns a SocialGene object.

        Args:
          inpath: The `inpath` parameter is a string that represents the path to the file from which we
        want to load the pickled object.

        Returns:
          SocialGene object
        """
        with open(inpath, "rb") as handle:
            return pickle.load(handle)

    def filter_proteins(self, hash_list: List):
        """Filter proteins by list of hash ids

        Args:
            hash_list (List): List of protein hash identifiers

        Returns:
            Generator: generator returning tuple of length two -> Generator[(str, 'Protein'])
        """
        return ((k, v) for k, v in self.proteins.items() if k in hash_list)

    @property
    def fasta_string_defline_uid(self):
        for v in self.proteins.values():
            yield f">{v.uid}\n{v.sequence}\n"

    @property
    def fasta_string_defline_external_id(self):
        for v in self.proteins.values():
            yield f">{v.external_id}\n{v.sequence}\n"

    def write_fasta(
        self,
        outpath,
        external_id: bool = False,
        **kwargs,
    ):
        """Write all proteins to a FASTA file

        Args:
            outpath (str): path of file that FASTA entries will be appended to
            external_id (bool, optional): Write protein identifiers as the original identifier (True) or the hash uid (False). Defaults to False.
            **kwargs: see print(open_write.__doc__)
        """

        with open_write(filepath=outpath, **kwargs) as handle:
            counter = 0
            if external_id:
                fasta_gen = self.fasta_string_defline_external_id
            else:
                fasta_gen = self.fasta_string_defline_uid
            for i in fasta_gen:
                counter += 1
                if "compression" in kwargs and kwargs["compression"]:
                    handle.write(i.encode())
                else:
                    handle.writelines(i)

        log.info(f"Wrote {str(counter)} proteins to {outpath}")

    def write_n_fasta(self, outdir, n_splits=1, **kwargs):
        """
        The function `write_n_fasta` exports protein sequences split into multiple fasta files.

        Args:
          outdir: The `outdir` parameter is a string that specifies the directory where the fasta files
        will be saved.
          n_splits: The `n_splits` parameter in the `write_n_fasta` function determines the number of
        fasta files the protein sequences will be split into. By default, it is set to 1, meaning all
        the protein sequences will be written into a single fasta file. If you specify a value greater
        than 1, the sequences will be split across that many files. Defaults to 1
        """

        # this can be done with itertools.batched in python 3.12
        def split(a, n):
            # https://stackoverflow.com/a/2135920
            k, m = divmod(len(a), n)
            return (
                a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)
            )

        protein_list = split(
            [value for key, value in sorted(self.proteins.items(), reverse=False)],
            n_splits,
        )
        counter = 1
        for protein_group in protein_list:
            with open_write(
                Path(outdir, f"fasta_split_{counter}.faa"), **kwargs
            ) as handle:
                for i in protein_group:
                    handle.writelines(f">{i.uid}\n{i.sequence}\n")
            counter += 1

    def _merge_proteins(self, sg_object):
        """
        The function merges the proteins from another SocialGene object into the current object.

        Args:
          sg_object: The `sg_object` parameter is an object of the same class as the current object. It
        represents another object that contains a collection of proteins.
        """
        self.proteins.update(sg_object.proteins)

    def _merge_assemblies(self, sg_object):
        """
        The function `_merge_assemblies` merges assemblies and loci from `sg_object` into `self` if they
        do not already exist, and adds protein features to loci if they already exist.

        Args:
          sg_object: The `sg_object` parameter is an object of the same class as the current object.
        """
        for assembly_k, assembly_v in sg_object.assemblies.items():
            if assembly_k not in self.assemblies:
                self.assemblies[assembly_k] = assembly_v
            else:
                for locus_k, locus_v in assembly_v.loci.items():
                    if locus_k not in self.assemblies[assembly_k].loci:
                        self.assemblies[assembly_k].loci[locus_k] = locus_v
                    else:
                        for feature in (
                            sg_object.assemblies[assembly_k].loci[locus_k].features
                        ):
                            self.assemblies[assembly_k].loci[locus_k].add_feature(
                                type="protein",
                                uid=feature.uid,
                                start=feature.start,
                                end=feature.end,
                                strand=feature.strand,
                            )

    # def deep_merge_with_sg1(sg_object_1, sg_object_2):
    def __add__(self, sg_object):
        """
        The function merges proteins and assemblies from two objects and appends protein comparison data
        to a list or dataframe.

        Args:
          sg_object: The `sg_object` parameter is an object of the same class as the current object. It
        represents another instance of the class that you want to add to the current instance.
        """
        self._merge_proteins(sg_object)
        self._merge_assemblies(sg_object)
        self.protein_comparison.extend(sg_object.protein_comparison)

    def write_genbank(self, outpath, compression=None):
        for assembly in self.assemblies.values():
            for locus in assembly.loci.values():
                record = SeqRecord(
                    Seq(""),
                    id=locus.external_id,
                    name=locus.external_id,
                    description="A GenBank file generated by SocialGene.",
                    dbxrefs=[f"Assembly:{locus.parent.uid}"],
                )
                # Add annotation
                for feature in locus.features_sorted_by_midpoint:
                    biofeat = SeqFeature(
                        FeatureLocation(
                            start=feature.start - 1,
                            end=feature.end,
                            strand=feature.strand,
                        ),
                        type=feature.type,
                        qualifiers={
                            k: v
                            for k, v in feature.all_attributes().items()
                            if v and k not in ["parent", "protein"]
                        }
                        | {"translation": self.proteins[feature.uid].sequence},
                    )
                    record.features.append(biofeat)
                record.annotations["molecule_type"] = "DNA"
                with open_write(outpath, "a", compression) as h:
                    SeqIO.write(
                        record,
                        h,
                        "genbank",
                    )

    def drop_all_non_protein_features(self, **kwargs):
        """Drop features from all assembly/loci that aren't proteins/pseudo-proteins

        Returns:
            list: list of features that were removed (if return_removed=True)
        """
        for a_v in self.assemblies.values():
            for l_v in a_v.loci.values():
                l_v.drop_non_protein_features()

    ########################################
    # OUTPUTS FOR NEXTFLOW
    ########################################

    def write_table(
        self,
        outdir: str,
        tablename: str,
        filename: str = None,
        include_sequences=False,
        **kwargs,
    ):
        """
        The function writes a table to a specified output directory in TSV format, for import into Neo4j.

        Args:
          outdir (str): The `outdir` parameter is a string that specifies the directory where the output
        file will be saved.
          tablename (str): The `tablename` parameter is a string that specifies the name of the table to
        be written.
          filename (str): The `filename` parameter is an optional argument that specifies the name of
        the file to be written. If no `filename` is provided, the `tablename` will be used as the
        filename.
        """
        if not filename:
            filename = tablename
        outpath = Path(outdir, filename)
        with open_write(outpath, **kwargs) as handle:
            tsv_writer = csv.writer(
                handle, delimiter="\t", quotechar='"', quoting=csv.QUOTE_MINIMAL
            )
            for i in getattr(self, tablename)(include_sequences=include_sequences):
                tsv_writer.writerow(i)

    def table_protein_to_go(self, **kwargs):
        """
        The function `table_protein_to_go` iterates through the assemblies, loci, and features of a
        given object, and yields tuples containing the protein hash and GO term for each feature that
        has GO terms, for import into Neo4j.
        """
        for av in self.assemblies.values():
            for v in av.loci.values():
                for feature in v.features:
                    # not all features will have goterms so check here
                    if feature.goterms:
                        for goterm in feature.goterms:
                            yield (
                                feature.uid,
                                goterm.removeprefix("GO:").strip(),
                            )

    def table_locus_to_protein(self, **kwargs):
        """
        The function `table_locus_to_protein` generates a table of protein information from each locus, for import into Neo4j.
        """
        for ak, av in self.assemblies.items():
            for k, loci in av.loci.items():
                temp_list = list(loci.features)
                # sort features by id then start to maintain consistent output
                temp_list.sort(key=attrgetter("uid"))
                temp_list.sort(key=attrgetter("start"))
                for feature in temp_list:
                    if feature.feature_is_protein():
                        yield (
                            feature.parent.uid,
                            feature.uid,
                            feature.external_id,
                            feature.locus_tag,
                            feature.start,
                            feature.end,
                            feature.strand,
                            feature.description,
                            feature.partial_on_complete_genome,
                            feature.missing_start,
                            feature.missing_stop,
                            feature.internal_stop,
                            feature.partial_in_the_middle_of_a_contig,
                            feature.missing_N_terminus,
                            feature.missing_C_terminus,
                            feature.frameshifted,
                            feature.too_short_partial_abutting_assembly_gap,
                            feature.incomplete,
                        )

    def table_protein_ids(self, include_sequences=False, **kwargs):
        """Protein hash id table for import into Neo4j"""
        for protein in self.proteins.values():
            if include_sequences:
                yield (protein.uid, protein.crc64, protein.sequence)
            else:
                yield (protein.uid, protein.crc64)

    def table_assembly_to_locus(self, **kwargs):
        """Assembly to locus table for import into Neo4j

        Args:
            outdir (str, optional): Defaults to ".".
        """
        for ak, av in self.assemblies.items():
            for k, v in av.loci.items():
                #  ["assembly", "internal_locus_id"]
                yield (ak, v.uid)

    def table_loci(self, **kwargs):
        """
        Generate a table of loci information.

        Yields:
            tuple: (internal_locus_id, external_locus_id, [info])
        """
        for _av in self.assemblies.values():
            for locus in _av.loci.values():
                yield tuple(
                    [locus.uid]
                    + [locus.external_id]
                    + list(locus.metadata.all_attributes().values())
                )

    def table_assembly(self, **kwargs):
        """Assembly table for import into Neo4j

        Args:
            outdir (str, optional): Defaults to ".".
        """
        for assembly in self.assemblies.values():
            yield tuple(
                [assembly.uid] + list(assembly.metadata.all_attributes().values())
            )

    def table_assembly_to_taxid(self, **kwargs):
        """Assembly table for import into Neo4j

        Args:
            outdir (str, optional): Defaults to ".".
        """
        for k, v in self.assemblies.items():
            if v.taxid:
                yield (k, v.taxid)
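
A hedged end-to-end sketch of typical use, assuming SocialGene() can be constructed with no arguments and that proteins/assemblies have already been loaded (e.g. via the SequenceParser base class or eat_pickle); the HMM path and file names below are hypothetical.

sg = SocialGene()
# ... load proteins and assemblies here (parsing methods come from the base classes) ...

# Annotate domains, preferring precalculated results from Neo4j and falling back to hmmscan.
sg.annotate(use_neo4j_precalc=True, hmm_filepath="/path/to/models.hmm")

# Export results and save the session for later reuse.
sg.export_all_domains_as_tsv("domains.tsv")
sg.write_fasta("proteins.faa")
sg.ferment_pickle("session.pickle")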
__add__(sg_object)

The function merges proteins and assemblies from two objects and appends protein comparison data to a list or dataframe.

Parameters:

Name       Type  Description                                                                Default
sg_object        Another instance of the same class whose data is added to this instance.  required

Source code in socialgene/base/socialgene.py
def __add__(self, sg_object):
    """
    The function merges proteins and assemblies from two objects and appends protein comparison data
    to a list or dataframe.

    Args:
      sg_object: The `sg_object` parameter is an object of the same class as the current object. It
    represents another instance of the class that you want to add to the current instance.
    """
    self._merge_proteins(sg_object)
    self._merge_assemblies(sg_object)
    self.protein_comparison.extend(sg_object.protein_comparison)
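
A minimal usage sketch (the object names are hypothetical): __add__ merges in place and returns None, so use the + operator as a statement rather than assigning its result.

sg_main + sg_other  # merge sg_other's proteins, assemblies, and protein comparisons into sg_main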
annotate(use_neo4j_precalc=False, neo4j_chunk_size=1000, **kwargs)

The annotate function is a convenience function that retrieves HMMER annotation for all proteins in a Socialgene object, either from a Neo4j database or by using HMMER directly.

Parameters:

Name               Type  Description                                                                                           Default
use_neo4j_precalc  bool  If True, pull precalculated domain info from Neo4j and analyze only the proteins not found there      False
                         with HMMER; if False, analyze all proteins with HMMER.
neo4j_chunk_size   int   Number of proteins sent to Neo4j at a time for annotation.                                            1000

Source code in socialgene/base/socialgene.py
def annotate(
    self, use_neo4j_precalc: bool = False, neo4j_chunk_size: int = 1000, **kwargs
):
    """
    The `annotate` function is a convenience function that retrieves HMMER annotation for all
    proteins in a Socialgene object, either from a Neo4j database or by using HMMER directly.

    Args:
      use_neo4j_precalc (bool): A boolean flag indicating whether to use precalculated domain
    information from Neo4j for annotation. If set to True, the domains info will be retrieved from
    Neo4j, and proteins not found in Neo4j will be analyzed with HMMER. If set to False, all
    proteins will be analyzed with HMMER. Defaults to False
      neo4j_chunk_size (int): The `neo4j_chunk_size` parameter determines the number of proteins
    that will be sent to Neo4j at a time for annotation. Defaults to 1000
    """
    if use_neo4j_precalc:
        db_res = self.annotate_proteins_with_neo4j(
            annotate_all=True,
            chunk_size=neo4j_chunk_size,
        )
        n_not_in_db = [k for k, v in db_res.items() if not v]
    else:
        n_not_in_db = list(self.proteins.keys())
    if n_not_in_db:
        self.annotate_proteins_with_hmmscan(
            protein_id_list=n_not_in_db,
            **kwargs,
        )
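
A brief example, assuming sg is an existing SocialGene object (the HMM path is hypothetical); extra keyword arguments such as hmm_filepath are forwarded to annotate_proteins_with_hmmscan for any proteins not found in Neo4j.

sg.annotate(
    use_neo4j_precalc=True,              # pull precalculated domains from Neo4j where possible
    neo4j_chunk_size=1000,
    hmm_filepath="/path/to/models.hmm",  # forwarded to annotate_proteins_with_hmmscan
)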
annotate_proteins_with_hmmscan(protein_id_list, hmm_filepath, cpus=None, **kwargs)

Run hmmscan on Protein objects

Parameters:

Name             Type  Description                                  Default
protein_id_list  list  List of protein hash ids to run hmmscan on.  required
hmm_filepath     str   Path to file of HMMs.                        required
cpus             int   Number of parallel processes to spawn.       None
Source code in socialgene/base/socialgene.py
def annotate_proteins_with_hmmscan(
    self, protein_id_list, hmm_filepath, cpus=None, **kwargs
):
    """Run hmmscan on Protein objects

    Args:
        protein_id_list (list): list of protein hash ids to run hmmscan on
        hmm_filepath (str): path to file of HMMs
        cpus (int): number of parallel processes to spawn

    """
    log.info(f"Annotating {len(protein_id_list)} proteins with HMMER's hmmscan")
    if cpus is None:
        if cpu_count() > 2:
            cpus = cpu_count() - 1
        else:
            cpus = 1
    # create a list of proteins as FASTA
    temp1 = [
        self.proteins[i].fasta_string_defline_uid
        for i in protein_id_list
        if self.proteins[i].sequence
    ]
    if not temp1:
        log.info("None of the input sequences contain an amino acid sequence")
        return
    with tempfile.NamedTemporaryFile() as temp_path:
        hmmer_temp = HMMER(hmm_filepath=hmm_filepath)
        hmmer_temp.hmmscan(
            fasta_path="-",
            input="\n".join(temp1).encode(),
            domtblout_path=temp_path.name,
            overwrite=True,
            cpus=cpus,
            **kwargs,
        )
        # parse the resulting domtblout file, saving results to the class proteins/domains
        self.parse_hmmout(temp_path.name, hmmsearch_or_hmmscan="hmmscan")
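
A short sketch, assuming sg is a SocialGene object, the listed hash uids exist in sg.proteins with sequences, and the HMM file path is valid (the uids and path here are hypothetical).

sg.annotate_proteins_with_hmmscan(
    protein_id_list=["<protein_hash_1>", "<protein_hash_2>"],  # hypothetical uids
    hmm_filepath="/path/to/models.hmm",
    cpus=4,
)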
annotate_proteins_with_neo4j(protein_uids=None, chunk_size=1000, annotate_all=False, progress=False)

The function annotate_proteins_with_neo4j takes a list of protein hash IDs, queries a database for matching proteins, and retrieves their HMM annotations.

Parameters:

Name          Type       Description                                                                                          Default
protein_uids  List[str]  List of protein hash IDs; unique identifiers of the proteins in the database to annotate.            None
chunk_size    int        Number of proteins to query at a time; chunking improves efficiency when querying the database.      1000
annotate_all  bool       If True, annotate all proteins in the database; if False, a list of protein hash IDs must be given   False
                         in protein_uids.
progress      bool       If True, display a progress bar while the function runs; if False, run without one.                  False
Source code in socialgene/base/socialgene.py
def annotate_proteins_with_neo4j(
    self,
    protein_uids: List[str] = None,
    chunk_size: int = 1000,
    annotate_all: bool = False,
    progress: bool = False,
):
    """
    The function `annotate_proteins_with_neo4j` takes a list of protein hash IDs, queries a database
    for matching proteins, and retrieves their HMM annotations.

    Args:
      protein_uids (List[str]): A list of protein hash IDs. These are unique identifiers for proteins in the database that you want to annotate.
      chunk_size (int): The `chunk_size` parameter determines the number of proteins to query at a time. Proteins are divided into chunks to improve efficiency when querying the database.  Defaults to 1000
      annotate_all (bool): The `annotate_all` parameter is a boolean flag that determines whether to annotate all proteins in the database or not. If set to `True`, all proteins in the database will be annotated. If set to `False`, you need to provide a list of protein hash IDs in `protein_uids`. Defaults to False
      progress (bool): The `progress` parameter is a boolean flag that determines whether or not to display a progress bar during the execution of the function. If `progress` is set to `True`, a progress bar will be displayed to track the progress of the function; if set to `False`, no progress bar is shown. Defaults to False

    """
    if protein_uids is None and annotate_all:
        protein_uids = self.get_all_feature_uids()
    elif isinstance(protein_uids, str):
        protein_uids = [protein_uids]
    log.info(
        f"Searching database for HMM annotations of {len(protein_uids)} proteins."
    )
    search_result = search_protein_hash(protein_uids)
    if not any([i for i in search_result.values()]):
        log.info("No identical proteins found in the database.")
    else:
        # log.info(
        #     f"{len([i for i in search_result.values() if i])} of {len(protein_uids)} searched proteins were found in the database, pulling their HMM annotations into python..."
        # )
        # get the number of chunks needed to query "chunk_size" proteins at a time
        prot_len = len(protein_uids)
        n_chunks = prot_len // chunk_size
        if n_chunks == 0:
            n_chunks = 1
        if n_chunks > (len(protein_uids) - 1):
            chunked_list = chunk_a_list_with_numpy(
                input_list=protein_uids, n_chunks=n_chunks
            )
        else:
            chunked_list = [protein_uids]
        del protein_uids
        if progress:
            with Progress(transient=True) as pg:
                task = pg.add_task("Progress...", total=n_chunks)
                for protein_id_list in chunked_list:
                    self.get_protein_domains_from_db(
                        protein_id_list=protein_id_list
                    )
                    pg.update(task, advance=1)
        else:
            for protein_id_list in chunked_list:
                self.get_protein_domains_from_db(protein_id_list=protein_id_list)
    return search_result
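
A minimal sketch of direct use, assuming sg already holds proteins; the returned mapping records, per searched uid, whether the protein was found in the database, which makes it easy to collect the misses.

found = sg.annotate_proteins_with_neo4j(annotate_all=True, chunk_size=1000, progress=True)
missing = [uid for uid, hit in found.items() if not hit]  # proteins with no match in Neo4j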
drop_all_non_protein_features(**kwargs)

Drop features from all assembly/loci that aren't proteins/pseudo-proteins

Returns:

Name  Type  Description
list        List of features that were removed (if return_removed=True).

Source code in socialgene/base/socialgene.py
def drop_all_non_protein_features(self, **kwargs):
    """Drop features from all assembly/loci that aren't proteins/pseudo-proteins

    Returns:
        list: list of features that were removed (if return_removed=True)
    """
    for a_v in self.assemblies.values():
        for l_v in a_v.loci.values():
            l_v.drop_non_protein_features()
eat_pickle(inpath) staticmethod

The eat_pickle function reads a saved SocialGene pickle file from the given path and returns a SocialGene object.

Parameters:

Name    Type  Description                                                          Default
inpath        Path to the file from which to load the pickled SocialGene object.  required

Returns:

Type        Description
SocialGene  The unpickled SocialGene object.

Source code in socialgene/base/socialgene.py
@staticmethod
def eat_pickle(inpath):
    """
    The `eat_pickle` function reads a saved SocialGene pickle file from the given path and returns a SocialGene object.

    Args:
      inpath: The `inpath` parameter is a string that represents the path to the file from which we
    want to load the pickled object.

    Returns:
      SocialGene object
    """
    with open(inpath, "rb") as handle:
        return pickle.load(handle)
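
For example (the path is hypothetical):

sg = SocialGene.eat_pickle("/path/to/saved_object.pickle")  # load a previously pickled object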
export_all_domains_as_tsv(outpath, **kwargs)

The function exports all domains as a TSV file, sorted by protein ID and mean envelope position.

Parameters:

Name     Type  Description                                                                          Default
outpath        Path to the output file where the TSV (tab-separated values) data will be written.  required

Source code in socialgene/base/socialgene.py
def export_all_domains_as_tsv(self, outpath, **kwargs):
    """
    The function exports all domains as a TSV file, sorted by protein ID and mean envelope position.

    Args:
      outpath: The `outpath` parameter is the path to the output file where the TSV (Tab-Separated
    Values) data will be written. It specifies the location and name of the file.
    """
    _domain_counter = 0
    with open_write(outpath, **kwargs) as f:
        tsv_writer = csv.writer(f, delimiter="\t")
        # sort to standardize the write order
        ordered_prot_ids = list(self.proteins.keys())
        ordered_prot_ids.sort()
        for prot_id in ordered_prot_ids:
            # sort to standardize the write order
            for domain in self.proteins[
                prot_id
            ].domain_list_sorted_by_mean_envelope_position:
                _domain_counter += 1
                _temp = [prot_id]
                _temp.extend(list(domain.all_attributes().values()))
                tsv_writer.writerow(_temp)
    log.info(f"Wrote {str(_domain_counter)} domains to {outpath}")
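
For example, assuming sg holds annotated proteins (the output path is hypothetical); each row contains the protein uid followed by that domain's attributes.

sg.export_all_domains_as_tsv("domains.tsv")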
ferment_pickle(outpath)

The function ferment_pickle saves a SocialGene object to a Python pickle file.

Parameters:

Name     Type  Description                                                                          Default
outpath        Path where the pickled object will be saved, including the file name and extension  required
               (e.g. "/path/to/save/object.pickle").
Source code in socialgene/base/socialgene.py
def ferment_pickle(self, outpath):
    """
    The function `ferment_pickle` saves a SocialGene object to a Python pickle file.

    Args:
      outpath: The `outpath` parameter is a string that represents the path where the pickled object will be saved. It should include the file name and extension. For example, "/path/to/save/object.pickle".
    """
    with open(outpath, "wb") as handle:
        pickle.dump(self, handle, protocol=pickle.HIGHEST_PROTOCOL)
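
For example (the path is hypothetical); the saved file can be reloaded later with SocialGene.eat_pickle().

sg.ferment_pickle("/path/to/saved_object.pickle")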
fill_given_locus_range(locus_uid, start, end)

Given a locus uid that's in the database, pull assembly, locus, and protein info for those proteins

Source code in socialgene/base/socialgene.py
def fill_given_locus_range(self, locus_uid, start, end):
    """Given a locus uid that's in the database, pull assembly, locus, and protein info for those proteins"""

    with GraphDriver() as db:
        assembly_uid = db.run(
            """
            MATCH (a1:assembly)<-[:ASSEMBLES_TO]-(n1:nucleotide {uid: $locus_uid})
            RETURN a1.uid as a_uid
        """,
            locus_uid=locus_uid,
        ).single()
    if not assembly_uid:
        raise ValueError("No assembly found in database")
    else:
        assembly_uid = assembly_uid.value()
    self.add_assembly(uid=assembly_uid, parent=self)
    with GraphDriver() as db:
        res = db.run(
            """
            MATCH (n1:nucleotide {uid: $locus_uid})
            RETURN n1 as nucleotide_node
        """,
            locus_uid=locus_uid,
        ).single()
    if not res:
        raise ValueError(f"{locus_uid} not found in database")
    external_id = res.value().get("external_id")
    self.assemblies[assembly_uid].add_locus(external_id=external_id)
    self.assemblies[assembly_uid].loci[external_id].metadata.update(
        dict(res.value())
    )

    with GraphDriver() as db:
        res = db.run(
            """
            MATCH (n1:nucleotide {uid: $locus_uid})-[e1:ENCODES]->(p1:protein)
            WHERE e1.start >= $start AND e1.start <= $end
            RETURN e1, p1
        """,
            locus_uid=locus_uid,
            start=start,
            end=end,
        )
        for feature in res:
            _ = self.add_protein(uid=feature.value().end_node["uid"])
            self.assemblies[assembly_uid].loci[external_id].add_feature(
                type="protein",
                uid=feature.value().end_node["uid"],
                **feature.value(),
            )

    return {"assembly": assembly_uid, "locus": external_id}
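
A hedged sketch (the locus uid and coordinates are hypothetical); this requires a running Neo4j database and returns the assembly and locus identifiers it populated.

result = sg.fill_given_locus_range(locus_uid="<nucleotide_uid>", start=1, end=50000)
print(result)  # {"assembly": <assembly_uid>, "locus": <external_locus_id>}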
filter_proteins(hash_list)

Filter proteins by list of hash ids

Parameters:

Name       Type  Description                        Default
hash_list  List  List of protein hash identifiers.  required

Returns:

Name       Type  Description
Generator        Generator returning tuples of length two -> Generator[(str, 'Protein')]

Source code in socialgene/base/socialgene.py
def filter_proteins(self, hash_list: List):
    """Filter proteins by list of hash ids

    Args:
        hash_list (List): List of protein hash identifiers

    Returns:
        Generator: generator returning tuple of length two -> Generator[(str, 'Protein'])
    """
    return ((k, v) for k, v in self.proteins.items() if k in hash_list)
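
For example (the hash values are hypothetical); materialize the generator into a dict to keep only the allow-listed proteins.

subset = dict(sg.filter_proteins(["<protein_hash_1>", "<protein_hash_2>"]))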
hydrate_from_proteins()

Given a SocialGene object with proteins, retrieve from a running Neo4j database all locus and assembly info for those proteins

Source code in socialgene/base/socialgene.py
def hydrate_from_proteins(self):
    """Given a SocialGene object with proteins, retrieve from a running Neo4j database all locus and assembly info for those proteins"""
    for result in Neo4jQuery.query_neo4j(
        cypher_name="retrieve_protein_locations",
        param=list(self.proteins.keys()),
    ):
        self.add_assembly(uid=result["assembly"], parent=self)
        for locus in result["loci"]:
            _ = self.assemblies[result["assembly"]].add_locus(
                external_id=locus["locus"]
            )
            for feature in locus["features"]:
                _ = (
                    self.assemblies[result["assembly"]]
                    .loci[locus["locus"]]
                    .add_feature(
                        type="protein",
                        uid=feature["external_id"],
                        start=feature["locus_start"],
                        end=feature["locus_end"],
                        strand=feature["strand"],
                    )
                )
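
A minimal sketch, assuming sg already holds protein uids that exist in a running Neo4j database; hydrate_protein_info (below) can then fill in names and descriptions.

sg.hydrate_from_proteins()   # pull assemblies, loci, and feature coordinates for the proteins
sg.hydrate_protein_info()    # pull original identifiers and descriptions for the same proteins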
hydrate_protein_info()

Pull name (original identifier) and description of proteins from Neo4j

Source code in socialgene/base/socialgene.py
def hydrate_protein_info(self):
    """Pull name (original identifier) and description of proteins from Neo4j"""
    for result in Neo4jQuery.query_neo4j(
        cypher_name="get_protein_info",
        param=list(self.proteins.keys()),
    ):
        self.proteins[result["external_id"]].external_id = result["name"]
        self.proteins[result["external_id"]].description = result["description"]
table_assembly(**kwargs)

Assembly table for import into Neo4j

Parameters:

Name    Type  Description       Default
outdir  str   Defaults to ".".  required
Source code in socialgene/base/socialgene.py
def table_assembly(self, **kwargs):
    """Assembly table for import into Neo4j

    Args:
        outdir (str, optional): Defaults to ".".
    """
    for assembly in self.assemblies.values():
        yield tuple(
            [assembly.uid] + list(assembly.metadata.all_attributes().values())
        )
table_assembly_to_locus(**kwargs)

Assembly to locus table for import into Neo4j

Parameters:

Name    Type  Description       Default
outdir  str   Defaults to ".".  required
Source code in socialgene/base/socialgene.py
def table_assembly_to_locus(self, **kwargs):
    """Assembly to locus table for import into Neo4j

    Args:
        outdir (str, optional): Defaults to ".".
    """
    for ak, av in self.assemblies.items():
        for k, v in av.loci.items():
            #  ["assembly", "internal_locus_id"]
            yield (ak, v.uid)
table_assembly_to_taxid(**kwargs)

Assembly table for import into Neo4j

Parameters:

Name    Type  Description       Default
outdir  str   Defaults to ".".  required
Source code in socialgene/base/socialgene.py
def table_assembly_to_taxid(self, **kwargs):
    """Assembly table for import into Neo4j

    Args:
        outdir (str, optional): Defaults to ".".
    """
    for k, v in self.assemblies.items():
        if v.taxid:
            yield (k, v.taxid)
table_loci(**kwargs)

Generate a table of loci information.

Yields:

Name   Type  Description
tuple        (internal_locus_id, external_locus_id, [info])

Source code in socialgene/base/socialgene.py
def table_loci(self, **kwargs):
    """
    Generate a table of loci information.

    Yields:
        tuple: (internal_locus_id, external_locus_id, [info])
    """
    for _av in self.assemblies.values():
        for locus in _av.loci.values():
            yield tuple(
                [locus.uid]
                + [locus.external_id]
                + list(locus.metadata.all_attributes().values())
            )
table_locus_to_protein(**kwargs)

The function table_locus_to_protein generates a table of protein information from each locus, for import into Neo4j.

Source code in socialgene/base/socialgene.py
def table_locus_to_protein(self, **kwargs):
    """
    The function `table_locus_to_protein` generates a table of protein information from each locus, for import into Neo4j.
    """
    for ak, av in self.assemblies.items():
        for k, loci in av.loci.items():
            temp_list = list(loci.features)
            # sort features by id then start to maintain consistent output
            temp_list.sort(key=attrgetter("uid"))
            temp_list.sort(key=attrgetter("start"))
            for feature in temp_list:
                if feature.feature_is_protein():
                    yield (
                        feature.parent.uid,
                        feature.uid,
                        feature.external_id,
                        feature.locus_tag,
                        feature.start,
                        feature.end,
                        feature.strand,
                        feature.description,
                        feature.partial_on_complete_genome,
                        feature.missing_start,
                        feature.missing_stop,
                        feature.internal_stop,
                        feature.partial_in_the_middle_of_a_contig,
                        feature.missing_N_terminus,
                        feature.missing_C_terminus,
                        feature.frameshifted,
                        feature.too_short_partial_abutting_assembly_gap,
                        feature.incomplete,
                    )
table_protein_ids(include_sequences=False, **kwargs)

Protein hash id table for import into Neo4j

Source code in socialgene/base/socialgene.py
def table_protein_ids(self, include_sequences=False, **kwargs):
    """Protein hash id table for import into Neo4j"""
    for protein in self.proteins.values():
        if include_sequences:
            yield (protein.uid, protein.crc64, protein.sequence)
        else:
            yield (protein.uid, protein.crc64)
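
These table_* generators are usually consumed by write_table (documented below); a brief sketch, assuming the output directory already exists.

sg.write_table(outdir=".", tablename="table_protein_ids", include_sequences=True)

# Or iterate the generator directly.
for row in sg.table_protein_ids():
    print(row)  # (uid, crc64)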
table_protein_to_go(**kwargs)

The function table_protein_to_go iterates through the assemblies, loci, and features of a given object, and yields tuples containing the protein hash and GO term for each feature that has GO terms, for import into Neo4j.

Source code in socialgene/base/socialgene.py
def table_protein_to_go(self, **kwargs):
    """
    The function `table_protein_to_go` iterates through the assemblies, loci, and features of a
    given object, and yields tuples containing the protein hash and GO term for each feature that
    has GO terms, for import into Neo4j.
    """
    for av in self.assemblies.values():
        for v in av.loci.values():
            for feature in v.features:
                # not all features will have goterms so check here
                if feature.goterms:
                    for goterm in feature.goterms:
                        yield (
                            feature.uid,
                            goterm.removeprefix("GO:").strip(),
                        )
write_fasta(outpath, external_id=False, **kwargs)

Write all proteins to a FASTA file

Parameters:

Name         Type  Description                                                                            Default
outpath      str   Path of the file that FASTA entries will be appended to.                               required
external_id  bool  Write protein identifiers as the original identifier (True) or the hash uid (False).   False
**kwargs           See print(open_write.__doc__).                                                         {}
Source code in socialgene/base/socialgene.py
def write_fasta(
    self,
    outpath,
    external_id: bool = False,
    **kwargs,
):
    """Write all proteins to a FASTA file

    Args:
        outpath (str): path of file that FASTA entries will be appended to
        external_id (bool, optional): Write protein identifiers as the original identifier (True) or the protein uid/hash (False). Defaults to False.
        **kwargs: see print(open_write.__doc__)
    """

    with open_write(filepath=outpath, **kwargs) as handle:
        counter = 0
        if external_id:
            fasta_gen = self.fasta_string_defline_external_id
        else:
            fasta_gen = self.fasta_string_defline_uid
        for i in fasta_gen:
            counter += 1
            if "compression" in kwargs and kwargs["compression"]:
                handle.write(i.encode())
            else:
                handle.writelines(i)

    log.info(f"Wrote {str(counter)} proteins to {outpath}")
write_n_fasta(outdir, n_splits=1, **kwargs)

The function write_n_fasta exports protein sequences split into multiple fasta files.

Parameters:

outdir (str): directory where the fasta files will be saved. Required.
n_splits (int): number of fasta files the protein sequences will be split into; sequences are divided approximately evenly across the files. Defaults to 1, meaning all protein sequences are written into a single fasta file.
Source code in socialgene/base/socialgene.py, lines 462-494
def write_n_fasta(self, outdir, n_splits=1, **kwargs):
    """
    The function `write_n_fasta` exports protein sequences split into multiple fasta files.

    Args:
      outdir: The `outdir` parameter is a string that specifies the directory where the fasta files
    will be saved.
      n_splits: The `n_splits` parameter in the `write_n_fasta` function determines the number of
    fasta files the protein sequences will be split into. By default, it is set to 1, meaning all
    the protein sequences will be written into a single fasta file; values greater than 1 split the
    sequences approximately evenly across that many files. Defaults to 1
    """

    # this can be done with itertools.batched in python 3.12
    def split(a, n):
        # https://stackoverflow.com/a/2135920
        k, m = divmod(len(a), n)
        return (
            a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)
        )

    protein_list = split(
        [value for key, value in sorted(self.proteins.items(), reverse=False)],
        n_splits,
    )
    counter = 1
    for protein_group in protein_list:
        with open_write(
            Path(outdir, f"fasta_split_{counter}.faa"), **kwargs
        ) as handle:
            for i in protein_group:
                handle.writelines(f">{i.uid}\n{i.sequence}\n")
        counter += 1
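For example, a minimal sketch, again assuming an already-populated SocialGene object sg and an existing output directory (both assumptions):

# split the proteins across four files: fasta_split_1.faa ... fasta_split_4.faa
sg.write_n_fasta(outdir="fasta_out", n_splits=4)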
write_table(outdir, tablename, filename=None, include_sequences=False, **kwargs)

The function writes a table to a specified output directory in TSV format, for import into Neo4j.

Parameters:

outdir (str): directory where the output file will be saved. Required.
tablename (str): name of the table to be written.
filename (str): optional name of the file to be written; if no filename is provided, the tablename is used as the filename.
include_sequences (bool): passed through to the table generator to control whether sequences are included. Defaults to False.
Source code in socialgene/base/socialgene.py, lines 596-624
def write_table(
    self,
    outdir: str,
    tablename: str,
    filename: str = None,
    include_sequences=False,
    **kwargs,
):
    """
    The function writes a table to a specified output directory in TSV format, for import into Neo4j.

    Args:
      outdir (str): The `outdir` parameter is a string that specifies the directory where the output
    file will be saved.
      tablename (str): The `tablename` parameter is a string that specifies the name of the table to
    be written.
      filename (str): The `filename` parameter is an optional argument that specifies the name of
    the file to be written. If no `filename` is provided, the `tablename` will be used as the
    filename.
    """
    if not filename:
        filename = tablename
    outpath = Path(outdir, filename)
    with open_write(outpath, **kwargs) as handle:
        tsv_writer = csv.writer(
            handle, delimiter="\t", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        for i in getattr(self, tablename)(include_sequences=include_sequences):
            tsv_writer.writerow(i)
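A hedged sketch of exporting one of the table generators shown above to a TSV file (the SocialGene object sg and the output directory name are assumptions):

# writes neo4j_import/table_protein_ids as a tab-separated file
sg.write_table(outdir="neo4j_import", tablename="table_protein_ids", include_sequences=False)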

Assembly

Container class holding a dictionary of loci (ie genes/proteins)

Source code in socialgene/base/molbio.py, lines 1003-1119
class Assembly:
    """Container class holding a dictionary of loci (ie genes/proteins)"""

    __slots__ = [
        "parent",
        "loci",
        "taxid",
        "metadata",
        "uid",
        "taxonomy",
        "name",
    ]

    def __init__(self, uid, parent=None):
        super().__init__()
        self.parent = parent
        self.uid = uid
        self.loci = {}
        self.taxid = None
        self.taxonomy = Taxonomy()
        self.metadata = LocusAssemblyMetadata()
        self.name = uid

    @property
    def feature_uid_set(self):
        """Return all protein hashes within assembly
        Returns:
            set: protein hashes
        """
        return set().union(*[i.feature_uid_set for i in self.loci.values()])

    def all_attributes(self):
        return {s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)}

    def add_locus(self, external_id: str = None):
        """Add a locus to an assembly object"""
        if external_id is None:
            external_id = str(uuid4())
        if external_id not in self.loci:
            self.loci[external_id] = Locus(parent=self, external_id=external_id)
        else:
            log.debug(f"{external_id} already present")

    def fill_taxonomy_from_db(self):
        try:
            with GraphDriver() as db:
                res = db.run(
                    """
                        MATCH (a1:assembly {uid: $uid})-[:IS_TAXON]->(t2:taxid)-[:TAXON_PARENT*1..]->(t1:taxid)
                        WHERE t1.rank in ["genus", "family", "order", "class", "phylum", "clade", "superkingdom"]
                        WITH  apoc.map.fromLists([t2.rank],[t2.name]) as a, apoc.map.fromLists([t1.rank],[t1.name]) as b
                        return  apoc.map.mergeList(collect(a)+collect(b)) as tax_dict
                        """,
                    uid=str(self.uid),
                ).value()
                # Dict comprehension appends '_' to the keys, to match the args in Taxonomy
                self.taxonomy = Taxonomy(**{f"{k}_": v for k, v in res[0].items()})
        except Exception:
            log.debug(f"Error trying to retrieve taxonomy for {self.uid}")

    def fill_properties(self):
        try:
            with GraphDriver() as db:
                res = db.run(
                    """
                        MATCH (a1:assembly {uid: $uid})
                        return properties(a1)
                        """,
                    uid=self.uid,
                ).value()[0]
                self.metadata.update(res)
        except Exception:
            log.debug(f"Error trying to retrieve taxonomy for {self.uid}")

    def get_locus_by_uid(self, uid):
        for k, v in self.loci.items():
            if v.uid == uid:
                return v

    @property
    def gene_clusters(self):
        for locus in self.loci.values():
            for gene_cluster in locus.gene_clusters:
                yield gene_cluster

    @property
    def fasta_string_defline_uid(self):
        for v in self.loci.values():
            yield v.fasta_string_defline_uid

    @property
    def fasta_string_defline_external_id(self):
        for v in self.loci.values():
            yield v.fasta_string_defline_external_id

    def _neo4j_node(
        self,
    ):
        n = ASSEMBLY_NODE()
        dict_to_add = {"uid": self.uid}
        dict_to_add = dict_to_add | {
            k: v
            for k, v in self.metadata.all_attributes().items()
            if k in ASSEMBLY_NODE.property_specification
        }
        n.fill_from_dict(dict_to_add)
        return n

    def add_to_neo4j(self, create=False):
        for i in self.loci.values():
            i.add_to_neo4j()
        if self.taxid:
            taxnode = TAXID_NODE()
            taxnode.fill_from_dict({"uid": self.taxid})
            IS_TAXON_REL(start=self._neo4j_node(), end=taxnode).add_to_neo4j(
                create=False
            )

feature_uid_set property

Return all protein hashes within the assembly.

Returns: set: protein hashes

add_locus(external_id=None)

Add a locus to an assembly object

Source code in socialgene/base/molbio.py, lines 1037-1044
def add_locus(self, external_id: str = None):
    """Add a locus to an assembly object"""
    if external_id is None:
        external_id = str(uuid4())
    if external_id not in self.loci:
        self.loci[external_id] = Locus(parent=self, external_id=external_id)
    else:
        log.debug(f"{external_id} already present")

Domain

Class for holding information about a domain/motif annotation

Source code in socialgene/base/molbio.py, lines 183-350
class Domain:
    """Class for holding information about a domain/motif annotation"""

    __slots__ = [
        "hmm_id",
        "env_from",  # page 38 of the HMMER User Guide (http://eddylab.org/software/hmmer/Userguide.pdf) suggests using envelope (not hmm_from/to, ali_from/to)
        "env_to",
        "seq_pro_score",
        "evalue",
        "i_evalue",
        "domain_bias",
        "domain_score",
        "seq_pro_bias",
        "hmm_from",
        "hmm_to",
        "ali_from",
        "ali_to",
        "exponentialized",
    ]

    def __init__(
        self,
        hmm_id: str = None,
        env_from: int = None,
        env_to: int = None,
        seq_pro_score: float = None,
        evalue: float = None,
        i_evalue: float = None,
        domain_bias: float = None,
        domain_score: float = None,
        seq_pro_bias: float = None,
        hmm_from: int = None,
        hmm_to: int = None,
        ali_from: int = None,
        ali_to: int = None,
        exponentialized: bool = True,
        **kwargs,  # this kwarg isn't accessed but is here so that calling Domain with dict unpacking with extra args doesn't fail
    ):
        """
        Class for holding information about a domain/motif annotation

        Args:
          hmm_id (str): The `hmm_id` parameter is a string that represents the identifier of the hidden
        Markov model (HMM) associated with the domain.
          env_from (int): The `env_from` parameter represents the starting position of the domain in the
        target sequence.
          env_to (int): The `env_to` parameter represents the end position of the environment (sequence)
        in the domain. It is an integer value.
          seq_pro_score (float): The `seq_pro_score` parameter is a floating-point number that
        represents the sequence profile score of the domain.
          evalue (float): The `evalue` parameter is a floating-point number that represents the E-value
        of the domain. The E-value is a statistical measure that indicates the expected number of
        domains with a similar score or better that would occur by chance in a database of the same
        size.
          i_evalue (float): The `i_evalue` parameter is a floating-point number that represents the
        independent E-value of a domain. It is used to assess the statistical significance of the match
        between the domain and the sequence.
          domain_bias (float): The `domain_bias` parameter is a float that represents the bias score of
        a domain. It is used to measure the likelihood that a domain is present in a sequence due to
        chance rather than functional significance.
          domain_score (float): The `domain_score` parameter is a float that represents the score of the
        domain.
          seq_pro_bias (float): The `seq_pro_bias` parameter is a float value that represents the
        sequence profile bias of a domain. It is used to measure the bias in the sequence profile
        alignment.
          hmm_from (int): The `hmm_from` parameter represents the starting position of the domain in the
        HMM (Hidden Markov Model) profile. It is an integer value.
          hmm_to (int): The `hmm_to` parameter is an integer that represents the ending position of the
        hidden Markov model (HMM) alignment in the domain.
          ali_from (int): The `ali_from` parameter represents the starting position of the alignment in
        the sequence alignment. It is an integer value.
          ali_to (int): The `ali_to` parameter represents the ending position of the alignment in the
        sequence.
          exponentialized (bool): A boolean flag indicating whether the evalue and i_evalue should be
        exponentialized or not. If set to True, the evalue and i_evalue will be converted to exponential
        form. Defaults to True
        """
        super(Domain, self).__init__()
        if not isinstance(exponentialized, bool):
            raise ValueError(
                f"exponentialized mus be bool, was {type(exponentialized)}"
            )
        self.exponentialized = exponentialized
        if exponentialized:
            self.evalue = find_exp(evalue)
            self.i_evalue = find_exp(i_evalue)
        else:
            self.evalue = round(float(evalue), 1)
            self.i_evalue = round(float(i_evalue), 1)
        # some values are rounded because of differences between pyhmmer and hmmer results
        self.hmm_id = str(hmm_id)
        self.env_from = int(env_from)
        self.env_to = int(env_to)
        self.seq_pro_score = round(float(seq_pro_score), 1)
        self.domain_bias = round(float(domain_bias), 1)
        self.domain_score = round(float(domain_score), 1)
        self.seq_pro_bias = round(float(seq_pro_bias), 1)
        self.hmm_from = int(hmm_from)
        self.hmm_to = int(hmm_to)
        self.ali_from = int(ali_from)
        self.ali_to = int(ali_to)

    def all_attributes(self):
        ord_dict = OrderedDict()
        ord_dict["hmm_id"] = str(self.hmm_id)
        ord_dict["env_from"] = int(self.env_from)
        ord_dict["env_to"] = int(self.env_to)
        ord_dict["seq_pro_score"] = round(self.seq_pro_score, 1)
        ord_dict["evalue"] = round(self.evalue, 1)
        ord_dict["i_evalue"] = round(self.i_evalue, 1)
        ord_dict["domain_bias"] = round(self.domain_bias, 1)
        ord_dict["domain_score"] = round(self.domain_score, 1)
        ord_dict["seq_pro_bias"] = round(self.seq_pro_bias, 1)
        ord_dict["hmm_from"] = int(self.hmm_from)
        ord_dict["hmm_to"] = int(self.hmm_to)
        ord_dict["ali_from"] = int(self.ali_from)
        ord_dict["ali_to"] = int(self.ali_to)
        ord_dict["exponentialized"] = bool(self.exponentialized)
        return ord_dict

    def tsv_attributes(self):
        ord_dict = OrderedDict()
        ord_dict["hmm_id"] = str(self.hmm_id)
        ord_dict["env_from"] = int(self.env_from)
        ord_dict["env_to"] = int(self.env_to)
        ord_dict["seq_pro_score"] = round(self.seq_pro_score, 1)
        ord_dict["evalue"] = round(self.evalue, 1)
        ord_dict["i_evalue"] = round(self.i_evalue, 1)
        ord_dict["domain_bias"] = round(self.domain_bias, 1)
        ord_dict["domain_score"] = round(self.domain_score, 1)
        ord_dict["seq_pro_bias"] = round(self.seq_pro_bias, 1)
        ord_dict["hmm_from"] = int(self.hmm_from)
        ord_dict["hmm_to"] = int(self.hmm_to)
        ord_dict["ali_from"] = int(self.ali_from)
        ord_dict["ali_to"] = int(self.ali_to)
        ord_dict["exponentialized"] = bool(self.exponentialized)
        return ord_dict

    def get_hmm_id(self):
        return self.hmm_id

    @property
    def domain_within_threshold(self):
        """Check if a protein's domain annotation is within the threshold for inclusion (currently i_evalue based)
        Returns:
            bool: is less than or equal to the set ievalue cutoff?
        """
        if self.exponentialized:
            return self.i_evalue <= find_exp(env_vars["HMMSEARCH_IEVALUE"])
        else:
            return self.i_evalue <= env_vars["HMMSEARCH_IEVALUE"]

    def __hash__(self):
        """Used to prevent adding duplicate domains
        Returns:
            hash: hash for set()
        """
        return hash((self.hmm_id, self.env_from, self.env_to, self.i_evalue))

    def __eq__(self, other):  # pragma: no cover
        if not isinstance(other, type(self)):
            return NotImplemented
        return (
            self.hmm_id == other.hmm_id
            and self.env_from == other.env_from
            and self.env_to == other.env_to
            and self.i_evalue == other.i_evalue
        )

domain_within_threshold property

Check if a protein's domain annotation is within the threshold for inclusion (currently i_evalue based).

Returns: bool: True if the i_evalue is less than or equal to the set i_evalue cutoff

__hash__()

Used to prevent adding duplicate domains.

Returns: hash: hash for set()

Source code in socialgene/base/molbio.py, lines 335-340
def __hash__(self):
    """Used to prevent adding duplicate domains
    Returns:
        hash: hash for set()
    """
    return hash((self.hmm_id, self.env_from, self.env_to, self.i_evalue))

__init__(hmm_id=None, env_from=None, env_to=None, seq_pro_score=None, evalue=None, i_evalue=None, domain_bias=None, domain_score=None, seq_pro_bias=None, hmm_from=None, hmm_to=None, ali_from=None, ali_to=None, exponentialized=True, **kwargs)

Class for holding information about a domain/motif annotation

Parameters:

hmm_id (str): The identifier of the hidden Markov model (HMM) associated with the domain. Defaults to None.
env_from (int): The starting position of the domain in the target sequence. Defaults to None.
env_to (int): The end position of the environment (sequence) in the domain. Defaults to None.
seq_pro_score (float): The sequence profile score of the domain. Defaults to None.
evalue (float): The E-value of the domain, a statistical measure of the expected number of domains with a similar or better score that would occur by chance in a database of the same size. Defaults to None.
i_evalue (float): The independent E-value of the domain, used to assess the statistical significance of the match between the domain and the sequence. Defaults to None.
domain_bias (float): The bias score of the domain, used to measure the likelihood that the domain is present due to chance rather than functional significance. Defaults to None.
domain_score (float): The score of the domain. Defaults to None.
seq_pro_bias (float): The sequence profile bias of the domain, used to measure bias in the sequence profile alignment. Defaults to None.
hmm_from (int): The starting position of the domain in the HMM (hidden Markov model) profile. Defaults to None.
hmm_to (int): The ending position of the HMM alignment in the domain. Defaults to None.
ali_from (int): The starting position of the alignment in the sequence. Defaults to None.
ali_to (int): The ending position of the alignment in the sequence. Defaults to None.
exponentialized (bool): Whether the evalue and i_evalue should be converted to exponential form. Defaults to True.

Source code in socialgene/base/molbio.py, lines 203-283
def __init__(
    self,
    hmm_id: str = None,
    env_from: int = None,
    env_to: int = None,
    seq_pro_score: float = None,
    evalue: float = None,
    i_evalue: float = None,
    domain_bias: float = None,
    domain_score: float = None,
    seq_pro_bias: float = None,
    hmm_from: int = None,
    hmm_to: int = None,
    ali_from: int = None,
    ali_to: int = None,
    exponentialized: bool = True,
    **kwargs,  # this kwarg isn't accessed but is here so that calling Domain with dict unpacking with extra args doesn't fail
):
    """
    Class for holding information about a domain/motif annotation

    Args:
      hmm_id (str): The `hmm_id` parameter is a string that represents the identifier of the hidden
    Markov model (HMM) associated with the domain.
      env_from (int): The `env_from` parameter represents the starting position of the domain in the
    target sequence.
      env_to (int): The `env_to` parameter represents the end position of the environment (sequence)
    in the domain. It is an integer value.
      seq_pro_score (float): The `seq_pro_score` parameter is a floating-point number that
    represents the sequence profile score of the domain.
      evalue (float): The `evalue` parameter is a floating-point number that represents the E-value
    of the domain. The E-value is a statistical measure that indicates the expected number of
    domains with a similar score or better that would occur by chance in a database of the same
    size.
      i_evalue (float): The `i_evalue` parameter is a floating-point number that represents the
    independent E-value of a domain. It is used to assess the statistical significance of the match
    between the domain and the sequence.
      domain_bias (float): The `domain_bias` parameter is a float that represents the bias score of
    a domain. It is used to measure the likelihood that a domain is present in a sequence due to
    chance rather than functional significance.
      domain_score (float): The `domain_score` parameter is a float that represents the score of the
    domain.
      seq_pro_bias (float): The `seq_pro_bias` parameter is a float value that represents the
    sequence profile bias of a domain. It is used to measure the bias in the sequence profile
    alignment.
      hmm_from (int): The `hmm_from` parameter represents the starting position of the domain in the
    HMM (Hidden Markov Model) profile. It is an integer value.
      hmm_to (int): The `hmm_to` parameter is an integer that represents the ending position of the
    hidden Markov model (HMM) alignment in the domain.
      ali_from (int): The `ali_from` parameter represents the starting position of the alignment in
    the sequence alignment. It is an integer value.
      ali_to (int): The `ali_to` parameter represents the ending position of the alignment in the
    sequence.
      exponentialized (bool): A boolean flag indicating whether the evalue and i_evalue should be
    exponentialized or not. If set to True, the evalue and i_evalue will be converted to exponential
    form. Defaults to True
    """
    super(Domain, self).__init__()
    if not isinstance(exponentialized, bool):
        raise ValueError(
            f"exponentialized mus be bool, was {type(exponentialized)}"
        )
    self.exponentialized = exponentialized
    if exponentialized:
        self.evalue = find_exp(evalue)
        self.i_evalue = find_exp(i_evalue)
    else:
        self.evalue = round(float(evalue), 1)
        self.i_evalue = round(float(i_evalue), 1)
    # some values are rounded because of differences between pyhmmer and hmmer results
    self.hmm_id = str(hmm_id)
    self.env_from = int(env_from)
    self.env_to = int(env_to)
    self.seq_pro_score = round(float(seq_pro_score), 1)
    self.domain_bias = round(float(domain_bias), 1)
    self.domain_score = round(float(domain_score), 1)
    self.seq_pro_bias = round(float(seq_pro_bias), 1)
    self.hmm_from = int(hmm_from)
    self.hmm_to = int(hmm_to)
    self.ali_from = int(ali_from)
    self.ali_to = int(ali_to)
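A hedged sketch of building a Domain from a dictionary of HMMER-style values; the import path, identifiers, and numbers are assumptions made up for illustration, and the extra key is swallowed by the **kwargs catch-all:

from socialgene.base.molbio import Domain  # assumed import path

row = {
    "hmm_id": "PF00005",  # hypothetical HMM identifier
    "env_from": 3, "env_to": 180,
    "seq_pro_score": 70.2, "evalue": 1e-20, "i_evalue": 3e-18,
    "domain_bias": 0.1, "domain_score": 68.4, "seq_pro_bias": 0.2,
    "hmm_from": 1, "hmm_to": 178, "ali_from": 5, "ali_to": 179,
    "unused_column": "ignored",  # extra keys don't raise, thanks to **kwargs
}
dom = Domain(**row)  # exponentialized=True by default
dom.domain_within_threshold  # compares i_evalue against the HMMSEARCH_IEVALUE setting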

Feature

Bases: Location

Container class for describing a feature on a locus

Source code in socialgene/base/molbio.py, lines 507-684
class Feature(Location):
    """Container class for describing a feature on a locus"""

    __slots__ = [
        "parent",
        "uid",
        "external_id",
        "type",
        "locus_tag",
        "description",
        "note",
        "goterms",
        "partial_on_complete_genome",
        "missing_start",
        "missing_stop",
        "internal_stop",
        "partial_in_the_middle_of_a_contig",
        "missing_N_terminus",
        "missing_C_terminus",
        "frameshifted",
        "too_short_partial_abutting_assembly_gap",
        "incomplete",
        "protein",
    ]

    def __init__(
        self,
        parent=None,
        uid: str = None,
        external_id: str = None,
        type: str = None,
        locus_tag: str = None,
        description=None,
        note=None,
        goterms=None,
        partial_on_complete_genome=None,
        missing_start=None,
        missing_stop=None,
        internal_stop=None,
        partial_in_the_middle_of_a_contig=None,
        missing_N_terminus=None,
        missing_C_terminus=None,
        frameshifted=None,
        too_short_partial_abutting_assembly_gap=None,
        incomplete=None,
        **kwargs,
    ):
        """
        The above function is a constructor for a class that represents a feature on a locus, with
        various attributes and optional arguments.

        Args:
          uid (str): A string representing the hash value of the protein.
          external_id (str): The unique identifier for the protein associated with the feature.
          type (str): The "type" parameter is used to specify the type of feature. In this case, it is
        used to specify the type of the feature on a locus, such as "protein".
          locus_tag (str): The locus_tag parameter is a string that represents the unique identifier for
        a feature on a locus. It is typically used in genomics to identify a specific gene or protein
        within a genome.
          description: A description of the feature on a locus.
          note: The `note` parameter is used to provide additional information or comments about the
        feature. It is an optional parameter and can be any string value.
          goterms: The `goterms` parameter is used to store the Gene Ontology terms associated with the
        feature. Gene Ontology (GO) terms are standardized terms used to describe the function,
        location, and involvement of genes and gene products in biological processes.
          partial_on_complete_genome: A boolean flag indicating whether the feature is partial on a
        complete genome.
          missing_start: A boolean indicating whether the start of the feature is missing.
          missing_stop: A boolean flag indicating whether the stop codon of the feature is missing.
          internal_stop: A boolean flag indicating whether the feature has an internal stop codon.
          partial_in_the_middle_of_a_contig: This parameter is used to indicate whether the feature is
        partial and located in the middle of a contig.
          missing_N_terminus: This parameter is used to indicate whether the N-terminus (the starting
        end) of the feature is missing. It is a boolean value, where True indicates that the N-terminus
        is missing and False indicates that it is not missing.
          missing_C_terminus: The parameter "missing_C_terminus" is used to indicate whether the feature
        is missing the C-terminus (the end) of the protein sequence. It is a boolean value that can be
        set to True or False.
          frameshifted: The "frameshifted" parameter is a boolean flag that indicates whether the
        feature has a frameshift mutation. A frameshift mutation occurs when the reading frame of a gene
        is disrupted by the insertion or deletion of nucleotides, causing a shift in the codon reading
        frame and potentially altering the amino
          too_short_partial_abutting_assembly_gap: This parameter is used to indicate whether the
        feature is too short and abuts an assembly gap.
          incomplete: A boolean flag indicating whether the feature is incomplete.
        """
        super().__init__(**kwargs)
        self.parent = parent
        self.uid = uid
        self.external_id = external_id
        self.type = type
        self.locus_tag = locus_tag
        self.description = description
        self.note = note
        self.goterms = goterms
        self.partial_on_complete_genome = partial_on_complete_genome
        self.missing_start = missing_start
        self.missing_stop = missing_stop
        self.internal_stop = internal_stop
        self.partial_in_the_middle_of_a_contig = partial_in_the_middle_of_a_contig
        self.missing_N_terminus = missing_N_terminus
        self.missing_C_terminus = missing_C_terminus
        self.frameshifted = frameshifted
        self.too_short_partial_abutting_assembly_gap = (
            too_short_partial_abutting_assembly_gap
        )
        self.incomplete = incomplete
        if self.feature_is_protein():
            # Check if there is a SocialGene object in the parent chain
            # If there is, link the feature protein attribute to the SocialGene protein object
            sg = None
            current_object = self
            for _ in range(1, 100):
                if (
                    f"{current_object.__class__.__module__}.{current_object.__class__.__name__}"
                    == "socialgene.base.socialgene.SocialGene"
                ):
                    sg = current_object
                    break
                else:
                    if hasattr(current_object, "parent"):
                        current_object = current_object.parent
            if sg:
                if self.uid in sg.proteins:
                    self.protein = sg.proteins[self.uid]

    def all_attributes(self):
        return {s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)}

    def feature_is_protein(self):
        """Check if the feature "is a protein"

        Returns:
            bool: True if socialgene thinks it's a protein, False if not
        """
        # types of genbank file features to classify as a "protein"
        types_list = ["protein", "CDS"]
        return any([True for i in types_list if i == self.type])

    @property
    def fasta_string_defline_uid(self):
        try:
            if self.protein:
                return f">{self.protein.uid}\n{self.protein.sequence}\n"
        except AttributeError:
            log.debug(f"Erorr in fasta_string_defline_uid for {self}")

    @property
    def fasta_string_defline_external_id(self):
        try:
            if self.protein:
                return f">{self.protein.external_id}\n{self.protein.sequence}\n"
        except AttributeError:
            log.debug(f"Erorr in fasta_string_defline_external_id for {self}")

    def __hash__(self):
        """Used to prevent adding duplicate features to a locus (for hash in set() in Assembly.add_locus())

        Returns:
            hash: hash for set()
        """
        return hash((self.end, self.start, self.uid, self.strand, self.type))

    def __eq__(self, other):
        """Used for set() in Assembly.add_locus()"""
        if not isinstance(other, type(self)):
            return NotImplemented
        return (
            self.parent == other.parent
            and self.end == other.end
            and self.start == other.start
            and self.uid == other.uid
            and self.strand == other.strand
            and self.type == other.type
        )

    def __lt__(self, other):
        return self.start < other.start and self.parent == other.parent

__eq__(other)

Used for set() in Assembly.add_locus()

Source code in socialgene/base/molbio.py, lines 670-681
def __eq__(self, other):
    """Used for set() in Assembly.add_locus()"""
    if not isinstance(other, type(self)):
        return NotImplemented
    return (
        self.parent == other.parent
        and self.end == other.end
        and self.start == other.start
        and self.uid == other.uid
        and self.strand == other.strand
        and self.type == other.type
    )

__hash__()

Used to prevent adding duplicate features to a locus (for hash in set() in Assembly.add_locus())

Returns:

hash: hash for set()

Source code in socialgene/base/molbio.py, lines 662-668
def __hash__(self):
    """Used to prevent adding duplicate features to a locus (for hash in set() in Assembly.add_locus())

    Returns:
        hash: hash for set()
    """
    return hash((self.end, self.start, self.uid, self.strand, self.type))

__init__(parent=None, uid=None, external_id=None, type=None, locus_tag=None, description=None, note=None, goterms=None, partial_on_complete_genome=None, missing_start=None, missing_stop=None, internal_stop=None, partial_in_the_middle_of_a_contig=None, missing_N_terminus=None, missing_C_terminus=None, frameshifted=None, too_short_partial_abutting_assembly_gap=None, incomplete=None, **kwargs)

Constructor for a class that represents a feature on a locus, with various attributes and optional arguments.

Parameters:

uid (str): A string representing the hash value of the protein. Defaults to None.
external_id (str): The unique identifier for the protein associated with the feature. Defaults to None.
type (str): The type of the feature on a locus, such as "protein". Defaults to None.
locus_tag (str): A string that represents the unique identifier for a feature on a locus, typically used in genomics to identify a specific gene or protein within a genome.
description: A description of the feature on a locus.
note: Additional information or comments about the feature; optional, any string value.
goterms: The Gene Ontology (GO) terms associated with the feature; GO terms are standardized terms used to describe the function, location, and involvement of genes and gene products in biological processes.
partial_on_complete_genome: A boolean flag indicating whether the feature is partial on a complete genome.
missing_start: A boolean indicating whether the start of the feature is missing.
missing_stop: A boolean flag indicating whether the stop codon of the feature is missing.
internal_stop: A boolean flag indicating whether the feature has an internal stop codon.
partial_in_the_middle_of_a_contig: Indicates whether the feature is partial and located in the middle of a contig.
missing_N_terminus: Indicates whether the N-terminus (the starting end) of the feature is missing; True if missing, False otherwise.
missing_C_terminus: Indicates whether the feature is missing the C-terminus (the end) of the protein sequence; True or False.
frameshifted: A boolean flag indicating whether the feature has a frameshift mutation (a disruption of the reading frame by insertion or deletion of nucleotides).
too_short_partial_abutting_assembly_gap: Indicates whether the feature is too short and abuts an assembly gap.
incomplete: A boolean flag indicating whether the feature is incomplete.

Source code in socialgene/base/molbio.py, lines 532-631
def __init__(
    self,
    parent=None,
    uid: str = None,
    external_id: str = None,
    type: str = None,
    locus_tag: str = None,
    description=None,
    note=None,
    goterms=None,
    partial_on_complete_genome=None,
    missing_start=None,
    missing_stop=None,
    internal_stop=None,
    partial_in_the_middle_of_a_contig=None,
    missing_N_terminus=None,
    missing_C_terminus=None,
    frameshifted=None,
    too_short_partial_abutting_assembly_gap=None,
    incomplete=None,
    **kwargs,
):
    """
    The above function is a constructor for a class that represents a feature on a locus, with
    various attributes and optional arguments.

    Args:
      uid (str): A string representing the hash value of the protein.
      external_id (str): The unique identifier for the protein associated with the feature.
      type (str): The "type" parameter is used to specify the type of feature. In this case, it is
    used to specify the type of the feature on a locus, such as "protein".
      locus_tag (str): The locus_tag parameter is a string that represents the unique identifier for
    a feature on a locus. It is typically used in genomics to identify a specific gene or protein
    within a genome.
      description: A description of the feature on a locus.
      note: The `note` parameter is used to provide additional information or comments about the
    feature. It is an optional parameter and can be any string value.
      goterms: The `goterms` parameter is used to store the Gene Ontology terms associated with the
    feature. Gene Ontology (GO) terms are standardized terms used to describe the function,
    location, and involvement of genes and gene products in biological processes.
      partial_on_complete_genome: A boolean flag indicating whether the feature is partial on a
    complete genome.
      missing_start: A boolean indicating whether the start of the feature is missing.
      missing_stop: A boolean flag indicating whether the stop codon of the feature is missing.
      internal_stop: A boolean flag indicating whether the feature has an internal stop codon.
      partial_in_the_middle_of_a_contig: This parameter is used to indicate whether the feature is
    partial and located in the middle of a contig.
      missing_N_terminus: This parameter is used to indicate whether the N-terminus (the starting
    end) of the feature is missing. It is a boolean value, where True indicates that the N-terminus
    is missing and False indicates that it is not missing.
      missing_C_terminus: The parameter "missing_C_terminus" is used to indicate whether the feature
    is missing the C-terminus (the end) of the protein sequence. It is a boolean value that can be
    set to True or False.
      frameshifted: The "frameshifted" parameter is a boolean flag that indicates whether the
    feature has a frameshift mutation. A frameshift mutation occurs when the reading frame of a gene
    is disrupted by the insertion or deletion of nucleotides, causing a shift in the codon reading
    frame and potentially altering the amino
      too_short_partial_abutting_assembly_gap: This parameter is used to indicate whether the
    feature is too short and abuts an assembly gap.
      incomplete: A boolean flag indicating whether the feature is incomplete.
    """
    super().__init__(**kwargs)
    self.parent = parent
    self.uid = uid
    self.external_id = external_id
    self.type = type
    self.locus_tag = locus_tag
    self.description = description
    self.note = note
    self.goterms = goterms
    self.partial_on_complete_genome = partial_on_complete_genome
    self.missing_start = missing_start
    self.missing_stop = missing_stop
    self.internal_stop = internal_stop
    self.partial_in_the_middle_of_a_contig = partial_in_the_middle_of_a_contig
    self.missing_N_terminus = missing_N_terminus
    self.missing_C_terminus = missing_C_terminus
    self.frameshifted = frameshifted
    self.too_short_partial_abutting_assembly_gap = (
        too_short_partial_abutting_assembly_gap
    )
    self.incomplete = incomplete
    if self.feature_is_protein():
        # Check if there is a SocialGene object in the parent chain
        # If there is, link the feature protein attribute to the SocialGene protein object
        sg = None
        current_object = self
        for _ in range(1, 100):
            if (
                f"{current_object.__class__.__module__}.{current_object.__class__.__name__}"
                == "socialgene.base.socialgene.SocialGene"
            ):
                sg = current_object
                break
            else:
                if hasattr(current_object, "parent"):
                    current_object = current_object.parent
        if sg:
            if self.uid in sg.proteins:
                self.protein = sg.proteins[self.uid]

feature_is_protein()

Check if the feature "is a protein"

Returns:

bool: True if socialgene thinks it's a protein, False if not

Source code in socialgene/base/molbio.py, lines 636-644
def feature_is_protein(self):
    """Check if the feature "is a protein"

    Returns:
        bool: True if socialgene thinks it's a protein, False if not
    """
    # types of genbank file features to classify as a "protein"
    types_list = ["protein", "CDS"]
    return any([True for i in types_list if i == self.type])
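A trivial illustration (constructing bare Feature objects just to show the check; real features are created via add_feature on a locus, and all other keyword arguments follow the constructor documented above):

Feature(type="CDS").feature_is_protein()   # True
Feature(type="tRNA").feature_is_protein()  # False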

FeatureCollection

Source code in socialgene/base/molbio.py, lines 687-829
class FeatureCollection:
    __slots__ = [
        "parent",
        "features",
    ]

    def add_feature(self, **kwargs):
        """Add a feature to a locus"""
        self.features.add(Feature(parent=self, **kwargs))

    def all_attributes(self):
        return {s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)}

    @property
    def feature_uid_set(self):
        return {i.uid for i in self.features}

    @property
    def features_sorted_by_midpoint(self):
        """Sorts features by mid-coordinate of each feature"""
        for i in sorted(list(self.features), key=lambda i: int((i.end + i.start) / 2)):
            yield i

    def drop_non_protein_features(self):
        self.features = {i for i in self.features if i.type == "protein"}

    def get_feature_by_uid(self, uid):
        for k, v in self.features.items():
            if v.uid == uid:
                return v

    @property
    def proteins(self):
        sg = None
        current_object = self
        for i in range(1, 100):
            if (
                str(type(current_object))
                == "<class 'socialgene.base.socialgene.SocialGene'>"
            ):
                sg = current_object
                break
            else:
                current_object = current_object.parent
        return {
            i: sg.proteins.get(i, None)
            for i in {i.uid for i in self.features}
            if i in sg.proteins
        }

    @property
    def protein_iter(self):
        for i in self.proteins.values():
            yield i

    @property
    def fasta_string_defline_uid(self):
        for v in self.proteins.values():
            yield f">{v.uid}\n{v.sequence}\n"

    @property
    def fasta_string_defline_external_id(self):
        for v in self.proteins.values():
            yield f">{v.external_id}\n{v.sequence}\n"

    def _neo4j_node(self, **kwargs):
        n = NUCLEOTIDE_NODE()
        dict_to_add = {"uid": self.uid, "external_id": self.external_id}
        dict_to_add = dict_to_add | dict(self.metadata.all_attributes())
        n.fill_from_dict(dict_to_add)
        return n

    def add_to_neo4j(self):
        # create node
        n = self._neo4j_node()
        n.add_to_neo4j(create=False)
        protein_nodes = set()
        encodes_rels = set()
        # connect to proteins
        for i in self.features:
            protein_nodes.add(i.protein._neo4j_node())
            temp_rel = ENCODES_REL(start=n, end=i.protein._neo4j_node())
            properties = {
                k: v
                for k, v in i.all_attributes().items()
                if k in ENCODES_REL.property_specification
            }
            properties = properties | {
                "start": i.start,
                "end": i.end,
                "strand": i.strand,
            }
            temp_rel.fill_from_dict(properties)
            encodes_rels.add(temp_rel)
        if protein_nodes:
            protein_nodes = list(protein_nodes)
            protein_nodes[0].add_multiple_to_neo4j(protein_nodes, create=False)
        if encodes_rels:
            encodes_rels = list(encodes_rels)
            encodes_rels[0].add_multiple_to_neo4j(encodes_rels, create=False)
        # connect to assembly
        assembly_node = self.parent._neo4j_node()
        assembly_node.add_to_neo4j(create=False)
        ASSEMBLES_TO_REL(start=n, end=assembly_node).add_to_neo4j(create=False)

    def write_genbank(self, outpath, start=float("-inf"), end=float("inf")):
        record = SeqRecord(
            Seq(""),
            id=self.external_id,
            name=self.external_id,
            description="A GenBank file generated by SocialGene.",
            dbxrefs=[f"Assembly:{self.parent.uid}"],
        )
        # Add annotation
        for feature in self.features_sorted_by_midpoint:
            if start:
                if int(feature.start) < int(start):
                    continue
            if end:
                if int(feature.end) > int(end):
                    continue
            biofeat = SeqFeature(
                FeatureLocation(
                    start=feature.start
                    - 1,  # biopython modifies the start position by 1
                    end=feature.end,
                    strand=feature.strand,
                ),
                type=feature.type,
                qualifiers={
                    k: v
                    for k, v in feature.all_attributes().items()
                    if v and k != "parent"
                }
                | {"translation": self.protein.sequence},
            )
            record.features.append(biofeat)
        record.annotations["molecule_type"] = "DNA"
        SeqIO.write(
            record,
            outpath,
            "genbank",
        )

features_sorted_by_midpoint property

Sorts features by mid-coordinate of each feature

add_feature(**kwargs)

Add a feature to a locus

Source code in socialgene/base/molbio.py, lines 693-695
def add_feature(self, **kwargs):
    """Add a feature to a locus"""
    self.features.add(Feature(parent=self, **kwargs))
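A minimal sketch of adding a protein feature to a locus, continuing the assembly/locus created earlier; the coordinates and identifiers are made up, and uid would normally be the hash of the corresponding protein sequence:

locus = sg.assemblies["GCF_000000000.1"].loci["contig_1"]
locus.add_feature(
    uid="hypothetical_protein_hash",
    external_id="WP_000000001.1",  # hypothetical accession
    type="CDS",
    start=100, end=1300, strand=1,
)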

Location

Source code in socialgene/base/molbio.py, lines 157-180
class Location:
    __slots__ = ["start", "end", "strand"]
    # TODO: handle zero indexing

    def __init__(
        self,
        start: int = None,
        end: int = None,
        strand: int = None,
        **kwargs,  # this kwarg isn't accessed but is here so that calling Location with dict unpacking with extra args doesn't fail
    ):
        """Class describing genomic coordinates and strand direction.

        Args:
            start (int, optional): start coordinate
            end (int, optional): end coordinate
            strand (int, optional): DNA strand
        """
        self.start = start
        self.end = end
        self.strand = strand

    def all_attributes(self):
        return {s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)}

__init__(start=None, end=None, strand=None, **kwargs)

Class describing genomic coordinates and strand direction.

Parameters:

start (int, optional): start coordinate. Defaults to None.
end (int, optional): end coordinate. Defaults to None.
strand (int, optional): DNA strand. Defaults to None.
Source code in socialgene/base/molbio.py, lines 161-177
def __init__(
    self,
    start: int = None,
    end: int = None,
    strand: int = None,
    **kwargs,  # this kwarg isn't accessed but is here so that calling Location with dict unpacking with extra args doesn't fail
):
    """Class describing genomic coordinates and strand direction.

    Args:
        start (int, optional): start coordinate
        end (int, optional): end coordinate
        strand (int, optional): DNA strand
    """
    self.start = start
    self.end = end
    self.strand = strand

Locus

Bases: FeatureCollection

Container holding a set() of genomic features

Source code in socialgene/base/molbio.py, lines 832-908
class Locus(FeatureCollection):
    """Container holding a set() of genomic features"""

    __slots__ = [
        "parent",
        "features",
        "metadata",
        "external_id",
        "uid",
        "gene_clusters",
    ]

    def __init__(self, parent=None, external_id=None):
        super().__init__()
        self.parent = parent
        self.features = set()
        self.metadata = LocusAssemblyMetadata()
        self.external_id = external_id
        self.uid = self.calc_uid()
        self.gene_clusters = list()

    def calc_uid(self):
        return hasher.hasher(f"{self.parent.uid}___{self.external_id}")

    def add_bgcs_by_feature(self, features, **kwargs):
        if not all([isinstance(i, Feature) for i in features]):
            raise ValueError(
                f"All features must be of type Feature, not {[type(i) for i in features if not isinstance(i, Feature)]}"
            )
        self.gene_clusters.append(GeneCluster(features, parent=self, **kwargs))

    def add_bgcs_by_start_end(self, start, end, **kwargs):
        features = {i for i in self.features if i.start >= start and i.end <= end}
        if features:
            self.add_bgcs_by_feature(features=features, **kwargs)

    def write_genbank(self, outpath, start=None, end=None):
        record = SeqRecord(
            Seq(""),
            id=self.external_id,
            name=self.external_id,
            description="A GenBank file generated by SocialGene.",
            dbxrefs=[f"Assembly:{self.parent.uid}"],
        )
        # Add annotation
        for feature in self.features_sorted_by_midpoint:
            if start:
                if int(feature.start) < int(start):
                    continue
            if end:
                if int(feature.end) > int(end):
                    continue
            if feature.type == "protein":
                feat_type = "CDS"
            else:
                feat_type = feature.type
            biofeat = SeqFeature(
                FeatureLocation(
                    start=feature.start - 1,
                    end=feature.end,
                    strand=feature.strand,
                ),
                type=feat_type,
                qualifiers={
                    k: v
                    for k, v in feature.all_attributes().items()
                    if v and k != "parent"
                }
                | {"translation": self.parent.parent.proteins[feature.uid].sequence},
            )
            record.features.append(biofeat)
        record.annotations["molecule_type"] = "DNA"
        SeqIO.write(
            record,
            outpath,
            "genbank",
        )
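A hedged sketch of exporting a locus to GenBank, assuming locus is a fully populated Locus whose parent chain leads back to a SocialGene object holding the translated protein sequences (otherwise the translation lookup will fail); the path and coordinates are assumptions:

# keep only features whose coordinates fall inside the window; both bounds are optional
locus.write_genbank(outpath="contig_1.gbk", start=1, end=50000)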

Molbio

Class for inheriting by SocialGene()

Source code in socialgene/base/molbio.py, lines 1122-1171
class Molbio:
    """Class for inheriting by SocialGene()"""

    def __init__(self):
        super().__init__()  # TODO: ?

        self.assemblies = {}  # TODO: ?
        self.proteins = {}  # TODO: ?

    def get_all_feature_uids(self):
        """Return a list of all proteins uids

        Returns:
            list: List of all proteins uids
        """
        return [i.uid for i in self.proteins.values()]

    def add_protein(
        self,
        return_uid=True,
        **kwargs,
    ):
        """Add a protein to the protein dictionary

        Args:
            return_uid (bool, optional): Whether the function should return the protein's uid. Defaults to True.

        Returns:
            str: Protein's hash
        """
        temp_protein = Protein(**kwargs)
        # only add protein if it doesn't already exist
        if temp_protein.uid not in self.proteins:
            # deepcopy to ensure instances aren't shared
            self.proteins[temp_protein.uid] = temp_protein
        if return_uid:
            return temp_protein.uid

    def add_assembly(self, uid: str = None, parent=None):
        """Add an assembly to a SocialGene object

        Args:
            uid (str, optional): Assembly identifier, should be unique across parsed input. Defaults to None.
        """
        if uid is None:
            uid = str(uuid4())
        if uid not in self.assemblies:
            self.assemblies[uid] = Assembly(uid=uid, parent=parent)
        else:
            log.debug(f"{uid} already present")

add_assembly(uid=None, parent=None)

Add an assembly to a SocialGene object

Parameters:

    uid (str, optional): Assembly identifier, should be unique across parsed input. Defaults to None.
Source code in socialgene/base/molbio.py
def add_assembly(self, uid: str = None, parent=None):
    """Add an assembly to a SocialGene object

    Args:
        uid (str, optional): Assembly identifier, should be unique across parsed input. Defaults to None.
    """
    if uid is None:
        uid = str(uuid4())
    if uid not in self.assemblies:
        self.assemblies[uid] = Assembly(uid=uid, parent=parent)
    else:
        log.debug(f"{uid} already present")

add_protein(return_uid=True, **kwargs)

Add a protein to the protein dictionary

Parameters:

    return_uid (bool, optional): Whether to return the protein's uid. Defaults to True.

Returns:

    str: Protein's hash

Source code in socialgene/base/molbio.py
def add_protein(
    self,
    return_uid=True,
    **kwargs,
):
    """Add a protein to the protein dictionary

    Args:
        return_uid (bool, optional): Whether to return the protein's uid. Defaults to True.

    Returns:
        str: Protein's hash
    """
    temp_protein = Protein(**kwargs)
    # only add protein if it doesn't already exist
    if temp_protein.uid not in self.proteins:
        # store the new Protein keyed by its uid
        self.proteins[temp_protein.uid] = temp_protein
    if return_uid:
        return temp_protein.uid
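
A short usage sketch continuing from the example above; the sequence and accession are hypothetical, and keyword arguments are passed straight through to Protein():

uid = sg.add_protein(sequence="MKTAYIAKQR", external_id="WP_000000000.1")
assert uid in sg.proteins                # keyed by the sequence's hash uid
sg.add_protein(sequence="MKTAYIAKQR")    # same sequence -> same uid; not added twice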

get_all_feature_uids()

Return a list of all protein uids

Returns:

    list: List of all protein uids

Source code in socialgene/base/molbio.py
def get_all_feature_uids(self):
    """Return a list of all proteins uids

    Returns:
        list: List of all proteins uids
    """
    return [i.uid for i in self.proteins.values()]

Protein

Bases: ProteinSequence

Container class for describing a single protein

Source code in socialgene/base/molbio.py
class Protein(
    ProteinSequence,
):
    """Container class for describing a single protein"""

    # seq_len can be provided explicitly (e.g. when no sequence is available and proteins are created manually)
    __slots__ = ["description", "external_id", "domains"]

    def __init__(
        self,
        description: str = None,
        external_id: str = None,
        domains: set = None,
        *args,
        **kwargs,
    ):
        """_summary_

        Args:
            description (str, optional): Protein description.
            external_id (str, optional): Non-hash-id descriptor (usually a database accession, e.g. NCBI's).
            seq_len (int, optional): Amino acid sequence length
            domains (Set, optional): Set of Domain() objects.
        """
        super().__init__(*args, **kwargs)
        self.description = description
        self.external_id = external_id
        self.domains = domains if domains is not None else set()

    def __eq__(self, other):  # pragma: no cover
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.uid == other.uid

    def __hash__(self):
        return hash(self.uid)

    def all_attributes(self):
        return {
            s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)
        } | {"seq_len": self.seq_len}

    def add_domain(
        self,
        *args,
        **kwargs,
    ):
        """Add a domain to a protein if it meets threshold (ievalue)

        Returns:
            bool: used for counting # of domains added to protein
        """
        self.domains.add(Domain(*args, **kwargs))

    def add_domains_from_neo4j(self):
        try:
            with GraphDriver() as db:
                for i in db.run(
                    """
                    MATCH (p1:protein)<-[a1:ANNOTATES]-(h1:hmm)
                    WHERE p1.uid = $uid
                    RETURN apoc.map.merge({hmm_id:h1.uid}, properties(a1))
                    """,
                    uid=str(self.uid),
                ):
                    self.add_domain(**i.value())
        except Exception:
            log.debug(f"Error trying to retrieve domains for {self.uid}")

    def add_sequences_from_neo4j(self):
        try:
            with GraphDriver() as db:
                for i in db.run(
                    """
                    MATCH (p1:protein {uid: $uid})
                    RETURN p1.sequence as sequence
                    """,
                    uid=str(self.uid),
                ):
                    if x := i.value():
                        self.sequence = x
        except Exception:
            log.debug(f"Error trying to retrieve sequences for {self.uid}")

    @property
    def domain_list_sorted_by_mean_envelope_position(self):
        # Convert the set to a list so it can be sorted.
        # The sort key is a tuple: HMM annotations are first sorted by the envelope midpoint,
        # then ties are broken alphabetically by hmm_id for consistency.
        return sorted(
            list(self.domains),
            key=lambda tx: ((tx.env_from + tx.env_to) / 2, tx.hmm_id),
        )

    @property
    def domain_vector(
        self,
    ):
        """Get the domain uids for a protein as an ordered list

        list: list of domain uids
        """
        if not self.domains:
            log.debug(f"Tried to get domains from domain-less protein {self}")
            return []
        return [
            i.get_hmm_id() for i in self.domain_list_sorted_by_mean_envelope_position
        ]

    def filter_domains(self):
        """Prune all domains in all proteins that don't meet the inclusion threshold (currently HMMER's i_evalue)"""
        _before_count = len(self.domains)
        temp = []
        self.domains = {i for i in self.domains if i.domain_within_threshold}

        del temp
        log.debug(
            f"Removed {str(_before_count - len(self.domains))} domains from {self.external_id}"
        )

    @property
    def fasta_string_defline_uid(self):
        return f">{self.uid}\n{self.sequence}\n"

    @property
    def fasta_string_defline_external_id(self):
        return f">{self.external_id}\n{self.sequence}\n"

    def _neo4j_node(self, include_sequences=True) -> PROTEIN_NODE:
        n = PROTEIN_NODE()
        dict_to_add = {"uid": self.uid, "crc64": self.crc64}
        if include_sequences:
            dict_to_add["sequence"] = self.sequence
        n.fill_from_dict(dict_to_add)
        return n

    def add_to_neo4j(self, include_sequences=True):
        n = self._neo4j_node(include_sequences=include_sequences)
        n.add_to_neo4j(create=False)
        for i in self.domains:
            hmmnode = HMM_NODE()
            hmmnode.fill_from_dict({"uid": i.hmm_id})
            rel = ANNOTATES_REL(start=hmmnode, end=self._neo4j_node())
            rel.fill_from_dict(
                {
                    k: v
                    for k, v in i.all_attributes().items()
                    if k in ANNOTATES_REL.property_specification
                }
            )
            rel.add_to_neo4j(create=False)

domain_vector property

Get the domain uids for a protein as an ordered list

list: list of domain uids

__init__(description=None, external_id=None, domains=None, *args, **kwargs)

Initialize a Protein container

Parameters:

    description (str, optional): Protein description. Defaults to None.
    external_id (str, optional): Non-hash-id descriptor (usually a database accession, e.g. NCBI's). Defaults to None.
    seq_len (int, optional): Amino acid sequence length (passed through to ProteinSequence).
    domains (set, optional): Set of Domain() objects. Defaults to None.
Source code in socialgene/base/molbio.py
def __init__(
    self,
    description: str = None,
    external_id: str = None,
    domains: set = None,
    *args,
    **kwargs,
):
    """_summary_

    Args:
        description (str, optional): Protein description.
        external_id (str, optional): Non-hash-id descriptor (usually a database accession, e.g. NCBI's).
        seq_len (int, optional): Amino acid sequence length
        domains (Set, optional): Set of Domain() objects.
    """
    super().__init__(*args, **kwargs)
    self.description = description
    self.external_id = external_id
    self.domains = domains if domains is not None else set()

add_domain(*args, **kwargs)

Add a Domain() to the protein's set of domains; args and kwargs are passed through to Domain().

Source code in socialgene/base/molbio.py
def add_domain(
    self,
    *args,
    **kwargs,
):
    """Add a domain to a protein if it meets threshold (ievalue)

    Returns:
        bool: used for counting # of domains added to protein
    """
    self.domains.add(Domain(*args, **kwargs))

filter_domains()

Prune this protein's domains that don't meet the inclusion threshold (currently HMMER's i_evalue)

Source code in socialgene/base/molbio.py
def filter_domains(self):
    """Prune all domains in all proteins that don't meet the inclusion threshold (currently HMMER's i_evalue)"""
    _before_count = len(self.domains)
    temp = []
    self.domains = {i for i in self.domains if i.domain_within_threshold}

    del temp
    log.debug(
        f"Removed {str(_before_count - len(self.domains))} domains from {self.external_id}"
    )

ProteinSequence

Class for working with protein sequences; it can be initialized with either a sequence or a uid.

Source code in socialgene/base/molbio.py
class ProteinSequence:
    """Class used for working with protein sequences and can be initialized with either a sequence or a uid."""

    _amino_acids = [
        "A",
        "R",
        "N",
        "D",
        "C",
        "Q",
        "E",
        "G",
        "H",
        "I",
        "L",
        "K",
        "M",
        "F",
        "P",
        "S",
        "T",
        "W",
        "Y",
        "V",
        "X",
        "Z",
        "J",
        "U",
        "B",
        "O",
        "*",
    ]
    __slots__ = ["uid", "__crc64", "__md5", "_seq_len", "sequence"]

    def __init__(self, sequence: str = None, uid: str = None, seq_len: int = None):
        """Class for holding an amino acid sequence (protein)

        Args:
            sequence (str, optional): amino acid sequence
            uid (str, optional): a uid can be provided if a sequence isn't

        Raises:
            ValueError: Must provide either a sequence or a uid
        """
        # the sequence input variable has a default so that domtblout can create a socialgene object
        self.sequence = sequence
        if isinstance(self.sequence, type(None)):
            if isinstance(uid, type(None)):
                raise ValueError("Must provide either a sequence or a uid")
            else:
                self.uid = uid
        else:
            self._assign_hash()
        if seq_len:
            self._seq_len = seq_len

    @property
    def seq_len(self):
        """Return the length of the protein

        Returns:
            int: number of amino acids
        """
        # prefer an explicitly stored length (e.g. when no sequence is available)
        if getattr(self, "_seq_len", None) is not None:
            return self._seq_len
        return len(self.sequence)

    @seq_len.setter
    def seq_len(self, value):
        if isinstance(value, int):
            self._seq_len = value
        else:
            self._seq_len = len(self.sequence)

    def all_attributes(self):
        """
        The function returns a dictionary containing the attributes of an object.

        Returns:
          The `__dict__` method is returning a dictionary that contains the names and values of all the
        attributes of the object. The attributes are obtained using the `getattr` function and are filtered
        to only include attributes that are defined in the `__slots__` list and that exist in the object.
        The dictionary is sorted based on the names of the attributes.
        """
        return {s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)}

    def _one_letter_amino_acids(self):
        """Create an ordered dictionary of amino acids. Used to count AAs in a protein sequence.

        Returns:
            dict: amino acid count
        """
        return OrderedDict({i: 0 for i in self._amino_acids})

    def _amino_acid_count(self):
        """Create a '-' separated string of an amino acid count

        Returns:
            str: sequence of amino acids
        """
        if self.sequence is None:
            return "-".join(["0" for i in self._one_letter_amino_acids()])
        else:
            return "-".join(
                [str(self.sequence.count(i)) for i in self._one_letter_amino_acids()]
            )

    def _assign_hash(self):
        """
        The function assigns a hash value to a sequence of amino acids.
        """
        self._standardize_sequence()
        self.uid = hasher.hash_aminos(self.sequence)

    @property
    def crc64(self):
        return hasher.hash_aminos(self.sequence, algo="crc64")

    @property
    def md5(self):
        return hasher.hash_aminos(self.sequence, algo="md5")

    def _standardize_sequence(self):
        """
        The function converts the protein sequence to uppercase and checks if all characters are valid
        amino acids, raising an error if an unknown character is found.
        """
        self.sequence = self.sequence.upper()
        if not all([i in self._amino_acids for i in set(self.sequence)]):
            log.error(self.sequence)
            raise ValueError("Unknown character/letter in protein sequence")

seq_len property writable

Return the length of the protein

Returns:

    int: number of amino acids

__init__(sequence=None, uid=None, seq_len=None)

Class for holding an amino acid sequence (protein)

Parameters:

    sequence (str, optional): amino acid sequence. Defaults to None.
    uid (str, optional): a uid can be provided if a sequence isn't. Defaults to None.
    seq_len (int, optional): amino acid sequence length, if known. Defaults to None.

Raises:

    ValueError: Must provide either a sequence or a uid

Source code in socialgene/base/molbio.py
def __init__(self, sequence: str = None, uid: str = None, seq_len: int = None):
    """Class for holding an amino acid sequence (protein)

    Args:
        sequence (str, optional): amino acid sequence
        uid (str, optional): a uid can be provided if a sequence isn't

    Raises:
        ValueError: Must provide either a sequence or a uid
    """
    # the sequence input variable has a default so that domtblout can create a socialgene object
    self.sequence = sequence
    if isinstance(self.sequence, type(None)):
        if isinstance(uid, type(None)):
            raise ValueError("Must provide either a sequence or a uid")
        else:
            self.uid = uid
    else:
        self._assign_hash()
    if seq_len:
        self._seq_len = seq_len

all_attributes()

Return a dictionary of the object's attributes.

Returns:

    dict: names and values of every attribute defined in __slots__ that is set on the object, sorted by attribute name.

Source code in socialgene/base/molbio.py
def all_attributes(self):
    """
    The function returns a dictionary containing the attributes of an object.

    Returns:
      The `__dict__` method is returning a dictionary that contains the names and values of all the
    attributes of the object. The attributes are obtained using the `getattr` function and are filtered
    to only include attributes that are defined in the `__slots__` list and that exist in the object.
    The dictionary is sorted based on the names of the attributes.
    """
    return {s: getattr(self, s) for s in sorted(self.__slots__) if hasattr(self, s)}
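
For example (hypothetical sequence), only the slots that are actually set appear in the result:

ps = ProteinSequence(sequence="MKTAYIAKQR")
ps.all_attributes()   # {"sequence": "MKTAYIAKQR", "uid": "<hash>"} -- unset slots are skipped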

Taxonomy

Taxonomic ranks; because class is a reserved word in Python, every rank attribute name carries a trailing underscore for consistency.

Source code in socialgene/base/molbio.py
class Taxonomy:
    "Class is a reserved word so just underscore all ranks to be consistent"
    __slots__ = [
        "species_",
        "genus_",
        "family_",
        "order_",
        "class_",
        "phylum_",
        "clade_",
        "superkingdom_",
    ]

    def __init__(
        self,
        species_: str = None,
        genus_: str = None,
        family_: str = None,
        order_: str = None,
        class_: str = None,
        phylum_: str = None,
        clade_: str = None,
        superkingdom_: str = None,
        **kwargs,
    ) -> None:
        self.species_ = species_
        self.genus_ = genus_
        self.family_ = family_
        self.order_ = order_
        self.class_ = class_
        self.phylum_ = phylum_
        self.clade_ = clade_
        self.superkingdom_ = superkingdom_
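
A minimal sketch of the Taxonomy container (rank values are illustrative); ranks that aren't provided stay None:

tax = Taxonomy(
    genus_="Streptomyces",
    family_="Streptomycetaceae",
    phylum_="Actinomycetota",
    superkingdom_="Bacteria",
)
tax.species_   # None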